I have a function that handles incoming connections and it looks sort-of like this:
async fn connection_loop(stream: TcpStream, jobq: Arc<Mutex<JobQueue>>) -> Result<()> {
    let reader = io::BufReader::new(&stream);
    let mut lines = reader.lines();
    let writer = io::BufWriter::new(&stream);
    let writer = Arc::new(Mutex::new(writer));
    while let Some(line) = lines.next().await {
        let line = line?;
        // parse line into `fname`, `alg`, and `id`
        // add the jobspec onto the queue
        let mut jq = jobq.lock().await;
        let jobspec = Job {
            pathname: std::path::PathBuf::from(fname),
            alg,
            id: id.to_string(),
        };
        jq.q.push_back(jobspec);
        if jq.q.len() < jq.max_jobs {
            if let Some(n) = jq.q.pop_back() {
                let id = n.id.clone();
                let pathname = n.pathname.clone();
                let fut = hasher(id, pathname, Arc::clone(&writer));
                task::spawn(fut);
            }
        }
    }
    Ok(())
}
The idea is that the connection_loop function is the only one that'll read from the stream, and the hasher futures are the only ones that'll write to the stream (though there may be several hasher future instances). I assumed the lifetimes are broken (the BufReader/Writer objects may outlive stream), and indeed I get an error:
error[E0597]: `stream` does not live long enough
   --> src/main.rs:76:35
    |
76  |     let writer = io::BufWriter::new(&stream);
    |                  -------------------^^^^^^^-
    |                  |                  |
    |                  |                  borrowed value does not live long enough
    |                  argument requires that `stream` is borrowed for `'static`
...
118 | }
    |   - `stream` dropped here while still borrowed
What's a good/common/idiomatic pattern to make sure that the stream lives on for as long as it needs to?
I tried to use the pattern from TODO: Collected Small Patterns - Async programming in Rust with async-std, which essentially does that but with one extra level of indirection. When I did, I ran into lifetime issues; the details are fuzzy since it was a while back, but I recall thinking that the split() function seemed to "consume" the stream so that only the reader and writer remained, which solved the lifetime issues.
I haven't actually looked at the implementation, but I imagine that tokio's split() stores the underlying socket handle, reference-counted, in the reader and writer, and only once both of them are dropped is the socket handle actually released.
This is what I don't understand about the async-std case: I seemed to need to store the stream object somewhere until the reader and writer objects were done. Or is that not the case?
One option is to convert the TcpStream into the std-lib version, call try_clone, then convert both clones back. You will get unpredictable results if you try writing to or reading from both at the same time, though.
The above does a perfect relay between stream_src and stream_dst. Unfortunately, it does not work unless you know that those streams are async_std::net::TcpStream. If you only have a stream wrapped in another layer (for example, a TlsStream), you are out of luck.
I have been looking for workarounds, but I think that, in theory, unless we have a runtime like tokio that supports split natively, or we write those streams using poll-time locking and a Mutex, this does not work: the stream has to be readable and writable at the same time to support all possible protocols.
You can use futures::io::AsyncReadExt::split with async-std's IO types. That provides owned reader/writer halves which share the underlying IO via a bi-lock (which should be marginally faster than a mutex, though I don't know whether it has been benchmarked).
Tokio's split also uses a bi-lock, but my point is that if you want to avoid locking entirely, you have to manually call the poll_read/poll_write methods instead of using the read/write methods on the extension traits.
Sure, when you have direct access to the TcpStream, it's not a problem because you can just split it in two. The issue @earthengine describes is when you are dealing with a stream such as TlsStream, which requires access to both halves simultaneously. This means you have to split the TlsStream, and can't do it at the TcpStream level, which makes it much harder to avoid locking.
It is a bit annoying, but true: futures::io::AsyncReadExt conflicts with async_std::io::ReadExt, so you cannot use async_std::prelude::* if you want to do this.
Hopefully the split method will someday be migrated to async_std so this will no longer be an issue.