datadr (version 0.8.4)

mrExec: Execute a MapReduce Job

Description

Execute a MapReduce job

Usage

mrExec(data, setup = NULL, map = NULL, reduce = NULL, output = NULL,
  overwrite = FALSE, control = NULL, params = NULL, packages = NULL,
  verbose = TRUE)

Arguments

data
a ddo/ddf object, or list of ddo/ddf objects
setup
an expression of R code (created using the R command expression) to be run before map and reduce
map
an R expression that is evaluated during the map stage. For each task, this expression is executed multiple times (see details).
reduce
a vector of R expressions with names pre, reduce, and post that are evaluated during the reduce stage. For example, reduce = expression(pre = {...}, reduce = {...}, post = {...}). reduce is optional, and if not specified the map output key-valu
output
a "kvConnection" object indicating where the output data should reside (see localDiskConn, hdfsConn). If NULL (default), output will be
overwrite
logical; should an existing output location be overwritten? (You can also specify overwrite = "backup" to move the existing output to _bak.)
control
parameters specifying how the backend should handle things (most likely parameters to rhwatch in RHIPE); see rhipeControl and localDiskContr
params
a named list of objects external to the input data that are needed in the map or reduce phases
packages
a vector of R package names that contain functions used in the map or reduce expressions (most are detected automatically, so this rarely needs to be specified)
verbose
logical; should messages about what is being done be printed?

Value

  • A "ddo" object, to keep things simple. It is up to the user to update it or cast it to "ddf" if that is the desired result.

Examples

library(datadr)

# compute the min and max Sepal.Length by species for the iris data,
# using a random partitioning of it as input
d <- divide(iris, by = rrDiv(20))

mapExp <- expression({
  lapply(map.values, function(r) {
    by(r, r$Species, function(x) {
      collect(
        as.character(x$Species[1]),
        range(x$Sepal.Length, na.rm = TRUE)
      )
    })
  })
})

reduceExp <- expression(
  pre = {
    rng <- c(Inf, -Inf)
  }, reduce = {
    rx <- unlist(reduce.values)
    rng <- c(min(rng[1], rx, na.rm = TRUE), max(rng[2], rx, na.rm = TRUE))
  }, post = {
    collect(reduce.key, rng)
  }
)

res <- mrExec(d, map = mapExp, reduce = reduceExp)
as.list(res)
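
The example relies on names that are available while a job runs: map.values in the map expression, reduce.key and reduce.values in the reduce expression, and collect() to emit key-value pairs. The base R sketch below emulates that contract in a single process so the control flow can be followed without datadr. It is a toy under stated assumptions, not datadr's implementation: run_toy_mr and make_collector are hypothetical helpers, the input is a plain list of blocks of key-value pairs, and the reduce expression is called once per key, whereas a real backend may call it several times with batches of values (which is why the reduce step accumulates into rng rather than overwriting it).

```r
# Toy emulation of the map / reduce contract (hypothetical helper, base R only).
run_toy_mr <- function(kv_blocks, map, reduce) {
  # Each collector stores the key-value pairs passed to its collect().
  make_collector <- function() {
    store <- list()
    list(
      collect = function(key, value) {
        store[[length(store) + 1]] <<- list(key, value)
        invisible(NULL)
      },
      get = function() store
    )
  }

  # Map stage: the map expression is evaluated once per block of pairs.
  mapped <- make_collector()
  for (block in kv_blocks) {
    env <- new.env(parent = globalenv())
    env$map.keys <- lapply(block, `[[`, 1)
    env$map.values <- lapply(block, `[[`, 2)
    env$collect <- mapped$collect
    eval(map, env)
  }

  # Group the intermediate values by key.
  pairs <- mapped$get()
  keys <- vapply(pairs, function(p) as.character(p[[1]]), character(1))

  # Reduce stage: pre, reduce, and post share one environment per key.
  reduced <- make_collector()
  for (k in unique(keys)) {
    env <- new.env(parent = globalenv())
    env$reduce.key <- k
    env$collect <- reduced$collect
    eval(reduce[["pre"]], env)
    env$reduce.values <- lapply(pairs[keys == k], `[[`, 2)
    eval(reduce[["reduce"]], env)
    eval(reduce[["post"]], env)
  }
  reduced$get()
}

# One block holding two key-value pairs; the values are halves of iris.
blocks <- list(list(list("a", iris[1:75, ]), list("b", iris[76:150, ])))

map_exp <- expression({
  for (v in map.values) {
    for (s in unique(as.character(v$Species))) {
      collect(s, range(v$Sepal.Length[v$Species == s], na.rm = TRUE))
    }
  }
})

reduce_exp <- expression(
  pre = {
    rng <- c(Inf, -Inf)
  }, reduce = {
    rx <- unlist(reduce.values)
    rng <- c(min(rng[1], rx, na.rm = TRUE), max(rng[2], rx, na.rm = TRUE))
  }, post = {
    collect(reduce.key, rng)
  }
)

out <- run_toy_mr(blocks, map_exp, reduce_exp)
str(out)
```

Each element of out is a list holding a species name and its Sepal.Length range, which can be compared with range() applied to the corresponding rows of iris.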
