Caching Stream Body [Axum + S3]

I have the following handler for my endpoint:

pub async fn analysis(
    Path((analysis_id, category)): Path<(String, String)>,
    extra_params: Query<ExtraParams>,
) -> Response {
    // ...
}

At the end it returns this:

async fn get_stream(s3_bucket: &str, object_path: String, object_type: &str) -> Response {
    match s3::get_stream(s3_bucket, &object_path).await {
        Ok(data_stream) => {
            let body = StreamBody::from(data_stream.bytes);
            let response = Response::builder()
                .header("Content-Type", object_type)
                .body(body)
                .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response());
            response.into_response()
        }
        Err(_) => StatusCode::NOT_FOUND.into_response(),
    }
}

The s3::get_stream returns:

bucket.get_object_stream(remote_path).await?;

My goal here is to stream the file directly from S3 storage to the client without buffering it in the server's memory. Am I doing this right?

It seems to work. Now I want to cache the streamed data (binary) in a Redis cache running on a separate server.
What's the best way to cache the data?

I was thinking of using a separate thread inside the get_stream method to collect the stream into bytes and send them to the Redis cache, while the original thread serves the stream, but that would defeat the purpose of streaming :confused:

That's not quite what's happening. Your server works as a proxy, not as a redirect: the S3 bucket streams the data to your server, which then streams it on to the client. So the chunks of the stream do pass through the server's memory, just not the whole file at once.
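
For contrast, an actual redirect would hand the client a URL and let it download the object from S3 directly, so the bytes never pass through your server at all. A rough sketch (the URL format and the function name are made up, and this only works if the object is publicly readable or you generate a presigned URL):

use axum::response::{IntoResponse, Redirect, Response};

fn redirect_to_s3(s3_bucket: &str, object_path: &str) -> Response {
    // The client fetches the object from S3 itself; nothing is proxied.
    let url = format!("https://{s3_bucket}.s3.amazonaws.com/{object_path}");
    Redirect::temporary(&url).into_response()
}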

I've never worked with Redis streams, but I'd probably try the async-stream crate to upload every chunk from the stream to Redis before streaming it on to the client. Here's a sketch of what I have in mind:

use async_stream::stream;
use redis::AsyncCommands;

let s = stream! {
    for await chunk in data_stream.bytes {
        // Upload each chunk to a Redis stream before passing it on.
        // Assumes `con` is an open async Redis connection and that `chunk`
        // is a plain Bytes value; "your-stream" is the stream key, "*" lets
        // Redis generate the entry ID, and "data" is an arbitrary field name.
        let res: redis::RedisResult<()> = redis::cmd("XADD")
            .arg("your-stream")
            .arg("*")
            .arg("data")
            .arg(&chunk[..])
            .query_async(&mut con)
            .await;
        if let Err(e) = res {
            // Handle upload errors somehow, e.g. log and keep streaming.
            eprintln!("failed to cache chunk in Redis: {e}");
        }

        yield chunk;
    }
};

let body = StreamBody::from(s);
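
For this to compile, con needs to be an async connection to your Redis server, opened somewhere beforehand. A minimal sketch, assuming the redis crate with an async feature (e.g. tokio-comp) enabled and a placeholder URL:

let client = redis::Client::open("redis://your-cache-server:6379/")?;
let mut con = client.get_async_connection().await?;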

Thanks Jonas, I'll play around with Redis streams and see what I can do.

