Shared context in stream processor

How do I handle shared context or constant state in an async stream adaptor, e.g. one returned by filter_map or scan? The state should be shared between invocations of the provided closure and owned by the closure and/or the resulting stream itself.
Edit: The state is shared not only between invocations but also between the futures returned by the closure. My use case is a line-wise stream processor for a hyper body that uses an Arc<State> as context. I would prefer not to clone that Arc for every line (I know it's probably not a performance issue, but it feels unnecessary).

This question has already been asked several times in similar forms but I couldn't find a working/satisfying answer yet. (See this, this, this and this).

Here is a playground with different attempts to solve this. None of them feels like the right way.

Asked in a different way:
How to get this to work?

use futures::stream::{Stream, StreamExt};

async fn foo(input: String, context: &String) -> String {
    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
    format!("Got {} with context {}", input, context)
}

fn create_stream4(
    stream: impl Stream<Item = String>,
    context: String,
) -> impl Stream<Item = String> {
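    // Doesn't compile: the returned future borrows `context`, but `scan`
    // only lends the state to the closure for the duration of each call.
    stream.scan(context, |context, input| async move { Some(foo(input, context).await) })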
}
