Démonstration Apache Arrow en Python

Introduction

Dans cette démonstration, nous allons explorer comment Apache Arrow peut être utilisé pour traiter des fichiers CSV et Parquet, même lorsque ces derniers sont trop volumineux pour tenir en mémoire. Nous allons montrer :

  1. Comment charger un fichier CSV en mémoire avec Arrow et effectuer un tri.
  2. Comment utiliser l’API pyarrow.dataset pour travailler avec un fichier Parquet trop grand pour la mémoire, appliquer des filtres et des agrégations, et convertir les résultats en un DataFrame Pandas.
! pip install pyarrow
! pip install pandas

Chargement d’un fichier CSV avec Arrow

import pyarrow.csv as csv
csv_file = "data1.csv"
table = csv.read_csv(csv_file)

# Trier les données par une colonne spécifique
sorted_table = table.sort_by("name")
print(sorted_table)
pyarrow.Table
name: string
phone: string
email: string
postalZip: string
Salary: string
country: string
age: int64
user_id: int64
----
name: [["Abbot Chang","Adara Sanchez","Aileen O'Neill","Aretha Chen","Ashton Castaneda",...,"Walter Wiggins","Whilemina Nielsen","William Morse","Winter Luna","Wyoming Kelly"]]
phone: [["06 40 44 38 71","01 66 30 28 05","05 87 31 31 53","09 56 23 98 48","03 44 66 36 63",...,"08 04 23 36 41","01 06 54 85 38","03 23 95 87 15","09 84 18 87 87","06 28 66 68 61"]]
email: [["vitae@hotmail.edu","metus.vitae@google.net","cursus.et@hotmail.ca","cum.sociis.natoque@hotmail.edu","vehicula@icloud.edu",...,"mollis.non@icloud.org","curabitur@google.net","magna.phasellus@hotmail.couk","mauris.vestibulum@google.edu","cursus.nunc.mauris@outlook.net"]]
postalZip: [["18986","75641","U1V 1IS","3667","37215",...,"587379","5273 UF","46315","5887","2731"]]
Salary: [["$5,689","$7,095","$8,079","$7,624","$5,068",...,"$5,948","$6,524","$6,032","$5,854","$6,466"]]
country: [["Germany","Austria","Italy","Canada","Sweden",...,"Singapore","Spain","Vietnam","Poland","Germany"]]
age: [[34,21,86,55,55,...,39,74,46,40,23]]
user_id: [[89,45,60,41,11,...,75,99,35,83,10]]

Traitement d’un fichier Parquet trop grand pour la mémoire

import pyarrow.dataset as ds
import pyarrow.compute as pc
import pandas as pd
emplacement_dataset = "[emplacement du dataset]"
dataset = ds.dataset(emplacement_dataset, format="parquet", partitioning="hive")

# Appliquer des filtres et des agrégations
filtered = dataset.filter(pc.field("BEN_SEX_COD") == "1")

La lazy evaluation :

Le code de la cellule précédente était instantanée et celui qui vient produit un résultat, il va donc le déclencher

filtered.count_rows()
filtered.head(20)
pyarrow.Table
FLX_ANN_MOI: string
ORG_CLE_REG: string
AGE_BEN_SNDS: string
BEN_RES_REG: string
BEN_CMU_TOP: string
BEN_QLT_COD: string
BEN_SEX_COD: string
DDP_SPE_COD: string
ETE_CAT_SNDS: string
ETE_REG_COD: string
ETE_TYP_SNDS: string
ETP_REG_COD: string
ETP_CAT_SNDS: string
MDT_TYP_COD: string
MFT_COD: string
PRS_FJH_TYP: string
PRS_ACT_COG: double
PRS_ACT_NBR: double
PRS_ACT_QTE: double
PRS_DEP_MNT: double
PRS_PAI_MNT: double
PRS_REM_BSE: double
PRS_REM_MNT: double
FLT_ACT_COG: double
FLT_ACT_NBR: double
FLT_ACT_QTE: double
FLT_PAI_MNT: double
FLT_DEP_MNT: double
FLT_REM_MNT: double
SOI_ANN: int32
SOI_MOI: int32
ASU_NAT: string
ATT_NAT: string
CPL_COD: string
CPT_ENV_TYP: string
DRG_AFF_NAT: string
ETE_IND_TAA: string
EXO_MTF: string
MTM_NAT: string
PRS_NAT: string
PRS_PPU_SEC: string
PRS_REM_TAU: double
PRS_REM_TYP: string
PRS_PDS_QCP: string
EXE_INS_REG: string
PSE_ACT_SNDS: string
PSE_ACT_CAT: string
PSE_SPE_SNDS: string
PSE_STJ_SNDS: string
PRE_INS_REG: string
PSP_ACT_SNDS: string
PSP_ACT_CAT: string
PSP_SPE_SNDS: string
PSP_STJ_SNDS: string
TOP_PS5_TRG: string
annee: int32
mois: int32
----
FLX_ANN_MOI: [["201801","201801","201801","201801","201801",...,"201801","201801","201801","201801","201801"]]
ORG_CLE_REG: [["24","11","75","52","11",...,"11","93","11","99","27"]]
AGE_BEN_SNDS: [["60","70","30","40","80",...,"50","30","60","80","40"]]
BEN_RES_REG: [["24","75","75","52","99",...,"76","93","11","93","27"]]
BEN_CMU_TOP: [["9","9","9","9","9",...,"9","9","9","9","9"]]
BEN_QLT_COD: [["1","1","1","1","1",...,"1","1","1","1","1"]]
BEN_SEX_COD: [["1","1","1","1","1",...,"1","1","1","1","1"]]
DDP_SPE_COD: [["0","0","0","0","0",...,"0","0","0","0","0"]]
ETE_CAT_SNDS: [["1102","9999","9999","1102","1110",...,"9999","9999","2206","9999","9999"]]
ETE_REG_COD: [["24","99","99","52","11",...,"99","99","52","99","99"]]
...

Faire une aggregation sur une table dépassant la mémoire avec Acero :

import pyarrow.dataset as ds
import pyarrow.acero as acero
import pyarrow.compute as pc

dataset = ds.dataset(
    emplacement_dataset,
    format="parquet",
    partitioning="hive"
)

plan = acero.Declaration(
    "scan",
    acero.ScanNodeOptions(dataset, columns=["FLT_PAI_MNT", "annee", "mois"])
)

plan = acero.Declaration(
    "project",
    acero.ProjectNodeOptions(
        expressions=[
            pc.field("FLT_PAI_MNT"),
            pc.field("annee"),
            pc.field("mois"),
        ],
        names=["FLT_PAI_MNT", "annee", "mois"]
    ),
    inputs=[plan]
)

plan = acero.Declaration(
    "aggregate",
    acero.AggregateNodeOptions(
        aggregates=[
            ("FLT_PAI_MNT", "hash_sum", pc.ScalarAggregateOptions(skip_nulls=False, min_count=0), "FLT_PAI_MNT")
        ],
        keys=["annee", "mois"]
    ),
    inputs=[plan]
)
plan
<pyarrow.acero.Declaration>
ExecPlan with 4 nodes:
:ConsumingSinkNode{}
  :GroupByNode{keys=["annee", "mois"], aggregates=[
    hash_sum(FLT_PAI_MNT, {skip_nulls=false, min_count=0}),
  ]}
    :ProjectNode{projection=[FLT_PAI_MNT, annee, mois]}
      :SourceNode{}

Pour l’instant, c’est une requête, qui fait exactement la même requête que celle que nous avions sous R auparavant.

table = plan.to_table()
print(table)
pyarrow.Table
annee: int32
mois: int32
FLT_PAI_MNT: double
----
annee: [[2018,2018,2018,2018,2018,...,2024,2024,2024,2024,2024]]
mois: [[1,2,3,4,5,...,8,9,10,11,12]]
FLT_PAI_MNT: [[1.1449685652926851e+10,1.0650381771492464e+10,1.1552585152687378e+10,1.1526760891298904e+10,1.1030467262340338e+10,...,1.1074975306814655e+10,1.336205943037979e+10,1.5555087741630299e+10,1.3508940256072056e+10,1.4659206975157728e+10]]

On peut l’exécuter à l’aide de la fonction to_table().

Conversion en pandas

# Convertir en DataFrame Pandas
pandas_df = table.to_pandas()
print(pandas_df)
    annee  mois   FLT_PAI_MNT
0    2018     1  1.144969e+10
1    2018     2  1.065038e+10
2    2018     3  1.155259e+10
3    2018     4  1.152676e+10
4    2018     5  1.103047e+10
..    ...   ...           ...
79   2024     8  1.107498e+10
80   2024     9  1.336206e+10
81   2024    10  1.555509e+10
82   2024    11  1.350894e+10
83   2024    12  1.465921e+10

[84 rows x 3 columns]

Version polars

import polars as pl

plan = (
    pl.scan_parquet(emplacement_dataset)
    .select(["FLT_PAI_MNT", "annee", "mois"])
    .group_by(["annee", "mois"])
    .agg(pl.col("FLT_PAI_MNT").sum())
)
print(plan)
naive plan: (run LazyFrame.explain(optimized=True) to see the optimized plan)

AGGREGATE[maintain_order: false]
  [col("FLT_PAI_MNT").sum()] BY [col("annee"), col("mois")]
  FROM
  SELECT [col("FLT_PAI_MNT"), col("annee"), col("mois")]
    Parquet SCAN [C:/Users/afeldmann/Desktop/openamir/annee=2018/mois=01/00000000000000000001.parquet, ... 2952 other sources]
    PROJECT */57 COLUMNS
    ESTIMATED ROWS: 2953000000

L’on peut également utiliser polars en backend, qui est un moteur alternatif, basé sur le format de donnée arrow, mais qui utilise un moteur alternatif à Acero pour exécuter ses requêtes.

Comme Acero, Polars attend qu’on lui ordonne d’exécuter ses requêtes.

Comme sous Acero, on peut éviter de tout charger en mémoire d’un seul coup sous Polars. Il faut pour cela préciser qu’on utilise l’engine “streaming”.

plan.collect(engine="streaming")
shape: (84, 3)
annee mois FLT_PAI_MNT
i64 i64 f64
2018 1 1.1450e10
2018 4 1.1527e10
2018 10 1.2354e10
2019 1 1.1465e10
2019 3 1.1189e10
2022 7 1.2908e10
2023 3 1.3968e10
2023 9 1.2858e10
2023 10 1.4455e10
2024 9 1.3362e10

Dès lors que l’on atteint la dizaine de secondes pour des requêtes d’études statistiques, l’on peut se dire que l’objectif est rempli et que le benchmark comparatif n’importe pas beaucoup pour des chercheurs. Si l’on reste dans le cadre de requêtes accomplies par l’homme, l’important est la lisibilité du frontend ainsi que la stabilité du framework. De ce point de vue, l’implémentation R d’Acero que l’on a précédemment vue, est très satisfaisante (et a l’avantage d’être maintenue par Apache).

Il est clair que, contrairement à l’implémentation R d’Acero, la lisibilité d’ExecPlan sous pyarrow est assez moyenne (ça reste quelque chose d’assez bas niveau).

Dans ce contexte, polars est sans doute un bon apport à la data-analyse en Python. Elle apporte l’équivalent de open_dataset() en Python, à savoir une interface sous la forme de pipelines.