Hub model output re-partition benchmarks

Load packages

Install benchmarking dependencies



Connect to test hubs

Define paths to test hubs

Baseline std hubverse hub

baseline_hub_path <- "repartioned_hubs/flusight-hub_std_csv"

Baseline parquet hub

baseline_parquet_hub_path <- "repartioned_hubs/flusight-hub_model_id-reference_date_parquet/model-output"

Reference Date re-partitioned hubs

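I connect to the re-partitioned hubs at the model-output level (the approach taken by connect_model_output(); the code below calls arrow::open_dataset() directly) rather than with connect_hub(), which needs some additional work to connect correctly to re-partitioned hubs.

Hence the paths to the re-partitioned hubs point to the model-output directory of each hub.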

ref_date_csv_hubpath <- "repartioned_hubs/flusight-hub_reference_date_csv/model-output"
ref_date_parquet_hubpath <- "repartioned_hubs/flusight-hub_reference_date_parquet/model-output"

Target re-partitioned hubs

target_csv_hubpath <- "repartioned_hubs/flusight-hub_target_csv/model-output"
target_parquet_hubpath <- "repartioned_hubs/flusight-hub_target_parquet/model-output"

DuckDB database path

duckdb_hub_path <- fs::path("repartioned_hubs", "flusight-hub.duckdb")

Connect to hubs

Baseline hub

baseline_csv_hub_con <- connect_hub(baseline_hub_path)
── <hub_connection/FileSystemDataset> ──
• hub_name: "US CDC FluSight"
• hub_path: 'repartioned_hubs/flusight-hub_std_csv'
• file_format: "csv(613/613)"
• file_system: "LocalFileSystem"
• model_output_dir: "repartioned_hubs/flusight-hub_std_csv/model-output"
• config_admin: 'hub-config/admin.json'
• config_tasks: 'hub-config/tasks.json'
── Connection schema 
hub_connection with 613 csv files
reference_date: date32[day]
target: string
horizon: int32
target_end_date: date32[day]
location: string
output_type: string
output_type_id: string
value: double
model_id: string

Total number of rows in hub:

baseline_csv_hub_con |>
  collect() |>
  nrow()
[1] 3355802

Baseline parquet hub

baseline_parquet_hub_con <- arrow::open_dataset(baseline_parquet_hub_path)
FileSystemDataset with 613 Parquet files
target: string
horizon: int32
target_end_date: date32[day]
location: string
output_type: string
output_type_id: string
value: double
model_id: string
reference_date: string

Reference Date hubs

ref_date_csv_con <- arrow::open_dataset(ref_date_csv_hubpath,
  format = "csv"
)
FileSystemDataset with 19 csv files
target: string
horizon: int64
target_end_date: date32[day]
location: string
output_type: string
output_type_id: string
value: double
model_id: string
reference_date: string
ref_date_parquet_con <- arrow::open_dataset(ref_date_parquet_hubpath)
FileSystemDataset with 19 Parquet files
target: string
horizon: int32
target_end_date: date32[day]
location: string
output_type: string
output_type_id: string
value: double
model_id: string
reference_date: string

Target hubs

target_csv_con <- arrow::open_dataset(target_csv_hubpath, format = "csv")
FileSystemDataset with 2 csv files
reference_date: date32[day]
horizon: int64
target_end_date: date32[day]
location: string
output_type: string
output_type_id: string
value: double
model_id: string
target: string
target_parquet_con <- arrow::open_dataset(target_parquet_hubpath)
FileSystemDataset with 2 Parquet files
reference_date: date32[day]
horizon: int32
target_end_date: date32[day]
location: string
output_type: string
output_type_id: string
value: double
model_id: string
target: string

Connect to DuckDB model-output table

duckdb_con <- dbConnect(duckdb(), dbdir = duckdb_hub_path, read_only = TRUE)
# on.exit(dbDisconnect(duckdb_con))

model_out_db <- tbl(duckdb_con, "model_output")
# Source:   table<model_output> [?? x 9]
# Database: DuckDB v0.9.2 [Anna@Darwin 23.1.0:R 4.3.2/repartioned_hubs/flusight-hub.duckdb]
   reference_date target          horizon target_end_date location output_type
   <date>         <chr>             <int> <date>          <chr>    <chr>      
 1 2023-10-14     wk inc flu hosp      -1 2023-10-07      06       quantile   
 2 2023-10-14     wk inc flu hosp      -1 2023-10-07      06       quantile   
 3 2023-10-14     wk inc flu hosp      -1 2023-10-07      06       quantile   
 4 2023-10-14     wk inc flu hosp      -1 2023-10-07      06       quantile   
 5 2023-10-14     wk inc flu hosp      -1 2023-10-07      06       quantile   
 6 2023-10-14     wk inc flu hosp      -1 2023-10-07      06       quantile   
 7 2023-10-14     wk inc flu hosp      -1 2023-10-07      06       quantile   
 8 2023-10-14     wk inc flu hosp      -1 2023-10-07      06       quantile   
 9 2023-10-14     wk inc flu hosp      -1 2023-10-07      06       quantile   
10 2023-10-14     wk inc flu hosp      -1 2023-10-07      06       quantile   
# ℹ more rows
# ℹ 3 more variables: output_type_id <chr>, value <dbl>, model_id <chr>

Querying hubs for model output data for the latest n reference dates

I focus on a typical query for collecting data destined for downstream ensembling and visualization: returning model output data for the latest n reference dates, either for a given target type or for all target types.

Internal functions

I create a couple of internal functions for getting the latest n reference dates:

  • latest_n_ref_dates - Returns the latest n reference dates as a one-column tibble.
  • latest_n_ref_dates_vec - Returns the latest n reference dates as a vector.
# Latest n reference dates as a one-column tibble
latest_n_ref_dates <- function(hub_con, n = 6L) {
  arrange(hub_con, desc(reference_date)) |>
    distinct(reference_date) |>
    head(n) |>
    collect()
}

# Latest n reference dates as a vector
latest_n_ref_dates_vec <- function(hub_con, n = 6L) {
  arrange(hub_con, desc(reference_date)) |>
    distinct(reference_date) |>
    head(n) |>
    collect() |>
    pull(reference_date)
}
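The pipeline shared by these helpers can be tried on a small in-memory tibble. This is a sketch on hypothetical toy data with n = 2, not hub data; it shows the tibble form and the vector form side by side.

```r
library(dplyr)

# Hypothetical toy data: three rounds, two targets
toy <- tibble(
  reference_date = as.Date(c("2023-10-07", "2023-10-14", "2023-10-14",
                             "2023-10-21", "2023-10-21")),
  target = c("wk inc flu hosp", "wk inc flu hosp", "wk flu hosp rate change",
             "wk inc flu hosp", "wk flu hosp rate change"),
  value = 1:5
)

# Sort, de-duplicate and keep the 2 most recent reference dates
latest_2_tbl <- toy |>
  arrange(desc(reference_date)) |>
  distinct(reference_date) |>
  head(2) |>
  collect()

# Vector form, as returned by the _vec variant
latest_2_vec <- pull(latest_2_tbl, reference_date)
print(latest_2_vec)
```

On an in-memory data frame, distinct() keeps rows in first-occurrence order, so sorting first works. On lazy Arrow or DuckDB queries it may be worth checking that the descending order survives distinct(); sorting after distinct() is one way to make the intent explicit.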

Query functions

Two functions perform the query, each using a different strategy:

  • get_latest_inner_join - Returns hub data for the latest n reference dates by inner-joining the hub to the one-column tibble returned by latest_n_ref_dates().
  • get_latest_filter - First collects a vector of the latest n reference dates with latest_n_ref_dates_vec() and then returns hub data by filtering for those reference dates.

Both functions can also filter for a single target type, or return data for all target types when target_type = "all".

get_latest_inner_join <- function(hub_con, target_type = "wk inc flu hosp", n = 6L) {
  if (target_type == "all") {
    # Only filter for ref dates
    return(
      inner_join(
        hub_con,
        latest_n_ref_dates(hub_con, n = n),
        by = "reference_date"
      ) |>
        collect()
    )
  }
  # Filter for ref dates and target type
  inner_join(
    filter(hub_con, target %in% target_type),
    latest_n_ref_dates(hub_con, n = n),
    by = "reference_date"
  ) |>
    collect()
}

get_latest_filter <- function(hub_con, target_type = "wk inc flu hosp", n = 6L) {
  ref_dates <- latest_n_ref_dates_vec(hub_con, n = n)

  if (target_type == "all") {
    # Only filter for ref dates
    return(
      filter(
        hub_con,
        reference_date %in% ref_dates
      ) |>
        collect()
    )
  }
  # Filter for ref dates and target type
  filter(
    hub_con,
    reference_date %in% ref_dates,
    target %in% target_type
  ) |>
    collect()
}
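A small in-memory sketch can confirm that the two strategies select the same rows. The toy data, n = 2 and the single target type are hypothetical; the hub connections are replaced by a local tibble.

```r
library(dplyr)

# Hypothetical toy data (same column names as hub model output, heavily simplified)
toy <- tibble(
  reference_date = as.Date(c("2023-10-07", "2023-10-14", "2023-10-14",
                             "2023-10-21", "2023-10-21")),
  target = c("wk inc flu hosp", "wk inc flu hosp", "wk flu hosp rate change",
             "wk inc flu hosp", "wk flu hosp rate change"),
  value = 1:5
)
n <- 2L
target_type <- "wk inc flu hosp"

# One-column tibble of the latest n reference dates
ref_dates_tbl <- toy |>
  arrange(desc(reference_date)) |>
  distinct(reference_date) |>
  head(n)

# Strategy 1: inner join against the one-column date tibble
via_join <- inner_join(
  filter(toy, target %in% target_type),
  ref_dates_tbl,
  by = "reference_date"
)

# Strategy 2: pull the dates into a vector first, then filter
ref_dates_vec <- pull(ref_dates_tbl, reference_date)
via_filter <- filter(toy, reference_date %in% ref_dates_vec, target %in% target_type)

print(via_filter)
```

The filter strategy costs one extra round trip to fetch the dates but leaves the main query as a plain membership predicate; the join strategy keeps everything in one query at the price of a join. With dbplyr, joining a lazy database table to a local tibble requires copy = TRUE.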


Vary number of rounds

For our benchmarks, we vary n (the number of latest rounds requested) from 1 to 18, using n = 1, 3, 6, 10 and 18.

We also vary whether we are querying for all target types or for a single target type.

bp <- bench::press(
  n = c(1L, 3L, 6L, 10L, 18L),
  target_type = c("wk inc flu hosp", "all"),
  {
    bench::mark(
      std_hub_csv_filter = get_latest_filter(baseline_csv_hub_con,
        n = n,
        target_type = target_type
      ),
      std_hub_csv_inner_join = get_latest_inner_join(baseline_csv_hub_con,
        n = n,
        target_type = target_type
      ),
      std_hub_parquet_filter = get_latest_filter(baseline_parquet_hub_con,
        n = n,
        target_type = target_type
      ),
      std_hub_parquet_inner_join = get_latest_inner_join(baseline_parquet_hub_con,
        n = n,
        target_type = target_type
      ),
      target_parquet_filter = get_latest_filter(target_parquet_con,
        n = n,
        target_type = target_type
      ),
      ref_date_parquet_filter = get_latest_filter(ref_date_parquet_con,
        n = n,
        target_type = target_type
      ),
      target_parquet_inner_join = get_latest_inner_join(target_parquet_con,
        n = n,
        target_type = target_type
      ),
      ref_date_parquet_inner_join = get_latest_inner_join(ref_date_parquet_con,
        n = n,
        target_type = target_type
      ),
      target_csv_filter = get_latest_filter(target_csv_con,
        n = n,
        target_type = target_type
      ),
      ref_date_csv_filter = get_latest_filter(ref_date_csv_con,
        n = n,
        target_type = target_type
      ),
      target_csv_inner_join = get_latest_inner_join(target_csv_con,
        n = n,
        target_type = target_type
      ),
      ref_date_csv_inner_join = get_latest_inner_join(ref_date_csv_con,
        n = n,
        target_type = target_type
      ),
      duckDB = get_latest_filter(model_out_db,
        n = n,
        target_type = target_type
      ),
      # Outputs differ across hubs (e.g. reference_date is a string in some
      # and a date in others), so skip bench's equality check
      check = FALSE
    )
  }
)
Loading required namespace: tidyr

APPENDIX - Creating re-partitioned hubs

The basis for the re-partitioned hubs is the current FluSight hub. The hub is available at:

The FluSight hub was cloned and placed in the same parent directory as this benchmarking repository (i.e. as a sibling directory of this repository).

The hub was cloned and commit 6df410 was checked out to create a new branch “temp” containing all git history up to that commit.

The hub was then re-partitioned by reference_date or target and saved as CSV or Parquet formatted hub.

The hub was also re-partitioned by model_id and reference_date and saved as Parquet, to provide a Parquet version of the std hub for comparison.

The standard hub was also copied to the re-partitioned hub directory.

Finally, the hub was also converted to a DuckDB database file.
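The re-partitioning step can be sketched on a toy dataset. The format, path and partitioning arguments used below match those of arrow::write_dataset(); the data and output location are hypothetical. Partitioning by c("model_id", "reference_date") yields one nested hive-style directory per model and round, and reading the directory back recovers every row with the partition keys as columns.

```r
library(arrow)
library(dplyr)

# Hypothetical toy data: 2 models x 2 rounds x 2 rows
toy <- data.frame(
  model_id = rep(c("model-a", "model-b"), each = 4),
  reference_date = as.Date(rep(c("2023-10-14", "2023-10-21"), times = 4)),
  target = "wk inc flu hosp",
  value = 1:8
)

# Write to a fresh temporary directory: one file per model_id/reference_date pair
out_dir <- file.path(tempfile("repartitioned-"), "model-output")
write_dataset(toy, out_dir, format = "csv",
              partitioning = c("model_id", "reference_date"))

files <- list.files(out_dir, recursive = TRUE)
print(files)

# Read the re-partitioned directory back
round_trip <- collect(open_dataset(out_dir, format = "csv"))
```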


# Create a new branch ("temp") containing all git history up to a specific commit ("6df410").
commit <- git4r::git_get(hash = "6df410")
  commit = commit,
  onto_new_branch = "temp"

# Checkout "temp" branch before connecting to std hub and collecting all data
hub_con <- connect_hub(hub_path = ".")
hub_data <- hub_con |> collect()

# Function to repartition hub
repartition_hub <- function(partition = "reference_date", format = "csv",
                            repartitioned_hub_dir = here::here(
                            )) {

  hub_name <- paste("flusight-hub", paste(partition, collapse = "-"), format, sep = "_")
  part_hub_path <- fs::path(repartitioned_hub_dir, hub_name, "model-output")
  part_hub_config_path <- fs::path(repartitioned_hub_dir, hub_name, "hub-config")

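  # Write the collected hub data as a dataset partitioned by `partition`
  arrow::write_dataset(
    hub_data,
    format = format,
    path = part_hub_path,
    partitioning = partition
  )

  # Copy the hub config so the re-partitioned directory is a complete hub
  fs::dir_copy("hub-config", part_hub_config_path, overwrite = TRUE)
}
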
repartitioned_hub_dir <- here::here("../partition-benchmarks/repartioned_hubs")

# Create repartitioned hubs
# Partitions by reference_date - csv
repartition_hub()
# Partitions by target - csv
repartition_hub("target")
# Partitions by reference_date - parquet
repartition_hub(format = "parquet")
# Partitions by target - parquet
repartition_hub("target", format = "parquet")
# Partitions by model_id - parquet
repartition_hub("model_id", format = "parquet")
# Partitions by model_id & reference_date - parquet - reflects std hub partitioning
# but as parquet
repartition_hub(c("model_id", "reference_date"), format = "parquet")

# Copy std hub
fs::dir_copy("hub-config", fs::path(repartitioned_hub_dir, "flusight-hub_std_csv", "hub-config"), overwrite = TRUE)
fs::dir_copy("model-output", fs::path(repartitioned_hub_dir, "flusight-hub_std_csv", "model-output"), overwrite = TRUE)

# to use a database file (not shared between processes)
con <- dbConnect(duckdb(), dbdir = fs::path(repartitioned_hub_dir, "flusight-hub.duckdb"), read_only = FALSE)
dbWriteTable(con, "model_output", hub_data, overwrite = TRUE)
dbDisconnect(con, shutdown = TRUE)
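A minimal round-trip sketch of the DuckDB conversion, using an in-memory database and hypothetical toy data instead of the database file: it writes a model_output table and then runs the filter strategy against it through dplyr's lazy tbl() interface.

```r
library(DBI)
library(duckdb)
library(dplyr)

# Hypothetical toy model-output data
toy <- data.frame(
  reference_date = as.Date(c("2023-10-07", "2023-10-14", "2023-10-21", "2023-10-21")),
  target = "wk inc flu hosp",
  value = c(1, 2, 3, 4)
)

# In-memory database instead of a database file
con <- dbConnect(duckdb())
dbWriteTable(con, "model_output", toy, overwrite = TRUE)
model_out_db <- tbl(con, "model_output")

# Filter strategy: collect the latest reference date, then filter on it in DuckDB
ref_dates <- model_out_db |>
  distinct(reference_date) |>
  arrange(desc(reference_date)) |>
  head(1) |>
  pull(reference_date)

latest <- model_out_db |>
  filter(reference_date %in% ref_dates) |>
  collect()
print(latest)

dbDisconnect(con)
```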

Session Info

