Tokio tcpstream poll_peek

#1

I want to build a TLS SNI proxy using tokio. I obtain each TcpStream from the listener's .incoming() method.

How can I poll_peek the stream? When calling poll_peek(), I get no result (the match yields NotReady). I think this is because it is called outside of a tokio-scheduled future.

I want to call poll_peek() to later split the streams and hand them over to two io::copy futures, one for each direction.

// Accept connections and bridge each one to the configured target,
// copying bytes in both directions.
let server = listener
    .incoming()
    .for_each(move |source| {
        // poll_peek() here does not work
        let target_addr = format!("{}:{}", target_host, target_port);
        TcpStream::connect(target_addr.as_str()).and_then(move |target| {
            // poll_peek() here does not work
            let (source_recv, source_send) = source.split();
            let (target_recv, target_send) = target.split();

            // One copy future per direction; each only logs its outcome.
            let upstream = io::copy(source_recv, target_send)
                .map(|(copied, _, _)| println!("Copied {} bytes from source to target", copied))
                .map_err(move |e| println!("Error: {:?}", e));
            let downstream = io::copy(target_recv, source_send)
                .map(|(copied, _, _)| println!("Copied {} bytes from target to source", copied))
                .map_err(move |e| println!("Error: {:?}", e));

            // Spawn both directions as separate tasks.
            tokio::spawn(upstream);
            tokio::spawn(downstream);

            Ok(())
        })
    })
    .map_err(|err| println!("{:?}", err));

tokio::run(server);
#2

I managed to get it working, but I still have problems accessing some variables (see the TODOs). Since this is only a prototype, the final sniproxy will have a manager wrapped in an Arc, so access will hopefully be easier.

// Accept connections; for each one, peek at the first bytes (where the SNI
// check will go), connect to the target, then copy bytes in both directions.
let server = listener
    .incoming()
    .for_each(|source| {
        // `poll_fn` closures take no arguments, so the stream is parked in an
        // `Option` and moved out once peeking has succeeded.
        let mut source = Some(source);

        // Peek without consuming, so the copy below still forwards these bytes.
        //
        // On `NotReady` we must return `NotReady` rather than sleeping and
        // retrying: `poll_peek` arranges for this task to be notified when the
        // socket becomes readable, and returning lets the executor run other
        // work (and the notification arrive). A busy loop with
        // `std::thread::sleep` blocks the executor thread instead.
        let sni_task = futures::future::poll_fn(move || {
            let mut buf = [0; 1000];
            let peeked = source
                .as_mut()
                .expect("sni_task polled again after completing")
                .poll_peek(&mut buf);
            match peeked {
                Ok(Async::Ready(n)) => {
                    println!("Received {} bytes", n);
                    // TODO check SNI
                    // NOTE(review): a single peek may not contain the whole
                    // ClientHello; confirm before parsing.
                    Ok(Async::Ready(
                        source.take().expect("sni_task polled again after completing"),
                    ))
                }
                Ok(Async::NotReady) => Ok(Async::NotReady),
                Err(x) => {
                    println!("poll_peek error: {:?}", x);
                    Err(x)
                }
            }
        })
        .and_then(|source| {
            // TODO lookup target
            // A malformed address becomes an I/O error handled by the
            // `map_err` below, instead of a panic inside the spawned task.
            format!(
                "{}:{}",
                "10.54.0.254", // TODO _target_host
                443            // TODO _target_port
            )
            .parse::<std::net::SocketAddr>()
            .map(|target_sockaddr| (source, target_sockaddr))
            .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))
        })
        .and_then(|(source, target_sockaddr)| {
            TcpStream::connect(&target_sockaddr).map(|target| (source, target))
        })
        .and_then(|(source, target)| {
            let (source_recv, source_send) = source.split();
            let (target_recv, target_send) = target.split();

            // One copy future per direction; each only logs its outcome.
            let async1 = io::copy(target_recv, source_send)
                .map(|(n, _, _)| println!("Copied {} bytes from target to source", n))
                .map_err(|err| println!("Error: {:?}", err));

            let async2 = io::copy(source_recv, target_send)
                .map(|(n, _, _)| println!("Copied {} bytes from source to target", n))
                .map_err(|err| println!("Error: {:?}", err));

            tokio::spawn(async1);
            tokio::spawn(async2);
            Ok(())
        })
        .map_err(|err| {
            println!("Error: {:?}", err);
        });

        // Run the per-connection pipeline as its own task so the accept loop
        // is not held up by it.
        tokio::spawn(sni_task);
        Ok(())
    })
    .map_err(|err| println!("{:?}", err));

tokio::run(server);

In release mode I get some hangs: the task hangs in the poll_peek() function even though bytes are available (as netstat shows). I will track this down.