How to do lots of small async I/O efficiently?

I'm experimenting with building an actor-based async IRC server on top of xactor, and I'm having trouble getting data on and off the network streams efficiently.

Specifically, the individual messages I'm dealing with are very small - dozens of bytes each - and trying to use async_codec (xactor already puts me in the async_std ecosystem, so I used that rather than the tokio version) works, but badly. In a test program where I piped in a 99k-word dictionary and then echoed it back out, using a codec to split out the lines caused a 6-7x slowdown compared to using AsyncBufReadExt::lines. Even using lines, a good 60-70% of the execution time is taken up by writing the output, as opposed to reading or the internal actor processing.

I expect that this slowdown is because the architecture generally expects me to write a single item at a time, so most of the time is being spent on the overhead of setting up each write rather than on the actual transfer of <12 bytes. I'd expect to be able to fix this by buffering multiple items and only submitting a proper I/O call once I've accumulated a significant amount of data, just as I would for the same problem in a sync context.
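For comparison, this is roughly what I mean by the sync-context version (a rough sketch - handle_client is a made-up name and the echo body is only illustrative), where std::io::BufWriter coalesces the small writes and an explicit flush handles the tail:

use std::io::{BufRead, BufReader, BufWriter, Write};
use std::net::TcpStream;

// Sync analogy: BufWriter batches the many tiny line writes into a few
// larger writes to the socket, and flush() takes care of whatever is left.
fn handle_client(stream: TcpStream) -> std::io::Result<()> {
    let reader = BufReader::new(stream.try_clone()?);
    let mut writer = BufWriter::new(stream);
    for line in reader.lines() {
        writer.write_all(line?.as_bytes())?;
        writer.write_all(b"\n")?;
    }
    writer.flush()
}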

My problem is that I have no idea how to do this in an async context - particularly how to make sure my buffer doesn't get "stuck" if no new output arrives for a while - or whether there is any library that deals with this case effectively. If there isn't, I'm not averse to learning how to do it from scratch, but I'd have no idea where to even start, having never written a direct implementation of Future et al. before, as opposed to working with async functions and combinators.

Does anyone have any suggestions for either how to directly solve this problem, or where to learn enough to do it myself?

It's a bit unclear to me what the full situation here is, but you may be able to find some info in this article: Actors with Tokio

Just to clarify, I'm not having any problems with the actor framework itself, but I'm also not particularly wedded to it if some other architecture would better handle the many-to-many relationships between network clients and the internal state of the rooms. (As an exercise, I'm trying to keep the rooms independent rather than having a single master loop that relays everything for every client.) Was there any particular detail missing from the OP that you'd find helpful?

Thanks for the link too, will try to read it soon.

I think you can just have a loop that does something like this, in pseudocode:

loop {
    // Block until at least one message is available.
    let mut msg = read_message().await;
    // Keep batching further messages for up to 100ms, then give up and
    // write out whatever has accumulated, so the buffer can't get stuck.
    let _ = timeout(Duration::from_millis(100), async {
        loop {
            msg.extend(read_message().await);
        }
    }).await;
    do_io(msg);
}
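Fleshed out with real APIs, that might look roughly like the following - an untested sketch that assumes async_std's future::timeout and that the outgoing messages arrive over an async_std::channel::Receiver of byte buffers (the names and the Vec<u8> message type are placeholders):

use std::time::Duration;

use async_std::channel::Receiver;
use async_std::future::timeout;
use futures::io::{AsyncWrite, AsyncWriteExt};

async fn batched_writer(receiver: Receiver<Vec<u8>>, mut stream: impl AsyncWrite + Unpin) {
    while let Ok(first) = receiver.recv().await {
        let mut buffer = first;
        // Keep appending until 100ms elapse or the channel closes; either way
        // we fall through and write what we have, so nothing gets stuck.
        let _ = timeout(Duration::from_millis(100), async {
            while let Ok(next) = receiver.recv().await {
                buffer.extend_from_slice(&next);
            }
        })
        .await;
        if stream.write_all(&buffer).await.is_err() {
            return; // the peer went away; nothing more to do
        }
    }
}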

Thanks for the suggestion - I knew about timeout but was having trouble visualizing how it would help in this situation.

Since my last post, I realized that I could spawn a separate task that takes items from a channel and uses try_recv, which got me the following:

async fn write_channel_to_stream(receiver: Receiver<Bytes>, mut stream: impl AsyncWrite + Unpin) {
    let mut buffer = Vec::with_capacity(4096);
    let mut blocking = false;
    loop {
        let new_item = if blocking {
            blocking = false;
            receiver.recv().await.map_err(|_| TryRecvError::Closed)
        } else {
            receiver.try_recv()
        };
        match new_item {
            Ok(val) => {
                buffer.extend(val);
            }
            Err(TryRecvError::Empty) => {
                stream.write_all(&buffer).await.unwrap_or_else(|_| return); // Break on failure
                buffer.clear();
                blocking = true;
            }
            Err(TryRecvError::Closed) => return,
        }
    }
}

But I'm not very confident whether this is actually correct, whether it could get stuck, or whether it's a good idea to mix blocking and non-blocking reads from the same channel like this. Is this workable, or would using timeouts be better? (I expect they'd be more predictable, which would be helpful for me.)

A couple problems with your code:

  • When the channel closes, it immediately returns from the function instead of attempting to flush the buffer first, causing data to be lost.
  • In .unwrap_or_else(|_| return), the return applies to the closure passed to unwrap_or_else, not to the outer function - so it is effectively a no-op.
  • The passed-in stream does not need to implement Unpin - you can make it Unpin in the function itself by using a macro like {pin_utils, futures_util, futures}::pin_mut! or {tokio, futures_lite, smol}::pin!.

Here is an alternate version that avoids those problems. I also dropped the blocking flag and instead encoded that state in the structure of the loop itself, which simplifies things.

async fn write_channel_to_stream(receiver: Receiver<Bytes>, stream: impl AsyncWrite) {
    // Pin the stream on the stack so the parameter doesn't need an Unpin bound.
    pin!(stream);

    let mut buffer = Vec::with_capacity(4096);
    loop {
        // Wait for at least one message; a closed channel means we're done.
        if let Ok(bytes) = receiver.recv().await {
            buffer.extend(bytes);
        } else {
            return;
        }
        // Drain anything else that's already queued, without blocking.
        while let Ok(bytes) = receiver.try_recv() {
            buffer.extend(bytes);
        }
        // Write the whole batch in one go; stop if the stream has failed.
        if stream.write_all(&buffer).await.is_err() {
            return;
        }
        buffer.clear();
    }
}
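And here's a rough, untested sketch of how you might wire it up, assuming async_std's task and channel modules (handle_connection, the channel capacity, and the example message are all just illustrative):

use async_std::channel;
use async_std::net::TcpStream;
use async_std::task;
use bytes::Bytes;

async fn handle_connection(stream: TcpStream) {
    let (sender, receiver) = channel::bounded::<Bytes>(64);

    // One writer task per connection; it owns the stream and batches output.
    let writer = task::spawn(write_channel_to_stream(receiver, stream));

    // Everything else (actors, room state, ...) just sends small messages.
    // Dropping the last sender closes the channel and lets the writer finish.
    let _ = sender.send(Bytes::from_static(b"NOTICE :hello\r\n")).await;
    drop(sender);

    writer.await;
}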

Thanks for your help, that was much simpler than what I was trying to come up with (and I'd forgotten how return works in closures). I had mostly given up on getting pinning to work correctly, so thanks for pointing out the macro as well.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.