How to pass borrowed stream into function bounded by Stream trait?

Hello. I am trying to create a custom stream that wraps another stream, but I am getting an error. Here is simplified code that reproduces the situation:

Code
use core::pin::Pin;

use futures_core::{
	stream::{FusedStream, Stream},
	task::{Context, Poll},
};
use futures_util::{stream::Fuse, StreamExt};

pub struct SomeStruct {
	u32_stream: Pin<Box<dyn Stream<Item = u32>>>,
}
impl SomeStruct {
	pub fn fooIt(&self) {
		let u32_stream: &Pin<Box<dyn Stream<Item = u32>>> = &self.u32_stream;
		fooIt(u32_stream);
	}
}

pub fn fooIt<St1>(stream1: St1) -> FooStruct<St1>
where
	St1: Stream + Unpin,
{
	FooStruct {
		stream1: stream1.fuse(),
	}
}

pub struct FooStruct<St1>
where
	St1: Stream + Unpin,
{
	stream1: Fuse<St1>,
}

impl<St1> Unpin for FooStruct<St1> where St1: Stream + Unpin {}

impl<St1> Stream for FooStruct<St1>
where
	St1: Stream + Unpin,
{
	type Item = St1::Item;

	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
		let poll_res = Pin::new(&mut self.stream1).poll_next(cx);

		let result = match poll_res {
			Poll::Pending => return Poll::Pending,
			Poll::Ready(None) => None,
			Poll::Ready(Some(item1)) => Some(item1),
		};

		Poll::Ready(result)
	}
}

And the errors are:

Errors
error[E0277]: the trait bound `&std::pin::Pin<std::boxed::Box<dyn futures_core::stream::Stream<Item = u32>>>: futures_core::stream::Stream` is not satisfied
  --> src\...\rust_forum_help.rs:15:15
   |
15 |         fooIt(u32_stream);
   |               ^^^^^^^^^^ the trait `futures_core::stream::Stream` is not implemented for `&std::pin::Pin<std::boxed::Box<dyn futures_core::stream::Stream<Item = u32>>>`
...
19 | pub fn fooIt<St1>(stream1: St1) -> FooStruct<St1>
   |        -----
20 | where
21 |     St1: Stream + Unpin,
   |          ------ required by this bound in `rust_forum_help::fooIt`
   |
   = help: the following implementations were found:
             <std::pin::Pin<P> as futures_core::stream::Stream>
   = note: `futures_core::stream::Stream` is implemented for `&mut std::pin::Pin<std::boxed::Box<dyn futures_core::stream::Stream<Item = u32>>>`, but not for `&std::pin::Pin<std::boxed::Box<dyn futures_core::stream::Stream<Item = u32>>>`

error[E0277]: the trait bound `&std::pin::Pin<std::boxed::Box<dyn futures_core::stream::Stream<Item = u32>>>: futures_core::stream::Stream` is not satisfied
  --> src\...\rust_forum_help.rs:15:9
   |
15 |           fooIt(u32_stream);
   |           ^^^^^^^^^^^^^^^^^ the trait `futures_core::stream::Stream` is not implemented for `&std::pin::Pin<std::boxed::Box<dyn futures_core::stream::Stream<Item = u32>>>`
...
28 | / pub struct FooStruct<St1>
29 | | where
30 | |     St1: Stream + Unpin,
31 | | {
32 | |     stream1: Fuse<St1>,
33 | | }
   | |_- required by `rust_forum_help::FooStruct`
   |
   = help: the following implementations were found:
             <std::pin::Pin<P> as futures_core::stream::Stream>
   = note: `futures_core::stream::Stream` is implemented for `&mut std::pin::Pin<std::boxed::Box<dyn futures_core::stream::Stream<Item = u32>>>`, but not for `&std::pin::Pin<std::boxed::Box<dyn futures_core::stream::Stream<Item = u32>>>`

error: aborting due to 2 previous errors

I've realized that it works if I move the stream instead of borrowing it:

pub fn fooIt(self) {
    let u32_stream: Pin<Box<Stream<Item = u32>>> = self.u32_stream;
    fooIt(u32_stream);
}

But I would like to borrow it, so that I can still use self afterwards without cloning it.

How can I make the original code work, if that is possible? Something tells me that the upstream must be consumed.
Is the definition of the FooStruct stream sensible in general? For example, is the Unpin bound a good idea, or is there a less restrictive way of doing it? I was using it because otherwise I wouldn't be able to do:
let poll_res = Pin::new(&mut self.stream1).poll_next(cx);

The compiler is complaining because you pass an immutable reference, but stream operations mutate the stream.

In particular, polling a stream can advance its internal state, and the goal is usually to take an item out - that item will no longer be produced, so that operation fundamentally mutates the stream.

If you pass in an &mut reference rather than an immutable & reference, your code should work.
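To illustrate why `&mut` works where `&` does not, here is a minimal sketch that uses only the standard library. The `Stream` trait below is a local stand-in with the same shape as `futures_core::stream::Stream` (it is not the real trait), and `Counter`, `take_two`, `NoopWake` and `demo` are hypothetical names made up for this example. The blanket impl for `&mut S` mirrors the one futures_core provides, which is what the compiler note in the error refers to.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in with the same shape as futures_core::stream::Stream.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// A mutable borrow of an `Unpin` stream is itself a stream.
// There is no such impl for `&S`, because polling has to mutate the stream.
impl<'a, S: ?Sized + Stream + Unpin> Stream for &'a mut S {
    type Item = S::Item;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        S::poll_next(Pin::new(&mut **self), cx)
    }
}

// A tiny stream that yields next, next + 1, ... up to (but not including) `end`.
pub struct Counter {
    next: u32,
    end: u32,
}

impl Stream for Counter {
    type Item = u32;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.next < self.end {
            self.next += 1;
            Poll::Ready(Some(self.next - 1))
        } else {
            Poll::Ready(None)
        }
    }
}

// A waker that does nothing; enough for polling by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Generic consumer with the same bound as `fooIt` in the question.
pub fn take_two<St: Stream + Unpin>(mut stream: St) -> Vec<St::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    for _ in 0..2 {
        if let Poll::Ready(Some(item)) = Pin::new(&mut stream).poll_next(&mut cx) {
            out.push(item);
        }
    }
    out
}

pub fn demo() -> (Vec<u32>, Vec<u32>) {
    let mut counter = Counter { next: 0, end: 5 };
    // Pass a mutable borrow: `counter` is not consumed.
    let first = take_two(&mut counter);
    // The same stream continues where the first borrow left off.
    let second = take_two(&mut counter);
    (first, second)
}
```

Passing `&counter` to `take_two` instead would be rejected with the same kind of E0277 error as in the question, since only `&mut S` has a `Stream` impl.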

You need mutable access to a stream to use it. This compiles:

impl SomeStruct {
    pub fn foo_it(&mut self) {
        let u32_stream: &mut Pin<Box<dyn Stream<Item = u32>>> = &mut self.u32_stream;
        foo_it(u32_stream);
    }
}

I also added a dyn to the Stream trait. Not using dyn on trait objects is deprecated.

You can avoid Unpin by using unsafe:

pub struct FooStruct<S> {
    stream1: Fuse<S>,
}

impl<S: Stream> Stream for FooStruct<S> {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let stream = unsafe {
            Pin::map_unchecked_mut(self, |me| &mut me.stream1)
        };
        let poll_res = stream.poll_next(cx);

        let result = match poll_res {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(None) => None,
            Poll::Ready(Some(item1)) => Some(item1),
        };

        Poll::Ready(result)
    }
}

You can avoid the unsafe by using the pin-project-lite crate:

use pin_project_lite::pin_project;

pin_project! {
    pub struct FooStruct<S> {
        #[pin]
        stream: Fuse<S>,
    }
}

impl<S: Stream> Stream for FooStruct<S> {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let poll_res = self.project().stream.poll_next(cx);

        let result = match poll_res {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(value) => value,
        };

        Poll::Ready(result)
    }
}

Yes, forgot the dyn, I'll update my original post.

Is the first alternative you propose with unsafe... safe? From what I understand it is safe because the returned data (stream) does not move since there's no await in the middle of that function and we are referring to a field inside a method. Is that right?

I'll use your second alternative since it seems better to avoid unsafe, but I don't understand how I could have arrived at that solution myself. I checked out the pin_project macro and the code it generates; it is somewhat advanced, and I'm not sure what I should be reading to learn how to do that.

The word for correct unsafe code is "sound", and the first version with unsafe is indeed sound. The unsafe block is a promise to the compiler that the stream field won't be moved (e.g. with swap). This promise is upheld because:

  1. The field is private, so other people can't go move it.
  2. None of our code provides any way to move it.

The pin-project macro generates a project() method with the same unsafe code as my unsafe function. The crate has been written in a way such that you can't cause unsoundness by using it.
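As a rough illustration of what such a projection looks like when written by hand, here is a sketch that uses only the standard library and `Future` instead of `Stream`. This is not the code the macro actually generates; `CountPolls`, `Seven`, `NoopWake` and `demo` are hypothetical names invented for the example.

```rust
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical wrapper: counts how often the inner future is polled.
pub struct CountPolls<F> {
    inner: F,     // treated as pinned: never moved once `self` is pinned
    polls: usize, // plain field: freely mutable
}

impl<F> CountPolls<F> {
    // Hand-written projection: one pinned reference, one ordinary reference.
    fn project<'a>(self: Pin<&'a mut Self>) -> (Pin<&'a mut F>, &'a mut usize) {
        // SAFETY: `inner` is private and is never moved out of a pinned `Self`;
        // the type has no `Drop` impl and is not `#[repr(packed)]`.
        unsafe {
            let this = self.get_unchecked_mut();
            (Pin::new_unchecked(&mut this.inner), &mut this.polls)
        }
    }
}

impl<F: Future> Future for CountPolls<F> {
    type Output = (F::Output, usize);
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let (inner, polls) = self.project();
        *polls += 1;
        match inner.poll(cx) {
            Poll::Ready(value) => Poll::Ready((value, *polls)),
            Poll::Pending => Poll::Pending,
        }
    }
}

// A future that is deliberately not `Unpin` and resolves to 7 immediately.
pub struct Seven {
    _pin: PhantomPinned,
}

impl Future for Seven {
    type Output = u32;
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        Poll::Ready(7)
    }
}

// A waker that does nothing; enough for polling by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn demo() -> Poll<(u32, usize)> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    // `Box::pin` pins the wrapper on the heap, so no `Unpin` bound is needed.
    let mut fut = Box::pin(CountPolls {
        inner: Seven { _pin: PhantomPinned },
        polls: 0,
    });
    fut.as_mut().poll(&mut cx)
}
```

The only unsafe part is `project`, and its safety comment is exactly the promise described above: the pinned field is private and nothing in the module moves it.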

That worked! Thank you. I wasn't sure how to read the error message, but this helped me understand it better.

You might be wondering why the user can't just move the FooStruct, moving the stream field with it.

This is because they had to create a Pin<&mut FooStruct> to call poll_next. Since FooStruct is not Unpin, it is impossible to create a Pin<&mut FooStruct> without unsafe code somewhere (either written by the user, or wrapped in a safe helper such as Box::pin that guarantees the value never moves), so whoever pins the value has to promise not to move our stream before the method can be called at all.

So in that case the unsoundness would be the caller's fault.

Interesting, thanks.
You say that FooStruct is not Unpin, but looking at my code I had:

Reading the documentation for pin_project it says that:

To enforce this, this attribute will automatically generate an Unpin implementation for you,

So do you mean it isn't Unpin because the implementation of the trait is empty?

(how can I add code syntax highlighting to my original post?)

I didn't add any Unpin impl in the example because the one the compiler automatically generates was what I wanted. It looks like this:

impl<S: Unpin> Unpin for FooStruct<S> {}

This means that FooStruct<S> is only Unpin when S is Unpin. So we really have two cases:

  1. If S is Unpin, then there's no problem with moving stream. So we make FooStruct<S> Unpin too, because then moving it doesn't cause any problems.
  2. If S is not Unpin, then neither is FooStruct<S>. In this case the arguments about creating the Pin<&mut FooStruct<S>> apply.

This should be the full details on the soundness of the unsafe block. In the case of pin-project, it generates its own Unpin impl to handle more complicated cases with more fields.
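The two cases can be checked with a small sketch using only the standard library. `Wrapper`, `assert_unpin`, `demo` and `pinning` are hypothetical names for this example; `Wrapper<S>` stands in for a struct like `FooStruct<S>` with no manual `Unpin` impl.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

// Hypothetical wrapper with no manual `Unpin` impl; the compiler decides.
pub struct Wrapper<S> {
    inner: S,
}

// Compiles only when `T: Unpin`.
pub fn assert_unpin<T: Unpin>() {}

pub fn demo() {
    // Case 1: `u32` is `Unpin`, so `Wrapper<u32>` is `Unpin` too.
    assert_unpin::<Wrapper<u32>>();
    // Case 2: `PhantomPinned` is not `Unpin`, so the next line would not compile:
    // assert_unpin::<Wrapper<PhantomPinned>>();
}

pub fn pinning() -> u32 {
    // `Pin::new` is safe, but it requires `Unpin`.
    let mut w = Wrapper { inner: 5u32 };
    let pinned = Pin::new(&mut w);
    // For `Unpin` types the pin can be turned back into a plain `&mut`.
    pinned.get_mut().inner += 1;

    // For a non-`Unpin` value `Pin::new` is unavailable; `Box::pin` pins it
    // on the heap and never hands out a plain `&mut` to it.
    let _boxed: Pin<Box<Wrapper<PhantomPinned>>> = Box::pin(Wrapper {
        inner: PhantomPinned,
    });

    w.inner
}
```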

You create syntax highlighting by using backticks instead of four spaces:

```
// your code here
```

You can optionally choose the language like this:

```toml
# your toml here
```

The default is Rust code.

And would these solutions work with multiple stream fields? From what I can see, I think not, because with the first call to project(), or with the unsafe version, you are moving self, right?

The solutions work perfectly fine with multiple fields. You merely have to not move the fields you create pins to.
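Here is a sketch of a hand-written projection over two stream fields, assuming a local stand-in `Stream` trait with the same shape as the one in futures_core, so that the example needs only the standard library. `Chained`, `Repeat`, `NoopWake` and `demo` are hypothetical names for this example; with pin-project-lite you would put `#[pin]` on both fields instead of writing `project` yourself.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in with the same shape as futures_core::stream::Stream.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical combinator: yields everything from `first`, then from `second`.
pub struct Chained<A, B> {
    first: A,
    second: B,
    first_done: bool,
}

impl<A, B> Chained<A, B> {
    // Projects to both stream fields at once; `self` is consumed, but no field moves.
    fn project<'a>(self: Pin<&'a mut Self>) -> (Pin<&'a mut A>, Pin<&'a mut B>, &'a mut bool) {
        // SAFETY: both stream fields are private, are never moved out of a
        // pinned `Self`, and `Chained` has no `Drop` impl.
        unsafe {
            let this = self.get_unchecked_mut();
            (
                Pin::new_unchecked(&mut this.first),
                Pin::new_unchecked(&mut this.second),
                &mut this.first_done,
            )
        }
    }
}

impl<A: Stream, B: Stream<Item = A::Item>> Stream for Chained<A, B> {
    type Item = A::Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let (first, second, first_done) = self.project();
        if !*first_done {
            match first.poll_next(cx) {
                Poll::Ready(None) => *first_done = true,
                other => return other,
            }
        }
        second.poll_next(cx)
    }
}

// Test stream: yields `left` copies of `value` (it is `Unpin`, to keep the demo short).
pub struct Repeat {
    value: u32,
    left: u32,
}

impl Stream for Repeat {
    type Item = u32;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.left == 0 {
            return Poll::Ready(None);
        }
        self.left -= 1;
        Poll::Ready(Some(self.value))
    }
}

// A waker that does nothing; enough for polling by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn demo() -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut chained = Box::pin(Chained {
        first: Repeat { value: 1, left: 2 },
        second: Repeat { value: 9, left: 1 },
        first_done: false,
    });
    let mut out = Vec::new();
    while let Poll::Ready(Some(item)) = chained.as_mut().poll_next(&mut cx) {
        out.push(item);
    }
    out
}
```

What gets moved in `project` is only the `Pin<&mut Self>` handle, not the struct behind it, which is why any number of fields can be projected in one call.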
