I've unsuccessfully tried to spawn every notification in tokio_postgres::Connection.notifications() in separate Task to asynchronously print it.
Author of tokio_postgres test just first notification in the stream
https://github.com/sfackler/rust-postgres/blob/89d39cc5abe4d5d269d6bcbe54886320d54c9d89/tokio-postgres/src/test.rs#L462
futures::Stream::Stream.into_future() documentation states "allow reclamation of its resources" but there is no example.
tokio_postgres::Notifications.into_inner()::Connection is resource to be reclaimed.
My attempt is
extern crate tokio_postgres;
extern crate tokio_core;
extern crate futures;
use tokio_postgres::{Connection, TlsMode, Notification, Error};
use tokio_core::reactor::{Core, Handle};
use futures::{Future, Stream};
fn step<S, F>(handle: &Handle, ns: S) -> F
where
S: Stream<Item = Notification, Error = Error>,
F: Future<Item = Notification, Error = Error>,
{
ns.into_future().map(|(n, ns)| {
let serve_one = futures::future::ok(println!("{:?}", n.unwrap().channel));
handle.spawn(serve_one);
step(handle, ns)
})
}
fn main() {
let mut l = Core::new().unwrap();
let handle = &l.handle();
let done = Connection::connect(
"postgres://postgres:111@172.17.0.2:5432",
TlsMode::None,
&handle,
).then(|c| c.unwrap().batch_execute("LISTEN test_notifications"))
.and_then(|c| {
step(handle, c.notifications()).map_err(|(e, n)| (e, n.into_inner().into_inner()))
})
.map(|(n, _)| {
let n = n.unwrap();
assert_eq!(n.channel, "test_notifications");
assert_eq!(n.payload, "foo");
});
l.run(done).unwrap();
}