Enstream, (nightly only) no_std alternative to async-stream

Hi everyone! I've implemented an alternative to the async-stream crate that uses self-referencing with Aliasable from the pinned-aliasable crate.

The source code was checked with Miri using the -Zmiri-tag-raw-pointers flag, and no errors were reported.

The crate requires a nightly compiler because HandlerFn::Fut is a generic associated type (GAT).

Example usage looks like this:

fn read_file<'a>(path: &'a Path) -> impl Stream<Item = [u8; 2]> + 'a {
    enstream(StreamState {
        path
    })
}

struct StreamState<'a> {
    path: &'a Path
}

impl<'a> HandlerFn<'a, [u8; 2]> for StreamState<'a> {
    type Fut<'yielder> = impl Future<Output = ()> + 'yielder
    where
        'a: 'yielder;

    fn call<'yielder>(self, mut yielder: Yielder<'yielder, [u8; 2]>) -> Self::Fut<'yielder>
    where
        'a: 'yielder
    {
        async move {
            let mut file = tokio::fs::File::open(self.path)
                .await
                .unwrap();
            
            let mut buf = [0u8; 2];

            loop {
                match file.read_exact(&mut buf).await {
                    Ok(_) => yielder.yield_item(buf).await,
                    Err(_) => break
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let stream = read_file(Path::new("./test.txt"));
    dbg!(stream.collect::<Vec<_>>().await);
}

Since this is my first serious use of unsafe, I would really appreciate a code review. The source code is available here and the documentation is available here.

Your Yielder should be invariant in T. You can do that like this:

pub struct Yielder<'a, T>(NonNull<Option<T>>, PhantomData<&'a mut T>);

Thanks for catching that! I thought NonNull was invariant in T, but it's actually covariant. I missed that.
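
For anyone following along, here is a minimal std-only sketch of the variance difference. `CovariantHandle`, `InvariantHandle`, `shrink` and `demo_covariant_len` are hypothetical names made up for this illustration and are not part of enstream; the second struct only mirrors the shape of the `Yielder` definition suggested above.

```rust
use std::marker::PhantomData;
use std::ptr::NonNull;

// Hypothetical handle that is covariant in `T`:
// `NonNull<T>` is covariant and the marker does not mention `T`.
#[allow(dead_code)]
struct CovariantHandle<'a, T>(NonNull<T>, PhantomData<&'a ()>);

// Hypothetical handle that is invariant in `T`:
// `&'a mut T` is invariant in `T`, and so is `PhantomData<&'a mut T>`.
#[allow(dead_code)]
struct InvariantHandle<'a, T>(NonNull<T>, PhantomData<&'a mut T>);

// Accepted: covariance lets `&'static str` shrink to `&'short str`.
fn shrink<'short>(
    handle: CovariantHandle<'short, &'static str>,
) -> CovariantHandle<'short, &'short str> {
    handle
}

// The same function for the invariant handle is rejected by the compiler:
//
// fn shrink_invariant<'short>(
//     handle: InvariantHandle<'short, &'static str>,
// ) -> InvariantHandle<'short, &'short str> {
//     handle
// }

fn demo_covariant_len() -> usize {
    let mut slot: &'static str = "hello";
    let handle: CovariantHandle<'_, &'static str> =
        CovariantHandle(NonNull::from(&mut slot), PhantomData);
    let shrunk = shrink(handle);
    // SAFETY: `slot` is still alive and nothing else accesses it here.
    // We only read through the pointer; writing a shorter-lived reference
    // through `shrunk` is exactly what invariance is meant to rule out.
    unsafe { shrunk.0.as_ref().len() }
}
```

As far as I can tell, this matters for a type like Yielder because it writes a T through the pointer: with a covariant handle, a short-lived reference could end up in a slot that the other side still treats as holding a longer-lived one.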

I think you might be able to drop the nightly dependency on GAT like this:

pub trait HandlerFn<'scope, 'yielder, T: 'scope>
where
    'scope: 'yielder,
{
    type Fut: Future<Output = ()> + 'yielder;

    /// Create new [`Future`] with the provided [`Yielder`] as a [`Stream`] item source.
    ///
    /// `'yielder` lifetime is defined inside of library internals,
    /// thus you are not allowed to use it to access outer scope elements.
    ///
    /// However, for those cases [`HandlerFn`] provides you with `'scope` lifetime,
    /// which is required to outlive `'yielder`.
    ///
    /// [`Stream`]: futures_util::stream::Stream
    fn call(self, yielder: Yielder<'yielder, T>) -> Self::Fut;
}

pub fn enstream<'scope, T: 'scope, G: 'scope>(generator: G) -> impl FusedStream<Item = T> + 'scope
where
    G: for<'yielder> HandlerFn<'scope, 'yielder, T>,
{
    Enstream {
        cell: Aliasable::new(UnsafeCell::new(None)),
        state: EnstreamState::Gen(MaybeUninit::new(generator)),
    }
}

I think I tried that before, but it gives errors if you attempt to borrow something:

struct StreamState<'a> {
    val: &'a str
}

impl<'scope, 'yielder> HandlerFn<'scope, 'yielder, &'scope str> for StreamState<'scope>
where
    'scope: 'yielder
{
    type Fut = impl Future<Output = ()> + 'yielder;

    fn call(self, mut yielder: Yielder<'yielder, &'scope str>) -> Self::Fut {
        async move {
            yielder.yield_item(self.val).await;
        }
    }
}

let owned = String::from("test");

let stream = enstream(StreamState {
    val: &owned
});

pin_mut!(stream);

assert_eq!(stream.next().now_or_never().flatten(), Some("test"));

error[E0597]: `owned` does not live long enough
  --> src/lib.rs:33:10
   |
30 |   let stream = enstream(StreamState {
   |  ______________-
31 | |     val: &owned
   | |          ^^^^^^ borrowed value does not live long enough
32 | | });
   | |__- argument requires that `owned` is borrowed for `'static`
...
37 |   } _doctest_main_src_lib_rs_5_0() }
   |   - `owned` dropped here while still borrowed
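
My reading of this error (an interpretation, not something the compiler output states) is that the bound `for<'yielder> HandlerFn<'scope, 'yielder, T>` has to hold for every `'yielder`, including `'static`, while the impl only exists where `'scope: 'yielder`, so `'scope` gets pushed to `'static`. Here is a stripped-down sketch without futures; `Handler`, `State`, `accept` and `demo_static` are hypothetical names used only for illustration.

```rust
// Hypothetical two-lifetime trait, shaped like the non-GAT `HandlerFn` above.
trait Handler<'scope, 'yielder> {
    fn value(&self) -> &'scope str;
}

struct State<'a> {
    val: &'a str,
}

// The impl is only available when `'scope` outlives `'yielder`.
impl<'scope, 'yielder> Handler<'scope, 'yielder> for State<'scope>
where
    'scope: 'yielder,
{
    fn value(&self) -> &'scope str {
        self.val
    }
}

// The higher-ranked bound must hold for every `'yielder`, `'static` included,
// so in practice `'scope` has to be `'static` for the impl above to apply.
fn accept<'scope, G>(generator: G) -> &'scope str
where
    G: for<'yielder> Handler<'scope, 'yielder>,
{
    generator.value()
}

// Accepted: a string literal is `&'static str`.
fn demo_static() -> &'static str {
    accept(State { val: "test" })
}

// Expected to be rejected in the same way as the error above,
// because `owned` cannot be borrowed for `'static`:
//
// fn demo_borrowed() {
//     let owned = String::from("test");
//     accept(State { val: &owned });
// }
```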

Hm. I would have to think about whether you can work around it.

You would need to use my GAT workaround, or use the ::nougat crate (which just expands to the GAT workaround).
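
The workaround itself is not quoted in this thread, so here is a minimal sketch of the pattern as I understand it, shown on a lending iterator rather than on enstream. `LendingIteratorItem`, `LendingIterator`, `WindowsMut` and `demo_windows` are hypothetical names for this illustration. The idea is to move the lifetime-generic associated type into a helper trait with a lifetime parameter, and to give that helper a defaulted type parameter (`&'next Self` here) whose implied bound keeps the `for<'next>` supertrait from demanding `Self: 'static`.

```rust
// Helper trait carrying the associated type for one particular lifetime.
// The defaulted `ImplicitBounds = &'next Self` implies `Self: 'next`.
trait LendingIteratorItem<'next, ImplicitBounds = &'next Self> {
    type Item;
}

// The "real" trait requires the helper for every lifetime.
trait LendingIterator: for<'next> LendingIteratorItem<'next> {
    fn next(&mut self) -> Option<<Self as LendingIteratorItem<'_>>::Item>;
}

// Overlapping mutable windows, which a normal `Iterator` cannot express.
struct WindowsMut<'a, T> {
    slice: &'a mut [T],
    start: usize,
    size: usize,
}

impl<'next, 'a, T> LendingIteratorItem<'next> for WindowsMut<'a, T> {
    type Item = &'next mut [T];
}

impl<'a, T> LendingIterator for WindowsMut<'a, T> {
    fn next(&mut self) -> Option<<Self as LendingIteratorItem<'_>>::Item> {
        let end = self.start.checked_add(self.size)?;
        // Returns `None` once the window would run past the end of the slice.
        let window = self.slice.get_mut(self.start..end)?;
        self.start += 1;
        Some(window)
    }
}

fn demo_windows() -> Vec<i32> {
    let mut data = [1, 2, 3, 4];
    let mut windows = WindowsMut { slice: &mut data, start: 0, size: 2 };
    let mut sums = Vec::new();
    while let Some(window) = windows.next() {
        // Mutate through the lent window, then record its sum.
        window[0] += 10;
        sums.push(window.iter().sum());
    }
    sums
}
```

The helper's default parameter can additionally be sealed so that downstream code cannot name a different one, but that is left out here to keep the sketch short.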

Fascinating! I was actually able to compile the library itself without GATs being enabled:

pub trait HandlerFnLifetime<'yielder, T, ImplicitBounds: Sealed = Bounds<Yielder<'yielder, T>>> {
    type Fut: Future<Output = ()> + 'yielder;
}

/// [`Future`] generator that can be converted to [`Stream`].
///
/// [`Stream`]: futures_util::stream::Stream
pub trait HandlerFn<'scope, T: 'scope>: for<'yielder> HandlerFnLifetime<'yielder, T> {
    /// Create new [`Future`] with the provided [`Yielder`] as a [`Stream`] item source.
    ///
    /// `'yielder` lifetime is defined inside of library internals,
    /// thus you are not allowed to use it to access outer scope elements.
    ///
    /// However, for those cases [`HandlerFn`] provides you with `'scope` lifetime,
    /// which is required to outlive `'yielder`.
    ///
    /// [`Stream`]: futures_util::stream::Stream
    fn call<'yielder>(self, yielder: Yielder<'yielder, T>) -> <Self as HandlerFnLifetime<'yielder, T>>::Fut
    where
        'scope: 'yielder;
}

However, when I tried to port an example to use HandlerFnLifetime, I got an error that looks suspiciously similar to #70263:

struct StreamState<'a> {
    val: &'a str
}

impl<'yielder, 'scope: 'yielder, T: 'scope> HandlerFnLifetime<'yielder, T> for StreamState<'scope> {
    type Fut = impl Future<Output = ()> + 'yielder;
}

impl<'scope> HandlerFn<'scope, &'scope str> for StreamState<'scope> {
    fn call<'yielder>(self, mut yielder: Yielder<'yielder, &'scope str>) -> <Self as HandlerFnLifetime<'yielder, &'scope str>>::Fut
    where
        'scope: 'yielder
    {
        async move {
            yielder.yield_item(self.val).await;
        }
    }
}

error: implementation of `HandlerFnLifetime` is not general enough
  --> src/lib.rs:21:14
   |
19 | impl<'scope> HandlerFn<'scope, &'scope str> for StreamState<'scope> {
   |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ implementation of `HandlerFnLifetime` is not general enough
   |
   = note: `HandlerFnLifetime<'0, &'scope str>` would have to be implemented for the type `StreamState<'scope>`, for any lifetime `'0`...
   = note: ...but `HandlerFnLifetime<'_, &'scope str>` is actually implemented for the type `StreamState<'1>`, for some specific lifetime `'1`

I tried adding the 'scope lifetime itself to HandlerFnLifetime (just to try it out), however, that didn't help:

pub trait HandlerFnLifetime<'yielder, 'scope: 'yielder, T: 'scope, ImplicitBounds: Sealed = Bounds<Yielder<'yielder, T>>> {
    type Fut: Future<Output = ()> + 'yielder;
}

/// [`Future`] generator that can be converted to [`Stream`].
///
/// [`Stream`]: futures_util::stream::Stream
pub trait HandlerFn<'scope, T: 'scope>: for<'yielder> HandlerFnLifetime<'yielder, 'scope, T> {
    /// Create new [`Future`] with the provided [`Yielder`] as a [`Stream`] item source.
    ///
    /// `'yielder` lifetime is defined inside of library internals,
    /// thus you are not allowed to use it to access outer scope elements.
    ///
    /// However, for those cases [`HandlerFn`] provides you with `'scope` lifetime,
    /// which is required to outlive `'yielder`.
    ///
    /// [`Stream`]: futures_util::stream::Stream
    fn call<'yielder>(self, yielder: Yielder<'yielder, T>) -> <Self as HandlerFnLifetime<'yielder, 'scope, T>>::Fut
    where
        'scope: 'yielder;
}

impl<'yielder, 'scope: 'yielder, T: 'scope> HandlerFnLifetime<'yielder, 'scope, T> for StreamState<'scope> {
    type Fut = impl Future<Output = ()> + 'yielder;
}

impl<'scope> HandlerFn<'scope, &'scope str> for StreamState<'scope> {
    fn call<'yielder>(self, mut yielder: Yielder<'yielder, &'scope str>) -> <Self as HandlerFnLifetime<'yielder, 'scope, &'scope str>>::Fut
    where
        'scope: 'yielder
    {
        async move {
            yielder.yield_item(self.val).await;
        }
    }
}

error: implementation of `HandlerFnLifetime` is not general enough
  --> src/lib.rs:21:14
   |
19 | impl<'scope> HandlerFn<'scope, &'scope str> for StreamState<'scope> {
   |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ implementation of `HandlerFnLifetime` is not general enough
   |
   = note: `HandlerFnLifetime<'0, 'scope, &'scope str>` would have to be implemented for the type `StreamState<'scope>`, for any lifetime `'0`...
   = note: ...but `HandlerFnLifetime<'_, '1, &'scope str>` is actually implemented for the type `StreamState<'1>`, for some specific lifetime `'1`

I figured out how to get it to work and now I’ve opened a PR. I’m not sure what was causing your error but without the 'scope lifetime things seem to work just fine.


By the way, I haven’t said this yet but major thanks for writing this crate! I’ve thought about whether this was possible in the past but I’m very happy that someone has done it.
