I am using rusoto_s3, flate2, and serde to stream a .json.gz file from S3 and deserialize it. I've got it all working by calling
let reader = s3_client
.get_object(req)
.await?
.body
.unwrap()
.into_blocking_read();
and then calling runtime.block_on(reader) to get something that acts as a synchronous stream, where runtime is a tokio runtime I allocated. Then I can wrap it all in a GzDecoder and a BufReader, and this all works in the sync world.
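Concretely, the sync pipeline looks roughly like this, with MyRecord standing in for the real record type and assuming the file is a stream of JSON values:

use std::io::BufReader;
use flate2::read::GzDecoder;
use serde::Deserialize;

#[derive(Deserialize)]
struct MyRecord {
    id: u64, // placeholder field
}

// `reader` is the blocking reader obtained from .into_blocking_read()
fn parse_sync<R: std::io::Read>(reader: R) -> serde_json::Result<()> {
    // Decompress, buffer, then let serde_json pull one value at a time
    let buffered = BufReader::new(GzDecoder::new(reader));
    for record in serde_json::Deserializer::from_reader(buffered).into_iter::<MyRecord>() {
        let record = record?;
        // ... do something with the record ...
        let _ = record.id;
    }
    Ok(())
}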
Now I'm trying to do some stuff with sqlx, which requires going async. But I can't just keep treating the S3 reader as a synchronous Read object any more, because I can't create a new runtime when one is already running, and I can't use flate2::GzDecoder because it doesn't work with async streams. I found async_compression::tokio::bufread::GzipDecoder, which I can feed by wrapping a tokio BufReader around the result of calling .into_async_read():
let async_reader = s3_client
.get_object(req)
.await?
.body
.unwrap()
.into_async_read();
let gzip_decoder = GzipDecoder::new(BufReader::new(async_reader));
But that doesn't really help, because I can't use it with serde, which seems to work only synchronously.
How can I get a synchronous BufReader from rusoto to feed to serde while being able to use sqlx? Is there another crate I should be looking at?
There's no resumable parsing support in serde, so you'll need to collect all of the data before parsing it (technically it does support repeatedly parsing items in a streaming-like way, but you still need the entire item available).
Since GzipDecoder implements AsyncRead, from what I can tell, you should be able to use futures_util::io::AsyncReadExt to collect the entire body asynchronously.
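A rough sketch (your snippet uses async_compression's tokio flavour, so tokio's own AsyncReadExt works directly; with the futures flavour, futures_util::io::AsyncReadExt is the same idea, and MyRecord is just a stand-in):

use async_compression::tokio::bufread::GzipDecoder;
use tokio::io::{AsyncReadExt, BufReader};

// `async_reader` is the value returned by .into_async_read()
async fn read_whole_object<R>(async_reader: R) -> std::io::Result<Vec<u8>>
where
    R: tokio::io::AsyncRead + Unpin,
{
    let mut decoder = GzipDecoder::new(BufReader::new(async_reader));
    let mut decompressed = Vec::new();
    // Pull the whole decompressed body into memory...
    decoder.read_to_end(&mut decompressed).await?;
    Ok(decompressed)
}

// ...and then hand it to serde synchronously, e.g.
// let records: Vec<MyRecord> = serde_json::from_slice(&decompressed)?;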
In general, the async world at the moment tries to keep the base traits like AsyncRead or Stream to the minimum, and add functionality through extension traits, often in higher level utility crates. It's a bit confusing!
The files in question are very large and not suitable for pulling into memory.
I figured out something that works for me:
Get an async reader using .into_async_read()
Use spawn_blocking to start a task where blocking is permitted
Use tokio_util::io::SyncIoBridge to convert the async reader into a synchronous one, then wrap it in GzDecoder and a BufReader and do all the parsing in that task; a sketch is below.
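Roughly, the new pipeline looks like this; MyRecord is again a placeholder, and the real code hands each record to the sqlx side rather than discarding it:

use std::io::BufReader;
use flate2::read::GzDecoder;
use serde::Deserialize;
use tokio_util::io::SyncIoBridge;

#[derive(Deserialize)]
struct MyRecord {
    id: u64, // placeholder field
}

async fn parse_object<R>(async_reader: R) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
    R: tokio::io::AsyncRead + Unpin + Send + 'static,
{
    // Wrap the async reader so it can be used as a blocking std::io::Read,
    // but only ever read from it on the blocking thread below.
    let sync_reader = SyncIoBridge::new(async_reader);
    tokio::task::spawn_blocking(move || -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let buffered = BufReader::new(GzDecoder::new(sync_reader));
        for record in serde_json::Deserializer::from_reader(buffered).into_iter::<MyRecord>() {
            let record = record?;
            // ... hand `record` off to the async/sqlx side here ...
            let _ = record.id;
        }
        Ok(())
    })
    .await??;
    Ok(())
}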
It turns out that flate2::GzDecoder is about twice as fast as async_compression's GzipDecoder.
Any suggestions for alternative implementations would be appreciated.
That should also work, and is much better if your files are large, but be aware that you're starting a thread for each file processed concurrently. If you're processing a bunch of these large files, you might want to limit how many are handled at once.
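For example, a counted semaphore caps how many files are in flight at once; the limit of 4 here is arbitrary and process_one_file stands in for your fetch-and-parse pipeline:

use std::sync::Arc;
use tokio::sync::Semaphore;

async fn process_all(keys: Vec<String>) {
    // Each file in flight ties up one blocking thread, so cap the concurrency.
    let permits = Arc::new(Semaphore::new(4));
    let mut handles = Vec::new();
    for key in keys {
        let permits = Arc::clone(&permits);
        handles.push(tokio::spawn(async move {
            // Held for the lifetime of this task; released when dropped.
            let _permit = permits.acquire_owned().await.expect("semaphore closed");
            process_one_file(&key).await;
        }));
    }
    for handle in handles {
        handle.await.expect("task panicked");
    }
}

// Placeholder for the per-file fetch + decompress + parse pipeline.
async fn process_one_file(_key: &str) {}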