In tokio::sync::broadcast, does the buffer size matter?

Do I need to set the buffer to the number of tasks I'm planning to keep active concurrently?

Let's say I have a 300-task limit via permits, and I make the broadcast channel buffer 2x that. All this channel does is remotely shut down the rest of the tasks if something goes wrong:

```rust
let (shutdown_send, shutdown_recv) = broadcast::channel(max_concurrent * 2);
```

The buffer capacity should be how many messages you need the channel to store. From the docs for tokio::sync::broadcast:

> When a value is sent, all Receiver handles are notified and will receive the value. The value is stored once inside the channel and cloned on demand for each receiver. Once all receivers have received a clone of the value, the value is released from the channel.
>
> A channel is created by calling channel, specifying the maximum number of messages the channel can retain at any given time.

But you don't usually need to do this since async tasks can be shut down from outside.


Yes. If the buffer size isn't large enough and the channel is being written to faster than it's being read, then eventually you'll get an error which you'll be forced to handle (on read as a RecvError::Lagged, or on write as a SendError once no receivers remain).

You can generally handle this with backpressure or by dropping messages.

This video section describes the problem that can occur https://youtu.be/fTXuGRP1ee4?t=1790

A single slow reader can cause the buffer to fill up, because all readers need to receive a broadcast message before its slot in the buffer can be reused for more writes. In tokio's implementation, once the buffer is full the oldest value is dropped and the slow reader gets a RecvError::Lagged.

How do I shut it down from the outside?

If it's a plain Future (like in select!), you can just drop it. If it's a tokio task, you can call JoinHandle::abort.

Well, if one thing goes wrong, I want all the other tasks to end.

The buffer size is the number of items of "lag" you're willing to accept from a slow Receiver. The idea behind the broadcast channel is that every item sent is seen by every Receiver; if a Receiver can't keep up with the rate at which items are arriving, then a limited number are buffered, after which tokio's broadcast drops the oldest item and the lagging Receiver sees a RecvError::Lagged on its next recv (the Sender itself never blocks).

Remotely shutting down tasks would not normally be done via a broadcast channel. You'd either share a single CancellationToken between all the tasks that are supposed to shut down, and cancel it when you want them all to terminate, or you'd spawn them all in a JoinSet and use JoinSet::shutdown to abort them all wherever they happen to be right now.
