Tokio stream Sender ownership & backpressure

I think I’ve missed a trick with tokio/futures. I often encounter data flows like this:

  1. I have a stream of values that I’m processing
  2. I take each one using for_each. To enable backpressure, if any downstream processing is required I make sure to return that work as a future, so it will be completed before I accept the next item.
  3. Sometimes in the body of the for_each, I get some event or value that I want to announce to some other arbitrary component via a tokio channel. (That component will take the receiver and spawn its own processing task.) Typically in the for_each I have the Sender end of an mpsc channel. If that channel’s queue is full because of pending downstream work then I want to block on that send() until it finishes.

The main difficulty is that sink’s send() consumes self. I lose it until it completes and this is a major hassle. I understand why. I’m just having trouble finding the natural way to use it.

Here is the basic code I am talking about (playground)

    let mut rt = Runtime::new().unwrap();
    let (mut output_tx, output_rx) = channel(16);
    
    // My downstream component that's processing some results
    let downstream = output_rx
        .for_each(|v: u32| {
            println!("Downstream: value {}", v);
            Ok(())
        })
        .map_err(|_| ());
    rt.spawn(downstream);
    
    // My upstream component that only passes some values to downstream
    let upstream = iter_ok::<_,()>((0..10).collect::<Vec<u32>>())
        .for_each(move |v: u32| {
            println!("Upstream: value {}", v);
            if v % 2 == 0 {
                // PROBLEM: NO BACKPRESSURE
                let _ = output_tx.try_send(v*10);
            }
            Ok(())
        })
        .map_err(|_| ());
    rt.spawn(upstream);
    
    rt.shutdown_on_idle().wait().unwrap();

I am aware of these options:

  1. Use try_send and drop values if downstream is full. This is unacceptable because I want guaranteed delivery.
  2. Use an unbounded channel so I don’t have to block on send(). This is unacceptable because for both RAM and time-sensitive reasons I don’t want queued work to grow unbounded between these components.
  3. Keep the Sender inside an Arc<Mutex<Option<_>>>, and every time I want to do a send, take it out (replacing it with None), and map() after the Send to put it back into my Option for next time. This seems to legitimately meet my requirements but it’s horrible and has locking overhead.
  4. Clone the Sender, call send() on the clone, and return the Send future. This doesn’t significantly cheat the boundedness of the channel because it will block the for_each process until the value makes it into the queue. So it seems to meet my requirements too - but now I have the expense of cloning a Sender every time I want to send a new value, which seems excessive.
  5. Replace for_each with and_then and chain my outgoing channel as a Sink via forward(). I change the and_then body to resolve to Option: if Some, I have a value to pass on to the outgoing channel; I can filter_map out the Nones and forward(). This solution feels right in that I can move my Sender into the forward() call just once and the backpressure works. However I have now contorted my for_each logic. Also I may have two, three or four channels that I will need to fanout my Stream to multiple Sinks and this is going to get old quickly.

Hopefully it’s clear what my difficulty is here. Is there some way of using tokio/futures I’ve missed that would make this easy to do “right”, or are my data flows just not amenable to this?

I’m dreaming of some component that’s like a tokio bounded mpsc channel, except I only need to hold a Arc<NiceSender> to push values (i.e. send() takes &self and returns a future), and it just trusts me that I will wait on the returned future before I send the next value.

Using try_send doesn’t block you from doing another act if it fails, e.g. 4 combined with Either.

Aha that’s a great idea, thanks! I’ve added that to my test playground and it works great. I reduced the channel size to 2 and the log shows that upstream gets put on hold while downstream catches up, which is exactly what I wanted.

EDIT: I should also explicitly check that the error is is_full() before I go and do the clone, but the example works.

fn send_efficiently<T: Copy>(
    sender: &mut Sender<T>,
    value: T,
) -> impl Future<Item = (), Error = ()> {
    match sender.try_send(value) {
        Ok(_) => {
            println!("try_send accepted");
            Either::A(ok(()))
        }
        Err(_) => {
            println!("Backpressure - using a clone");
            Either::B(
                sender
                    .clone()
                    .send(value)
                    .map(|_| ()) // drop clone after send
                    .map_err(|_| ()), // ignore channel error
            )
        }
    }
}

this may also can be done with semaphore

I’ve read the Semaphore and Permit docs and I’m having trouble seeing where it would fit in. Would you mind expanding on when I would acquire/release permits in this situation?