Async-std, splitting reader/writer & lifetimes

I have a function that handles incoming connections and it looks sort-of like this:

async fn connection_loop(stream: TcpStream,
      jobq: Arc<Mutex<JobQueue>>) -> Result<()> {
  let reader = io::BufReader::new(&stream);
  let mut lines = reader.lines();

  let writer = io::BufWriter::new(&stream);
  let writer = Arc::new(Mutex::new(writer));

  while let Some(line) = lines.next().await {
    let line = line?;

    // parse line

    // Add jobspec to the queue
    let mut jq = jobq.lock().await;

    let jobspec = Job { pathname: std::path::PathBuf::from(fname), alg,
      id: id.to_string() };

    jq.q.push_back(jobspec);
    if jq.q.len() < jq.max_jobs {
      if let Some(n) = jq.q.pop_back() {
        let id = n.id.clone();
        let pathname = n.pathname.clone();
        let fut = hasher(id, pathname, Arc::clone(&writer));
        task::spawn(fut);
      }
    }
  }

  Ok(())
}

The idea is that the connection_loop function is the only one that'll read from the stream, and the hasher futures are the only ones that'll write to the stream (though there may be several hasher future instances). I assumed the lifetimes are broken (the BufReader/Writer objects may outlive stream), and indeed I get an error:

error[E0597]: `stream` does not live long enough
   --> src/main.rs:76:35
    |
76  |   let writer = io::BufWriter::new(&stream);
    |                -------------------^^^^^^^-
    |                |                  |
    |                |                  borrowed value does not live long enough
    |                argument requires that `stream` is borrowed for `'static`
...
118 | }
    | - `stream` dropped here while still borrowed

What's a good/common/idiomatic pattern to make sure that the stream lives on for as long as it needs to?

Usually there's some way to split the tcp stream into a reader and writer pair, but I can't find it in async_std.

Ah, so it would essentially consume the stream object?

I switched from async-std to tokio, which solved this issue.

I don't think that's the case. In std it's OK to start with a single TcpStream and use it for both Read and Write, like so:

let stream = TcpStream::connect(...)?;
let reader = BufReader::new(&stream);
let writer = BufWriter::new(&stream);

This is OK because both Read and Write are implemented for &TcpStream, so a shared reference is enough to read from or write to the socket.

The same applies to async_std. But split is indeed necessary for tokio::net::TcpStream.
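
To make that concrete, here is a minimal std-only sketch (blocking I/O, not async-std) in which a BufReader and a BufWriter both borrow the same TcpStream. The helper name echo_roundtrip and the throwaway local echo server are assumptions made up for the illustration; they are not from the original code.

```rust
use std::io::{self, BufRead, BufReader, BufWriter, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

/// Hypothetical helper: sends `msg` as one line to a throwaway local echo
/// server and returns the line that comes back.
fn echo_roundtrip(msg: &str) -> io::Result<String> {
    // Port 0 asks the OS for any free port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    // Server side: accept one connection and echo one line back.
    let server = thread::spawn(move || -> io::Result<()> {
        let (conn, _) = listener.accept()?;
        let mut reader = BufReader::new(&conn);
        let mut writer = BufWriter::new(&conn);
        let mut line = String::new();
        reader.read_line(&mut line)?;
        writer.write_all(line.as_bytes())?;
        writer.flush()?;
        Ok(())
    });

    // Client side: both halves hold a shared borrow of the same stream,
    // which compiles because `Read` and `Write` exist for `&TcpStream`.
    let stream = TcpStream::connect(addr)?;
    let mut reader = BufReader::new(&stream);
    let mut writer = BufWriter::new(&stream);

    writer.write_all(msg.as_bytes())?;
    writer.write_all(b"\n")?;
    writer.flush()?;

    let mut reply = String::new();
    reader.read_line(&mut reply)?;

    server.join().expect("server thread panicked")?;
    Ok(reply.trim_end().to_string())
}
```

Note that both halves only borrow the stream, so this works as long as they stay inside the function that owns it. It stops working once one half has to move into something that requires 'static, such as a spawned task.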

I tried to use the pattern used in TODO: Collected Small Patterns - Async programming in Rust with async-std which essentially does that, but uses one extra level of indirection. When I did that I ended up running into lifetime issues; details are fuzzy since it was a while back -- but I recall thinking that the split() function seems to "consume" stream so that only the reader and writers remained, which solved the lifetime issues.

I haven't actually looked at the implementation, but somehow I imagine that when split() is used in tokio it stores the underlying socket handle reference counted in the reader and writer, and once both of them are closed the socket handle is actually released.

This is what I don't understand about the async-std case -- I seemed to need to store the stream object somewhere until the reader and writer objects were done. Or is that not the case?
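
For reference, one way to "store the stream somewhere" is to put it in an Arc, so the socket is closed only when the last holder drops it. The following is a rough sketch of that idea, under the assumption that std threads are an acceptable stand-in for async-std tasks (both need 'static data). The names serve and arc_demo and the "done ..." reply format are invented for the example.

```rust
use std::io::{self, BufRead, BufReader, Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::sync::Arc;
use std::thread;

/// Hypothetical handler: reads lines from `stream` and answers each one
/// from its own thread. Every thread owns an `Arc` clone of the stream.
fn serve(stream: TcpStream) -> io::Result<()> {
    let stream = Arc::new(stream);
    // The reader borrows through the Arc; it never leaves this function.
    let reader = BufReader::new(&*stream);
    let mut workers = Vec::new();

    for line in reader.lines() {
        let line = line?;
        let out = Arc::clone(&stream); // owned handle, satisfies 'static
        workers.push(thread::spawn(move || -> io::Result<()> {
            // `Write` is implemented for `&TcpStream`.
            let mut w = &*out;
            w.write_all(format!("done {}\n", line).as_bytes())
        }));
    }

    for w in workers {
        w.join().expect("worker thread panicked")?;
    }
    Ok(())
    // The socket closes here, once the last Arc clone is gone.
}

/// Hypothetical driver: sends one job line and collects the whole reply.
fn arc_demo() -> io::Result<String> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    let server = thread::spawn(move || -> io::Result<()> {
        let (conn, _) = listener.accept()?;
        serve(conn)
    });

    let mut client = TcpStream::connect(addr)?;
    client.write_all(b"job1\n")?;
    client.shutdown(Shutdown::Write)?; // ends the server's line loop

    let mut reply = String::new();
    client.read_to_string(&mut reply)?; // returns once the server closes
    server.join().expect("server thread panicked")?;
    Ok(reply)
}
```

If several workers can write at the same time, their output may interleave, so a Mutex around the writing side (as in the original connection_loop) is probably still wanted.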

One option is to convert the TcpStream into the std-lib version, call try_clone, then convert both back. You will get unpredictable results if you read from both handles, or write to both, at the same time though.
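
A small sketch of the try_clone part, using only std (the conversions to and from the async-std type are left out here). The names split_owned and owned_halves_demo are hypothetical. Each handle is used for one direction only, which avoids the unpredictable behaviour mentioned above.

```rust
use std::io::{self, BufRead, BufReader, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

/// Hypothetical helper: turns one std `TcpStream` into two owned handles
/// that refer to the same underlying socket.
fn split_owned(stream: TcpStream) -> io::Result<(TcpStream, TcpStream)> {
    let read_half = stream.try_clone()?;
    Ok((read_half, stream))
}

/// Hypothetical demo: the client reads on one handle while a separate
/// thread writes on the other.
fn owned_halves_demo() -> io::Result<String> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    // Peer: sends "ping", then waits for one line from the client.
    let peer = thread::spawn(move || -> io::Result<String> {
        let (conn, _) = listener.accept()?;
        let (r, mut w) = split_owned(conn)?;
        w.write_all(b"ping\n")?;
        let mut line = String::new();
        BufReader::new(r).read_line(&mut line)?;
        Ok(line.trim_end().to_string())
    });

    let (read_half, mut write_half) = split_owned(TcpStream::connect(addr)?)?;

    // The write handle is owned, so it can move into another thread freely.
    let writer = thread::spawn(move || write_half.write_all(b"pong\n"));

    let mut line = String::new();
    BufReader::new(read_half).read_line(&mut line)?;

    writer.join().expect("writer thread panicked")?;
    let peer_got = peer.join().expect("peer thread panicked")?;
    Ok(format!("{} {}", line.trim_end(), peer_got))
}
```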

Consider the following:

async_std::io::copy(&mut stream_src, &mut stream_dst);
async_std::io::copy(&mut stream_dst, &mut stream_src);

The above is a perfect relay between stream_src and stream_dst. Unfortunately, it does not work unless you know that those streams are async_std::net::TcpStream. If all you have is a stream wrapped in another layer (for example TlsStream), you are out of luck.

I have been looking for alternatives to work around this, but I think that in theory, unless we have a runtime like tokio that supports split natively, or we wrap those streams in a Mutex that is locked at poll time, this does not seem to work. The stream has to be readable and writable at the same time in order to support all possible protocols.

That kind of double copy can be implemented without locking using poll_fn or by manually implementing Future on a custom type.

Note that Tokio's io::split also just uses a mutex internally.

You can use futures::io::AsyncReadExt::split with async-std's IO types. That provides owned reader/writer halves which share the underlying IO via a bi-lock (which should be marginally faster than a mutex, though I don't know if it's been benchmarked).

I mean, Tokio's split also uses a bilock, but my point is that if you want to avoid locking, you have to manually call the poll_read/poll_write methods instead of using the read/write methods on the extension traits.

The way I've solved this one is by using the fact async_std::net::TcpStream has an impl Clone:

let stream = async_std::net::TcpStream::connect(addr).await?;
let in_stream = stream.clone();
let out_stream = stream;

Seems to be working fine for me.

Sure, when you have direct access to the TcpStream, it's not a problem because you can just split it in two. The issue @earthengine describes is when you are dealing with a stream such as TlsStream, which requires access to both halves simultaneously. This means you have to split the TlsStream, and can't do it at the TcpStream level, which makes it much harder to avoid locking.

It is a bit annoying, but true: futures::io::AsyncReadExt conflicts with async_std::io::ReadExt, so you cannot use async_std::prelude::* if you want to use split.

Hopefully the split method will someday be migrated to async_std so that this is no longer an issue.
