How to handle shared context/const state in async stream adaptor, e.g. as returned by filter_map/scan? Shared between the invocations of the provided closure and owned by the closure and/or resulting stream itself.
Edit: Not only shared between invocations but also between the returned futures by the closure. My use case is a line-wise stream processor of a hyper body that uses an Arc<State> as context. I would prefer not to clone that Arc for every line (I know it‘s probably not a performance issue but it feels unnecessary somehow).
This question has already been asked several times in similar forms but I couldn't find a working/satisfying answer yet. (See this, this, this and this).
Here is a playground with different attempts to solve this. None of them feels to be the right way.
Asked in a different way:
How to get this to work?
async fn foo(input: String, context: &String) -> String {
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
format!("Got {} with context {}", input, context)
}
fn create_stream4(
stream: impl Stream<Item = String>,
context: String,
) -> impl Stream<Item = String> {
// doesn't work
stream.scan(context, |context, input| async move { Some(foo(input, context).await) })
}