Here's my situation: I have the following tokio-tungstenite WebSocket:
let (ws, _) = tokio_tungstenite::connect_async(WEBSOCKET).await.unwrap();
let (write, read) = ws.split();
I will receive an infinite stream of items of type A from read. I need to compute a function f: A -> B on each item of the stream by composing stream adapters and, finally, send each item of type B into write.
My async understanding is very basic, but I can code f effortlessly. I end up with:
read
.adapter_1() // --
.adapter_2() // |
... // | - f
.adapter_n() // --
Then I must direct this stream into write, which is a SplitSink. I can't use for_each, and reasonably so, because write.send() mutably borrows write in the returned future. I then tried forward() and send_all(), but in both cases the program blocks, if I understand correctly, because the stream is infinite.
Now I have the following idea, which I have not tested yet and which may or may not be idiomatic. It basically consists of spawning an asynchronous task and using a channel:
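let (s, mut r) = tokio::sync::mpsc::channel(32); // buffer size chosen arbitrarily
tokio::spawn(async move {
    read
        .adapter_1()
        ...
        .adapter_n()
        .for_each(|b| {
            let s = s.clone(); // each returned future owns its own sender handle
            async move {
                s.send(b).await.unwrap();
            }
        })
        .await; // without this, the for_each future is never driven
});
// Drain the channel into the sink; a single recv() would forward only one item.
while let Some(b) = r.recv().await {
    write.send(b).await.unwrap();
}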
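To check the shape of this idea in isolation, here is a minimal sketch that uses only the standard library. It is an analogy, not the real thing: a std thread stands in for the tokio task, std::sync::mpsc stands in for the tokio channel, and a Vec stands in for the SplitSink. The names f and pipeline, and the choice of A = i32 and B = String, are placeholders I made up for illustration.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for f: A -> B (here A = i32, B = String).
fn f(a: i32) -> String {
    format!("item-{}", a * 2)
}

// The producer thread plays the role of the spawned task that reads from
// `read` and applies f; the returned Vec plays the role of the sink `write`.
fn pipeline(input: Vec<i32>) -> Vec<String> {
    let (s, r) = mpsc::channel::<String>();

    let producer = thread::spawn(move || {
        for a in input {
            // Stop early if the receiving side has gone away.
            if s.send(f(a)).is_err() {
                break;
            }
        }
        // `s` is dropped here, which closes the channel.
    });

    // Drain the channel in a loop until the sender is dropped.
    let mut sink = Vec::new();
    for b in r {
        sink.push(b);
    }

    producer.join().unwrap();
    sink
}
```

Calling pipeline(vec![1, 2, 3]) pushes every transformed item through the channel in order. In the async version the producer never finishes because the stream is infinite, so the draining loop would simply run for as long as the connection lives.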
Am I on the right track? Am I missing something?