
rquery

rquery is a piped query generator based on Codd’s relational algebra (updated to reflect lessons learned from working with R, SQL, and dplyr at big data scale in production).

rquery is currently recommended for use with data.table (via rqdatatable), PostgreSQL, sparklyr, SparkR, and MonetDBLite (and, with non-window functionality, RSQLite). It can target various databases through its adapter layer.

A Python version of rquery is under initial development as data_algebra.

Note: rquery is a “database first” design. This means choices are made that favor database implementation. These include: capturing the entire calculation prior to doing any work (and using recursive methods to inspect this object, which can limit the calculation depth to under 1000 steps at a time), preferring “tame column names” (which isn’t a bad idea in R anyway as columns and variables are often seen as cousins), not preserving row or column order (or supporting numeric column indexing), and not supporting tables with no columns. Also, rquery does have a fast in-memory implementation: rqdatatable (thanks to the data.table package), so one can in fact use rquery without a database.

Discussion

rquery can be an excellent advanced SQL training tool (it shows how to build some very deep SQL by composing rquery operators). Currently rquery is biased towards the Spark and PostgreSQL SQL dialects.

There are many prior relational-algebra-inspired specialized query languages.

rquery is realized as a thin translation to an underlying SQL provider. We are trying to put the Codd relational operators front and center (using the original naming, and back-porting SQL progress such as window functions to the appropriate relational operator).

The primary relational operators include:

  • extend(). Extend adds derived columns to a relation table. With a sufficiently powerful SQL provider this includes ordered and partitioned window functions. This operator also includes built-in seplyr-style assignment partitioning. extend() can also alter existing columns, though we note this is not always a relational operation (it can lose row uniqueness).
  • project(). Project is usually portrayed as the equivalent of column selection, though the original definition includes aggregation. In our opinion the original relational nature of the operator is best captured by moving SQL’s “GROUP BY” aggregation functionality into it.
  • natural_join(). This is a specialized relational join operator, using all common columns as an equi-join condition.
  • theta_join(). This is the relational join operator allowing an arbitrary matching predicate.
  • select_rows(). This is Codd’s relational row selection. Obviously select alone is an over-used and now ambiguous term (for example: it is already used as the “doit” verb in SQL and the column selector in dplyr).
  • rename_columns(). This operator renames sets of columns.
  • set_indicator(). This operator produces a new column indicating set membership of a named column.

(Note rquery prior to version 1.2.1 used a _nse() suffix yielding commands such as extend_nse() instead of the newer extend() shown here).
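As a hedged sketch of the first two operators in action (assuming the rquery and rqdatatable packages are installed; the table and column names here are invented for illustration):

```r
library("rquery")
library("rqdatatable")  # supplies an in-memory executor

# describe a local table
d <- local_td(data.frame(
  g = c("a", "a", "b"),
  v = c(1, 2, 3)))

ops <- d %.>%
  extend(., v2 := v * 2) %.>%                   # add a derived column
  project(., total := sum(v2), groupby = "g")   # GROUP BY-style aggregation

# apply the pipeline to local data via rqdatatable
data.frame(g = c("a", "a", "b"), v = c(1, 2, 3)) %.>% ops
```

Note how the aggregation lives in project() (not in a separate "group by" modifier), keeping each step a single relational operator.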

The primary non-relational (traditional SQL) operators are:

  • select_columns(). This allows choice of columns (central to SQL), but is not a relational operator as it can damage row-uniqueness.
  • orderby(). Row order is not a concept in the relational algebra (and also not maintained in most SQL implementations). This operator is only useful when used with its limit= option, or as the last step as data comes out of the relation store and is moved to R (where row-order is usually maintained).
  • map_column_values(). Re-maps values in columns (very useful for re-coding data; currently implemented as a sql_node()).
  • unionall(). Concatenates tables.
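A minimal sketch of two of these helpers (assuming rquery is attached; the table and column names are invented for illustration):

```r
library("rquery")

# describe a table directly by name and columns
d <- mk_td("d", c("subjectID", "assessmentTotal"))

ops <- d %.>%
  select_columns(., c("subjectID", "assessmentTotal")) %.>%
  orderby(., cols = "assessmentTotal", limit = 10)  # useful with limit=
```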

And rquery supports higher-order operators (written in terms of other operators, both package-supplied and user-supplied):

  • pick_top_k(). Pick top k rows per group given a row ordering.
  • assign_slice(). Conditionally assign sets of rows and columns a scalar value.
  • if_else_op(). Simulate simultaneous if/else assignments.

rquery also has implementation helpers for building both SQL-nodes (nodes that are just SQL expressions) and non-SQL-nodes (nodes that are general functions of their input data values).

The primary missing relational operators are:

  • Union.
  • Direct set difference, anti-join.
  • Division.

One of the principles of rquery is to prefer expressive nodes, and not depend on complicated in-node expressions.

A great benefit of Codd’s relational algebra is it gives one concepts to decompose complex data transformations into sequences of simpler transformations.

Some reasons SQL seems complicated include:

  • SQL’s realization of sequencing as nested function composition.
  • SQL uses some relational concepts as steps, others as modifiers and predicates.

A lot of the grace of the Codd theory can be recovered through the usual trick of changing function composition notation from g(f(x)) to x . f() . g(). This experiment asks (and not for the first time): “what if SQL were piped (expressing composition as a left-to-right flow, instead of a right-to-left nesting)?”
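The x . f() . g() idea is exactly what wrapr’s dot-arrow pipe supplies; a tiny sketch:

```r
library("wrapr")

# nested, right-to-left:  exp(sin(5))
# piped, left-to-right flow of the same composition:
5 %.>% sin(.) %.>% exp(.)
```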

Let’s work a non-trivial example: the dplyr pipeline from Let’s Have Some Sympathy For The Part-time R User.

library("rquery")
library("wrapr")
use_spark <- FALSE

if(use_spark) {
  raw_connection <- sparklyr::spark_connect(version='2.2.0', 
                                   master = "local")
  cname <- rq_connection_name(raw_connection)
  rquery::setDBOption(raw_connection, 
                      "create_options",
                      "USING PARQUET OPTIONS ('compression'='snappy')")
} else {
  #driver <- RPostgres::Postgres()
  driver <- RPostgreSQL::PostgreSQL()
  raw_connection <- DBI::dbConnect(driver,
                          host = 'localhost',
                          port = 5432,
                          user = 'johnmount',
                          password = '')
}

dbopts <- rq_connection_tests(raw_connection)
db <- rquery_db_info(connection = raw_connection,
                     is_dbi = TRUE,
                     connection_options = dbopts)

# copy data in so we have an example
d_local <- build_frame(
   "subjectID", "surveyCategory"     , "assessmentTotal", "irrelevantCol1", "irrelevantCol2" |
   1L         , "withdrawal behavior", 5                , "irrel1"        , "irrel2"         |
   1L         , "positive re-framing", 2                , "irrel1"        , "irrel2"         |
   2L         , "withdrawal behavior", 3                , "irrel1"        , "irrel2"         |
   2L         , "positive re-framing", 4                , "irrel1"        , "irrel2"         )
rq_copy_to(db, 'd',
            d_local,
            temporary = TRUE, 
            overwrite = TRUE)
## [1] "mk_td(\"\"d\"\", c( \"subjectID\", \"surveyCategory\", \"assessmentTotal\", \"irrelevantCol1\", \"irrelevantCol2\"))"
# produce a handle to an existing table
d <- db_td(db, "d")

Note: in examples we use rq_copy_to() to create data. This is only for the purpose of having easy, portable examples. With big data the data is usually already in the remote database or Spark system. The task is almost always to connect to and work with this pre-existing remote data, and the method to do this is db_td(), which builds a reference to a remote table given the table name. The suggested pattern for working with remote tables is to get inputs via db_td() and land remote results with materialize(). To work with local data one can copy data from memory to the database with rq_copy_to() and bring back results with execute() (though be aware that operating on remote, non-memory data is rquery’s primary intent).
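The suggested pattern can be sketched as follows (hedged: `db` is assumed to be a live rquery_db_info handle, and the table and column names are invented):

```r
library("rquery")

# reference a pre-existing remote table by name
d <- db_td(db, "sales")

ops <- d %.>%
  select_rows(., amount > 0)

# land the result remotely as a new table ...
res_td <- materialize(db, ops)

# ... or bring a (small) result back to R
local_result <- execute(db, ops)
```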

First we show the Spark/database version of the original example data:

class(db)
## [1] "rquery_db_info"
print(db)
## [1] "rquery_db_info(PostgreSQLConnection, is_dbi=TRUE, note=\"\")"
class(d)
## [1] "relop_table_source" "relop"
print(d)
## [1] "mk_td(\"\"d\"\", c( \"subjectID\", \"surveyCategory\", \"assessmentTotal\", \"irrelevantCol1\", \"irrelevantCol2\"))"
# remote structure inspection
rstr(db, d$table_name)
## table "d" rquery_db_info 
##  nrow: 4 
## 'data.frame':    4 obs. of  5 variables:
##  $ subjectID      : int  1 1 2 2
##  $ surveyCategory : chr  "withdrawal behavior" "positive re-framing" "withdrawal behavior" "positive re-framing"
##  $ assessmentTotal: num  5 2 3 4
##  $ irrelevantCol1 : chr  "irrel1" "irrel1" "irrel1" "irrel1"
##  $ irrelevantCol2 : chr  "irrel2" "irrel2" "irrel2" "irrel2"
# or execute the table representation to bring back data
d %.>%
  execute(db, .) %.>%
  knitr::kable(.)
| subjectID | surveyCategory      | assessmentTotal | irrelevantCol1 | irrelevantCol2 |
|-----------|---------------------|-----------------|----------------|----------------|
| 1         | withdrawal behavior | 5               | irrel1         | irrel2         |
| 1         | positive re-framing | 2               | irrel1         | irrel2         |
| 2         | withdrawal behavior | 3               | irrel1         | irrel2         |
| 2         | positive re-framing | 4               | irrel1         | irrel2         |

Now we re-write the original calculation in terms of the rquery SQL generating operators.

scale <- 0.237

dq <- d %.>%
  extend(.,
         probability :=
           exp(assessmentTotal * scale))  %.>% 
  normalize_cols(.,
                 "probability",
                 partitionby = 'subjectID') %.>%
  pick_top_k(.,
             partitionby = 'subjectID',
             orderby = c('probability', 'surveyCategory'),
             reverse = c('probability')) %.>% 
  rename_columns(., 'diagnosis' := 'surveyCategory') %.>%
  select_columns(., c('subjectID', 
                      'diagnosis', 
                      'probability')) %.>%
  orderby(., cols = 'subjectID')

(Note one can also use the named map builder alias %:=% if there is concern of aliasing with data.table’s definition of :=.)

We then generate our result:

result <- materialize(db, dq)

class(result)
## [1] "relop_table_source" "relop"
result
## [1] "mk_td(\"\"rquery_mat_72197415925085797270_0000000000\"\", c( \"subjectID\", \"diagnosis\", \"probability\"))"
DBI::dbReadTable(db$connection, result$table_name) %.>%
  knitr::kable(.)
| subjectID | diagnosis           | probability |
|-----------|---------------------|-------------|
| 1         | withdrawal behavior | 0.6706221   |
| 2         | positive re-framing | 0.5589742   |

We see we have quickly reproduced the original result using the new database operators. This means such a calculation could easily be performed at a “big data” scale (using a database or Spark; in this case we would not take the results back, but instead use CREATE TABLE tname AS to build a remote materialized view of the results).

A bonus is: thanks to the data.table and rqdatatable packages, we can run the exact same operator pipeline on local data.

library("rqdatatable")

d_local %.>% 
  dq %.>%
  knitr::kable(.)
| subjectID | diagnosis           | probability |
|-----------|---------------------|-------------|
| 1         | withdrawal behavior | 0.6706221   |
| 2         | positive re-framing | 0.5589742   |

Notice we applied the pipeline by piping data into it. This ability is a feature of the dot arrow pipe we are using here.

The actual SQL query that produces the database result is, in fact, quite involved:

cat(to_sql(dq, db, source_limit = 1000))
SELECT * FROM (
 SELECT
  "subjectID",
  "diagnosis",
  "probability"
 FROM (
  SELECT
   "subjectID" AS "subjectID",
   "surveyCategory" AS "diagnosis",
   "probability" AS "probability"
  FROM (
   SELECT * FROM (
    SELECT
     "subjectID",
     "surveyCategory",
     "probability",
     row_number ( ) OVER (  PARTITION BY "subjectID" ORDER BY "probability" DESC, "surveyCategory" ) AS "row_number"
    FROM (
     SELECT
      "subjectID",
      "surveyCategory",
      "probability" / sum ( "probability" ) OVER (  PARTITION BY "subjectID" ) AS "probability"
     FROM (
      SELECT
       "subjectID",
       "surveyCategory",
       exp ( "assessmentTotal" * 0.237 )  AS "probability"
      FROM (
       SELECT
        "subjectID",
        "surveyCategory",
        "assessmentTotal"
       FROM
        "d" LIMIT 1000
       ) tsql_14421193931461477387_0000000000
      ) tsql_14421193931461477387_0000000001
     ) tsql_14421193931461477387_0000000002
   ) tsql_14421193931461477387_0000000003
   WHERE "row_number" <= 1
  ) tsql_14421193931461477387_0000000004
 ) tsql_14421193931461477387_0000000005
) tsql_14421193931461477387_0000000006 ORDER BY "subjectID"

The query is large, but due to its regular structure it should be very amenable to query optimization.

A feature to notice is: the query was automatically restricted to just columns actually needed from the source table to complete the calculation. This has the possibility of decreasing data volume and greatly speeding up query performance. Our initial experiments show rquery narrowed queries to be twice as fast as un-narrowed dplyr on a synthetic problem simulating large disk-based queries. We think if we connected directly to Spark’s relational operators (avoiding the SQL layer) we may be able to achieve even faster performance.

The above optimization is possible because the rquery representation is an intelligible tree of nodes, so we can interrogate the tree for facts about the query. For example:

column_names(dq)
## [1] "subjectID"   "diagnosis"   "probability"
tables_used(dq)
## [1] "d"
columns_used(dq)
## $d
## [1] "subjectID"       "surveyCategory"  "assessmentTotal"

The additional record-keeping in the operator nodes allows checking and optimization (such as query narrowing). The flow itself is represented as follows:

cat(format(dq))
mk_td(""d"", c(
  "subjectID",
  "surveyCategory",
  "assessmentTotal",
  "irrelevantCol1",
  "irrelevantCol2")) %.>%
 extend(.,
  probability := exp(assessmentTotal * 0.237)) %.>%
 extend(.,
  probability := probability / sum(probability),
  partitionby = c('subjectID'),
  orderby = c(),
  reverse = c()) %.>%
 extend(.,
  row_number := row_number(),
  partitionby = c('subjectID'),
  orderby = c('probability', 'surveyCategory'),
  reverse = c('probability')) %.>%
 select_rows(.,
   row_number <= 1) %.>%
 rename_columns(.,
  c('diagnosis' = 'surveyCategory')) %.>%
 select_columns(., c(
   "subjectID", "diagnosis", "probability")) %.>%
 order_rows(.,
  c('subjectID'),
  reverse = c(),
  limit = NULL)
dq %.>%
  op_diagram(.) %.>% 
  DiagrammeR::grViz(.)

rquery also includes a number of useful utilities (both as nodes and as functions).

quantile_cols(db, "d")
##   quantile_probability subjectID      surveyCategory assessmentTotal
## 1                 0.00         1 positive re-framing               2
## 2                 0.25         1 positive re-framing               2
## 3                 0.50         1 positive re-framing               3
## 4                 0.75         2 withdrawal behavior               4
## 5                 1.00         2 withdrawal behavior               5
##   irrelevantCol1 irrelevantCol2
## 1         irrel1         irrel2
## 2         irrel1         irrel2
## 3         irrel1         irrel2
## 4         irrel1         irrel2
## 5         irrel1         irrel2
rsummary(db, "d")
##            column index     class nrows nna nunique min max mean        sd
## 1       subjectID     1   integer     4   0      NA   1   2  1.5 0.5773503
## 2  surveyCategory     2 character     4   0       2  NA  NA   NA        NA
## 3 assessmentTotal     3   numeric     4   0      NA   2   5  3.5 1.2909944
## 4  irrelevantCol1     4 character     4   0       1  NA  NA   NA        NA
## 5  irrelevantCol2     5 character     4   0       1  NA  NA   NA        NA
##                lexmin              lexmax
## 1                <NA>                <NA>
## 2 positive re-framing withdrawal behavior
## 3                <NA>                <NA>
## 4              irrel1              irrel1
## 5              irrel2              irrel2
dq %.>% 
  quantile_node(.) %.>%
  execute(db, .)
##   quantile_probability subjectID           diagnosis probability
## 1                 0.00         1 positive re-framing   0.5589742
## 2                 0.25         1 positive re-framing   0.5589742
## 3                 0.50         1 positive re-framing   0.5589742
## 4                 0.75         2 withdrawal behavior   0.6706221
## 5                 1.00         2 withdrawal behavior   0.6706221
dq %.>% 
  rsummary_node(.) %.>%
  execute(db, .)
##        column index     class nrows nna nunique       min       max
## 1   subjectID     1   integer     2   0      NA 1.0000000 2.0000000
## 2   diagnosis     2 character     2   0       2        NA        NA
## 3 probability     3   numeric     2   0      NA 0.5589742 0.6706221
##        mean         sd              lexmin              lexmax
## 1 1.5000000 0.70710678                <NA>                <NA>
## 2        NA         NA positive re-framing withdrawal behavior
## 3 0.6147982 0.07894697                <NA>                <NA>

We have found most big-data projects either require joining very many tables (something rquery join planners help with, please see here and here) or they require working with wide data-marts (where rquery query narrowing helps, please see here).

We can also stand rquery up on non-DBI sources such as SparkR and data.table. The data.table adapter is being developed in the rqdatatable package, and can be quite fast. Notice the examples in this mode all use essentially the same query pipeline; the user can choose where to apply it: in memory (data.table), in a DBI database (PostgreSQL, sparklyr), or even with non-DBI systems (SparkR).

See also

For deeper dives into specific topics, please see also:

  • Join Controller
  • Join Dependency Sorting
  • Assignment Partitioner
  • DifferentDBs
  • rqdatatable

Installing

To install rquery please try install.packages("rquery").


Monthly Downloads

2,530

Version

1.3.8

License

GPL-2 | GPL-3


Maintainer

John Mount

Last Published

September 15th, 2019

Functions in rquery (1.3.8)

convert_yaml_to_pipeline

Convert a series of simple objects (from YAML deserialization) to an rquery pipeline.
drop_columns

Make a drop columns node (not a relational operation).
local_td

Construct a table description of a local data.frame.
execute

Execute an operator tree, bringing back the result to memory.
key_inspector_postgresql

Return all primary key columns as guess at preferred primary keys for a PostgreSQL handle.
expand_grid

Cross product vectors in database.
example_employee_date

Build some example tables (requires DBI).
build_join_plan

Build a join plan.
lookup_by_column

Use one column to pick values from other columns.
count_null_cols

Count NULLs per row for given column set.
key_inspector_sqlite

Return all primary key columns as guess at preferred primary keys for a SQLite handle.
apply_right_S4,rquery_db_info,relop_list-method

Materialize a stages list in pipe notation with relop_list on the right.
materialize_sql

Materialize a user supplied SQL statement as a table.
describe_tables

Build a nice description of a table.
commencify

materialize

Materialize an optree as a table.
if_else_op

Build a relop node simulating a per-row block-if(){}else{}.
complete_design

Complete an experimental design.
mark_null_cols

Indicate NULLs per row for given column set.
if_else_block

Build a sequence of statements simulating an if/else block-if(){}else{}.
mk_td

Make a table description directly.
orderby

Make an orderby node (not a relational operation).
extend

Extend data by adding more columns.
getDBOption

Get a database connection option.
column_names

Return column names
map_column_values

Remap values in a set of columns.
columns_used

Return columns used
make_relop_list

Create a new rquery::relop operator tree collector list
format_node

Format a single node for printing.
materialize_node

Create a materialize node.
materialize_relop_list_stages

Materialize a stages list on a database.
pick_top_k

Build an optree pipeline that selects up to the top k rows from each group in the given order.
op_diagram

Build a diagram of a optree pipeline.
get_relop_list_stages

Return the stages list.
pre_sql_to_query

Return SQL transform of tokens.
pre_sql_to_query.pre_sql_sub_expr

Convert a pre_sql token object to SQL query text.
extend_se

Extend data by adding more columns.
quote_identifier

Quote an identifier.
graph_join_plan

Build a draw-able specification of the join diagram
quantile_node

Compute quantiles over non-NULL values (without interpolation, needs a database with window functions).
row_counts

Build an optree pipeline that counts rows.
non_sql_node

Wrap a non-SQL node.
quantile_cols

Compute quantiles of specified columns (without interpolation, needs a database with window functions).
rename_columns

Make a rename columns node (copies columns not renamed).
natural_join

Make a natural_join node.
pre_sql_fn

pre_sql_token function name
key_inspector_all_cols

Return all columns as guess of preferred primary keys.
inspect_join_plan

Check that a join plan is consistent with table descriptions.
rquery_db_info

Build a db information stand-in
order_expr

Make a order_expr node.
rquery_default_db_info

An example rquery_db_info object useful for formatting SQL without a database connection.
quote_string

Quote a string
quote_table_name

Quote a table name.
rq_remove_table

Remove table
pre_sql_identifier

pre_sql_identifier: abstract name of a column and where it is coming from
reexports

Objects exported from other packages
quote_literal

Quote a value
sql_node

Make a general SQL node.
sql_expr_set

Build a query that applies a SQL expression to a set of columns.
order_expr_se

Make a order_expr node.
project

Project data by grouping, and adding aggregate columns.
rq_table_exists

Check if a table exists.
relop_list-class

List of rquery::relop operator trees taken in order.
rquery

rquery: Relational Query Generator for Data Manipulation
rquery_apply_to_data_frame

Execute optree in an environment where d is the only data.
project_se

Project data by grouping, and adding aggregate columns.
rq_connection_tests

Try and test database for some option settings.
rq_copy_to

Copy local R table to remote data handle.
pre_sql_to_query.pre_sql_token

Convert a pre_sql token object to SQL query text.
order_rows

Make an orderby node (not a relational operation).
select_rows

Make a select rows node.
tokenize_for_SQL

Cross-parse from an R parse tree into SQL.
select_rows_se

Make a select rows node.
topo_sort_tables

Topologically sort join plan so values are available before uses.
rq_function_mappings

Return function mappings for a connection
rq_execute

Execute a query, typically an update that is not supposed to return results.
normalize_cols

Build an optree pipeline that normalizes a set of columns so each column sums to one in each partition.
null_replace

Create a null_replace node.
rsummary

Compute usable summary of columns of remote table.
rq_connection_name

Build a canonical name for a db connection class.
pre_sql_token

pre_sql_token
rq_connection_advice

Get advice for a DB connection (beyond tests).
rq_get_query

Execute a get query, typically a non-update that is supposed to return results.
pre_sql_string

pre_sql_string
rsummary_node

Create an rsummary relop operator node.
rq_nrow

Count rows and return as numeric
show,relop_list-method

S4 print method
set_indicator

Make a set indicator node.
setDBOpt

Set a database connection option.
pre_sql_sub_expr

pre_sql_sub_expr
setDBOption

Set a database connection option.
rquery_default_methods

Default to_sql method implementations.
theta_join

Make a theta_join node.
rstr

Quick look at remote data
theta_join_se

Make a theta_join node.
to_sql

Return SQL implementation of operation tree.
rq_colnames

List table column names.
rq_coltypes

Get column types by example values as a data.frame.
tables_used

Return vector of table names used.
run_rquery_tests

Run rquery package tests.
select_columns

Make a select columns node (not a relational operation).
unionall

Make a unionall node (not a relational operation).
str_pre_sql_sub_expr

Structure of a pre_sql_sub_expr
to_transport_representation

Convert an rquery op diagram to a simple representation, appropriate for conversion to YAML.
affine_transform

Implement an affine transformation.
apply_right_S4,relop_list,rquery_db_info-method

Materialize a stages list in pipe notation with relop_list on the left.
assign_slice

Assign a value to a slice of data (set of rows meeting a condition, and specified set of columns).
actualize_join_plan

Execute an ordered sequence of left joins.
add_relop

Add a relop to the end of a relop_list.
apply_right_S4,ANY,relop_list-method

Add a relop to the end of a relop_list (piped version).
apply_right.relop

Execute pipeline treating pipe_left_arg as local data to be copied into database.
db_td

Construct a table description from a database source.
apply_right_S4,ANY,rquery_db_info-method

Apply pipeline to a database.