The new core we're building for InfluxDB (named InfluxDB IOx) uses Datafusion for query execution. We have multiple team members contributing to this release and we're super excited to be involved with it.
I think it's a really exciting time for new OLAP systems because of Arrow, Rust, and the rise of object store + ephemeral compute for analytical and time series data.
I've been following Arrow and Datafusion dev for a little bit, mostly because the architecture and goals look interesting.
What I'm curious about is one of the possible use cases mentioned in the README: ETL processes. I have yet to come across any projects building ETL/ELT/pipeline tools that leverage Datafusion. I might not be looking in the right places.
Would anyone have insight into whether this is simply unexplored territory, or just not as good of a fit as other use cases?
I have done a lot of work in the ETL space in Apache Spark to build Arc (https://arc.tripl.ai/) and have ported a lot of the basic functionality of Arc to Datafusion as a proof-of-concept. The appeal to me of the Apache Spark and Datafusion engines is the ability to a) separate compute and storage and b) express transformation logic in SQL (a rough sketch of what that looks like is included below).
Performance: From those early experiments Datafusion would frequently finish processing an entire job _before_ the SparkContext could be started - even on a local Spark instance. Obviously this is at smaller data sizes but in my experience a lot of ETL is about repeatable processes not necessarily huge datasets.
Compatibility: Those experiments were done a few months ago and the SQL compatibility of the Datafusion engine has improved extremely rapidly (WINDOW functions were recently added). There is still some missing SQL functionality (not yet enough to run all the TPC-H queries, for example: https://github.com/apache/arrow-datafusion/tree/master/bench...) but it is moving quickly.
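For a flavour of what that looks like, here is a minimal sketch against the DataFusion Rust API. The file name and column names are made up, and the exact signatures have shifted between releases (recent versions use SessionContext; older ones used ExecutionContext), so treat it as illustrative rather than definitive:

    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        // In-process engine: no cluster or SparkContext-style startup cost.
        let ctx = SessionContext::new();

        // Register a source file ("events.parquet" is a placeholder path).
        ctx.register_parquet("events", "events.parquet", ParquetReadOptions::default())
            .await?;

        // Express the transformation logic in SQL, like an Arc stage.
        let df = ctx
            .sql("SELECT user_id, COUNT(*) AS n FROM events GROUP BY user_id")
            .await?;

        // Collect or write out the results.
        df.show().await?;
        Ok(())
    }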
I spent some time evaluating Arc for my team's ETL purposes and I was really impressed. I hesitated somewhat to move forward with it because it seemed really tied into the Spark ecosystem (for great reasons). We just weren't at all familiar with deploying and operating Spark, so ended up rolling our own scripts on top of (an existing) Airflow cluster for now.
Besides performance reasons, are there any other advantages to porting Arc to run on top of datafusion? If the porting effort was shared somewhere I'd love to dig in and see what the proof-of-concept looks like.
Hi eduren. Give me a few days and I'll see what I can publish as a WIP repo. The aim of Arc was always to allow swapping the execution engine whilst retaining the logic - hence SQL - so this should hopefully be easy.
Rust stuff tends to be a bit more resource efficient than Java.
Currently using DataFusion from Rust, and being more resource efficient means we can use smaller machines, which means our costs go down. Deploying services is also faster (smaller docker images, faster startup times) and puts less extraneous load on our machines.
I imagine Arc, and thus downstream users, would see similar benefits.
ETL pipelines are a perfect fit for Datafusion and its distributed counterpart, Ballista. Personally, this is the main reason I am investing my time in Datafusion.
"DataFusion is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format.DataFusion supports both an SQL and a DataFrame API for building logical query plans as well as a query optimizer and execution engine capable of parallel execution against partitioned data sources (CSV and Parquet) using threads.
DataFusion also supports distributed query execution via the Ballista crate."
"Use Cases:
DataFusion is used to create modern, fast and efficient data pipelines, ETL processes, and database systems, which need the performance of Rust and Apache Arrow and want to provide their users the convenience of an SQL interface or a DataFrame API."
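To show the two front ends side by side, here is a minimal sketch; the users.csv file and its name/age columns are invented, and the API names follow recent DataFusion releases:

    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        let ctx = SessionContext::new();

        // SQL against a registered table...
        ctx.register_csv("users", "users.csv", CsvReadOptions::new()).await?;
        let via_sql = ctx.sql("SELECT name FROM users WHERE age > 30").await?;

        // ...or the same query built programmatically with the DataFrame API.
        let via_df = ctx
            .read_csv("users.csv", CsvReadOptions::new())
            .await?
            .filter(col("age").gt(lit(30)))?
            .select(vec![col("name")])?;

        via_sql.show().await?;
        via_df.show().await?;
        Ok(())
    }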
There is a PR from me (Daniël, committer) for db-benchmark. For the group by benchmarks, on my machine, it is currently somewhat slower than the fastest (Polars).
Also we do support running TPC-H benchmarks. For the queries we can run, those are already finishing faster than Spark. We are planning to do more benchmarking and optimizations in the future.
Yes, that's pretty exciting!
There is even support in Polars to execute the dataframe compute in DataFusion (as it can handle larger than memory datasets).
There is experimental support for distributed query execution with spill-to-disk between stages to support larger than memory datasets. This is implemented in the Ballista crate, which extends DataFusion.
I didn't dive into Vaex's implementation, but based on the example code, I would say they are similar in the sense that they all provide a Dataframe interface for end users to perform compute on relational data.
It looks like Vaex focuses more on end users like data scientists, while Datafusion focuses more on being a composable embedded library for building analytical engines. For example, InfluxDB IOx, Ballista and ROAPI all use Datafusion as the compute engine.
On top of that, Datafusion also comes with a built-in SQL planner, so users can choose between DataFrame and SQL interfaces.
The Apache Spark project is many many years ahead of DataFusion & Ballista with more than a decade of work from more than 1,700 contributors and is going strong.
I don't see DataFusion as a competitor to Spark since it is specifically designed as an embedded library and is optimized for in-memory processing with low overhead.
Ballista is highly influenced by Spark and is capable of running some of the same queries that Spark can support. There is enough functionality to be able to run a subset of the TPC-H benchmarks, for example, with reasonable performance at scale. So for users wanting to run those kinds of SQL queries, maybe Ballista isn't so far off, but Spark has much more functionality than this, and it could potentially take years of effort from a community to catch up with Spark. It will be interesting to see what happens for sure.
I might be wrong on this, but I don't believe this is a replacement for Spark. Rather this is similar to the Spark SQL execution engine.
I don't believe there is any focus on providing a distributed execution environment, rather platforms like Spark and Flink could integrate DataFusion as an implementation and expose the API for Apache Arrow operations.
Datafusion, and Ballista by extension, also provides a DataFrame API that lets you construct queries programmatically. It also has preliminary support for UDFs.
We also have community members implementing Spark native executors using Datafusion, which showed significant speed improvements in the initial PoC.
Hi. Is it possible to use Datafusion remotely, like a query service? Perhaps using Arrow Flight? I would like to query data with different clients: Python in Jupyter, straight to the browser, and perhaps even something like Nu shell. This way each tool won't need to open its own copy of the Arrow/Parquet data.
Yes. The Ballista crate (part of the arrow-datafusion repo) provides distributed query execution and the scheduler has a gRPC service. Flight is used internally as well but not directly exposed to users. There is also work in progress to add Python bindings for Ballista (they already exist for DataFusion).
Thank you. I went through its GitHub repo for docs. It seems I need to dig a bit deeper perhaps. How to get started with my Parquet files isn’t immediately obvious.
I assume Python bindings would talk through gRPC. I could use gRPC directly perhaps?
Thanks. I'm experimenting with Rust currently, so this might fit the bill. I am curious, though: why does the client need to use async Rust? I haven't gotten that far in my learning. I would have guessed that a synchronous approach would work as well.
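For what it's worth, the API is async because the engine runs on Tokio, but purely synchronous code can still drive it by blocking on a runtime. A minimal sketch (API names as in recent DataFusion releases; the file name is made up):

    use datafusion::prelude::*;

    fn main() -> datafusion::error::Result<()> {
        // Create a Tokio runtime once and block on the async calls,
        // so the rest of the program stays synchronous.
        let rt = tokio::runtime::Runtime::new().expect("failed to create runtime");
        rt.block_on(async {
            let ctx = SessionContext::new();
            ctx.register_parquet("t", "data.parquet", ParquetReadOptions::default())
                .await?;
            let df = ctx.sql("SELECT * FROM t LIMIT 10").await?;
            df.show().await?;
            Ok(())
        })
    }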
If the vision is to pseudo-copy the best bits of Postgres, I'd be very interested in seeing features that tackle PostGIS-type spatial problems. Native spatial work that actually scales to handle global-level data on a single node still feels like a pipe dream a lot of the time. Adding things like Dask or xarray feels like a hack on an imperfect base layer just to get a base system to be barely operational.
Because if you have many TBs of data it's cheaper to run something like Spark across a bunch of smaller machines than it is to try and set up a many-TB PostgreSQL instance.
A trick that many data warehousing tools use these days is to farm out computing to where the data is stored.
You might have a PB of data spread across 100 different instances. When a SQL query comes in you break that up into a query plan that can be run in parallel against the subset of data on each of those instances, then aggregate together the results.
It's cheaper to send the computation out to run next to the data than it is to copy the data back to the nodes that are executing the computation.
It's all variants of the classic map/reduce technique.
As a result, a data warehouse may be able to run a brute-force SQL LIKE query against everything it is storing in a reasonable amount of time, since it gets to run in parallel.
The trade-off is that you don't have consistency - ACID etc - or real-time results against data changes - generally your data warehouse will be repopulated on a schedule, but it won't be great at answering questions about changes that just happened a few seconds ago.
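A toy sketch of that scatter/gather pattern, with threads standing in for the nodes that hold the data (purely illustrative, no real engine involved):

    use std::thread;

    fn main() {
        // Each inner Vec stands in for the data living on one node.
        let partitions: Vec<Vec<&str>> = vec![
            vec!["error: disk full", "ok", "error: timeout"],
            vec!["ok", "ok", "error: oom"],
            vec!["error: disk full", "ok"],
        ];

        // "Map" phase: push the predicate out to where each partition lives.
        let handles: Vec<_> = partitions
            .into_iter()
            .map(|part| {
                thread::spawn(move || {
                    part.iter().filter(|line| line.contains("error")).count()
                })
            })
            .collect();

        // "Reduce" phase: pull back only the small partial results and combine them.
        let total: usize = handles.into_iter().map(|h| h.join().unwrap()).sum();
        println!("matching rows: {total}");
    }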
> Why not just use traditional DBMS like PostgreSQL
Serialization. It's usually a non-trivial percentage of time spent in a lot of distributed systems, and for some workloads it can be the bulk of time spent.
If I want to grab 3GB of data from a remote host and process it locally, we have to agree on how that data is going to be transferred so I can use it. Could very well be SQL, so we have some sort of network-based tabular data stream. Maybe it's Parquet files, so we're using NFS/S3 to copy the files to local disks before reading them into a completely separate in-memory data structure. At the end of this workload, I have 1GB of data that I now want to write back. Maybe the data is stored in memory as an array of mixed-type structs, but I can't just send those bytes as-is to a SQL server or mmap them to the filesystem and expect Parquet to know what they mean.
Apache Arrow and DataFusion aim to eliminate all that work of rewriting bytes between hosts. Imagine being able to create a cost-based optimized query plan on Host A, send it to Hosts M-P for processing, and even have that query plan trickle down to Parquet predicates when reading files from disk, before returning data to Host A, which can simply be copied from the network into local memory so you can start working with it right away.
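A minimal sketch of that idea with the Rust arrow crate: the batch is written to the Arrow IPC stream format and read back without being re-encoded into a different representation (the column names and values are made up):

    use std::io::Cursor;
    use std::sync::Arc;

    use arrow::array::{Int64Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::error::ArrowError;
    use arrow::ipc::reader::StreamReader;
    use arrow::ipc::writer::StreamWriter;
    use arrow::record_batch::RecordBatch;

    fn main() -> Result<(), ArrowError> {
        // A batch as it sits in memory on host A.
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
            ],
        )?;

        // "Send" it: the IPC stream carries the same columnar layout,
        // so there is no translation into a separate wire format.
        let mut buf = Vec::new();
        {
            let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
            writer.write(&batch)?;
            writer.finish()?;
        }

        // "Receive" it on host B and use it directly as Arrow arrays.
        let reader = StreamReader::try_new(Cursor::new(buf), None)?;
        for maybe_batch in reader {
            let received = maybe_batch?;
            println!("rows received: {}", received.num_rows());
        }
        Ok(())
    }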
Data formats like Parquet/Arrow, and engines like DataFusion, are optimised for high-speed reading, writing, and processing of large amounts of data, which is generally what you're going to be using them for.
Additionally, as others have mentioned, clustered processing for larger-than-machine/ram datasets is a bit easier to manage compared to setting up a database cluster.
Another benefit is ephemeral compute: we have a Kubernetes cluster, and a particular message on a Kafka topic can kickstart a Spark job across several machines in the cluster (possibly triggering autoscaling) which processes the X TBs of data it needs to, writes the results out and then finishes. Faster, cheaper and better suited than keeping a multi-node DB cluster going.
It also lets us run non-SQL stages with fewer bottlenecks: bulk ML scoring, bulk data enrichment, etc.
What's the relationship between Datafusion and Ballista? They seem to have been merged into a single repo. Do they share a release schedule? Are they a single product or still separate?
Ballista started out as a separate project and was donated in April 2021. They currently share a release schedule (but have different versioning) and this was the first release of DataFusion to include the Ballista crate.
My hope is that Ballista and DataFusion become more integrated over time but remain separate, with DataFusion being an embedded / single-process query engine and Ballista providing distributed execution.
How would you compare the goals, vision, and current status of DataFusion with DuckDB? (www.duckdb.org)
Could DuckDB be an execution engine for Ballista?
DuckDB can work on Arrow data, so I think it could coexist with DataFusion / Ballista quite well.
I am not sure whether having different execution engines for Ballista is on the roadmap, but it's certainly a possibility!