Tokio: how to use a Sink without consuming it?

I have the following setup:

  • A reactor::Core spun up in a separate thread.
  • A Remote handle to that detached Core.
  • A Sink to a single outgoing TCP connection.
    (I got this by remote.spawn()ing a TcpStream::connect(), and using a oneshot channel to get the Sink part of the stream back. Technically, it's a SplitSink<Framed<TcpStream, BytesMut>>)

I want to be able to send messages through this sink without waiting for the messages themselves to be physically sent. But I don't see a way to send data that doesn't consume my Sink and require me to set up a oneshot channel to get it back (because the Core is in a separate thread).

At the same time, it doesn't make sense for me that I need a message to be fully sent before I can get the Sink back to add more messages to it.

What I'm trying to have: a place to write bytes to (at any time), that abstracts sending those bytes in the background, so I don't have to wait for the network I/O.

Can someone point me in the right direction?
Thank you so much

1 Like

You could:

  • call start_send/poll_complete instead of send, and decide what to do if the sink isn't ready to accept more items
  • create an unbounded async channel and call send on the UnboundedSender (which doesn't consume it), and forward the UnboundedReceiver into your existing Sink
3 Likes

Thanks, mpsc::unbounded is a good solution to my problem.

For future reference, here's what I did.
Note: I haven't looked into handling errors yet, so there are lines here whose only purpose is to make the compiler happy until I learn how to do it right.

To create the unbounded Sink/Stream pair:

let (unbounded_sender, unbounded_receiver) = mpsc::unbounded::<BytesMut>();

To spawn a task on the (detached) Core that passes all messages from the UnboundedSender to my Sink (framed_sink):

remote.spawn(move |_handle| {
    framed_sink.send_all(unbounded_receiver
        .map_err(|e| Error::new(ErrorKind::Other, "Error")))
    .then(|_| Err(()))
});

To send messages through the unbounded stream/sink pair:
(I store the Sender part in my struct, hence the clone())

let some_data = BytesMut::from("walfierocks");
self.unbounded_sender.clone().wait().send(some_data)
4 Likes