Write string to SplitSink<Framed<TcpStream, T>>

I have the type MyFrame which implements Encoder and Decoder from tokio_io::codec:

impl Encoder for MyFrame {

    type Item = Bytes;
    type Error = io::Error;

    fn encode(
        &mut self,
        data: Self::Item,
        dst: &mut BytesMut
    ) -> Result<(), Self::Error> {

        dst.reserve(data.len());
        dst.put(data);

        Ok(())
    }
}

I need to write a string to a writer of type SplitSink<Framed<TcpStream, MyFrame>>:

let framed: Framed<TcpStream, MyFrame> = socket.framed(MyFrame);
let (writer, reader): (SplitSink<Framed<TcpStream, MyFrame>>, SplitStream<Framed<TcpStream, MyFrame>>) = framed.split();

How can I do it?

Like this?

writer.send(my_string)

I can't check it, because I now get this error:

error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
  --> src/server.rs:60:28
   |
55 |     let (writer, reader) = framed.split();
   |          ------ captured outer variable
...
60 |                 .and_then( |id| Worker(id, 5).select(Worker(id - 1, 3))
   |                            ^^^^ cannot move out of captured outer variable in an `FnMut` closure

in code:

let framed = socket.framed(MyFrame);
let (writer, reader) = framed.split();

let processor = reader
        .for_each( |frame| {
            frame
                .and_then( |id| Worker(id, 5).select(Worker(id - 1, 3))
                .then( |res| {
                    match res {
                        Ok((winner, outsider)) => {
                            let buf = format!("Worker with id {} wins", winner.0);
                            writer.send(Bytes::from(buf));
                            Ok(())
                        },
                        Err((err1, _err2)) => Err(err1),
                    }
                }))
        })
        .and_then( ... )

This simple task has turned into an invincible monster! What is this error actually telling me?

Try the following instead:

let processor = reader
        .and_then(|frame| frame.and_then(|id| Worker(id, 5).select(Worker(id - 1, 3))
        .map_err(|(err1, _err2)| err1)
        .map(|(winner, _outsider)| format!("Worker with id {} wins", winner.0))))
        .forward(writer)
        .and_then( ... )

Back to the error you have. Sink::send() consumes self and returns a future that signals completion of the send operation. The key bit is it consumes self. for_each invokes the provided closure for each element, and as such it expects an FnMut closure - one that can be called multiple times (with mutation allowed, but that's not interesting here). Since the closure you wrote consumes writer, it's an FnOnce - can be called only once, and so you get the compiler error.
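The FnOnce versus FnMut distinction can be reproduced without tokio at all. Below is a minimal sketch using only the standard library: `Writer`, `call_many` and `send_all_ids` are hypothetical names invented for illustration, with `Writer::send` taking `self` by value the way `Sink::send` does. The commented-out call is the shape that triggers E0507; the version below it keeps the closure an `FnMut` by taking the writer out of an `Option` and putting it back on every call.

```rust
// Hypothetical stand-in for a sink whose `send` consumes `self`.
struct Writer {
    sent: Vec<String>,
}

impl Writer {
    // Takes the writer by value and hands it back, as a completed send would.
    fn send(mut self, item: String) -> Writer {
        self.sent.push(item);
        self
    }
}

// Hypothetical stand-in for `for_each`: it calls the closure several times,
// so it can only accept an `FnMut`.
fn call_many<F: FnMut(u32)>(mut f: F) {
    for id in 0..3 {
        f(id);
    }
}

fn send_all_ids() -> Vec<String> {
    let writer = Writer { sent: Vec::new() };

    // Does not compile: `send` moves `writer` out of the closure's captured
    // state, which would make the closure callable only once (`FnOnce`).
    // call_many(|id| { writer.send(format!("id {}", id)); });

    // Compiles: the closure only mutates `slot`, so it stays `FnMut`.
    let mut slot = Some(writer);
    call_many(|id| {
        let w = slot.take().expect("writer is put back after every call");
        slot = Some(w.send(format!("id {}", id)));
    });

    slot.expect("writer is put back after every call").sent
}
```

The `Option` trick is only one way out; it is shown here because it makes the ownership hand-off explicit, not because it is the recommended fix for the tokio code.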

The approach I suggested above re-arranges the problem by creating a pipeline that funnels into Stream::forward(Sink). The idea is you set up a Stream chain that ends up producing a value that the Sink accepts (a String in your case). This Stream chain is then wired up with the Sink using forward - this combinator, as the name implies, sends all values produced by the Stream to the Sink. This avoids consuming the Sink at an individual element level.
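The shape of that pipeline can be sketched synchronously with standard-library iterators. This is an analogy only, not the futures API: `LineSink`, the free function `forward` and `announce_winners` are hypothetical names, and an `Iterator` stands in for the `Stream`. The point it illustrates is that the sink is moved exactly once, into `forward`, and is only borrowed mutably for each individual item.

```rust
// Hypothetical synchronous stand-in for a sink.
struct LineSink {
    lines: Vec<String>,
}

impl LineSink {
    // Accepts one item through a mutable borrow, so the sink is not consumed.
    fn start_send(&mut self, item: String) {
        self.lines.push(item);
    }
}

// Analogue of forwarding a stream into a sink: takes ownership of both once,
// drains the iterator into the sink, then hands the sink back.
fn forward<I: Iterator<Item = String>>(stream: I, mut sink: LineSink) -> LineSink {
    for item in stream {
        sink.start_send(item);
    }
    sink
}

fn announce_winners(ids: Vec<u32>) -> Vec<String> {
    let sink = LineSink { lines: Vec::new() };

    // Build the whole chain first; every step yields what the sink accepts.
    let stream = ids
        .into_iter()
        .map(|winner| format!("Worker with id {} wins", winner));

    // The sink is consumed here, once, rather than once per element.
    forward(stream, sink).lines
}
```

Calling `announce_winners(vec![4, 7])` yields one message per id, in order.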

You may need to tweak the snippet I gave above (to align the Error type, for example), but that's the gist of how to typically address these cases.
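Aligning the error type usually comes down to a `map_err` placed before the value is mapped. Here is a small sketch with plain `Result` from the standard library; `parse_id` and `winner_message` are hypothetical helpers, and `io::Error` is assumed as the target only because the codec in the question uses it.

```rust
use std::io;
use std::num::ParseIntError;

// Hypothetical step that fails with its own error type.
fn parse_id(raw: &str) -> Result<u32, ParseIntError> {
    raw.trim().parse()
}

// Converts the error into `io::Error` first, then maps the success value
// into the message that would be sent on.
fn winner_message(raw: &str) -> Result<String, io::Error> {
    parse_id(raw)
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
        .map(|id| format!("Worker with id {} wins", id))
}
```

The same ordering applies to the combinator chain above: once every stage reports the same error type, the stages compose without further conversion.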


@vitalyd, I wrote the code following your advice, with a small change:

let processor = reader
        .and_then(|frame| frame.and_then(|id| Worker(id, 3).select(Worker(id+1, 4))
        .map_err(|(err1, _err2)| err1)
        .map(|(winner, _outsider)| Bytes::from(format!("Worker with id {} wins", winner.0)))))
        .forward(writer)
        .and_then(|data| {
            println!("Socket received FIN packet and closed connection");
            Ok(())
        }); 

And it works fine! @vitalyd, Thank you!

Where did you find the forward() combinator? I looked for it in the Future documentation, but I couldn't find it there.

Ah yes, I overlooked the Bytes being the value you want to send.

forward is a Stream combinator (Stream to Sink forwarding): futures::stream::Stream - Rust

It's effectively the inverse of futures::sink::Sink - Rust