Help me understand a bit of async Rust

Here's my situation, I have the following tokio-tungstenite web socket:

let (ws, _) = tokio_tungstenite::connect_async(WEBSOCKET).await.unwrap();
let (write, read) = ws.split();

I will receive an infinite stream of items of type A from read, I need to compute a function f: A -> B on each item of the stream by means of adapters' composition and, at last, I need to send each item of type B into write.

My async understanding is very basic, but I can code f effortlessly. I end up with:

read
    .adapter_1()  // --
    .adapter_2()  //   |
    ...           //   | - f
    .adapter_n()  // -- 

Then I must direct this stream into write which is a SplitSink. Now, I can't use a for_each and reasonably so, because writer.send() mutably borrows writer in the returned future.

Then I tried with forward() and send_all(), but in both cases the program blocks, if I understand correctly, due to the stream being infinite.

Now I have the following idea, which I have not tested yet and I don't know if it is idiomatic or not. It basically consists in spawning an asynchronous tasks and using a channel:

let (s, r) = tokio::sync::mpsc();

tokio::spawn(async move {
    read
        .adapter_1()
        ...
        .adapter_n()
        .for_each(|b| async {
            s.send(b).await.unwrap();
        });
};

write.send(r.recv().await.unwrap()).await.unwrap();

Am I on the right track? Am I missing something?

It seems to me that the simplest is to use a loop.

while let Some(msg) = read.next().await {
    let transformed = adapter_3(adapter_2(adapter_1(msg?)));
    write.send(msg).await?;
}

The main disadvantage of the loop is that you don't start processing the next item until the previous item is sent. If that's a problem, then you should indeed be using an mpsc channel, using its buffer to allow multiple in-flight messages. Though even then, I would still use a loop.

let (send, recv) = tokio::sync::mpsc::channel(16);

tokio::spawn(async move {
    while let Some(msg) = read.next().await {
        let transformed = adapter_3(adapter_2(adapter_1(msg?)));
        send.send(msg).await?;
    }
});

while let Some(msg) = recv.recv().await {
    write.send(msg).await?;
}

In general, I find that stream adapters do not help with making code clearer.

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.