How to spawn a task for every notification from tokio_postgres::Connection::notifications()?


#1

I’ve tried, unsuccessfully, to spawn a separate task for every notification yielded by tokio_postgres::Connection::notifications(), so that each one is printed asynchronously.

The author of tokio_postgres tests only the first notification in the stream.

The documentation for futures::stream::Stream::into_future() says the remaining stream is passed back to “allow reclamation of its resources”, but there is no example.
Here, the Connection returned by tokio_postgres::Notifications::into_inner() is the resource to be reclaimed.

My attempt is:

extern crate tokio_postgres;
extern crate tokio_core;
extern crate futures;

use tokio_postgres::{Connection, TlsMode, Notification, Error};
use tokio_core::reactor::{Core, Handle};
use futures::{Future, Stream};

/// Attempts to handle the notifications in `ns` one at a time.
///
/// Takes the head of the stream with `into_future()`, and in the `map`
/// closure prints the notification's channel, spawns a ready future on
/// `handle`, and calls `step` again with the rest of the stream.
///
/// NOTE(review): `F` is a type parameter chosen by the caller, while the body
/// produces whatever `into_future().map(..)` returns; this mismatch looks like
/// one reason the attempt does not build — confirm against the compiler output.
fn step<S, F>(handle: &Handle, ns: S) -> F
where
    S: Stream<Item = Notification, Error = Error>,
    F: Future<Item = Notification, Error = Error>,
{
    ns.into_future().map(|(n, ns)| {
        // `println!` is evaluated right here, as the argument to `future::ok`,
        // so the channel is printed before anything is spawned; the spawned
        // future only carries the `()` that `println!` returned.
        // `n.unwrap()` panics when there is nothing to unwrap (presumably `None`
        // once the stream has ended — verify against the `into_future` docs).
        let serve_one = futures::future::ok(println!("{:?}", n.unwrap().channel));
        handle.spawn(serve_one);
        // NOTE(review): this is `map`, not `and_then`, so the value returned by
        // the recursive call becomes the item of the outer future instead of
        // being chained onto it — presumably not what was intended.
        step(handle, ns)
    })
}

/// Builds a future that connects to Postgres, issues `LISTEN test_notifications`,
/// feeds the connection's notification stream to `step`, and asserts on the
/// channel and payload of the result; then runs that future on a `Core`.
fn main() {
    let mut l = Core::new().unwrap();
    // `handle` is already a reference (`&Handle`) to a temporary handle of the reactor.
    let handle = &l.handle();

    let done = Connection::connect(
        "postgres://postgres:111@172.17.0.2:5432",
        TlsMode::None,
        &handle,
    // `c.unwrap()` panics if the connection attempt produced an error.
    ).then(|c| c.unwrap().batch_execute("LISTEN test_notifications"))
        .and_then(|c| {
            // On error, unwrap the stream wrappers with two `into_inner()` calls;
            // presumably this recovers the underlying connection — TODO confirm.
            step(handle, c.notifications()).map_err(|(e, n)| (e, n.into_inner().into_inner()))
        })
        // Only a single result is inspected here: the first element of the pair
        // is unwrapped and its channel and payload are checked.
        .map(|(n, _)| {
            let n = n.unwrap();
            assert_eq!(n.channel, "test_notifications");
            assert_eq!(n.payload, "foo");
        });

    // Drive the whole chain to completion on the reactor; panics if it resolves to an error.
    l.run(done).unwrap();
}