Hi @alice, thank you very much for the guidance. I think I've successfully wrapped the `WebSocketStream` in `AsyncRead` and `AsyncWrite` based on your code:
/// Byte-oriented tunnel over a websocket connection.
///
/// Reads go through the [`StreamReader`], which turns the chunks produced by
/// [`StreamWrapper`] into a contiguous byte stream; writes reach the
/// underlying websocket via `StreamReader::get_mut`.
#[derive(Debug)]
struct WebsocketTunnel {
    /// Reader adapter that owns the wrapped websocket stream.
    inner: StreamReader<StreamWrapper, Bytes>,
}
/// Newtype around the upgraded websocket connection so that a `Stream` of
/// `Bytes` chunks can be implemented on top of its message stream.
#[derive(Debug)]
struct StreamWrapper {
    /// The websocket stream running over the upgraded HTTP connection.
    inner: WebSocketStream<Upgraded>,
}
/// Exposes the payload of binary websocket messages as a stream of `Bytes`
/// chunks, with websocket errors converted into `std::io::Error`.
impl Stream for StreamWrapper {
    type Item = Result<Bytes, Error>;

    /// Polls the websocket for the next chunk of tunnel data.
    ///
    /// * `Binary` messages are yielded as `Bytes`.
    /// * `Ping` / `Pong` control messages carry no tunnel data and are
    ///   skipped, so a keepalive does not abort the tunnel.
    /// * A `Close` message ends the stream (the reader sees EOF).
    /// * Any other message kind is reported as an "unexpected frame" error.
    /// * Errors from the websocket are wrapped in `ErrorKind::Other`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let ws = &mut self.get_mut().inner;
        // Loop so that skipped control messages lead to another poll; when
        // that poll returns `Pending`, the inner stream has registered the waker.
        loop {
            let message = match Pin::new(&mut *ws).poll_next(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Ready(Some(Err(err))) => {
                    return Poll::Ready(Some(Err(Error::new(ErrorKind::Other, err))));
                }
                Poll::Ready(Some(Ok(message))) => message,
            };
            match message {
                Message::Binary(payload) => {
                    return Poll::Ready(Some(Ok(Bytes::from(payload))));
                }
                // Control messages are not part of the tunnelled byte stream.
                Message::Ping(_) | Message::Pong(_) => continue,
                // The peer closing the websocket is a normal end of stream.
                Message::Close(_) => return Poll::Ready(None),
                _ => {
                    return Poll::Ready(Some(Err(Error::new(
                        ErrorKind::Other,
                        "unexpected frame",
                    ))));
                }
            }
        }
    }

    /// Returns the inner stream's upper bound; the lower bound is 0 because
    /// control messages are dropped and a `Close` message ends the stream early.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let (_, upper) = self.inner.size_hint();
        (0, upper)
    }
}
/// Reads tunnel bytes by delegating to the inner `StreamReader`.
impl AsyncRead for WebsocketTunnel {
    /// Forwards the read request to the inner reader unchanged.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let this = self.get_mut();
        let reader = Pin::new(&mut this.inner);
        reader.poll_read(cx, buf)
    }
}
/// Buffered reads, delegated to the inner `StreamReader`.
impl AsyncBufRead for WebsocketTunnel {
    /// Forwards to the inner reader's `poll_fill_buf`.
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
        let this = self.get_mut();
        let reader = Pin::new(&mut this.inner);
        reader.poll_fill_buf(cx)
    }

    /// Marks `amt` bytes of the inner reader's buffer as consumed.
    fn consume(self: Pin<&mut Self>, amt: usize) {
        let this = self.get_mut();
        let reader = Pin::new(&mut this.inner);
        reader.consume(amt)
    }
}
/// Writes tunnel bytes as binary websocket messages.
///
/// The websocket sink is reached through `StreamReader::get_mut`, since the
/// reader adapter owns the `StreamWrapper` that holds the connection.
impl AsyncWrite for WebsocketTunnel {
    /// Sends `buf` as a single binary message once the sink is ready.
    ///
    /// Returns `buf.len()` on success; sink errors are wrapped in
    /// `ErrorKind::Other`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, std::io::Error>> {
        // Borrow the websocket in place: it sits behind `&mut` references and
        // cannot be moved out of the wrapper.
        let ws = &mut self.get_mut().inner.get_mut().inner;
        ready!(Pin::new(&mut *ws)
            .poll_ready(cx)
            .map_err(|err| Error::new(ErrorKind::Other, err)))?;
        match Pin::new(&mut *ws).start_send(Message::Binary(buf.to_vec())) {
            Ok(()) => Poll::Ready(Ok(buf.len())),
            Err(e) => Poll::Ready(Err(Error::new(ErrorKind::Other, e))),
        }
    }

    /// Flushes any messages queued in the websocket sink.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        Pin::new(&mut self.get_mut().inner.get_mut().inner)
            .poll_flush(cx)
            .map_err(|err| Error::new(ErrorKind::Other, err))
    }

    /// Shuts the tunnel down by closing the websocket sink.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        Pin::new(&mut self.get_mut().inner.get_mut().inner)
            .poll_close(cx)
            .map_err(|err| Error::new(ErrorKind::Other, err))
    }
}
But I'm currently facing this error when trying to pass the wrapper as the `Stream` needed by the original program:
`(dyn upgrade::Io + Send + 'static)` cannot be shared between threads safely
the trait `Sync` is not implemented for `(dyn upgrade::Io + Send + 'static)`
required for `Unique<(dyn upgrade::Io + Send + 'static)>` to implement `Sync`
I've read that this requires unsafe code. Do you have any workaround for this? Or is it possible to wrap `hyper::upgrade::Upgraded` so that it is `Sync`?