Long-lived threads with asynchronous TCP streams

I am working on NSQ bindings using async-std and want to spawn two threads that can read and write to the server. After going through the configuration, the main idea is that the client will do the following in a loop:

  • Read from the server and decode the response (inside of thread 1)
  • Write the response over a channel (presumably using the crossbeam crate)
  • Handle the message
  • Write the reply back to the server (inside of thread 2)

Am I conflating the two principles of multi-threading and asynchronous programming? I can't figure out how I would await the two "infinite" threads. Spawning the threads with owned sockets (after calling try_clone) should be easy. However, how can I await them and let the client be non-blocking?
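For reference, the purely thread-based design described above can be sketched with only the standard library. This is a minimal illustration under stated assumptions, not NSQ client code: `handle` and `run_pipeline` are hypothetical names, messages are treated as newline-delimited text instead of NSQ's real framing, and `std::sync::mpsc` stands in for crossbeam. With threads, "awaiting" the two loops simply means joining their handles, which blocks the caller.

```rust
use std::io::{self, BufRead, BufReader, Read, Write};
use std::sync::mpsc;
use std::thread;

/// Hypothetical handler: turns a decoded message into a reply.
fn handle(msg: &str) -> String {
    format!("FIN {}", msg)
}

/// Reads newline-delimited messages on one thread, passes them over a
/// channel, and writes the replies on a second thread.
/// Returns the writer once the reader side reaches end of input.
fn run_pipeline<R, W>(reader: R, mut writer: W) -> io::Result<W>
where
    R: Read + Send + 'static,
    W: Write + Send + 'static,
{
    let (tx, rx) = mpsc::channel::<String>();

    // Thread 1: read and decode, then send each message over the channel.
    let read_thread = thread::spawn(move || -> io::Result<()> {
        for line in BufReader::new(reader).lines() {
            if tx.send(line?).is_err() {
                break; // the writer thread has gone away
            }
        }
        Ok(()) // `tx` is dropped here, which ends the writer's loop
    });

    // Thread 2: handle each message and write the reply back.
    let write_thread = thread::spawn(move || -> io::Result<W> {
        for msg in rx {
            writer.write_all(handle(&msg).as_bytes())?;
            writer.write_all(b"\n")?;
        }
        Ok(writer)
    });

    // Joining blocks the calling thread until both loops finish.
    read_thread.join().expect("reader thread panicked")?;
    write_thread.join().expect("writer thread panicked")
}
```

With a real connection, `reader` and `writer` would be the socket and its `try_clone`; in-memory values such as `std::io::Cursor` and `Vec<u8>` work for experimenting.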

It feels like you probably want one thread to handle communication with the server, with that thread passing its work to a thread pool (or another scheduler for presumably CPU-bound work). Responses can be collected from the threads through their task handles. Something like Rayon, maybe?

Going over the docs, I see references to HTTP as the protocol that NSQ is using under the hood (there may be other options?). With HTTP/1.1 you will get the best concurrency out of more than one HTTP connection per client (effectively, you need as many connections/threads as the upper bound you want on the number of concurrent requests). Connections should be 1:1 with threads (or async tasks).

With HTTP/2, the situation is drastically better: just clone your hyper client (single connection) across as many threads/tasks as you want concurrent requests (the upper bound is the number of concurrent streams allowed by the server). hyper will manage the logical streams for you, and each clone can await its response without blocking any other async task using another clone.

Come to think of it, hyper might actually provide the same kind of interface for HTTP/1.1, but should handle connections instead of streams. I'm not sure of the equivalent in the async-std space, and hyper depends on tokio...

Unless I'm mistaken, I think that might help a bit?

Sorry, I should have been more clear. This is in regards to the binary protocol rather than the HTTP protocol it implements.

Thanks for sharing the idea about the thread pool however. That would work best I think.

Generally with async Rust you would never spawn threads to run async functions in; you would spawn a new task for each of them instead. Spawning threads would really only happen when you want to run some blocking code in a new thread and interact with it over async channels to avoid blocking your async code (although both async-std and tokio provide their own APIs for this sort of setup, so they know about your blocking code and can potentially deal with it better).

Spawning new tasks and waiting for them to complete would look something like:

async fn do_read(reader: impl AsyncRead, output: impl Sink<Response>) -> io::Result<()>;
async fn do_write(writer: impl AsyncWrite, input: impl Stream<Item = Response>) -> io::Result<()>;

let stream = connect().await;
let (reader, writer) = stream.split();
let (in_tx, in_rx) = mpsc::channel();
let (out_tx, out_rx) = mpsc::channel();
let handling = in_rx.map(|msg| handle(msg)).forward(out_tx);
join3(spawn(do_read(reader, in_tx)), spawn(do_write(writer, out_rx)), handling).await;

(Also, you wouldn't use crossbeam because its channels are not async-aware; there are async channels in futures, tokio and async-std.)

Really, there's no need to spawn any kind of tasks or threads for this, though. Being able to run both sides of the IO concurrently within one task is one of the main drivers of async. Using something like futures_codec for the parsing and encoding, you can end up with something as simple as:

let stream = connect().await;
let framed = Framed::new(stream, NsqCodec);
let (output, input) = framed.split(); // `split` returns the sink half first, then the stream half
input.map(|msg| handle(msg)).forward(output).await?;

(modulo some error handling that should be as easy as some map_err functions where necessary).
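The `NsqCodec` above is not spelled out. As a rough idea of what its decoding half would have to do, here is a std-only sketch of a frame parser. It assumes the frame layout described in the NSQ TCP protocol documentation: a 4-byte big-endian size, a 4-byte big-endian frame type (0 for a response, 1 for an error, 2 for a message), then the data, with the size covering the frame type plus the data. The `Frame` enum and `decode_frame` are hypothetical names; a real futures_codec decoder would wrap this kind of logic in its `Decoder` trait and work on the codec's byte buffer type.

```rust
/// Hypothetical representation of the three NSQ frame types.
#[derive(Debug, PartialEq)]
enum Frame {
    Response(Vec<u8>),
    Error(Vec<u8>),
    Message(Vec<u8>),
}

/// Tries to decode one frame from the front of `buf`.
/// Returns `Ok(None)` when more bytes are needed, otherwise the frame
/// together with the number of bytes it occupied.
fn decode_frame(buf: &[u8]) -> Result<Option<(Frame, usize)>, String> {
    if buf.len() < 4 {
        return Ok(None); // size prefix not complete yet
    }
    let size = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    if size < 4 {
        return Err(format!("frame size {} is too small", size));
    }
    let total = 4 + size; // size prefix + (frame type + data)
    if buf.len() < total {
        return Ok(None); // frame body not complete yet
    }
    let frame_type = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
    let data = buf[8..total].to_vec();
    let frame = match frame_type {
        0 => Frame::Response(data),
        1 => Frame::Error(data),
        2 => Frame::Message(data),
        other => return Err(format!("unknown frame type {}", other)),
    };
    Ok(Some((frame, total)))
}
```

Returning `Ok(None)` on a short buffer is what lets a framed stream keep reading until a whole frame has arrived, so the handling code only ever sees complete messages.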

Thanks for the answer. I thought I had the two mixed up. I think I will have to see if there are async-std equivalents to those traits as I think they are futures-rs specific. Perhaps they have a codec module as well.

Much appreciated!

Yeah, tokio, futures and async-std all provide their own versions of the asynchronous read/write traits. I believe async-std just calls them Read and Write (which is honestly a bit confusing in my eyes).

async-std’s types implement the traits from futures::io. The Read and Write traits they export are extensions of those that add extra utilities.

Sorry for my intervention. I just wanted to clarify this statement:

The Read and Write traits they export are extensions of those that add extra utilities

As I understand it, this means the following code should compile:

use async_std::prelude::*;
use async_std::io::prelude::*;
fn take_async_std<A: Read>(x: A) { take_futures(x) }
fn take_futures<F: ::futures::io::AsyncRead>(x: F) {}

However, I get the error:

the trait futures::AsyncRead is not implemented for A

Could anyone explain where I am wrong?

Unfortunately, I don't know why that doesn't compile. According to the source, the declaration is pub trait ReadExt: futures_io::AsyncRead, but that's within the extension_trait! macro, which might discard that supertrait relationship. The equivalent code using futures::io::AsyncReadExt instead works fine:

use futures::io::{AsyncRead, AsyncReadExt};
fn take_read_ext<R: AsyncReadExt>(r: R) { take_read(r) }
fn take_read<R: AsyncRead>(_: R) {}
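The base-trait/extension-trait relationship this relies on can be sketched without any async machinery. In the illustration below, `Source` and `SourceExt` are hypothetical stand-ins for `AsyncRead` and `AsyncReadExt`; it only shows the general pattern (a supertrait bound plus a blanket implementation), not how either library is actually implemented.

```rust
/// Stand-in for a base trait such as `AsyncRead` (hypothetical, simplified).
trait Source {
    fn next_byte(&mut self) -> Option<u8>;
}

/// Stand-in for an extension trait such as `AsyncReadExt`: it has the base
/// trait as a supertrait and only adds provided methods.
trait SourceExt: Source {
    fn read_all(&mut self) -> Vec<u8> {
        let mut out = Vec::new();
        while let Some(b) = self.next_byte() {
            out.push(b);
        }
        out
    }
}

/// Blanket implementation: every `Source` automatically gets `SourceExt`.
impl<S: Source> SourceExt for S {}

/// A simple in-memory source: the bytes plus a read position.
struct Bytes(Vec<u8>, usize);

impl Source for Bytes {
    fn next_byte(&mut self) -> Option<u8> {
        let b = self.0.get(self.1).copied();
        self.1 += 1;
        b
    }
}

/// Bounding on the base trait is enough to call the extension method.
fn drain<S: Source>(mut s: S) -> Vec<u8> {
    s.read_all()
}

/// Bounding on the extension trait also works, because the supertrait
/// bound implies `S: Source`.
fn drain_via_ext<S: SourceExt>(s: S) -> Vec<u8> {
    drain(s)
}
```

Both functions accept the same types here, which is why naming the extension trait in a bound adds nothing over naming the base trait.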

That said, I would recommend avoiding either Read or AsyncReadExt as bounds anywhere. The purpose of these traits is to add methods onto AsyncRead objects, so all your bounds should just mention AsyncRead directly, and you will still have the methods available:

// Using `as _` here will import the methods into the current
// scope but will leave the trait itself unnamed since you don't
// need to directly refer to it
use futures::io::{AsyncRead, AsyncReadExt as _};

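async fn take_read<R: AsyncRead + Unpin>(mut r: R) {
    // This uses the method from `AsyncReadExt` which works because
    // there's a blanket implementation over any `R: AsyncRead`
    // (`read_to_end` is an assumed example call; it needs `R: Unpin`)
    let mut buf = Vec::new();
    let _ = r.read_to_end(&mut buf).await;
}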
