Building a DataFusion CSV reader with arrow-extendr
Want to skip to the end with the source code? Click here.
Goal
For this tutorial we’re going to create a very simple Rust-based R
package using extendr and
arrow-extendr.
The package will use the new and very powerful
DataFusion crate to create
a csv reader.
“DataFusion is a very fast, extensible query engine for building high-quality data-centric systems in Rust, using the Apache Arrow in-memory format.”
We’ll learn a little bit about how extendr, Rust, and arrow-extendr
works along the way.
Create a new R package
We will use {usethis} to create a new R
package called dfusionrdr (pronounced d-fusion reader).
The following section is the standard process for creating a new Rust based R package. It’s a pretty simple process once you get used to it!
usethis::create_package("dfusionrdr")
This will open a new R project with the scaffolding of an R package.
From here, we need to make the R package into an extendr R package. To
do so we use rextendr::use_extendr().
Tip
use_extendr() creates the directory src/, a Rust crate in src/rust/, as well as a few Makevars files in src/ that are used to define how to compile the Rust library. Rust is a compiled language, unlike R and Python which are interpreted, meaning that instead of being able to run code line by line, we have to run it all at once. Compiled code can be turned into something called a static library. R can call functions and objects from these libraries using the
.Call() function. You do not need to worry about this function. It’s just for context. :)
rextendr::use_extendr()
Before running this, make sure you have a compatible Rust installation
by running rextendr::rust_sitrep(). If you do not, it will tell you what you
need to do. If you’re on Windows, you’re likely missing a target.
Building your package
Once you’ve initialized extendr in your package, we can check to see if
everything worked by running the hello_world() function that is
included. To do so, we can build our package, then document it.
Tip
I use the RStudio shortcut to build my package which is
cmd + shift + b, or if on Windows it’s (probably) ctrl + shift + b. If neither of those work for you, run devtools::build().
To make our Rust functions available in R, we run
rextendr::document().
rextendr::document() will also compile your R package for you if need
be. Personally, I prefer to build it then document it. For some
reason—and it may just be me—I find that compilation from the console
can freeze? The cargo file lock is wonky and I probably mess it up a
bunch.
Run devtools::load_all() to bring the documented functions into scope
and run the function!
devtools::load_all()
hello_world()
#> [1] "Hello world!"
We’ve now run Rust code directly from R. Pretty simple Rust, but Rust nonetheless.
Adding Rust dependencies
Much like how we like to use R packages to make our lives easier, we can use Rust crates (libraries) to do crafty things. To do so, we will open up our Rust crate in our preferred editor. I prefer VS Code.
If you haven’t configured VS Code to use with Rust, there are like a
million different ways to configure it. But at minimum, install the
rust-analyzer, BetterTOML, and CodeLLDB extensions (I think
CodeLLDB comes with the rust-analyzer though?)
Open src/rust/ in VS Code. Then we will add 3 additional dependencies.
These are
- datafusion - a powerful Arrow-based DataFrame library (like Polars but different)
- tokio - which will give us the ability to run code lazily and asynchronously, which is required by datafusion
- arrow_extendr - this is a crate I built that lets us send Arrow data from Rust to R and back
In the terminal run the following
cargo add datafusion
cargo add tokio
cargo add arrow_extendr --git https://github.com/JosiahParry/arrow-extendr
arrow-extendr is not published on crates.io yet so we need to pass the
--git flag to tell cargo where to find the library.
Note
This is my preferred way of adding dependencies. If you open up
Cargo.toml you’ll now see these libraries added under the [dependencies] heading.
Making R work with DataFusion
DataFusion requires one additional C library; to use it, we need to add it to
our Makevars. This is not something you typically have to do, but DataFusion
requires it from us.
Open Makevars and Makevars.win. On the line that starts with
PKG_LIBS, add -llzma to the end.
Again, this is not a common thing you have to do. This is specifically for our use case.
Building our CSV Reader
Open src/lib.rs. This is where your R package is defined. For larger
packages you may want to break it up into multiple smaller files. But
our use case is relatively small (and frankly, not that simple, lol!).
Let’s first start by removing our hello_world example from our code.
Delete the hello world function (lines 3-8) and remove it from the
module declaration under mod dfusionrdr.
Tip
In order to make our Rust functions available to R, we need to include them in our
extendr_module! macro call. Under mod dfusionrdr we can add additional functions. Those included there will be made available to R. If they have the /// @export roxygen tag, then they will be exported in the R package as well.
Let’s create the scaffolding for our first function read_csv_dfusion()
/// @export
#[extendr]
fn read_csv_dfusion(csv_path: &str) {
    rprintln!("We will read the csv at: {csv_path}");
}
- The #[extendr] macro indicates that this function will be made available to R.
- We add /// @export to indicate that our function will be exported to R. We can add roxygen2 documentation to our functions by prefixing with ///, which is a documentation comment, whereas // is a normal comment.
This function prints a message indicating we will read a CSV at the path
provided. It takes one argument csv_path which is an &str. A &str
in Rust is like a scalar character in R, e.g. "my-file.csv".
Next we need to make sure the function is available to R in the module.
extendr_module! {
    mod dfusionrdr;
    fn read_csv_dfusion;
}
From RStudio, let’s build, document, and load again.
devtools::build()    # 1.
rextendr::document() # 2.
devtools::load_all() # 3.
1. Only run if you haven’t built with cmd + shift + b.
2. This brings functions into the NAMESPACE and updates arguments and outputs.
3. Loads everything from your package into memory.
Import dependencies
In order to use DataFusion to read our csv, we need to import it. A
lot of Rust libraries have something called a prelude. The prelude
is a special module that contains common structs, traits, enums, etc
that are very useful for the crate. Notice that the top of your lib.rs
includes use extendr_api::prelude::*; this brings all of the Rust
based R objects into scope such as Robj, Doubles, Integers etc.
DataFusion also has a useful prelude that we want to bring into scope.
We will add use datafusion::prelude::*; to the top of our file (much
like adding library()). This brings important objects into scope for
us. We will also need tokio::runtime::Runtime as well.
The first 3 lines of your lib.rs should look like this:
use extendr_api::prelude::*;
use datafusion::prelude::*;
use tokio::runtime::Runtime;
Context and Runtime
DataFusion requires something called a
SessionContext.
The session context
“maintains the state of the connection between a user and an instance of the DataFusion engine.”
We need to instantiate this struct inside of our function.
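Inside our function, that can look something like this:

#[extendr]
fn read_csv_dfusion(csv_path: &str) {
    // create a new DataFusion session context
    let ctx = SessionContext::new();
}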
We now have a ctx object which we can use to read our csv. It has a
method called read_csv(). It requires the path of a csv to read as
well as a struct called CsvReadOptions which determines how it will be
read into memory. We will pass csv_path to the first argument and
create a default options struct with the new() method.
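Something along these lines:

#[extendr]
fn read_csv_dfusion(csv_path: &str) {
    let ctx = SessionContext::new();
    // pass the path and a default CsvReadOptions struct
    let csv = ctx.read_csv(csv_path, CsvReadOptions::new());
}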
This will compile with a bunch of warnings about unused variables. But,
more importantly, the csv variable we created is special. If you have
your Rust analyzer configured you should see that it is of type
impl Future<Output = Result <..., ...>>. That right there is
problematic!
When you see impl Future<...> that tells us it is an asynchronous
result that needs to be polled and executed. async functions are lazy.
They don’t do anything until you ask them to. The way to do this is by
calling .await on the future. We can then unwrap() the result and
store it in another variable.
Warning
It’s typically a pretty bad idea to use
.unwrap() since the program will “panic!” if it does not get a result that it expected. But it’s a pretty handy way to get working code without error handling. I typically handle errors after I’ve gotten the bulk of what I want working.
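For example, naively awaiting and unwrapping would look something like this:

#[extendr]
fn read_csv_dfusion(csv_path: &str) {
    let ctx = SessionContext::new();
    // `.await` is the problem here: read_csv_dfusion() is not an async fn
    let csv = ctx.read_csv(csv_path, CsvReadOptions::new()).await.unwrap();
}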
If we run cargo check in our terminal we will get the message:
error[E0728]: `await` is only allowed inside `async` functions and blocks
One way to get this to work would be to add async fn instead of fn
but that isn’t supported by extendr since R is single threaded and
doesn’t support async. So how do we get around this?
async with extendr and tokio
In order to run async functions we need to execute them in a runtime.
tokio provides this for us with the Runtime
struct. It lets us run impl Future<...> in a non async function!
We’ll modify our function definition to create a tokio Runtime.
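Something like this:

#[extendr]
fn read_csv_dfusion(csv_path: &str) {
    // a tokio runtime that we can execute futures on
    let rt = Runtime::new().unwrap();
    let ctx = SessionContext::new();
}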
With the Runtime object rt we can call the block_on() method which
takes a Future and runs it until it has completed. This means that we
don’t get to use async functionality—e.g. executing 2 or more things at
the same time—but we still get to take the result!
Let’s read the csv into an object called df using the block_on()
method.
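Putting those pieces together:

#[extendr]
fn read_csv_dfusion(csv_path: &str) {
    let rt = Runtime::new().unwrap();
    let ctx = SessionContext::new();
    // run the async read_csv() future to completion on the runtime
    let df = rt.block_on(ctx.read_csv(csv_path, CsvReadOptions::new())).unwrap();
}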
The analyzer shows that this is a DataFrame. Awesome! Now, how can we
get this into memory?
Sending DataFrames to R with arrow-extendr
This is where arrow-extendr comes into play. arrow-extendr provides a
couple of traits which allow us to convert a number of arrow-rs types
into an Robj.
See my post on Rust traits for R users
Tip
An Robj is extendr’s catch-all for any type of object that can be returned to R.
The IntoArrowRobj trait can convert a Vec<RecordBatch> into an
Robj. The R
documentation
for a RecordBatch says
“A record batch is a collection of equal-length arrays matching a particular Schema. It is a table-like data structure that is semantically a sequence of fields, each a contiguous Arrow Array.”
Based on that, a Vec<RecordBatch> is a collection of chunks of a
table-like data structure.
DataFrames have a method .collect() which creates a
Vec<RecordBatch>.
Let’s modify our function to turn the DataFrame into a
Vec<RecordBatch>.
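One way to write that:

#[extendr]
fn read_csv_dfusion(csv_path: &str) {
    let rt = Runtime::new().unwrap();
    let ctx = SessionContext::new();
    let df = rt.block_on(ctx.read_csv(csv_path, CsvReadOptions::new())).unwrap();
    // collect() is async too and gives us a Vec<RecordBatch>
    let res = rt.block_on(df.collect()).unwrap();
}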
Note
All things with DataFusion are done async so we need to wrap them in
rt.block_on().
With this, we can send the results to R with the into_arrow_robj()
method! First we need to add use arrow_extendr::to::IntoArrowRobj; to
the top of our script to bring the trait into scope.
Then in our function we need to specify the return type as Robj (see
the first line of the definition -> Robj) and then turn res into an
Robj
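The finished function might look roughly like this (I’m assuming into_arrow_robj() returns a Result here, hence the final unwrap()):

/// @export
#[extendr]
fn read_csv_dfusion(csv_path: &str) -> Robj {
    let rt = Runtime::new().unwrap();
    let ctx = SessionContext::new();
    let df = rt.block_on(ctx.read_csv(csv_path, CsvReadOptions::new())).unwrap();
    let res = rt.block_on(df.collect()).unwrap();
    // convert the Vec<RecordBatch> into an Robj to hand back to R
    res.into_arrow_robj().unwrap()
}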
Handling arrow-rs from R
Let’s rebuild and document our function again.
I’ve added a csv of
{palmerpenguins} to
the inst/ folder of our package for testing. Let’s try reading this in.
res <- read_csv_dfusion("inst/penguins.csv")
res
#> <nanoarrow_array_stream struct<rowid: int64, species: string, island: string, bill_length_mm: string, bill_depth_mm: string, flipper_length_mm: string, body_mass_g: string, sex: string, year: int64>>
#> $ get_schema:function ()
#> $ get_next :function (schema = x$get_schema(), validate = TRUE)
#> $ release :function ()
Now, this doesn’t look very familiar to most R users. This is an object
from the
{nanoarrow} R
package called "nanoarrow_array_stream". This is how data is received
from Rust in R. We can process batches from this “stream” using the
method get_next(). But there’s a handy as.data.frame() method for
it.
Tip
This is a good time to note that you should add
nanoarrow as a dependency of your package explicitly with usethis::use_package("nanoarrow").
res <- read_csv_dfusion("inst/penguins.csv")

res |>
  as.data.frame() |>
  head()
#> rowid species island bill_length_mm bill_depth_mm flipper_length_mm
#> 1 1 Adelie Torgersen 39.1 18.7 181
#> 2 2 Adelie Torgersen 39.5 17.4 186
#> 3 3 Adelie Torgersen 40.3 18 195
#> 4 4 Adelie Torgersen NA NA NA
#> 5 5 Adelie Torgersen 36.7 19.3 193
#> 6 6 Adelie Torgersen 39.3 20.6 190
#> body_mass_g sex year
#> 1 3750 male 2007
#> 2 3800 female 2007
#> 3 3250 female 2007
#> 4 NA NA 2007
#> 5 3450 female 2007
#> 6 3650 male 2007
Boom! We’ve written ourselves a reader! Let’s do a simple benchmark
comparing it to readr.
bench::mark(
  datafusion = read_csv_dfusion("inst/penguins.csv"),
  readr = readr::read_csv("inst/penguins.csv"),
  check = FALSE
)
#> # A tibble: 2 × 6
#> expression min median `itr/sec` mem_alloc `gc/sec`
#> <bch:expr> <bch:tm> <bch:tm> <dbl> <bch:byt> <dbl>
#> 1 datafusion 922.3µs 1.04ms 915. 1.07MB 0
#> 2 readr 14.6ms 15.31ms 65.6 13.2MB 80.8
Insanely fast!
Addendum
The source code for the entire package is below. It also includes a
function read_sql_csv_dfusion() which takes a SQL statement and reads
it into memory if you want to explore that. For example:
query <- 'SELECT count(*) as "n", "species" FROM csv GROUP BY "species"'
#> n species
#> 1 68 Chinstrap
#> 2 152 Adelie
#> 3 124 Gentoo
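As a sketch, such a function could use DataFusion’s register_csv() and sql() methods, something along these lines (an illustrative version, not necessarily the exact implementation):

/// @export
#[extendr]
fn read_sql_csv_dfusion(csv_path: &str, sql: &str) -> Robj {
    let rt = Runtime::new().unwrap();
    let ctx = SessionContext::new();
    // register the csv as a table named "csv" so the query can reference it
    rt.block_on(ctx.register_csv("csv", csv_path, CsvReadOptions::new())).unwrap();
    // run the SQL query and collect the results into record batches
    let df = rt.block_on(ctx.sql(sql)).unwrap();
    let res = rt.block_on(df.collect()).unwrap();
    res.into_arrow_robj().unwrap()
}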
Source code:
lib.rs
use arrow_extendr::to::IntoArrowRobj;
use datafusion::prelude::*;
use extendr_api::prelude::*;
use tokio::runtime::Runtime;
use Result;
/// @export
// Macro to generate exports.
// This ensures exported functions are registered with R.
// See corresponding C code in `entrypoint.c`.
extendr_module! {
    mod dfusionrdr;
    fn read_csv_dfusion;
    fn read_sql_csv_dfusion;
}
Cargo.toml
[package]
name = 'dfusionrdr'
publish = false
version = '0.1.0'
edition = '2021'

[lib]
crate-type = [ 'staticlib' ]
name = 'dfusionrdr'

[dependencies]
arrow_extendr = { git = "https://github.com/JosiahParry/arrow-extendr" }
datafusion = "33.0.0"
extendr-api = '*'
tokio = "1.34.0"