Using async/await internally with no internal runtime, but exposing a nonblocking poll API — is this a reasonable design?

I'm designing a database driver in Rust, and I'm exploring a hybrid design where:

  • async/await is used internally to implement protocol logic
  • but the public API is a libpq-style nonblocking poll interface

So instead of exposing async functions, the API looks more like:

  • start an operation
  • poll for progress
  • if it would block, report whether the caller should wait for read or write readiness
  • poll again
  • eventually return the result
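
From the caller's side, those steps might be driven roughly as follows. This is only a sketch: `PollOp`, `Step`, `Interest`, `drive`, and `Scripted` are hypothetical names made up for illustration, and the `wait` closure stands in for whatever the host environment uses to wait for readiness (select/poll in blocking code, or an event-loop callback).

```rust
use std::io;

/// Which readiness the caller should wait for before polling again.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Interest {
    Read,
    Write,
}

/// Outcome of one nonblocking poll step.
#[derive(Debug, PartialEq, Eq)]
pub enum Step<T> {
    WouldBlock(Interest),
    Done(T),
}

/// Hypothetical shape of the public API: start once, then poll repeatedly.
pub trait PollOp {
    type Output;
    fn start(&mut self) -> io::Result<()>;
    fn poll(&mut self) -> io::Result<Step<Self::Output>>;
}

/// Drives an operation to completion, delegating all waiting to `wait`.
pub fn drive<O, W>(op: &mut O, mut wait: W) -> io::Result<O::Output>
where
    O: PollOp,
    W: FnMut(Interest) -> io::Result<()>,
{
    op.start()?;
    loop {
        match op.poll()? {
            Step::Done(out) => return Ok(out),
            Step::WouldBlock(interest) => wait(interest)?,
        }
    }
}

/// Scripted stand-in for a real driver, used only to exercise `drive`.
pub struct Scripted {
    pub steps: Vec<Step<u64>>,
}

impl PollOp for Scripted {
    type Output = u64;

    fn start(&mut self) -> io::Result<()> {
        Ok(())
    }

    fn poll(&mut self) -> io::Result<Step<u64>> {
        if self.steps.is_empty() {
            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "script exhausted"));
        }
        Ok(self.steps.remove(0))
    }
}
```

The library itself never waits in this shape: all blocking (or awaiting) happens inside `wait`, which is the part the outer environment would own.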

One of my main goals is to expose this library to Python with good async support.

I’ve looked at approaches that embed a Rust async runtime (for example via pyo3 + tokio) and then expose that as Python async APIs. However, in my benchmark, this did not perform as well as I expected, likely because two runtimes end up interacting.

Because of that, I’m intentionally trying to avoid embedding or depending on any async runtime inside the library.

Instead, I want:

  • the Rust library to stay runtime-agnostic
  • no internal executor
  • and the outer environment (for example Python's asyncio) to drive IO readiness

This is partly inspired by libpq/psycopg, where libpq's nonblocking API can be integrated into Python async code.

Internally, though, I would still like to use async/await, because writing the protocol as a manual FSM is quite verbose and hard to maintain once there are multiple write/read steps.

So the design I'm considering is:

  • async/await is used purely as an implementation technique
  • futures are stored internally and manually polled
  • no executor is used; futures are polled directly
  • when IO would block, the code records whether it needs read or write readiness
  • the public poll() API returns that information to the caller

The example below shows the control-flow shape of the design.

use std::cell::Cell;
use std::future::Future;
use std::io::{self, Read, Write};
use std::net::TcpStream;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, Waker};

// Tokio's I/O traits are used here so that tokio-util's Codec can be layered
// on top: tokio-postgres uses Framed and Codec, and they look like very
// convenient utilities.
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IoInterest {
    Read,
    Write,
}

#[derive(Debug, Clone)]
pub struct SocketInterest(Rc<Cell<Option<IoInterest>>>);

#[derive(Debug)]
pub struct Socket {
    inner: TcpStream,
    interest: SocketInterest,
}

impl Socket {
    pub fn new(stream: TcpStream) -> io::Result<Self> {
        stream.set_nonblocking(true)?;
        Ok(Self {
            inner: stream,
            interest: SocketInterest(Rc::new(Cell::new(None))),
        })
    }

    pub fn interest(&self) -> SocketInterest {
        self.interest.clone()
    }
}

impl AsyncRead for Socket {
    fn poll_read(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let this = self.get_mut();
        let unfilled = buf.initialize_unfilled();

        match this.inner.read(unfilled) {
            Ok(0) => Poll::Ready(Ok(())),
            Ok(n) => {
                buf.advance(n);
                Poll::Ready(Ok(()))
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                this.interest.0.set(Some(IoInterest::Read));
                Poll::Pending
            }
            Err(e) => Poll::Ready(Err(e)),
        }
    }
}

impl AsyncWrite for Socket {
    fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut();

        match this.inner.write(buf) {
            Ok(n) => Poll::Ready(Ok(n)),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                this.interest.0.set(Some(IoInterest::Write));
                Poll::Pending
            }
            Err(e) => Poll::Ready(Err(e)),
        }
    }

    fn poll_flush(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        let this = self.get_mut();

        match this.inner.flush() {
            Ok(()) => Poll::Ready(Ok(())),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                this.interest.0.set(Some(IoInterest::Write));
                Poll::Pending
            }
            Err(e) => Poll::Ready(Err(e)),
        }
    }

    fn poll_shutdown(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        Poll::Ready(self.get_mut().inner.shutdown(std::net::Shutdown::Write))
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClientResponse {
    CommandComplete { rows_affected: u64 },
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClientPoll {
    Idle,
    Pending(IoInterest),
    Ready(ClientResponse),
}

type ClientFuture =
    Pin<Box<dyn Future<Output = io::Result<(Socket, ClientResponse)>>>>;

enum ClientState {
    Idle(Socket),
    Executing(ClientFuture, SocketInterest),
    Transitioning,
}

pub struct Client {
    state: ClientState,
}

async fn do_execute(mut socket: Socket) -> io::Result<(Socket, ClientResponse)> {
    // toy protocol:
    //
    // client -> server:
    //   [b'Q'][len: u8][query bytes...]
    //
    // server -> client:
    //   [b'C'][rows_affected: u64 big-endian]

    let query = b"SELECT 1";
    let query_len = u8::try_from(query.len())
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "query too long"))?;

    socket.write_all(&[b'Q', query_len]).await?;
    socket.write_all(query).await?;
    socket.flush().await?;

    let mut tag = [0u8; 1];
    socket.read_exact(&mut tag).await?;
    if tag[0] != b'C' {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "unexpected response tag",
        ));
    }

    let mut rows_buf = [0u8; 8];
    socket.read_exact(&mut rows_buf).await?;
    let rows_affected = u64::from_be_bytes(rows_buf);

    Ok((
        socket,
        ClientResponse::CommandComplete { rows_affected },
    ))
}

impl Client {
    pub fn new(socket: Socket) -> Self {
        Self {
            state: ClientState::Idle(socket),
        }
    }

    pub fn start_execute(&mut self) -> io::Result<()> {
        let old_state = std::mem::replace(&mut self.state, ClientState::Transitioning);

        match old_state {
            ClientState::Idle(socket) => {
                let sensor = socket.interest();
                let fut = Box::pin(do_execute(socket));
                self.state = ClientState::Executing(fut, sensor);
                Ok(())
            }
            other => {
                self.state = other;
                Err(io::Error::new(io::ErrorKind::Other, "client is busy"))
            }
        }
    }

    pub fn poll(&mut self) -> io::Result<ClientPoll> {
        let ClientState::Executing(fut, sensor) = &mut self.state else {
            return Ok(ClientPoll::Idle);
        };

        // no executor: the caller explicitly drives progress
        let waker = Waker::noop();
        let mut cx = Context::from_waker(waker);

        match fut.as_mut().poll(&mut cx) {
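            Poll::Ready(result) => {
                // A finished future must never be polled again, so leave the
                // `Executing` state before propagating a possible error. On
                // error the socket was consumed by the future, so the client
                // stays in `Transitioning` and has to be replaced.
                self.state = ClientState::Transitioning;
                let (socket, response) = result?;
                self.state = ClientState::Idle(socket);
                Ok(ClientPoll::Ready(response))
            }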
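            Poll::Pending => {
                // If the future yielded without hitting WouldBlock on the
                // socket, no interest was recorded; fall back to Read.
                let interest = sensor.0.take().unwrap_or(IoInterest::Read);
                Ok(ClientPoll::Pending(interest))
            }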
        }
    }
}

My questions are:

  1. Does this seem like a reasonable architecture?
    (async/await internally, nonblocking poll API externally, no runtime)
  2. What kinds of subtle issues might I be missing?
  3. In practice, would it be better to implement the protocol as a manual FSM?
  4. Are there existing projects using a similar approach?
  5. Any advice would be greatly appreciated.

there's a term for this kind of technique, where Rust's async machinery is used to compose state machines, mainly to implement network protocols: people call it "sans IO". you can read details on the concept here:

your solution is a little bit different from typical sans-io approach, in that your code also contains the adapter/wrapper for the actual io, and your state machine is hardcoded to use the specific io (as opposed to, say, generic/parameterized over the underlying io type).
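
to illustrate what "generic over the underlying io type" could mean here, below is a minimal std-only sketch. `Transport`, `try_read`, `try_write`, and `FakeStream` are hypothetical names, not taken from the original code or any crate; the tokio `AsyncRead`/`AsyncWrite` impls from the question are left out so the example has no dependencies, but they could return `Poll::Pending` wherever this returns `Ok(None)`.

```rust
use std::cell::Cell;
use std::io::{self, Read, Write};
use std::rc::Rc;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IoInterest {
    Read,
    Write,
}

/// Plays the role of `Socket` from the question, but for any `Read + Write`.
pub struct Transport<S> {
    inner: S,
    interest: Rc<Cell<Option<IoInterest>>>,
}

impl<S: Read + Write> Transport<S> {
    pub fn new(inner: S) -> Self {
        Self { inner, interest: Rc::new(Cell::new(None)) }
    }

    /// Shared handle through which the outer `poll()` learns what to wait for.
    pub fn interest(&self) -> Rc<Cell<Option<IoInterest>>> {
        Rc::clone(&self.interest)
    }

    pub fn get_mut(&mut self) -> &mut S {
        &mut self.inner
    }

    /// `Ok(None)` means "would block"; read interest has been recorded.
    pub fn try_read(&mut self, buf: &mut [u8]) -> io::Result<Option<usize>> {
        match self.inner.read(buf) {
            Ok(n) => Ok(Some(n)),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                self.interest.set(Some(IoInterest::Read));
                Ok(None)
            }
            Err(e) => Err(e),
        }
    }

    /// `Ok(None)` means "would block"; write interest has been recorded.
    pub fn try_write(&mut self, buf: &[u8]) -> io::Result<Option<usize>> {
        match self.inner.write(buf) {
            Ok(n) => Ok(Some(n)),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                self.interest.set(Some(IoInterest::Write));
                Ok(None)
            }
            Err(e) => Err(e),
        }
    }
}

/// Test double (hypothetical): blocks until `ready` is set, then serves `data`.
pub struct FakeStream {
    pub ready: bool,
    pub data: Vec<u8>,
}

impl Read for FakeStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if !self.ready {
            return Err(io::ErrorKind::WouldBlock.into());
        }
        let n = buf.len().min(self.data.len());
        buf[..n].copy_from_slice(&self.data[..n]);
        self.data.drain(..n);
        Ok(n)
    }
}

impl Write for FakeStream {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if self.ready {
            Ok(buf.len())
        } else {
            Err(io::ErrorKind::WouldBlock.into())
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

with this shape, a nonblocking `std::net::TcpStream` and an in-memory fake are interchangeable, which also makes the protocol logic testable without a server.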

I think coroutines might be more suitable for this purpose, but for now, async/await is what we have in stable. there exists a crate asansio that emulates coroutine style two-way communication on top of async and Future, you can check it out if you are interested.

personally, I think your solution is fine, at least from the usability of a rust library. but I don't know anything about async in python, so I can't give any opinion on that.

another idea, which might be total nonsense because it's Python-related: does Python expose its async sockets in its API? your example uses the TCP sockets from the Rust standard library. of course this works, but I wonder whether your state machine could be made truly IO-agnostic if you adapted the Python socket for it, essentially making the Python runtime the reactor for Rust async IO.

I’ve also looked into the sans-IO approach, and I think it’s a very elegant idea. There are quite a few Rust libraries that implement sans-IO, such as firezone, str0m, and even rustls, which doesn’t explicitly advertise itself as sans-IO but follows a similar structure internally. However, in all of these cases, the state machine seems to be implemented manually.
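
For comparison, a hand-written sans-IO machine for the toy protocol from my example might look roughly like the sketch below. This is not the API of any of the libraries mentioned; `Execute`, `Action`, `handle_input`, and `next_action` are hypothetical names I made up for illustration. The machine never touches a socket: it only tells the caller what to send and how many bytes it still needs.

```rust
/// What the caller should do next.
#[derive(Debug, PartialEq, Eq)]
pub enum Action {
    /// Send these bytes to the server, then ask for the next action.
    Transmit(Vec<u8>),
    /// Feed at least this many more bytes via `handle_input`.
    NeedInput(usize),
    /// The operation finished.
    Done { rows_affected: u64 },
}

#[derive(Debug, PartialEq, Eq)]
pub struct ProtocolError(pub &'static str);

/// One variant per point where the async version would simply `.await`.
enum State {
    SendQuery(Vec<u8>),
    AwaitReply(Vec<u8>),
    Done(u64),
}

pub struct Execute {
    state: State,
}

/// 1 tag byte + 8-byte big-endian row count.
const REPLY_LEN: usize = 9;

impl Execute {
    pub fn new(query: &[u8]) -> Result<Self, ProtocolError> {
        let len = u8::try_from(query.len()).map_err(|_| ProtocolError("query too long"))?;
        let mut frame = vec![b'Q', len];
        frame.extend_from_slice(query);
        Ok(Self { state: State::SendQuery(frame) })
    }

    /// Buffers bytes received from the server; ignored unless a reply is awaited.
    pub fn handle_input(&mut self, data: &[u8]) {
        if let State::AwaitReply(buf) = &mut self.state {
            buf.extend_from_slice(data);
        }
    }

    pub fn next_action(&mut self) -> Result<Action, ProtocolError> {
        match &mut self.state {
            State::SendQuery(frame) => {
                let frame = std::mem::take(frame);
                self.state = State::AwaitReply(Vec::new());
                Ok(Action::Transmit(frame))
            }
            State::AwaitReply(buf) => {
                if buf.len() < REPLY_LEN {
                    return Ok(Action::NeedInput(REPLY_LEN - buf.len()));
                }
                if buf[0] != b'C' {
                    return Err(ProtocolError("unexpected response tag"));
                }
                let mut rows = [0u8; 8];
                rows.copy_from_slice(&buf[1..REPLY_LEN]);
                let rows_affected = u64::from_be_bytes(rows);
                self.state = State::Done(rows_affected);
                Ok(Action::Done { rows_affected })
            }
            State::Done(rows_affected) => Ok(Action::Done { rows_affected: *rows_affected }),
        }
    }
}
```

Even for a two-step exchange, every suspension point becomes an explicit enum variant with its own buffer bookkeeping, which is the verbosity I was hoping async/await would hide.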

I’ll take a look at asansio. Thanks for pointing me to that crate. I also wish coroutines were stable in Rust—it’s a bit unfortunate that they’ve remained unstable for so many years. Looking at psycopg3, for example, it uses generators (Python-style coroutines) internally.

As for Python, there are asyncio-specific sockets available. However, I’m not quite sure how (or whether) these should be integrated with Rust, especially in terms of letting Python’s asyncio runtime act as the reactor for Rust-side async I/O.
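
One possibility I can imagine, sketched below under the assumption of a Unix platform: the Rust side keeps owning the socket but exposes its raw file descriptor together with the pending interest, and the Python side registers that descriptor with the event loop (asyncio has `loop.add_reader` and `loop.add_writer` for this) and calls `poll()` again from the callback. `WaitRequest`, `wait_request`, and `asyncio_method` are hypothetical names for illustration only, and the pyo3 binding layer is not shown.

```rust
use std::os::fd::{AsRawFd, RawFd};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IoInterest {
    Read,
    Write,
}

/// What a binding layer could hand to Python after a poll that would block.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WaitRequest {
    pub fd: RawFd,
    pub interest: IoInterest,
}

/// Builds the request from any fd-backed transport (e.g. `std::net::TcpStream`).
pub fn wait_request<S: AsRawFd>(stream: &S, interest: IoInterest) -> WaitRequest {
    WaitRequest { fd: stream.as_raw_fd(), interest }
}

/// Name of the asyncio event-loop method the Python side would call for
/// this interest (assumption: a selector-based event loop).
pub fn asyncio_method(interest: IoInterest) -> &'static str {
    match interest {
        IoInterest::Read => "add_reader",
        IoInterest::Write => "add_writer",
    }
}
```

In this arrangement asyncio would act as the reactor and the Rust code would only ever perform nonblocking reads and writes, so no Rust-side runtime would be involved.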