Problem with "duplicating" a (Tcp)Stream and lifetimes

In order to implement a timeout for a thread, I created wrappers for io::Read and io::Write that set the correct timeout prior to any read or write operation, so that my handler can abort the thread if it hangs during an I/O operation. (I assume there is no other way using Rust's standard library to limit the runtime of a thread and terminate it cleanly on timeout, right?)

#![feature(trait_alias)]
use std::io;
use std::time::{Duration, Instant};

pub trait TimeoutSetter = FnMut(Duration) -> io::Result<()>;

pub struct DoomedReader<R: io::Read, S: TimeoutSetter> {
    reader: R,
    set_timeout: S,
    end_of_life: Instant,
}

impl<R: io::Read, S: TimeoutSetter> DoomedReader<R, S> {
    pub fn new(reader: R, set_timeout: S, duration: Duration) -> Self {
        DoomedReader {
            reader,
            set_timeout,
            end_of_life: Instant::now() + duration,
        }
    }
}

impl<R: io::Read, S: TimeoutSetter> io::Read for DoomedReader<R, S> { /* … */ }
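The body of the Read impl is elided above. As a rough sketch of what such a wrapper could do (not necessarily what the original code does), the version below computes the time left until end_of_life, passes it to the timeout setter, and then forwards the read. It assumes that an expired deadline is reported as io::ErrorKind::TimedOut, and it spells out the closure bound directly instead of using the nightly-only trait alias, so it builds on stable Rust.

```rust
use std::io::{self, Read};
use std::time::{Duration, Instant};

/// Reader that refuses to read once its deadline has passed.
pub struct DoomedReader<R, S> {
    reader: R,
    set_timeout: S,
    end_of_life: Instant,
}

impl<R, S> DoomedReader<R, S>
where
    R: Read,
    S: FnMut(Duration) -> io::Result<()>,
{
    pub fn new(reader: R, set_timeout: S, duration: Duration) -> Self {
        DoomedReader {
            reader,
            set_timeout,
            end_of_life: Instant::now() + duration,
        }
    }
}

impl<R, S> Read for DoomedReader<R, S>
where
    R: Read,
    S: FnMut(Duration) -> io::Result<()>,
{
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Time left until the deadline; zero if it has already passed.
        let remaining = self.end_of_life.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            // Assumption: an expired deadline is reported as `TimedOut`.
            return Err(io::Error::new(io::ErrorKind::TimedOut, "deadline exceeded"));
        }
        // Shrink the stream's timeout to the remaining time, then read.
        (self.set_timeout)(remaining)?;
        self.reader.read(buf)
    }
}
```

The explicit zero check matters for real sockets, because TcpStream::set_read_timeout rejects a zero Duration with an error.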

I wanted this to work with any sort of stream (i.e. not just TcpStream but also UnixStream or any other, which only have the Read and Write traits in common). This is why I pass a TimeoutSetter closure that can call the corresponding function, e.g. TcpStream::set_read_timeout.

I implemented this for both the reading and the writing direction (as separate types, DoomedReader and DoomedWriter).

Now my problem is that a TcpStream is a single object that serves both for reading and writing. The usual way to duplicate this stream is to borrow it because Read is implemented also by &TcpStream (in addition to TcpStream).

let conn = TcpStream::connect("localhost:1234")?;
let (reader, writer) = (&conn, &conn);
let doomed_reader = DoomedReader::new(
    reader,
    |duration| reader.set_read_timeout(Some(duration)),
    Duration::new(300, 0), // do not allow reading after 300 seconds
);
let doomed_writer = DoomedWriter::new(
    writer,
    |duration| writer.set_write_timeout(Some(duration)),
    Duration::new(300, 0), // do not allow writing after 300 seconds
);

So far, so good. But now I want to store the reader and writer in a struct, and I cannot, because both borrow conn, which goes out of scope once my function returns.

There is TcpStream::try_clone, but it can fail and seems to operate at the OS level. It doesn't seem like the right solution to me.

So here comes my question: What's the best way of splitting a connection into a reader and a writer, and can I store these in a struct without introducing a lifetime dependency on the original stream?

If you don't want a lifetime relationship, they both must have shared ownership of the value via e.g. Arc.

Maybe you should define a single object that sets both timeouts?


You're right. The only alternative is async Rust, where asynchronous runtimes such as Tokio provide general techniques that let you apply a timeout to any operation.


Oh right, my DoomedReader and DoomedWriter could have a shared ownership of the original stream using an Arc (or Rc, though I try to avoid using Rc). Then, any methods of the DoomedReader and DoomedWriter could use deref to get a (temporary) borrow whenever needed.

This should solve my problem. Thanks for pointing me to a solution!

From a semantic point of view, however, it doesn't seem correct to assume that the DoomedReader (or DoomedWriter) shares its underlying reader (or writer). Still, I would have to modify the DoomedReader and DoomedWriter to store an Arc<T> (or a Deref<Target = T>) instead of a plain T (where T: Read or T: Write, respectively). Alternatively, I would need to write a boilerplate wrapper to create a newtype that implements Read while internally storing the Arc<TcpStream>. That is because &TcpStream implements Read, but Arc<TcpStream> does not. Is that correct?

There doesn't seem to be a general solution for this, or at least I didn't find one.

Interestingly, I had other cases where I needed a type to implement a trait directly that is already implemented by the reference obtained through the deref mechanism (but not by the pointer-type itself). If I remember it right, one case had to do with implementing custom traits on str, which are not automatically implemented on String. The deref-magic hides this on method calls (i.e. I can call CustomTrait's methods on a String, even if the trait is only implemented for str). But it fails in other cases, e.g. when calling a function f<S: CustomTrait>(arg: S). I then needed to decide whether I implement the trait twice (on str as well as String, which is boilerplate code) or if I stick to calling f(&*some_string), which felt a bit ugly.
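To make the str/String case concrete, here is a small sketch. The trait CustomTrait and its shout method are made up for illustration, and f is adapted to take a reference with a ?Sized bound (rather than the by-value f<S: CustomTrait>(arg: S) mentioned above) so that it can accept str at all.

```rust
/// Hypothetical trait, implemented for `str` only.
trait CustomTrait {
    fn shout(&self) -> String;
}

impl CustomTrait for str {
    fn shout(&self) -> String {
        self.to_uppercase()
    }
}

/// Generic function; `?Sized` lets `S` be the unsized type `str`.
fn f<S: CustomTrait + ?Sized>(arg: &S) -> String {
    arg.shout()
}

fn demo() -> (String, String) {
    let some_string = String::from("hello");
    // Method call: auto-deref finds the `str` impl via `String: Deref<Target = str>`.
    let via_method = some_string.shout();
    // Generic call: `f(&some_string)` would infer `S = String` and be rejected,
    // because no deref coercion happens while inferring a type parameter.
    // The `str` has to be produced explicitly.
    let via_generic = f(&*some_string);
    (via_method, via_generic)
}
```

Writing f(some_string.as_str()) is equivalent to f(&*some_string) and may read a little less cryptic.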

Maybe that's best in my scenario. My motivation was to write the code as abstractly as possible (and to also work on read-only or write-only streams, while avoiding boilerplate).

I was undecided whether I should use threads and the standard lib, or start learning asynchronous programming with Rust. The latter seemed to be more difficult to learn, but maybe I'm too limited with the standard library and synchronous programming (particularly if timeouts are involved).

If my main program uses Tokio, for example, can I still have components that work with functions that accept objects implementing std::io::Read and std::io::Write, or do I need to adjust all my functions/structs/traits?

Yeah, you pretty much have to add some sort of newtype. That's what Tokio does with its TcpStream::split and TcpStream::into_split methods (the difference being whether they use a lifetime or Arc).

Generally, async Rust requires that you don't block the thread, and performing IO inside the std IO traits always qualifies as blocking the thread. You can still use the traits if you are operating on in-memory buffers such as Vec<u8> (via a Cursor), but not for real IO resources.
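As an illustration of the in-memory case, the sketch below feeds bytes that are already in a Vec<u8> to a function written against std::io::Read. The helper read_len_prefixed and its one-byte length format are invented for this example. Reading from a Cursor never waits on the operating system, which is why it is harmless inside async code.

```rust
use std::io::{self, Cursor, Read};

/// Hypothetical helper written against the blocking `std::io::Read` trait:
/// reads one length byte, then that many payload bytes.
fn read_len_prefixed<R: Read>(mut input: R) -> io::Result<Vec<u8>> {
    let mut len = [0u8; 1];
    input.read_exact(&mut len)?;
    let mut payload = vec![0u8; len[0] as usize];
    input.read_exact(&mut payload)?;
    Ok(payload)
}

fn demo() -> io::Result<Vec<u8>> {
    // Bytes that were already received (for example by async code) and now sit in memory.
    let received: Vec<u8> = vec![3, b'a', b'b', b'c', b'x'];
    // `Cursor` turns the buffer into a `Read` implementation.
    read_len_prefixed(Cursor::new(received))
}
```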


Thanks again for all this valuable information and hints!

If I understand it right, borrowing as in let (reader, writer) = (&conn, &conn); already gives std::net::TcpStream the equivalent of a split method, but there is no equivalent of into_split. Would it make sense to add a mechanism to the standard library that resembles Tokio's into_split as well? That would solve my problem more easily (and might also come in handy for other use cases).

It's a pity that Tokio can't provide an implementation of std::io::Read for any of its asynchronous streams. I guess that's because of how asynchronous functions are implemented in Rust. I'm coming from Lua, where (most) functions can yield without requiring any special declaration such as async. Of course, Rust and Lua aren't comparable as they have a totally different focus. But I believe coroutines were part of Rust in an earlier stage of development and were then discarded. In that context, I just found RFC #2033: Experimentally add coroutines to Rust. Not sure if that would allow using std::io::Read though.

To my knowledge, there are no RFCs that would let you use the std IO traits asynchronously. Coroutines are simply compiled in such a vastly different way from ordinary function calls, and you can't yield "through" ordinary function calls.

In Lua, yielding "through" ordinary function calls works, but only where the scripting language is used. If there are any C-calls in between, then it is usually not possible (unless the C function provides a method to interrupt its execution, which is somewhat ugly).

I'll read up on Tokio and async programming in Rust in general to see if I should switch from my current approach to Tokio.

It's important to note that the technique that Lua uses to make that possible is equivalent to making everything async in Rust.


Since I need into_split (split is already provided by borrowing std::net::TcpStream), I made an attempt to create such a newtype:

use std::io::{self, Read}; // `Read` must be in scope for the `.read(buf)` calls below
use std::net::TcpStream;
use std::os::unix::net::UnixStream;
use std::sync::Arc;

struct OwnedReadHalf<C>(Arc<C>);
struct OwnedWriteHalf<C>(Arc<C>);

fn into_split<C>(connection: C) -> (OwnedReadHalf<C>, OwnedWriteHalf<C>) {
    let arc1 = Arc::new(connection);
    let arc2 = arc1.clone();
    (OwnedReadHalf(arc1), OwnedWriteHalf(arc2))
}

impl io::Read for OwnedReadHalf<TcpStream> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        (&*self.0).read(buf)
    }
    // TODO: implement more methods
}

impl io::Read for OwnedReadHalf<UnixStream> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        (&*self.0).read(buf)
    }
    // TODO: implement more methods
}

impl io::Write for OwnedWriteHalf<TcpStream> { … }

impl io::Write for OwnedWriteHalf<UnixStream> { … }

This code compiles. However, there are multiple issues:

  • I have to forward each method of Read and Write to (&*self.0). E.g. if I do not implement std::io::Read::read_vectored, then my wrapper would use the default implementation, which may perform worse. Moreover, if Read and Write get additional methods (with default implementations) in future, the same problem will apply to those methods (and users of my code won't even notice the missing methods).
  • I have to provide implementations (for all methods) for all types of streams I want to operate on (e.g. TcpStream, UnixStream, …), resulting in boilerplate code in two dimensions (count of methods in Read and Write × count of different stream types).

I tried to further abstract the implementation to get rid of one dimension of boilerplate. My futile attempt was as follows:

impl<R> io::Read for OwnedReadHalf<R>
where
    &'_ R: io::Read,
{
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        (&*self.0).read(buf)
    }
}

But the compiler won't allow that:

error[E0637]: `'_` cannot be used here
  --> src/io_help.rs:23:6
   |
23 |     &'_ R: io::Read,
   |      ^^ `'_` is a reserved lifetime name

error[E0310]: the parameter type `R` may not live long enough
  --> src/io_help.rs:23:12
   |
21 | impl<R> io::Read for OwnedReadHalf<R>
   |      - help: consider adding an explicit lifetime bound...: `R: 'static`
22 | where
23 |     &'_ R: io::Read,
   |            ^^^^^^^^ ...so that the reference type `&'static R` does not outlive the data it points at

If I try to make R: 'static, the second error looks different:

error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
  --> src/io_help.rs:27:11
   |
26 |     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
   |             --------- this data with an anonymous lifetime `'_`...
27 |         (&*self.0).read(buf)
   |           ^------  ---- ...and is required to live as long as `'static` here
   |            |
   |            ...is captured here...

I tried to introduce a named lifetime:

impl<'a, R> io::Read for OwnedReadHalf<R>
where
    R: 'a,
    &'a R: io::Read,
{
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        (&*self.0).read(buf)
    }
}

But that didn't make things any better:

error[E0495]: cannot infer an appropriate lifetime for lifetime parameter in function call due to conflicting requirements
  --> src/io_help.rs:27:11
   |
27 |         (&*self.0).read(buf)
   |           ^^^^^^^
   |
note: first, the lifetime cannot outlive the anonymous lifetime defined on the method body at 26:13...
  --> src/io_help.rs:26:13
   |
26 |     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
   |             ^^^^^^^^^
note: ...so that reference does not outlive borrowed content
  --> src/io_help.rs:27:12
   |
27 |         (&*self.0).read(buf)
   |            ^^^^^^
note: but, the lifetime must be valid for the lifetime `'a` as defined on the impl at 21:6...
  --> src/io_help.rs:21:6
   |
21 | impl<'a, R> io::Read for OwnedReadHalf<R>
   |      ^^
note: ...so that the types are compatible
  --> src/io_help.rs:27:20
   |
27 |         (&*self.0).read(buf)
   |                    ^^^^
   = note: expected `std::io::Read`
              found `std::io::Read`

So I'm kinda stuck again.

I feel like the standard library's approach of implementing Read for &TcpStream (and &File, etc.) is a hack that only helps in certain scenarios and causes trouble in others.

Try for<'a> &'a R: io::Read


That did it!

I feel like I need a break now (and will read more about this for<'…> quantifier later, happy that it exists!).

Edit: Looks like this quantification is called "Higher-Rank Trait Bounds" (HRTB). I didn't find anything in the book (and the chapter "Advanced Lifetimes" seems to be removed, not sure if it was in there), but it is mentioned in the Rust Reference and in the Rustonomicon. The latter only gives an example related to desugaring though.
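For anyone landing here later, this is roughly how the generic read half looks with the for<'a> bound in place. It is a sketch under a few assumptions: FakeStream is a made-up stand-in that, like TcpStream, implements Read for a shared reference, so the example runs without a network connection, and only the read side is shown. The write half would presumably follow the same pattern with for<'a> &'a W: io::Write.

```rust
use std::io::{self, IoSliceMut, Read};
use std::sync::{Arc, Mutex};

pub struct OwnedReadHalf<C>(Arc<C>);
pub struct OwnedWriteHalf<C>(Arc<C>);

pub fn into_split<C>(connection: C) -> (OwnedReadHalf<C>, OwnedWriteHalf<C>) {
    let shared = Arc::new(connection);
    (OwnedReadHalf(Arc::clone(&shared)), OwnedWriteHalf(shared))
}

// `for<'a>` demands `&'a R: Read` for every lifetime `'a`, which includes
// the short borrow of `self.0` taken inside each method.
impl<R> Read for OwnedReadHalf<R>
where
    for<'a> &'a R: Read,
{
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        (&*self.0).read(buf)
    }

    // Forwarded so that the wrapped type's own implementation is used
    // instead of the trait's default.
    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
        (&*self.0).read_vectored(bufs)
    }
}

/// Stand-in for a real stream (assumption, for demonstration only):
/// like `TcpStream`, it implements `Read` for a shared reference.
pub struct FakeStream(Mutex<io::Cursor<Vec<u8>>>);

impl Read for &FakeStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.0.lock().unwrap().read(buf)
    }
}
```

Because both halves own an Arc, neither type carries a lifetime parameter, so they can be stored in a struct or moved into another thread (given that the wrapped type is Send and Sync).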
