The trait bound `tokio::sync::mpsc::UnboundedSender<_>: futures_util::Sink<AsyncMessage>` is not satisfied

I am trying to listen for notifications (`LISTEN`) on my PostgreSQL database. Every time a row is added, I want to receive a notification in Rust.

This is my current code:

    let psqll = psql.lock().await;

    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    let dbstream = futures::stream::poll_fn(move |cx| psqll.1.poll_message(cx).map_err(|e| panic!("rofl")));
    let c = dbstream.forward(tx).map(|r| r.unwrap());
    tokio::spawn(c);
    println!("After spawn listener");

    c.batch_execute("LISTEN test_notifications;").await.unwrap();

The error I get:

error[E0277]: the trait bound `tokio::sync::mpsc::UnboundedSender<_>: futures_util::Sink<AsyncMessage>` is not satisfied
    --> src/routes/mod.rs:639:30
     |
639  |     let c = dbstream.forward(tx).map(|r| r.unwrap());
     |                      ------- ^^ the trait `futures_util::Sink<AsyncMessage>` is not implemented for `tokio::sync::mpsc::UnboundedSender<_>`
     |                      |
     |                      required by a bound introduced by this call
     |
     = help: the following other types implement trait `futures_util::Sink<Item>`:
               <futures_channel::mpsc::Sender<T> as futures_util::Sink<T>>
               <futures_channel::mpsc::UnboundedSender<T> as futures_util::Sink<T>>
               <Box<S> as futures_util::Sink<Item>>
               <tokio_tungstenite::WebSocketStream<T> as futures_util::Sink<tokio_tungstenite::tungstenite::Message>>
               <tokio_postgres::connect_raw::StartupStream<S, T> as futures_util::Sink<tokio_postgres::codec::FrontendMessage>>
               <tokio_util::io::stream_reader::StreamReader<S, E> as futures_util::Sink<T>>
               <tokio_util::sync::mpsc::PollSender<T> as futures_util::Sink<T>>
               <tokio_util::io::copy_to_bytes::CopyToBytes<S> as futures_util::Sink<&'a [u8]>>
             and 68 others
note: required by a bound in `futures_util::StreamExt::forward`
    --> /Users/niel/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.29/src/stream/stream/mod.rs:1560:12
     |
1558 |     fn forward<S>(self, sink: S) -> Forward<Self, S>
     |        ------- required by a bound in this associated function
1559 |     where
1560 |         S: Sink<Self::Ok, Error = Self::Error>,
     |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `StreamExt::forward`

error[E0599]: the method `map` exists for struct `Forward<PollFn<...>, ...>`, but its trait bounds were not satisfied
   --> src/routes/mod.rs:639:34
    |
639 |       let c = dbstream.forward(tx).map(|r| r.unwrap());
    |                                    ^^^ method cannot be called on `Forward<PollFn<...>, ...>` due to unsatisfied trait bounds
    |
   ::: /Users/niel/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.29/src/stream/stream/mod.rs:89:1
    |
89  | / delegate_all!(
90  | |     /// Future for the [`forward`](super::StreamExt::fo...
91  | |     #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
92  | |     Forward<St, Si>(
...   |
95  | |     where St: TryStream
96  | | );
    | | -
    | | |
    | | doesn't satisfy `_: FutureExt`
    | | doesn't satisfy `_: Future`
    | | doesn't satisfy `_: Iterator`
    | |_doesn't satisfy `_: StreamExt`
    |   doesn't satisfy `_: Stream`
    |
    = note: the full type name has been written to '/Users/niel/Sites/keyrock/rust-api/target/debug/deps/rust_api-d2b9843e079dda5a.long-type-14278467472067523405.txt'
    = note: the following trait bounds were not satisfied:
            `Forward<futures_util::stream::PollFn<{closure@src/routes/mod.rs:638:45: 638:54}>, tokio::sync::mpsc::UnboundedSender<_>>: Stream`
            which is required by `Forward<futures_util::stream::PollFn<{closure@src/routes/mod.rs:638:45: 638:54}>, tokio::sync::mpsc::UnboundedSender<_>>: futures_util::StreamExt`
            `Forward<futures_util::stream::PollFn<{closure@src/routes/mod.rs:638:45: 638:54}>, tokio::sync::mpsc::UnboundedSender<_>>: futures_util::Future`
            which is required by `Forward<futures_util::stream::PollFn<{closure@src/routes/mod.rs:638:45: 638:54}>, tokio::sync::mpsc::UnboundedSender<_>>: futures_util::FutureExt`
            `&Forward<futures_util::stream::PollFn<{closure@src/routes/mod.rs:638:45: 638:54}>, tokio::sync::mpsc::UnboundedSender<_>>: Stream`
            which is required by `&Forward<futures_util::stream::PollFn<{closure@src/routes/mod.rs:638:45: 638:54}>, tokio::sync::mpsc::UnboundedSender<_>>: futures_util::StreamExt`
            `&Forward<futures_util::stream::PollFn<{closure@src/routes/mod.rs:638:45: 638:54}>, tokio::sync::mpsc::UnboundedSender<_>>: futures_util::Future`
            which is required by `&Forward<futures_util::stream::PollFn<{closure@src/routes/mod.rs:638:45: 638:54}>, tokio::sync::mpsc::UnboundedSender<_>>: futures_util::FutureExt`
            `&mut Forward<futures_util::stream::PollFn<{closure@src/routes/mod.rs:638:45: 638:54}>, tokio::sync::mpsc::UnboundedSender<_>>: Stream`
            which is required by `&mut Forward<futures_util::stream::PollFn<{closure@src/routes/mod.rs:638:45: 638:54}>, tokio::sync::mpsc::UnboundedSender<_>>: futures_util::StreamExt`
            `&mut Forward<futures_util::stream::PollFn<{closure@src/routes/mod.rs:638:45: 638:54}>, tokio::sync::mpsc::UnboundedSender<_>>: futures_util::Future`
            which is required by `&mut Forward<futures_util::stream::PollFn<{closure@src/routes/mod.rs:638:45: 638:54}>, tokio::sync::mpsc::UnboundedSender<_>>: futures_util::FutureExt`
            `Forward<futures_util::stream::PollFn<{closure@src/routes/mod.rs:638:45: 638:54}>, tokio::sync::mpsc::UnboundedSender<_>>: Iterator`
            which is required by `&mut Forward<futures_util::stream::PollFn<{closure@src/routes/mod.rs:638:45: 638:54}>, tokio::sync::mpsc::UnboundedSender<_>>: Iterator`

How to solve this?
