Évaluation paresseuse

R et évaluation paresseuse

R est fainéant. Il n’évaluera les arguments d’une fonction qu’au dernier moment s’il en a besoin (évaluation paresseuse des arguments).

lazy_eval <-
  function(x = stop("C'est une erreur !"),
           y = matrix(1:9, nrow = -3L)) {
    TRUE
  }
lazy_eval()
[1] TRUE

Dans l’exemple précédent, si les arguments n’étaient pas évalués de manière paresseuse, autant l’évaluation de x que celle de y précipiterait une erreur :

  • celle de x car stop() est la fonction de R permettant de stopper l’exécution et de retourner une erreur avec le message fourni.
  • celle de y car une matrice avec un nombre de lignes négatifs, ça n’existe pas.

Mais le corps de la fonction lazy_eval étant réduit à son plus simple appareil, à savoir simplement TRUE, il ne requiert pas d’en évaluer les arguments. Il n’y a donc aucune erreur d’exécution et la fonction retourne bien TRUE !

Plus généralement, dans R, les expressions employant des contrôle (if, else…) définissent elles aussi une forme de paresse dans la mesure où leurs sous-parties non-retenues ne sont pas évaluées (évaluation paresseuse des structures de contrôle). À l’intérieur même des expressions, les expressions faites d’opérateurs logiques unitaires (&& et || mais pas & ni | qui sont vectoriels) évaluent elles aussi le moins possible de leurs sous-parties (court-circuit). Si vous voulez en savoir plus, vous pouvez faire cette formation sur la programmation fonctionnelle en R.

Apache Acero

Apache Acero reprend ce principe d’évaluation paresseuse grandement utile dans la programmation fonctionnelle, et l’étend à d’autres champs.

ExecPlan

library(arrow)

Attachement du package : 'arrow'
L'objet suivant est masqué depuis 'package:utils':

    timestamp
library(dplyr)

Attachement du package : 'dplyr'
Les objets suivants sont masqués depuis 'package:stats':

    filter, lag
Les objets suivants sont masqués depuis 'package:base':

    intersect, setdiff, setequal, union
query <-
  starwars %>%
  as_arrow_table() %>%
  select(name, height, mass, homeworld) %>%
  filter(! is.na(mass) & ! is.na(height)) %>%
  mutate(bmi = mass / (height/100)^2) %>%
  group_by(homeworld) %>%
  summarise(
    count = n(),
    avg_height = mean(height),
    avg_mass = mean(mass),
    avg_bmi = mean(bmi)
  ) %>%
  arrange(desc(avg_bmi))

show_query(query)
ExecPlan with 6 nodes:
5:SinkNode{}
  4:OrderByNode{ordering=[FieldRef.Name(avg_bmi) DESC] nulls last}
    3:GroupByNode{keys=["homeworld"], aggregates=[
        hash_count_all(*),
        hash_mean(avg_height, {skip_nulls=false, min_count=0}),
        hash_mean(avg_mass, {skip_nulls=false, min_count=0}),
        hash_mean(avg_bmi, {skip_nulls=false, min_count=0}),
    ]}
      2:ProjectNode{projection=["avg_height": height, "avg_mass": mass, "avg_bmi": divide(cast(mass, {to_type=double, allow_int_overflow=false, allow_time_truncate=false, allow_time_overflow=false, allow_decimal_truncate=false, allow_float_truncate=false, allow_invalid_utf8=false}), cast(power_checked(divide(cast(height, {to_type=double, allow_int_overflow=false, allow_time_truncate=false, allow_time_overflow=false, allow_decimal_truncate=false, allow_float_truncate=false, allow_invalid_utf8=false}), cast(100, {to_type=double, allow_int_overflow=false, allow_time_truncate=false, allow_time_overflow=false, allow_decimal_truncate=false, allow_float_truncate=false, allow_invalid_utf8=false})), 2), {to_type=double, allow_int_overflow=false, allow_time_truncate=false, allow_time_overflow=false, allow_decimal_truncate=false, allow_float_truncate=false, allow_invalid_utf8=false})), homeworld]}
        1:FilterNode{filter=(invert(is_null(mass, {nan_is_null=true})) and invert(is_null(height, {nan_is_null=true})))}
          0:TableSourceNode{}

Nous l’avions déjà vu dans le précédent chapitre précédent. Tant qu’on n’applique pas la fonction collect(), une pipeline en Apache Acero n’est rien de plus qu’un ExecPlan, donc rien n’est exécuté.

query %>%
  collect()
# A tibble: 40 x 5
   homeworld  count avg_height avg_mass avg_bmi
   <chr>      <int>      <dbl>    <dbl>   <dbl>
 1 Nal Hutta      1        175   1358     443. 
 2 Vulpter        1         94     45      50.9
 3 Kalee          1        216    159      34.1
 4 Bestine IV     1        180    110      34.0
 5 <NA>           3        153     82      32.6
 6 Malastare      1        112     40      31.9
 7 Trandosha      1        190    113      31.3
 8 Tatooine       8        169     85.4    29.3
 9 Sullust        1        160     68      26.6
10 Dathomir       1        175     80      26.1
# i 30 more rows

collect() permet effectivement d’effectuer la requête

emplacement_parquet <- paste0(tempdir(), "\\dataset")

dir.create(emplacement_parquet, showWarnings = FALSE)

starwars %>%
  write_parquet(paste0(emplacement_parquet, "\\starwars.parquet"))

open_dataset(emplacement_parquet) %>%
  filter(! is.na(mass) & ! is.na(height)) %>%
  mutate(bmi = mass / (height/100)^2) %>%
  group_by(homeworld) %>%
  summarise(
    count = n(),
    avg_height = mean(height),
    avg_mass = mean(mass),
    avg_bmi = mean(bmi)
  ) %>%
  arrange(desc(avg_bmi)) %>%
  show_query()
ExecPlan with 6 nodes:
5:SinkNode{}
  4:OrderByNode{ordering=[FieldRef.Name(avg_bmi) DESC] nulls last}
    3:GroupByNode{keys=["homeworld"], aggregates=[
        hash_count_all(*),
        hash_mean(avg_height, {skip_nulls=false, min_count=0}),
        hash_mean(avg_mass, {skip_nulls=false, min_count=0}),
        hash_mean(avg_bmi, {skip_nulls=false, min_count=0}),
    ]}
      2:ProjectNode{projection=["avg_height": height, "avg_mass": mass, "avg_bmi": divide(cast(mass, {to_type=double, allow_int_overflow=false, allow_time_truncate=false, allow_time_overflow=false, allow_decimal_truncate=false, allow_float_truncate=false, allow_invalid_utf8=false}), cast(power_checked(divide(cast(height, {to_type=double, allow_int_overflow=false, allow_time_truncate=false, allow_time_overflow=false, allow_decimal_truncate=false, allow_float_truncate=false, allow_invalid_utf8=false}), cast(100, {to_type=double, allow_int_overflow=false, allow_time_truncate=false, allow_time_overflow=false, allow_decimal_truncate=false, allow_float_truncate=false, allow_invalid_utf8=false})), 2), {to_type=double, allow_int_overflow=false, allow_time_truncate=false, allow_time_overflow=false, allow_decimal_truncate=false, allow_float_truncate=false, allow_invalid_utf8=false})), homeworld]}
        1:FilterNode{filter=(invert(is_null(mass, {nan_is_null=true})) and invert(is_null(height, {nan_is_null=true})))}
          0:SourceNode{}

La notion de stream : Paresse sur les fichiers

Tip

Le dataset parquet manipulé par la suite est issu de sept années d’opendamir (2018 à 2024 inclus), transformé en dataset via le script ci-dessous :

library(readr)
schema_readr <- cols_only(
  FLX_ANN_MOI = col_character(),
  ORG_CLE_REG = col_character(),
  AGE_BEN_SNDS = col_character(),
  BEN_RES_REG = col_character(),
  BEN_CMU_TOP = col_character(),
  BEN_QLT_COD = col_character(),
  BEN_SEX_COD = col_character(),
  DDP_SPE_COD = col_character(),
  ETE_CAT_SNDS = col_character(),
  ETE_REG_COD = col_character(),
  ETE_TYP_SNDS = col_character(),
  ETP_REG_COD = col_character(),
  ETP_CAT_SNDS = col_character(),
  MDT_TYP_COD = col_character(),
  MFT_COD = col_character(),
  PRS_FJH_TYP = col_character(),
  PRS_ACT_COG = col_double(),
  PRS_ACT_NBR = col_double(),
  PRS_ACT_QTE = col_double(),
  PRS_DEP_MNT = col_double(),
  PRS_PAI_MNT = col_double(),
  PRS_REM_BSE = col_double(),
  PRS_REM_MNT = col_double(),
  FLT_ACT_COG = col_double(),
  FLT_ACT_NBR = col_double(),
  FLT_ACT_QTE = col_double(),
  FLT_PAI_MNT = col_double(),
  FLT_DEP_MNT = col_double(),
  FLT_REM_MNT = col_double(),
  SOI_ANN = col_integer(),
  SOI_MOI = col_integer(),
  ASU_NAT = col_character(),
  ATT_NAT = col_character(),
  CPL_COD = col_character(),
  CPT_ENV_TYP = col_character(),
  DRG_AFF_NAT = col_character(),
  ETE_IND_TAA = col_character(),
  EXO_MTF = col_character(),
  MTM_NAT = col_character(),
  PRS_NAT = col_character(),
  PRS_PPU_SEC = col_character(),
  PRS_REM_TAU = col_double(),
  PRS_REM_TYP = col_character(),
  PRS_PDS_QCP = col_character(),
  EXE_INS_REG = col_character(),
  PSE_ACT_SNDS = col_character(),
  PSE_ACT_CAT = col_character(),
  PSE_SPE_SNDS = col_character(),
  PSE_STJ_SNDS = col_character(),
  PRE_INS_REG = col_character(),
  PSP_ACT_SNDS = col_character(),
  PSP_ACT_CAT = col_character(),
  PSP_SPE_SNDS = col_character(),
  PSP_STJ_SNDS = col_character(),
  TOP_PS5_TRG = col_character()
)

lapply(list.files(input_csv),
       function(nom_csv) {
         annee <- substr(nom_csv, 2L, 5L)
         mois <- substr(nom_csv, 6L, 7L)
         dest_parquet <- file.path(input_parquet, paste0("annee=", annee), paste0("mois=", mois))
         dir.create(dest_parquet, recursive = TRUE)
         write_chunk_to_parquet <- function(x, pos)  {
           write_parquet(x, file.path(dest_parquet, paste0(sprintf("%020d", pos), ".parquet")))
         }
         read_csv2_chunked(file = file.path(input_csv, nom_csv),
                           callback = SideEffectChunkCallback$new(write_chunk_to_parquet),
                           chunk_size = 1000000L,
                           col_types = schema_readr)
       }
)

Il est à noter que l’on n’a pas chercher à optimiser le choix des types utilisés. Les col_character() utilisés pour les variables avec des modalités de type "00", "01", …, "99" pourraient aussi être encodés en uint8() de manière plus efficiente.

Le réel apport d’Apache Acero concerne la notion de stream. La paresse des ExecPlan permet de dépendre de plusieurs fichiers .parquet qui ne sont pas encore chargés en mémoire, et dont la somme des tailles dépasserait largement la mémoire s’ils étaient chargés. Il permet de les gérer comme une table à travers une abstraction bien commode.

emplacement_dataset <- "[emplacement du dataset]"
open_dataset(emplacement_dataset) %>%
  group_by(annee, mois) %>%
  summarise(FLT_PAI_MNT = sum(FLT_PAI_MNT)) %>%
  collect()
# A tibble: 84 x 3
# Groups:   annee [7]
   annee  mois  FLT_PAI_MNT
   <int> <int>        <dbl>
 1  2018     1 11449685653.
 2  2018     2 10650381772.
 3  2018     3 11552585153.
 4  2018     4 11526760892.
 5  2018     5 11030467263.
 6  2018     6 11660914760.
 7  2018     7 11588717855.
 8  2018     8  9129904544.
 9  2018     9 10119539795.
10  2018    10 12353701435.
# i 74 more rows

Ici, on a requêté 7 ans d’opendamir, l’équivalent d’environ 168Go de csv, en utilisant une simple abstraction globale.

Il est clair que cette requête n’aurait pas pu être faite dans R.

Paresse sur la lecture de colonnes et de lignes

La librairie Datasets d’Arrow sait lire sélectivement les colonnes et lignes pour économiser l’IO et la RAM, comme on peut le lire dans la documentation d’Arrow Datasets.

When combining filters and projections, Arrow will determine all necessary columns to read. For instance, if you filter on a column that isn’t ultimately selected, Arrow will still read the column to evaluate the filter.

Some formats, such as Parquet, can reduce I/O costs here by reading only the specified columns from the filesystem.

A filter can be provided with arrow::dataset::ScannerBuilder::Filter(), so that rows which do not match the filter predicate will not be included in the returned table. Again, some formats, such as Parquet, can use this filter to reduce the amount of I/O needed.

Il est possible d’appeler explicitement les filtres appropriés puis de charger le résultat au format d’une table arrow (qui doit tenir en mémoire). On veut montrer que cette approche est assez proche de l’utilisation d’une syntaxe dplyr agréable à l’usage et à l’oeil.

Comparons et testons les deux approches.

emplacement_dataset <- "[emplacement du dataset]"
library(bench)
library(arrow)
library(dplyr)
ds <- open_dataset(emplacement_dataset)

bench::mark(
  avec_scan = Scanner$create(ds,
                             filter = Expression$field_ref("annee") <= Expression$scalar(2019L),
                             projection = c("annee", "mois", "FLT_PAI_MNT"))$
    ToTable() %>%
    group_by(annee, mois) %>%
    summarise(FLT_PAI_MNT = sum(FLT_PAI_MNT)) %>%
    collect(),
  sans_scan = ds %>%
    filter(annee <= 2019L) %>%
    select(annee, mois, FLT_PAI_MNT) %>%
    group_by(annee, mois) %>%
    summarise(FLT_PAI_MNT = sum(FLT_PAI_MNT)) %>%
    collect(),
  iterations  = 2L,
  check = FALSE,
  memory = FALSE,
  filter_gc = FALSE,
  time_unit = "ms"
) %>% select(expression, median) %>%
  mutate(expression = as.character(expression))
# A tibble: 2 x 2
  expression median
  <chr>       <dbl>
1 avec_scan  52354.
2 sans_scan  19964.

Il semble ici qu’en filtrant ou non, on obtient des performances encore meilleures sans scan, en millisecondes. Nous pouvons par ailleurs avoir une estimation via C du débit de lecture.

library(inline)

# C code pour lire un seul fichier et mesurer le temps
src <- '
if(TYPEOF(path) != STRSXP || LENGTH(path) != 1)
    Rf_error("path must be character(1)");

const char *filepath = CHAR(STRING_ELT(path, 0));
size_t buffer_size = 4 * 1024 * 1024; // 4 MB
char *buffer = (char*) calloc(buffer_size, 1);
if(!buffer) Rf_error("Memory allocation failed");

clock_t start = clock();

FILE *fp = fopen(filepath, "rb");
if(fp) {
    while(fread(buffer, 1, buffer_size, fp) > 0) {
        // lecture uniquement
    }
    fclose(fp);
}

clock_t end = clock();
free(buffer);

double read_time = (double)(end - start) / CLOCKS_PER_SEC;

SEXP result = PROTECT(Rf_allocVector(REALSXP, 1));
REAL(result)[0] = read_time;
UNPROTECT(1);

return result;
'

includes = "
#include <R.h>
#include <Rinternals.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
"
read_file_time <- cfunction(signature(path = "character"), src, includes = includes)

read_dir_time <- function(dir) {
  files <- list.files(dir, full.names = TRUE)
  total_time <- 0
  
  for(f in files) {
    if(file.info(f)$isdir) {
      total_time <- total_time + read_dir_time(f)
    } else {
      total_time <- total_time + read_file_time(f)
    }
  }
  
  return(total_time)
}

temps_total <- read_dir_time(emplacement_dataset)

Lire l’intégralité des fichiers parquet dans C sans même dure 74.67 secondes. Cela semble prouver que l’exécution des ExecPlan est naturellement parcimonieuse. En fait, on trouve la confirmation de ce que l’on observe dans la documentation R d’arrow.

Second, all work is pushed down to the individual data files, and depending on the file format, chunks of data within files. As a result, you can select a subset of data from a much larger data set by collecting the smaller slices from each file: you don’t have to load the whole data set in memory to slice from it.

Third, because of partitioning, you can ignore some files entirely. In this example, by filtering year == 2015, all files corresponding to other years are immediately excluded: you don’t have to load them in order to find that no rows match the filter. For Parquet files – which contain row groups with statistics on the data contained within groups – there may be entire chunks of data you can avoid scanning because they have no rows where total_amount > 100.