I need some help, getting confused about `Pin`, again

The problem I have is as follows. I have an async function that currently goes through a Stream in a while loop:

pub async fn start( mut self, mut actor: A ) -> Option<Self>
{
	while let Some(envl) = self.rx.next().await
	{
		if let Err(e) = AssertUnwindSafe( envl.handle(&mut actor) ).catch_unwind().await
		{
			error!( "Actor panicked: {}, with error: {:?}", &self, e );
			return Some(self);
		}
	}

	None
}

So both self and actor get stored in this Future, but if the actor panicks, we give the user back the mailbox, so they can spawn a new actor in it. This code runs fine btw. Even though A is a generic that is not required to be Unpin, I can still hand out &mut actor somehow and it all seems to work fine, at least the compiler is happy.

Now I want to do something for which I need to do a check and potentially stop this future when the channel is empty, eg. self.rx.poll_next() returns Poll::Pending.

I figure the only way I can do this is by creating a custom Future. So I tried:

pub struct MailboxFut<A> where A: Actor
{
	mb: Option< Mailbox<A> >,
	actor: A,
}


impl<A: Actor + Send> Future for MailboxFut<A>
{
	type Output = Option< Mailbox<A> >;

	fn poll( mut self: Pin<&mut Self>, cx: &mut TaskContext<'_> ) -> Poll< Option<Mailbox<A>> >
	{
		let this = self.as_mut();

		match this.mb.as_mut().unwrap().rx.poll_next_unpin( cx )
		{
			Poll::Ready( Some(envl) ) =>
			{
				let mut envl_fut = AssertUnwindSafe( envl.handle( &mut this.actor ) ).catch_unwind();

				if let Err( e ) = ready!( Pin::new( &mut envl_fut ).poll( cx ) )
				{
					error!( "Actor panicked: {}, with error: {:?}", this.mb.as_ref().unwrap(), e );
					return Poll::Ready( Some( this.mb.take().unwrap() ) );
				}
			}

            //...

But now the compiler is not having it. The future is not Unpin, so there's no getting the rx to poll it, nor can I pas &mut this.actor on etc. I won't be able to get the mailbox out to return it in Poll::Ready either because Option::take will require &mut as well.

Is there a way to get this to work without unsafe nor requiring A: Unpin?

What I am trying to understand is how all the exact same things are possible in the async method, but not in my hand rolled future.

I’m unfamiliar with the libraries involved; I don’t even know which ones you use. Googling stuff suggests it’s perhaps actix? Please give more detail.

more details would help a lot. Could you present the error message? Which line is the place it doesn’t work, is this about this.mb.as_mut().unwrap().rx.poll_next_unpin( cx ) or about if let Err( e ) = ready!( Pin::new( &mut envl_fut ).poll( cx ) ) or some other place?

Maybe the best solution for you is to keep using async fn and use something like futures::poll to help you out. Otherwise, you’d need to restructure you code dramatically to get the same kind of state machine implemented that your original implementation did.

1 Like

I'm sorry it's not sufficiently clear. I'm just using the futures library.

So the compiler error is this:

error[E0596]: cannot borrow data in a dereference of `std::pin::Pin<&mut MailboxFut<A>>` as mutable
  --> src/mailbox_fut.rs:23:10
   |
23 |             match this.mb.as_mut().unwrap().rx.poll_next_unpin( cx )
   |                   ^^^^^^^ cannot borrow as mutable
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `std::pin::Pin<&mut MailboxFut<A>>`

which is expected, as DerefMut is only implemented for Pin when the target is Unpin, which is not the case here as A is not bound by it. I would like to keep it that way as it's a user supplied type, so the less constrained it is the better and in principle I'm not trying to do anything that I wasn't already doing in the async method and it didn't need to be Unpin for that.

I could also solve this with unsafe, but I feel that's a last resort because so far the lib enjoys #[ forbid(unsafe_code) ] and it makes it so much trickier to guarantee correctness as I already feel I don't understand all the intricacies here.

The whole point of this is that the only reason I have to have a custom future type is that I need to just check a counter if the stream returns pending. Other than that I'm not doing anything I wasn't already doing. It feels wrong to have to use unsafe or introduce extra constraints on user supplied types for that.

Ah, I didn't know about this one. Looks like it might be a solution.

No worries, unsafe and constraints will not be needed, I promise. I think we can work out a solution here one way or another… you might also need pin_mut if you want to use the poll macro.

Ok, it looks like the poll macro will do the trick. That's awesome. Many thanks. I think I have a few other situations where that will come in handy.

I still hope to learn more of how I could have solved it if I wanted to keep the custom future. If anyone knows...

As I mentioned, there’s a problem that your manual Future as you presented it doesn’t do the same thing as the async code. In general you need some kind of enum for a kind of “state machine” for manual futures that reproduce the multiple possible entry points of an async fn. Some interesting resource that includes a bit of an explanation of async/await as well of a lot more information is this video on YouTube

By the way, instead of poll macro, you might also be able to create a solution by using select (or select_biased) together with poll_fn.

Yes, I would have considered an enum state machine, but it would still be fed to poll as Pin<&mut Self>.

Things I wonder about is for example that I pass the actor field as &mut actor to a trait method implemented by users. As far as I can tell nothing keeps them from moving it out with mem::swap.

I have no idea how that is fine if the future that owns it is Pinned and !Unpin for example. The async fn allows this, without any unsafe (in my code at least). Maybe the difference is just that I have it as a type parameter, which is why the compiler wants it to be Unpin. On the other hand, I re-use it accross await points.

If this is the 3h video, I saw it already... I have actually spent considerable time trying to understand the ins and outs of futures and Pin, yet sometimes it still bytes me and I haven't looked to deep into the generators themselves.

Have you looked at pin_project yet? It can be immensely useful for implementing type where you need to pin fields without using unsafe. Also make sure to read through the module-level documentation of pin. As long as you don’t try to implement futures manually, deep knowledge about pinning won’t be too necessary though.

yes, I have looked at both several times. taik-e is a genius I think to understand this stuff.

Ok, maybe I know the answer to my own question. The actor itself is never pinned. The future wouldn't really care if it was swapped with another one, as it would still have a valid reference. So that's fine. I suppose this might be just one of those situations where I see other people declare the future unsafe{ impl Unpin for Myfuture }.

I don’t feel like I have the full picture of what your library is doing here from the code examples, otherwise I would be happy to try and explain what’s going on and why it isn’t unsound. Is actor itself a Future? (Probably not.) If you’re referring to the fact that actor will be a field of the created future but an ordinary &mut reference to it can be created: It’s important to understand structural pinning (a section in the pin module-level documentation). Structural pinning means that a type can decide itself what it means for it to be pinned. A generic struct containing some field actor: A can decide that Pin<&mut Self> does not mean that the actor field is pinned.

In fact, Unpin is not an unsafe trait, you can have any type you define just unconditionally implement Unpin no matter what kind of fields there are. For situations where certain fields are pinned, you’ll need something like pin_project’s macros.

Yeah, maybe it's time for yet another read through of the Pin docs, as it's been a while.

The actor and the mailbox are just data owned by the future. The stream on the mailbox is Unpin, so that doesn't matter.

This post is a good reading about Pin:

1 Like

For a quick demonstration using select_biased, to add a counter or anything else to a function like

pub async fn demonstration1<T>(mut stream: impl Unpin + Stream<Item = T>) {
    while let Some(item) = stream.next().await {
        // do stuff
    }
}

something like this might work

pub async fn demonstration2<T>(mut stream: impl Unpin + FusedStream<Item = T>) -> Option<()> {
    while let Some(item) = select_biased! {
        // you can add .fuse() after .next() if the stream isn’t known to be FusedStream
        item_or_none = stream.next() => item_or_none,
        // the future below gets polled as long as the
        // stream.next() future is pending
        _ = poll_fn({
                let mut counter = 0;
                move |_| {
                    counter += 1;
                    if counter == 10 {
                        Poll::Ready(())
                    } else {
                        Poll::Pending
                    }
                }
        }).fuse() => return None,
    } {
        // do stuff
    }

    Some(())
}

(untested, but it compiles)

imports:

use std::task::Poll;

use futures::{future::poll_fn, prelude::*};
use futures::{select_biased, stream::FusedStream};

I don’t know what type rx is in your case, but it should be Unpin and Stream based on the fact that your first code example works, maybe it is FusedStream, too.

Yes, what I wanted to do was this:

loop
{
   let envl = match futures::poll!( self.rx.next() )
   {
      Poll::Ready( Some(envl) ) => envl,
      Poll::Ready( None       ) => break,
      Poll::Pending             => 
      {
         if self.weak.strong_count() == 0 { break;    }
         else                             { continue; }
      }
   };

   if let Err( e ) = AssertUnwindSafe( envl.handle( &mut actor ) ).catch_unwind().await
   {
      error!( "Actor panicked: {}, with error: {:?}", &self, e );
      return Some(self);
   }
}

However, the else { continue; } is a problem. This will just execute in a loop. The idea is "If pending and no strong count, stop." Otherwise I still just want this future to be woken up when the stream has more data or closes.

Its an mpsc channel, so initially I just loop until it returns None. But I want some of the Senders to behave like a Weak pointer. Eg. even if they aren't dropped, we stop processing. For that the sender will verify the strong count and refuse to take more messages if it is 0, but the receiver here continues to process everything that is already in the queue, and once it get's pending, it checks the strong count.

However if there are still strong senders around, we wish to continue to behave as normal, thus awaiting to be woken up by the stream or blocking on the pend.

That just makes me think I forgot a scenario. What if we are pending while the strong count goes to 0 and no messages ever arrive after that. We wouldn't terminate. Maybe my counter will need to be async.

@Yandros I'll give that blogpost another read as well.

You can also use select function…

so something like

    while let Left((Some(envl), _)) = future::select(
        self.rx.next(),
        future::poll_fn(|_| {
            if self.weak.strong_count() == 0 {
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        }),
    )
    .await
    {
        // do stuff
    }

might work.

use futures::future::{self, Either::*};
use futures::prelude::*;
use futures::stream::FusedStream;
1 Like

ah, that's smart. I have not given select! enough love to think of this myself... I never use it much. I did like poll_fn, but I've only used it in test code that needed a future. And Either, I literally never used it.

I don't think it quite works, because the Right part will evaluate only once. It will return Pending but never wake the task if subsequently the strong count goes to zero.

I think making the count async will be unavoidable, so it can wake the task up when it goes to zero.

The way that the poll_fn ignores the Context indicates that the future might never get woken up again. I believe, in principle you’d probably need to make sure somehow that the strong_count dropping to zero wakes up the future. I don’t know in detail how this kind of stuff can be done best.

Unless the rx stream somehow guarantees some kind of wake-up in that case anyways? I believe, you haven’t told me what kind of type rx is anyways.