I want to build a tls sniproxy using tokio. I have TcpStream using the .incoming()
method
How can I poll_peek the stream? When calling poll_peek()
, I get no result (match leads NotReady). But I think this is because it is called outside of a tokio-scheduled future.
I want to call poll_peek()
to later split the streams and hand them over to two io::copy
futures, one for each direction.
let server = listener
.incoming()
.for_each(move |source| {
// poll_peek() here does not work
TcpStream::connect(format!("{}:{}", target_host, target_port).as_str()).and_then(
move |target| {
// poll_peek() here does not work
let (source_recv, source_send) = source.split();
let (target_recv, target_send) = target.split();
let async1 = io::copy(source_recv, target_send)
.map(|(n, _, _)| println!("Copied {} bytes from source to target", n))
.map_err(move |err| println!("Error: {:?}", err));
let async2 = io::copy(target_recv, source_send)
.map(|(n, _, _)| println!("Copied {} bytes from target to source", n))
.map_err(move |err| println!("Error: {:?}", err));
tokio::spawn(async1);
tokio::spawn(async2);
Ok(())
},
)
})
.map_err(|err| println!("{:?}", err));
tokio::run(server);
I managed to get it working, but I still have problems accessing some variables (see TODOs). But since this is only a prototype, for the the final sniproxy I will have some manager which is secured by Arc and so the access is hopefully easier.
let server = listener
.incoming()
.for_each(|source| {
let sni_task = loop_fn(source, |mut source| {
let mut buf = [0; 1000];
match source.poll_peek(&mut buf) {
Ok(Async::Ready(n)) => {
println!("Received {} bytes", n);
// TODO check SNI
Ok(Loop::Break(source))
}
Ok(Async::NotReady) => {
// println!("Not ready");
std::thread::sleep(std::time::Duration::from_micros(10));
Ok(Loop::Continue(source))
}
Err(x) => {
println!("poll_peek error: {:?}", x);
Err(x)
}
}
})
.and_then(|source| {
// TODO lookup target
ok((
source,
format!(
"{}:{}",
"10.54.0.254", // TODO _target_host
443 // TODO _target_port
)
.parse::<std::net::SocketAddr>(),
))
})
.and_then(|(source, target_sockaddr)| {
TcpStream::connect(&target_sockaddr.unwrap()).map(|target| (source, target))
})
.and_then(|(source, target)| {
let (source_recv, source_send) = source.split();
let (target_recv, target_send) = target.split();
let async1 = io::copy(target_recv, source_send)
.map(|(n, _, _)| println!("Copied {} bytes from target to source", n))
.map_err(|err| println!("Error: {:?}", err));
let async2 = io::copy(source_recv, target_send)
.map(|(n, _, _)| println!("Copied {} bytes from source to target", n))
.map_err(|err| println!("Error: {:?}", err));
tokio::spawn(async1);
tokio::spawn(async2);
Ok(())
})
.map_err(|err| {
println!("Error: {:?}", err);
});
tokio::spawn(sni_task);
Ok(())
})
.map_err(|err| println!("{:?}", err));
tokio::run(server);
I get some hangs in release mode, I get hangs in the poll_peek() function, though there are bytes available (as netstat shows). I will track this down