Simple futures::mpsc example seemingly not working

Hey guys,

I have this very simple gist that I can't get working correctly:

extern crate futures;
extern crate tokio;

use futures::sync::mpsc;
use futures::Future;
use futures::Sink;
use futures::Stream;
use tokio::io;
use tokio::net::TcpListener;

use std::str;
use std::string::String;

fn main() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();

    let (tx, rx) = mpsc::channel::<String>(10);

    let rec_future = rx.map(move |rec| {
        println!("Received in worker: {:?}", rec);
    }).for_each(|_| Ok(()));

    let addr = "0.0.0.0:6142".parse().unwrap();
    let listener = TcpListener::bind(&addr).unwrap();

    let server = listener
        .incoming()
        .map(move |socket| {
            println!("accepted socket; addr={:?}", socket.peer_addr().unwrap());
            let tx = tx.clone();
            io::read_exact(socket, [0_u8; 10])
                .map(|(_, buf)| buf)
                .map(|buf| {
                    let rec_str = str::from_utf8(&buf).unwrap();
                    println!("Received in server: {:?}", &rec_str);
                    String::from(rec_str)
                })
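                // problem described below: the Send future returned here is never polled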
                .map(|data| tx.send(data))
        })
        .buffer_unordered(10)
        .for_each(|_| { Ok(()) })
        .map_err(|err| {
            println!("accept error = {:?}", err);
        });

    rt.spawn(server);
    rt.spawn(rec_future);
    rt.shutdown_on_idle().wait().unwrap();
}

All I really want to do is receive some bytes, convert them to a string, and print them from another future. Here tx.send(data) does not seem to be polled at all. If I change it to tx.start_send(data) I get the desired result - but why is send() not working?

        .map(move |socket| {
            println!("accepted socket; addr={:?}", socket.peer_addr().unwrap());
            let tx = tx.clone();
            io::read_exact(socket, [0_u8; 10])
                .map(|(_, buf)| buf)
                .map(|buf| {
                    let rec_str = str::from_utf8(&buf).unwrap();
                    println!("Received in server: {:?}", &rec_str);
                    String::from(rec_str)
                })
                .map(|data| tx.send(data))
        })

map() is for when you want to change the result of a Future (or Stream), i.e. its Item associated type. But if you map to a future, that inner future is never scheduled - you need a combinator whose closure returns a T: IntoFuture, so that the returned future is driven as part of the chain. and_then() and for_each() do that, for example. Otherwise, you risk simply dropping the future.

start_send() doesn't return a future; it attempts to start the operation right away (and succeeds in your case).
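
To make the difference concrete, here's a minimal, self-contained sketch using only plain futures 0.1 combinators (toy values, nothing from your server):

extern crate futures;

use futures::future::{self, Future};

fn main() {
    // and_then() treats the closure's return value as IntoFuture and drives it
    // as part of the chain:
    let chained = future::ok::<u32, ()>(1).and_then(|n| future::ok::<u32, ()>(n + 1));
    assert_eq!(chained.wait(), Ok(2));

    // map() only swaps out the Item; if the new Item happens to be a future,
    // nothing polls it for you - you just get the unpolled future back:
    let mapped = future::ok::<u32, ()>(1).map(|n| future::ok::<u32, ()>(n + 1));
    let inner = mapped.wait().unwrap(); // the Item is the inner future itself
    assert_eq!(inner.wait(), Ok(2));    // it only resolves once we poll it ourselves
}

In your snippet, the future returned by tx.send(data) is in that second, unpolled position.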

Thanks for the hint.
It looks like Result and Option implement IntoFuture. Does that mean I would need to do

...
    Some(String::from(rec_str))
})
.and_then(|data| tx.send(data.unwrap()))

?

You probably want something like this:

let server = listener
        .incoming()
        .for_each(move |socket| {
            println!("accepted socket; addr={:?}", socket.peer_addr().unwrap());
            let tx = tx.clone();
            // `serve` is our future chain that represents handling a given connection
            let serve = io::read_exact(socket, [0_u8; 10])
                .map(|(_, buf)| buf)
                .map(|buf| {
                    let rec_str = str::from_utf8(&buf).unwrap();
                    println!("Received in server: {:?}", &rec_str);
                    String::from(rec_str)
                })
                .and_then(|data| {
                    tx.send(data)
                        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
                });
            // spawn `serve` onto the reactor so it runs in the background, and each
            // connection is driven to completion independently
            tokio::spawn(
                serve
                    .map(|_| ())
                    .map_err(|e| println!("Error in serving client: {:?}", e)),
            );
            Ok(())
        })
        .map_err(|err| {
            println!("accept error = {:?}", err);
        });

I took out the buffer_unordered() because I don't think it does what you think it does. Did you want to limit the number of bytes read for each accepted connection (which you already do by virtue of read_exact)? If you clarify what you'd like here, we can work that bit out separately.

Hm, I was told that using for_each effectively makes this computation serial, which I wanted to prevent, and that buffer_unordered would limit the maximum number of concurrent executions.

for_each drives a Stream to completion, which may mean processing an infinite stream (e.g. a TcpListener that's never terminated - it'll keep accepting connections). In this case, the example I showed uses tokio::spawn to hand the connection-servicing future to the reactor - it's not a serial operation; each connection is serviced concurrently with other connections and anything else running on the reactor.
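
As a rough sketch of that pattern (a toy stream and printouts, not your actual server):

extern crate futures;
extern crate tokio;

use futures::future;
use futures::stream::{self, Stream};

fn main() {
    // for_each only walks the stream; the per-item work is handed to the
    // reactor via tokio::spawn, so items are processed concurrently instead
    // of each item blocking the next one.
    let work = stream::iter_ok::<_, ()>(0..3).for_each(|i| {
        tokio::spawn(future::lazy(move || {
            println!("handling item {}", i);
            Ok(())
        }));
        Ok(())
    });

    tokio::run(work);
}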

So did you want to allow at most 10 client connections to be serviced at any given time?

That was initially the plan, yes. I had the idea that this might somehow be faster - transforming the whole TCP stream up to the point where it only has to iterate over a trivial set (something like .fold()).

Interestingly, you kind of solved the problem by chance. This:

    let server = listener
        .incoming()
        .map(move |socket| {
            println!("accepted socket; addr={:?}", socket.peer_addr().unwrap());
            let tx = tx.clone();
            io::read_exact(socket, [0_u8; 10])
                .map(|(_, buf)| buf)
                .map(|buf| {
                    let rec_str = str::from_utf8(&buf).unwrap();
                    println!("Received: {:?}", &rec_str);
                    String::from(rec_str)
                })
                .and_then(|data| {
                    tx.send(data)
                        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
                })
        })
        .buffer_unordered(10)
        .for_each(|_| Ok(()))
        .map_err(|err| {
            println!("accept error = {:?}", err);
        });

works just fine!
Why does the server future here require this io::ErrorKind::Other wrapping?

But what you said got me interested now: there are obviously several ways to solve this. How do they differ internally? Which one scales better? I'm trying to dig a little deeper into the library rather than just copy-pasting snippets, so I'd be very glad if you could explain that a little :slight_smile:

EDIT:
One reason I used buffer_unordered was to control backpressure. As you can see, I keep cloning that sender, which seems to make the channel effectively unbounded. With buffer_unordered I could potentially limit the input the receiver has to handle. At least that was my thinking :smiley:

Tokio uses an efficient file-descriptor monitoring API provided by the kernel; on Linux, e.g., it's epoll-based. This interface can efficiently monitor a huge number of file descriptors for event readiness. As such, this is unlikely to be a concern for you. If you can elaborate on the expected traffic patterns, we can dive in a bit further (i.e. speculate :slight_smile:) if you're interested.

Right - this limits processing to at most 10 of those future chains at a time.

This is just to adapt the error returned by tx.send(), which is a SendError, to match the error type of the first future in the chain: io::read_exact, which uses std::io::Error.
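
A tiny standalone illustration of that adaptation, with a String error standing in for the SendError:

extern crate futures;

use futures::future::{self, Future};
use std::io;

fn main() {
    let read = future::ok::<u32, io::Error>(1); // like read_exact: io::Error
    let send = future::ok::<u32, String>(2);    // like tx.send(): a different error type

    // and_then() needs both halves of the chain to agree on a single Error type,
    // so the "send" error gets wrapped into an io::Error of kind Other:
    let chained = read.and_then(|x| {
        send.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
            .map(move |y| x + y)
    });

    assert_eq!(chained.wait().unwrap(), 3);
}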

As mentioned, the approach you took with buffer_unordered will ensure that at most 10 of those read_exact().map(...).and_then(...) future chains are active. Practically this means that up to 10 concurrent connections can be serviced at a time; if you get 10 buffered, further connection attempts will not be accepted until there's room in this buffer. At the extreme, if your processing takes too long, some of these connections that are sitting in the kernel's backlog will time out, and if there's no backlog room, subsequent connections will be rejected outright. That's probably the right type of backpressure that you want, so all good here.

Removing buffer_unordered leaves you with backpressure only by virtue of having specified the channel depth to be 10. This, however, will not prevent subsequent connections from being accepted. At the extreme, if connections are being accepted but the remotes aren't sending at least 10 bytes, these futures will not complete and will sit in the tokio scheduler. They'll keep piling up, consuming memory - eventually, you may run out of memory and/or file descriptors. So, for all intents and purposes, there's no backpressure here. Thus the approach you took seems better to me.

You should be able to see the difference by setting the buffer size to something like 1 and then observing that connections are serviced serially.
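
For example, something along these lines (a toy stream of already-completed futures, just to show the shape):

extern crate futures;

use futures::future::{self, Future};
use futures::stream::{self, Stream};

fn main() {
    // A stream whose items are themselves futures, like the read_exact chains above.
    let jobs = stream::iter_ok::<_, ()>(vec![
        future::ok::<u32, ()>(1),
        future::ok::<u32, ()>(2),
        future::ok::<u32, ()>(3),
    ]);

    // buffer_unordered(n) keeps at most n of those futures in flight at once;
    // with n = 1 they are driven strictly one after another.
    let results = jobs.buffer_unordered(1).collect().wait().unwrap();
    assert_eq!(results.len(), 3);
}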

Let me know if I misunderstood you or didn't answer something.