This wrapper for parallel::mclapply adds the
following features:
reliably detect if a child process failed with a fatal error or if it was killed.
get tracebacks after non-fatal errors in child processes.
retry on fatal and non-fatal errors.
fail early after non-fatal errors in child processes.
get crash dumps from failed child processes.
capture output from child processes.
track warnings, messages and other conditions signaled in the child processes.
return results from child processes using POSIX shared memory to improve performance.
compress character vectors in results to improve performance.
reproducibly seed all function calls.
display a progress bar.
mclapply(
X,
FUN,
...,
mc.preschedule = TRUE,
mc.set.seed = NA,
mc.silent = FALSE,
mc.cores = getOption("mc.cores", 2L),
mc.cleanup = TRUE,
mc.allow.recursive = TRUE,
affinity.list = NULL,
mc.use.names = TRUE,
mc.allow.fatal = FALSE,
mc.allow.error = FALSE,
mc.retry = 0L,
mc.retry.silent = FALSE,
mc.retry.fixed.seed = FALSE,
mc.fail.early = isFALSE(mc.allow.error) && mc.retry == 0L,
mc.dump.frames = c("partial", "full", "full_global", "no"),
mc.dumpto = ifelse(interactive(), "last.dump", "file://last.dump.rds"),
mc.stdout = c("capture", "output", "ignore"),
mc.warnings = c("m_signal", "signal", "m_output", "output", "m_ignore", "ignore",
"stop"),
mc.messages = c("m_signal", "signal", "m_output", "output", "m_ignore", "ignore"),
mc.conditions = c("signal", "ignore"),
mc.system.time = FALSE,
mc.compress.chars = TRUE,
mc.compress.altreps = c("if_allocated", "yes", "no"),
mc.share.vectors = getOption("bettermc.use_shm", TRUE),
mc.share.altreps = c("no", "yes", "if_allocated"),
mc.share.copy = TRUE,
mc.shm.ipc = getOption("bettermc.use_shm", TRUE),
mc.force.fork = FALSE,
mc.progress = interactive()
)crash_dumps # environment with crash dumps created by mclapply (cf. mc.dumpto)
mclapply returns a list of the same length as X and named by
X. In case of fatal/non-fatal errors and depending on
mc.allow.fatal/mc.allow.error/mc.fail.early, some of
the elements might inherit from
"fatal-error"/"etry-error"/"fail-early-error" and "try-error"
or be NULL.
crash_dumps is an initially empty environment used to store
the return values of mclapply (see below) including
crash dumps in case of non-fatal errors and if
mc.dump.frames != "no" & mc.allow.error == FALSE.
a vector (atomic or list) or an expressions vector. Other
objects (including classed objects) will be coerced by
as.list.
the function to be applied to (mclapply) each
element of X or (mcmapply) in parallel to ....
For mclapply, optional arguments to FUN.
For mcmapply and mcMap, vector or list inputs: see
mapply.
if set to TRUE then the computation is
first divided to (at most) as many jobs are there are cores and then
the jobs are started, each job possibly covering more than one
value. If set to FALSE then one job is forked for each value
of X. The former is better for short computations or large
number of values in X, the latter is better for jobs that
have high variance of completion time and not too many values of
X compared to mc.cores.
TRUE or FALSE are directly handled by
parallel::mclapply. bettermc also
supports two additional values: NA (the default) - seed every
invocation of FUN differently but in a reproducible way based on the
current state of the random number generator in the parent process.
integerish value - call set.seed(mc.set.seed) in the parent and then
continue as if mc.set.seed was NA.
In both (NA- and integerish-) cases, the state of the random number
generator, i.e. the object .Random.seed in the global environment,
is restored at the end of the function to what it was when mclapply
was called. If the random number generator is not yet initialized in the
current session, it is initialized internally (by calling runif(1))
and the resulting state is what gets restored later. In particular, this
means that the seed supplied as mc.set.seed does not seed the
code following the call to mclapply. All this ensures that arguments
like mc.cores, mc.force.fork etc. can be adjusted without
affecting the state of the RNG outside of mclapply.
if set to TRUE then all output on
stdout will be suppressed for all parallel processes forked
(stderr is not affected).
The number of cores to use, i.e. at most how many
child processes will be run simultaneously. The option is
initialized from environment variable MC_CORES if set. Must
be at least one, and parallelization requires at least two cores.
if set to TRUE then all children that have
been forked by this function will be killed (by sending
SIGTERM) before this function returns. Under normal
circumstances mclapply waits for the children to deliver
results, so this option usually has only effect when mclapply
is interrupted. If set to FALSE then child processes are
collected, but not forcefully terminated. As a special case this
argument can be set to the number of the signal that should be used
to kill the children instead of SIGTERM.
Unless true, calling mclapply in a
child process will use the child and not fork again.
a vector (atomic or list) containing the CPU
affinity mask for each element of X. The CPU affinity mask
describes on which CPU (core or hyperthread unit) a given item is
allowed to run, see mcaffinity. To use this parameter
prescheduling has to be deactivated (mc.preschedule = FALSE).
if TRUE and if X is character, use X
as names for the result unless it had names already.
should fatal errors in child processes make
mclapply fail (FALSE, default) or merely trigger a warning
(TRUE)?
TRUE returns objects of classes c("fatal-error", "try-error")
for failed invocations. Hence, in contrast to
parallel::mclapply, it is OK for
FUN to return NULL.
NA returns the same as TRUE, but without a warning.
mc.allow.fatal can also be NULL. In this case NULL is
returned (and a warning is signaled), which corresponds to the behavior of
parallel::mclapply.
should non-fatal errors in FUN make
mclapply fail (FALSE, default) or merely trigger a warning
(TRUE)? In the latter case, errors are stored as class
c("etry-error", "try-error") objects, which contain full tracebacks
and potentially crash dumps (c.f. mc.dump.frames and
etry). NA returns the same as TRUE, but without
a warning.
abs(mc.retry) is the maximum number of retries of
failed applications of FUN in case of both fatal and non-fatal
errors. This is useful if we expect FUN to fail either randomly
(e.g. non-convergence of a model) or temporarily (e.g. database
connections). Additionally, if mc.retry <= -1, the value of
mc.cores is gradually decreased with each retry to a minimum of 1 (2
if mc.force.fork = TRUE). This is useful if we expect failures due
to too many parallel processes, e.g. the Linux Out Of Memory Killer
sacrificing some of the child processes.
The environment variable "BMC_RETRY" indicates the current retry. A value of "0" means first try, a value of "1" first retry, etc.
should the messages indicating both fatal and
non-fatal failures during all but the last retry be suppressed
(TRUE) or not (FALSE, default)?
should FUN for a particular element of
X always be invoked with the same fixed seed (TRUE) or should
a different seed be used on each try (FALSE, default)? Only
effective if mc.set.seed is NA or a number.
should we try to fail fast after encountering the first
(non-fatal) error in FUN? Such errors will be recorded as objects of
classes c("fail-early-error", "try-error").
should we dump.frames on non-fatal
errors in FUN? The default "partial" omits the frames (roughly) up
to the call of FUN. See etry for the other options.
where to save the result including the dumped frames if
mc.dump.frames != "no" & mc.allow.error == FALSE? Either the name of
the variable to create in the environment bettermc::crash_dumps or a
path (prefixed with "file://") where to save the object.
how should standard output from FUN be handled?
"capture" captures the output (in the child processes) and prints it in the
parent process after all calls of FUN of the current try (cf.
mc.retry), such that it can be captured, sinked etc. there. "output"
immediately forwards the output to stdout of the parent; it cannot
be captured, sinked etc. there. "ignore" means that the output is not
forwarded in any way to the parent process. For consistency, all of this
also applies if FUN is called directly from the main process, e.g.
because mc.cores = 1.
how should warnings, messages
and other conditions signaled by FUN be handled? "signal" records
all warnings/messages/conditions (in the child processes) and signals them
in the master process after all calls of FUN of the current
try (cf. mc.retry). "stop" converts warnings (only) into non-fatal
errors in the child processes directly. "output" immediately
forwards the messages to stderr of the parent; no condition is signaled in
the parent process nor is the output capturable/sinkable. "ignore" means
that the conditions are not forwarded in any way to the parent process.
Options prefixed with "m" additionally try to invoke the
"muffleWarning"/"muffleMessage" restart in the child process. Note that, if
FUN is called directly from the main process, conditions might be
signaled twice in the main process, depending on these arguments.
should system.time be used to measure
CPU (and other) times used by the invocations of FUN. If
TRUE, the list returned will have an attribute "system_times", which
itself is a list of the same length as X containing the time
measurements.
should character vectors be compressed using
char_map before returning them from the child process? Can
also be the minimum length of character vectors for which to enable
compression. This generally increases performance because (de)serialization
of character vectors is particularly expensive.
should a character vector be compressed if it is an ALTREP? The default "if_allocated" only does so if the regular representation was already created. This was chosen as the default because in this case is is the regular representation which would be serialized.
should non-character atomic
vectors, S3 objects based hereon and factors be returned from the child
processes using POSIX shared memory (cf. copy2shm)? Can also
be the minimum length of vectors for which to use shared memory. This
generally increases performance because shared memory is a much faster form
of inter process communication compared to pipes and we do not need to
serialize the vectors.
should a non-character vector be returned from the child process using POSIX shared memory if it is an ALTREP?
should the parent process use a vector placed in shared
memory due to mc.share.vectors directly (FALSE) or rather a
copy of it (TRUE)? See copy2shm for the implications.
should the results be returned from the child processes
using POSIX shared memory (cf. copy2shm)?
should it be ensured that FUN is always called in
a forked child process, even if length(X) == 1? This is useful if we
use forking to protect the main R process from fatal errors, memory
corruption, memory leaks etc. occurring in FUN. This feature
requires that mc.cores >= 2 and also ensures that the effective
value for mc.cores never drops to less than 2 as a result of
mc.retry being negative.
should a progress bar be printed to stderr of the parent
process (package progress must be installed)?
The shared memory objects created by
mclapply are named as follows (this may be subject to change):
/bmc_ppid_timestamp_idx_cntr (e.g.
/bmc_21479_1601366973201_16_10), with
the time at which
mclapply was invoked (in milliseconds since epoch; on macOS: seconds
since epoch, due to its 31-character limit w.r.t. POSIX
names).
the index of the current element of X
(1-based).
an internal counter (1-based) referring to all the
objects created due to mc.share.vectors for the current value of
X; a value of 0 is used for the object created due to
mc.shm.ipc.
bettermc::mclapply does not err if copying data to shared memory
fails. It will rather only print a message and return results the usual
way.
POSIX shared memory has (at least) kernel persistence, i.e. it is not
automatically freed due to process termination, except if the object is/was
unlinked. bettermc tries hard to not leave any byte behind, but it
could happen that unlinking is incomplete if the parent process is
terminated while bettermc::mclapply is running.
On Linux you can generally inspect the (not-unlinked) objects currently stored in shared memory by listing the files under /dev/shm.
On Linux, POSIX shared memory
is implemented using a
tmpfs
typically mounted under /dev/shm. If not changed by the
distribution, the default size of it is 50% of physical RAM. It can be
changed (temporarily) by remounting it with a different value for the
size option, e.g. mount -o "remount,size=90%" /dev/shm.
When
allocating a shared memory object of at least
getOption("bettermc.hugepage_limit", 104857600) bytes of size
(default is 100 MiB), we use
madvise(...,
MADV_HUGEPAGE) to request the allocation of
(transparent)
huge pages. For this to have any effect, the
tmpfs
used to implement POSIX shared memory on Linux (typically mounted under
/dev/shm) must be (re)mounted with option huge=advise, i.e.
mount -o remount,huge=advise /dev/shm. (The default is
huge=never, but this might be distribution-specific.)
On Windows, otherwise valid values for various arguments are silently replaced as follows:
mc.cores <- 1L
mc.share.vectors <- Inf
mc.shm.ipc <- FALSE
mc.force.fork <- FALSE
mc.progress <- FALSE
if (mc.stdout == "output") mc.stdout <- "ignore"
if (mc.warnings == "output") mc.warnings <- "ignore"
if (mc.messages == "output") mc.messages <- "ignore"Note: parallel::mclapply demands
mc.cores to be exactly 1 on Windows; bettermc::mclapply sets
it to 1 on Windows.
Furthermore, parallel::mclapply ignores
the following arguments on Windows: mc.preschedule, mc.silent,
mc.cleanup, mc.allow.recursive, affinity.list. For mc.set.seed,
only the values TRUE and FALSE are ignored (by
parallel::mclapply); the other values are
handled by bettermc::mclapply as documented above.
copy2shm, char_map,
parallel::mclapply