
disk.frame

Introduction

The disk.frame package aims to be the answer to the question: how do I manipulate structured tabular data that doesn’t fit into Random Access Memory (RAM)?

In a nutshell, disk.frame makes use of two simple ideas:

  1. split up a larger-than-RAM dataset into chunks, storing each chunk in a separate file inside a folder, and
  2. provide a convenient API to manipulate these chunks.
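These two ideas can be sketched in a few lines of base R. This is illustrative only: disk.frame itself stores chunks as fst files, not RDS, and provides a proper API on top.

```r
# illustrative sketch of the chunking idea; disk.frame itself
# stores chunks as fst files, not RDS
chunk_dir <- tempfile("chunks")
dir.create(chunk_dir)

# 1. split a data.frame into chunks, one file per chunk
chunks <- split(mtcars, rep(1:4, length.out = nrow(mtcars)))
for (i in seq_along(chunks)) {
  saveRDS(chunks[[i]], file.path(chunk_dir, paste0("chunk_", i, ".rds")))
}

# 2. process one chunk at a time, never holding the full data in RAM
files <- list.files(chunk_dir, full.names = TRUE)
per_chunk <- vapply(files, function(f) sum(readRDS(f)$mpg), numeric(1))
total_mpg <- sum(per_chunk)
```

Because the per-chunk results are tiny compared to the chunks themselves, only one chunk ever needs to be in memory at a time.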

disk.frame performs a similar role to distributed systems such as Apache Spark, Python's Dask, and Julia's JuliaDB.jl, but for medium data: datasets that are too large to fit in RAM, yet not so large that they qualify as big data requiring processing distributed over many computers.

Installation

You can install the released version of disk.frame from CRAN with:

install.packages("disk.frame")

And the development version from GitHub with:

# install.packages("devtools")
devtools::install_github("xiaodaigh/disk.frame")

Vignettes and articles

Please see these vignettes and articles about disk.frame

Common questions

a) What is disk.frame and why create it?

disk.frame is an R package that provides a framework for manipulating larger-than-RAM structured tabular data on disk efficiently. The reason one would want to manipulate data on disk is that it allows arbitrarily large datasets to be processed by R. In other words, we go from “R can only deal with data that fits in RAM” to “R can deal with any data that fits on disk”. See the next section.

b) How is it different to data.frame and data.table?

A data.frame in R is an in-memory data structure, which means that R must load the data in its entirety into RAM. A corollary of this is that only data that can fit into RAM can be processed using data.frames. This places significant restrictions on what R can process with minimal hassle.

In contrast, disk.frame provides a framework to store and manipulate data on the hard drive. It does this by loading only a small part of the data, called a chunk, into RAM: it processes the chunk, writes out the result, and repeats with the next chunk. This chunking strategy is widely applied in other packages to enable processing large amounts of data in R; for example, see chunked, arkdb, and iotools.

Furthermore, there is a row-limit of 2^31 for data.frames in R; hence an alternate approach is needed to apply R to these large datasets. The chunking mechanism in disk.frame provides such an avenue to enable data manipulation beyond the 2^31 row limit.
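This limit stems from R's 32-bit integer indexing; the largest representable integer value can be confirmed from within R:

```r
# R's integers are 32-bit, so the largest representable integer is 2^31 - 1
.Machine$integer.max
#> [1] 2147483647
```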

c) How is disk.frame different to previous “big” data solutions for R?

R has many packages that can deal with larger-than-RAM datasets, including ff and bigmemory. However, ff and bigmemory restrict the user to primitive data types such as double, which means they do not support character (string) and factor types. In contrast, disk.frame makes use of data.table::data.table and data.frame directly, so all data types are supported. Also, disk.frame strives to provide an API that is as similar to data.frame's as possible, and it supports many dplyr verbs for manipulating disk.frames.

Additionally, disk.frame supports parallel data operations using infrastructures provided by the excellent future package to take advantage of multi-core CPUs. Further, disk.frame uses state-of-the-art data storage techniques such as fast data compression, and random access to rows and columns provided by the fst package to provide superior data manipulation speeds.

d) How does disk.frame work?

disk.frame works by breaking large datasets into smaller individual chunks and storing the chunks in fst files inside a folder. Each chunk is a fst file containing a data.frame/data.table. One can reconstruct the original large dataset by loading all the chunks into RAM and row-binding them into one large data.frame. Of course, in practice this isn't always possible, which is why we store them as smaller individual chunks.

disk.frame makes it easy to manipulate the underlying chunks by implementing dplyr functions/verbs and other convenient functions (e.g. map.disk.frame(a.disk.frame, fn, lazy = FALSE), which applies the function fn to each chunk of a.disk.frame in parallel), so that a disk.frame can be manipulated in much the same fashion as an in-memory data.frame.
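The map-over-chunks pattern can be sketched without disk.frame using base R. The chunk layout and the map_chunks helper below are illustrative, not disk.frame's actual internals, which operate over fst-backed chunks and can run in parallel via the future package.

```r
# illustrative map-over-chunks pattern (not disk.frame's internals)
chunk_dir <- tempfile("df")
dir.create(chunk_dir)
chunks <- split(iris, rep(1:3, length.out = nrow(iris)))
invisible(lapply(seq_along(chunks), function(i)
  saveRDS(chunks[[i]], file.path(chunk_dir, sprintf("chunk_%d.rds", i)))))

# apply fn to each chunk file and row-bind the results
map_chunks <- function(dir, fn) {
  do.call(rbind, lapply(list.files(dir, full.names = TRUE),
                        function(f) fn(readRDS(f))))
}

res <- map_chunks(chunk_dir, function(chunk) head(chunk, 2))
nrow(res)  # 2 rows from each of the 3 chunks
```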

e) How is disk.frame different from Spark, Dask, and JuliaDB.jl?

Spark is primarily a distributed system that also works on a single machine. Dask is a Python package that is most similar to disk.frame, and JuliaDB.jl is a Julia package. All three can distribute work over a cluster of computers. However, disk.frame currently cannot distribute data processes over many computers, and is, therefore, single machine focused.

In R, one can access Spark via sparklyr, but that requires a Spark cluster to be set up. On the other hand disk.frame requires zero-setup apart from running install.packages("disk.frame") or devtools::install_github("xiaodaigh/disk.frame").

Finally, Spark can only apply functions that are implemented for Spark, whereas disk.frame can use any function in R including user-defined functions.

f) How is disk.frame different from multidplyr, partools and distributedR?

The package multidplyr doesn't appear to be disk-focused and hence does not allow arbitrarily large datasets to be manipulated, although its focus on parallel processing is similar to disk.frame's. partools [https://matloff.wordpress.com/2015/08/05/partools-a-sensible-r-package-for-large-data-sets/] appears to use its own verbs for aggregating data instead of relying on the existing verbs provided by data.table and dplyr. The package distributedR hasn't been updated for a few years and also seems to require its own functions and verbs.

Set-up disk.frame

disk.frame works best if it can process multiple data chunks in parallel. The best way to set up disk.frame so that each CPU core runs a background worker is by using

setup_disk.frame()

# this allows large datasets to be transferred between sessions
options(future.globals.maxSize = Inf)

setup_disk.frame() sets up as many background workers as there are CPU cores; please note that, by default, hyper-threaded cores are counted as one, not two.

Alternatively, one may specify the number of workers using setup_disk.frame(workers = n).
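As background for choosing a worker count, base R's parallel package reports how many cores the machine has (setup_disk.frame() performs its own detection; this is only for inspection):

```r
library(parallel)

# number of physical cores (hyper-threaded siblings counted once);
# may return NA on some platforms
detectCores(logical = FALSE)

# number of logical cores, for comparison
detectCores(logical = TRUE)
```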

Example usage

library(disk.frame)
#> Loading required package: dplyr
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
#> Loading required package: purrr
#> Registered S3 method overwritten by 'pryr':
#>   method      from
#>   print.bytes Rcpp
#> 
#> ## Message from disk.frame:
#> We have 1 workers to use with disk.frame.
#> To change that, use setup_disk.frame(workers = n) or just setup_disk.frame() to use the defaults.
#> 
#> 
#> It is recommend that you run the following immediately to setup disk.frame with multiple workers in order to parallelize your operations:
#> 
#> 
#> ```r
#> # this willl set disk.frame with multiple workers
#> setup_disk.frame()
#> # this will allow unlimited amount of data to be passed from worker to worker
#> options(future.globals.maxSize = Inf)
#> ```
#> 
#> Attaching package: 'disk.frame'
#> The following objects are masked from 'package:purrr':
#> 
#>     imap, imap_dfr, map, map_dfr, map2
#> The following objects are masked from 'package:base':
#> 
#>     colnames, ncol, nrow
library(nycflights13)

# this will setup disk.frame's parallel backend with number of workers equal to the number of CPU cores (hyper-threaded cores are counted as one not two)
setup_disk.frame()
#> The number of workers available for disk.frame is 6
# this allows large datasets to be transferred between sessions
options(future.globals.maxSize = Inf)

# convert the flights data.frame to a disk.frame
# optionally, you may specify an outdir; otherwise a temporary directory is used
flights.df <- as.disk.frame(nycflights13::flights)

To find out where the disk.frame is stored on disk:

# where is the disk.frame stored
attr(flights.df, "path")
#> [1] "C:\\Users\\RTX2080\\AppData\\Local\\Temp\\Rtmp8wzlwB\\file349c2f8f65d9.df"

A number of data.frame functions are implemented for disk.frame:

# get first few rows
head(flights.df)
#>    year month day dep_time sched_dep_time dep_delay arr_time
#> 1: 2013     1   1      517            515         2      830
#> 2: 2013     1   1      533            529         4      850
#> 3: 2013     1   1      542            540         2      923
#> 4: 2013     1   1      544            545        -1     1004
#> 5: 2013     1   1      554            600        -6      812
#> 6: 2013     1   1      554            558        -4      740
#>    sched_arr_time arr_delay carrier flight tailnum origin dest air_time
#> 1:            819        11      UA   1545  N14228    EWR  IAH      227
#> 2:            830        20      UA   1714  N24211    LGA  IAH      227
#> 3:            850        33      AA   1141  N619AA    JFK  MIA      160
#> 4:           1022       -18      B6    725  N804JB    JFK  BQN      183
#> 5:            837       -25      DL    461  N668DN    LGA  ATL      116
#> 6:            728        12      UA   1696  N39463    EWR  ORD      150
#>    distance hour minute           time_hour
#> 1:     1400    5     15 2013-01-01 05:00:00
#> 2:     1416    5     29 2013-01-01 05:00:00
#> 3:     1089    5     40 2013-01-01 05:00:00
#> 4:     1576    5     45 2013-01-01 05:00:00
#> 5:      762    6      0 2013-01-01 06:00:00
#> 6:      719    5     58 2013-01-01 05:00:00
# get last few rows
tail(flights.df)
#>    year month day dep_time sched_dep_time dep_delay arr_time
#> 1: 2013     9  30       NA           1842        NA       NA
#> 2: 2013     9  30       NA           1455        NA       NA
#> 3: 2013     9  30       NA           2200        NA       NA
#> 4: 2013     9  30       NA           1210        NA       NA
#> 5: 2013     9  30       NA           1159        NA       NA
#> 6: 2013     9  30       NA            840        NA       NA
#>    sched_arr_time arr_delay carrier flight tailnum origin dest air_time
#> 1:           2019        NA      EV   5274  N740EV    LGA  BNA       NA
#> 2:           1634        NA      9E   3393    <NA>    JFK  DCA       NA
#> 3:           2312        NA      9E   3525    <NA>    LGA  SYR       NA
#> 4:           1330        NA      MQ   3461  N535MQ    LGA  BNA       NA
#> 5:           1344        NA      MQ   3572  N511MQ    LGA  CLE       NA
#> 6:           1020        NA      MQ   3531  N839MQ    LGA  RDU       NA
#>    distance hour minute           time_hour
#> 1:      764   18     42 2013-09-30 18:00:00
#> 2:      213   14     55 2013-09-30 14:00:00
#> 3:      198   22      0 2013-09-30 22:00:00
#> 4:      764   12     10 2013-09-30 12:00:00
#> 5:      419   11     59 2013-09-30 11:00:00
#> 6:      431    8     40 2013-09-30 08:00:00
# number of rows
nrow(flights.df)
#> [1] 336776
# number of columns
ncol(flights.df)
#> [1] 19

Example: dplyr verbs

Group by

Group-bys in disk.frame are performed within each chunk, so a two-stage approach is required to obtain correct results: aggregate within each chunk first, then aggregate the per-chunk results. This will be addressed in a future package called disk.frame.db, but for now two-stage aggregation is the best way to do group-bys in disk.frame; it is also preferred for performance reasons.

flights.df = as.disk.frame(nycflights13::flights)

flights.df %>%
  srckeep(c("year","distance")) %>%  # keep only the year and distance columns
  group_by(year) %>% 
  summarise(sum_dist = sum(distance)) %>% # this sums distance within each chunk
  collect
#> # A tibble: 6 x 2
#>    year sum_dist
#>   <int>    <dbl>
#> 1  2013 57446059
#> 2  2013 59302212
#> 3  2013 56585094
#> 4  2013 58476357
#> 5  2013 59407019
#> 6  2013 59000866

This is the two-stage group-by in action:

# need a 2nd stage to finalise summing
flights.df %>%
  srckeep(c("year","distance")) %>%  # keep only the year and distance columns
  group_by(year) %>% 
  summarise(sum_dist = sum(distance)) %>% # this sums distance within each chunk
  collect %>% 
  group_by(year) %>% 
  summarise(sum_dist = sum(sum_dist))
#> # A tibble: 1 x 2
#>    year  sum_dist
#>   <int>     <dbl>
#> 1  2013 350217607
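The two-stage logic can be verified on in-memory data with base R alone: summing per-chunk partial sums gives the same answer as a single global sum. The toy data below is made up for illustration; no disk.frame is needed.

```r
set.seed(42)
d <- data.frame(year = 2013L,
                distance = sample(100:2000, 1000, replace = TRUE))

# stage 1: partial sums within each "chunk"
chunk_id <- rep(1:6, length.out = nrow(d))
stage1 <- tapply(d$distance, chunk_id, sum)

# stage 2: combine the partial sums
two_stage <- sum(stage1)

# a single global sum gives the same answer
one_stage <- sum(d$distance)
two_stage == one_stage
#> [1] TRUE
```

This is why the second group_by/summarise after collect is needed: it plays the role of stage 2.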

Here is an example of using filter:

# filter
pt = proc.time()
df_filtered <-
  flights.df %>% 
  filter(month == 1)
cat("filtering month == 1 took: ", data.table::timetaken(pt), "\n")
#> filtering month == 1 took:  0.010s elapsed (0.010s cpu)
nrow(df_filtered)
#> [1] 336776

You can mix group-by with other dplyr verbs as below

pt = proc.time()
res1 <- flights.df %>% 
  srckeep(c("month", "dep_delay")) %>% 
  filter(month <= 6) %>% 
  mutate(qtr = ifelse(month <= 3, "Q1", "Q2")) %>% 
  group_by(qtr) %>% 
  summarise(sum_delay = sum(dep_delay, na.rm = TRUE), n = n()) %>% 
  collect %>%
  group_by(qtr) %>% 
  summarise(sum_delay = sum(sum_delay), n = sum(n)) %>% 
  mutate(avg_delay = sum_delay/n)
cat("group by took: ", data.table::timetaken(pt), "\n")
#> group by took:  0.580s elapsed (0.170s cpu)

collect(res1)
#> # A tibble: 2 x 4
#>   qtr   sum_delay     n avg_delay
#>   <chr>     <dbl> <int>     <dbl>
#> 1 Q1       892053 80789      11.0
#> 2 Q2      1319941 85369      15.5

However, a one-stage group_by is possible by using hard_group_by to first rechunk the disk.frame by the grouping variable. This is not recommended for performance reasons, as it can be quite slow.
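Conceptually, hard_group_by reshards the data so that all rows sharing a grouping key land in the same chunk, which makes a single within-chunk aggregation globally correct. A minimal base-R sketch of that idea (toy data, not disk.frame's implementation):

```r
# all rows with the same key end up in the same "chunk", so one
# within-chunk aggregation is already the final answer
d <- data.frame(qtr = rep(c("Q1", "Q2"), each = 5), delay = 1:10)
shards <- split(d, d$qtr)   # one chunk per key value
sapply(shards, function(chunk) mean(chunk$delay))  # Q1 = 3, Q2 = 8
```

The cost in disk.frame is that resharding requires rewriting every chunk on disk, which is why the two-stage approach is usually faster.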

pt = proc.time()
res1 <- flights.df %>% 
  srckeep(c("month", "dep_delay")) %>% 
  filter(month <= 6) %>% 
  mutate(qtr = ifelse(month <= 3, "Q1", "Q2")) %>% 
  hard_group_by(qtr) %>% # hard_group_by is MUCH SLOWER but avoids a 2nd-stage aggregation
  summarise(avg_delay = mean(dep_delay, na.rm = TRUE)) %>% 
  collect
#> Appending disk.frames:
cat("group by took: ", data.table::timetaken(pt), "\n")
#> group by took:  1.160s elapsed (0.280s cpu)

collect(res1)
#> # A tibble: 2 x 2
#>   qtr   avg_delay
#>   <chr>     <dbl>
#> 1 Q1         11.4
#> 2 Q2         15.9

Example: data.table syntax

library(data.table)
#> 
#> Attaching package: 'data.table'
#> The following object is masked from 'package:purrr':
#> 
#>     transpose
#> The following objects are masked from 'package:dplyr':
#> 
#>     between, first, last

grp_by_stage1 = 
  flights.df[
    keep = c("month", "distance"), # this analysis only requires "month" and "distance", so only load those
    month <= 6, 
    .(sum_dist = sum(distance)), 
    .(qtr = ifelse(month <= 3, "Q1", "Q2"))
    ]

grp_by_stage1
#>    qtr sum_dist
#> 1:  Q1 27188805
#> 2:  Q1   953578
#> 3:  Q1 53201567
#> 4:  Q2  3383527
#> 5:  Q2 58476357
#> 6:  Q2 27397926

The result grp_by_stage1 is a data.table, so we can finish off the two-stage aggregation using data.table syntax:

grp_by_stage2 = grp_by_stage1[,.(sum_dist = sum(sum_dist)), qtr]

grp_by_stage2
#>    qtr sum_dist
#> 1:  Q1 81343950
#> 2:  Q2 89257810

Contributors

This project exists thanks to all the people who contribute.

Open Collective

If you like disk.frame and want to speed up its development, or perhaps you have a feature request, please consider sponsoring me on Open Collective.

Backers

Thank you to all our backers!

Monthly Downloads

732

Version

0.1.1

License

MIT + file LICENSE

Maintainer

Dai ZJ

Last Published

August 24th, 2023

Functions in disk.frame (0.1.1)

disk.frame

Create a disk.frame from a folder
evalparseglue

Helper function to evalparse some `glue::glue` string
sample_frac.disk.frame

Sample n rows from a disk.frame
zip_to_disk.frame

`zip_to_disk.frame` is used to read and convert every CSV file within the zip file to disk.frame format
remove_chunk

Removes a chunk from the disk.frame
head.disk.frame

Head and tail of the disk.frame
foverlaps.disk.frame

Apply data.table's foverlaps to the disk.frame
delete

Delete a disk.frame
df_ram_size

Get the size of RAM in gigabytes
collect.disk.frame

Bring the disk.frame into R
gen_datatable_synthetic

Generate synthetic dataset for testing
get_chunk_ids

Get the chunk IDs and files names
merge.disk.frame

Merge function for disk.frames
rbindlist.disk.frame

rbindlist disk.frames together
map2

`map` a function to two disk.frames
print.disk.frame

Print disk.frame
tbl_vars.disk.frame

Column names for RStudio auto-complete
write_disk.frame

Write disk.frame to disk
nchunks

Returns the number of chunks in a disk.frame
group_by.disk.frame

Group by within each disk.frame
move_to

Move or copy a disk.frame to another location
select.disk.frame

The dplyr verbs implemented for disk.frame
get_chunk

Obtain one chunk by chunk id
nrow

Number of rows or columns
rechunk

Increase or decrease the number of chunks in the disk.frame
hard_group_by

Perform a hard group
recommend_nchunks

Recommend number of chunks based on input size
setup_disk.frame

Set up disk.frame environment
is_disk.frame

Checks if a folder is a disk.frame
groups.disk.frame

The shard keys of the disk.frame
anti_join.disk.frame

Performs join/merge for disk.frames
shard

Shard a data.frame/data.table or disk.frame into chunks and save it as a disk.frame
show_ceremony

Show the code to setup disk.frame
shardkey

Returns the shardkey (not implemented yet)
map

Apply the same function to all chunks
srckeep

Keep only the variables from the input listed in selections
[.disk.frame

[ interface for disk.frame using fst backend
overwrite_check

Check if the outdir exists or not
csv_to_disk.frame

Convert CSV file(s) to disk.frame format
create_dplyr_mapper

Create dplyr function for disk.frame
add_chunk

Add a chunk to the disk.frame
add_meta

Add metadata to the disk.frame
as.data.frame.disk.frame

Convert disk.frame to data.frame by collecting all chunks
colnames

Return the column names of the disk.frame
compute.disk.frame

Compute without writing
as.data.table.disk.frame

Convert disk.frame to data.table by collecting all chunks
as.disk.frame

Make a data.frame into a disk.frame