I am using rusoto_s3, flate2, and serde to stream a .json.gz file from S3 and deserialize it. I've got it all working in sync code by calling

let reader = runtime
    .block_on(s3_client.get_object(req))?
    .body
    .unwrap()
    .into_blocking_read();

where runtime is a tokio runtime I allocated and block_on drives the get_object future to completion. That gives me something that acts as a synchronous stream. Then I can wrap it in a GzDecoder and a BufReader, and this all works in sync world.
Now I'm trying to do some stuff with sqlx, which requires going async. But I can't keep treating the S3 reader as a synchronous Read any more, because I can't block_on a new runtime when one is already running, and I can't use flate2::GzDecoder because it doesn't work with async streams. I found async_compression::tokio::bufread::GzipDecoder, which I can feed by wrapping a tokio::io::BufReader around the result of calling .into_async_read():
let async_reader = s3_client
.get_object(req)
.await?
.body
.unwrap()
.into_async_read();
let gzip_decoder = GzipDecoder::new(BufReader::new(async_reader));
But that is really no help, because I can't use the result with serde, which seems to work only synchronously (serde_json::from_reader wants a std::io::Read, not an AsyncRead).
How can I get a synchronous BufReader from rusoto to feed to serde while still being able to use sqlx? Is there another crate I should be looking at?