
runjags (version 0.9.9-2)

xgrid.run: Remote execution of user-specified R functions on Apple Xgrid distributed computing clusters

Description

Allows arbitrary R code to be executed on Apple Xgrid distributed computing clusters, with the results returned to the user's R session. Jobs can be run either synchronously (the process waits for the job to complete before returning the results) or asynchronously (the process terminates on submission of the job, and the results are retrieved at a later time). Access to an Xgrid cluster on which R and all packages required by the function are installed is required. Because the underlying submission and retrieval of jobs depends on the Xgrid software, these functions can only be used on machines running Mac OS X. Further details of the required environment variables and of the optional mgrid script, which enables multi-task jobs, can be found in the Details section.

'xgrid.run' submits jobs to Xgrid that execute the function provided over the number of iterations specified, then periodically retrieves the status of the job(s) and, once they have finished, retrieves the results and returns them as an R list object.
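
The synchronous mode amounts to a submit, wait and check loop. The sketch below illustrates such a loop generically in R; it is not runjags code, and `poll_until_done()` and `make_fake_job()` are hypothetical names. A real status check would query the Xgrid controller, and the 600-second default is only meant to mirror wait.interval="10 min".

```r
# Hypothetical generic poll loop (not runjags internals): call a status
# function, sleep for interval_sec between checks, give up after max_checks.
poll_until_done <- function(is_finished, interval_sec = 600, max_checks = Inf) {
  checks <- 0
  repeat {
    checks <- checks + 1
    if (is_finished()) return(invisible(checks))
    if (checks >= max_checks) stop("job not finished after ", checks, " status checks")
    Sys.sleep(interval_sec)
  }
}

# Hypothetical fake job whose status becomes 'finished' on a given check.
make_fake_job <- function(finish_after = 3) {
  n <- 0
  function() {
    n <<- n + 1
    n >= finish_after
  }
}

n_checks <- poll_until_done(make_fake_job(3), interval_sec = 0)
```

Here the fake job reports completion on its third status check, so `n_checks` is 3; in real use `is_finished` would be whatever reports the job's status.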

'xgrid.submit' submits the job to Xgrid and returns the name of the started job (this is a convenience wrapper for xgrid.run with submitandstop=TRUE).

'xgrid.results' returns the results of a job started using 'xgrid.submit' in the current working directory. If the job is not complete, the function returns the status of the job or, if partial.retrieve=TRUE, the results for completed threads (without deleting the job).

'xapply' is a convenience wrapper for 'xgrid.run' that takes arguments akin to lapply.
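
Because xapply mirrors lapply, its argument mapping can be tried locally before anything is sent to a cluster. The following sketch emulates that mapping with plain lapply: X becomes 'arguments', niters becomes length(X), and any extra arguments in `...` are passed on to the function. `local_run()` and `local_xapply()` are hypothetical stand-ins, not part of runjags, and perform no Xgrid submission.

```r
# Hypothetical local stand-in for xgrid.run: calls f once per element of
# 'arguments' (the iteration number by default), forwarding '...' to f.
local_run <- function(f, niters, arguments = as.list(seq_len(niters)), ...) {
  stopifnot(length(arguments) == niters)
  lapply(arguments, f, ...)
}

# Hypothetical local stand-in for xapply: X plays the role of 'arguments'
# and niters is length(X).
local_xapply <- function(X, FUN, ...) {
  local_run(FUN, niters = length(X), arguments = as.list(X), ...)
}

datasets <- list(c(1, 5, 6, NA), c(9, 2, NA, 0), c(-1, 4, 10, 20))
res_lapply <- lapply(datasets, mean, na.rm = TRUE)
res_local  <- local_xapply(datasets, mean, na.rm = TRUE)
```

Both calls return the same list, which makes a local run a cheap check of FUN and its extra arguments before submitting.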

Usage

xgrid.run(f=function(iteration){}, niters, 
   object.list=list(), file.list=character(0), 
   threads=min(niters,100), arguments=as.list(1:niters), 
   jobname=NA, wait.interval="10 min", 
   xgrid.method=if(threads==1) 'simple' else 
   if(!file.exists(Sys.which('mgrid'))) 'separatejobs' 
   else 'separatetasks', Rpath='/usr/bin/R', Rbuild='64', 
   cleanup = TRUE, submitandstop = FALSE, tempdir=!submitandstop, 
   keep.files = FALSE, show.output = TRUE, max.filesize="1GB", 
   sub.app=if(!file.exists(Sys.which('mgrid'))) 
   'xgrid -job submit -in "$indir"' 
   else 'mgrid -t $ntasks -i "$indir"', sub.options="", 
   sub.command=paste(sub.app, sub.options, '"$cmd"', sep=' '), 
   ...)

xgrid.submit(f=function(iteration){}, niters, 
   object.list=list(), file.list=character(0), 
   threads=min(niters,100), arguments=as.list(1:niters), 
   jobname=NA, xgrid.method=if(threads==1) 'simple' else 
   if(!file.exists(Sys.which('mgrid'))) 'separatejobs' 
   else 'separatetasks', Rpath='/usr/bin/R', Rbuild='64', 
   show.output = TRUE, max.filesize="1GB", 
   sub.app=if(!file.exists(Sys.which('mgrid'))) 
   'xgrid -job submit -in "$indir"' 
   else 'mgrid -t $ntasks -i "$indir"', sub.options="", 
   sub.command=paste(sub.app, sub.options, '"$cmd"', sep=' '), 
   ...)

xgrid.results(jobname, partial.retrieve=FALSE, 
   cleanup=!partial.retrieve, keep.files=FALSE, show.output=TRUE)

xapply(X, FUN, xgrid.options=list(), ...)
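
The default values of xgrid.method and sub.command in the Usage above are ordinary R expressions, so their effect on a given machine can be checked directly. This sketch simply re-evaluates those expressions outside the package; `default_method()` is a hypothetical helper. The `$indir`, `$ntasks` and `$cmd` tokens are presumably substituted by runjags at submission time, which is an assumption not shown here.

```r
# Re-evaluates the default expressions from the Usage section (not runjags code).
default_method <- function(threads) {
  # Sys.which() returns "" when mgrid is not on the PATH, and file.exists("")
  # is FALSE, so multi-thread jobs fall back to 'separatejobs' without mgrid.
  if (threads == 1) 'simple'
  else if (!file.exists(Sys.which('mgrid'))) 'separatejobs'
  else 'separatetasks'
}

mgrid_found <- file.exists(Sys.which('mgrid'))
sub.app <- if (!mgrid_found) 'xgrid -job submit -in "$indir"' else 'mgrid -t $ntasks -i "$indir"'
sub.options <- ""
sub.command <- paste(sub.app, sub.options, '"$cmd"', sep = ' ')

default_method(10)
sub.command
```

With the default empty sub.options, the assembled command contains two consecutive spaces before "$cmd", which a shell treats as a single separator.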

Arguments

f
the function to be iterated over on Xgrid. This must take at least one argument, the first of which receives the element of the 'arguments' list for that iteration; this is the iteration number unless 'arguments' (or …
niters
the total number of iterations over which to evaluate the function f. This can be greater than the number of threads, in which case multiple iterations are evaluated serially as part of the same task. No default.
object.list
a named list of objects that will be copied to the global environment on Xgrid and so will be visible inside the function. Alternatively, this can be a character vector of object names, which will be looked up in the global environment, rather than a named list…
file.list
a vector of filenames representing files in the current working directory that will be copied to the working directory of the executed function. This allows R code to be source()d, datasets to be loaded, and compiled code to be dynamically linked…
threads
the number of threads to generate for the job. Threads are taken to mean jobs if xgrid.method is 'separatejobs' or tasks if xgrid.method is 'separatetasks'. Each thread is sent to a separate node for execution, so the more threads there are the …
arguments
a list of values to be passed as the first argument to the function, with each element of the list specifying the value at that iteration. Default is as.list(1:niters) which passes only the iteration number to the function.
jobname
for all functions except xgrid.results, the jobname can be provided to make the job easier to identify in Xgrid Admin. If none is provided, then one is generated using a combination of the username and hostname of the submitting machine…
wait.interval
when running Xgrid jobs synchronously, the waiting time between retrievals of the job status. If the job is found to be finished when its status is retrieved, the results are returned; otherwise the function waits for 'wait.interval' before checking again…
xgrid.method
the method of submitting the work to Xgrid: one of 'simple', 'separatejobs' or 'separatetasks'. 'simple' runs all threads on a single node, whereas 'separatejobs' runs all threads as individual Xgrid jobs and 'separatetasks' runs all threads as separate tasks within a single Xgrid job…
Rpath
the path to the R executable on the Xgrid nodes. If not all machines on the Xgrid cluster have R (or a required package) installed, then an ART script can be used to ensure that the job is sent only to machines that do; see the examples section.
Rbuild
the preferred binary of R to invoke. '64' results in '{Rpath}64' (if it exists), '32' in '{Rpath}32' (if it exists), and '' (or either '32' or '64' if those binaries are not found) results in '{Rpath}'. Note that this indicates a preference, not a certainty…
partial.retrieve
for xgrid.results, option to retrieve the results of partially completed jobs. If TRUE, cleanup defaults to FALSE so that the job is not deleted. Default FALSE.
cleanup
option to delete the job(s) from Xgrid after retrieving the results. Default TRUE for xgrid.run; for xgrid.results the default is !partial.retrieve.
submitandstop
controls whether the job should be run synchronously (submitandstop=FALSE), in which case the process waits for the job to complete before returning the results, or asynchronously (submitandstop=TRUE), in which case the process terminates on submission of the job and the results are retrieved at a later time. Default FALSE.
tempdir
for xgrid.run, option to use the temporary directory specified by the system rather than creating files in the working directory. Any files created in the temporary directory are removed when the function exits. A temporary directory cannot …
keep.files
option to keep the folder containing the files needed to run the job rather than deleting it, or to copy the folder to the working directory before exiting if tempdir=TRUE. This may be useful for debugging failing jobs. Default FALSE.
show.output
option to print the output of the function (produced using cat, writeLines or print, for example) at each iteration after retrieving the job(s) from Xgrid. If FALSE, the output is suppressed. Default TRUE.
max.filesize
the maximum total size of the objects produced by the function for each thread if xgrid.method='separatejobs', or for the entire job if xgrid.method='separatetasks'. This is a failsafe designed to prevent attempted transfers of huge files from bringing the Xgrid controller down…
sub.app
the submission application or script to use for job running/submission. The built-in Xgrid application supports most options, but greater functionality is provided by the mgrid script (see the Details section for more information and installation instructions)…
sub.options
one or more option flags to be passed through to the submission application (as a character string). Examples include ART scripts, email on job completion and, when using the mgrid script, many other possibilities (see the Details section). When providing…
sub.command
the actual command to be executed using system() to submit the job. Changing this causes sub.app and sub.options to be ignored, and is probably the best option for custom submission scripts (see the sub.app argument for the requirements for custom scripts).
X
for xapply, a vector (atomic or list) over which to apply the function provided. Equivalent to 'arguments' for xgrid.run, with niters = length(X).
FUN
for xapply, the function to be passed to xgrid.run as 'f'.
xgrid.options
for xapply, any arguments (with the exception of 'f', 'niters' and 'arguments' which are ignored) to be passed to xgrid.run.
...
additional arguments to be passed to the function provided.

Value

  • For xgrid.submit: a list containing the jobname (which is required by xgrid.results to retrieve the job) and the job ID(s) for use with the Xgrid command line facilities.
  • For xgrid.run and xgrid.results: the output of the function over all iterations, returned as a list with one element per iteration. If the function returned an error at an iteration, the error is stored in the list as the return value for that iteration. If the function returns an object that, combined with the results for the other iterations in that job, exceeds 'max.filesize' (or exceeds max.filesize/threads for multi-task jobs), the results for that thread are replaced with an error message; this prevents the Xgrid controller from crashing while transferring large files.
  • For xapply: the same as xgrid.run, or the same as xgrid.submit if xgrid.options=list(submitandstop=TRUE), in which case the results can be retrieved using xgrid.results.
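
The per-iteration result format can be imitated locally, which is handy when writing code that post-processes results. In this sketch, `collect_results()` stores an error condition in place of the return value for any iteration that fails, and `exceeds_limit()` is a rough byte-count check in the spirit of max.filesize. Both helpers are hypothetical, and the 'iteration.N' element names are an assumption based on the `$iteration.1` used in the examples.

```r
# Hypothetical emulation of the documented result format: one element per
# iteration, holding the error condition if that iteration failed.
collect_results <- function(f, arguments, ...) {
  out <- lapply(arguments, function(a) tryCatch(f(a, ...), error = function(e) e))
  # Naming scheme assumed from the '$iteration.1' used in the examples.
  names(out) <- paste0("iteration.", seq_along(arguments))
  out
}

# Hypothetical size check in the spirit of max.filesize (limit in bytes).
exceeds_limit <- function(x, max_bytes) {
  as.numeric(object.size(x)) > max_bytes
}

res <- collect_results(function(i) if (i == 2) stop("bad iteration") else i * 10,
                       arguments = as.list(1:3))
failed <- vapply(res, inherits, logical(1), what = "error")
```

Checking each element with inherits(x, "error") separates failed iterations from successful ones without stopping the rest of the analysis.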

Details

These functions allow arbitrary R code to be run on Xgrid distributed computing clusters from within R, using the same function that would be evaluated locally. All the functionality could be replicated by saving all necessary objects to files and using the Xgrid command line utility to submit and retrieve the job manually; these functions merely save the user from having to do this by hand. Xgrid support is only available on Mac OS X machines.
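
The manual route mentioned above can be partly prepared from R. The sketch below saves the function and its objects with saveRDS(), writes a small driver script, and assembles (but never runs) a submission command modelled on the default sub.app. `prepare_manual_job()`, the file names and the exact command line are assumptions for illustration; the real Xgrid command line syntax should be checked against its manual page.

```r
# Hypothetical sketch of the manual route: write the function and its objects
# into an input directory with a driver script, then build (but do not run)
# an Xgrid submission command. File names are illustrative only.
prepare_manual_job <- function(f, objects, iteration = 1,
                               indir = tempfile("xgridjob")) {
  dir.create(indir)
  saveRDS(f, file.path(indir, "f.rds"))
  saveRDS(objects, file.path(indir, "objects.rds"))
  # Driver script for the node: restore the objects into the global
  # environment (as object.list does), call f, and save the result.
  writeLines(c(
    'list2env(readRDS("objects.rds"), envir = globalenv())',
    'f <- readRDS("f.rds")',
    sprintf('saveRDS(f(%d), "result.rds")', as.integer(iteration))
  ), file.path(indir, "run.R"))
  # Command shaped like the default sub.app; assumed, not verified against xgrid.
  command <- paste("xgrid -job submit -in", shQuote(indir),
                   "/usr/bin/R --slave -f run.R")
  list(indir = indir, command = command)
}

job <- prepare_manual_job(function(i) i + offset, objects = list(offset = 41))
job$command
```

Sourcing run.R from inside job$indir reproduces locally what a node would do, which is a cheap way to test the directory before submitting it.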

The Xgrid controller hostname and password must be set as environment variables. The command line version of R picks up environment variables set in the .profile file, but the GUI version does not, so they must be set from within R using:

Sys.setenv(XGRID_CONTROLLER_HOSTNAME="")

Sys.setenv(XGRID_CONTROLLER_PASSWORD="")

(These lines can be copied into your .Rprofile file for a 'set and forget' solution.)
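
Because a missing variable is easy to overlook in the GUI, a quick check before submitting can help. `missing_xgrid_env()` is a hypothetical helper; it relies only on Sys.getenv(), which returns "" for unset variables.

```r
# Hypothetical helper: names of the Xgrid variables that are unset or empty.
missing_xgrid_env <- function(vars = c("XGRID_CONTROLLER_HOSTNAME",
                                       "XGRID_CONTROLLER_PASSWORD")) {
  # Sys.getenv() returns "" for variables that are not set.
  vars[!nzchar(Sys.getenv(vars))]
}

unset <- missing_xgrid_env()
if (length(unset) > 0) {
  message("Set before submitting: ", paste(unset, collapse = ", "))
}
```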

All functions can be run using the built-in Xgrid commands; however, some added functionality (including multi-task jobs, which enable the 'separatetasks' method) is provided by the 'mgrid.sh' BASH shell script, which is included with the runjags package (in the 'inst/xgrid' folder of the package source or the 'xgrid' folder of the installed package). More details about this script are given at the top of the mgrid.sh file. To install it (optional), see the install.mgrid function.
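
In an installed copy of a runjags version that ships the script (such as the 0.9.9-2 version documented here), its location follows from the folder named above, and the notes at the top of the file can be printed from R. This assumes the file is named mgrid.sh inside the 'xgrid' folder, as stated above; system.file() returns "" when the package or file is absent, so the code is safe to run anywhere.

```r
# Location of the bundled script in an installed runjags, or "" if absent.
mgrid_script <- system.file("xgrid", "mgrid.sh", package = "runjags")
# TRUE if an mgrid command is already on the PATH (e.g. after install.mgrid()).
mgrid_on_path <- nzchar(Sys.which("mgrid"))

if (nzchar(mgrid_script)) {
  # The usage notes are at the top of the file.
  writeLines(head(readLines(mgrid_script), 30))
} else {
  message("mgrid.sh not found in the installed runjags package")
}
```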

See Also

xgrid.run.jags for functions to run JAGS models on Xgrid, or run.jags to do so locally.

install.mgrid to install the mgrid script.

mclapply and mcparallel in the multicore package for parallel execution of code over multiple local cores.

Examples

# A basic example of synchronous running of code over 100 iterations, 
# split up between 10 tasks (or 10 jobs if mgrid is not installed):

# The function to evaluate:
f <- function(iteration){
	# All objects supplied to object.list will be visible here, but
	# remember to call all necessary libraries within the function
	
	cat("Running iteration", iteration, "\n")
	# Some lengthy code evaluation....
	
	output <- rpois(10, iteration)
	return(output)
}

# Run the function on xgrid for 100 iterations split between 10 machines:
results <- xgrid.run(f, niters=100, threads=10)



# A basic example of xapply to calculate the mean of a list of numbers:

# A list of 3 datasets from which to calculate the mean:
datasets <- list(c(1,5,6,NA), c(9,2,NA,0), c(-1,4,10,20))

# Standard lapply syntax:
results1 <- lapply(datasets, mean, na.rm=TRUE)

# Equivalent xapply syntax:
results2 <- xapply(datasets, mean, 
xgrid.options=list(wait.interval='15s'), na.rm=TRUE)

# Or submit the job:
id <- xapply(datasets, mean, xgrid.options=list(submitandstop=TRUE),
na.rm=TRUE)
# And retrieve the results:
results3 <- xgrid.results(id)



# Any packages required by the function need to be installed on the
# nodes the function is run on.  This function retrieves information
# about the available packages on each of the node names provided:

# The name of one or more nodes to get information about:
nodenames <- c("mynode", "guestnode", "othernode")

# Run the job:
results <- xgrid.run(function(i){
		return(installed.packages()[,'Version'])
	}, 
	niters=length(nodenames), threads=length(nodenames), 
	wait.interval="10 seconds", xgrid.method='separatejobs',
	sub.options=paste("-f -h '", nodenames, "'", sep=""),
	show.output=FALSE)
# Label each element of the results with its node name:
names(results) <- nodenames

# Show the available packages and their versions for each node:
results


# An example of running an Xgrid job within another Xgrid job, using 
# xgrid.submit to submit a job that runs a JAGS model to convergence 
# using xgrid.autorun.jags:

# Create an ART script to make sure that (a) R is installed, 
# (b) JAGS is installed, and (c) the runjags package is installed 
# on the node:
cat('#!/bin/bash

if [ ! -f /usr/bin/R ]; then 
echo 0
exit 0
fi
if [ ! -f /usr/local/bin/jags ]; then 
echo 0
exit 0
fi
/usr/bin/R --slave -e "suppressMessages(r<-require(runjags,quietly=T));cat(r*1,fill=T)"
exit 0
', file='runjagsART.sh')

# Some data etc we will need for the model:
library(runjags)

X <- 1:100
Y <- rnorm(length(X), 2*X + 10, 1)
data <- dump.format(list(X=X, Y=Y, N=length(X)))

# Model in the JAGS format
model <- "model {
for(i in 1 : N){
Y[i] ~ dnorm(true.y[i], precision);
true.y[i] <- (m * X[i]) + c;
}
m ~ dunif(-1000,1000);
c ~ dunif(-1000,1000);
precision ~ dexp(1);
}"

# Get the Xgrid controller hostname and password to be passed 
# to the slave job:
hostname <- Sys.getenv('XGRID_CONTROLLER_HOSTNAME')
password <- Sys.getenv('XGRID_CONTROLLER_PASSWORD')

# The function we are going to call on xgrid:
f <- function(iteration){
	# Make sure the necessary environmental variables are set:
	Sys.setenv(XGRID_CONTROLLER_HOSTNAME=hostname)
	Sys.setenv(XGRID_CONTROLLER_PASSWORD=password)
	
	# Call the library on the node:
	library(runjags)
	
	# Use xgrid.autorun.jags to run 2 chains until convergence:
	results <- xgrid.autorun.jags(model=model, 
		monitor=c("m", "c", "precision"), data=data, n.chains=2, 
		inits=list(list(.RNG.name='base::Wichmann-Hill'), 
		list(.RNG.name='base::Marsaglia-Multicarry')), 
		plots = FALSE, xgrid.method='separatejobs', 
		wait.interval='1 min', jobname='xgridslavejob')
	
	return(results)
}

# Submit the function to xgrid using our ART script to ensure the 
# node can handle the job (the ART script path must be specified as 
# an absolute path as xgrid won't be called in the current working 
# directory, and all paths must be enclosed in quotes to preserve 
# spaces):
name <- xgrid.submit(f, object.list=list(X=X, Y=Y, model=model, 
	data=data, hostname=hostname, password=password), threads=1, 
	niters=1, sub.options=if(!file.exists(Sys.which('mgrid'))) 
	paste('-art "', getwd(), '/runjagsART.sh"', sep='') else 
	paste('-a "', getwd(), '/runjagsART.sh"', sep=''), 
	xgrid.method='simple')
# Cleanup (remove runjagsART file):
unlink('runjagsART.sh')

# Get the results once it is finished:
results <- xgrid.results(name)$iteration.1



# Submit an Xgrid job just to see which packages are installed 
# on a particular machine.

# Ensure mgrid is installed:
if(!file.exists(Sys.which('mgrid'))) install.mgrid()

# A function to harvest details of R version and installed packages:
f <- function(i){

	archavail <- any(dimnames(installed.packages())[[2]]=='Archs')

	# To deal with older versions of R:
	if(archavail){
		packagesinst <- installed.packages()[,c('Version', 'Archs', 'Built')]
	}else{
		packagesinst <- installed.packages()[,c('Version', 'OS_type', 'Built')]
	}

	Rinst <- unlist(R.version[c('version.string', 'arch', 'platform')])
	names(Rinst) <- c('Version', 'Archs', 'Built')
	return(rbind(R=Rinst, packagesinst))

}

# Or to get more details about a particular package:
g <- function(i){
	p <- library(help='bayescount')
	return(p$info)
}

# Get the information back from 2 specific machines called 'newnode1' 
# and 'newnode2':

results <- xgrid.run(f, niters=2, threads=2, 
sub.options='-h newnode1:newnode2', wait.interval='15 seconds')
