How to get a notification, or callback, when data arrives in a socket?

In Boost.Asio it's normal to register a callback for when data arrives on a socket using async_read, which takes a buffer and a callback. When the buffer is full, the callback is executed.

How do I do the same thing using Tokio for a long-lived connection?

As far as I can tell from the documentation, the equivalent code with Tokio is:

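use tokio::io::AsyncReadExt;

tokio::spawn(async move {
    // `reader` is any AsyncRead value, such as a TcpStream
    let mut buf = [0u8; 256];
    // suspends this task until all 256 bytes have arrived
    reader.read_exact(&mut buf).await?;
    // the rest of the code here
    // the annotation tells the compiler which error type `?` converts into
    Ok::<(), std::io::Error>(())
});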

You can look at the AsyncReadExt docs for other useful methods on an AsyncRead.

Won't read_exact block on a long-lived connection even after reading all the data?

.await doesn't block threads. It only suspends execution of the async block until the data arrives. It behaves much like turning the rest of that async block into a callback: behind the scenes, the runtime hands read_exact a Waker, and when data arrives the Waker tells the runtime to resume the block.
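To make the Waker mechanics a bit more concrete, here is a minimal sketch that uses only the standard library, with no Tokio involved. The names Shared, DataReady, FlagWaker, and demo are made up for illustration, and a real runtime does far more bookkeeping than this. It shows a future that returns Pending without blocking, keeps the Waker it was handed, and completes on the next poll after that Waker has been invoked.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a socket: holds data once it has "arrived",
/// plus the Waker of whichever task is waiting for it.
#[derive(Default)]
struct Shared {
    data: Option<Vec<u8>>,
    waker: Option<Waker>,
}

/// Future that resolves once `Shared::data` has been filled in.
struct DataReady(Arc<Mutex<Shared>>);

impl Future for DataReady {
    type Output = Vec<u8>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<u8>> {
        let mut shared = self.0.lock().unwrap();
        match shared.data.take() {
            Some(bytes) => Poll::Ready(bytes),
            None => {
                // Not ready: remember who to notify, then give the thread back.
                shared.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Toy waker that only records that it was called.
struct FlagWaker(AtomicBool);

impl Wake for FlagWaker {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Plays the role of the runtime; returns (was the waker called, bytes read).
fn demo() -> (bool, Vec<u8>) {
    let shared = Arc::new(Mutex::new(Shared::default()));
    let flag = Arc::new(FlagWaker(AtomicBool::new(false)));
    let waker = Waker::from(flag.clone());
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(DataReady(shared.clone()));

    // First poll: no data yet, so the future parks itself without blocking.
    assert!(fut.as_mut().poll(&mut cx).is_pending());

    // "Data arrives": the I/O side stores it and invokes the saved Waker.
    let mut s = shared.lock().unwrap();
    s.data = Some(b"hello".to_vec());
    let saved = s.waker.take();
    drop(s); // release the lock before the future is polled again
    saved.expect("the first poll stored a waker").wake();
    let woken = flag.0.load(Ordering::SeqCst);

    // A runtime would now poll again; this time the future completes.
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(bytes) => (woken, bytes),
        Poll::Pending => unreachable!("data was stored before this poll"),
    }
}
```

In Tokio you never write this by hand: the runtime creates the Waker, and the socket's I/O driver is what calls it.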

So this would always be called when data arrives in the socket or only once?

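spawn runs the code once. read_exact waits until it has received exactly as many bytes as the buffer holds, even if that takes multiple syscalls. If the connection is closed before the buffer is full, it returns an UnexpectedEof error.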
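What "multiple syscalls" means here can be sketched with the standard library's blocking Read trait, whose read and read_exact have, as far as the documented behavior goes, the same contract as Tokio's async versions. Using std is an assumption made only so the sketch runs without a runtime. ChunkedReader is a hypothetical reader that hands out at most a few bytes per call, imitating data that trickles in.

```rust
use std::io::{self, Read};

/// Hypothetical reader that returns at most `chunk` bytes per `read` call.
struct ChunkedReader {
    data: Vec<u8>,
    pos: usize,
    chunk: usize,
    calls: usize,
}

impl ChunkedReader {
    fn new(data: &[u8], chunk: usize) -> Self {
        Self { data: data.to_vec(), pos: 0, chunk, calls: 0 }
    }
}

impl Read for ChunkedReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.calls += 1;
        let remaining = &self.data[self.pos..];
        let n = remaining.len().min(self.chunk).min(buf.len());
        buf[..n].copy_from_slice(&remaining[..n]);
        self.pos += n;
        Ok(n) // Ok(0) signals end of stream
    }
}

/// A single `read` may return fewer bytes than the buffer holds.
fn single_read(reader: &mut ChunkedReader) -> io::Result<usize> {
    let mut buf = [0u8; 8];
    reader.read(&mut buf)
}

/// `read_exact` keeps calling `read` until the buffer is full.
fn fill_exact(reader: &mut ChunkedReader) -> io::Result<[u8; 8]> {
    let mut buf = [0u8; 8];
    reader.read_exact(&mut buf)?;
    Ok(buf)
}
```

With 8 bytes available in chunks of 3, single_read reports 3 bytes, while fill_exact makes three read calls and returns all 8. With fewer than 8 bytes available, fill_exact fails with ErrorKind::UnexpectedEof.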

I know I've accepted a solution but just so I don't need to open a new thread, is there a way to always be notified whenever data arrives in the socket, assuming the connection will never be closed?

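loop {
    let bytes_read = socket.read(&mut buf).await?;
    // read returns Ok(0) once the peer has closed the connection
    if bytes_read == 0 { break; }
    // handle &buf[..bytes_read] here
}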

How would I make this live for the same amount of time as a struct? Can I turn it into a property?

Async code can be stored as an object (a Future), but it doesn't do anything while it's sitting in a struct. It needs to be spawned or awaited to run.
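A small sketch of that laziness, using only the standard library so it runs without a runtime. Holder, NoopWaker, and demo are hypothetical names, and polling by hand stands in for what a runtime does after a spawn.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// Hypothetical struct that owns a not-yet-started piece of async code.
struct Holder {
    task: Pin<Box<dyn Future<Output = ()>>>,
}

/// Waker that does nothing; enough for polling a future by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Returns (ran while merely stored, ran after being polled).
fn demo() -> (bool, bool) {
    let ran = Arc::new(AtomicBool::new(false));
    let ran_in_task = ran.clone();

    // Building the future and storing it runs none of its body.
    let mut holder = Holder {
        task: Box::pin(async move {
            ran_in_task.store(true, Ordering::SeqCst);
        }),
    };
    let ran_while_stored = ran.load(Ordering::SeqCst);

    // Polling is what actually executes the body.
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let _ = holder.task.as_mut().poll(&mut cx);

    (ran_while_stored, ran.load(Ordering::SeqCst))
}
```

Here demo() returns (false, true): the flag stays unset while the future sits in Holder and is set only once something polls it.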

You can tie it to an object using channels:

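let (sender, receiver) = tokio::sync::mpsc::channel(1);
// add your own error handling
tokio::spawn(async move {
    let mut buf = [0u8; 256];
    loop {
        // stop on a read error or when the peer closes the connection (0 bytes)
        let bytes_read = match socket.read(&mut buf).await {
            Ok(0) | Err(_) => break,
            Ok(n) => n,
        };
        // send fails once the receiver is dropped, which ends this task
        if sender.send(buf[..bytes_read].to_vec()).await.is_err() {
            break;
        }
    }
});

// store receiver in a struct, and later you can do:
// (recv needs mutable access and returns None once the reading task has ended)
somestruct.receiver.recv().await;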

But that's a bit overcomplicated, because usually you'd store the socket in a struct and just read from it in an async fn.

Callback-based design is rather different from async/future-based design, so I suspect you'll need to adjust your design to fit async code, instead of trying to make async functions work like callbacks.