Reduce shard results without gathering all per-shard returns on the master.
Description

shard_reduce() executes map() over shards in parallel and combines results
using an associative combine() function. Unlike shard_map(), it does not
accumulate all per-shard results on the master; it streams partials as
chunks complete.
Usage

shard_reduce(
  shards,
  map,
  combine,
  init,
  borrow = list(),
  out = list(),
  workers = NULL,
  chunk_size = 1L,
  profile = c("default", "memory", "speed"),
  mem_cap = "2GB",
  recycle = TRUE,
  cow = c("deny", "audit", "allow"),
  seed = NULL,
  diagnostics = TRUE,
  packages = NULL,
  init_expr = NULL,
  timeout = 3600,
  max_retries = 3L,
  health_check_interval = 10L
)

Value

A shard_reduce_result with fields:
value: final accumulator
failures: any permanently failed chunks
diagnostics: run telemetry including reduction stats
queue_status, pool_stats
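
For example, after a run you might inspect these fields directly (a minimal sketch; only the field names documented above are assumed):

res$value              # final accumulator
length(res$failures)   # number of permanently failed chunks
res$diagnostics        # run telemetry, including reduction stats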
Arguments

shards: A shard_descriptor from shards(), or an integer N.
map: Function executed per shard. Receives the shard descriptor as its first argument, followed by borrowed inputs and outputs.
combine: Function (acc, value) -> acc used to combine results. Should be associative for deterministic behavior under chunking (see the illustration after this list).
init: Initial accumulator value.
borrow: Named list of shared inputs (same semantics as shard_map()).
out: Named list of output buffers/sinks (same semantics as shard_map()).
workers: Number of worker processes.
chunk_size: Number of shards to batch per worker dispatch (default 1).
profile: Execution profile (same semantics as shard_map()).
mem_cap: Memory cap per worker (same semantics as shard_map()).
recycle: Worker recycling policy (same semantics as shard_map()).
cow: Copy-on-write policy for borrowed inputs (same semantics as shard_map()).
seed: RNG seed for reproducibility.
diagnostics: Logical; collect diagnostics (default TRUE).
packages: Additional packages to load in workers.
init_expr: Expression to evaluate in each worker on startup.
timeout: Seconds to wait for each chunk.
max_retries: Maximum retries per chunk.
health_check_interval: Check worker health every N completions.
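
To see why combine should be associative, compare a flat fold with a chunked fold using base R's Reduce() (illustration only; independent of shard_reduce()):

xs <- as.list(1:6)
Reduce(`+`, xs, 0)                # flat fold: 21
Reduce(`+`, list(
  Reduce(`+`, xs[1:3], 0),        # partial for chunk 1
  Reduce(`+`, xs[4:6], 0)         # partial for chunk 2
), 0)                             # chunked fold: also 21
# A non-associative combine (e.g. `-`) would make the result depend on how
# shards happen to be chunked and scheduled across workers.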
Details

For performance and memory efficiency, reduction is performed in two stages:
per-chunk partial reduction inside each worker, and a streaming combine of
partials on the master.
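
The sketch below approximates these semantics serially. It is an illustration only, not the implementation: it assumes init is an identity element for combine (so folding it into each partial is harmless) and that each shard descriptor carries an idx field, as in the example below.

serial_sketch <- function(n_shards, map, combine, init, chunk_size = 1L) {
  chunks <- split(seq_len(n_shards), ceiling(seq_len(n_shards) / chunk_size))
  partials <- lapply(chunks, function(idx) {
    # Stage 1: partial reduction within a chunk (runs inside a worker)
    Reduce(combine, lapply(idx, function(i) map(list(idx = i))), init)
  })
  # Stage 2: streaming combine of partials (runs on the master)
  Reduce(combine, partials, init)
}
serial_sketch(100L, function(s) sum(s$idx), `+`, 0, chunk_size = 10L)  # 5050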
Examples

# \donttest{
res <- shard_reduce(
  100L,
  map = function(s) sum(s$idx),
  combine = function(acc, x) acc + x,
  init = 0,
  workers = 2
)
pool_stop()
res$value
# }
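
A structured accumulator works the same way. The hedged sketch below reuses the assumptions of the example above (integer shards, an idx field on each descriptor) to compute a count and a total in one pass, then derive a mean:

# \donttest{
res <- shard_reduce(
  50L,
  map = function(s) list(n = length(s$idx), total = sum(s$idx)),
  combine = function(acc, x) list(n = acc$n + x$n, total = acc$total + x$total),
  init = list(n = 0L, total = 0),
  workers = 2,
  seed = 42
)
pool_stop()
res$value$total / res$value$n  # mean of shard indices
# }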