Learn R Programming

batchtools (version 0.9.0)

submitJobs: Submit Jobs to the Batch Systems

Description

Submits defined jobs to the batch system.

If an additional column “chunk” is found in the table ids, jobs will be grouped accordingly to be executed sequentially on the same slave. The utility function chunkIds can assist in grouping jobs. Jobs are submitted in the order of chunks, i.e. jobs which have chunk number unique(ids$chunk)[1] first, then jobs with chunk number unique(ids$chunk)[2] and so on. If no chunks are provided, jobs are submitted in the order of ids$job.id.

After submitting the jobs, you can use waitForJobs to wait for the termination of jobs or call reduceResultsList/reduceResults to collect partial results. The progress can be monitored with getStatus.

Usage

submitJobs(ids = NULL, resources = list(), reg = getDefaultRegistry())

Arguments

ids
[data.frame or integer] A data.frame (or data.table) with a column named “job.id”. Alternatively, you may also pass a vector of integerish job ids. If not set, defaults to the return value of findNotSubmitted.
resources
[named list] Computational resources for the batch jobs. The elements of this list (e.g. something like “walltime” or “nodes”) depend on your template file. See notes for reserved special resource names. Defaults can be stored in the configuration file by providing the named list default.resources. Settings in resources overwrite those in default.resources.
reg
[Registry] Registry. If not explicitly passed, uses the default registry (see setDefaultRegistry).

Value

[data.table] with columns “job.id” and “chunk”.

Examples

Run this code
### Example 1: Using memory measurement
tmp = makeRegistry(file.dir = NA, make.default = FALSE)

# Toy function which creates a large matrix and returns the column sums
fun = function(n, p) colMeans(matrix(runif(n*p), n, p))

# Arguments to fun:
args = expand.grid(n = c(1e4, 1e5), p = c(10, 50))
print(args)

# Map function to create jobs
ids = batchMap(fun, args = args, reg = tmp)

# Set resources: enable memory measurement
res = list(measure.memory = TRUE)

# Submit jobs using the currently configured cluster functions
submitJobs(ids, resources = res, reg = tmp)

# Retrive information about memory, combine with parameters
info = ijoin(getJobStatus(reg = tmp)[, .(job.id, memory)], getJobPars(reg = tmp))
print(info)

# Combine job info with results -> each job is aggregated using mean()
ijoin(info, reduceResultsDataTable(fun = function(res) list(res = mean(res)), reg = tmp))

### Example 2: Multicore execution on the slave
tmp = makeRegistry(file.dir = NA, make.default = FALSE)

# Function which sleeps 10 seconds, i-times
f = function(i) {
  parallelMap::parallelMap(Sys.sleep, rep(10, i))
}

# Create one job with parameter i=4
ids = batchMap(f, i = 4, reg = tmp)

# Set resources: Use parallelMap in multicore mode with 4 CPUs
# batchtools internally loads the namespace of parallelMap and then
# calls parallelStart() before the job and parallelStop() right
# after the job last job in the chunk terminated.
res = list(pm.backend = "multicore", ncpus = 4)

## Not run: ------------------------------------
# # Submit both jobs and wait for them
# submitJobs(resources = res, reg = tmp)
# waitForJobs(reg = tmp)
# 
# # If successfull, the running time should be ~10s
# getJobTable(reg = tmp)[, .(job.id, time.running)]
# 
# # There should also be a note in the log:
# grepLogs(pattern = "parallelMap", reg = tmp)
## ---------------------------------------------

Run the code above in your browser using DataLab