How to wrap a Tonic streaming operation in a trait

Hi, I'm using Tonic to talk to a (Java based) system called Axon Server using their gRPC API.

To improve testability I'd like to expose the API operations as a trait, like this:

When I try to implement it, however, it won't compile. It seems that type StaticCommandProviderOutboundStream isn't 'static enough.

(You can guess the ServerHandleTrait from the impl :wink: )

type StaticCommandProviderOutboundStream =
    dyn Stream<Item = CommandProviderOutbound> + Send + 'static;
type CommandProviderOutboundStreamBox =
    Pin<Box<StaticCommandProviderOutboundStream>>;

struct ServerHandle {
    conn: Channel
}

// #[tonic::async_trait]
// trait ServerHandleTrait { ... }

#[tonic::async_trait]
impl ServerHandleTrait for ServerHandle
{
    async fn open_command_provider_inbound_stream(
        &self, request: CommandProviderOutboundStreamBox
    ) -> Result<
        tonic::Response<Streaming<CommandProviderInbound>>,
        tonic::Status
    > {
        let request = Request::new(request);
        let mut client = CommandServiceClient::new(self.conn.clone());
        client.open_stream(request).await
    }
}

There is a complex real-time interaction between inbound and outbound messages, so I can't simply pass in a Vec.

When I remove the Box and make it an impl parameter, the compiler complains that ServerHandleTrait can't be made into an object. When I dereference the request, like this:

        client.open_stream(*request).await

the compiler complains that the size of *request isn't known at compile time. What can I do?

I'm not entirely sure what the errors you're encountering or what changes to the code you're trying to make exactly, but I don't really suggest wrapping the streaming operation itself in a trait. I'd instead swap out the underlying network connection as per the Tonic's mock example: tonic/mock.rs at master 路 hyperium/tonic 路 GitHub

In general, you'll have greater success testing network clients/services in Rust by using fake, in-memory connections with mocks/stubs you write by hand on the opposite side of the "connection".

Ah, my Java habits get in the way. I've come to expect to be able to mock pretty much any type (and was delighted to stumble across a crate mockall, even though it seems to be limited to traits).

What you're saying is that my test should setup an in-memory server that mocks AxonServer and convince Tokio to have my client communicate to the mock server instead of sending real gRPC requests to an external server, right?

Ah, my Java habits get in the way. I've come to expect to be able to mock pretty much any type (and was delighted to stumble across a crate mockall , even though it seems to be limited to traits).

Yeah, Mockito et. al. in Java rely on pervasive reflection, which isn't a thing in Rust. I've seen this be a common stumbling block for Rust users coming from the JVM ecosystem.

What you're saying is that my test should setup an in-memory server that mocks AxonServer and convince Tokio to have my client communicate to the mock server instead of sending real gRPC requests to an external server, right?

That's correct. It's a little more work than Mockito, but in my experience, this sort of approach made me feel much more confident of my tests and code in Java/Python. The mock example I linked to is pretty handy to building that out.

It still bothers me that I expect to be able to expose a method on an async trait, but the compiler won't let me. I struggle with this before. Is there some rule that says "trait, async, and impl: pick two"?

Let me try to explain more clearly. So the signature of the Tonic generated open_stream is

pub struct CommandServiceClient<T> {
    inner: tonic::client::Grpc<T>,
}
impl<T> CommandServiceClient<T> {
    pub async fn open_stream(
        &mut self,
        request: impl tonic::IntoStreamingRequest<
            Message=super::CommandProviderOutbound,
        >,
    ) -> Result<...> {}
}

I understand this is just another way of saying

impl<T> CommandServiceClient<T> {
    pub async fn open_stream<
        R: tonic::IntoStreamingRequest<
          Message=super::CommandProviderOutbound,
        >
    >(
        &mut self,
        request: R,
    ) -> Result<...> {}
}

Now if I create a stream, like so

async fn test() {
    let my_stream = stream! {...};
    let _out_stream = client.open_stream(stream).await.unwrap();
}

Then this compiles, because stream! creates an instance of an anonymous type that fits the bound for <R>. Now I want to refactor it like this:

async fn test() {
    let my_stream = stream! {...};
    exec(my_stream).await;
}
async fn exec(stream: impl tonic::IntoStreamingRequest<
    Message=super::CommandProviderOutbound,
>) {
    // Enrich the stream with metadata
    let _out_stream = client.open_stream(stream).await.unwrap();
    // Post-process the _out_stream
}

And I want to make exec somebody else's problem by defining it on a trait and let them implement it:

#[tonic::async_trait]
trait StreamExecutor {
    async fn exec(stream: impl tonic::IntoStreamingRequest<
        Message=super::CommandProviderOutbound,
    >);
}

Now if I try to make a struct implement this trait and pass it around, the compiler complains that StreamExecutor cannot be made into an object because of the (implicit) generic parameter (remember <R>?). So, as the documentation states that the compiler can simply but non-trivially derive an impl from a dyn, I tried to turn the argument in a dyn. However, a dyn is not Size, so I had to wrap it up in a Box. And because of async I had to Pin it and adorn it with Send + 'static, so:

#[tonic::async_trait]
trait StreamExecutor {
    async fn exec(stream: Pin<Box<dyn tonic::IntoStreamingRequest<
        Message=super::CommandProviderOutbound,
    > + Send + 'static>>);
}

Now the trait compiles, I can call it with a stream! argument, and I can pass implementations of it around. Yeay!

Alas, I can't call open_stream anymore in the implementation

#[tonic::async_trait]
impl StreamExecutor for TestStruct {
    async fn exec(stream: Pin<Box<dyn tonic::IntoStreamingRequest<
        Message=super::CommandProviderOutbound,
    > + Send + 'static>>) {
        // Enrich the stream with metadata
        let _out_stream = client.open_stream(stream).await.unwrap();
        // Post-process the _out_stream
    }
}

because the compiler complains that

the type `Pin<Box<dyn Stream<Item = CommandProviderOutbound> + Send>>` does not fulfill the required lifetime
...
= note: type must satisfy the static lifetime

Note that the 'static that I put after the Send was completely ignored. Also, the stream lived long enough when it was still an impl. Wrapping it up in a Box shouldn't make it any less 'static, right? Why won't the compiler let me do this?

Where does client come from? And what does the rest of the compiler message say?

The client is generated by the Tonic crate that provides a way to talk to gRPC services on top of Tokio. The client type and supporting message types are generated from a protobuf specification that I pulled from the Git repo of the server I wanted to connect to. But I guess I would have the same problem if I tried to make my own simple protobuf specification with a two-way streaming method.

The full compiler message is:

error[E0477]: the type `Pin<Box<dyn Stream<Item = CommandProviderOutbound> + Send>>` does not fulfill the required lifetime
   --> dendrite-lib/src/axon_utils/mod.rs:141:5
    |
141 | /     {
142 | |         let mut client = CommandServiceClient::new(self.conn.clone());
143 | |         client.open_stream(request).await
144 | |     }
    | |_____^
    |
    = note: type must satisfy the static lifetime

For more information about this error, try `rustc --explain E0477`.

(This is from my real code, not the slightly adapted code that I wrote to try and explain the problem more clearly.)

The signature for function client.open_stream is:

        pub async fn open_stream(
            &mut self,
            request: impl tonic::IntoStreamingRequest<
                Message = super::CommandProviderOutbound,
            >,
        ) -> Result<
                tonic::Response<tonic::codec::Streaming<super::CommandProviderInbound>>,
                tonic::Status,
            >;

Would you be able to come up with a minimal example that triggers this, including the .proto definitions? The error message isn't useful enough to solve the issue on its own, and I can't seem to reproduce it myself with some dummy types. I suspect it has to do with how #[tonic::async_trait] is expanded.

Sure. I could reproduce it with this .proto file:

syntax = "proto3";
package issue;

service IssueService {

  /* Opens a stream allowing clients to register command handlers and receive commands. */
  rpc OpenStream (stream OutboundItem) returns (Empty) {
  }
}

message OutboundItem {
  string dummy = 1;
}

message Empty {
}

And this Rust fragment:


type StaticOutboundStream = dyn Stream<Item = OutboundItem> + Send + 'static;
type OutboundStreamBox = Pin<Box<StaticOutboundStream>>;

struct ServerHandle {
    conn: Channel
}

#[tonic::async_trait]
trait ServerHandleTrait {
    async fn send_outbound_stream(&self, request: OutboundStreamBox)
        -> Result<tonic::Response<Empty>, tonic::Status>;
}
#[tonic::async_trait]
impl ServerHandleTrait for ServerHandle
{
    async fn send_outbound_stream(&self, request: OutboundStreamBox)
        -> Result<tonic::Response<Empty>, tonic::Status>
    {
        let mut client = IssueServiceClient::new(self.conn.clone());
        client.open_stream(request).await
    }
}


#[cfg(test)]
mod tests {
    use super::*;
    use async_stream::stream;
    use mockall::mock;
    use crate::axon_utils::ServerHandle;

    mock! {
        #[derive(Debug)]
        ServerHandle {}
        #[tonic::async_trait]
        impl ServerHandleTrait for ServerHandle
        {
            async fn send_outbound_stream(&self, request: OutboundStreamBox)
                -> Result<tonic::Response<Empty>, tonic::Status>;
        }
    }

    #[tokio::test]
    async fn test_submit_command() {
        let server_handle = MockServerHandle::new();
        let outbound_stream = stream! {
            let outbound_item = OutboundItem {
                dummy: "Help!".to_string(),
            };
            yield outbound_item;
        };

        server_handle.send_outbound_stream(Box::pin(outbound_stream));
    }
}

Thank you. The issue isn't in your code but in the compiler's trait resolution. The error message is a complete red herring: the real error is that the compiler can't determine that the Future created by send_outbound_stream is Send. The cause appears to be an interaction between the generic async functions IssueServiceClient::open() and Grpc::client_streaming() and the indirection created by IntoStreamingRequest's Stream type. Here's a minimal example of the error (Rust Playground):

// corresponds to `Stream`
trait Trait {}

// corresponds to `IntoStreamingRequest`
trait Funnel {
    type This: ?Sized + Trait + 'static;
}

impl<T: ?Sized + Trait + 'static> Funnel for T {
    type This = T;
}

// corresponds to the `dyn Future + Send` bound created by `async_trait`
fn assert_send<T: ?Sized + Send>(_: &T) {}

// corresponds to `Grpc::client_streaming()`
async fn foo<T: ?Sized>() {}

// corresponds to `IssueServiceClient::open()`
async fn bar<T: ?Sized + Funnel>() {
    foo::<T::This>().await
}

// corresponds to `ServerHandle::send_outbound_stream()`
pub fn test() {
    let _outer = async {
        let inner = bar::<dyn Trait + Send + 'static>();
        assert_send(&inner);
        inner.await
    };

    // assert_send(&_outer);
    // error[E0477]: the type `dyn Trait + Send` does not fulfill the required lifetime
}

What's really weird is that most trivial variations of this compile successfully, so it must be hitting some kind of corner case in the trait resolution. Also, the async block at the end doesn't appear necessary to trigger the issue; even if I write a manual state machine with poll(), the compiler produces spurious lifetime errors.

Thanks a lot for clearing this up. I've been refactoring in circles the last few days to try and make the compiler happy, but there appears to be no (safe?) way. And I understand from your rendering of the problem that the async_trait macro, the stream! macro and, the Tonic code generator are all guilt-free! :wink:

Well, I got the advice to pursue my original goal from a completely different angle, but I'm just curious: is it possible to convince the compiler that it should work using unsafe? And is this something that the compiler-team would like to be aware of?

I honestly don't know; judging by the broken error messages, something must have gone horribly wrong in the trait resolver. I wouldn't trust any unsafe code to be sound.

And is this something that the compiler-team would like to be aware of?

Almost certainly. I've been thinking of locating the source of the error before filing an issue. (Maybe not today, though, since this issue has already broken my brain.)

I would like to thank both @endsofthreads for the suggestion to try a different approach to my goal (I now have a working test that can serve as a template for many tests to come) and @LegionMammal978 for the analysis of the problem that blocked my original approach. Extra kudos for stripping out complicating factors like Tonic, Prost and async_trait. The use of ?Sized was especially instructive, thanks!

I would like to be able to create my own traits around code provided by others and in that respect I鈥檓 still facing a brick wall, so I鈥檓 still curious for a real solution (either a compiler fix or an unsafe trick) for the fact that the compiler can鈥檛 derive Send where it should. Therefore I didn鈥檛 tick the box Solution yet. :wink: Nevertheless, I'm very grateful for your help.

FYI, I asked around on the Discord a few days ago, and apparently this is an instance of a recently discovered bug (Incorrect "implementation of `Send` is not general enough" error with lifetimed `Send` impl used in `async fn` 路 Issue #96865 路 rust-lang/rust 路 GitHub). Unfortunately, it seems that the fix can only be applied on Tonic's end (by awaiting Grpc::client_streaming() through a wrapper type).

I posted a plea for help on the Tonic Discord channel :crossed_fingers:

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.