Code smell? tokio::spawn per msg per user per tick

Say we have a server that runs at 20 ticks per second.
Say we want to target 1000 users / server. Here, a 'user' might just be an NPC.

So it needs to handle 20,000 user-ticks per second. Suppose a typical user-tick averages 3 msgs over websocket; that makes 60,000 websocket msgs / second.

Is it too much to do a tokio::spawn per msg? The issue is that I don't want the main game loop to .await at all; I want it to figure out the data it needs to send and move on. I.e. we have:

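loop { // 20 ticks per second
  for u in &users {
    // .. do some work ..
    // Choice A: this seems BAD, since it blocks the main game loop on every send
    blah.send(msg).await;
    // Choice B (instead of A): whoa, 60,000 spawns / second?
    let blah = blah.clone(); // the spawned future must own its data ('static)
    tokio::spawn(async move { blah.send(msg).await });
  }
}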

If you need to await lots of tasks in parallel, there's tokio::task::JoinSet.

Alternatively, make users into a Stream and use .buffer_unordered(…) on the stream.

To spread work over multiple cores, consider processing users in batches, e.g.:

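for batch in users.chunks(5000) {
   // tokio::spawn needs a 'static future, so give each task its own copy of the batch
   let batch = batch.to_vec();
   tokio::spawn(async move {
      for u in batch { … }
   });
}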
  1. JoinSet looks very cool. Thanks.

  2. The batching/chunking solution involves two passes, plus storing the messages locally in between. Otherwise, to use tokio::spawn { ... } on blocks that contain updates, we would have to put the game state in an Arc<Mutex<...>>, which would defeat multithreading (there's no fine-grained locking yet).

^-- Can you expand on this? I don't know what this solution looks like.
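One way the two-pass approach described above might look, as a rough sketch using only the standard library. The `User` and `Outgoing` types and the update logic are hypothetical. `std::thread::scope` lets each thread borrow a disjoint `&mut` chunk of the users, so no `Arc<Mutex<...>>` is needed: pass 1 updates state and buffers each batch's messages locally, and pass 2 (the caller) hands them to whatever does the sending.

```rust
use std::thread;

// Hypothetical per-user game state.
#[derive(Debug)]
struct User {
    id: u32,
    hp: i32,
}

// Hypothetical outgoing websocket message.
#[derive(Debug, PartialEq)]
struct Outgoing {
    user_id: u32,
    text: String,
}

// Pass 1 for one batch: mutate the users and buffer their messages.
fn update_batch(batch: &mut [User]) -> Vec<Outgoing> {
    let mut outbox = Vec::new();
    for u in batch.iter_mut() {
        u.hp -= 1; // stand-in for real game logic
        outbox.push(Outgoing { user_id: u.id, text: format!("hp={}", u.hp) });
    }
    outbox
}

// One tick: update batches in parallel, then return all messages so the
// caller can send them (pass 2) without holding any locks.
fn tick(users: &mut [User], batch_size: usize) -> Vec<Outgoing> {
    thread::scope(|s| {
        // chunks_mut yields non-overlapping &mut slices, one per thread.
        let handles: Vec<_> = users
            .chunks_mut(batch_size)
            .map(|batch| s.spawn(move || update_batch(batch)))
            .collect();
        // Joining in spawn order keeps the messages in user order.
        handles
            .into_iter()
            .flat_map(|h| h.join().expect("batch thread panicked"))
            .collect()
    })
}
```

The scope joins every thread before `tick` returns, which is what allows the threads to borrow `users` without `'static`.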

I have a question on that matter. This will basically spawn the set of tasks to be polled automatically by the runtime, right? I.e.

  • There needs to be a runtime context.
  • The JoinSet doesn't need to be awaited in order for the tasks to be executed. Execution will be done "automatically" by the runtime.

I wonder if there is something like JoinSet which doesn't require a runtime but allows awaiting explicitly. Something like futures::future::JoinAll, but with the ability to add futures to it dynamically. Does something like this exist?

You can dynamically push to FuturesOrdered and FuturesUnordered. Although you can't push while awaiting, I think you can probably get the behavior by selecting over futures.next() and a stream that you submit tasks through.


Thanks. It's exactly what I was looking for!

Or not. I just noticed that FuturesOrdered and FuturesUnordered don't implement Future (they implement Stream). I can convert them into a future, but that consumes the set, so how can I add more futures to it afterwards? Maybe it's possible somehow, but it still seems a bit confusing.

I just found StreamExt::next, which of course should solve my problem, as it doesn't consume the FuturesOrdered/FuturesUnordered.

Rather than spawning each task, if you're worried about pressure, the general pattern is to submit jobs to a queue/channel that workers then pull from. This is essentially what spawn is doing, but you might be able to reduce the overhead with a specialized queue, e.g. roughly

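// The kinds of work the workers know how to do.
enum Job {
    Send(Whatever, Needed, Context),
    ..
}

// A bounded multi-producer, multi-consumer channel,
// e.g. async_channel::bounded from the async-channel crate.
let (snd, rcv) = async_channel::bounded::<Job>(1024);

// once at startup: roughly one worker per core
let workers = std::thread::available_parallelism().map_or(4, |n| n.get());
for _ in 0..workers {
    let rcv = rcv.clone();
    tokio::spawn(async move {
        // recv() fails once all senders are dropped, which ends the worker
        while let Ok(job) = rcv.recv().await {
            match job {
                Job::Send(blah, ..) => blah.send(..).await,
                ..
            }
        }
    });
}

// when submitting
for u in users {
    // just the submission; this only waits while the queue is full
    snd.send(Job::Send(blah, ..)).await.unwrap();
}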

This might do better than spawn for two reasons:

  • less allocation, since each queued job is a single enum value rather than a separately allocated task, and
  • backpressure from a bounded channel: the producing loop slows down if the consuming tasks aren't keeping up.

This captures the same items as the spawn approach, and likewise requires them to be 'static.

Actually I was looking for something that doesn't require 'static.

This seems a bit tedious. I wonder if there's a ready-to-use solution like JoinSet which can be awaited instead of spawning. It seems to be a pretty basic/natural thing but I didn't find it anywhere.

Anyway, I don't need it that urgently; I was mostly asking out of curiosity. In my use case (as opposed to the OP's), it's not a big deal to spawn every single task. And if it's too much, I can still use JoinSet and will be fine.

I was mostly wondering whether I can get rid of the 'static bound if I don't need runtime/worker-driven execution. I think it's possible with the idea you outlined, but it would be too tedious to program in my case to be worth it. If there were a ready-made solution, I would have considered using it.

Certainly good to be aware of JoinSet though!

In that case, you need some kind of scoped API, i.e. one that takes a callback and guarantees that everything spawned inside it has been awaited by the time the callback returns, rather than something you await manually.

Using tokio::spawn to make sending on an mpsc channel complete immediately is just an unbounded mpsc channel with extra steps. Use an unbounded channel instead.
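A minimal sketch of that advice using only the standard library, whose `std::sync::mpsc::channel` is unbounded: `send` just enqueues and returns, so the tick loop never waits, while a separate writer drains the queue. With tokio, the equivalent is `tokio::sync::mpsc::unbounded_channel`, whose `UnboundedSender::send` is likewise a plain synchronous call. The `Outgoing` type and the writer thread are illustrative assumptions.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical outgoing websocket message.
struct Outgoing {
    user_id: u32,
    text: String,
}

// Run `ticks` game ticks for `users` users, queueing one message per user
// per tick, and return how many messages the writer thread handled.
fn run_ticks(ticks: u32, users: u32) -> usize {
    let (tx, rx) = mpsc::channel::<Outgoing>();

    // Writer: stands in for the task that owns the websocket.
    let writer = thread::spawn(move || {
        let mut delivered = 0;
        // Iteration ends once every Sender has been dropped.
        for msg in rx {
            let _ = (msg.user_id, msg.text); // a real writer would send here
            delivered += 1;
        }
        delivered
    });

    for tick in 0..ticks {
        for user_id in 0..users {
            // Never blocks: the message is just pushed onto the queue.
            tx.send(Outgoing { user_id, text: format!("tick {tick}") })
                .expect("writer hung up");
        }
    }
    drop(tx); // close the channel so the writer loop finishes
    writer.join().expect("writer panicked")
}
```

The catch with any unbounded queue is that a slow consumer lets it grow without limit, which is what the next suggestion addresses.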

Alternatively, you could use try_send on your mpsc channel and kill the user if they can't keep up with the messages. I discuss this kind of pattern more in this blog post and the associated talk.
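A sketch of the `try_send` policy, using std's bounded `sync_channel` in place of a tokio channel (tokio's `Sender::try_send` likewise returns a `Full` error when the buffer is full). Each user gets its own bounded queue; if that queue is full at send time, the user is assumed to be too slow and is disconnected instead of stalling the tick. The `Client` type, `connect` helper, and capacities are hypothetical.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

// Hypothetical per-user connection handle: the game loop holds only the
// sending half; the user's writer task would own the Receiver.
struct Client {
    id: u32,
    tx: SyncSender<String>,
}

// Broadcast one message to every client without ever blocking.
// Clients whose queue is full (too slow) or closed are removed.
// Returns the ids of the clients that were dropped.
fn broadcast(clients: &mut Vec<Client>, msg: &str) -> Vec<u32> {
    let mut dropped = Vec::new();
    clients.retain(|c| match c.tx.try_send(msg.to_string()) {
        Ok(()) => true,
        Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => {
            dropped.push(c.id);
            false
        }
    });
    dropped
}

// Create a client whose queue holds at most `capacity` messages.
fn connect(id: u32, capacity: usize) -> (Client, Receiver<String>) {
    let (tx, rx) = sync_channel(capacity);
    (Client { id, tx }, rx)
}
```

Dropping the `Client` drops its sender, so the user's writer sees the channel close and can tear down the connection.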

