dplyr is an R package for working with structured data both in and outside of R. dplyr makes data manipulation for R users easy, consistent, and performant. With dplyr as an interface to manipulating Spark DataFrames, you can:
- Select, filter, and aggregate data
- Use window functions (e.g. for sampling)
- Perform joins on DataFrames
- Collect data from Spark into R
Statements in dplyr can be chained together using pipes defined by the magrittr R package. dplyr also supports non-standard evaluation of its arguments. For more information on dplyr, see the introduction, a guide for connecting to databases, and a variety of vignettes.
You can read data into Spark DataFrames using the following functions:
|Function|Description|
|spark_read_csv|Reads a CSV file and provides a data source compatible with dplyr|
|spark_read_json|Reads a JSON file and provides a data source compatible with dplyr|
|spark_read_parquet|Reads a Parquet file and provides a data source compatible with dplyr|
Regardless of the format of your data, Spark supports reading data from a variety of different data sources. These include data stored on HDFS (hdfs:// protocol), Amazon S3 (s3n:// protocol), or local files available to the Spark worker nodes (file:// protocol).
Each of these functions returns a reference to a Spark DataFrame which can be used as a dplyr table (tbl).
This guide demonstrates some of the basic data manipulation verbs of dplyr using data from the nycflights13 R package. This package contains data for all 336,776 flights departing New York City in 2013. It also includes useful metadata on airlines, airports, weather, and planes. The data comes from the US Bureau of Transportation Statistics, and is documented in ?nycflights13.
Connect to the cluster and copy the flights data using the copy_to function. Caveat: The flight data in nycflights13 is convenient for dplyr demonstrations because it is small, but in practice large data should rarely be copied directly from R objects.
library(sparklyr)
library(dplyr)
library(nycflights13)
library(ggplot2)

sc <- spark_connect(master = "local")
flights <- copy_to(sc, flights, "flights")
airlines <- copy_to(sc, airlines, "airlines")
src_tbls(sc)
Verbs are dplyr commands for manipulating data. When connected to a Spark DataFrame, dplyr translates the commands into Spark SQL statements. Remote data sources use exactly the same five verbs as local data sources. Here are the five verbs with their corresponding SQL commands:
- select ~ SELECT
- filter ~ WHERE
- arrange ~ ORDER BY
- summarise ~ aggregators: sum, min, sd, etc.
- mutate ~ operators: +, *, log, etc.
select(flights, year:day, arr_delay, dep_delay)
filter(flights, dep_delay > 1000)
summarise(flights, mean_dep_delay = mean(dep_delay))
mutate(flights, speed = distance / air_time * 60)
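To see the verb-to-SQL correspondence without a Spark cluster, the following sketch renders each verb against a simulated table. It assumes the dbplyr package is installed and uses its generic SQL simulator, so the output is generic SQL and not necessarily what Spark SQL would receive; flights_sim and verb_sql are hypothetical names introduced only for this illustration.

```r
library(dplyr)
library(dbplyr)

# Hypothetical stand-in for the Spark table: a lazy frame with no data source behind it.
flights_sim <- lazy_frame(
  year = 2013L, month = 1L, day = 1L,
  dep_delay = 0, arr_delay = 0, distance = 0, air_time = 0
)

# Build one query per verb, then render each query as a plain SQL string.
verb_sql <- list(
  select    = select(flights_sim, year:day, arr_delay, dep_delay),
  filter    = filter(flights_sim, dep_delay > 1000),
  arrange   = arrange(flights_sim, desc(dep_delay)),
  summarise = summarise(flights_sim, mean_dep_delay = mean(dep_delay, na.rm = TRUE)),
  mutate    = mutate(flights_sim, speed = distance / air_time * 60)
) %>%
  lapply(function(q) as.character(sql_render(q)))

# Print the SQL generated for each verb.
for (verb in names(verb_sql)) cat(verb, ":\n", verb_sql[[verb]], "\n\n", sep = "")
```

Against a real connection, the same verbs applied to the flights table would be rendered through the Spark SQL backend instead of the simulator.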
When working with databases, dplyr tries to be as lazy as possible:
- It never pulls data into R unless you explicitly ask for it.
- It delays doing any work until the last possible moment: it collects together everything you want to do and then sends it to the database in one step.
For example, take the following code:
c1 <- filter(flights, day == 17, month == 5, carrier %in% c('UA', 'WN', 'AA', 'DL'))
c2 <- select(c1, year, month, day, carrier, dep_delay, air_time, distance)
c3 <- arrange(c2, year, month, day, carrier)
c4 <- mutate(c3, air_time_hours = air_time / 60)
This sequence of operations never actually touches the database. It's not until you ask for the data (e.g. by printing c4) that dplyr requests the results from the database.
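One way to observe this laziness is to build a pipeline on a table that has no database behind it at all. The sketch below assumes dbplyr is installed and uses its lazy_frame() simulator; flights_sim is a hypothetical stand-in, and the arrange() step is left out to keep the example short. If any verb tried to execute immediately, there would be nothing to run it against.

```r
library(dplyr)
library(dbplyr)

# Hypothetical stand-in table: column names and types only, no database behind it.
flights_sim <- lazy_frame(
  year = 2013L, month = 5L, day = 17L, carrier = "UA",
  dep_delay = 0, air_time = 0, distance = 0
)

# Each step only extends the description of the query.
q <- flights_sim %>%
  filter(day == 17, month == 5, carrier %in% c("UA", "WN", "AA", "DL")) %>%
  select(carrier, dep_delay, air_time, distance) %>%
  mutate(air_time_hours = air_time / 60)

is_lazy  <- inherits(q, "tbl_lazy")  # the result is still a query object
is_local <- is.data.frame(q)         # and not a data frame of fetched rows

# The whole pipeline renders as a single SQL statement.
query_text <- as.character(sql_render(q))
cat(query_text, "\n")
```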
You can use magrittr pipes to write cleaner syntax. A similar sequence of operations can be written more compactly as a single pipeline:
c4 <- flights %>%
  filter(month == 5, day == 17, carrier %in% c('UA', 'WN', 'AA', 'DL')) %>%
  select(carrier, dep_delay, air_time, distance) %>%
  arrange(carrier) %>%
  mutate(air_time_hours = air_time / 60)
The group_by function corresponds to the GROUP BY clause in SQL.
c4 %>%
  group_by(carrier) %>%
  summarize(count = n(), mean_dep_delay = mean(dep_delay))
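Because the same verbs work on local data, the grouping behaviour can be checked on a small in-memory table. The data below is made up purely for illustration, and toy and by_carrier are hypothetical names.

```r
library(dplyr)

# Hypothetical toy data; the values are invented for illustration.
toy <- tibble(
  carrier   = c("AA", "AA", "UA", "UA", "UA"),
  dep_delay = c(10, 20, 0, 30, 60)
)

# One output row per carrier, like SELECT ... GROUP BY carrier.
by_carrier <- toy %>%
  group_by(carrier) %>%
  summarise(count = n(), mean_dep_delay = mean(dep_delay))

print(by_carrier)
```

Here AA has two flights with a mean delay of 15, and UA has three flights with a mean delay of 30.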
Collecting to R
You can copy data from Spark into R's memory by using collect().
carrierhours <- collect(c4)
collect() executes the Spark query and returns the results to R for further analysis and visualization.
# Test the significance of pairwise differences and plot the results
with(carrierhours, pairwise.t.test(air_time, carrier))
ggplot(carrierhours, aes(carrier, air_time_hours)) + geom_boxplot()
It's relatively straightforward to translate R code to SQL (or indeed to any programming language) when doing simple mathematical operations of the form you normally use when filtering, mutating and summarizing. dplyr knows how to convert the following R functions to Spark SQL:
# Basic math operators
+, -, *, /, %%, ^
# Math functions
abs, acos, asin, asinh, atan, atan2, ceiling, cos, cosh, exp, floor, log, log10, round, sign, sin, sinh, sqrt, tan, tanh
# Logical comparisons
<, <=, !=, >=, >, ==, %in%
# Boolean operations
&, &&, |, ||, !
# Character functions
paste, tolower, toupper, nchar
# Casting
as.double, as.integer, as.logical, as.character, as.date
# Basic aggregations
mean, sum, min, max, sd, var, cor, cov, n
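Individual expressions can be translated in isolation to see what a given R function becomes. This is a minimal sketch assuming dbplyr is installed; it uses the generic simulate_dbi() connection, so the SQL shown is generic and may differ in detail from the Spark SQL translation. The column names are only placeholders.

```r
library(dbplyr)

# Generic SQL simulator (an assumption for this sketch, not a Spark connection).
con <- simulate_dbi()

# Translate single R expressions to SQL without touching any data.
sql_abs   <- translate_sql(abs(dep_delay), con = con)
sql_upper <- translate_sql(toupper(carrier), con = con)
sql_in    <- translate_sql(carrier %in% c("UA", "AA"), con = con)
# window = FALSE asks for the plain aggregate form rather than a window function.
sql_mean  <- translate_sql(mean(dep_delay, na.rm = TRUE), con = con, window = FALSE)

print(sql_abs)
print(sql_upper)
print(sql_in)
print(sql_mean)
```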
dplyr supports Spark SQL window functions. Window functions are used in conjunction with mutate and filter to solve a wide range of problems. You can compare the dplyr syntax to the query it has generated by using dbplyr::sql_render().
# Find the most and least delayed flight each day
bestworst <- flights %>%
  group_by(year, month, day) %>%
  select(dep_delay) %>%
  filter(dep_delay == min(dep_delay) | dep_delay == max(dep_delay))
dbplyr::sql_render(bestworst)
bestworst
# Rank each flight within a day
ranked <- flights %>%
  group_by(year, month, day) %>%
  select(dep_delay) %>%
  mutate(rank = rank(desc(dep_delay)))
dbplyr::sql_render(ranked)
ranked
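The link between group_by and the SQL window definition can be inspected without a cluster. The sketch below assumes dbplyr is installed, uses a simulated table (flights_sim is a hypothetical name), and uses dplyr's min_rank() ranking helper; the rendered SQL comes from the generic simulator, so Spark SQL output may differ in detail.

```r
library(dplyr)
library(dbplyr)

# Hypothetical stand-in for the Spark flights table.
flights_sim <- lazy_frame(year = 2013L, month = 1L, day = 1L, dep_delay = 0)

# The grouping variables become the PARTITION BY part of the window,
# and desc(dep_delay) becomes its ORDER BY part.
ranked_sql <- flights_sim %>%
  group_by(year, month, day) %>%
  mutate(rank = min_rank(desc(dep_delay))) %>%
  sql_render() %>%
  as.character()

cat(ranked_sql, "\n")
```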
It's rare that a data analysis involves only a single table of data. In practice, you'll normally have many tables that contribute to an analysis, and you need flexible tools to combine them. In dplyr, there are three families of verbs that work with two tables at a time:
- Mutating joins, which add new variables to one table from matching rows in another.
- Filtering joins, which filter observations from one table based on whether or not they match an observation in the other table.
- Set operations, which combine the observations in the data sets as if they were set elements.
All two-table verbs work similarly. The first two arguments are x and y, and provide the tables to combine. The output is always a new table with the same type as x.
The following statements are equivalent:
flights %>% left_join(airlines)
flights %>% left_join(airlines, by = "carrier")
flights %>% left_join(airlines, by = c("carrier" = "carrier"))
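The three families of two-table verbs can be compared side by side on small local tables. The tables below are hypothetical toy data invented for this illustration; the same calls apply to Spark DataFrames referenced through dplyr.

```r
library(dplyr)

# Hypothetical toy tables; "ZZ" has no match in airlines_toy.
flights_toy  <- tibble(flight = c(1, 2, 3), carrier = c("AA", "UA", "ZZ"))
airlines_toy <- tibble(carrier = c("AA", "UA", "DL"),
                       name    = c("American", "United", "Delta"))

# Mutating join: keeps every row of x and adds columns from y (NA where unmatched).
joined <- left_join(flights_toy, airlines_toy, by = "carrier")

# Filtering joins: keep or drop rows of x based on a match in y, adding no columns.
matched   <- semi_join(flights_toy, airlines_toy, by = "carrier")
unmatched <- anti_join(flights_toy, airlines_toy, by = "carrier")

# Set operation: distinct carrier codes appearing in either table.
all_codes <- union(select(flights_toy, carrier), select(airlines_toy, carrier))

print(joined)
print(unmatched)
```

The left join returns three rows with a missing name for ZZ, the semi join returns the two matched flights, the anti join returns only the ZZ flight, and the union contains four distinct carrier codes.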
You can use sample_n() and sample_frac() to take a random sample of rows: use sample_n() for a fixed number and sample_frac() for a fixed fraction.
sample_n(flights, 10)
sample_frac(flights, 0.01)
It is often useful to save the results of your analysis or the tables that you have generated on your Spark cluster into persistent storage. The best option in many scenarios is to write the table out to a Parquet file using the spark_write_parquet function. For example:
spark_write_parquet(tbl, "hdfs://hdfs.company.org:9000/hdfs-path/data")
This will write the Spark DataFrame referenced by the tbl R variable to the given HDFS path. You can use the spark_read_parquet function to read the same table back into a subsequent Spark session:
tbl <- spark_read_parquet(sc, "data", "hdfs://hdfs.company.org:9000/hdfs-path/data")
Many of Hive's built-in functions (UDFs) and built-in aggregate functions (UDAFs) can be called inside dplyr's mutate and summarize. The Language Reference UDF page provides the list of available functions.
The following example uses the datediff and current_date Hive UDFs to compute the difference between the flight_date and the current system date:
flights %>%
  mutate(
    flight_date = paste(year, month, day, sep = "-"),
    days_since = datediff(current_date(), flight_date)
  ) %>%
  group_by(flight_date, days_since) %>%
  tally() %>%
  arrange(-days_since)
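This works because functions that dplyr does not recognise are passed through to the generated SQL rather than evaluated in R. The sketch below illustrates that pass-through with dbplyr's generic simulator (an assumption; no Spark or Hive session is involved, and flights_sim is a hypothetical stand-in table), so it shows only the generated SQL and does not run the UDFs.

```r
library(dplyr)
library(dbplyr)

# Hypothetical stand-in table with a single date-like column.
flights_sim <- lazy_frame(flight_date = "2013-01-01")

# datediff() and current_date() are not R functions here; they are left
# in the SQL text for the remote engine to evaluate.
udf_sql <- flights_sim %>%
  mutate(days_since = datediff(current_date(), flight_date)) %>%
  sql_render() %>%
  as.character()

cat(udf_sql, "\n")
```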