Tokio: bridging to sync code for rapid sending

Hey folks!

Let me first explain what I'm doing. Feel free to skip to the part with the code if you find this too wordy. :grinning:

I'm building a little something to operate a network-attached device with a MIDI control surface. To do so, I have to listen and write to a WebSocket and a MIDI connection simultaneously. Using tokio and its mpsc channels, my solution works well enough in one direction (my first approach was to use just std::thread, which also kind of worked, but gave me problems I'll spare you here). Now I also want to update the surface's LEDs, for example, based on the state of the device I'm controlling (which is why I have to do the concurrency dance in the first place). The initial state of the remote device comes across the wire easily; that's not an issue. But when I press the mute button for a channel on the control surface, for instance, I want to send not only a WebSocket message via said messaging channel, but also an update to the control surface itself. And while there is an asynchronous version of tungstenite for the WebSocket part, the same cannot be said for the MIDI library that I programmed against.

Following tokio's instructions, I approached bridging asynchronous to synchronous code by spawning a new runtime and blocking at the right place (extra types and conversions abbreviated for readability):

// 'nk' refers to 'NanoKontrol'
let rt = Runtime::new().unwrap();
let _conn_in = midi_in
    .connect(
        // snip
        move |_, message, _| { // has to be a synchronous closure
            rt.block_on(async {
                command_from_nk_tx // mpsc channel wired up in fn main()
                    .send(message)
                    .await
                    .map_err(|e| error!("Error: {}", e.to_string()))
                    .unwrap()
            })
        },
        // snip
    )
    .unwrap();

while let Some(update) = update_from_ui_rx.recv().await {
    conn_out
        .send(update)
        .unwrap();
}

In fn main(), this is wired up like this:

tokio::spawn(async move {
    // `command_from_nk_rx`: receiving end of `command_from_nk_tx` from above
    while let Some(command_from_nk) = command_from_nk_rx.recv().await {
        command_for_ui_tx // forwards to WebSocket
            .send(command_from_nk)
            .await
            .expect("Sending cmd to UI over mpsc channel failed");

        nk2_update_tx // reflects back to the control surface
            .send(command_from_nk)
            .await
            .expect("Sending cmd back to NK2 over mpsc channel failed");
    }
});

This is fine if I only press buttons, each of which yields a single MIDI event. When I move a fader, though, the events come in quick succession and I receive a SendError after two or three successfully processed updates. The .map_err(|e| error!("Error: {}", e.to_string())) call reports the reason as: Error: channel closed.

I assume this is due to the rt.block_on in the above snippet? Perhaps when the events come in too quickly, the whole connect closure from the above sample is still blocked. That makes me wonder, though, because the error message means that the receiver is unavailable; it doesn't refer to the sender. :thinking: Also, if I remove the nk2_update_tx.send part from the second snippet, I don't see those errors.

Can anybody make something of this? I'll happily show you more code if need be.

I'm not sure what the exact problem is, but some troubleshooting advice:

In general, a channel being closed means the receiver was dropped. This can't happen while you have a running recv() loop for that channel, so the loop must have stopped when it should have been continuing — if it doesn't have any break then the obvious conclusion is that it panicked. Consult your output carefully for any panic messages. Even if you can't find any — assume that “channel closed” is a side effect of the problem that you can detect elsewhere, rather than itself much of any evidence of what the problem is.

In this kind of situation it can also be helpful to sprinkle around eprintln!s or other logging for "got to this part of the code", so you know if e.g. a loop exited when you weren't expecting it to — in general, make the operations of your code not silent so you can tell which parts are working as expected.

Thank you! :slightly_smiling_face:

I'm not sure what the exact problem is

Well, it was that I didn't expect the given channel to close. :wink: Sorry if I didn't make myself clear enough though.

I already had those debugging prints in the code, but your post helped clarify what I was already partially suspecting: the blocking call was not the issue, but doing two things at once in the while let Some(command_from_nk) = command_from_nk_rx.recv().await loop apparently was not a good idea. I still don't actually know why this was an issue; maybe the first awaited call took long enough that the sender for the second one became unavailable, but that's just a wild guess. But now that I switched from mpsc::channel to sync::watch, which is basically spmc (the inverse of the mpsc pattern, if you will), my problems are gone. It gives me one more future to await in the main function, but that's not too bad.

So, thanks again!
