fileplyr (version 0.2.0)

fileply

Description

Performs chunk processing or split-apply-combine on the data in a delimited file (for example, a CSV).

Usage

fileply(file, groupby, fun = identity, collect = "none",
  temploc = getwd(), nbins = 10, chunk = 50000, spill = 1e+06,
  cores = 1, buffer = 1e+09, keepddf = FALSE, ...)

Arguments

file
(string) path to input delimited file
groupby
(character vector) Column names used to split the data (if missing, fun is applied on each chunk)
fun
(object of class function) function to apply on each subset after the split
collect
(string) Whether to collect the result as a 'list', a 'dataframe', or 'none'. 'none' keeps the resulting ddo on disk.
temploc
(string) Path where intermediate files are kept
nbins
(positive integer) Number of directories into which the distributed dataframe (ddf) or distributed data object (ddo) is distributed
chunk
(positive integer) Number of rows of the file to be read at a time
spill
(positive integer) Maximum number of rows of any subset resulting from the split
cores
(positive integer) Number of cores to be used in parallel
buffer
(positive integer) Size of the batches of key-value pairs passed to the map, flushed to intermediate storage from the map output, or sent to the reduce
keepddf
(flag) Whether to keep the distributed dataframe on disk
...
Arguments passed as-is to the underlying data.table reading function (for example, sep and header).

Value

A list, a dataframe, or TRUE (when collect is 'none').

Details

  • Split and Apply Stage: The variables in groupby are used to split the data, loading only one subset (possibly several if multiple cores are in action) into memory at a time. If groupby is missing, chunkwise processing is performed on each subset of the distributed dataframe. The user-defined fun is applied and the results are written to a distributed object (a list or KV pairs) on disk.
  • Combine Stage: The distributed data object (ddo) is read into memory depending on the collect argument. The default is 'none', which does not read the data back into memory.
Memory usage: While processing heavy files (many times the RAM size), each core might hold a maximum of 800 MB to 1 GB of memory overhead, not accounting for the memory used by the user-defined function. Memory usage depends on the size of the subset, how many times it is copied by the user function, and how frequently gc is called. Using an appropriate number of cores keeps memory utilization in check. Setting a smaller buffer value keeps memory usage low (see localDiskControl) but makes the execution slower.
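
For instance, a memory-conscious call on a large file could look like the sketch below. The file name ("sales.csv"), its columns (region, amount), and the summary function are hypothetical and not part of the package's examples; returning a small summary per subset rather than the data itself keeps the collect step light.

# hypothetical large file: split by region and return a small
# per-subset summary instead of the subset itself
res <- fileply(file     = "sales.csv"   # assumed file, for illustration
            , groupby = "region"
            , fun     = function(x){ list(rows = nrow(x), total = sum(x$amount)) }
            , collect = "dataframe"
            , cores   = 2          # roughly one subset held in memory per core
            , chunk   = 1e5        # rows read from the file per pass
            , buffer  = 1e8        # smaller buffer: less memory, slower run
            , temploc = tempdir()  # keep intermediate files out of getwd()
            , sep     = ","
            , header  = TRUE
            )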

Examples

# split-apply-combine
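# split mtcars by carb and gear, apply identity to each group,
# and collect the per-group subsets as a list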
write.table(mtcars, "mtcars.csv", row.names = FALSE, sep = ",")
temp <- fileply(file     = "mtcars.csv"
             , groupby = c("carb", "gear")
             , fun     = identity
             , collect = "list"
             , sep     =  ","
             , header  = TRUE
             )
temp
unlink("mtcars.csv")

# chunkwise processing
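# no groupby: fun is applied to each chunk of 10 rows (mtcars has
# 32 rows, so 4 chunks); each result is that chunk's row count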
write.table(mtcars, "mtcars.csv", row.names = FALSE, sep = ",")
temp <- fileply(file     = "mtcars.csv"
             , chunk   = 10
             , fun     = function(x){list(nrow(x))}
             , collect = "dataframe"
             , sep     =  ","
             , header  = TRUE
             )
temp
unlink("mtcars.csv")

# example for collect='none'
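# with collect = 'none' (the default) the result stays on disk as a
# ddo; the output directory is parsed from fileply's messages and
# reconnected to with datadr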
write.table(mtcars, "mtcars.csv", row.names = FALSE, sep = ",")
outdir <- utils::capture.output(temp <- fileply(file      = "mtcars.csv"
                                                , groupby = c("carb", "gear")
                                                , fun     = identity
                                                , sep     =  ","
                                                , header  = TRUE
                                                )
                                , file = NULL
                                , type = "message"
                                )
outdir <- gsub("Output Directory: ", "", outdir[5])
diskKV <- datadr::ddo(datadr::localDiskConn(outdir))
diskKV
diskKV[[1]]
unlink(outdir, recursive = TRUE)
unlink("mtcars.csv")
