Tokio per-connection variables

#1

I’m fairly new to Rust, but I’ve got most of the basics down. I’ve written a basic parser for lines of text to do work on them, and now I’m trying to write a server for a client (which I don’t control for this project) that connects via TCP, writes anywhere from a couple to a few hundred lines, and then disconnects.

My basic tokio server looks like the examples:

use tokio::net::TcpListener;
use tokio::prelude::*;                    // Future/Stream combinators
use tokio_codec::{Decoder, LinesCodec};   // `framed` comes from the Decoder trait

let socket = TcpListener::bind(&addr).unwrap();
println!("Listening on: {}", addr);

let srv = socket
    .incoming()
    .map_err(|e| println!("failed to accept socket; error = {:?}", e))
    .for_each(move |conn| {
        println!("New Connection! {:?}", conn);

        let framed = LinesCodec::new().framed(conn);
        // we only read from the client; the write half goes unused
        let (_writer, reader) = framed.split();

        let processor = reader
            .for_each(|line| {
                let result = parser::parse(&line);

                Ok(())
            })
            .and_then(|()| {
                println!("Socket received FIN packet and closed connection");
                Ok(())
            })
            .or_else(|err| {
                println!("Socket closed with error: {:?}", err);
                Err(err)
            })
            .then(|result| {
                println!("Socket closed with result: {:?}", result);
                Ok(())
            });

        tokio::spawn(processor)
    });
tokio::run(srv);

As a stepping stone to more complex behavior, what I’d like to do next is count the number of lines processed in each connection and print the count when the client disconnects. In my most naive approach, I did something like this:

socket.for_each(move |conn| {
    let mut line_ct = 0;

    reader
        .for_each(move |line| {
            line_ct = line_ct + 1;
            // ...
        })
        .and_then(move |()| {
            println!("Lines handled: {}", line_ct);
        })
    // ...
})

This “works”, in that it compiles, and if I inspect line_ct inside the for_each(|line| …) closure I can see it being incremented, but when it gets printed in the and_then closure, line_ct is 0. I think I understand this is happening because the for_each and the and_then each get their own copy of the line_ct that was initialized to 0.

In the chat.rs tokio example, before the socket.incoming() part it sets up an Arc::new(Mutex::new(HashMap::new())) so the connections can talk to each other. I don’t think I need that, since these connections never need to access other connections’ data; the line_ct is “local” to the connection. There’s a plethora of options between what I’m doing and Arc. It seems like Rc/RefCell might be what I want, but I’m unsure how to wire the Rc::clone operations into something that can be accessed in the for_each closure.

I think maybe I need to write my own Stream implementation to do this, but following the docs and examples for that brings me to similar problems (how do I pass the initial 0 into the for_each of the Stream? How do I read the final value in the and_then?).

Also, I’m not clear if I should even be using the LinesCodec. The codec docs seem to indicate that it would introduce a bit of safety, but most of the tokio examples dealing with lines just do something like let line = io::read_until(reader, b'\n', Vec::new());. Maybe instead of writing my own Stream, I need to write my own Codec? I started down that path, but it still didn’t help me with passing the counter around.

Finally, I was able to get it working by doing this (cribbed from another post):

socket.for_each(move |conn| {
    let framed = LinesCodec::new().framed(conn);
    let (_writer, reader) = framed.split();

    let counter = Arc::new(AtomicUsize::new(0));
    let counted = Arc::clone(&counter);

    reader.for_each(move |line| {
        counter.fetch_add(1, Ordering::Relaxed);
        // ...
    })
    .and_then(move |()| {
        println!("Lines: {}", counted.load(Ordering::Relaxed));
    })
})

Is this a reasonable way to handle it? Using AtomicUsize here seems like an abuse, since the usize docs imply it’s intended for pointer-sized values. There’s also Mutex and RwLock, but it wasn’t clear to me from reading the docs how to safely increment a counter with them. I’m assuming each for_each iteration can be run on a separate thread, potentially out of order?
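
Would the Mutex equivalent just be something like this (an untested sketch with the same Arc wiring as above, swapping in std::sync::Mutex for the atomic)?

let counter = Arc::new(Mutex::new(0usize));
let counted = Arc::clone(&counter);

reader.for_each(move |line| {
    // lock() gives exclusive access long enough to bump the count
    *counter.lock().unwrap() += 1;
    // ... parse `line` as before ...
    Ok(())
})
.and_then(move |()| {
    println!("Lines: {}", *counted.lock().unwrap());
    Ok(())
})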

Are there any tutorials or open-source projects that go into the “next steps” part of working with tokio? I understand most of the things in the tokio ./examples directory, but I’m struggling with how to do more “complex” things with it.

#2

More or less, yes. Because the for_each closure is move, it takes ownership of the captured line_ct (and then drops it). Because it’s a simple integer, which is a Copy type, the move is done as a copy, and the outer line_ct variable is still usable afterwards, in the second closure’s capture.

This is one case where Rust’s semantics can be confusing, because in almost all other cases this would be an error. Each closure capture creates a (hidden) binding, so the variable inside the closure is not actually the same binding the name suggests; it behaves like a local copy that shadows the outer one across the function call.
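
Here’s the same behaviour in a standalone sketch, with nothing async involved (untested, but it should show the closure mutating its own copy while the outer binding stays at zero):

fn main() {
    let mut count = 0;

    // `move` copies `count` into the closure, because integers are Copy
    let mut bump = move || {
        count += 1; // mutates the closure's private copy
        println!("inside the closure: {}", count);
    };

    bump(); // prints 1
    bump(); // prints 2

    // The outer binding was never touched; a second closure would capture
    // yet another copy of this still-zero value, just like your and_then did
    println!("outer binding: {}", count); // prints 0
}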

For the sake of learning, try some variations and look at the errors and changes in behaviour, and see if you can explain each:

  • make either or both of the closures not move
  • make the count a more complex type, that is not Copy
  • pass the count from one closure to the next (rather than passing empty |()| )

There are solutions to your problem (at least at this level of complexity) among the above permutations, keeping your concept of the variable being ‘local’ to each connection. Even as you add complexity, you can keep this by passing along more structured data - either an explicit struct type or just an anonymous tuple.
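
To make the “structured data” idea concrete, here’s a rough, untested sketch using plain iterators rather than streams, so it stands on its own (ConnState and handle_line are just made-up names):

struct ConnState {
    line_ct: usize,
    // ... whatever else you end up accumulating per connection
}

fn handle_line(mut state: ConnState, _line: &str) -> ConnState {
    state.line_ct += 1;
    state
}

fn main() {
    let lines = ["one", "two", "three"];

    // the state is created per "connection", handed from one step to the
    // next, and never shared between closures
    let state = lines
        .iter()
        .fold(ConnState { line_ct: 0 }, |st, line| handle_line(st, line));

    println!("Lines handled: {}", state.line_ct);
}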

Bonus challenge: there’s a neat and idiomatic way to do this without ever explicitly initialising or incrementing a counter variable. Hint: look at some of the methods in the std::iter::Iterator trait.

#3

I spent at least a couple of hours yesterday trying to figure out how to do exactly that, but failed. I was guessing that I would have to write my own Stream impl to be able to do that, but you seem to be implying that’s not the case. Any tips?

For your bonus challenge, that’s one of the first things I tried 🙂. I saw count() in Iterator, but adding it between the for_each and the and_then gives an error I wasn’t quite ready to tackle:

error[E0599]: no method named `count` found for type `tokio::prelude::stream::ForEach<tokio::prelude::stream::SplitStream<tokio_codec::Framed<tokio::net::TcpStream, tokio_codec::LinesCodec>>, [closure@src/bin/server.rs:57:27: 74:18 counters:_], std::result::Result<(), std::io::Error>>` in the current scope
  --> src/bin/server.rs:75:18
   |
75 |         .count()
   |          ^^^^^
   |
   = note: the method `count` exists but the following trait bounds were not satisfied:
           `&mut tokio::prelude::stream::ForEach<tokio::prelude::stream::SplitStream<tokio_codec::Framed<tokio::net::TcpStream, tokio_codec::LinesCodec>>, [closure@src/bin/server.rs:57:27: 74:18 counters:_], std::result::Result<(), std::io::Error>> : std::iter::Iterator`

#4

Have the first closure return the count, and the second accept it in the ||:

    ...
    line_ct
}).and_then(|line_ct| {
    ...

Edit: you probably need to wrap that return value in Ok().

As for the challenge, I was thinking of enumerate() at the top, before the .for_each(), passing a (count, line) tuple to the first closure and just the count to the second, as above. I haven’t tested it, though, and as your errors show, it’s easy to trip over the distinction between Iterator and Stream even though they’re generally intended to behave similarly.

#5

Ah, I had tried that, too. The problem, I believe, is that the for_each closure is expected to return Ok(()). Changing it to return just line_ct gives the error:

error[E0277]: the trait bound `(): tokio::prelude::Future` is not satisfied

Trying Ok(line_ct) doesn’t work either:

error[E0271]: type mismatch resolving `<std::result::Result<{integer}, std::io::Error> as tokio::prelude::IntoFuture>::Item == ()`

#6

Ah, yeah. One of those differences.

Try making the for_each() also an and_then(), and jamming something else onto the end to consume the stream (e.g. a collect()).

Actually you could probably just swap and_then and for_each: reader.enumerate().and_then().for_each()

#7

@dcarosone That sounds promising, but I’m struggling with the syntax. Do you mind giving me an example?

#8

At this point I should probably make sure I give you a working example, which I’ll have to do a bit later on…

#9

Thanks, I appreciate your help!

#10

Here’s a short, working version, a little different from what I was originally suggesting. My instinct to use enumerate() was off; that’s a method only on iterators, not streams. The other changes are more cosmetic, but worthwhile, and discussed below.

let processor = reader
        .map(|line| println!("Read line: {}", &line))
        .fold(0, |sum, _| future::ok::<u32, std::io::Error>(sum + 1))
        .map(|count| println!("Socket processed {} lines", count))
        // rest of chain, unmodified
        .or_else(|err| {
            println!("Socket closed with error: {:?}", err);
            Err(err)
        })
        .then(|result| {
            println!("Socket closed with result: {:?}", result);
            Ok(())
        });

It uses:

  • map() to augment each item from the stream with additional work (your parser; I just print the line). This could be and_then(), as I suggested earlier, but then it would need to return an Ok explicitly. It’s neater to use the return value of the function being wrapped: map() if it’s a plain T -> U transformation, and and_then() if the function returns a Result (as a parser probably would) or some other IntoFuture type (see the short sketch just after this list). In this case map() also allows a simple single-expression closure, which avoids the {} and the extra newlines as well.
  • fold() to drive the stream to completion and do the counting. This could be for_each() with more complex state handling bolted on, but we don’t need that here; if we ever do, we can build quite complex behaviour into an explicit state-keeping type and get rid of the annoying type annotation at the same time.
  • the rest of your existing chain to work on the future that represents the result of the entire stream, amended slightly to take and print the count, again using map() rather than and_then(), as per above.
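
For instance (untested, and assuming parser::parse(&line) returns a Result with std::io::Error as its error type), the first step of the chain could be written as:

// drop-in replacement for the .map(|line| ...) step above,
// assuming parser::parse returns Result<_, std::io::Error>
// (if its error type differs, add a .map_err(...) to convert it)
.and_then(|line| parser::parse(&line))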

The key thing to think about here is that something has to drive the inner futures of the stream individually, like for_each() or fold() or collect(); the other calls are about adding behaviour to each item as it runs (above) or to the final outcome (below).
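
For example, a collect() would also drive the stream to completion, at the cost of buffering every line first (probably fine for a few hundred lines per connection, but not something I’d reach for in general); an untested sketch, just to show the shape:

// hedged alternative: buffer all the lines, then count them at the end
let processor = reader
    .collect()
    .map(|lines| println!("Socket processed {} lines", lines.len()))
    .map_err(|err| println!("Socket closed with error: {:?}", err));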

On this point, as you grow your solution you will probably want an additional or_else() before the fold() to catch and deal with parser errors in a way that makes sense without losing the count of previously-successful lines in the stream.
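
To make that last point concrete, here’s a rough sketch of the shape I mean. It’s self-contained, using futures::stream::iter_ok over some canned lines in place of the framed reader and a made-up parse_line in place of your real parser, and assumes the tokio 0.1 / futures 0.1 versions used elsewhere in this thread:

use futures::{future, stream, Future, Stream};
use std::io;

// stand-in for the real parser: fails on lines containing "bad"
fn parse_line(line: &str) -> Result<String, io::Error> {
    if line.contains("bad") {
        Err(io::Error::new(io::ErrorKind::InvalidData, format!("bad line: {}", line)))
    } else {
        Ok(line.to_uppercase())
    }
}

fn main() {
    // simulates the framed reader: a stream of lines with io::Error as its error type
    let reader = stream::iter_ok::<_, io::Error>(vec![
        "one".to_string(),
        "bad two".to_string(),
        "three".to_string(),
    ]);

    let processor = reader
        // parse each line; a failure becomes a stream error here...
        .and_then(|line| parse_line(&line).map(Some))
        // ...and is caught here, before the fold, so the running count survives
        .or_else(|err| {
            println!("parse error, skipping line: {:?}", err);
            future::ok::<Option<String>, io::Error>(None)
        })
        // count successes and skips as an anonymous tuple
        .fold((0u32, 0u32), |(ok, skipped), item| {
            future::ok::<_, io::Error>(match item {
                Some(_parsed) => (ok + 1, skipped),
                None => (ok, skipped + 1),
            })
        })
        .map(|(ok, skipped)| println!("Socket processed {} lines ({} skipped)", ok, skipped))
        .map_err(|e| println!("stream error: {:?}", e));

    tokio::run(processor);
}

The important bit is that the or_else() sits between the parsing and the fold(), so a bad line gets logged and skipped while the count keeps going. Note that a genuine io error from the socket would be swallowed by the same or_else(), so a real version probably wants to inspect the error and only recover from parse failures.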
