Using sparklyr with an Apache Spark cluster
# Summary
This document demonstrates how to use `sparklyr` with a Cloudera Hadoop & Spark cluster. Data are downloaded from the web and stored in Hive tables on HDFS across multiple worker nodes. RStudio Server is installed on the master node and orchestrates the analysis in Spark.
# Cloudera Cluster
This demonstration focuses on adding RStudio integration to an existing Cloudera cluster. We assume that no help is needed to set up and administer the cluster.
## CDH 5
We will start with a Cloudera cluster running CDH version 5.8.2 (free version) on an underlying Ubuntu Linux distribution.
## Spark 1.6
The default Spark 1.6.0 parcel is installed and running.
## Hive data
For this demo, we have created and populated 3 tables in Hive. The table names are: flights, airlines and airports. Using Hue, we can see the loaded tables. For the links to the data files and their Hive import scripts please see Appendix A.
# Install RStudio
The latest version of R is needed. On Ubuntu, the default R package is not the latest version, so we have to add the CRAN repository to the APT sources list. We will also install a few other dependencies.
```{bash eval=FALSE}
sudo sh -c 'echo "deb http://cran.rstudio.com/bin/linux/ubuntu trusty/" >> /etc/apt/sources.list'
gpg --keyserver keyserver.ubuntu.com --recv-key E084DAB9
gpg -a --export E084DAB9 | sudo apt-key add -
sudo apt-get update
sudo apt-get install r-base
sudo apt-get install gdebi-core
sudo apt-get -y install libcurl4-gnutls-dev
sudo apt-get -y install libssl-dev
```
We will install the preview version of RStudio Server.
```{bash eval=FALSE}
wget https://s3.amazonaws.com/rstudio-dailybuilds/rstudio-server-1.0.40-amd64.deb
sudo gdebi rstudio-server-1.0.40-amd64.deb
```
# Create and configure a User
Create a user called `rstudio` that will perform the data analysis.
```{bash eval=FALSE}
sudo adduser rstudio
```
To ease security restrictions in this demo, we will add the new user to the default superuser group defined by the **dfs.permissions.superusergroup** setting in CDH.
```{bash eval=FALSE}
sudo groupadd supergroup
sudo usermod -a -G supergroup rstudio
```
# Connect to Spark
Log in to RStudio Server by pointing a browser at your master node IP:8787.
Point sparklyr at the Spark installation, either through the `SPARK_HOME` environment variable or, as here, through the `spark_home` argument, and then run `spark_connect`. After connecting you will be able to browse the Hive metadata in the RStudio Server Spark pane.
```{r eval=FALSE}
library(sparklyr)
library(dplyr)
library(ggplot2)

sc <- spark_connect(master = "yarn-client", version = "1.6.0",
                    spark_home = "/opt/cloudera/parcels/CDH/lib/spark/")
```
Once you are connected, the Spark pane appears along with your Hive tables.
You can inspect your tables by clicking on the data icon.
The tables loaded in Spark can also be seen in the History Server Web UI (port 18088).
# Data analysis
Is there evidence to suggest that some airline carriers make up time in flight? This analysis predicts time gained in flight by airline carrier.
## Cache the tables into memory
Use `tbl_cache` to load the Hive tables into memory. Caching tables makes the analysis much faster. Then create a dplyr reference to each Spark DataFrame.
```{r eval=FALSE}
# Cache flights Hive table into Spark
tbl_cache(sc, 'flights')
flights_tbl <- tbl(sc, 'flights')

# Cache airlines Hive table into Spark
tbl_cache(sc, 'airlines')
airlines_tbl <- tbl(sc, 'airlines')

# Cache airports Hive table into Spark
tbl_cache(sc, 'airports')
airports_tbl <- tbl(sc, 'airports')
```
## Create a model data set
Filter the data to contain only the records to be used in the fitted model. Join carrier descriptions for reference. Create a new variable called `gain`, which represents the amount of time gained (or lost) in flight: the departure delay minus the arrival delay.
```{r eval=FALSE}
# Filter records and create target variable 'gain'
model_data <- flights_tbl %>%
  filter(!is.na(arrdelay) & !is.na(depdelay) & !is.na(distance)) %>%
  filter(depdelay > 15 & depdelay < 240) %>%
  filter(arrdelay > -60 & arrdelay < 360) %>%
  filter(year >= 2003 & year <= 2007) %>%
  left_join(airlines_tbl, by = c("uniquecarrier" = "code")) %>%
  mutate(gain = depdelay - arrdelay) %>%
  select(year, month, arrdelay, depdelay, distance, uniquecarrier, description, gain)

# Summarize data by carrier
model_data %>%
  group_by(uniquecarrier) %>%
  summarize(description = min(description), gain = mean(gain),
            distance = mean(distance), depdelay = mean(depdelay)) %>%
  select(description, gain, distance, depdelay) %>%
  arrange(gain)
```

```
Source:   query [?? x 4]
Database: spark connection master=yarn-client app=sparklyr local=FALSE

   description                        gain  distance depdelay
1  ATA Airlines d/b/a ATA       -5.5679651 1240.7219 61.84391
2  Northwest Airlines Inc.      -3.1134556  779.1926 48.84979
3  Envoy Air                    -2.2056576  437.0883 54.54923
4  PSA Airlines Inc.            -1.9267647  500.6955 55.60335
5  ExpressJet Airlines Inc. (1) -1.5886314  537.3077 61.58386
6  JetBlue Airways              -1.3742524 1087.2337 59.80750
7  SkyWest Airlines Inc.        -1.1265678  419.6489 54.04198
8  Delta Air Lines Inc.         -0.9829374  956.9576 50.19338
9  American Airlines Inc.       -0.9631200 1066.8396 56.78222
10 AirTran Airways Corporation  -0.9411572  665.6574 53.38363
# ... with more rows
```
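To make the `gain` definition concrete, here is a minimal local sketch in base R that needs no Spark connection. The three flights in `toy` are invented for illustration; only the column names and the formula `gain = depdelay - arrdelay` come from the analysis above.

```r
# Toy data (hypothetical values, not taken from the flights table)
toy <- data.frame(
  uniquecarrier = c("WN", "WN", "TZ"),
  depdelay      = c(30, 45, 60),  # minutes late at departure
  arrdelay      = c(20, 50, 70)   # minutes late at arrival
)

# Positive gain: the flight made up time in the air.
# Negative gain: the flight lost further time.
toy$gain <- toy$depdelay - toy$arrdelay

# Average gain per carrier, mirroring the group_by/summarize step
avg_gain <- aggregate(gain ~ uniquecarrier, data = toy, FUN = mean)
print(avg_gain)
```

A flight that leaves 30 minutes late and arrives 20 minutes late has a gain of 10 minutes, while one that leaves 45 minutes late and arrives 50 minutes late has a gain of -5.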
## Train a linear model
Predict time gained or lost in flight as a function of distance, departure delay, and airline carrier.
```{r eval=FALSE}
# Partition the data into training and validation sets
model_partition <- model_data %>%
  sdf_partition(train = 0.8, valid = 0.2, seed = 5555)

# Fit a linear model
ml1 <- model_partition$train %>%
  ml_linear_regression(gain ~ distance + depdelay + uniquecarrier)

# Summarize the linear model
summary(ml1)
```

```
Call: ml_linear_regression(., gain ~ distance + depdelay + uniquecarrier)

Deviance Residuals: (approximate):
     Min       1Q   Median       3Q      Max
-302.343   -5.669    2.714    9.832  104.130

Coefficients:
                    Estimate Std. Error  t value  Pr(>|t|)
(Intercept)      -1.26566581 0.10385870 -12.1864 < 2.2e-16 ***
distance          0.00308711 0.00002404 128.4155 < 2.2e-16 ***
depdelay         -0.01397013 0.00028816 -48.4812 < 2.2e-16 ***
uniquecarrier_AA -2.18483090 0.10985406 -19.8885 < 2.2e-16 ***
uniquecarrier_AQ  3.14330242 0.29114487  10.7964 < 2.2e-16 ***
uniquecarrier_AS  0.09210380 0.12825003   0.7182 0.4726598
uniquecarrier_B6 -2.66988794 0.12682192 -21.0523 < 2.2e-16 ***
uniquecarrier_CO -1.11611186 0.11795564  -9.4621 < 2.2e-16 ***
uniquecarrier_DL -1.95206198 0.11431110 -17.0767 < 2.2e-16 ***
uniquecarrier_EV  1.70420830 0.11337215  15.0320 < 2.2e-16 ***
uniquecarrier_F9 -1.03178176 0.15384863  -6.7065 1.994e-11 ***
uniquecarrier_FL -0.99574060 0.12034738  -8.2739 2.220e-16 ***
uniquecarrier_HA -1.16970713 0.34894788  -3.3521 0.0008020 ***
uniquecarrier_MQ -1.55569040 0.10975613 -14.1741 < 2.2e-16 ***
uniquecarrier_NW -3.58502418 0.11534938 -31.0797 < 2.2e-16 ***
uniquecarrier_OH -1.40654797 0.12034858 -11.6873 < 2.2e-16 ***
uniquecarrier_OO -0.39069404 0.11132164  -3.5096 0.0004488 ***
uniquecarrier_TZ -7.26285217 0.34428509 -21.0955 < 2.2e-16 ***
uniquecarrier_UA -0.56995737 0.11186757  -5.0949 3.489e-07 ***
uniquecarrier_US -0.52000028 0.11218498  -4.6352 3.566e-06 ***
uniquecarrier_WN  4.22838982 0.10629405  39.7801 < 2.2e-16 ***
uniquecarrier_XE -1.13836940 0.11332176 -10.0455 < 2.2e-16 ***
uniquecarrier_YV  3.17149538 0.11709253  27.0854 < 2.2e-16 ***
---
Signif. codes: 0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1

R-Squared: 0.02301
Root Mean Squared Error: 17.83
```
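The coefficients can be read as a plain arithmetic formula: intercept, plus the distance and departure-delay terms, plus the carrier's offset. As a worked sketch, the snippet below plugs a hypothetical flight (a distance of 1000 and a departure delay of 60 minutes, both made up) into the estimates printed above for two carriers. The helper `predict_gain` is illustrative only and is not part of sparklyr.

```r
# Estimates copied from the model summary above
coefs <- c(
  intercept = -1.26566581,
  distance  =  0.00308711,
  depdelay  = -0.01397013,
  WN        =  4.22838982,  # uniquecarrier_WN
  TZ        = -7.26285217   # uniquecarrier_TZ
)

# Hypothetical helper: predicted gain in minutes for a single flight
predict_gain <- function(distance, depdelay, carrier) {
  unname(coefs["intercept"] +
           coefs["distance"] * distance +
           coefs["depdelay"] * depdelay +
           coefs[carrier])
}

wn <- predict_gain(1000, 60, "WN")
tz <- predict_gain(1000, 60, "TZ")
print(round(c(WN = wn, TZ = tz), 2))
```

For this hypothetical flight the model predicts a gain of roughly 5 minutes for WN and a loss of roughly 6 minutes for TZ. Given the low R-squared, such predictions describe averages rather than individual flights.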
## Assess model performance
Compare model performance on the training and validation data.
```{r eval=FALSE}
# Calculate average gains by predicted decile
model_deciles <- lapply(model_partition, function(x) {
  sdf_predict(ml1, x) %>%
    mutate(decile = ntile(desc(prediction), 10)) %>%
    group_by(decile) %>%
    summarize(gain = mean(gain)) %>%
    select(decile, gain) %>%
    collect()
})

# Create a summary dataset for plotting
deciles <- rbind(
  data.frame(data = 'train', model_deciles$train),
  data.frame(data = 'valid', model_deciles$valid),
  make.row.names = FALSE
)

# Plot average gains by predicted decile
deciles %>%
  ggplot(aes(factor(decile), gain, fill = data)) +
  geom_bar(stat = 'identity', position = 'dodge') +
  labs(title = 'Average gain by predicted decile', x = 'Decile', y = 'Minutes')
```
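The decile step ranks rows from highest to lowest prediction and cuts them into ten equal-sized buckets, so decile 1 holds the flights with the largest predicted gains. A minimal base R sketch of that idea on simulated data follows. The simulated `prediction` and `gain` values and the rank-based bucketing are assumptions for illustration, and the bucketing may differ slightly from `ntile()` when the row count does not divide evenly by ten.

```r
set.seed(42)  # arbitrary seed so the sketch is reproducible

# Simulated scores: gain is prediction plus noise (illustrative only)
n <- 1000
scored <- data.frame(prediction = rnorm(n, mean = 0, sd = 5))
scored$gain <- scored$prediction + rnorm(n, mean = 0, sd = 15)

# Rank from highest to lowest prediction, then cut into 10 equal buckets
rnk <- rank(-scored$prediction, ties.method = "first")
scored$decile <- ceiling(rnk * 10 / n)

# Average actual gain within each predicted decile
decile_gain <- aggregate(gain ~ decile, data = scored, FUN = mean)
print(decile_gain)
```

If a model ranks flights well, the average gain should fall from decile 1 to decile 10, and the training and validation bars should look similar.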
## Visualize predictions
Compare actual gains to predicted gains for an out-of-time sample.
```{r eval=FALSE}
# Select data from an out of time sample
data_2008 <- flights_tbl %>%
  filter(!is.na(arrdelay) & !is.na(depdelay) & !is.na(distance)) %>%
  filter(depdelay > 15 & depdelay < 240) %>%
  filter(arrdelay > -60 & arrdelay < 360) %>%
  filter(year == 2008) %>%
  left_join(airlines_tbl, by = c("uniquecarrier" = "code")) %>%
  mutate(gain = depdelay - arrdelay) %>%
  select(year, month, arrdelay, depdelay, distance, uniquecarrier, description, gain, origin, dest)

# Summarize data by carrier
carrier <- sdf_predict(ml1, data_2008) %>%
  group_by(description) %>%
  summarize(gain = mean(gain), prediction = mean(prediction), freq = n()) %>%
  filter(freq > 10000) %>%
  collect()

# Plot actual gains and predicted gains by airline carrier
ggplot(carrier, aes(gain, prediction)) +
  geom_point(alpha = 0.75, color = 'red', shape = 3) +
  geom_abline(intercept = 0, slope = 1, alpha = 0.15, color = 'blue') +
  geom_text(aes(label = substr(description, 1, 20)), size = 3, alpha = 0.75, vjust = -1) +
  labs(title = 'Average Gains Forecast', x = 'Actual', y = 'Predicted')
```
Some carriers make up more time than others in flight, but the differences are relatively small. The difference in average time gained between the best and worst airlines is only six minutes. The best predictor of time gained is not carrier but flight distance. The biggest gains were associated with the longest flights.
# Share Insights
This simple linear model contains a wealth of detailed information about carriers, distances traveled, and flight delays. These detailed insights can be conveyed to a non-technical audience via an interactive flexdashboard.
## Build dashboard
Aggregate the scored data by origin, destination, and airline. Save the aggregated data.
```{r eval=FALSE}
# Summarize by origin, destination, and carrier
summary_2008 <- sdf_predict(ml1, data_2008) %>%
  rename(carrier = uniquecarrier, airline = description) %>%
  group_by(origin, dest, carrier, airline) %>%
  summarize(
    flights = n(),
    distance = mean(distance),
    avg_dep_delay = mean(depdelay),
    avg_arr_delay = mean(arrdelay),
    avg_gain = mean(gain),
    pred_gain = mean(prediction)
  )

# Collect and save objects
pred_data <- collect(summary_2008)
airports <- collect(select(airports_tbl, name, faa, lat, lon))
ml1_summary <- capture.output(summary(ml1))
save(pred_data, airports, ml1_summary, file = 'flights_pred_2008.RData')
```
## Publish dashboard
Use the saved data to build an R Markdown flexdashboard. Publish the flexdashboard.
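One way a flexdashboard could pick up the saved objects is to `load()` the `.RData` file in its setup chunk. The sketch below shows the save/load round trip on small stand-in objects written to a temporary file. The stand-in values (including the airport codes `AAA` and `BBB`) are invented, and a real dashboard would load `flights_pred_2008.RData` instead.

```r
# Stand-in objects with the same names as those saved above (values invented)
pred_data   <- data.frame(origin = "AAA", dest = "BBB", avg_gain = 1.5)
airports    <- data.frame(faa = c("AAA", "BBB"), lat = c(0, 1), lon = c(0, 1))
ml1_summary <- c("Call: ...", "Coefficients: ...")

# Save to a temporary file rather than the real flights_pred_2008.RData
rdata_file <- tempfile(fileext = ".RData")
save(pred_data, airports, ml1_summary, file = rdata_file)

# In the dashboard: load into a fresh environment to avoid name clashes
dash_env <- new.env()
loaded <- load(rdata_file, envir = dash_env)
print(sort(loaded))
```

Loading into a separate environment is a design choice for this sketch; calling `load()` with its default environment would place the objects directly in the calling environment.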
# Appendix
## Appendix A - Data files
Run the following script to download data from the web onto your master node. It downloads the yearly flight data, the airline carrier lookup table, and the airports data.
```{bash eval=FALSE}
# Make download directory
mkdir -p /tmp/flights

# Download flight data by year
for i in {2006..2008}
do
  echo "$(date) $i Download"
  fnam=$i.csv.bz2
  wget -O /tmp/flights/$fnam http://stat-computing.org/dataexpo/2009/$fnam
  echo "$(date) $i Unzip"
  bunzip2 /tmp/flights/$fnam
done

# Download airline carrier data
wget -O /tmp/airlines.csv "http://www.transtats.bts.gov/Download_Lookup.asp?Lookup=L_UNIQUE_CARRIERS"

# Download airports data
wget -O /tmp/airports.csv https://raw.githubusercontent.com/jpatokal/openflights/master/data/airports.dat
```
### Hive tables
We used the Hue interface, logged in as 'admin', to load the data into HDFS and then into Hive.
```sql
CREATE EXTERNAL TABLE IF NOT EXISTS flights
(
year int,
month int,
dayofmonth int,
dayofweek int,
deptime int,
crsdeptime int,
arrtime int,
crsarrtime int,
uniquecarrier string,
flightnum int,
tailnum string,
actualelapsedtime int,
crselapsedtime int,
airtime string,
arrdelay int,
depdelay int,
origin string,
dest string,
distance int,
taxiin string,
taxiout string,
cancelled int,
cancellationcode string,
diverted int,
carrierdelay string,
weatherdelay string,
nasdelay string,
securitydelay string,
lateaircraftdelay string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
TBLPROPERTIES("skip.header.line.count"="1");
LOAD DATA INPATH '/user/admin/flights/2006.csv/' INTO TABLE flights;
LOAD DATA INPATH '/user/admin/flights/2007.csv/' INTO TABLE flights;
LOAD DATA INPATH '/user/admin/flights/2008.csv/' INTO TABLE flights;
-- Create metadata for airlines
CREATE EXTERNAL TABLE IF NOT EXISTS airlines
(
Code string,
Description string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES
(
"separatorChar" = '\,',
"quoteChar" = '\"'
)
STORED AS TEXTFILE
tblproperties("skip.header.line.count"="1");
LOAD DATA INPATH '/user/admin/L_UNIQUE_CARRIERS.csv' INTO TABLE airlines;
CREATE EXTERNAL TABLE IF NOT EXISTS airports
(
id string,
name string,
city string,
country string,
faa string,
icao string,
lat double,
lon double,
alt int,
tz_offset double,
dst string,
tz_name string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES
(
"separatorChar" = '\,',
"quoteChar" = '\"'
)
STORED AS TEXTFILE;
LOAD DATA INPATH '/user/admin/airports.dat' INTO TABLE airports;
```