Hub model output re-partition benchmarks

Load packages

Install benchmarking dependencies

install.packages(
 c(
  "arrow",
  "bench",
  "devtools",
  "dplyr",
  "DT",
  "duckdb",
  "fs",
  "knitr",
  "rmarkdown")
)

devtools::install_github("Infectious-Disease-Modeling-Hubs/hubUtils")
library(hubUtils)
library(dplyr)
library(duckdb)

Connect to test hubs

Define paths to test hubs

Baseline standard hubverse hub

baseline_hub_path <- "repartioned_hubs/flusight-hub_std_csv"

Baseline parquet hub

baseline_parquet_hub_path <- "repartioned_hubs/flusight-hub_model_id-reference_date_parquet/model-output"

Reference Date re-partitioned hubs

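We connect to the re-partitioned hubs at the model-output level (the code below calls arrow::open_dataset() directly rather than connect_model_output()), because connect_hub needs some additional work to connect correctly to re-partitioned hubs.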

Hence the paths to the repartitioned hubs point to the model-output directory of each hub.

ref_date_csv_hubpath <- "repartioned_hubs/flusight-hub_reference_date_csv/model-output"
ref_date_parquet_hubpath <- "repartioned_hubs/flusight-hub_reference_date_parquet/model-output"

Target re-partitioned hubs

target_csv_hubpath <- "repartioned_hubs/flusight-hub_target_csv/model-output"
target_parquet_hubpath <- "repartioned_hubs/flusight-hub_target_parquet/model-output"

DuckDB database path

duckdb_hub_path <- fs::path("repartioned_hubs", "flusight-hub.duckdb")

Connect to hubs

Baseline hub

baseline_csv_hub_con <- connect_hub(baseline_hub_path)
baseline_csv_hub_con
── <hub_connection/FileSystemDataset> ──
• hub_name: "US CDC FluSight"
• hub_path: 'repartioned_hubs/flusight-hub_std_csv'
• file_format: "csv(613/613)"
• file_system: "LocalFileSystem"
• model_output_dir: "repartioned_hubs/flusight-hub_std_csv/model-output"
• config_admin: 'hub-config/admin.json'
• config_tasks: 'hub-config/tasks.json'
── Connection schema 
hub_connection with 613 csv files
reference_date: date32[day]
target: string
horizon: int32
target_end_date: date32[day]
location: string
output_type: string
output_type_id: string
value: double
model_id: string

Total number of rows in hub:

baseline_csv_hub_con |>
  collect() |>
  nrow()
[1] 3355802
baseline_parquet_hub_con <- arrow::open_dataset(baseline_parquet_hub_path)
baseline_parquet_hub_con
FileSystemDataset with 613 Parquet files
target: string
horizon: int32
target_end_date: date32[day]
location: string
output_type: string
output_type_id: string
value: double
model_id: string
reference_date: string

Reference Date hubs

ref_date_csv_con <- arrow::open_dataset(ref_date_csv_hubpath,
  format = "csv"
)
ref_date_csv_con
FileSystemDataset with 19 csv files
target: string
horizon: int64
target_end_date: date32[day]
location: string
output_type: string
output_type_id: string
value: double
model_id: string
reference_date: string
ref_date_parquet_con <- arrow::open_dataset(ref_date_parquet_hubpath)
ref_date_parquet_con
FileSystemDataset with 19 Parquet files
target: string
horizon: int32
target_end_date: date32[day]
location: string
output_type: string
output_type_id: string
value: double
model_id: string
reference_date: string

Target hubs

target_csv_con <- arrow::open_dataset(target_csv_hubpath, format = "csv")
target_csv_con
FileSystemDataset with 2 csv files
reference_date: date32[day]
horizon: int64
target_end_date: date32[day]
location: string
output_type: string
output_type_id: string
value: double
model_id: string
target: string
target_parquet_con <- arrow::open_dataset(target_parquet_hubpath)
target_parquet_con
FileSystemDataset with 2 Parquet files
reference_date: date32[day]
horizon: int32
target_end_date: date32[day]
location: string
output_type: string
output_type_id: string
value: double
model_id: string
target: string

Connect to DuckDB model-output table

duckdb_con <- dbConnect(duckdb(), dbdir = duckdb_hub_path, read_only = TRUE)
# on.exit(dbDisconnect(duckdb_con))

model_out_db <- tbl(duckdb_con, "model_output")
model_out_db
# Source:   table<model_output> [?? x 9]
# Database: DuckDB v0.9.2 [Anna@Darwin 23.1.0:R 4.3.2/repartioned_hubs/flusight-hub.duckdb]
   reference_date target          horizon target_end_date location output_type
   <date>         <chr>             <int> <date>          <chr>    <chr>      
 1 2023-10-14     wk inc flu hosp      -1 2023-10-07      06       quantile   
 2 2023-10-14     wk inc flu hosp      -1 2023-10-07      06       quantile   
 3 2023-10-14     wk inc flu hosp      -1 2023-10-07      06       quantile   
 4 2023-10-14     wk inc flu hosp      -1 2023-10-07      06       quantile   
 5 2023-10-14     wk inc flu hosp      -1 2023-10-07      06       quantile   
 6 2023-10-14     wk inc flu hosp      -1 2023-10-07      06       quantile   
 7 2023-10-14     wk inc flu hosp      -1 2023-10-07      06       quantile   
 8 2023-10-14     wk inc flu hosp      -1 2023-10-07      06       quantile   
 9 2023-10-14     wk inc flu hosp      -1 2023-10-07      06       quantile   
10 2023-10-14     wk inc flu hosp      -1 2023-10-07      06       quantile   
# ℹ more rows
# ℹ 3 more variables: output_type_id <chr>, value <dbl>, model_id <chr>

Querying hubs for model output data for the latest n reference dates

I focus on a typical query for collecting data destined for downstream ensembling and visualization: retrieving model output for the latest n reference dates, optionally restricted to a given target type.

Internal functions

I create two internal functions for getting the latest n reference dates:

  • latest_n_ref_dates - Returns the latest n reference dates as a one-column tibble.
  • latest_n_ref_dates_vec - Returns the latest n reference dates as a vector.
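latest_n_ref_dates <- function(hub_con, n = 6L) {
  # De-duplicate first, then sort, so the ordering applies to the distinct
  # dates; lazy backends need not preserve an earlier sort through distinct().
  distinct(hub_con, reference_date) |>
    arrange(desc(reference_date)) |>
    head(n) |>
    collect()
}


latest_n_ref_dates_vec <- function(hub_con, n = 6L) {
  # Same query, returned as a vector rather than a one-column tibble.
  latest_n_ref_dates(hub_con, n = n) |>
    pull(reference_date)
}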

Query functions

Two functions perform the query, each using a different strategy:

  • get_latest_inner_join - Gets the latest n reference dates as a tibble and returns hub data via an inner join on reference_date.
  • get_latest_filter - Gets the latest n reference dates as a vector and returns hub data by filtering on those dates.

Both functions can additionally filter for a target type; passing target_type = "all" returns data for all target types.

get_latest_inner_join <- function(hub_con, target_type = "wk inc flu hosp", n = 6L) {
  if (target_type == "all") {
    # Only filter for ref dates
    return(
      inner_join(
        hub_con,
        latest_n_ref_dates(hub_con, n = n),
        by = "reference_date"
      ) |>
        collect()
    )
  }
  # Filter for ref dates and target type
  inner_join(
    filter(hub_con, target %in% target_type),
    latest_n_ref_dates(hub_con, n = n),
    by = "reference_date"
  ) |>
    collect()
}

get_latest_filter <- function(hub_con, target_type = "wk inc flu hosp", n = 6L) {
  ref_dates <- latest_n_ref_dates_vec(hub_con, n = n)

  if (target_type == "all") {
    # Only filter for ref dates
    return(
      filter(
        hub_con,
        reference_date %in% ref_dates
      ) |>
        collect()
    )
  }
  # Filter for ref dates and target type
  filter(
    hub_con,
    reference_date %in% ref_dates,
    target %in% target_type
  ) |>
    collect()
}
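
Because the benchmarks below run with check = FALSE, it may be worth confirming separately that the two strategies return the same rows. The following is a minimal sketch on a small in-memory tibble, not on the hubs themselves; toy_hub, its values, and the model-a/model-b identifiers are hypothetical stand-ins, and the second target name is made up for illustration.

```r
library(dplyr)

# Hypothetical toy stand-in for hub model output: 3 reference dates,
# 2 targets and 2 models per date (12 rows in total).
toy_hub <- tibble(
  reference_date = as.Date(
    rep(c("2023-10-14", "2023-10-21", "2023-10-28"), each = 4)
  ),
  target = rep(c("wk inc flu hosp", "some other target"), times = 6),
  model_id = rep(c("model-a", "model-a", "model-b", "model-b"), times = 3),
  value = seq_len(12)
)

n <- 2L
target_type <- "wk inc flu hosp"

# Latest n distinct reference dates, as a one-column tibble.
latest <- toy_hub |>
  distinct(reference_date) |>
  arrange(desc(reference_date)) |>
  head(n)

# Strategy 1: inner join against the tibble of latest reference dates.
via_join <- toy_hub |>
  filter(target %in% target_type) |>
  inner_join(latest, by = "reference_date")

# Strategy 2: filter against a vector of the latest reference dates.
via_filter <- toy_hub |>
  filter(reference_date %in% pull(latest), target %in% target_type)

# Both strategies should select exactly the same rows.
same_rows <- isTRUE(all.equal(via_join, via_filter))
same_rows
```

On the real hub connections the same comparison would need both results collected and sorted on a common set of columns first, since row order is not guaranteed to match across strategies or backends.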

Benchmarks

Vary number of rounds

For our benchmarks, we vary n (the number of latest rounds requested) from 1 to 18.

We also vary whether we are querying for all target types or for a single target type.

bp <- bench::press(
  n = c(1L, 3L, 6L, 10L, 18L),
  target_type = c("wk inc flu hosp", "all"),
  {
    bench::mark(
      std_hub_csv_filter = get_latest_filter(baseline_csv_hub_con,
        n = n,
        target_type = target_type
      ),
      std_hub_csv_inner_join = get_latest_inner_join(baseline_csv_hub_con,
        n = n,
        target_type = target_type
      ),
      std_hub_parquet_filter = get_latest_filter(baseline_parquet_hub_con,
        n = n,
        target_type = target_type
      ),
      std_hub_parquet_inner_join = get_latest_inner_join(baseline_parquet_hub_con,
        n = n,
        target_type = target_type
      ),
      target_parquet_filter = get_latest_filter(target_parquet_con,
        n = n,
        target_type = target_type
      ),
      ref_date_parquet_filter = get_latest_filter(ref_date_parquet_con,
        n = n,
        target_type = target_type
      ),
      target_parquet_inner_join = get_latest_inner_join(target_parquet_con,
        n = n,
        target_type = target_type
      ),
      ref_date_parquet_inner_join = get_latest_inner_join(ref_date_parquet_con,
        n = n,
        target_type = target_type
      ),
      target_csv_filter = get_latest_filter(target_csv_con,
        n = n,
        target_type = target_type
      ),
      ref_date_csv_filter = get_latest_filter(ref_date_csv_con,
        n = n,
        target_type = target_type
      ),
      target_csv_inner_join = get_latest_inner_join(target_csv_con,
        n = n,
        target_type = target_type
      ),
      ref_date_csv_inner_join = get_latest_inner_join(ref_date_csv_con,
        n = n,
        target_type = target_type
      ),
      duckDB = get_latest_filter(model_out_db,
        n = n,
        target_type = target_type
      ),
      check = FALSE
    )
  }
)
Running with:
       n target_type    
 1     1 wk inc flu hosp
 2     3 wk inc flu hosp
Warning: Some expressions had a GC in every iteration; so filtering is
disabled.
 3     6 wk inc flu hosp
Warning: Some expressions had a GC in every iteration; so filtering is
disabled.
 4    10 wk inc flu hosp
Warning: Some expressions had a GC in every iteration; so filtering is
disabled.
 5    18 wk inc flu hosp
Warning: Some expressions had a GC in every iteration; so filtering is
disabled.
 6     1 all            
Warning: Some expressions had a GC in every iteration; so filtering is
disabled.
 7     3 all            
Warning: Some expressions had a GC in every iteration; so filtering is
disabled.
 8     6 all            
Warning: Some expressions had a GC in every iteration; so filtering is
disabled.
 9    10 all            
Warning: Some expressions had a GC in every iteration; so filtering is
disabled.
10    18 all            
Warning: Some expressions had a GC in every iteration; so filtering is
disabled.
plot(bp)
Loading required namespace: tidyr
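
Besides plotting, the tibble returned by bench::press() can be tabulated directly. The sketch below shows one way to do that on a deliberately tiny, self-contained benchmark; the sort/order expressions, toy_bp, and toy_summary are hypothetical and unrelated to the hub queries, and the same column selection is assumed to carry over to bp.

```r
library(bench)

set.seed(1)
x <- runif(1e4)

# Toy press grid: two input sizes, two equivalent ways of sorting a vector.
toy_bp <- bench::press(
  n = c(100L, 1000L),
  {
    v <- x[seq_len(n)]
    bench::mark(
      sort = sort(v),
      order = v[order(v)],
      iterations = 5,
      check = TRUE
    )
  }
)

# Reduce the result to a plain data frame of the columns of interest.
# Times are converted to seconds and memory to bytes.
toy_summary <- data.frame(
  expression = as.character(toy_bp$expression),
  n = toy_bp$n,
  median_seconds = as.numeric(toy_bp$median),
  mem_alloc_bytes = as.numeric(toy_bp$mem_alloc)
)

# Order by parameter value, then fastest first.
toy_summary[order(toy_summary$n, toy_summary$median_seconds), ]
```

A table like this makes it easier to compare strategies within each combination of n and target_type than reading values off the plot.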

APPENDIX - Creating re-partitioned hubs

The basis for the re-partitioned hub is the current FluSight hub. The hub is available at: https://github.com/cdcepi/FluSight-forecast-hub

The FluSight hub was cloned into the same parent directory as this benchmarking repository (i.e. as a sibling directory of this repository).

Commit 6df410 was then checked out onto a new branch, “temp”, containing all git history up to that commit.

The hub was then re-partitioned by reference_date or by target, and each version was saved as a CSV or Parquet hub.

The hub was also re-partitioned by model_id and reference_date and saved as Parquet, to provide a Parquet version of the standard hub layout for comparison.

The standard hub was also copied to the re-partitioned hub directory.

Finally, the hub was also converted to a DuckDB database file.

library(arrow)
library(hubUtils)
library(dplyr)

# Create a new branch ("temp") containing all git history up to a specific commit ("6df410").
commit <- git4r::git_get(hash = "6df410")
git4r::git_checkout(
  commit = commit,
  onto_new_branch = "temp"
)

# Checkout "temp" branch before connecting to std hub and collecting all data
hub_con <- connect_hub(hub_path = ".")
hub_data <- hub_con |> collect()

# Function to repartition hub
repartition_hub <- function(partition = "reference_date", format = "csv",
                            repartitioned_hub_dir = here::here(
                              "../partition-benchmarks/repartioned_hubs"
                            )) {
  fs::dir_create(repartitioned_hub_dir)

  hub_name <- paste("flusight-hub", paste(partition, collapse = "-"), format, sep = "_")
  part_hub_path <- fs::path(repartitioned_hub_dir, hub_name, "model-output")
  part_hub_config_path <- fs::path(repartitioned_hub_dir, hub_name, "hub-config")
  fs::dir_create(part_hub_config_path)

  write_dataset(hub_data,
    format = format,
    path = part_hub_path,
    partitioning = partition
  )

  fs::dir_copy("hub-config", part_hub_config_path, overwrite = TRUE)
}

# SET PATH TO REPARTITIONED HUB DIRECTORY
repartitioned_hub_dir <- here::here("../partition-benchmarks/repartioned_hubs")


# Create repartitioned hubs
# Partitions by reference_date - csv
repartition_hub()
# Partitions by target - csv
repartition_hub("target")
# Partitions by reference_date - parquet
repartition_hub(format = "parquet")
# Partitions by target - parquet
repartition_hub("target", format = "parquet")
# Partitions by model_id - parquet
repartition_hub("model_id", format = "parquet")
# Partitions by model_id & reference_date - parquet - reflects std hub partitioning
# but as parquet
repartition_hub(c("model_id", "reference_date"), format = "parquet")

# Copy std hub
fs::dir_copy("hub-config", fs::path(repartitioned_hub_dir, "flusight-hub_std_csv", "hub-config"), overwrite = TRUE)
fs::dir_copy("model-output", fs::path(repartitioned_hub_dir, "flusight-hub_std_csv", "model-output"), overwrite = TRUE)


library("duckdb")
# to use a database file (not shared between processes)
con <- dbConnect(duckdb(), dbdir = fs::path(repartitioned_hub_dir, "flusight-hub.duckdb"), read_only = FALSE)
dbWriteTable(con, "model_output", hub_data, overwrite = TRUE)
dbDisconnect(con, shutdown = TRUE)
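
To illustrate what re-partitioning produces on disk, here is a minimal sketch that writes a tiny data frame partitioned by reference_date and reads it back. The data, toy_hub, and the temporary output directory are hypothetical; only write_dataset() and open_dataset() correspond to calls used above.

```r
library(arrow)
library(dplyr)

# Hypothetical toy model output: 2 reference dates, 3 locations each.
toy_hub <- data.frame(
  reference_date = rep(c("2023-10-14", "2023-10-21"), each = 3),
  location = rep(c("06", "36", "48"), times = 2),
  value = c(10, 20, 30, 11, 21, 31),
  stringsAsFactors = FALSE
)

out_dir <- file.path(tempdir(), "toy-hub_reference_date_csv")
unlink(out_dir, recursive = TRUE)

# One sub-directory is written per distinct value of the partition column.
write_dataset(
  toy_hub,
  path = out_dir,
  format = "csv",
  partitioning = "reference_date"
)

# Hive-style directory names of the form "reference_date=<value>".
partition_dirs <- list.files(out_dir)
partition_dirs

# Reading the dataset back recovers the partition column from the paths.
toy_con <- open_dataset(out_dir, format = "csv")
col_names <- toy_con$schema$names
n_rows <- nrow(collect(toy_con))
```

The partition column is no longer stored inside the data files; it is reconstructed from the directory names when the dataset is opened. This is consistent with the schemas printed earlier, where reference_date appears last and is reported as string for the hubs partitioned on it.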

Session Info

devtools::session_info()
─ Session info ───────────────────────────────────────────────────────────────
 setting  value
 version  R version 4.3.2 (2023-10-31)
 os       macOS Sonoma 14.1.2
 system   aarch64, darwin20
 ui       X11
 language (EN)
 collate  en_US.UTF-8
 ctype    en_US.UTF-8
 tz       Europe/Athens
 date     2024-02-14
 pandoc   3.1.1 @ /Applications/RStudio.app/Contents/Resources/app/quarto/bin/tools/ (via rmarkdown)

─ Packages ───────────────────────────────────────────────────────────────────
 package     * version    date (UTC) lib source
 arrow         14.0.0.2   2023-12-02 [1] CRAN (R 4.3.1)
 askpass       1.2.0      2023-09-03 [1] CRAN (R 4.3.0)
 assertthat    0.2.1      2019-03-21 [1] CRAN (R 4.3.0)
 backports     1.4.1      2021-12-13 [1] CRAN (R 4.3.0)
 beeswarm      0.4.0      2021-06-01 [1] CRAN (R 4.3.0)
 bench         1.1.3      2023-05-04 [1] CRAN (R 4.3.0)
 bit           4.0.5      2022-11-15 [1] CRAN (R 4.3.0)
 bit64         4.0.5      2020-08-30 [1] CRAN (R 4.3.0)
 blob          1.2.4      2023-03-17 [1] CRAN (R 4.3.0)
 brio          1.1.4      2023-12-10 [1] CRAN (R 4.3.1)
 bslib         0.6.1      2023-11-28 [1] CRAN (R 4.3.1)
 cachem        1.0.8      2023-05-01 [1] CRAN (R 4.3.0)
 checkmate     2.3.1      2023-12-04 [1] CRAN (R 4.3.1)
 cli           3.6.2      2023-12-11 [1] CRAN (R 4.3.1)
 colorspace    2.1-0      2023-01-23 [1] CRAN (R 4.3.0)
 credentials   2.0.1      2023-09-06 [1] CRAN (R 4.3.0)
 crosstalk     1.2.1      2023-11-23 [1] CRAN (R 4.3.1)
 DBI         * 1.2.0      2023-12-21 [1] CRAN (R 4.3.1)
 dbplyr        2.4.0      2023-10-26 [1] CRAN (R 4.3.1)
 devtools    * 2.4.5      2022-10-11 [1] CRAN (R 4.3.0)
 digest        0.6.34     2024-01-11 [1] CRAN (R 4.3.1)
 dplyr       * 1.1.4      2023-11-17 [1] CRAN (R 4.3.1)
 DT            0.31       2023-12-09 [1] CRAN (R 4.3.1)
 duckdb      * 0.9.2-1    2023-11-28 [1] CRAN (R 4.3.1)
 ellipsis      0.3.2      2021-04-29 [1] CRAN (R 4.3.0)
 evaluate      0.23       2023-11-01 [1] CRAN (R 4.3.1)
 fansi         1.0.6      2023-12-08 [1] CRAN (R 4.3.1)
 farver        2.1.1      2022-07-06 [1] CRAN (R 4.3.0)
 fastmap       1.1.1      2023-02-24 [1] CRAN (R 4.3.0)
 fs            1.6.3      2023-07-20 [1] CRAN (R 4.3.0)
 generics      0.1.3      2022-07-05 [1] CRAN (R 4.3.0)
 ggbeeswarm    0.7.2      2023-04-29 [1] CRAN (R 4.3.0)
 ggplot2       3.4.4      2023-10-12 [1] CRAN (R 4.3.1)
 glue          1.7.0      2024-01-09 [1] CRAN (R 4.3.1)
 gtable        0.3.4      2023-08-21 [1] CRAN (R 4.3.0)
 htmltools     0.5.7      2023-11-03 [1] CRAN (R 4.3.1)
 htmlwidgets   1.6.4      2023-12-06 [1] CRAN (R 4.3.1)
 httpuv        1.6.14     2024-01-26 [1] CRAN (R 4.3.1)
 hubUtils    * 0.0.0.9018 2024-01-30 [1] local
 jquerylib     0.1.4      2021-04-26 [1] CRAN (R 4.3.0)
 jsonlite      1.8.8      2023-12-04 [1] CRAN (R 4.3.1)
 knitr         1.45       2023-10-30 [1] CRAN (R 4.3.1)
 labeling      0.4.3      2023-08-29 [1] CRAN (R 4.3.0)
 later         1.3.2      2023-12-06 [1] CRAN (R 4.3.1)
 lifecycle     1.0.4      2023-11-07 [1] CRAN (R 4.3.1)
 magrittr      2.0.3      2022-03-30 [1] CRAN (R 4.3.0)
 memoise       2.0.1      2021-11-26 [1] CRAN (R 4.3.0)
 mime          0.12       2021-09-28 [1] CRAN (R 4.3.0)
 miniUI        0.1.1.1    2018-05-18 [1] CRAN (R 4.3.0)
 munsell       0.5.0      2018-06-12 [1] CRAN (R 4.3.0)
 openssl       2.1.1      2023-09-25 [1] CRAN (R 4.3.1)
 pak         * 0.7.1      2023-12-11 [1] CRAN (R 4.3.1)
 pillar        1.9.0      2023-03-22 [1] CRAN (R 4.3.0)
 pkgbuild      1.4.3      2023-12-10 [1] CRAN (R 4.3.1)
 pkgconfig     2.0.3      2019-09-22 [1] CRAN (R 4.3.0)
 pkgload       1.3.3      2023-09-22 [1] CRAN (R 4.3.1)
 profmem       0.6.0      2020-12-13 [1] CRAN (R 4.3.0)
 profvis       0.3.8      2023-05-02 [1] CRAN (R 4.3.0)
 promises      1.2.1      2023-08-10 [1] CRAN (R 4.3.0)
 purrr         1.0.2      2023-08-10 [1] CRAN (R 4.3.0)
 R6            2.5.1      2021-08-19 [1] CRAN (R 4.3.0)
 Rcpp          1.0.12     2024-01-09 [1] CRAN (R 4.3.1)
 remotes       2.4.2.1    2023-07-18 [1] CRAN (R 4.3.0)
 reprex      * 2.0.2      2022-08-17 [1] CRAN (R 4.3.0)
 rlang         1.1.3      2024-01-10 [1] CRAN (R 4.3.1)
 rmarkdown     2.25       2023-09-18 [1] CRAN (R 4.3.1)
 rstudioapi    0.15.0     2023-07-07 [1] CRAN (R 4.3.0)
 sass          0.4.8      2023-12-06 [1] CRAN (R 4.3.1)
 scales        1.3.0      2023-11-28 [1] CRAN (R 4.3.1)
 sessioninfo   1.2.2      2021-12-06 [1] CRAN (R 4.3.0)
 shiny         1.8.0      2023-11-17 [1] CRAN (R 4.3.1)
 stringi       1.8.3      2023-12-11 [1] CRAN (R 4.3.1)
 stringr       1.5.1      2023-11-14 [1] CRAN (R 4.3.1)
 sys           3.4.2      2023-05-23 [1] CRAN (R 4.3.0)
 testthat    * 3.2.1      2023-12-02 [1] CRAN (R 4.3.1)
 tibble        3.2.1      2023-03-20 [1] CRAN (R 4.3.0)
 tidyr         1.3.1      2024-01-24 [1] CRAN (R 4.3.1)
 tidyselect    1.2.0      2022-10-10 [1] CRAN (R 4.3.0)
 urlchecker    1.0.1      2021-11-30 [1] CRAN (R 4.3.0)
 usethis     * 2.2.2      2023-07-06 [1] CRAN (R 4.3.0)
 utf8          1.2.4      2023-10-22 [1] CRAN (R 4.3.1)
 vctrs         0.6.5      2023-12-01 [1] CRAN (R 4.3.1)
 vipor         0.4.7      2023-12-18 [1] CRAN (R 4.3.1)
 withr         3.0.0      2024-01-16 [1] CRAN (R 4.3.1)
 xfun          0.41       2023-11-01 [1] CRAN (R 4.3.1)
 xtable        1.8-4      2019-04-21 [1] CRAN (R 4.3.0)
 yaml          2.3.8      2023-12-11 [1] CRAN (R 4.3.1)

 [1] /Library/Frameworks/R.framework/Versions/4.3-arm64/Resources/library

──────────────────────────────────────────────────────────────────────────────