Can Tokio Semaphore be used to limit spawned tasks?

Hi all, I want to limit the number of spawned Tokio tasks, and am evaluating different approaches for it:

  1. Using tokio::sync::Semaphore
    This does not seem to work: it appears that acquire() returns a SemaphorePermit that borrows the Semaphore, so the Semaphore has to outlive the permit, which makes sense.
let permits = tokio::sync::Semaphore::new(10);
let jobs = (0..100).map(|i| Job::new(i));
for job in jobs {
    let permit = permits.acquire().await.expect("unable to acquire permit"); // Acquire a permit
    let jh = tokio::spawn(async move {
         job.run().await;
         drop(permit);  // Does not compile: permit borrows permits, but a spawned task must be 'static
    });
   ...
}
... wait for all jobs to be completed.

  2. Using an MPSC channel as a Semaphore
let (permits_tx, mut permits_rx) = tokio::sync::mpsc::channel::<bool>(10);
// Seed the permits
 for _ in 0..10 {
    permits_tx.send(true).await.expect("unable to init permits");
 }

let jobs = (0..100).map(|i| Job::new(i));
for job in jobs {
    permits_rx.recv().await; // Acquire a permit
    let permits_tx = permits_tx.clone(); // Each task needs its own sender to release its permit
    let jh = tokio::spawn(async move {
        job.run().await;
        permits_tx // Release a permit
                .send(true)
                .await
                .expect("unable to release permit");
    });
   ...
}
... wait for all jobs to be completed.

  • I wanted to check whether it is expected that Semaphore cannot be used in such scenarios, or whether there is some pattern that others use. I understand that I could have shared the Semaphore (e.g. via Arc) and done both acquire and release inside the spawned task to control actual job concurrency, but here I am trying to hold back the spawn itself.

  • Is using MPSC the correct way for this use case, or are there better primitives/patterns to achieve the same?

Why do you want to limit spawned tasks? Tokio can easily run millions of concurrent tasks on a moderate CPU.

let permits = Arc::new(Semaphore::new(10));
for i in 0..100 {
    let job = Job::new(i);
    let permits = Arc::clone(&permits);
    tokio::spawn(async move {
        let _permit = permits.acquire().await.unwrap();
        job.run().await;
    });
}

Thank you. I understand that it is probably very little overhead to spawn and queue Tokio tasks, and I will probably resort to that.

Mine was a hypothetical question while learning Tokio, to see whether one can avoid the overhead of these tasks and keep a tight upper bound on memory use.

The mpsc channel also allocates memory to queue its messages. It's not obvious that it will use less memory than spawning.

But wouldn’t mpsc memory allocation (max in use) be limited to its capacity (10 in this case)?

Right, I misunderstood your intention a bit. What you want is the Semaphore::acquire_owned() method.

Thank you so much! This is exactly what I was trying to achieve originally!