Hey guys,
I have this very simple gist that I can't get working correctly:
extern crate futures;
extern crate tokio;
use futures::sync::mpsc;
use futures::Future;
use futures::Sink;
use futures::Stream;
use tokio::io;
use tokio::net::TcpListener;
use std::str;
use std::string::String;
fn main() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
let (tx, rx) = mpsc::channel::<String>(10);
let rec_future = rx.map(move |rec| {
println!("Received in worker: {:?}", rec);
}).for_each(|_| Ok(()));
let addr = "0.0.0.0:6142".parse().unwrap();
let listener = TcpListener::bind(&addr).unwrap();
let server = listener
.incoming()
.map(move |socket| {
println!("accepted socket; addr={:?}", socket.peer_addr().unwrap());
let tx = tx.clone();
io::read_exact(socket, [0_u8; 10])
.map(|(_, buf)| buf)
.map(|buf| {
let rec_str = str::from_utf8(&buf).unwrap();
println!("Received in server: {:?}", &rec_str);
String::from(rec_str)
})
.map(|data| tx.send(data))
})
.buffer_unordered(10)
.for_each(|_| { Ok(()) })
.map_err(|err| {
println!("accept error = {:?}", err);
});
rt.spawn(server);
rt.spawn(rec_future);
rt.shutdown_on_idle().wait().unwrap();
}
Essentially, I only want to receive some bytes, convert them to a string, and print them from another future. Here, tx.send(data)
does not seem to be polled at all. If I change this into a tx.start_send(data)
I get the desired result - but why is the send
not working?