Mapping over sinkext.send "returns a value referencing data owned by the current function"

Hello everyone,

I was following the warp example here: but wanted to use "sinkext.send" instead of "streamext.forward" to send a string to the client.


use warp::{Filter};
use warp::ws::{Message};
use futures::{FutureExt, StreamExt, SinkExt};

async fn main() {
    // GET /hello/warp => 200 OK with body "Hello, warp!"
    let routes = warp::path("echo")
        // The `ws()` filter will prepare the Websocket handshake.
        .map(|ws: warp::ws::Ws| {
            // And then our closure will be called when it completes...
            ws.on_upgrade(|websocket: warp::filters::ws::WebSocket| {
                let (mut tx, _) = websocket.split();
                tx.send(Message::text("New User!")).map(|_| {})

        warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;

But compiling returns:

error[E0515]: cannot return value referencing local variable `tx`
  --> src\
17 |                 tx.send(Message::text("New User!")).map(|_| {})
   |                 --^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |                 |
   |                 returns a value referencing data owned by the current function
   |                 `tx` is borrowed here

I was able to work around this by changing to:

ws.on_upgrade(|websocket: warp::filters::ws::WebSocket| async {
    let (mut tx, _) = websocket.split();
    tx.send(Message::text("New User!")).map(|_| {}).await

I wanted your help to understand why was the first try failing, when seemingly StreamExt.forward and SinkExt.send do the same thing with sink i.e. take the data and pass to sink and close sink when there is no data left.

I have a hunch that this is due to difference in signatures:
fn send(&mut self, item: Item) -> Send<'_, Self, Item> vs fn forward<S>(self, sink: S) -> Forward<Self, S> where send take a mutable reference and forward takes the object.

Thanks for your help

Your hunch was correct.

When you use the map() combinator it will create a new type which wraps the original one. The tx.send() method returns a futures::sink::Send which borrows from the tx, which means your mapped type will also borrow from tx.

The issue is that tx is created inside your closure and by returning some sort of Map<Send<'tx, ...>> you are returning something which references a local variable, which would lead to a dangling reference.

Switching to await works because an async block expands to a new type which contains all the local variables and implements the Future trait. This side-steps the dangling reference problem because the tx is owned by the generated type, and the contracts in std::pin help ensure safety/correctness.

1 Like

Thanks a lot for the reply, I understand what you are saying.

So there is no way to return the Mapped future since it consumes tx so we have to return a new object (created by async) which encapsulates the consumed object and doesn't expose tx outside of the scope of closure.

I'll try to scrounge more documentation and see if there is a way to do this without doing an allocation.

Really appreciate the help.

  1. You can't return the mapped future since it doesn't consume the tx but only references it. You may able to return it if it consume the tx and owns it instead of to store its reference. But in this case one cannot call .send() over single tx twice since the first call consumes it.

  2. async {} block doesn't allocate. Like closures, it only creates anonymous struct which implements the Future trait. I think it's a good reason to avoid the term "object". In Rust types doesn't allocate implicitly, while the term "object" implies heap allocation in many languages.

  3. What actually allocates is the .split() call. It allocates a new Arc and put the original type into it to share it between two halves. For your specific case you don't need it since you don't use the rx half and the WebSocket itself implements the Sink trait - you can't even call .split() if it doesn't impl the Sink.