About impl/dyn trait types, using tokio-tungstenite: How can I store result of WebSocketStream.split() in a struct field?

I'm writing an async app server using tokio-tungstenite, and I'm having trouble defining the types correctly to store the Sink half of a split WebSocketStream in a struct field. Because I want to handle both TLS and non-TLS connections the same, I need to use the Stream trait (and not a type) to unify the representation of a stream. The server may handle many connections in many threads and tasks. I've defined a Socketo struct to store info about each open WebSocket, including the Sink half of it. If it matters, I'll also need to treat both server-initiated and client-initiated WebSockets the same after the app's handshake (it's a federated system so servers connect to each other).

Conceptually, I want to define:

struct Socketo {
    ws_out: SplitSink<Stream<Item = Message>>,
    ...
}

... but as a trait, Stream has to be dyn, and thus has to be contained in an Arc<Mutex<>> in the struct definition (right?). How compatible is that with a function parameter defined as impl? (Also, the Item needs to be a Result<Message, tungstenite:error:Error>, right?) I'm not sure how to define that function parameter type such that a .split() on it results in a type that I can put in the ws_out struct field. Note that the parameter type definition also has to match the result of accept_async().

Putting that all together, here is a simple failing example:

use std::sync::Arc ;
use futures_util::{Stream, StreamExt, Sink, stream::SplitSink} ;
use tokio::{sync::Mutex, net::TcpListener} ;
use tungstenite::error::Error as TungError ;
use tokio_tungstenite::{accept_async, tungstenite::Message} ;

struct Socketo {
    ws_out: SplitSink<Arc<Mutex<dyn Stream<Item = Result<Message, TungError>>>>, Message>,
}

#[tokio::main]
async fn main() {
    let listener= TcpListener::bind("127.0.0.1:27182").await.unwrap() ;
    let (raw_stream, _)= listener.accept().await.unwrap() ;
    let ws_stream= accept_async(raw_stream).await.unwrap() ;
    handle_ws_connection(ws_stream) ;
}

fn handle_ws_connection(ws_stream: impl Stream<Item = Result<Message, TungError>> + Sink<Message>) {
    let (ws_out, _)= ws_stream.split() ;

    let s= Socketo {
        ws_out,
    } ;
}

This fails when setting s.ws_out, with the compiler error:

error[E0308]: mismatched types
  --> src/main.rs:23:9
   |
19 | fn handle_ws_connection(ws_stream: impl Stream<Item = Result<Message, TungError>> + Sink<Message>) {
   |                                    -------------------------------------------------------------- found this type parameter
...
23 |         ws_out,
   |         ^^^^^^ expected `SplitSink<Arc<...>, ...>`, found `SplitSink<impl Stream<Item = Result<Message, TungError>> + Sink<Message>, ...>`
   |
   = note: expected struct `SplitSink<Arc<tokio::sync::Mutex<(dyn Stream<Item = Result<Message, tungstenite::Error>> + 'static)>>, _>`
              found struct `SplitSink<impl Stream<Item = Result<Message, TungError>> + Sink<Message>, _>`

Besides not compiling, it seems messy, so I wonder if I'm on the wrong track. Is there a good idiomatic way to declare the types of ws_stream and ws_out such that the result of ws_stream.split() is compatible with the ws_out struct field?

Thanks! I'm fairly new to Rust, so all advice is most welcome.

In the first part of this response, I try to answer your prose before diving into the code. After getting to your code, I don't think all the specifics apply to your actual use case. But I'm leaving it as-is because I think it offers at least some pointers in understanding Rust generics and dyn Trait better.[1]

Well, you have to use a type that allows both approaches. dyn Stream<..> and pointers to it like Arc<Mutex<dyn Stream<..>>> are types which may enable that pattern via type-erasing different base types. But you may also have other alternatives, such as an enum that implements Stream with variants for both approaches. (Maybe your own or maybe via a utility enum.)

dyn Trait is unsized,[2] so generally speaking,[3] it's going to be behind some sort of pointer. It doesn't have to be Arc<Mutex<_>>, for example it could be Box<_>. But Arc<Mutex<_>> might be more suitable for your use case. It depends on what you need it to do.

You may also need auto-traits (Arc<Mutex<dyn Stream<..> + Send + Sync>>), depending on you needs and the demands of your async runtime.

impl Trait as a function parameter (APIT) is the same as a generic on the function, and thus will resolve to a single type within the function body; the caller gets to choose the type and the function body must work with any and all possible types that meet the bounds.

If you want to accept any generic/APIT input in the function and store it in your struct, either your struct needs to be generic or you need to set up the bounds in such a way that the generic type can be coerced to the form of dyn Trait you need. Here's a short example.


Here's the second part of my response, where I actually start poking at your code. Preliminary notes:

  • I'm not intimately familiar with these crates off the top of my head, and
  • tungstenite isn't available in the playground, so I can't provide a working example there either, so
  • I'm just going to wing it based on the documentation

So, it looks like you don't really need to store a Stream implementor, you need a Sink implementor. But the Sink implementor you want to store is SplitSink<[some Stream], Message>. Now, do you actually need a SplitSink so you can call its inherent methods, or do you just need some Sink implementor? I'm going to guess the latter for now.

The Sink<_> trait has an associated Error type which you're going to have to deal with. For your use case is it always going to be tungstenite::error::Error (TungError)? I'm going to assume that too.

Let's look at how you're getting ahold of the SplitSink and how it implements Sink<_>. Given an S: Stream<..> + Sink<Message>, the signature of split says you're going to get a SplitStream<S, Message>. And the implementation is going to be analogous to...

impl<S: Sink<Message>> Sink<Message> for Sink<S, Message> {
    type Error = <S as Sink<Message>>::Error;

...so you don't seem to care what S is at all (or even what it returns as a Stream), once you've determined that it's a Stream (so you can call split) that also implements Sink<Item, Error = TungError>.

So maybe this is what you want:

struct Socketo {
    ws_out: Pin<Box<dyn Sink<Message, Error = TungError>>>,
}

fn handle_ws_connection<S>(ws_stream: S)
where
    S: 'static + Stream + Sink<Message, Error = TungError>,
    // doesn's seem like you actually need this:
    // S: Stream<Item = Result<Message, TungError>>,
{
    let (ws_out, _) = ws_stream.split();
    let ws_out = Box::pin(ws_out);
    let s = Socketo { ws_out, };
}

(I chose Pin<Box<_>> based on the implementors of Sink<_>. Again, untested.)


Whispers of a part 3...

Maybe I was wrong and you do need SplitSink<_, Message> specifically for some reason. Type-erasing the stream could still be an approach. But you need to type erase it before you call .split, and in order to call .split, you need something that implements StreamExt. Arc<Mutex<_>> doesn't implement StreamExt, so that's out. At a guess you want Pin<Box<_>> again.

But you also need something that implements Sink<Message>, and you can't combine two non-auto-traits into a dyn _ directly. (dyn Stream + Sink won't compile.) So you'd have to go further to support this approach... for example, a subtrait that covers both of these needs.

// Very rough sketch
trait StreamSink: Stream<Item = Result<Message, TungError>> + Sink<Message> {
}

impl<T> StreamSink for T
where
    T: ?Sized + Stream<Item = Result<Message, TungError>> + Sink<Message>
{}

struct Socketo {
    ws_out: SplitSink<
        Pin<Box<dyn StreamSink>>,
        Message,
    >,
}

fn handle_ws_connection<S>(ws_stream: S)
where
    S: 'static + StreamSink,
{
    let ws_stream: Pin<Box<dyn StreamSink>> = Box::pin(ws_stream);
    let (ws_out, _) = ws_stream.split();
    let s = Socketo { ws_out, };
}

But at this point I feel I'm stretching a bit far, so I'll just leave things here. (Feel free to ask follow-up questions.)


  1. But am too lazy to rewrite it to match your code better :slightly_smiling_face: ↩︎

  2. does not implement Sized, does not have a size known at compile time, is a DST ↩︎

  3. outside of custom DSTs (you don't want this here) ↩︎

1 Like

Thanks for the detailed explanations and examples! They give me a lot to think about and study. I do have some followup questions.

I'm using Arc<Mutex<_>> instead of Box<_> because the app is multithreaded, and I need a Socketo object to be pointed to by two different hash entries (thus Arc<_>), and ws_out needs to be mutable (thus Mutex<_>) to write to it. But if .split() requires StreamExt and Arc<Mutex<_>> doesn't implement that, then I need to figure out another solution. Do you see one?

I probably only need Sink, not SplitSink; I was just using SplitSink because that's the result from .split(). I believe all I do with ws_out is .send() .

I need to read more about Pin and Unpin. Is there a simple explanation why you use Pin<Box<_>> instead of Box<_>, and when Pin is necessary?

If the library types don't implement a trait, and you can see how to implement it, you can wrap the Arc<Mutex<_>> in a new type and implement the trait yourself. I believe there's a blanket implementation of StreamExt for S: Stream, so Stream is the one you'd need to implement.

If there's a chance of some compiler-generated async generator being contained in something, especially in a generic or type-erased form, you usually need Pin<_> to do async things with it. The primary example is that Pin<Box<dyn Future<..>>> implements Future<..>, but Box<dyn Future<..>> does not.

The pattern I saw in the implementations is that Box<S> implements Sink<_> only if S: Unpin...

impl<S, Item> Sink<Item> for Box<S>
where
    S: Sink<Item> + Unpin + ?Sized,

...but Pin<P> implements Sink<_> if P: Unpin and <P as Deref>::Target: Sink (i.e. P::Target, the thing that implements Sink<_>, needs not be Unpin in this case)...

impl<P, Item> Sink<Item> for Pin<P>
where
    P: DerefMut + Unpin,
    <P as Deref>::Target: Sink<Item>,

...and thus you need the Pin implementation because dyn Sink<_> doesn't implement Unpin (and Box<_> does).

Or more succinctly: if you see some pattern like this where the Box implementation requires the contents of the Box to both be Unpin and have the delegated trait, but Pin<P> breaks up the requirements so that P: Unpin and P::Target: TheTrait, you almost surely need the Pin<Box<_>>.

1 Like

OK, thanks again for the help. I'll play around with these things.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.