Wrap/Bridge std::io::Read to AsyncRead

I'm using interprocess for IPC, and that part works. I now want to build a communication protocol on top of it using capnp-rpc, with futures as the execution environment.

Interprocess's Stream implements std::io::Read & std::io::Write, while capnp-rpc's twoparty::VatNetwork::new() expects async versions (futures_io::AsyncRead & futures_io::AsyncWrite).

Is there a utility crate that bridges between the sync and async versions?

AsyncRead/Write from futures-io has an implementation for Pin. Could this help?

let plugin_interface_client: crate::plugin::protocol_capnp::plugin_interface::Client =
    capnp_rpc::new_client(PluginInterfaceImpl { name: "ACMEPLUGIN".to_string() });

for mut conn in listener.incoming().filter_map(handle_error) {
    let network = twoparty::VatNetwork::new(
        Pin::new(conn.as_read()),
        Pin::new(conn.as_write()),
        rpc_twoparty_capnp::Side::Server,
        Default::default(),
    );

    let rpc_system =
        RpcSystem::new(Box::new(network), Some(plugin_interface_client.clone().client));

    ...
}

gives an error saying it needs a &mut version of Read/Write, and I don't know how to get one...

error[E0277]: the trait bound `&LocalSocketStream: DerefMut` is not satisfied
   --> prototype/src/plugin/server.rs:78:17
    |
77  |             let network = twoparty::VatNetwork::new(
    |                           ------------------------- required by a bound introduced by this call
78  |                 Pin::new(conn.as_read()),
    |                 ^^^^^^^^^^^^^^^^^^^^^^^^ the trait `DerefMut` is not implemented for `&LocalSocketStream`, which is required by `Pin<&LocalSocketStream>: AsyncRead`
    |
    = help: the trait `AsyncRead` is implemented for `Pin<P>`
    = note: `DerefMut` is implemented for `&mut LocalSocketStream`, but not for `&LocalSocketStream`
    = note: required for `Pin<&LocalSocketStream>` to implement `AsyncRead`
note: required by a bound in `capnp_rpc::twoparty::VatNetwork::<T>::new`
   --> /Users/xxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/capnp-rpc-0.20.2/src/twoparty.rs:231:8
    |
231 |     T: AsyncRead + Unpin,
    |        ^^^^^^^^^ required by this bound in `VatNetwork::<T>::new`
...
243 |     pub fn new<U>(
    |            --- required by a bound in this associated function

Looks like interprocess has an async Stream: interprocess::local_socket::tokio::Stream

That implements Tokio's async traits instead of futures', but you can bridge those with tokio_util::compat::TokioAsyncReadCompatExt

Thanks! ...Just found that futures has a wrapper built in: AllowStdIo. This seems to work.

As for why your method doesn't work:

The implementation for Pin is only for when the pinned type already implements AsyncRead, so it's not going to help. The only way to make Read types properly implement AsyncRead is by spawning a thread.
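To illustrate the "spawning a thread" part, here is a minimal std-only sketch: a dedicated thread does the blocking reads and forwards each chunk over a channel. The helper names spawn_reader and collect_all are made up for this example. A real bridge would presumably use an async-aware channel (for instance one from the futures crate) so that the receiving task gets woken; that part is not shown here.

```rust
use std::io::{self, Read};
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical helper: moves a blocking `Read` onto its own thread and
/// forwards every chunk it reads over a channel.
fn spawn_reader<R>(mut reader: R) -> Receiver<io::Result<Vec<u8>>>
where
    R: Read + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let mut buf = [0u8; 4096];
        loop {
            match reader.read(&mut buf) {
                // End of stream: leaving the loop drops `tx`, which closes the channel.
                Ok(0) => break,
                Ok(n) => {
                    if tx.send(Ok(buf[..n].to_vec())).is_err() {
                        break; // the receiving side has gone away
                    }
                }
                // A read interrupted by a signal can simply be retried.
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => {
                    let _ = tx.send(Err(e));
                    break;
                }
            }
        }
    });
    rx
}

/// Hypothetical helper: drains the channel until the reader thread hangs up.
fn collect_all(rx: Receiver<io::Result<Vec<u8>>>) -> io::Result<Vec<u8>> {
    let mut out = Vec::new();
    for chunk in rx {
        out.extend_from_slice(&chunk?);
    }
    Ok(out)
}
```

Only the reader thread ever blocks; the consumer decides when and how to pick up the chunks.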

I wouldn't use AllowStdIo unless you aren't actually doing anything asynchronous, either because you only run a single future or because you use a Read type that is always ready.

You're most likely just blocking the current async task with that, which is not good if you want to write something actually async. It is supposed to be used when the underlying I/O operation will return immediately, either because it's ready or because it's in non-blocking mode and will return an error of kind io::ErrorKind::WouldBlock if it's not ready. In the latter case you will also have to manually handle waking your task up when it can make progress, which is a pretty low-level operation.
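For reference, this is roughly what the non-blocking case looks like with plain std. The sketch uses a Unix socket pair as a stand-in for the real connection (so it is Unix-only), and try_read and demo are hypothetical names. It only shows the WouldBlock result; the wake-up handling is left out.

```rust
use std::io::{self, Read, Write};
use std::os::unix::net::UnixStream;

/// Hypothetical helper: tries one read and maps "no data yet" to `None`
/// instead of treating it as a failure.
fn try_read(stream: &mut UnixStream, buf: &mut [u8]) -> io::Result<Option<usize>> {
    match stream.read(buf) {
        Ok(n) => Ok(Some(n)),
        // In non-blocking mode an empty socket reports WouldBlock right away.
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(None),
        Err(e) => Err(e),
    }
}

/// Reads once before and once after the peer has written four bytes.
fn demo() -> io::Result<(Option<usize>, Option<usize>)> {
    let (mut a, mut b) = UnixStream::pair()?;
    a.set_nonblocking(true)?;

    let mut buf = [0u8; 16];
    let before = try_read(&mut a, &mut buf)?; // nothing written yet
    b.write_all(b"ping")?;
    let after = try_read(&mut a, &mut buf)?; // data is now available

    Ok((before, after))
}
```

An async wrapper would turn the None case into Poll::Pending, and something else then has to wake the task once the socket becomes readable.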

Ah, I see. Because of <P as Deref>::Target: AsyncRead ... I have much to learn.

I don't plan to use async beyond what I need to get capnp-rpc's RpcSystem running, and I intend to isolate that part.

At a very high level this is what I want to do:

  1. At startup, for each configured plugin:
     • Spawn a child process
     • Spawn a thread that executes the RpcSystem for that plugin & handles sending/receiving the requests/responses
  2. Call remote functions on the plugins sequentially and synchronously.
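The thread-per-plugin part of that plan could be sketched like this, using only std channels. Everything here is a placeholder: PluginHandle and Call are hypothetical types, the worker just echoes the request where the real one would drive the RpcSystem, and spawning the child process is left out.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// A request together with a one-shot channel for its reply.
struct Call {
    request: String,
    reply: Sender<String>,
}

/// Hypothetical handle to one plugin's worker thread.
struct PluginHandle {
    calls: Sender<Call>,
    worker: JoinHandle<()>,
}

impl PluginHandle {
    /// Starts the worker thread for one plugin.
    fn spawn(name: &str) -> Self {
        let name = name.to_string();
        let (calls, inbox) = mpsc::channel::<Call>();
        let worker = thread::spawn(move || {
            // Placeholder: the real worker would run the RpcSystem here and
            // translate each `Call` into a remote request.
            for call in inbox {
                let _ = call.reply.send(format!("{}: {}", name, call.request));
            }
        });
        PluginHandle { calls, worker }
    }

    /// Blocks the calling thread until the worker has answered.
    fn call(&self, request: &str) -> Option<String> {
        let (reply, response) = mpsc::channel();
        let call = Call { request: request.to_string(), reply };
        self.calls.send(call).ok()?;
        response.recv().ok()
    }

    /// Closes the request channel and waits for the worker to finish.
    fn shutdown(self) {
        drop(self.calls);
        let _ = self.worker.join();
    }
}
```

From the main thread, PluginHandle::spawn("ACMEPLUGIN") followed by call("ping") behaves like an ordinary blocking function call, which keeps the async code confined to the worker.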