# Distributing R Computations

## Overview

sparklyr supports running arbitrary R code at scale within your Spark cluster through spark_apply(). This is especially useful when you need functionality that is available only in R or R packages and not in Apache Spark or Spark Packages.

There are two main ways to make use of spark_apply():

1. Computing over Rows enables you to transform each row with a custom function.
2. Computing over Groups enables you to transform a custom group with a custom function.

### Computing over Rows

Let's run the simplest example, the identity function, over a list of numbers:

    library(sparklyr)
    sc <- spark_connect(master = "local")
    sdf_len(sc, 5, repartition = 1) %>%
      spark_apply(function(e) e)

This returns the numbers unchanged, as you would expect. We can also take a look at the class of e:

    sdf_len(sc, 10, repartition = 1) %>%
      spark_apply(function(e) class(e))

Notice that e is a data frame. It is intended to be used for vectorized, row-by-row operations, which are the preferred style in R.

Spark provides no guarantees on how the data is partitioned, especially after complex operations are performed over a dataset, so it is possible and likely that the data will be partitioned in arbitrary ways. For this reason, aggregation is not suitable for row-by-row operations. Notice that the following returns three entries, one per partition, when the data is explicitly split into three partitions:

    sdf_len(sc, 100, repartition = 3) %>%
      spark_apply(function(e) nrow(e))
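Because each call to the function sees only one partition, a dataset-wide aggregate has to be assembled from the per-partition results. The following is a minimal sketch of one way to do that, assuming a local connection and using collect() from dplyr to bring the partial counts back into R before summing them:

```r
library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")

# Each invocation sees a single partition, so this yields one count per partition.
partition_counts <- sdf_len(sc, 100, repartition = 3) %>%
  spark_apply(function(e) nrow(e)) %>%
  collect()

# Combine the partial results locally to obtain the dataset-wide total.
# The first column is used by position so the sketch does not depend on its name.
total_rows <- sum(partition_counts[[1]])
```

The partial counts are tiny, so collecting them is cheap; for large intermediate results, a Spark-side aggregation would be a better fit.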

That said, as long as the operations are performed row by row, there is significant functionality available: applying custom functions, filtering rows, duplicating rows, or modifying columns, as shown in the following example:

    iris_tbl <- sdf_copy_to(sc, iris)
    iris_tbl %>%
      spark_apply(function(e) e[1:3])
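Filtering rows follows the same pattern as selecting columns. The sketch below assumes a local connection and the underscore column names that sdf_copy_to() produces for iris; the threshold of 7 is an arbitrary value chosen for illustration:

```r
library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")
iris_tbl <- sdf_copy_to(sc, iris, overwrite = TRUE)

# Keep only the rows of each partition whose sepal length exceeds 7.
long_sepals <- iris_tbl %>%
  spark_apply(function(e) e[e$Sepal_Length > 7, ]) %>%
  collect()
```

Since the filter looks at one row at a time, the result does not depend on how Spark happens to partition the data.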

spark_apply() defaults to using the column names from the original Spark dataframe; therefore, if a new column is added or a rename is needed, the names parameter should be used:

    iris_tbl %>%
      spark_apply(
        function(e) cbind(3.1416, e),
        names = c("Pi", colnames(iris)))

### Computing over Groups

While working over rows can be really useful, it certainly restricts other use cases. For instance, suppose that you have a very large dataset that requires linear regression to be computed over several subgroups. To solve this, you can make use of the group_by parameter as follows:

    iris_tbl %>%
      spark_apply(function(e) nrow(e), group_by = "Species")

This forces Spark to provide all the rows of a particular group to the function closure. To compute the intercept of a linear regression over each group, we can use:

    iris_tbl %>%
      spark_apply(
        function(e) lm(Petal_Length ~ Petal_Width, e)$coefficients[["(Intercept)"]],
        group_by = "Species")
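A grouped function can also return a data frame, which makes it possible to report several values per group. The following sketch assumes a local connection and returns both the intercept and the slope of the same regression; the output column names intercept and slope are chosen here for illustration and are supplied through the names parameter:

```r
library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")
iris_tbl <- sdf_copy_to(sc, iris, overwrite = TRUE)

coefficients_by_species <- iris_tbl %>%
  spark_apply(
    function(e) {
      fit <- lm(Petal_Length ~ Petal_Width, data = e)
      # Return one row per group holding both fitted coefficients.
      data.frame(
        intercept = coef(fit)[["(Intercept)"]],
        slope = coef(fit)[["Petal_Width"]]
      )
    },
    names = c("intercept", "slope"),
    group_by = "Species"
  ) %>%
  collect()
```

Each species contributes exactly one row, so the collected result stays small regardless of the size of the input.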

## Distributing Packages

One of the most powerful features of spark_apply() is the ability to use any package the R community provides. To use packages, install them before running spark_connect().

For instance, we can use the broom package to create a tidy dataframe out of the linear regression as follows:

    spark_apply(
      iris_tbl,
      function(e) broom::tidy(lm(Petal_Width ~ Petal_Length, e)),
      names = c("term", "estimate", "std.error", "statistic", "p.value"),
      group_by = "Species")

spark_apply() supports packages by copying the contents of your local .libPaths() into each worker node using SparkContext.addFile(). It's not uncommon for R libraries to use hundreds of megabytes of disk space, which means that all this data is copied once to each worker node and persists there while the spark_connect() connection remains open. However, Spark is designed for big data processing and can efficiently copy much more data than this between nodes. This makes the distribution of R packages a relatively small one-time tax, usually completed within a couple of seconds in most clusters.

Notice that the packages parameter has no effect under master = "local", since the packages already exist on the local machine.
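When a function relies on base R only, the copy step can be skipped altogether. The sketch below assumes that the packages parameter of spark_apply() accepts FALSE to disable distribution, as described in the function's help page; it runs against a local connection, where the setting makes no practical difference but the call has the same shape as on a cluster:

```r
library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")
iris_tbl <- sdf_copy_to(sc, iris, overwrite = TRUE)

# This closure relies on base R only, so copying .libPaths() is unnecessary.
first_columns <- iris_tbl %>%
  spark_apply(function(e) e[1:3], packages = FALSE) %>%
  collect()
```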

## Handling Errors

While working with distributed systems, troubleshooting can be less straightforward than expected. For instance, the following code will cause the distributed execution of the function to fail and produce the following error:

    spark_apply(iris_tbl, function(e) stop("Make this fail"))

    Error in force(code) :
      sparklyr worker rscript failure, check worker logs for details


The "check worker logs for details" message indicates that an error was triggered while executing the function on a different computing node. While running in local mode, sparklyr retrieves the worker logs, which look like:

    ---- Output Log ----
    (17/07/27 21:24:18 ERROR sparklyr: Worker (2427) is shutting down with exception ,java.net.SocketException: Socket closed)
    17/07/27 21:24:18 WARN TaskSetManager: Lost task 0.0 in stage 389.0 (TID 429, localhost, executor driver): 17/07/27 21:27:21 INFO sparklyr: RScript (4190) retrieved 150 rows
    17/07/27 21:27:21 INFO sparklyr: RScript (4190) computing closure
    17/07/27 21:27:21 ERROR sparklyr: RScript (4190) Make this fail


The last entry, ERROR sparklyr: RScript (4190) Make this fail, points to the real failure, as you would expect.

It is worth mentioning that different cluster providers and platforms expose worker logs in different ways. Specific documentation for your environment will point out how to retrieve these logs.
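In a script, it can help to capture the driver-side error and then inspect the log from within R. The following is a sketch for local mode only; it assumes that the worker entries appear in the log returned by spark_log(), and the filter string "sparklyr" is simply a guess at a useful pattern based on the log lines shown above:

```r
library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")
iris_tbl <- sdf_copy_to(sc, iris, overwrite = TRUE)

# Capture the driver-side error message instead of aborting the script.
failure_message <- tryCatch(
  {
    spark_apply(iris_tbl, function(e) stop("Make this fail")) %>% collect()
    NA_character_  # only reached if the call unexpectedly succeeds
  },
  error = function(err) conditionMessage(err)
)

# Assumption: in local mode the worker entries are part of this log.
spark_log(sc, n = 50, filter = "sparklyr")
```

On a real cluster the worker logs usually live elsewhere, so this only covers the local case.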

## Requirements

There are a few requirements worth listing out:

• The R runtime is expected to be pre-installed in the cluster for spark_apply() to function. Failure to install R in the cluster will trigger a "Cannot run program, no such file or directory" error while attempting to use spark_apply(). Contact your cluster administrator to consider making the R runtime available throughout the entire cluster.
• A homogeneous cluster is required since the driver node distributes, and potentially compiles, packages to the workers. For instance, the driver and workers must have the same processor architecture, system libraries, etc.

## Configuration

The following table describes the settings that are relevant when using spark_apply().

| Value | Description |
|-------|-------------|
| spark.r.command | The path to the R binary. Useful to select from multiple R versions. |
| sparklyr.worker.gateway.address | The gateway address to use under each worker node. Defaults to sparklyr.gateway.address. |
| sparklyr.worker.gateway.port | The gateway port to use under each worker node. Defaults to sparklyr.gateway.port. |

For example, one could make use of a specific R version by running:

    config <- spark_config()
    # Set this to the path of the R binary to use.
    config[["spark.r.command"]] <- ""
    sc <- spark_connect(master = "local", config = config)
    sdf_len(sc, 10) %>%
      spark_apply(function(e) e)

## Limitations

### Closures

Closures are serialized using serialize(), which is described as "a simple low-level interface for serializing to connections". One of the current limitations of serialize() is that it won't serialize objects referenced outside of the closure's environment. For instance, the following function will error out since the closure references external_value:

    external_value <- 1
    spark_apply(iris_tbl, function(e) e + external_value)
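One way to work around this is to hand the value to the function explicitly rather than capturing it. The sketch below assumes that your sparklyr version provides the context parameter of spark_apply(), which passes an R object to the function as its second argument; the list element name offset is made up for this example, and sdf_len() is used instead of iris so that every column is numeric:

```r
library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")

external_value <- 1

# Pass the value explicitly; the function receives it as its second argument.
shifted <- sdf_len(sc, 4) %>%
  spark_apply(
    function(e, context) e + context$offset,
    context = list(offset = external_value)
  ) %>%
  collect()
```

Wrapping the value in a named list keeps the function readable when more than one external value is needed.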

### Livy

Currently, Livy connections do not support distributing packages, since the client machine where the libraries are precompiled might not have the same processor architecture or operating system as the cluster machines.

### Computing over Groups

While performing computations over groups, spark_apply() partitions the data by the selected column. However, this assumes that each partition fits into a worker node; if this is not the case, an exception will be thrown. To perform operations over groups that exceed the resources of a single node, consider partitioning into smaller units or using dplyr::do, which is currently optimized for large partitions.

### Package Installation

Since packages are copied only once for the duration of the spark_connect() connection, installing additional packages is not supported while the connection is active. Therefore, if a new package needs to be installed, call spark_disconnect() on the connection, modify the packages, and reconnect.

    spark_disconnect(sc)