The dplyrXdf Package: Bringing Data Munging with Pipes to xdf Files

This post is authored by Hong Ooi, Senior Data Scientist at Microsoft.

Introduction

The dplyr package is a popular toolkit for data transformation and manipulation. In recent times, dplyr has become a hot topic in the R community for the way in which it streamlines and simplifies many common data manipulation tasks.

Out of the box, dplyr supports data frames, data tables (from the data.table package), and the following SQL databases: MySQL/MariaDB, SQLite, and PostgreSQL. However, dplyr is extensible: by writing a custom backend, you can make it work with many other kinds of data sources. For example, the development version of the RSQLServer package implements a dplyr backend for Microsoft SQL Server.

The dplyrXdf package implements such a backend for the xdf file format, a technology supplied as part of Microsoft R Server (MRS). All the data transformation and modelling functions provided with MRS support xdf files, which allow you to break R's memory barrier: by storing the data on disk, rather than in memory, they make it possible to work with multi-gigabyte or terabyte-sized datasets.

dplyrXdf brings the benefits of dplyr to xdf files, including support for pipeline notation, all major verbs, and the ability to incorporate xdf files into dplyr pipelines. It also provides some additional benefits which are more specific to working with xdfs:

  • Working with data on disk requires keeping track of where it is saved. You can often end up with many different versions of the data scattered around your filesystem, introducing reproducibility problems and making it difficult to keep track of changes. dplyrXdf abstracts the task of file management away, so that you can focus on the analytics.
  • Related to the above, the original input data source to a dplyrXdf pipeline is never modified. This provides a measure of security, so that even if there are bugs in your code (maybe you meant to use a mutate rather than a transmute), the original data is safe.
  • Consistency of interface: functions like rxCube and rxSummary use formulas in different ways, because they are designed to do slightly different things. With dplyrXdf, you don't have to remember which formula syntax goes with which function.
  • The verbs in dplyrXdf all read from xdf files and write to xdf files. The data is thus never read entirely into memory, so a dplyrXdf pipeline will work with datasets that are arbitrarily large.

Note that dplyrXdf is a wrapper around the functions provided by Microsoft R Server, which is the commercial distribution of R from Microsoft. It will not work with Microsoft R Open, as that distribution does not include support for xdf files.

Some Sample dplyrXdf Code

For this example, I'll use a dataset containing information from the 2000 U.S. census. You can download the data from https://packages.revolutionanalytics.com/datasets/Census5PCT2000.zip; it's about 900MB zipped.

library(dplyrXdf)  # also loads dplyr

# downloaded and unzipped from
# https://packages.revolutionanalytics.com/datasets/Census5PCT2000.zip
cens <- RxXdfData("c:/users/hongooi/documents/bigdata/Census5PCT2000.xdf")

file.size(cens@file)
#> [1] 11543031912

dim(cens)
#> [1] 14058983      265

The file contains 14 million rows and 265 columns, and is 11GB in size. While not incredibly large, it's also not small: many desktop or laptop machines would struggle to fit the data into memory.

Let's compute some simple grouped statistics, by region:

# number of observations by region
system.time(cens_counts <- cens %>% count(region))
#>    user  system elapsed
#>    1.36    0.19    2.46

as.data.frame(cens_counts)
#>                     region       n
#> 1     New England Division  698383
#> 2 Middle Atlantic Division 1991649
#> 3  East North Central Div. 2261891
#> 4  West North Central Div.  965102
#> 5  South Atlantic Division 2574494
#> 6  East South Central Div.  849069
#> 7  West South Central Div. 1552333
#> 8        Mountain Division  914950
#> 9         Pacific Division 2251112

# compute mean, minimum and maximum income by region
system.time(cens_income <- cens %>%
    group_by(region) %>%
    summarise(avginc=mean(hhincome), mininc=min(hhincome), maxinc=max(hhincome)))
#>    user  system elapsed
#>    0.69    0.17    2.07

as.data.frame(cens_income)
#>                     region    avginc mininc maxinc
#> 1     New England Division 104394.53 -20000 999999
#> 2 Middle Atlantic Division  97109.93 -30000 999999
#> 3  East North Central Div.  86779.11 -20000 999999
#> 4  West North Central Div.  84629.71 -20000 999999
#> 5  South Atlantic Division  87971.02 -20000 999999
#> 6  East South Central Div.  76899.23 -20000 999999
#> 7  West South Central Div.  81165.25 -20000 999999
#> 8        Mountain Division  79290.19 -26000 999999
#> 9         Pacific Division  91074.12 -26400 999999

On this (admittedly powerful) laptop, each of these calculations took only a couple of seconds.

All the verbs in dplyrXdf take xdf files as input and create xdf files as output. This means, for example, that the output from a pipeline can be used in an MRS modelling function:

censSelect <- cens %>%
    select(propinsr, age, sex, region, perwt)

mod <- rxGlm(propinsr ~ F(age) + sex + region, data=censSelect,
    family=rxTweedie(1.5), pweights="perwt")

summary(mod)
#> Call:
#> rxGlm(formula = propinsr ~ F(age) + sex + region, data = censSelect,
#>     family = rxTweedie(1.5), pweights = "perwt")
#>
#> Generalized Linear Model Results for: propinsr ~ F(age) + sex + region
#> Data: censSelect (RxXdfData Data Source)
#> . . .

From this sample code, we can see that dplyrXdf hides the complexity of working with xdf files while retaining their power. In particular, note the following:

  • There is no need to keep track of input and output file locations: the verbs in the dplyrXdf pipeline will automatically create files and reuse them as needed. Files that are no longer used will be deleted, so there won't be multiple orphaned files cluttering up your hard disk.
  • The summarise verb is much simpler to work with than the standard rxSummary function. It doesn't require scanning through a list of output objects to find the information you're after, and it accepts grouping variables of any type (numeric, character or factor).
  • The pipeline notation makes the sequence of operations clear at a glance. This is one of the major benefits of dplyr, and is now also available to those working with xdf files.

Support for dplyr Functionality

dplyrXdf supports all the major verbs provided with dplyr. To be precise, this includes:

  • filter and select to choose rows and columns
  • mutate, mutate_each and transmute to do data transformation
  • group_by to define groups
  • summarise, summarise_each and do to carry out computations on grouped data
  • arrange to sort by variables
  • rename to rename columns
  • distinct to drop duplicates
  • left_join, right_join, inner_join, full_join, anti_join and semi_join to merge tables
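
To give a flavour of this, here's a short pipeline combining a few of these verbs (a sketch, reusing the census variables from the examples above):

# filter rows, compute a new variable, and sort: all on xdf files
censWage <- cens %>%
    filter(inctot > 0) %>%
    transmute(region, sex, wageprop=incwage/inctot) %>%
    arrange(wageprop)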

This means that if you know how to use dplyr, you already know how to use dplyrXdf (for the most part). In the next sections, I'll describe the ways in which dplyrXdf extends dplyr to make better use of xdf files.

Tbls, File Management, and the persist Verb

To facilitate the task of file management, dplyrXdf defines a new tbl_xdf class. This is what allows it to keep track of which data sources should remain untouched, and which can be modified or overwritten as part of a pipeline. To the base MRS functions, a tbl_xdf object is just a normal xdf data source; thus, existing code dealing with xdfs will work with minimal modification. However, the verbs implemented in dplyrXdf will recognize when they are passed a tbl_xdf, as opposed to a normal xdf, in which case they will delete their input file after writing the output file. Thus there is always only one file that represents the latest stage of a pipeline.
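
As a small illustration (a sketch; the exact behaviour may vary between versions), the result of a verb is a tbl_xdf, which base MRS functions accept as-is:

# the output of a verb is a tbl_xdf, backed by a new file on disk
tbl <- cens %>% filter(age >= 65)
class(tbl)      # "tbl_xdf", which inherits from RxXdfData
rxGetInfo(tbl)  # base MRS functions treat it as an ordinary xdf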

A side-effect of dplyrXdf handling file management is that you should be careful when passing the result from an initial dplyrXdf pipeline into subsequent pipelines. Consider the following example:

# pipeline 1
output1 <- cens %>%
    mutate(wageprop=incwage/inctot)

# use the output from pipeline 1
output2 <- output1 %>%
    group_by(region) %>%
    summarise(wageprop=mean(wageprop))

# reuse the output from pipeline 1 -- WRONG
output3 <- output1 %>%
    group_by(sex) %>%
    summarise(wageprop=mean(wageprop))

The problem here is that the second pipeline will overwrite or delete its input, so the third pipeline will fail. This is consistent with dplyrXdf's philosophy of only saving the most recent output of a pipeline, where a pipeline is defined as all operations starting from a raw xdf file. However, in this case it isn't what we want.

Similarly, dplyrXdf creates its output files in R's temporary directory, so when you close your R session, these files will be deleted. This saves you having to manually delete files that are no longer in use, but it does mean that you must copy the output of your pipeline to a permanent location if you want to keep it around.

To deal with these issues, you use the persist verb to save a pipeline's output to a non-temporary location. This also resets the status of the pipeline, so that subsequent operations will know not to modify the data.

# pipeline 1 -- use persist to save the data
output1 <- cens %>%
    mutate(wageprop=incwage/inctot) %>%
    persist("output1.xdf")

# use the output from pipeline 1
output2 <- output1 %>%
    group_by(region) %>%
    summarise(wageprop=mean(wageprop))

# reuse the output from pipeline 1 -- this works as expected
output3 <- output1 %>%
    group_by(sex) %>%
    summarise(wageprop=mean(wageprop))

The .rxArgs Parameter

The MRS RevoScaleR functions typically have several arguments beyond those used by dplyrXdf verbs. While you usually don't need to touch these, it can sometimes be useful to do so. For example, when using mutate or transmute, you can specify more complicated transformations via a transformFunc (see the help for rxTransform). Similarly, rather than chaining together a mutate and a summarise (which would involve creating an intermediate file), you can incorporate the variable transformation into the summarise itself. More low-level uses of such arguments include setting the block size for an xdf file, changing the compression level, limiting the number of rows, and so on.

Most of the one-table dplyrXdf verbs accept an .rxArgs argument as a way of transmitting these extra arguments to the underlying MRS code. This should be a named list specifying the names and values of the arguments to be passed. Here is an example of using the .rxArgs parameter: we fit a model using an open source R function (so we need to sample down the dataset first), and then score the data using a transformFunc.

# fit a model using open source R, and then score the training dataset
# we pass the model object via transformObjects, and the package to load
# via transformPackages

# sample the data so that it fits in memory
censSmall <- cens %>%
    filter(age > 20, age < 90, runif(.rxNumRows) < 0.1, .rxArgs=list(
        varsToKeep=c("hhincome", "propinsr", "age", "sex", "region", "perwt"))) %>%
    as.data.frame

# fit a basic neural network with 6 hidden weights (no tuning)
library(nnet)
censSmall_nn <- nnet(factor(propinsr > 0) ~ age + sex + region + hhincome,
    data=censSmall, size=6)

# get predictions on the full dataset
cens_nnPred <- cens %>%
    select(age, sex, region, hhincome, .rxArgs=list(
        transformFunc=function(varlst) {
            df <- data.frame(varlst)
            varlst$pred <- predict(.nnObj, df)
            varlst
        },
        transformObjects=list(.nnObj=censSmall_nn),
        transformPackages="nnet"))
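
And here is a sketch of the mutate-into-summarise shortcut mentioned earlier: the wageprop variable is computed by a transformFunc inside the summarise itself, so no intermediate file is created. (transformVars, which names the source variables the transformation needs, is a standard RevoScaleR argument; the exact pass-through behaviour here is an assumption, following the same .rxArgs pattern as above.)

# compute wageprop on the fly within the summarise, instead of
# chaining a separate mutate step beforehand
cens_wage <- cens %>%
    group_by(region) %>%
    summarise(avgwageprop=mean(wageprop), .rxArgs=list(
        transformFunc=function(varlst) {
            varlst$wageprop <- varlst$incwage / varlst$inctot
            varlst
        },
        transformVars=c("incwage", "inctot")))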

The factorise Verb

Many RevoScaleR functions are optimized to work best with factors, or require factors as input. dplyrXdf provides the factorise verb (also spelled factorize) to convert your data to factors:

factorise(data, x1, x2, ...)

where x1, x2, … are the variables to convert. Note that the generated factor variables will overwrite the originals. For performance reasons, the levels of the generated factors are not sorted in alphabetical order. You can also specify the levels for the factor(s) in question, using the standard name=value syntax:

factorise(data, x1=c("a", "b", "c"))

This will convert the variable x1 into a factor with levels a, b and c. Any values that don't match the specified levels will be turned into NAs.

The verbs in dplyrXdf will usually create factors on the fly as needed, so you shouldn't need to call factorise very often. However, should you need it, factorise provides an explicit way to create factors as part of a dplyrXdf pipeline.
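
For example, an explicit factorise step drops naturally into a pipeline (a sketch; it assumes sex and region are not already stored as factors):

# explicitly convert sex and region to factors before further processing
censFac <- cens %>%
    select(age, sex, region, hhincome) %>%
    factorise(sex, region)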

Executing Code with do and doXdf

The do verb is an exception to the rule that dplyrXdf verbs write their output as xdf files. This is because do executes arbitrary R code, and can return arbitrary R objects; while a data frame is capable of storing these objects, an xdf file is limited to character and numeric vectors only.

The doXdf verb is similar to do, but where do splits its input into one data frame per group, doXdf splits it into one xdf file per group. This allows do-like functionality with grouped data, where each group can be arbitrarily large. The syntax for the two functions is essentially the same, although the code passed to doXdf must obviously know how to handle xdfs.

# fit a regression model by region, using rxLinMod
censMods <- cens %>%
    group_by(region) %>%
    doXdf(model=rxLinMod(hhincome ~ F(age) + sex, data=.))
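
For comparison, here's a sketch of the same idea with do: each group is read into memory as a data frame, so open source functions such as lm can be used, at the cost of requiring each group to fit in memory.

# with do, each group arrives as an in-memory data frame
censMods2 <- cens %>%
    group_by(region) %>%
    do(model=lm(hhincome ~ factor(age) + sex, data=.))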

Non-xdf and Non-Local Data Sources

Despite the name, dplyrXdf supports all file data sources defined by Microsoft R Server, not just xdf files. This includes delimited text (RxTextData), SAS datasets (RxSasData) and SPSS datasets (RxSpssData). If you pass one of these data sources to a dplyrXdf pipeline, it will import the data to an xdf file and then execute the rest of the pipeline.
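
For example (a sketch, with a hypothetical file name), a delimited text file can be used directly at the start of a pipeline:

# dplyrXdf imports the text file to xdf, then runs the rest of the pipeline
censTxt <- RxTextData("Census5PCT2000.csv")  # hypothetical file
txtCounts <- censTxt %>%
    group_by(region) %>%
    summarise(n=n())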

For the moment, dplyrXdf only supports files stored in the local filesystem. Support for datasets stored in HDFS may appear in a future version. For data stored in a SQL database, consider using the dplyr backend for that database, if available.

Obtaining dplyrXdf

You can download the package from GitHub, at https://github.com/RevolutionAnalytics/dplyrXdf. If you have the devtools package installed, you can download and install dplyrXdf from within R using the install_github function.

library(devtools)
install_github("RevolutionAnalytics/dplyrXdf")

Like dplyr, dplyrXdf is a package under active development. If you have any suggestions on features to add (including bits of dplyr that have been left out) or bugs that need fixing, please contact me at hongooi@microsoft.com.

Hong