sparklyr provides support for running arbitrary R code at scale within your Spark cluster through
spark_apply(). This is especially useful when there is a need for functionality available only in R, or in R packages that are not available in Apache Spark nor in Spark Packages.
There are two main ways to make use of spark_apply():
- Computing over Rows enables you to transform each row with a custom function.
- Computing over Groups enables you to transform each group with a custom function.
Computing over Rows
Let's run the simplest example: the identity function over a list of numbers:

```r
library(sparklyr)

sc <- spark_connect(master = "local")

sdf_len(sc, 5, repartition = 1) %>%
  spark_apply(function(e) e)
```
This returns what you would expect. We can take a look at the class of the object being passed to the function:

```r
sdf_len(sc, 10, repartition = 1) %>%
  spark_apply(function(e) class(e))
```
Notice that a data frame is provided, which is intended to be used for vectorized row-by-row operations, the style generally preferred within R.
Since Spark provides no guarantees on how the data is partitioned, especially after complex operations are performed over a dataset, it is possible and likely that the data will be partitioned in arbitrary ways; therefore, aggregating within the function is not suitable as a row-by-row operation. Notice the three entries returned when the data is explicitly partitioned into three:

```r
sdf_len(sc, 100, repartition = 3) %>%
  spark_apply(function(e) nrow(e))
```
That said, as long as the operations are performed row by row, there is significant functionality available: applying custom functions, filtering rows, duplicating rows, or modifying columns, as shown in the following example:

```r
iris_tbl <- sdf_copy_to(sc, iris)

iris_tbl %>%
  spark_apply(function(e) e[1:3])
```
spark_apply() defaults to using the column names from the original Spark data frame; therefore, if a new column is added or a rename is needed, the names parameter should be used:

```r
iris_tbl %>%
  spark_apply(
    function(e) cbind(3.1416, e),
    names = c("Pi", colnames(iris))
  )
```
Computing over Groups
While working over rows can be really useful, it certainly restricts other use cases. For instance, suppose that you have a very large dataset that requires a linear regression to be computed over several subgroups. To solve this, you can make use of the group_by parameter as follows:

```r
iris_tbl %>%
  spark_apply(function(e) nrow(e), group_by = "Species")
```
This will force Spark to provide all the rows for each particular group to the function closure. To compute the intercept over each group, we can use:

```r
iris_tbl %>%
  spark_apply(
    function(e) lm(Petal_Length ~ Petal_Width, e)$coefficients[["(Intercept)"]],
    group_by = "Species"
  )
```
One of the most powerful features of spark_apply() is the ability to use any package the R community provides. To use packages, install them before connecting with spark_connect().
For instance, we can use the broom package to create a tidy data frame out of the linear regression as follows:
```r
spark_apply(
  iris_tbl,
  function(e) broom::tidy(lm(Petal_Width ~ Petal_Length, e)),
  names = c("term", "estimate", "std.error", "statistic", "p.value"),
  group_by = "Species"
)
```
spark_apply() supports packages by copying the contents of your local .libPaths() into each worker node using SparkConf.addFile(). It is not uncommon for R libraries to use hundreds of megabytes of disk space, which means that all of this data will be copied once to each worker node and persisted there while the spark_connect() connection remains open. However, Spark is designed for big data processing and can efficiently copy much larger amounts of data between nodes. This makes the distribution of R packages a relatively small one-time tax, usually completed within a couple of seconds in most clusters.
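If copying everything under .libPaths() is heavier than needed, the packages parameter of spark_apply() can narrow what gets distributed. The following is a minimal sketch, assuming your sparklyr version accepts a character vector of package names for packages (it may also accept TRUE, the default, or FALSE; check your version's documentation):

```r
library(sparklyr)

sc <- spark_connect(master = "local")
iris_tbl <- sdf_copy_to(sc, iris)

# Sketch: distribute only the package the closure actually needs,
# rather than the full contents of .libPaths().
iris_tbl %>%
  spark_apply(
    function(e) broom::tidy(lm(Petal_Width ~ Petal_Length, e)),
    packages = c("broom"),
    group_by = "Species"
  )
```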
Notice that the packages parameter has no effect under master = "local", since the packages already exist on the local machine.
While working with distributed systems, troubleshooting can be less straightforward than expected. For instance, the following code will cause the distributed execution of the function to fail with this error:
```r
spark_apply(iris_tbl, function(e) stop("Make this fail"))
```

```
Error in force(code) : sparklyr worker rscript failure, check worker logs for details
```
Getting this check worker logs for details message indicates that an error was triggered while executing the function on a remote compute node. While running in local mode, sparklyr will retrieve the worker logs, which will look like:

```
---- Output Log ----
17/07/27 21:24:18 ERROR sparklyr: Worker (2427) is shutting down with exception ,java.net.SocketException: Socket closed
17/07/27 21:24:18 WARN TaskSetManager: Lost task 0.0 in stage 389.0 (TID 429, localhost, executor driver):
17/07/27 21:27:21 INFO sparklyr: RScript (4190) retrieved 150 rows
17/07/27 21:27:21 INFO sparklyr: RScript (4190) computing closure
17/07/27 21:27:21 ERROR sparklyr: RScript (4190) Make this fail
```
This points out the real failure, ERROR sparklyr: RScript (4190) Make this fail, as you would expect.
It is worth mentioning that different cluster providers and platforms expose worker logs in different ways. The documentation specific to your environment will point out how to retrieve these logs.
There are a few requirements worth listing out:
- The R runtime is expected to be pre-installed in the cluster for
spark_apply() to function. Failure to install R in the cluster will trigger a
Cannot run program, no such file or directory error while attempting to use
spark_apply(); contact your cluster administrator to consider making the R runtime available throughout the entire cluster.
- A homogeneous cluster is required, since the driver node distributes, and potentially compiles, packages to the workers. For instance, the driver and workers must have the same processor architecture, system libraries, etc.
The following table describes relevant parameters while making use of spark_apply():

| Value | Description |
|---|---|
| `spark.r.command` | The path to the R binary. Useful to select from multiple R versions. |
| | The gateway address to use under each worker node. |
| | The gateway port to use under each worker node. |
For example, one could make use of a specific R version by running:

```r
config <- spark_config()
config[["spark.r.command"]] <- "<path-to-R-binary>"

sc <- spark_connect(master = "local", config = config)

sdf_len(sc, 10) %>% spark_apply(function(e) e)
```
Closures are serialized using serialize(), which is described as "A simple low-level interface for serializing to connections." One of the current limitations of serialize() is that it won't serialize objects being referenced outside of its environment. For instance, the following function will error out, since the closure references external_value:

```r
external_value <- 1

spark_apply(iris_tbl, function(e) e + external_value)
```
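One way to work around this limitation, assuming your sparklyr version supports the context argument to spark_apply(), is to pass external values explicitly; sparklyr serializes the context object and hands it to the closure as a second argument. A minimal sketch:

```r
external_value <- 1

# Sketch: ship external_value through `context` instead of relying on
# closure capture; the function receives it as its second argument.
sdf_len(sc, 5) %>%
  spark_apply(
    function(df, context) df + context$external_value,
    context = list(external_value = external_value)
  )
```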
Currently, Livy connections do not support distributing packages, since the client machine where the libraries are precompiled might not have the same processor architecture, nor operating system, as the cluster machines.
Computing over Groups
While performing computations over groups,
spark_apply() will partition the data over the selected column; however, this implies that each group must fit into a worker node. If this is not the case, an exception will be thrown. To perform operations over groups that exceed the resources of a single node, one can consider partitioning into smaller units or using
dplyr::do, which is currently optimized for large partitions.
Since packages are copied only once for the duration of the
spark_connect() connection, installing additional packages is not supported while the connection is active. Therefore, if a new package needs to be installed,
spark_disconnect() the connection, modify the packages, and reconnect.
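A minimal sketch of this install-and-reconnect cycle might look like the following (the package name is illustrative):

```r
# Close the active connection; the packages bundled for the workers
# are tied to its lifetime.
spark_disconnect(sc)

# Install or update packages locally; they land in .libPaths().
install.packages("broom")

# Reconnect: the refreshed .libPaths() contents are redistributed
# to the worker nodes for the new connection.
sc <- spark_connect(master = "local")
```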