Futures question: split connection into sink/stream in two tasks


#1

I’m experimenting with the futures API using the websocket library. I have this code:

use futures::future::Future;
use futures::future;
use futures::sink::Sink;
use futures::stream::Stream;
use futures::sync::mpsc::channel;
use futures::sync::mpsc::{Sender, Receiver};
use tokio_core::reactor::Core;

use websocket::{ClientBuilder, OwnedMessage};

pub fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    let handle_clone = handle.clone();

    let (send, recv): (Sender<String>, Receiver<String>) = channel(100);

    let f = ClientBuilder::new("wss://...")
        .unwrap()
        .async_connect(None, &handle_clone)
        .map_err(|e| println!("error: {:?}", e))

        .map(|(duplex, _)| duplex.split())
        .and_then(move |(sink, stream)| {

            // this task consumes the channel, writes messages to the websocket
            handle_clone.spawn(future::loop_fn(recv, |recv: Receiver<String>| {
                sink.send(OwnedMessage::Close(None))
                    .and_then(|_| future::ok(future::Loop::Break(())))
                    .map_err(|_| ())
            }));

            // the main task listens on the socket
            future::loop_fn(stream, |stream| {
                stream
                    .into_future()
                    .and_then(|_| future::ok(future::Loop::Break(())))
                    .map_err(|_| ())
            })
        });

    loop {
        core.turn(None)
    }
}

After connecting to the server I want to run “listener” and “sender” tasks without one blocking the other. The problem is that I can’t use sink in the new task; it fails with:

error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
  --> src/slack_conn.rs:29:17
   |
25 |         .and_then(move |(sink, stream)| {
   |                          ---- captured outer variable
...
29 |                 sink.send(OwnedMessage::Close(None))
   |                 ^^^^ cannot move out of captured outer variable in an `FnMut` closure

I could directly use duplex to send and receive, but that leads to worse errors. Any ideas on how to make this work? Thanks.


#2

Do you have to also write move in the loop_fn closure argument? Give that a go.


#3

It seems that the move is removable, yeah. That doesn’t fix the main issue though.


#4

I think the issue is that Sink.send takes the sink by value, but an FnMut closure only gets a mutable reference to its captured environment, so it can’t move a captured value out of it.
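
To make the FnMut point concrete, here is a minimal std-only sketch. FakeSink, Loop and loop_fn below are hypothetical stand-ins written for illustration (they are not the futures crate's types), but they have the same shape: send consumes the sink, and the loop closure may be called more than once. One way around E0507 is to pass the sink through the loop state instead of capturing it:

```rust
// Hypothetical stand-in for a sink whose `send` consumes `self`,
// mirroring the by-value shape of `Sink::send`.
struct FakeSink {
    sent: Vec<String>,
}

impl FakeSink {
    // Takes the sink by value and hands it back once the message is stored.
    fn send(mut self, msg: String) -> FakeSink {
        self.sent.push(msg);
        self
    }
}

// Mirrors the idea of `future::Loop`: stop with a result or continue with new state.
enum Loop<T, S> {
    Break(T),
    Continue(S),
}

// Synchronous analogue of `loop_fn`: the closure can run many times, so it is
// `FnMut`, and the state is passed to it by value on every iteration.
fn loop_fn<S, T, F>(initial: S, mut step: F) -> T
where
    F: FnMut(S) -> Loop<T, S>,
{
    let mut state = initial;
    loop {
        match step(state) {
            Loop::Break(result) => return result,
            Loop::Continue(next) => state = next,
        }
    }
}

// Sends every message by threading the sink through the loop state.
// Capturing `sink` and calling `sink.send(..)` inside the closure instead
// would be rejected with E0507, because the closure would have to give up
// a captured value that it might need again on the next call.
fn send_all(sink: FakeSink, messages: Vec<String>) -> FakeSink {
    let pending = messages.into_iter();
    loop_fn((sink, pending), |(sink, mut pending)| match pending.next() {
        Some(msg) => Loop::Continue((sink.send(msg), pending)),
        None => Loop::Break(sink),
    })
}
```

With the real loop_fn the analogous move would presumably be to use (sink, recv) as the loop state, since the future returned by Sink::send resolves to the sink again.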

For the example you provided, you shouldn’t use the loop_fn construct. Just use the Sink.send future as your return value. However, I’m assuming you created an example that just illustrates what you want to do. If you actually want the “sender” task to wait on the receiver and forward those messages to the sink, then you need to be able to own those values. This means you need to build a type that implements Future and does this for you.

Unfortunately the constructs provided by the futures library are somewhat limited because of the limitations of closures.


#5

I’m utterly confused

Just use the Sink.send future as your return value

I don’t think I can implement what I’m trying to do here with just a Sink.send call, I really need loop.

However I’m assuming you created an example that just illustrates what you want to do

Right.

If you’re actually wanting to have the “sender” task waiting on the receiver and forward those messages to the sink, then you need to be able to own those values. This means you need to build a type that implements Future and does this for you.

That’s what I’m trying to do. That future is the argument to handle_clone.spawn() in my example. I don’t understand why I need a new type for this and how/why a new type helps.

Unfortunately the constructs provided by the futures library are somewhat limited because of the limitations of closures.

It’s fine if this is possible to implement in another way, but currently it seems to me that what I want to do is impossible.

Basically I want an async task that connects to a server without blocking and then spawns two async tasks:

  1. reads from the connection and takes some action (prints to screen etc.)
  2. reads from a mpsc channel and writes to the connection

How do I do this?


#6

Look for the forward function in the futures crate (I think it’s on Stream). It lets you set up a future that forwards every value from a stream to a sink. If you need to transform the values, do that on the stream before sending it into the sink (or use a custom sink implementation if that makes more sense). That will handle part two for you, and splitting off the receiving side should work.
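
As a rough picture of what "transform the stream, then forward it into the sink" means, here is a blocking analogy built only on std channels. It is not the futures API: Message and forward_text are hypothetical names, and a real forward future would yield to the event loop instead of blocking the thread.

```rust
use std::sync::mpsc::{Receiver, SendError, Sender};

// Hypothetical stand-in for `websocket::OwnedMessage`.
#[derive(Debug, PartialEq)]
enum Message {
    Text(String),
}

// Drains `incoming`, wraps each string in `Message::Text` (the "transform"
// step), and pushes the result into `outgoing` (the "sink").
// Returns the number of forwarded messages once every sender for `incoming`
// has been dropped, or the first send error.
fn forward_text(
    incoming: Receiver<String>,
    outgoing: Sender<Message>,
) -> Result<usize, SendError<Message>> {
    let mut forwarded = 0;
    for message in incoming.iter().map(Message::Text) {
        outgoing.send(message)?;
        forwarded += 1;
    }
    Ok(forwarded)
}
```

The transform lives on the producing side (the map call), so the sink only ever sees values of the type it accepts.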

(Sorry for the brevity, on mobile)


#7

@Nemo157 does present a great solution for problem 2.

Don’t be afraid to create a Future type of your own. They can solve issues that the adaptors and constructs of the futures crate cannot. It’s as simple as:

use futures::{Future, Poll, Stream};

struct CommandReader<S: Stream> {
    commander: Commander, // Some type that executes the commands received
    strm: S,              // The websocket stream
}

impl<S: Stream> Future for CommandReader<S> {
    type Item = ();  // Conform with event loop signature
    type Error = (); // Conform with event loop signature

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // do the command interpreting work here....
        unimplemented!()
    }
}

#8

I don’t know what sink to use here. I just want to print incoming messages to stdout, and I can’t see any relevant impls in the documentation. I tried using a closure that just prints the argument but it didn’t work (not a sink impl).


#9

This was cross-posted to Stack Overflow.


#10

Thanks @shepmaster for your reply on SO, it works.

I slightly changed the code to write socket messages to a channel:

use futures::{Future, Stream, Sink};
use futures::sync::mpsc::channel;
use futures::sync::mpsc::Receiver;
use futures::sync::mpsc::Sender;
use futures::sync::mpsc::SendError;
use tokio_core::reactor::Core;

use serde_json;
use std::boxed::FnBox;
use websocket::ClientBuilder;
use websocket;

pub fn main(recv: Receiver<String>, send: Sender<websocket::OwnedMessage>) -> Box<FnBox() -> () + Send> {
    Box::new(move || {
        let mut core = Core::new().unwrap();
        let handle = core.handle();

        let f = ClientBuilder::new("wss://...")
            .unwrap()
            .async_connect(None, &handle)
            .from_err::<Error>()
            .map(|(duplex, _)| duplex.split())
            .and_then(|(sink, stream)| {
                let reader = stream
                    .forward(send);

                let writer = sink
                    .sink_from_err()
                    .send_all(recv.map(websocket::OwnedMessage::Text).map_err(Error::Receiver))
                    .map(|_| ());

                reader.join(writer)
            });

        core.run(f).expect("Unable to run");
    })
}

quick_error! {
    #[derive(Debug)]
    pub enum Error {
        WebSocket(err: websocket::WebSocketError) {
            from()
            description("websocket error")
            display("WebSocket error: {}", err)
            cause(err)
        }
        Serde(err: serde_json::error::Error) {
            from()
            description("serde_json error")
            display("Serde JSON error: {}", err)
            cause(err)
        }
        Receiver(err: ()) {
            description("receiver error")
            display("Receiver error")
        }
        Sender(err: SendError<websocket::OwnedMessage>) {
            description("sender error")
            display("Sender error")
        }
    }
}

But I can’t figure out which map_err()s to call around the forward() call to make this work. The error messages are impossible to understand. @shepmaster how did you do this in your original answer?
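
For context on why forward() is picky here: as far as I know, in futures 0.1 forward requires the stream's error type to implement From for the sink's error type, and send_all has the mirror-image bound. So the stream side has to be converted to the shared Error first (from_err::<Error>() or map_err(Error::WebSocket)), and the sink's SendError has to be convertible into it (a from() on the Sender variant, or sink_map_err(Error::Sender) on the sink). The std-only sketch below models just that bound. SocketError, ChannelClosed, BoundedSink, forward and run are hypothetical stand-ins, not the real crate types.

```rust
// Hypothetical stand-ins for the two failure types in the code above.
#[derive(Debug, PartialEq)]
struct SocketError;
#[derive(Debug, PartialEq)]
struct ChannelClosed;

// The single error type both halves must end up speaking.
#[derive(Debug, PartialEq)]
enum Error {
    WebSocket(SocketError),
    Sender(ChannelClosed),
}

// Plays the role of `from()` on the `Sender` variant.
impl From<ChannelClosed> for Error {
    fn from(err: ChannelClosed) -> Error {
        Error::Sender(err)
    }
}

// Minimal sink that accepts `capacity` items and then reports itself closed.
struct BoundedSink {
    items: Vec<String>,
    capacity: usize,
}

impl BoundedSink {
    fn push(&mut self, item: String) -> Result<(), ChannelClosed> {
        if self.items.len() >= self.capacity {
            return Err(ChannelClosed);
        }
        self.items.push(item);
        Ok(())
    }
}

// Synchronous model of `forward`: the stream already yields errors of type `E`,
// and the sink's error must be convertible into that same `E`.
fn forward<I, E>(stream: I, sink: &mut BoundedSink) -> Result<(), E>
where
    I: Iterator<Item = Result<String, E>>,
    E: From<ChannelClosed>,
{
    for item in stream {
        sink.push(item?)?; // the second `?` is where From<ChannelClosed> is used
    }
    Ok(())
}

// Converts the stream's errors first, then forwards into the sink.
fn run(input: Vec<Result<String, SocketError>>, capacity: usize) -> Result<Vec<String>, Error> {
    let mut sink = BoundedSink { items: Vec::new(), capacity };
    // Analogous to `stream.map_err(Error::WebSocket)` before calling `forward`.
    let stream = input.into_iter().map(|item| item.map_err(Error::WebSocket));
    forward(stream, &mut sink)?;
    Ok(sink.items)
}
```

Leaving out either conversion makes the bound on forward unsatisfiable, which is the kind of mismatch the compiler reports as a missing From implementation.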