Listen for PSQL notification using tokio_postgres?

PostgreSQL has a feature NOTIFY/LISTEN which basically allows you to send a notification when a specific event happens in your database like for example a new row insert.

Now I want to be able in Rust using Tokio postgres to listen for a notification and output it. Tokio does seem to support it.

I tried multiple approaches.

FIRST APPROACH
I tried the following example but it yields an error.

//#![feature(poll_map)] -> Gives error - #![feature] may not be used on the stable release channel
use futures::{stream, StreamExt};
use futures::{FutureExt, TryStreamExt};
use std::env;
use tokio::sync::mpsc;
use tokio_postgres::{connect, NoTls};

#[tokio::main]
async fn main() {
    let connection_parameters = env::var("DBURL").unwrap();
    let (client, mut conn) = connect(&connection_parameters, NoTls).await.unwrap();

    let (tx, mut rx) = mpsc::unbounded_channel();
    let stream = stream::poll_fn(move |cx| conn.poll_message(cx).map_err(|e| panic!(e)));
    let c = stream.forward(tx).map(|r| r.unwrap());
    tokio::spawn(c);
    println!("After spawn listener");

    client.batch_execute("LISTEN test_notifications;").await.unwrap();
    loop {
        let m = rx.recv().await;
        println!("GOT MESSAGE");
    }        
}
error[E0277]: the trait bound `tokio::sync::mpsc::UnboundedSender<_>: futures::Sink<AsyncMessage>` is not satisfied
    --> src/algotester.rs:15:28
     |
15   |     let c = stream.forward(tx).map(|r| r.unwrap());
     |                    ------- ^^ the trait `futures::Sink<AsyncMessage>` is not implemented for `tokio::sync::mpsc::UnboundedSender<_>`
     |                    |
     |                    required by a bound introduced by this call
     |
     = help: the following other types implement trait `futures::Sink<Item>`:
               <futures::futures_channel::mpsc::Sender<T> as futures::Sink<T>>
               <futures::futures_channel::mpsc::UnboundedSender<T> as futures::Sink<T>>
               <Box<S> as futures::Sink<Item>>
               <tokio_postgres::connect_raw::StartupStream<S, T> as futures::Sink<tokio_postgres::codec::FrontendMessage>>
               <tokio_util::io::stream_reader::StreamReader<S, E> as futures::Sink<T>>
               <tokio_util::sync::mpsc::PollSender<T> as futures::Sink<T>>
               <tokio_util::io::copy_to_bytes::CopyToBytes<S> as futures::Sink<&'a [u8]>>
               <tokio_util::codec::framed_write::FramedWrite<T, E> as futures::Sink<I>>
             and 66 others
note: required by a bound in `futures::StreamExt::forward`
    --> /Users/name/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.29/src/stream/stream/mod.rs:1560:12
     |
1558 |     fn forward<S>(self, sink: S) -> Forward<Self, S>
     |        ------- required by a bound in this associated function
1559 |     where
1560 |         S: Sink<Self::Ok, Error = Self::Error>,
     |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `StreamExt::forward`

error[E0599]: the method `map` exists for struct `Forward<PollFn<...>, ...>`, but its trait bounds were not satisfied
  --> src/algotester.rs:15:32
   |
15 |       let c = stream.forward(tx).map(|r| r.unwrap());
   |                                  ^^^ method cannot be called on `Forward<PollFn<...>, ...>` due to unsatisfied trait bounds
   |
  ::: /Users/name/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.29/src/stream/stream/mod.rs:89:1
   |
89 | / delegate_all!(
90 | |     /// Future for the [`forward`](super::StreamExt::forwa...
91 | |     #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
92 | |     Forward<St, Si>(
...  |
95 | |     where St: TryStream
96 | | );
   | | -
   | | |
   | | doesn't satisfy `_: FutureExt`
   | | doesn't satisfy `_: Future`
   | | doesn't satisfy `_: Iterator`
   | |_doesn't satisfy `_: StreamExt`
   |   doesn't satisfy `_: Stream`
   |
   = note: the full type name has been written to '/Users/name/Sites/keyrock/rust-api/target/debug/deps/algotester-585fdee191a40d3c.long-type-2506645915138413193.txt'
   = note: the following trait bounds were not satisfied:
           `Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: Stream`
           which is required by `Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: StreamExt`
           `Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: futures::Future`
           which is required by `Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: FutureExt`
           `&Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: Stream`
           which is required by `&Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: StreamExt`
           `&Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: futures::Future`
           which is required by `&Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: FutureExt`
           `&mut Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: Stream`
           which is required by `&mut Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: StreamExt`
           `&mut Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: futures::Future`
           which is required by `&mut Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: FutureExt`
           `Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: Iterator`
           which is required by `&mut Forward<futures::stream::PollFn<{closure@src/algotester.rs:14:34: 14:43}>, tokio::sync::mpsc::UnboundedSender<_>>: Iterator`

warning: unused import: `TryStreamExt`
 --> src/algotester.rs:3:26
  |
3 | use futures::{FutureExt, TryStreamExt};
  |                          ^^^^^^^^^^^^
  |
  = note: `#[warn(unused_imports)]` on by default

warning: unused import: `FutureExt`
 --> src/algotester.rs:3:15
  |
3 | use futures::{FutureExt, TryStreamExt};
  |               ^^^^^^^^^

Some errors have detailed explanations: E0277, E0599.
For more information about an error, try `rustc --explain E0277`.
warning: `rust-api` (bin "algotester") generated 2 warnings

SECOND APPROACH
Comes from this example.

//#![feature(poll_map)] -> Gives error - #![feature] may not be used on the stable release channel
use futures::{stream, StreamExt};
use futures::{FutureExt, TryStreamExt};
use std::env;
use tokio::sync::mpsc;
use tokio_postgres::{connect, NoTls};

#[tokio::main]
async fn main() {
    let connection_parameters = env::var("DBURL").unwrap();
    let (client, mut conn) = connect(&connection_parameters, NoTls).await.unwrap();
    
    // conn.execute() not found so adapting example.
    //conn.execute("LISTEN myevent", &[]).expect("Could not send LISTEN");
    client.query("LISTEN myevent", &[]).await.expect("Could not send LISTEN");

    let notifications = conn.notifications();
    let mut it = notifications.blocking_iter();

    println!("Waiting for notifications...");
    loop {
        let a = it.next();
        match a {
            Ok(Some(b)) => {
                println!("{:?}", b);
            },
            Err(e) => println!("Got error {:?}", e),
            _ => panic!("Unexpected operation!!!")
                                    
        }
            
    }
}

warning: unused import: `stream`
 --> src/algotester.rs:2:15
  |
2 | use futures::{stream, StreamExt};
  |               ^^^^^^
  |
  = note: `#[warn(unused_imports)]` on by default

warning: unused imports: `FutureExt`, `TryStreamExt`
 --> src/algotester.rs:3:15
  |
3 | use futures::{FutureExt, TryStreamExt};
  |               ^^^^^^^^^  ^^^^^^^^^^^^

warning: unused import: `tokio::sync::mpsc`
 --> src/algotester.rs:5:5
  |
5 | use tokio::sync::mpsc;
  |     ^^^^^^^^^^^^^^^^^


error[E0599]: no method named `notifications` found for struct `Connection` in the current scope
  --> src/algotester.rs:14:30
   |
14 |     let notifications = conn.notifications();
   |                              ^^^^^^^^^^^^^ method not found in `Connection<Socket, NoTlsStream>`

How to listen for a Psql notification in Rust?

Your first example is calling the correct method on the connection AFAICT, but the things you're doing with spawning a tokio task aren't quite right. Try just spawning a future that calls poll_message in a loop rather than trying to use Stream and see if that works for you first.

1 Like

tokio's UnboundedSender doesn't implement Sink, that's why forward was giving you an error.

You can use a wrapper, though I haven't tested it so it might not be quite correct. Fortunately it does implement Unpin so no unsafe is necessary.

struct SenderSink<T>(mpsc::UnboundedSender<T>);

impl<T> Sink<T> for SenderSink<T> {
    type Error = SendError<T>;

    fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        self.get_mut().0.send(item)?;
        Ok(())
    }

    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}
Full Code which now compiles
use futures::FutureExt;
use futures::{stream, Sink, StreamExt};
use std::env;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::SendError;
use tokio_postgres::{connect, NoTls};

#[tokio::main]
async fn main() {
    let connection_parameters = env::var("DBURL").unwrap();
    let (client, mut conn) = connect(&connection_parameters, NoTls).await.unwrap();

    let (tx, mut rx) = mpsc::unbounded_channel();
    let stream = stream::poll_fn(move |cx| conn.poll_message(cx).map_err(|e| panic!("{}", e)));
    let c = stream.forward(SenderSink(tx)).map(|r| r.unwrap());
    tokio::spawn(c);
    println!("After spawn listener");

    client
        .batch_execute("LISTEN test_notifications;")
        .await
        .unwrap();

    while let Some(m) = rx.recv().await {
        println!("GOT MESSAGE: {m:?}");
    }
}

struct SenderSink<T>(mpsc::UnboundedSender<T>);

impl<T> Sink<T> for SenderSink<T> {
    type Error = SendError<T>;

    fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        self.get_mut().0.send(item)?;
        Ok(())
    }

    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}
1 Like

My actual code is a bit more complex using async and different threads:

use futures::{stream, StreamExt};
use futures::{FutureExt, TryStreamExt};
use std::env;
use tokio_postgres::{connect, NoTls};
use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    // PostgreSQL connection.
    let (client, mut connection) = tokio_postgres::connect("host=localhost user=postgres dbname=", NoTls).await.unwrap();


    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {}", e);
        }
    });


    let (tx, rx) = futures_channel::mpsc::unbounded();
    let stream =
        stream::poll_fn(move |cx| connection.poll_message(cx)).map_err(|e| panic!("{}", e));
    let connection = stream.forward(tx).map(|r| r.unwrap());
    tokio::spawn(connection);

    client
        .batch_execute(
            "LISTEN test_notifications;
             NOTIFY test_notifications, 'hello';
             NOTIFY test_notifications, 'world';",
        )
        .await
        .unwrap();

    drop(client);

    let notifications = rx
        .filter_map(|m| match m {
            tokio_postgres::AsyncMessage::Notification(n) => futures_util::future::ready(Some(n)),
            _ => futures_util::future::ready(None),
        })
        .collect::<Vec<_>>()
        .await;
}

Error:

error[E0382]: use of moved value: `connection`
  --> src/algotester.rs:24:25
   |
12 |       let (client, mut connection) = tokio_postgres::con...
   |                    -------------- move occurs because `connection` has type `Connection<Socket, NoTlsStream>`, which does not implement the `Copy` trait
...
15 |       tokio::spawn(async move {
   |  __________________-
16 | |         if let Err(e) = connection.await {
   | |                         ---------- variable moved due to use in generator
17 | |             eprintln!("connection error: {}", e);
18 | |         }
19 | |     });
   | |_____- value moved here
...
24 |           stream::poll_fn(move |cx| connection.poll_mess...
   |                           ^^^^^^^^^ ---------- use occurs due to use in closure
   |                           |
   |                           value used here after move

I tried using Arc and Mutex but when I use tokio's Mutex I can't .await the lock because stream::poll_fn is sync.

You don't need to await the connection directly and use poll_message. awaiting the connection directly is just a simple option for when you don't need to care about the outputs of poll_message.

From the poll_message docs:

Applications that wish to examine those messages should use this method to drive the connection rather than its Future implementation.

use futures::{stream, StreamExt};
use futures::{FutureExt, TryStreamExt};
use std::env;
use tokio_postgres::{connect, NoTls};
use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    // PostgreSQL connection.
    let (client, mut connection) = tokio_postgres::connect("host=localhost user=postgres dbname=cryptoxonline", NoTls).await.unwrap();


    let (tx, rx) = futures_channel::mpsc::unbounded();
    let stream =
        stream::poll_fn(move |cx| connection.poll_message(cx)).map_err(|e| panic!("{}", e));
    let connection = stream.forward(tx).map(|r| r.unwrap());
    tokio::spawn(connection);

    client
        .batch_execute(
            "LISTEN test_notifications;
             NOTIFY test_notifications, 'hello';
             NOTIFY test_notifications, 'world';",
        )
        .await
        .unwrap();


    let notifications = rx
        .filter_map(|m| match m {
            tokio_postgres::AsyncMessage::Notification(n) => futures_util::future::ready(Some(n)),
            _ => futures_util::future::ready(None),
        })
        .collect::<Vec<_>>()
        .await;
       
    // REST OF CODE
    let query =
        client
        .query("
                SELECT btc FROM history LIMIT 1 
            ", &[]).await;

    match query {
        Ok(q) => {
            let r = q[0].get::<_, &str>("btc");
            println!("r {}", r);
        },
        Err(e) => {
            return;
        }
    }
}

This code compiles but the last query is never executed. I have a lot of different async functions each making calls to the sql connection simultaneously. This worked fine until the need to listen for a notification.

The goal of collect is to gather up all of the items from a stream. That of course means it's going to wait for the stream to close, which won't happen until the connection closes or encounters a fatal error.

You'll need to think about what you actually want to wait on there. For a certain amount of time to have passed without getting another notification? For exactly two notifications? Something else?

1 Like

This question is in the context of my HttpServer. Basically in my main I declare the PSQL connection:

    // PostgreSQL connection.
    let (client, connection) = tokio_postgres::connect("host=localhost user=postgres dbname=cryptoxonline", NoTls).await.unwrap();
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {}", e);
        }
    });
    let client = Arc::new(Mutex::new(client));

The client gets passed through as parameter to all the methods so it's available in all the route methods. E.g signature of a route-method:

    pub async fn get_algorithm(&self, req: Http, stream: &mut TcpStream, psql: Psql, api: Api) -> Result<HttpResponse, String>;

psql: Psql is the following type type Psql = Arc<Mutex<Client>>.

This way I can query to my Psql in all routes.

Now: There is one route which will upgrade the stream to a websocket. I want to listen for a Psql notification indicating if a row has been added to a specific table. If it has been added, a notification will be sent, my Rust-code will receive it, and send it to the client of the websocket.

So there is only one route which will need to have access to the notifications. The other routes just need to keep working as they are now.

So I was wondering if it's maybe possible to clone the connection, or to initiate another connection copying the parameters from the active connection to wait for the notifications?

I think I solved it. The problem was that collect::<Vec<_>>() .await; blocked the rest of the code resulting on no other query being able to execute. I now listen for the notifications in a separate thread:

use futures::{stream, StreamExt};
use futures::{FutureExt, TryStreamExt};
use std::env;
use tokio_postgres::{connect, NoTls};
use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    // PostgreSQL connection.
    let (client, mut connection) = tokio_postgres::connect("host=localhost user=postgres dbname=", NoTls).await.unwrap();


    // Make transmitter and receiver.
    let (tx, rx) = futures_channel::mpsc::unbounded();
    let stream =
        stream::poll_fn(move |cx| connection.poll_message(cx)).map_err(|e| panic!("{}", e));
    let connection = stream.forward(tx).map(|r| r.unwrap());
    tokio::spawn(connection);


    // Wait for notifications in seperate thread.
    tokio::spawn(async move {
    let notifications = rx
        .filter_map(|m| match m {
            tokio_postgres::AsyncMessage::Notification(n) => {
                println!("Notification {:?}", n);
                futures_util::future::ready(Some(n))
            },
            _ => futures_util::future::ready(None),
        })
        .collect::<Vec<_>>().await;

        // All notifications?
        println!("All notifications {:?}", notifications);
    });
   
    // Execute listen/notify
    match client
        .batch_execute(
            "LISTEN test_notifications;
             NOTIFY test_notifications, 'hello';
             NOTIFY test_notifications, 'world';",
        )
        .await
        {
            Ok(_) => (),
            Err(e) => {
                eprintln!("Error {}", e);
            }
        }
       
    // Execute random query.
    let query =
        client
        .query("
                SELECT order_id FROM history LIMIT 1 
            ", &[]).await;

    match query {
        Ok(q) => {
            let r = q[0].get::<_, &str>("order_id");
            println!("r {}", r);
        },
        Err(e) => {
            return;
        }
    }
}

Github issue: Listen for PSQL notification using tokio_postgres? · Issue #1098 · sfackler/rust-postgres · GitHub

Thanks for the help!

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.