Axum streaming data to multiple HTTP clients with bounded memory usage

I'm using Axum to stream data over a long-running HTTP connection and I'd love suggestions on how to refactor it so:

  1. The memory usage can be defined and limited in advance.
  2. Slow consumers are disconnected.

Here's a sketch of the current code:

use async_stream::stream;
use axum::{body::Body, routing::get, Router};

fn make_routes() -> Router {
    // `shared_data_behind_rw_lock` (a std RwLock around the accumulated data) and
    // `tx` (a tokio broadcast::Sender) live in shared state; their setup is elided here.
    async fn stream_data() -> impl axum::response::IntoResponse {
        let (v, mut rx) = {
            let snapshot = shared_data_behind_rw_lock.read().unwrap();
            // Subscribe to the broadcast channel while the snapshot read lock is still
            // held, so no update can slip in between the snapshot and the subscription.
            let rx = tx.subscribe();
            (snapshot.clone(), rx)
        };

        let s = stream! {
            yield MyError::Ok(v.into());

            while let Ok(x) = rx.recv().await {
                yield Ok(x)
            };
        };
        Body::from_stream(s)
    }

    Router::new().route("/data", get(stream_data))
}

Essentially, a producer (sketched below) appends data to shared_data_behind_rw_lock and also sends each new item on the broadcast channel tx.
When a new HTTP client connects, it is first streamed the historical data (the yield of the snapshot.clone() value) and then each new piece of data as it arrives (via its rx receiver).
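
The producer is shaped roughly like this (simplified: the Bytes item type and the mpsc `source` are stand-ins for what the real code uses):

use bytes::Bytes;
use std::sync::{Arc, RwLock};
use tokio::sync::broadcast;

async fn produce(
    shared_data_behind_rw_lock: Arc<RwLock<Vec<Bytes>>>,
    tx: broadcast::Sender<Bytes>,
    mut source: tokio::sync::mpsc::Receiver<Bytes>,
) {
    while let Some(item) = source.recv().await {
        // Hold the write lock across both the append and the broadcast, so a client
        // that snapshots + subscribes under the read lock sees each item exactly once:
        // either in its snapshot or on its receiver, never both.
        let mut data = shared_data_behind_rw_lock.write().unwrap();
        data.push(item.clone());
        let _ = tx.send(item); // an Err here only means there are no subscribers yet
        drop(data); // don't hold the std lock across the next await
    }
}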

In the current design, the memory usage is unbounded since the entire dataset is cloned for every new consumer that connects.

This could be resolved by sharing a reference to a single common data store, but then it's not clear to me how to keep a slow consumer from "holding onto the head".

Is there a common pattern or library that would be suitable here?
I imagine something could be done with concurrent ring buffers, but I'm not sure how to set that up in an async context (especially since I intend for this code to be a library, and can't assume that the async executor context is single threaded).

If you want a fixed-size ring buffer with support for one writer and multiple readers, you could consider HistoryBuffer from the heapless crate wrapped in an Arc + RwLock. The ringbuffer crate is widely used too.

The basic idea would be to clone the Arc for every consumer and pass it via app state to both your handler function and your buffer-writer task. When a new consumer connects, you can stream the buffer's contents from oldest to newest, which avoids cloning the whole dataset for each consumer. You might even be able to drop the message channels entirely by polling the ring buffer directly (whether that's efficient enough depends on your specific requirements).
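
Very roughly, the wiring could look something like this (untested; the capacity, the mpsc `source`, and the SharedHistory alias are placeholders, so check the heapless docs for the exact HistoryBuffer API):

use std::sync::Arc;
use heapless::HistoryBuffer;
use tokio::sync::RwLock;

// Shared, fixed-capacity history: memory is bounded up front at 1024 elements.
type SharedHistory = Arc<RwLock<HistoryBuffer<f64, 1024>>>;

// Buffer-writer task: once the buffer is full, each write overwrites the oldest entry.
async fn writer(history: SharedHistory, mut source: tokio::sync::mpsc::Receiver<f64>) {
    while let Some(sample) = source.recv().await {
        history.write().await.write(sample);
    }
}

// In the handler, snapshot the current contents from oldest to newest before
// switching to live updates (or just keep polling the buffer).
async fn snapshot(history: &SharedHistory) -> Vec<f64> {
    history.read().await.oldest_ordered().copied().collect()
}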

As for slow consumers, you could measure how quickly each client is draining its stream and disconnect clients that fall too far behind.

Thanks! Yeah, I think disconnecting the slow clients is the tricky part. If a slow client holds a lock while reading a big slice of the ring buffer, it blocks writes, which isn't acceptable for my application.

What if there were a monotonic counter associated with the buffer, and clients had an API of the form "give me element N, or an error if that's no longer in memory"? Then a new client would start iterating from the current counter value and continue until it got an error, at which point it would know it's falling too far behind and should close the connection.

The monotonic counter sounds like something to try. You could implement it yourself by wrapping an existing ringbuffer implementation.

The monotonic ring buffer idea seems to work, though I don't write data-structure-y code often so I wouldn't be surprised if there are some lurking bugs in the following.

Here's the consumer:

    async fn stream_data<const N: usize>(
        State(state): State<AppState<N>>,
    ) -> impl axum::response::IntoResponse {
        // `rw_locked_buffer` is the shared Arc<RwLock<buffer::R<N, _>>>, taken from `state`.
        let s = stream! {
            let mut idx = rw_locked_buffer.read().await.bottom();
            loop {
                let r = { rw_locked_buffer.read().await.get(idx) };
                match r {
                    buffer::GetResult::Ok(x) => {
                        yield MyError::Ok(IntoBytes::into_bytes(x));
                        idx += 1;
                    }
                    buffer::GetResult::Expired => {
                        debug!("Slow consumer, closing connection");
                        break;
                    }
                    buffer::GetResult::WaitUntil(notify) => {
                        // Register interest *before* re-checking the buffer: notify_waiters()
                        // only wakes tasks that are already registered, so a push landing
                        // between the `get` above and a bare `notified().await` would be missed.
                        let notified = notify.notified();
                        tokio::pin!(notified);
                        notified.as_mut().enable();
                        if rw_locked_buffer.read().await.top() <= idx {
                            notified.await;
                        }
                        continue;
                    }
                }
            }
        };

        Body::from_stream(s)
    }

which is backed by the following RwLock'd ring buffer. T will probably be an f64 in most use cases, and if consumers are faster than producers, it means cloning an Arc<Notify> on every get, which feels like a lot of overhead, but I don't have great intuition here.

use std::sync::Arc;
use tokio::sync::Notify;

/// Ring buffer with monotonic index
pub struct R<const N: usize, T> {
    /// Monotonic count of elements pushed so far.
    top: usize,
    v: Vec<T>, // TODO: replace with mut slice so consumers can provide their own buffer?
    notify: Arc<Notify>,
}

#[derive(Debug)]
pub enum GetResult<T> {
    Ok(T),
    Expired,
    WaitUntil(Arc<Notify>),
}

impl<const N: usize, T: Clone> R<N, T> {
    // TODO: get rid of this default param and use MaybeUninit for the unobservable initial vector data
    pub fn new(default: T) -> Self {
        Self {
            top: 0,
            v: std::iter::repeat(default).take(N).collect(),
            notify: Arc::new(Notify::new()),
        }
    }

    /// Look up the element with monotonic index `idx`.
    pub fn get(&self, idx: usize) -> GetResult<T> {
        use GetResult::*;
        if idx < self.bottom() {
            // Already overwritten by newer data.
            return Expired;
        }

        if idx < self.top() {
            return Ok(self.v[idx % N].clone());
        }

        // Not produced yet; the caller can await the returned Notify.
        WaitUntil(self.notify.clone())
    }

    /// One past the monotonic index of the most recently pushed element.
    pub fn top(&self) -> usize {
        self.top
    }

    /// Monotonic index of the oldest element still held in the buffer.
    pub fn bottom(&self) -> usize {
        self.top.saturating_sub(N)
    }

    /// Overwrite the oldest slot with `x` and wake all waiting consumers.
    pub fn push(&mut self, x: T) {
        self.v[self.top % N] = x;
        self.top += 1;
        self.notify.notify_waiters();
    }
}
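
For completeness, the writer side is just a task that holds a clone of the same Arc (AppState trimmed down to the relevant field; the field name here is illustrative):

use std::sync::Arc;
use tokio::sync::RwLock;

#[derive(Clone)]
pub struct AppState<const N: usize> {
    pub buffer: Arc<RwLock<R<N, f64>>>,
}

pub async fn run_producer<const N: usize>(
    state: AppState<N>,
    mut source: tokio::sync::mpsc::Receiver<f64>,
) {
    while let Some(sample) = source.recv().await {
        // One short write-lock acquisition per sample; push() bumps the monotonic
        // counter and notify_waiters() wakes any consumers parked in WaitUntil.
        state.buffer.write().await.push(sample);
    }
}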
