DSL (version 0.1-6)

MapReduce: MapReduce for "DList" Objects

Description

Interface to apply functions on elements of "DList" objects.

Usage

DLapply( x, FUN, parallel, ..., keep = FALSE )
DMap( x, MAP, parallel, keep = FALSE )
DReduce( x, REDUCE, parallel, ... )

Arguments

x

a "DList" object. Other objects (e.g., lists) will be coerced by as.DList.

FUN

the function to be applied to each element (i.e., the values) of x.

MAP

the function to be applied to each key/value pair in x.

REDUCE

the function to be applied to the values collected for each key in x.

...

optional arguments to FUN or REDUCE.

parallel

logical; should the provided functions be applied in parallel? Default: FALSE.

keep

logical; should the current data be kept as a separate revision for further processing later? Default: FALSE.

Value

A "DList".

Details

The MapReduce programming model as defined by Dean and Ghemawat (2008) is as follows: the computation takes a set of input key/value pairs, and produces a set of output key/value pairs. The user expresses the computation as two functions: Map and Reduce. The Map function takes an input pair and produces a set of intermediate key/value pairs. The Reduce function accepts an intermediate key and a set of values for that key (possibly grouped by the MapReduce library). It merges these values together to form a possibly smaller set of values. Typically, just zero or one output value is produced per reduce invocation. Furthermore, data is usually stored on a (distributed) file system which is recognized by the MapReduce library. This allows such a framework to handle lists of values (here objects of class "DList") that are too large to fit in main memory (i.e., RAM).
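The model described above can be illustrated with a minimal sketch in base R. It does not use the DSL package or any distributed storage, and the helper names (word_map, map_phase, shuffle_phase, reduce_phase) are hypothetical, chosen only for this illustration. It shows the three conceptual steps: map each input pair to intermediate pairs, group the intermediate values by key, and reduce each group.

```r
## Base R sketch of the MapReduce model; everything stays in memory.
## All helper names are hypothetical and not part of the DSL package.

## Input: key/value pairs stored as a named list (key = name, value = text).
input <- list( line1 = "the quick fox",
               line2 = "the lazy dog" )

## Map: one input pair -> list of intermediate key/value pairs (word, 1).
word_map <- function( key, value ) {
    words <- unlist(strsplit(value, " ", fixed = TRUE))
    lapply( words, function(w) list( key = w, value = 1L ) )
}

## Apply MAP to every input pair and flatten the result by one level.
map_phase <- function( input, MAP ) {
    pairs <- mapply( MAP, names(input), input,
                     SIMPLIFY = FALSE, USE.NAMES = FALSE )
    do.call( c, pairs )
}

## Group the intermediate values by their key.
shuffle_phase <- function( pairs ) {
    keys   <- vapply( pairs, function(p) p$key, character(1) )
    values <- vapply( pairs, function(p) p$value, integer(1) )
    split( values, keys )
}

## Reduce: merge the values collected for each key.
reduce_phase <- function( groups, REDUCE ) lapply( groups, REDUCE )

counts <- reduce_phase( shuffle_phase( map_phase( input, word_map ) ), sum )
unlist( counts )
```

Here the grouping step is written out explicitly; in a MapReduce library it is typically handled by the framework between the map and reduce calls.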

References

J. Dean and S. Ghemawat (2008). MapReduce: Simplified Data Processing on Large Clusters. Communications of the ACM, 51, 107-113.

Examples

# NOT RUN {
library("DSL")
dl <- DList( line1 = "This is the first line.",
             line2 = "Now, the second line." )
res <- DLapply( dl, function(x) unlist(strsplit(x, " ")) )
as.list( res )

foo <- function( keypair )
    list( key = paste("next_", keypair$key, sep = ""),
          value = gsub("first", "mapped", keypair$value) )

dlm <- DMap( x = dl, MAP = foo)
## retrieve keys
unlist(DGather(dlm, keys = TRUE, names = FALSE))
## retrieve values
as.list( dlm )
## simple wordcount based on two files:
dir(system.file("examples", package = "DSL"))
## first force 1 chunk per file (set max chunk size to 1 byte):
ds <- DStorage("LFS", tempdir(), chunksize = 1L)
## make "DList" from files, i.e., read contents and store in chunks
dl <- as.DList(system.file("examples", package = "DSL"), DStorage = ds)
## read files
dl <- DMap(dl, function( keypair ){
    list( key = keypair$key,
          value = tryCatch(readLines(keypair$value), error = function(x) NA) )
})
## split into terms
splitwords <- function( keypair ){
    keys <- unlist(strsplit(keypair$value, " "))
    mapply( function(key, value) list( key = key, value = value),
            keys, rep(1L, length(keys)),
            SIMPLIFY = FALSE, USE.NAMES = FALSE )
}
res <- DMap( dl, splitwords )
as.list(res)
## now aggregate by term
res <- DReduce( res, sum )
as.list( res )
# }
