Using Tokio for ETL Work

I have a Python application that lists a Google Cloud Storage bucket, downloads each file, reads it line by line, attempts to parse a JSON dictionary from each line, transforms that dictionary, and finally sends it to BigQuery.

I'm using Python threads for this. The main program thread iterates over all filenames in the bucket, submitting each file's name to a queue. On the other side of the queue, a thread pool picks up the filename and begins streaming the file back, line by line. Each line is deserialized into a JSON dictionary where possible and then submitted to a secondary queue. On the other side of this queue is the final stage of the pipeline, which transforms the JSON and uploads it into BigQuery.

So in this example, there are three types of threads:

  1. the main thread, submitting filenames within a bucket to queue A
  2. workers picking up filenames from queue A, reading these files line by line, attempting to deserialize each line as a JSON dictionary, and submitting the JSON dictionaries to queue B.
  3. workers picking up JSON dictionaries from queue B, transforming the payload, and then sending to BigQuery.
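For reference, the three stages above can be sketched with nothing but the standard library. This is only a rough sketch: `read_lines` and `parse_line` are hypothetical stand-ins for the GCS download and the JSON parsing, the "upload" stage just counts records, and the queue capacity is arbitrary.

```rust
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for downloading a file and splitting it into lines.
fn read_lines(file_name: &str) -> Vec<String> {
    (0..3).map(|i| format!("{}:{}", file_name, i)).collect()
}

/// Hypothetical stand-in for JSON parsing: returns None for lines that do not parse.
fn parse_line(line: &str) -> Option<(String, u32)> {
    let (name, n) = line.split_once(':')?;
    Some((name.to_string(), n.parse().ok()?))
}

/// Runs the three stages and returns how many records reached the final stage.
fn run_pipeline(file_names: Vec<String>, readers: usize, capacity: usize) -> usize {
    let (name_tx, name_rx) = sync_channel::<String>(capacity); // queue A
    let (rec_tx, rec_rx) = sync_channel::<(String, u32)>(capacity); // queue B
    // std's Receiver is single-consumer, so the reader workers share it behind a mutex.
    let name_rx = Arc::new(Mutex::new(name_rx));

    // Stage 2: workers take filenames from queue A and push parsed records to queue B.
    let mut workers = Vec::new();
    for _ in 0..readers {
        let name_rx = Arc::clone(&name_rx);
        let rec_tx = rec_tx.clone();
        workers.push(thread::spawn(move || loop {
            // The lock guard is dropped at the end of this statement.
            let next = name_rx.lock().unwrap().recv();
            let name = match next {
                Ok(name) => name,
                Err(_) => break, // queue A is closed and drained
            };
            for line in read_lines(&name) {
                if let Some(record) = parse_line(&line) {
                    if rec_tx.send(record).is_err() {
                        return; // stage 3 has gone away
                    }
                }
            }
        }));
    }
    // Only the workers' clones remain, so queue B closes when they all finish.
    drop(rec_tx);

    // Stage 3: a single consumer standing in for "transform and upload".
    let sink = thread::spawn(move || rec_rx.iter().count());

    // Stage 1: the main thread submits filenames, then closes queue A.
    for name in file_names {
        name_tx.send(name).unwrap();
    }
    drop(name_tx);

    for worker in workers {
        worker.join().unwrap();
    }
    sink.join().unwrap()
}
```

Calling `run_pipeline(names, 4, 16)` would run four reader workers with queues of at most 16 entries each. Dropping the senders is what shuts each stage down in order.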

Obviously, this is not ideal for a number of reasons:

  1. It's Python, so there are countless levels of optimization that can be easily had simply by switching to Rust.
  2. It's CPython, so threads can't run Python code in parallel: they are real OS threads, but the global interpreter lock (GIL) lets only one of them execute Python bytecode at a time, and it is mostly released while a thread waits on I/O. This means that the CPU load of JSON deserialization and transformation only happens on one CPU at any time, maximum.
  3. Queues are unbounded so memory is unbounded. This can be easily remedied.
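On the third point, here is a minimal sketch of what a bounded queue buys you, using `std::sync::mpsc::sync_channel` (the function name `accepted_before_full` is made up for illustration). With no consumer running, the queue accepts only `capacity` items and then reports that it is full. A blocking `send` would wait at that point instead, which is what caps memory use.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Tries to enqueue `items` values into a queue of `capacity` while nothing is
/// consuming, and returns how many were accepted before the queue was full.
fn accepted_before_full(capacity: usize, items: usize) -> usize {
    // `_rx` is kept alive so the channel stays connected; it is never read.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    for i in 0..items {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // A blocking `send` would wait here until a consumer made room.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

The Python equivalent is passing `maxsize` to `queue.Queue`, so this particular problem does not need a rewrite to fix.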

This is going to be a long-running job, so performance is very important. The Python version's CPU-bound throughput is capped at roughly one core's worth of work, no matter how many cores the machine has, whereas a multi-threaded solution can scale with the number of CPU cores in addition to frequency and idleness.
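To make the scaling point concrete, here is a small sketch of spreading a CPU-bound transformation across several threads with `std::thread::scope`. The `transform` function is a hypothetical placeholder for the real JSON transformation, and the even chunking is just one simple way to split the work.

```rust
use std::thread;

/// Hypothetical CPU-bound transformation of one record.
fn transform(record: &str) -> usize {
    record.bytes().filter(|b| b.is_ascii_alphanumeric()).count()
}

/// Transforms all records using up to `threads` threads, preserving input order.
fn transform_parallel(records: &[String], threads: usize) -> Vec<usize> {
    let threads = threads.max(1);
    // Ceiling division, and never a chunk length of zero (chunks(0) panics).
    let chunk_len = ((records.len() + threads - 1) / threads).max(1);
    thread::scope(|s| {
        // Spawn one scoped thread per chunk; collecting starts them all first.
        let handles: Vec<_> = records
            .chunks(chunk_len)
            .map(|chunk| s.spawn(move || chunk.iter().map(|r| transform(r)).collect::<Vec<_>>()))
            .collect();
        // Join in spawn order so the output order matches the input order.
        handles.into_iter().flat_map(|h| h.join().unwrap()).collect()
    })
}
```

A reasonable value for `threads` would come from `std::thread::available_parallelism()`. Whether the speedup is close to linear depends on how much of the job is really CPU-bound rather than waiting on I/O.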

I've been thinking lately about how to port this into Rust efficiently. I could use crossbeam's multi-producer, multi-consumer channels between the thread pools. I could also throw in rayon perhaps, but it's probably not the most efficient thing for what I'm doing. Obviously parking_lot would be used for any Mutexes or RwLocks, since its locks handle the uncontended case with atomic operations in user space and only park the thread when there is contention.

I then began thinking of Tokio. I have a lot of I/O bound operations occurring, such as paging through all of the filenames in the bucket, buffered reads against the files over TLS, and HTTP uploads into BigQuery. The CPU-bound tasks are largely around JSON serialization/deserialization and transformation. Using multiple thread-pools or even work-stealing threads such as in crossbeam-utils and rayon would be inefficient and would have a lot of OS overhead. This is why I'm thinking of Tokio: it would likely give me a work-stealing thread-pool and would be able to do a lot of concurrent I/O, yielding control back to the pool whenever I/O blocks.

I've used all of the other primitives listed above, but I haven't directly used Tokio yet. Since Tokio seems to be entirely built around running a server, I'm not sure how to set it up for an ETL-like job like this.

First, is this a good idea? Is my thinking on this matter sound? Second, is there documentation on using Tokio in such a fashion?

As an aside, I'd like to keep thanking the amazing Rust community for all of these incredible tools. Rust's concurrency features can't be beat, and the fact that I can think of like 5 different ways to solve this problem is awesome. Thank you to the Rust team and to the community for the language and for all of these incredible libraries!

Tokio will probably work well for your socket I/O. I’m less sure it’ll be all that useful for filesystem I/O, despite there being tokio-fs; you should experiment with it.

But you can do a hybrid: do file I/O on a threadpool and feed a tokio loop over a channel, and let tokio handle the sockets (BQ uploads).

I wouldn’t worry too much about CPU overhead related to syscalls or context switching as it sounds like you’ll be heavily I/O bound.

Actually, I will hopefully not be working with any filesystem I/O. I will be doing buffered reads on a TLS connection.

How can I set up Tokio for doing this? Is there a guide on using it outside of a server?

Ah, my fault - I thought you were doing file I/O.

A client is really no different from a server. At its simplest, you could kick off a TCP conversation like this:

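// Written against the tokio 0.1 API (tokio::run and futures 0.1 combinators).
use tokio::net::TcpStream;
use tokio::prelude::*;

fn main() {
    // "a.b.c.d:port" is a placeholder; substitute a real socket address,
    // otherwise the unwrap() on parse() will panic.
    let conn = TcpStream::connect(&"a.b.c.d:port".parse().unwrap());
    let conn = conn
        .and_then(|conn| {
            // Runs once the connection has been established.
            println!("Connected to: {}", conn.peer_addr().unwrap());
            Ok(())
        })
        // tokio::run needs a future whose error type is (), so report the error here.
        .map_err(|e| println!("Error: {}", e));

    // Start the runtime and block until the future and any spawned tasks finish.
    tokio::run(conn);
}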

But what you'll end up writing will depend on how you want to design the nuts and bolts. At a high level, you'll tokio::run() some bootstrap future to kick things off, and then you'll tokio::spawn() individual tasks as needed; you can also use the different runtime/executor models in tokio if you want more control.

The bootstrap future might simply be the receiver side of a oneshot::channel, where some other code signals it when the whole thing is complete. Or it might be the initial request to list all files in a bucket. Then, for each file in the bucket you'd spawn() a task that handles its processing.
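If it helps to see the shape of that without any tokio types, here is a rough std-only analogue. It is not tokio's API: `thread::spawn` stands in for `tokio::spawn`, a plain channel stands in for the completion signal, and `process_file` is a hypothetical placeholder for the per-file work.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Hypothetical per-file work; returns a count of "lines processed".
fn process_file(name: &str) -> usize {
    name.len()
}

/// Spawns one unit of work per file and returns once all of them have reported.
fn run_all(file_names: Vec<String>) -> usize {
    let (done_tx, done_rx) = channel::<usize>();
    for name in file_names {
        let done_tx = done_tx.clone();
        // Stand-in for tokio::spawn: one independent unit of work per file.
        thread::spawn(move || {
            let _ = done_tx.send(process_file(&name));
        });
    }
    // Drop the original sender so the receiver finishes once every task is done.
    drop(done_tx);
    // The "bootstrap" side: wait here until all spawned work has completed.
    done_rx.iter().sum()
}
```

The difference in practice is cost: an OS thread per file gets expensive quickly, whereas spawned tokio tasks are multiplexed onto a small pool of threads.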

This is very close in spirit to, e.g., a tcp server - it bootstraps the runtime by establishing a listening socket. For each accepted client connection, it'll spawn a task that serves that particular tcp peer. In your case, you don't have any client connections to wait for, but instead you'll initiate the actions.

I think if you understand the tokio server model, you'll understand the client model just as well.

If I had to compose these entirely without using Tokio types, how would I do so? Like, can I submit different types of jobs to Tokio as in std::thread::spawn?