I'm building a game server on top of actix-core, and am having problems arranging a back-and-forth communication between actors nicely.
When clients first connect, the server has to send them the data for the environment (which is unbounded and load-on-demand) surrounding their spawn point. My code represents this as a Player actor asking the ChunkManager for a set of ChunkHandle objects, which have references to Chunk actors. The Player then asks each Chunk to send it its contents (as a compound object), and writes those contents to the codec wrapping the underlying network connection. The Chunks are not required to reply in any particular order. In code, this looks like the following:
/// For this and all other examples, Self == Player, which implements Actor
fn get_chunk_packets(&mut self,
                     requested_area: ChunkArea,
                     ctx: &mut <Player as Actor>::Context) {
    fn ask_chunk_for_pkt(addr: ChunkHandle) ->
        impl Future<Output = (ChunkHandle, ChunkDataMsg)>
    {
        // Returning addr as well so it doesn't need to be cloned
        // and can be saved to the player
        addr.target.send(RequestPacket())
            .map(Result::unwrap)
            .map(|cdm| (addr, cdm))
    }

    let chunk_handles = self
        .chunk_manager
        .send(requested_area)
        .map(Result::unwrap)
        .map(|AreaResponse(chunks)| chunks);

    let unordered_pending_responses = chunk_handles
        .map(|chunks| chunks.into_iter().map(ask_chunk_for_pkt))
        .map(FuturesUnordered::from_iter);

    // Do I need this?
    let arrived_responses = unordered_pending_responses.then(StreamExt::collect);

    // wrap_future gives access to the actor this'll be queued on
    let response_writes = wrap_future(arrived_responses)
        .map(|chunk_responses: Vec<_>, this: &mut Self, _ctx| {
            this.chunk_handles.clear();
            for (handle, chunk_data) in chunk_responses {
                this.chunk_handles.push(handle);
                this.stream.write(rewrap_chunk_pkt(chunk_data));
            }
        });

    // Queues the whole future to run on the actix event loop
    ctx.wait(response_writes);
}
My first question is: in the "old way" of doing things with future combinators, is this sensible? It type-checks (although I haven't run it yet, because the logic to actually reply to these messages isn't complete), but only after some wrangling, and I feel like I'm missing something about how to phrase this process as combinators (e.g. do I have to materialize the whole Vec of responses before I can start working on it?). Stream::for_each_concurrent looks very tempting, but it requires the applied closure to return a Future, and I don't understand how to apply it in this context.
I have another question which is much more specific to actix, so I appreciate any replies that can only comment on the first part. I tried to rewrite the above as an async function, and got this:
async fn get_chunk_packets_async(&mut self, requested_area: ChunkArea) {
    fn ask_chunk_for_pkt(addr: ChunkHandle) ->
        impl Future<Output = (ChunkHandle, ChunkDataMsg)>
    {
        addr.target.send(RequestPacket())
            .map(Result::unwrap)
            .map(|cdm| (addr, cdm))
    }

    let AreaResponse(chunk_handles) = self.chunk_manager
        .send(requested_area)
        .await
        .unwrap();

    let chunk_requests = chunk_handles.into_iter().map(ask_chunk_for_pkt);
    let mut request_stream = FuturesUnordered::from_iter(chunk_requests);

    while let Some((handle, chunk_data)) = request_stream.next().await {
        self.chunk_handles.push(handle);
        self.stream.write(chunk_data);
    }
}
This is both much more readable and closer to what I want in terms of data dependencies. However, I now have a problem: the return type of the above is impl Future<Output = ()>, and to queue it on my actor's event loop with wait, it needs to be ActorFuture<_> + 'static, which is blocked by it holding &mut self. The trick with wrap_future doesn't work very well, because what I want to do with the stream relies so heavily on interacting with self (so I can only write half the process before I need to drop out of async to wrap it), and AFAIK I can only interact with the return value of wrap_future in terms of combinators, so I'm not improving very much over the combinator-only version at the start.
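To make the constraint concrete, here is a minimal sketch of that split using only std, with no actix involved. Everything in it is a hypothetical stand-in for illustration: Player is a plain struct rather than an actor, handles are u32, packets are String, and fetch_packets, apply, block_on and demo are names invented for this example. The point it tries to show is that the async part can be 'static as long as it owns its inputs and never touches the Player, while the part that needs &mut self runs as an ordinary synchronous step once the future has resolved, which is roughly the role the closure passed to wrap_future(...).map(...) plays in my first example.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the Player actor (a plain struct, not actix).
struct Player {
    chunk_handles: Vec<u32>,
    written: Vec<String>,
}

// Hypothetical stand-in for asking one Chunk for its packet.
async fn ask_chunk_for_pkt(handle: u32) -> (u32, String) {
    (handle, format!("chunk-{}", handle))
}

// The 'static half: it owns `handles` and borrows nothing from the Player.
// Requests are awaited one after another here for brevity; the code in the
// post uses FuturesUnordered so replies can arrive in any order.
fn fetch_packets(handles: Vec<u32>) -> impl Future<Output = Vec<(u32, String)>> + 'static {
    async move {
        let mut out = Vec::new();
        for h in handles {
            out.push(ask_chunk_for_pkt(h).await);
        }
        out
    }
}

impl Player {
    // The synchronous half: runs after the future has resolved, with
    // exclusive access to the Player.
    fn apply(&mut self, responses: Vec<(u32, String)>) {
        self.chunk_handles.clear();
        for (handle, data) in responses {
            self.chunk_handles.push(handle);
            self.written.push(data);
        }
    }
}

// A deliberately tiny busy-polling executor so the sketch runs without any
// external crate. It is only suitable for futures that never really wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

// Drives the two halves in sequence and returns the updated Player.
fn demo() -> Player {
    let mut player = Player { chunk_handles: vec![99], written: Vec::new() };
    let responses = block_on(fetch_packets(vec![1, 2, 3]));
    player.apply(responses);
    player
}
```

Calling demo() replaces the stale handle 99 with the three handles that were fetched. What this sketch cannot express is exactly what I am after: touching self between individual awaits, instead of only once at the end.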
Am I missing something here in terms of queuing this as an async function? Is there some way to interact with non-std Futures (ActorFuture : Future) with async fn definitions?