Issues with future combinators & async using actix-core

I'm building a game server on top of actix-core, and am having problems arranging a back-and-forth communication between actors nicely.

When clients first connect, the server has to send them the data for the environment (which is unbounded and load-on-demand) surrounding their spawn point, which my code represents as a Player actor asking the ChunkManager for a set of ChunkHandle objects, which have references to Chunk actors. The Player then asks each Chunk to send it its contents (as a compound object), and writes that contents to the codec wrapping the underlying network connection. The Chunks are not required to reply in any particular order. In code, this looks like the following:

 /// For this and all other examples, Self==Player, which implements Actor
fn get_chunk_packets(&mut self, 
                     requested_area: ChunkArea, 
                     ctx: &mut <Player as Actor>::Context) {
        fn ask_chunk_for_pkt(addr: ChunkHandle) -> 
            impl Future<Output = (ChunkHandle, ChunkDataMsg)> 
        {
            addr.target.send(RequestPacket())
            .map(Result::unwrap)
            .map(|cdm| (addr, cdm)) 
            // Returning addr as well so it doesn't need to be cloned 
            // and can be saved to the player
        }
        let chunk_handles = self
            .chunk_manager
            .send(requested_area)
            .map(Result::unwrap)
            .map(|AreaResponse(chunks)| chunks);

        let unordered_pending_responses = chunk_handles
            .map(|chunks| chunks.into_iter().map(ask_chunk_for_pkt))
            .map(FuturesUnordered::from_iter);

        let arrived_responses = unordered_pending_responses.then(StreamExt::collect); 
                                                            // Do I need this?

        let response_writes = wrap_future(arrived_responses) 
                             // Gives access to the actor this'll be queued on
        .map(|chunk_responses: Vec<_>, this: &mut Self, _ctx| {
            this.chunk_handles.clear();
            for (handle, chunk_data) in chunk_responses {
                this.chunk_handles.push(handle);
                this.stream.write(rewrap_chunk_pkt(chunk_data));
            }
        });
        ctx.wait(response_writes); 
        // Queues the whole future to run on the actix event loop
}	

My first question is, in the "old way" doing things with future combinators, is this sensible? This type checks (althogh I've not ran it yet because the logic to actually reply to these messages isn't complete) but only after some wrangling, and I feel like I'm missing something about how to phrase this process as combinators. (e.g. Do I have to materialize the whole thing before I can start working on it?) Stream::for_each_concurrent looks very tempting but requires the applied closure to return Future and I don't understand how to apply it in this context.

I have another question which is much more specific to actix, so I appreciate any replies that can only comment on the first part. I tried to rewrite the above as an async function, and got this:

async fn get_chunk_packets_async(&mut self, requested_area: ChunkArea) {
    fn ask_chunk_for_pkt(addr: ChunkHandle) ->
       impl Future<Output = (ChunkHandle, ChunkDataMsg)> 
    {
        addr.target.send(RequestPacket())
        .map(Result::unwrap)
        .map(|cdm| (addr, cdm))
    }
    let AreaResponse(chunk_handles) = self.chunk_manager
                                          .send(requested_area)
                                          .await
                                          .unwrap();

    let chunk_requests = chunk_handles.into_iter().map(ask_chunk_for_pkt);

    let mut request_stream = FuturesUnordered::from_iter(chunk_requests);

    while let Some((handle, chunk_data)) = request_stream.next().await {
        self.chunk_handles.push(handle);
        self.stream.write(chunk_data);
    }
}	

Which is both much more readable and closer to what I want in terms of data dependencies. However, I now have a problem because the return type of the above is Future<()>, and to queue it on my actor's event loop with wait, it needs to be ActorFuture<_> + 'static, which is blocked by it holding &mut self. The trick with wrap_future doesn't work very well because the thing I want to do with the stream is so reliant on interacting with self (and so I can only write half the process before I need to drop out of async to wrap it) and AFAIK I can only interact with the return from wrap_future in terms of combinators, so I'm not improving very much over the entirely-sync version at the start.

Am I missing something here in terms of queuing this as an async function? Is there some way to interact with non-std Futures (ActorFuture : Future) with async fn definitions?

I don't know much about actix specifically, but if you need a 'static future, you can move ownership of the thing you're calling the &mut self method on into an async block:

let fut = async move {
    my_thing.get_chunk_packets_async(data).await
};

This way my_thing is one field in the async block, so it is 'static because it doesn't borrow from anything outside of itself.

If you need more help, which version of actix are you using? Just so I can look at the correct documentation.

1 Like

I'm using actix 0.9.0, but can't see any way of taking ownership of the Player without a lot of hacking with basically temporarily stealing its contents. The method that this will likely be called from is only given a &mut self, AIUI because the actor is owned by the actix internal runtime.

If actix says you can't have references, you will have to use some combination of copies and shared data instead.

I don't remember which actix version switched to async/await. In the new one which uses async, you can have some references in some cases.

In the old ones based on futures v0.1, you basically have to use Arc for everything all the time. All the futures combinators will demand that from you.

I believe I'm using the new one, since awaiting on sent messages compiles fine. Cargo also says that the version of actix I'm using is specifically depending on futures 0.3.4, which is the async-compatible version, right?

1 Like

It's unclear to me in your second example which part is the one that needs &mut Self in a 'static future.

So a generic tip, I don't know if it's applicable to you.

static_spawn(arc_foo.async_function())

makes a non-static Future that temporarily borrows arc_foo. OTOH:

static_spawn(async move {
  arc_foo.async_function().await
})

moves arc_foo into the async block's Future, so the whole thing becomes 'static, because now the borrow happens internally when polling the future, not before it was created.

Sorry for being unclear, the problem is not directly in the example. (Which compiles fine, by itself) The problem arises in trying to queue the Future on the event loop, which currently looks like this:

fn deploy_async(&mut self, requested_area: ChunkArea, ctx: &mut <Player as Actor>::Context) {
        let f = self.get_chunk_packets_async(requested_area);
        let w: actix::fut::FutureWrap<_, Self> = wrap_future(f);
        ctx.wait(w);
    }

Which complains about an inconsistent lifetime on self.

I'm unclear on how that borrow is actually going to happen, even if I wrap my actor state (or at least the relevent bits of it) in an Arc. Does it actually need to be an Arc<RwLock> or similar, because otherwise the two outstanding Arcs will stop me getting a mutable reference to the stream I want to write to?

So here:

let f = self.get_chunk_packets_async(requested_area);

f is tied to &mut self, because it's an argument to the method, and has to be kept so that the method can be called later when the future is polled ("calling" async functions doesn't run them, it only saves their args for later).

You have to make f self-contained, so that it owns everything it needs to run its body later when actix runtime polls it.

Previously you've had a curious construct:

.map(|chunk_responses: Vec<_>, this: &mut Self, _ctx| {

which suggest that something else held a reference to &mut self somewhere, and gave it to you later. It couldn't be &mut self from your get_chunk_packets method arguments, because that would create the same problem you have now.

So something like this could work:

self_arc_mutex: Arc<Mutex<Self>> = …;
let f = async move {
  self_arc_mutex.lock().get_chunk_packets_async(requested_area);
};

or avoid Arc<Mutex<Self>> if you can make actix give you &mut self the way it did in wrap_future().map(||.

It took me some reading to understand, but AIUI that reference is coming from the actix runtime. The result from wrap_future is an ActorFuture<T, Actor>, which implements Future<T>, and re-defines map to have inputs (T, &mut Actor, &mut Actor::Context), not by somehow implementing Future<(T,Actor)> as I misunderstood at one point. It also redefines poll to accept an Actor reference provided by the runtime, rather than it being stored in the object.

Thanks, I'll try that if I can't get any other way to work out.

I could do that, but the problem I then have is that the async part cannot do most of the work I want, because it doesn't have access to self and AFAIK it's impossible to use async syntax in the ActorFuture bit, e.g. my async part can only get this far:

async fn get_chunk_packets_async(chunk_manager: Addr<ChunkManager>, requested_area: ChunkArea) {
	fn ask_chunk_for_pkt(addr: ChunkHandle) ->
	   impl Future<Output = (ChunkHandle, ChunkDataMsg)> 
	{
	    addr.target.send(RequestPacket())
	    .map(Result::unwrap)
	    .map(|cdm| (addr, cdm))
	}
	let AreaResponse(chunk_handles) = chunk_manager
	                                      .send(requested_area)
	                                      .await
	                                      .unwrap();

	let chunk_requests = chunk_handles.into_iter().map(ask_chunk_for_pkt);
	let request_stream = FuturesUnordered::from_iter(chunk_requests);
	// ...What gets returned?
}	

before I have to wrap_future it to and map (synchronously) to get access to &self.

And then this has the effect that if I just return request_stream in the above, my ActorFuture ends up being of a Stream, and I haven't yet worked out how to process a stream in sync code without blocking everything by collecting it. If it doesn't return a Stream, then the only other thing I can think of is for it to return a collection, which effectively means the only thing that changed is the collect happened in the async block rather than after it.

I could easily avoid this problem if I could write the second part, with the &self and context, with async code, but I don't think I can do that.

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.