tokio::sync::mpsc::Receiver skips first message when drained inside tokio::spawn

I'm trying to read a ZMQ stream coming through a tokio::sync::mpsc::Receiver channel.

First, here are the internal workings of the zmq subscription:

#[inline]
pub async fn subscribe_single(endpoint: &str) -> Result<Receiver<Result<Message>>> {
    let (tx, rx) = channel(1);
    let context = Context::new();

    let socket = new_socket_internal(&context, endpoint)?;

    tokio::spawn(subscribe_internal::<()>(socket, tx));

    Ok(rx)
}

#[inline]
fn new_socket_internal(context: &Context, endpoint: &str) -> Result<Socket> {
    let socket = context.socket(zmq::SUB)?;
    socket.connect(endpoint)?;
    socket.set_subscribe(b"")?;

    Ok(socket)
}

#[inline]
fn recv_internal(socket: &Socket, data: &mut [u8; DATA_MAX_LEN]) -> Result<Message> {
    let mut topic = [0u8; TOPIC_MAX_LEN];
    let mut sequence = [0u8; SEQUENCE_LEN];

    let topic_len = socket.recv_into(&mut topic, 0)?;
    if topic_len > TOPIC_MAX_LEN {
        return Err(Error::InvalidTopic(topic_len, topic));
    }

    if !socket.get_rcvmore()? {
        return Err(Error::InvalidMutlipartLength(1));
    }

    let data_len = socket.recv_into(data, 0)?;
    if data_len > DATA_MAX_LEN {
        return Err(Error::InvalidDataLength(data_len));
    }

    if !socket.get_rcvmore()? {
        return Err(Error::InvalidMutlipartLength(2));
    }

    let sequence_len = socket.recv_into(&mut sequence, 0)?;
    if sequence_len != SEQUENCE_LEN {
        return Err(Error::InvalidSequenceLength(sequence_len));
    }

    if !socket.get_rcvmore()? {
        return Message::from_parts(&topic[0..topic_len], &data[0..data_len], sequence);
    }

    let mut len = 3;

    loop {
        socket.recv_into(&mut [], 0)?;

        len += 1;

        if !socket.get_rcvmore()? {
            return Err(Error::InvalidMutlipartLength(len));
        }
    }
}

#[inline]
async fn subscribe_internal<B>(
    socket: Socket,
    tx: Sender<Result<Message>>,
) -> ControlFlow<B, Infallible> {
    let mut data: Box<[u8; DATA_MAX_LEN]> =
        vec![0; DATA_MAX_LEN].into_boxed_slice().try_into().unwrap();

    loop {
        let msg = recv_internal(&socket, &mut data);

        match tx.send(msg).await {
            Ok(_) => ControlFlow::Continue(()),
            Err(_) => ControlFlow::Break(()),
        };
    }
}

If I try to read the channel like this, everything goes as expected:

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let zmq_url = "tcp://127.0.0.1:28332";
    let mut zmq = bitcoincore_zmq::subscribe_single(&zmq_url).await.unwrap();

    while let Some(msg) = zmq.recv().await {
        match msg {
            Ok(msg) => println!("Received message: {msg}"),
            Err(err) => println!("Error receiving message: {err}"),
        }
    }

    Ok(())
}

Meanwhile, what I really want to achieve is something like this:

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let zmq_url = "tcp://127.0.0.1:28332";
    let zmq = bitcoincore_zmq::subscribe_single(&zmq_url).await.unwrap();

    tokio::spawn(listener(zmq));

    loop {}
}

async fn listener(mut zmq: Receiver<Result<bitcoincore_zmq::Message, bitcoincore_zmq::Error>>) -> Result<(), bitcoincore_zmq::Error> {
    while let Some(msg) = zmq.recv().await {
        match msg {
            Ok(msg) => println!("Received message: {msg}"),
            Err(err) => println!("Error receiving message: {err}"),
        }
    }

    Ok(())
}

But I'm seeing puzzling behavior: the first message is consistently skipped, and printing only starts from the second message onwards.

My knowledge of the inner workings of tokio (or async Rust for that matter) is still relatively shallow, so I'd really appreciate any insights on why this is happening, and how I could spawn a listener task that doesn't miss any messages.

Thanks in advance!

Your code is blocking the thread in several places. Please see this article:

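Specifically, the calls into zmq (the socket.recv_into calls that subscribe_internal runs inside a Tokio task) and the loop {} in main both qualify: neither ever yields back to the runtime. You want zmq on a dedicated thread outside of Tokio, and you can await std::future::pending::<()>() instead of the loop.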

It's not clear that this is your actual issue, but I have absolutely seen this kind of thing being the cause behind "channel not working" in the past.
