jsonlite (version 1.5)

stream_in, stream_out: Streaming JSON input/output

Description

The stream_in and stream_out functions implement line-by-line processing of JSON data over a connection, such as a socket, url, file or pipe. JSON streaming requires the ndjson format, which differs slightly from the format used by fromJSON and toJSON; see details.

Usage

stream_in(con, handler = NULL, pagesize = 500, verbose = TRUE, ...)

stream_out(x, con = stdout(), pagesize = 500, verbose = TRUE, prefix = "", ...)

Arguments

con

a connection object. If the connection is not open, stream_in and stream_out will automatically open and later close (and destroy) the connection. See details.

handler

a custom function that is called on each page of JSON data. If not specified, the default handler stores all pages and binds them into a single data frame that will be returned by stream_in. See details.

pagesize

number of lines to read/write from/to the connection per iteration.

verbose

print some information on what is going on.

...

arguments for fromJSON and toJSON that control JSON formatting/parsing where applicable. Use with caution.

x

object to be streamed out. Currently only data frames are supported.

prefix

string to write before each line (use "\u001e" to write RFC 7464 text sequences)
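As a minimal sketch of the prefix argument, the snippet below writes a small data frame with the record separator "\u001e" in front of every line and then inspects the raw lines. The data frame x and the temporary file are made up for illustration.

```r
library(jsonlite)

# Illustrative data only
x <- data.frame(id = 1:2)

# Write each record with a leading record separator (U+001E)
tmp <- tempfile()
stream_out(x, file(tmp), prefix = "\u001e", verbose = FALSE)
lines <- readLines(tmp)
unlink(tmp)

# Check that every line starts with the prefix
has_rs <- startsWith(lines, "\u001e")

# Dropping the first character leaves an ordinary JSON record
first_record <- fromJSON(substring(lines[1], 2))
```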

Value

The stream_out function always returns NULL. When no custom handler is specified, stream_in returns a data frame of all pages bound together. When a custom handler function is specified, stream_in always returns NULL.

Details

Because parsing huge JSON strings is difficult and inefficient, JSON streaming is done using lines of minified JSON records, a.k.a. ndjson. This is pretty standard: JSON databases such as dat or MongoDB use the same format to import/export datasets. Note that this means that the total stream combined is not valid JSON itself; only the individual lines are. Also note that because line-breaks are used as separators, prettified JSON is not permitted: the JSON lines must be minified. In this respect, the format is a bit different from fromJSON and toJSON where all lines are part of a single JSON structure with optional line breaks.
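The difference between the two formats can be sketched as follows. The data frame x and the temporary file are illustrative. validate is jsonlite's JSON validity check, used here only to show that the lines are valid individually while the combined stream is not.

```r
library(jsonlite)

# Illustrative data only
x <- data.frame(id = 1:3, name = c("a", "b", "c"), stringsAsFactors = FALSE)

# toJSON produces a single JSON array holding all records
as_array <- toJSON(x)

# stream_out writes one minified JSON record per line (ndjson)
tmp <- tempfile()
stream_out(x, file(tmp), verbose = FALSE)
ndjson_lines <- readLines(tmp)
unlink(tmp)

# Each line can be parsed on its own
records <- lapply(ndjson_lines, fromJSON)

# The lines joined together are not a single valid JSON document
whole_stream_valid <- validate(paste(ndjson_lines, collapse = "\n"))
```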

The handler is a callback function which is called for each page (batch) of JSON data with exactly one argument (usually a data frame with pagesize rows). If handler is missing or NULL, a default handler is used which stores all intermediate pages of data, and at the very end binds all pages together into one single data frame that is returned by stream_in. When a custom handler function is specified, stream_in does not store any intermediate results and always returns NULL. It is then up to the handler to process or store data pages. A handler function that does not store intermediate results in memory (for example by writing output to another connection) results in a pipeline that can process an unlimited amount of data. See example.
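A minimal sketch of a custom handler that keeps only running totals rather than the pages themselves. The names count_rows, page_count and row_count are hypothetical, and the input is a small temporary file written just for this illustration.

```r
library(jsonlite)

# Write a small ndjson file to read back (illustrative data)
tmp <- tempfile()
stream_out(data.frame(id = 1:25, value = (1:25)^2), file(tmp), verbose = FALSE)

# Hypothetical handler: update counters and discard each page afterwards
page_count <- 0
row_count <- 0
count_rows <- function(df) {
  page_count <<- page_count + 1
  row_count <<- row_count + nrow(df)
}

# With a custom handler, stream_in returns NULL; results live in the counters
result <- stream_in(file(tmp), handler = count_rows, pagesize = 10, verbose = FALSE)
unlink(tmp)
```

Because the handler never stores a page, memory use stays bounded by pagesize regardless of how many lines the connection delivers.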

If a connection is not opened yet, stream_in and stream_out will automatically open and later close the connection. Because R destroys connections when they are closed, they cannot be reused. To use a single connection for multiple calls to stream_in or stream_out, it needs to be opened beforehand. See example.
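The following sketch illustrates reusing one connection for several stream_out calls by opening it beforehand. The data frames first and second and the temporary file are made up for illustration.

```r
library(jsonlite)

# Illustrative data only
first <- data.frame(id = 1:2, label = c("a", "b"), stringsAsFactors = FALSE)
second <- data.frame(id = 3:4, label = c("c", "d"), stringsAsFactors = FALSE)

# Open the connection ourselves so stream_out leaves it open between calls
tmp <- tempfile()
con <- file(tmp, open = "wb")
stream_out(first, con, verbose = FALSE)
stream_out(second, con, verbose = FALSE)
close(con)  # we opened it, so we are responsible for closing it

# An unopened connection is opened and closed by stream_in itself
combined <- stream_in(file(tmp), verbose = FALSE)
unlink(tmp)
```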

References

MongoDB export format: http://docs.mongodb.org/manual/reference/program/mongoexport/#cmdoption--query

Documentation for the JSON Lines text file format: http://jsonlines.org/

Examples

# NOT RUN {
# compare formats
x <- iris[1:3,]
toJSON(x)
stream_out(x)

# Trivial example
mydata <- stream_in(url("http://httpbin.org/stream/100"))

# }
# NOT RUN {
# stream large dataset to file and back
library(nycflights13)
stream_out(flights, file(tmp <- tempfile()))
flights2 <- stream_in(file(tmp))
unlink(tmp)
all.equal(flights2, as.data.frame(flights))

# stream over HTTP
diamonds2 <- stream_in(url("http://jeroen.github.io/data/diamonds.json"))

# stream over HTTP with gzip compression
flights3 <- stream_in(gzcon(url("http://jeroen.github.io/data/nycflights13.json.gz")))
all.equal(flights3, as.data.frame(flights))

# stream over HTTPS (HTTP+SSL) via curl
library(curl)
flights4 <- stream_in(gzcon(curl("https://jeroen.github.io/data/nycflights13.json.gz")))
all.equal(flights4, as.data.frame(flights))

# or alternatively:
flights5 <- stream_in(gzcon(pipe("curl https://jeroen.github.io/data/nycflights13.json.gz")))
all.equal(flights5, as.data.frame(flights))

# Full JSON IO stream from URL to file connection.
# Calculate delays for flights over 1000 miles in batches of 5k
library(dplyr)
con_in <- gzcon(url("http://jeroen.github.io/data/nycflights13.json.gz"))
con_out <- file(tmp <- tempfile(), open = "wb")
stream_in(con_in, handler = function(df){
  df <- dplyr::filter(df, distance > 1000)
  df <- dplyr::mutate(df, delta = dep_delay - arr_delay)
  stream_out(df, con_out, pagesize = 1000)
}, pagesize = 5000)
close(con_out)

# stream it back in
mydata <- stream_in(file(tmp))
nrow(mydata)
unlink(tmp)

# Data from http://openweathermap.org/current#bulk
# Each row contains a nested data frame.
daily14 <- stream_in(gzcon(url("http://78.46.48.103/sample/daily_14.json.gz")), pagesize=50)
subset(daily14, city$name == "Berlin")$data[[1]]

# Or with dplyr:
library(dplyr)
daily14f <- flatten(daily14)
filter(daily14f, city.name == "Berlin")$data[[1]]

# Stream import large data from zip file
tmp <- tempfile()
download.file("http://jsonstudio.com/wp-content/uploads/2014/02/companies.zip", tmp)
companies <- stream_in(unz(tmp, "companies.json"))
# }
