How to wrap async stream (for a library)?

I am trying to create a library that acts as a simple wrapper around a WebSocket API. For this I am using tokio_tungstenite, which handles the WebSocket connection. My goal is to wrap tokio_tungstenite's connection in a struct (let's call it MyLib) so that someone could do the following:

let mut client = MyLib::new("wss://example.org/websocket").await.unwrap();
client.create_thing(MyLib::Thing { data: 0u8 }).await.unwrap();
client.listen_for_updates(| update | {
  // Where `update` is a wrapped message type (i.e. MyLib::Response)
  println!("{}", update);
});

The specifics of what it does aren't very important, just that it needs to filter and/or map the original Stream that tokio_tungstenite returns so that the items aren't just serde_json values, but actual Rust structs.

Right now we have the struct:

struct MyLib {
  reader: ??,
  writer: ??,
}

impl MyLib {
  async fn new(url: &str) -> Self {
    let (ws, _) = connect_async(url).await.unwrap();
    let (mut writer, reader) = ws.split();
    
    // we make a request to authenticate the connection
    
    let mut reader = reader.filter_map(| message | async {
      // map it to some rust type
    });
    
    MyLib {
      reader,
      writer,
    }
  }
  // ...
}

But with the above code, MyLib.reader and MyLib.writer would need very complicated types (and none of the types I tried has compiled). I have also tried declaring MyLib.writer as a trait object of the Sink trait and MyLib.reader as a trait object of the Stream trait, but that also has issues. You'll notice that I'm not mentioning any specific compiler errors; at this point I've mangled the code so badly trying to figure this out that I no longer have a saved copy of what I tried before.

I feel like this should be a common issue to solve, but I'm new to Rust and especially async Rust, so I don't know best practices or how to best design a solution to this issue.

Any tips / best practices for solving this problem?

Much thanks!

Hi @mkeedlinger :wave: and welcome :slightly_smiling_face:

Dynamic type erasure by boxing is the usual go-to solution here:

use ::futures::stream::{BoxStream, StreamExt};

let mut reader: BoxStream<_> = reader.filter_map(...).boxed();
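
To illustrate why boxing makes the field type nameable, here is a minimal std-only sketch in which `Iterator` stands in for `Stream` and `Box<dyn Iterator>` stands in for `BoxStream`. The `Update` enum and the `next_update` method are hypothetical names made up for this example, not part of tokio_tungstenite or the asker's library. In the real async code the fields would presumably be something like `BoxStream<'static, Update>` for the reader and a pinned, boxed `dyn Sink` for the writer; that mapping is an assumption and has not been checked against the asker's project.

```rust
// Hypothetical wrapped message type (stands in for `MyLib::Response`).
#[derive(Debug, PartialEq)]
enum Update {
    Price(u32),
}

struct MyLib {
    // Synchronous analogue of a boxed stream field: the concrete adapter type
    // is erased, so the struct only has to name the item type.
    reader: Box<dyn Iterator<Item = Update>>,
}

impl MyLib {
    fn new(raw: Vec<String>) -> Self {
        // The closure below has an anonymous type, so the concrete type of
        // `reader` (a `FilterMap<..., {closure}>`) cannot be written down
        // in a struct field. Boxing it as a trait object sidesteps that.
        let reader = raw
            .into_iter()
            .filter_map(|msg| msg.parse::<u32>().ok().map(Update::Price));
        MyLib {
            reader: Box::new(reader),
        }
    }

    // Pull the next successfully mapped message, skipping unparsable ones.
    fn next_update(&mut self) -> Option<Update> {
        self.reader.next()
    }
}

fn main() {
    let raw = vec!["1".to_string(), "oops".to_string(), "3".to_string()];
    let mut client = MyLib::new(raw);
    while let Some(update) = client.next_update() {
        println!("{:?}", update);
    }
}
```

The cost of this design is one heap allocation and dynamic dispatch per item, which is usually negligible next to network I/O.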

Thanks @Yandros! I am trying your solution right now.

What does it mean when you have :: at the beginning of the use statement?

(Retracted: I originally answered that :: means the crate root. My bad! My dog had just gouged my foot.)
See @Yandros's answer below.

So it's equivalent to crate::?

In edition 2018 (the default for new crates), ::name is used to refer to an external crate named name, unambiguously (vs. self::name for some item name in scope, such as a mod name). Since mod and external crates rarely conflict, you can skip that leading :: (or that leading self::) and write name, such as use futures::....

  • Skipping that optional leading :: is, in fact, what is customary to do in Rust / what most Rust programmers do.

  • I have a personal preference for disambiguators whenever possible, especially if they come at the cheap cost of just an extra ::, since it improves the readability of the code, hence my using it :slightly_smiling_face:
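
As a small sketch of the two disambiguators described above: the local module `time` and the functions `label` and `summary` are hypothetical names chosen only for this example. Nothing here actually conflicts, so both prefixes are optional; they only become necessary if a local item shares its name with an external crate.

```rust
// A local module; its name is chosen only for illustration.
mod time {
    pub fn label() -> &'static str {
        "local module"
    }
}

// Leading `::` says the path starts at an external crate (`std` here).
use ::std::time::Duration;
// Leading `self::` says the path starts at an item in the current module.
use self::time::label;

// Combine one item from each import into a single string.
fn summary() -> String {
    format!("{} / {}s", label(), Duration::from_secs(2).as_secs())
}

fn main() {
    println!("{}", summary());
}
```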

@Yandros Great to know!

I think I almost have things working! Now I am getting this error:

error: `async fn` return type cannot contain a projection or `Self` that references lifetimes from a parent scope
  --> src/polygon/stream.rs:28:43
   |
28 |     pub async fn new(api_key_id: &str) -> Result<Self, Box<dyn Error>> {
   |                                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^

I make a few .await calls in this new function, which I think may be related.

You are currently hitting a limitation of the compiler / Rust. Step by step:


  1. async fn foo (...) -> Ret { ... } is sugar for:

    fn foo (...) -> impl Future<Output = Ret>
    {
        async move { ... }
    }
    
  2. Self, here, is a type with some lifetime parameter in it (e.g., Foo<'a>);

    • (That's what Rust means by "references lifetimes from a parent scope".)

    But given that Self is a type alias, it kind of "hides" that lifetime parameter.

  3. Rust does not currently support type aliases that "hide" lifetime parameters when using -> impl ... in return position (no real reason besides technical limitations of the current implementation of the compiler).

Hence the error.
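
The desugaring from step 1 can be checked with a small std-only sketch. The functions `sugared` and `desugared` and the tiny `block_on` helper are hypothetical names written for this example; in a real project the futures would be driven by an executor such as tokio instead.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// The sugared form.
async fn sugared(x: u32) -> u32 {
    x + 1
}

// Roughly what the compiler turns it into: an ordinary function that
// returns an opaque `impl Future` wrapping an `async move` block.
fn desugared(x: u32) -> impl Future<Output = u32> {
    async move { x + 1 }
}

// A waker that does nothing; enough for futures that never actually wait.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Minimal executor for this sketch only: polls in a loop until ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    // Both forms produce a future with the same output.
    assert_eq!(block_on(sugared(1)), block_on(desugared(1)));
    println!("both return {}", block_on(sugared(1)));
}
```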

Solution

"Unhide" the lifetime parameter, by not using Self but what the type really refers to (e.g., Foo<'a> instead of Self).
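
A minimal sketch of that fix, assuming a hypothetical struct `Client<'a>` in place of the asker's real type (its fields and the empty-key check are invented for illustration). The return type spells out `Client<'a>` rather than `Self`. Later compiler releases appear to have lifted this restriction, so `Self` may also be accepted there; the explicit form should work either way.

```rust
use std::error::Error;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical type carrying a lifetime parameter.
struct Client<'a> {
    api_key_id: &'a str,
}

impl<'a> Client<'a> {
    // The return type names `Client<'a>` explicitly instead of `Self`,
    // so the lifetime parameter is visible in the signature.
    async fn new(api_key_id: &'a str) -> Result<Client<'a>, Box<dyn Error>> {
        if api_key_id.is_empty() {
            return Err("empty API key".into());
        }
        Ok(Client { api_key_id })
    }
}

// A waker that does nothing; enough for futures that never actually wait.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Minimal executor for this sketch only: polls in a loop until ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    let client = block_on(Client::new("my-key")).unwrap();
    println!("connected with key {}", client.api_key_id);
}
```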

Notes

I am actually surprised by this whole situation:

  • the error message is quite unreadable, and should be improved.

    • At least it has gotten an error index, so it's not that bad.
  • there is no intuitive reason for this error to even exist: it's a compiler internal implementation detail that is leaking to a downstream user. Proof is how "trivial" / stupid the fix is (change the way the type is being referred to).

All this to say that for once, you are hitting a very rough edge of Rust, one that is surprisingly "rough" given the usual quality of Rust error messages and how "logical" they usually are :smile: