Tokio has a rather neat kind of channel called a oneshot channel, which is more or less exactly what you're describing: a task you can mark as completed later.
Here are some tips for structuring this kind of program:
First, I would create a Tokio Runtime object, which will handle the Tokio thread pool, along with one channel for GPU tasks and one for CPU tasks.
To spawn a task on the Tokio runtime, you can call spawn on the runtime object. If that object is not easily accessible (e.g. from inside a CPU task), you can ask the runtime for a Handle, which you can use to spawn tasks from anywhere. You can clone the handle freely and pass it around to all your tasks to allow them to spawn tasks. Similarly, you can clone the sender of your crossbeam channels to pass around task-spawning abilities.
Of course, if you want to use rayon for CPU-bound tasks, you can just call the global rayon::spawn function instead of passing around some sort of handle.
To start another kind of task when one task finishes, I would simply move the appropriate spawning handle into the task and call its spawn function at the end of the task. E.g. to spawn a Tokio task at the end of GPU work, simply call handle.spawn(...) when the GPU work has finished.
If you wish to synchronize on the end of two tasks, e.g. a CPU and GPU task that are running in parallel, but both need to finish to start the next task, using Tokio to synchronize their completion is a good choice:
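    use tokio::sync::oneshot;

    let (cpu_send, cpu_recv) = oneshot::channel();
    let (gpu_send, gpu_recv) = oneshot::channel();

    tokio_handle.spawn(async move {
        // Wait for both results. The order of the awaits does not matter,
        // since both tasks are already running.
        let cpu_result = cpu_recv.await.unwrap();
        let gpu_result = gpu_recv.await.unwrap();
        // Spawn the next task now.
    });

    // Hand cpu_send and gpu_send to the CPU and GPU tasks.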
A Tokio task that is waiting does not occupy a thread and costs only a small amount of memory, which makes the above extremely lightweight. Even if the next task to spawn is not a network-bound task, using Tokio to synchronize their completion should work nicely.
As for avoiding nested callbacks: sleeping (or otherwise waiting) in CPU or GPU tasks should be avoided, as that takes up a full thread for the duration of the sleep. It is perfectly fine to do in Tokio, because async/await makes waiting (using .await) not take up a thread. This means that if you want to write imperative code that controls the flow of task spawning, you should do that in Tokio. Below is an example that first spawns a CPU task and then passes its output to a GPU task:
    use tokio::sync::oneshot;

    // For errors you just want to bubble up, you want this:
    type Error = Box<dyn std::error::Error + Send + Sync>;

    async fn my_job() -> Result<YayType, Error> {
        let (cpu_send, cpu_recv) = oneshot::channel();
        spawn_cpu_task(cpu_send);
        // `?` converts a RecvError into the boxed Error.
        let cpu_result = cpu_recv.await?;

        let (gpu_send, gpu_recv) = oneshot::channel();
        spawn_gpu_task(gpu_send, cpu_result);
        let gpu_result = gpu_recv.await?;

        Ok(gpu_result)
    }
If the spawned CPU or GPU task fails, the oneshot channel will immediately fail with a RecvError (this happens when the sender is dropped), and the question mark operator will make the error bubble up. The actual cause of failure will probably be printed to stderr by the CPU or GPU task. To catch it, one could create a wrapper around the oneshot channel that uses catch_unwind to capture the failure and send it to the Tokio task.
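A rough sketch of such a wrapper follows. To keep it free of external crates it sends over std::sync::mpsc rather than a Tokio oneshot channel; since the send method of a oneshot sender is also a plain synchronous call, the same shape should carry over. The names spawn_catching and TaskResult are hypothetical, and the approach assumes panics unwind (it does nothing under panic = "abort").

```rust
use std::panic::{self, AssertUnwindSafe};
use std::sync::mpsc;
use std::thread;

/// Either the task's output or the panic message as a string.
type TaskResult<T> = Result<T, String>;

/// Runs `work` on a new thread, catches a panic if one occurs,
/// and always sends an outcome so the receiver learns the cause.
fn spawn_catching<T, F>(sender: mpsc::Sender<TaskResult<T>>, work: F) -> thread::JoinHandle<()>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    thread::spawn(move || {
        let outcome = panic::catch_unwind(AssertUnwindSafe(work)).map_err(|payload| {
            // Panic payloads are usually a &str or a String.
            if let Some(msg) = payload.downcast_ref::<&str>() {
                msg.to_string()
            } else if let Some(msg) = payload.downcast_ref::<String>() {
                msg.clone()
            } else {
                "task panicked with a non-string payload".to_string()
            }
        });
        // Ignore the error case where the receiver was dropped.
        let _ = sender.send(outcome);
    })
}
```

The default panic hook still prints the message to stderr; the wrapper only makes the cause available to the waiting side as well.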
As for prioritizing tasks, you can make your task channel include a priority (e.g. an integer) and replace the channel with a queue built on a binary heap, which always returns the highest-priority task. Unfortunately, you will lose the multiple-receiver property of crossbeam channels, but if you put the heap in a mutex, I think it should work rather nicely. Note that this doesn't mix very well with rayon; you would need to build your own thread pool to use this approach.
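One possible shape for such a queue, using only the standard library, is sketched below. The type names Prioritized and PriorityQueue are hypothetical, and the Condvar is an added assumption so that worker threads can block while the queue is empty. Tasks with equal priority come out in no particular order.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::sync::{Condvar, Mutex};

/// A task tagged with a priority; ordering looks at the priority only.
struct Prioritized<T> {
    priority: u32,
    task: T,
}

impl<T> PartialEq for Prioritized<T> {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}
impl<T> Eq for Prioritized<T> {}
impl<T> PartialOrd for Prioritized<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl<T> Ord for Prioritized<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.priority.cmp(&other.priority)
    }
}

/// A shared queue that hands out the highest-priority task first.
pub struct PriorityQueue<T> {
    heap: Mutex<BinaryHeap<Prioritized<T>>>,
    available: Condvar,
}

impl<T> PriorityQueue<T> {
    pub fn new() -> Self {
        PriorityQueue { heap: Mutex::new(BinaryHeap::new()), available: Condvar::new() }
    }

    /// Adds a task and wakes one waiting worker.
    pub fn push(&self, priority: u32, task: T) {
        self.heap.lock().unwrap().push(Prioritized { priority, task });
        self.available.notify_one();
    }

    /// Blocks until a task is available, then returns the highest-priority one.
    pub fn pop(&self) -> T {
        let mut heap = self.heap.lock().unwrap();
        loop {
            if let Some(item) = heap.pop() {
                return item.task;
            }
            heap = self.available.wait(heap).unwrap();
        }
    }

    /// Returns the highest-priority task if there is one, without blocking.
    pub fn try_pop(&self) -> Option<T> {
        self.heap.lock().unwrap().pop().map(|item| item.task)
    }
}
```

Worker threads in a hand-built pool would share the queue through an Arc and call pop in a loop.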
You will probably want to use the reqwest crate for your network operations, as it integrates nicely with Tokio. You can limit the number of concurrent network operations using a Semaphore.
A word of warning: all waiting inside Tokio must happen at an .await. A rule of thumb is this: if your Tokio task doesn't go near-instantly from one .await to the next, you are blocking without giving control back to Tokio.