Database Operator Pipelines

rquery’s primary workflow is building re-usable database operator pipelines.

Let’s try an example. First let’s set up our example database and data.

library("rquery")
## Loading required package: wrapr
## Loading required package: cdata
db <- DBI::dbConnect(RSQLite::SQLite(), 
                     ":memory:")
RSQLite::initExtension(db)

DBI::dbWriteTable(db,
                  'd',
                  data.frame(AUC = 0.6, 
                             R2 = c(0.1, 0.2), 
                             D = NA, z = 2),
                  overwrite = TRUE,
                  temporary = TRUE)
d <- dbi_table(db, 'd')
print(d)
## [1] "table('d')"
qlook(db, d$table_name)
## table `d` SQLiteConnection 
##  nrow: 2 
## 'data.frame':    2 obs. of  4 variables:
##  $ AUC: num  0.6 0.6
##  $ R2 : num  0.1 0.2
##  $ D  : int  NA NA
##  $ z  : num  2 2

Now we can define a query over this table.

q <- d %.>%
  select_rows_nse(., R2 > 0.14) %.>%
  extend_nse(., c = sqrt(R2)) %.>%
  select_columns(., c("AUC", "R2", "c"))

The idea is:

  • The variable d is a table model (the name of the table and a set of assumed column names) that allows us to reason about an actual database table to be specified later.
  • The query q is a sequence of operators we can hold, examine, and alter.

We can print the query/operator pipeline:

cat(format(q))
table('d') %.>%
 select_rows(.,  := R2 > 0.14) %.>%
 extend(.,
  c := sqrt(R2)) %.>%
 select_columns(., AUC, R2, c)

And we can ask questions of it:

column_names(q)
## [1] "AUC" "R2"  "c"
tables_used(q)
## $d
## [1] "table('d')"
columns_used(q)
## $d
## [1] "AUC" "R2"

And we can convert the operator pipeline to SQL which can then be applied to an actual database table.

sql <- to_sql(q, db)
cat(sql)
SELECT
 `AUC`,
 `R2`,
 `c`
FROM (
 SELECT
  `AUC`,
  `R2`,
  sqrt ( `R2` )  AS `c`
 FROM (
  SELECT * FROM (
   SELECT
    `d`.`AUC`,
    `d`.`R2`
   FROM
    `d`
  ) tsql_0000
  WHERE `R2` > 0.14
  ) tsql_0001
) tsql_0002
DBI::dbGetQuery(db, sql) %.>%
  knitr::kable(.)
| AUC|  R2|         c|
|---:|---:|---------:|
| 0.6| 0.2| 0.4472136|

Ad Hoc mode

rquery also has an “Ad Hoc” mode for interactive analysis.
In this mode things are sped up: the user can work with in-memory tables and also skip the table modeling step.

Let’s first set the global variable winvector_temp_db_handle to our database handle so the ad hoc mode knows which database to use to implement the analyses.

winvector_temp_db_handle <- list(db = db)

We can now run operators directly on in-memory data.frames.

dL <-  data.frame(AUC = 0.6, 
                  R2 = c(0.1, 0.2), 
                  D = NA, z = 2)

dL %.>%
  select_rows_nse(., R2 > 0.14) %.>%
  knitr::kable(.)
| AUC|  R2|  D|  z|
|---:|---:|--:|--:|
| 0.6| 0.2| NA|  2|
dL %.>%
  select_rows_nse(., R2 > 0.14) %.>%
  extend_nse(., c = sqrt(R2))  %.>%
  select_columns(., c("AUC", "R2", "c")) %.>%
  knitr::kable(.)
| AUC|  R2|         c|
|---:|---:|---------:|
| 0.6| 0.2| 0.4472136|

Using a function wrapper we can also save ad hoc pipelines for later use.

q2 <- . := {
  select_rows_nse(., R2 > 0.14) %.>%
  extend_nse(., c = sqrt(R2)) %.>%
  select_columns(., c("AUC", "R2", "c"))
}

dL %.>% 
  q2 %.>%
  knitr::kable(.)
| AUC|  R2|         c|
|---:|---:|---------:|
| 0.6| 0.2| 0.4472136|
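
The `:=` lambda above behaves much like an ordinary function of `.`; a rough equivalent can be written as follows (a sketch assuming rquery is attached, as above; `q2_fn` is our own illustrative name, not part of rquery):

```r
# Roughly equivalent to the ":=" lambda: an ordinary function of "."
# (q2_fn is an illustrative name; rquery operators are only looked up
# when the function is actually called).
q2_fn <- function(.) {
  select_rows_nse(., R2 > 0.14) %.>%
    extend_nse(., c = sqrt(R2)) %.>%
    select_columns(., c("AUC", "R2", "c"))
}

# dL %.>% q2_fn %.>% knitr::kable(.)   # would produce the same table
```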

Or we can use a table model based pipeline directly (without needing additional wrapping).

needed_columns <- columns_used(q)
print(needed_columns)
## $d
## [1] "AUC" "R2"
q3 <- table_source(table_name = 'tmp', 
                   columns = needed_columns$d) %.>%
  select_rows_nse(., R2 > 0.14) %.>%
  extend_nse(., c = sqrt(R2)) %.>%
  select_columns(., c("AUC", "R2", "c"))

dL %.>% 
  q3 %.>%
  knitr::kable(.)
| AUC|  R2|         c|
|---:|---:|---------:|
| 0.6| 0.2| 0.4472136|

For stored queries we need either the table model (which places a bound on what columns are assumed to exist in the table) or a function wrapper (which lets the table to be named later serve as our future table bound).

We can also use the original pipeline q, but only after removing the original backing table (for safety the ad hoc system will not overwrite existing tables).

DBI::dbExecute(db, "DROP TABLE d")
## [1] 1
dL %.>% 
  q %.>%
  knitr::kable(.)
| AUC|  R2|         c|
|---:|---:|---------:|
| 0.6| 0.2| 0.4472136|

How Ad Hoc Mode Works

Ad Hoc mode is implemented on top of wrapr::%.>% using a couple of R tricks. Unlike magrittr::%>%, the wrapr::%.>% operator does not delay evaluation or capture the entire pipeline; it actually executes its arguments in order at the time of declaration.
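
To illustrate the eager semantics, here is a toy base-R sketch of the idea (not wrapr's actual implementation; `%eager>%` is our own name):

```r
# A toy eager pipe: substitute the left-hand value for "." in the
# right-hand expression and evaluate immediately (no captured pipeline).
`%eager>%` <- function(lhs, rhs) {
  expr <- substitute(rhs)
  eval(do.call(substitute, list(expr, list(. = lhs))),
       envir = parent.frame())
}

5 %eager>% sin(.)  # computed right away, same value as sin(5)
```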

Basic ad hoc mode

The basic version of ad hoc mode is implemented by overriding the S3 methods as.data.frame() and print() for our rquery::"relop" operator trees / pipelines.

Consider our earlier ad hoc pipeline:

z <- dL %.>%
  select_rows_nse(., R2 > 0.14) %.>%
  extend_nse(., c = sqrt(R2))  %.>%
  select_columns(., c("AUC", "R2", "c"))

class(z)
## [1] "relop_select_columns" "relop"                "wrapr_applicable"

Notice z declares class "relop". This means z is an rquery operator tree. Formatting it shows that it starts with a “table+()” node, meaning the operator tree has a reference to an in-memory data.frame bound into it.

cat(format(z))
table+('rquery_tmp_0brypvvr7napopvqohzg_0000000000') %.>%
 select_rows(., rquery_select_condition_1 := R2 > 0.14) %.>%
 extend(.,
  c := sqrt(R2)) %.>%
 select_columns(., AUC, R2, c)
cat(to_sql(z, db))
SELECT
 `AUC`,
 `R2`,
 `c`
FROM (
 SELECT
  `AUC`,
  `R2`,
  sqrt ( `R2` )  AS `c`
 FROM (
  SELECT * FROM (
   SELECT
    `rquery_tmp_0brypvvr7napopvqohzg_0000000000`.`AUC`,
    `rquery_tmp_0brypvvr7napopvqohzg_0000000000`.`R2`
   FROM
    `rquery_tmp_0brypvvr7napopvqohzg_0000000000`
  ) tsql_0000
  WHERE `R2` > 0.14
  ) tsql_0001
) tsql_0002

The production of SQL and execution is triggered if we pass z to one of the generic S3 functions as.data.frame() or print() (including the possible implicit print() implied by R’s statement rules):

print(z)
##   AUC  R2         c
## 1 0.6 0.2 0.4472136
as.data.frame(z)
##   AUC  R2         c
## 1 0.6 0.2 0.4472136

knitr::kable() itself calls as.data.frame() at some point, allowing z's results to be formatted by passing z to knitr::kable():

knitr::kable(z)
| AUC|  R2|         c|
|---:|---:|---------:|
| 0.6| 0.2| 0.4472136|

Stored ad hoc pipeline mode

To re-use regular operator trees as ad hoc pipelines we need one more trick: the operator tree object needs to act as if it were a function. To achieve this we add a wrapr::"wrapr_applicable" class declaration to our rquery::"relop" operator/query nodes. This gives us the ability to treat an rquery operator tree as a data processing pipeline. Results are then produced by overriding the S3 methods as.data.frame() and print().

This is a bit simpler if demonstrated.

class(q)
## [1] "relop_select_columns" "relop"                "wrapr_applicable"
cat(format(q))
table('d') %.>%
 select_rows(.,  := R2 > 0.14) %.>%
 extend(.,
  c := sqrt(R2)) %.>%
 select_columns(., AUC, R2, c)
dL %.>% 
  q %.>%
  knitr::kable(.)
| AUC|  R2|         c|
|---:|---:|---------:|
| 0.6| 0.2| 0.4472136|

For an R object q that adds "wrapr_applicable" to its class declarations, the wrapr pipeline operator (%.>%) will interpret “dL %.>% q” as essentially “(q$wrapr_function)(dL)”. The "wrapr_applicable" facility allows objects to declare what sort of function they would like to be treated as in a pipeline (the actual signature of the call is “(q$wrapr_function)(dL, q, parent.frame())”, so the object is given a reference to itself as the second argument of the special function).
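
A toy base-R sketch of this dispatch idea (our own names, not wrapr internals):

```r
# Sketch: an object carries its pipeline behavior in a "wrapr_function"
# field; a pipe-like helper hands that function the incoming value, the
# object itself, and the caller's environment.
apply_pipeable <- function(value, obj, env = parent.frame()) {
  (obj$wrapr_function)(value, obj, env)
}

q_sketch <- list(wrapr_function = function(value, obj, env) value * 2)

apply_pipeable(21, q_sketch)  # plays the role of "21 %.>% q_sketch", returns 42
```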

Cleanup

The ad hoc method defaults to using a transient RSQLite database connection.
A non-transient DBI database connection can be specified by adding one as the “db” value in a list bound to the global variable “winvector_temp_db_handle” (as we did in this note). This lets one use a more powerful database (such as PostgreSQL, which has window functions). In this case one should also probably close the DB connection, or at least break the reference, when finished as follows.

DBI::dbDisconnect(winvector_temp_db_handle$db)
winvector_temp_db_handle <- NULL