Async-std, splitting reader/writer & lifetimes

I have a function that handles incoming connections and it looks sort-of like this:

async fn connection_loop(stream: TcpStream,
      jobq: Arc<Mutex<JobQueue>>) -> Result<()> {
  let reader = io::BufReader::new(&stream);
  let mut lines = reader.lines();

  let writer = io::BufWriter::new(&stream);
  let writer = Arc::new(Mutex::new(writer));

  while let Some(line) = lines.next().await {
    let line = line?;

    // parse line

    // Add jobspec to the queue
    let mut jq = jobq.lock().await;

    let jobspec = Job { pathname: std::path::PathBuf::from(fname), alg,
      id: id.to_string() };

    jq.q.push_back(jobspec);
    if jq.q.len() < jq.max_jobs {
      if let Some(n) = jq.q.pop_back() {
        let id = n.id.clone();
        let pathname = n.pathname.clone();
        let fut = hasher(id, pathname, Arc::clone(&writer));
        task::spawn(fut);
      }
    }
  }

  Ok(())
}

The idea is that the connection_loop function is the only one that'll read from the stream, and the hasher futures are the only ones that'll write to the stream (though there may be several hasher future instances). I assumed the lifetimes are broken (the BufReader/Writer objects may outlive stream), and indeed I get an error:

error[E0597]: `stream` does not live long enough
   --> src/main.rs:76:35
    |
76  |   let writer = io::BufWriter::new(&stream);
    |                -------------------^^^^^^^-
    |                |                  |
    |                |                  borrowed value does not live long enough
    |                argument requires that `stream` is borrowed for `'static`
...
118 | }
    | - `stream` dropped here while still borrowed

What's a good/common/idiomatic pattern to make sure that the stream lives on for as long as it needs to?

Usually there's some way to split the tcp stream into a reader and writer pair, but I can't find it in async_std.

Ah, so it would essentially consume the stream object?

I switched from async-std to tokio, which solved this issue.

I don't think that's the case. In std it's OK to start with a single TcpStream and use it for both Read and Write, like so:

let stream = TcpStream::connect(...)?;
let reader = BufReader::new(&stream);
let writer = BufWriter::new(&stream);

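This is OK because std implements both Read and Write for &TcpStream (a shared reference), not only for TcpStream itself, so the reader and the writer can each borrow the same stream.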
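
To make that concrete, here is a minimal std-only sketch. The function name `echo_roundtrip` and the throwaway local echo server are made up for the demo; only the two `&stream` wrappers correspond to the snippet above. It should behave the same way in spirit with async_std, but I have only written it against std here.

```rust
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

/// Hypothetical demo: sends one line to a local echo server and returns the reply.
fn echo_roundtrip(msg: &str) -> std::io::Result<String> {
    // Port 0 lets the OS pick a free port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    // Echo server: reads one line and writes it straight back.
    let server = thread::spawn(move || -> std::io::Result<()> {
        let (conn, _) = listener.accept()?;
        let mut line = String::new();
        BufReader::new(&conn).read_line(&mut line)?;
        (&conn).write_all(line.as_bytes())?;
        Ok(())
    });

    let stream = TcpStream::connect(addr)?;
    // Both wrappers borrow the same stream; neither takes ownership of it.
    let mut reader = BufReader::new(&stream);
    let mut writer = BufWriter::new(&stream);

    writer.write_all(msg.as_bytes())?;
    writer.write_all(b"\n")?;
    writer.flush()?; // BufWriter holds the data until it is flushed

    let mut reply = String::new();
    reader.read_line(&mut reply)?;

    server.join().expect("server thread panicked")?;
    Ok(reply.trim_end().to_string())
}
```

Note that this only works because `reader` and `writer` never leave the function that owns `stream`. As soon as one of them is handed to something that requires `'static` (a spawned task or thread), the borrow is no longer enough.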

The same applies to async_std. But split is indeed necessary for tokio::net::TcpStream.

I tried to use the pattern used in https://book.async.rs/patterns/small-patterns.html which essentially does that, but uses one extra level of indirection. When I did that I ended up running into lifetime issues; details are fuzzy since it was a while back -- but I recall thinking that the split() function seems to "consume" stream so that only the reader and writers remained, which solved the lifetime issues.

I haven't actually looked at the implementation, but somehow I imagine that when split() is used in tokio it stores the underlying socket handle reference counted in the reader and writer, and once both of them are closed the socket handle is actually released.

This is what I don't understand about the async-std case -- I seemed to need to store the stream object somewhere until the reader and writer objects were done. Or is that not the case?
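
For what it's worth, the "store the stream somewhere" part is what the extra level of indirection provides: if the stream lives in an `Arc<TcpStream>`, every reader or writer can hold its own handle, and the socket is dropped once the last handle goes away. Below is a rough sketch of that idea using std threads as a stand-in for `task::spawn` (both require `'static`). The names `write_reply` and `shared_stream_demo` are hypothetical, and this is not the async-std code from the book, just the same shape.

```rust
use std::io::{BufRead, BufReader, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::Arc;
use std::thread;

/// Writer side: owns an `Arc` handle, so it satisfies the `'static` bound of `spawn`.
fn write_reply(stream: Arc<TcpStream>, reply: String) -> std::io::Result<()> {
    // `Write` is implemented for `&TcpStream`, so a shared reference is enough.
    let mut out = &*stream;
    out.write_all(reply.as_bytes())
}

/// Hypothetical demo: the server reads a line, a separate thread writes the reply.
fn shared_stream_demo() -> std::io::Result<String> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    let server = thread::spawn(move || -> std::io::Result<()> {
        let (conn, _) = listener.accept()?;
        let conn = Arc::new(conn);

        // The reader only borrows from the Arc; the Arc keeps the socket alive.
        let mut line = String::new();
        BufReader::new(&*conn).read_line(&mut line)?;

        // The writer gets its own handle and can outlive this scope.
        let writer = thread::spawn({
            let conn = Arc::clone(&conn);
            move || write_reply(conn, format!("done: {}", line))
        });
        writer.join().expect("writer thread panicked")
    });

    let mut client = TcpStream::connect(addr)?;
    client.write_all(b"job-1\n")?;
    let mut reply = String::new();
    BufReader::new(&client).read_line(&mut reply)?;

    server.join().expect("server thread panicked")?;
    Ok(reply.trim_end().to_string())
}
```

If several writers can run at once, as with the multiple hasher tasks in the original question, they would presumably still need a mutex (or a single writer task fed by a channel) so that their output does not interleave.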

One option is to convert the TcpStream into the std-lib version, call try_clone, then convert both handles back. Both handles refer to the same socket, so you will get unpredictable results if you read from both (or write to both) at the same time.
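
A minimal sketch of the `try_clone` part, using only std (the conversions to and from the async type are left out, and `try_clone_demo` is a made-up name). One handle is used purely for reading and the other purely for writing, which avoids the problem of two handles competing on the same direction.

```rust
use std::io::{BufRead, BufReader, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

/// Hypothetical demo: the server upper-cases one line using two owned handles.
fn try_clone_demo() -> std::io::Result<String> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    let server = thread::spawn(move || -> std::io::Result<()> {
        let (conn, _) = listener.accept()?;
        // A second owned handle to the same underlying socket.
        let mut write_half = conn.try_clone()?;
        // The original handle is moved into the reader; no borrow is involved.
        let mut read_half = BufReader::new(conn);

        let mut line = String::new();
        read_half.read_line(&mut line)?;
        write_half.write_all(line.to_uppercase().as_bytes())?;
        Ok(())
    });

    let mut client = TcpStream::connect(addr)?;
    client.write_all(b"hash me\n")?;
    let mut reply = String::new();
    BufReader::new(&client).read_line(&mut reply)?;

    server.join().expect("server thread panicked")?;
    Ok(reply.trim_end().to_string())
}
```

Since both halves are owned values, either one can be moved into a spawned task or thread without any lifetime trouble.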
