Tokio TCP connection not closed when sender is dropped (Futures 0.3 + compat layer)

#1

I am doing the following steps:

  • Creating a TCP connection using Tokio TcpStream::connect.
  • Applying a codec: LengthDelimitedCodec
  • Splitting into a (sender, receiver) pair.
  • Using the compat layer to transform the (sender, receiver) to be Futures 0.3 compatible.

I expect that dropping the sender part of the connection will close the TCP connection. However, the remote side is not notified that the connection was closed.

I created a self-contained code example demonstrating the problem and include it here.
I am not sure where this problem originates (my code, Tokio, or Futures 0.3?), so I thought someone here might have an idea.

#![feature(futures_api, async_await, await_macro, arbitrary_self_types)]
#![feature(nll)]
#![feature(generators)]
#![feature(unboxed_closures)]
#![deny(warnings)]

use std::net::{IpAddr, Ipv4Addr, SocketAddr};

use bytes::Bytes;

use futures::compat::{Future01CompatExt, Stream01CompatExt};
use futures::executor::ThreadPool;
use futures::{FutureExt, StreamExt};
use futures_01::sink::Sink as Sink01;
use futures_01::stream::Stream as Stream01;

use tokio::codec::{Framed, LengthDelimitedCodec};
use tokio::net::TcpListener as TokioTcpListener;
use tokio::net::TcpStream;

/// Get an available port we can listen on
fn get_available_port_v4() -> u16 {
    // Idea based on code at:
    // https://github.com/rust-lang-nursery/rust-cookbook/pull/137/files
    let loopback = Ipv4Addr::new(127, 0, 0, 1);
    // Assigning port 0 requests the OS to assign a free port
    let socket_addr = SocketAddr::new(IpAddr::V4(loopback), 0);
    let listener = TokioTcpListener::bind(&socket_addr).unwrap();
    listener.local_addr().unwrap().port()
}

fn tcp_stream_to_conn_pair_01(
    tcp_stream: TcpStream,
    max_frame_length: usize,
) -> (
    impl Sink01<SinkItem = Vec<u8>>,
    impl Stream01<Item = Vec<u8>>,
) {
    let mut codec = LengthDelimitedCodec::new();
    codec.set_max_frame_length(max_frame_length);
    let (sender_01, receiver_01) = Framed::new(tcp_stream, codec).split();

    // Conversion layer between Vec<u8> to Bytes:
    let sender_01 = sender_01
        .sink_map_err(|_| ())
        .with(|vec: Vec<u8>| -> Result<Bytes, ()> { Ok(Bytes::from(vec)) });

    let receiver_01 = receiver_01.map(|bytes| bytes.to_vec());

    (sender_01, receiver_01)
}

async fn task_tcp_stream_to_conn_pair_drop_sender() {
    let max_frame_length = 0x100;

    let available_port = get_available_port_v4();
    let loopback = Ipv4Addr::new(127, 0, 0, 1);
    let socket_addr = SocketAddr::new(IpAddr::V4(loopback), available_port);

    // Set up server side (listen):
    let listener = TokioTcpListener::bind(&socket_addr).unwrap();

    let mut incoming_conns = listener.incoming().compat();
    let fut_server_tcp_stream = incoming_conns.next();

    // Set up client side (connect):
    let fut_client_tcp_stream = TcpStream::connect(&socket_addr).compat();

    let (opt_server_tcp_stream, opt_client_tcp_stream) =
        await!(fut_server_tcp_stream.join(fut_client_tcp_stream));

    println!("Connection established");

    let (_server_sender, server_receiver) =
        tcp_stream_to_conn_pair_01(opt_server_tcp_stream.unwrap().unwrap(), max_frame_length);
    let (client_sender, _client_receiver) =
        tcp_stream_to_conn_pair_01(opt_client_tcp_stream.unwrap(), max_frame_length);

    // Close the client sender:
    drop(client_sender);
    println!("Client sender dropped");

    // Expect the server to notice that the connection was closed:
    let mut server_receiver = server_receiver.compat();
    println!("Wait for server to notice connection was closed...");
    let opt_message = await!(server_receiver.next());
    // --- We don't reach here ---
    assert!(opt_message.is_none());
}

fn main() {
    let mut thread_pool = ThreadPool::new().unwrap();
    thread_pool.run(task_tcp_stream_to_conn_pair_drop_sender());
}

My Cargo.toml:

[package]
name = "check_tokio_drop_sender"
version = "0.1.0"
authors = ["real <real@somedomain.org>"]
edition = "2018"

[dependencies]

tokio = "0.1"
futures_01 = { version = "0.1", package = "futures" }
futures-preview = {version = "0.3.0-alpha.15", features = ["compat"] }
bytes = "0.4"

Compiler version:

$ rustc --version
rustc 1.35.0-nightly (3eb4890df 2019-03-19)

Note that in the example above I am dropping client_sender, but server_receiver is not notified.
I noticed that if I drop the whole TcpStream (of the client), server_receiver is notified about the closed connection:

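let (_server_sender, server_receiver) =
    tcp_stream_to_conn_pair_01(opt_server_tcp_stream.unwrap().unwrap(), max_frame_length);
// Drop the client's whole TcpStream instead of only the sender half:
drop(opt_client_tcp_stream);
// As in the full example, convert the receiver to a Futures 0.3 stream first:
let mut server_receiver = server_receiver.compat();
let opt_message = await!(server_receiver.next());
assert!(opt_message.is_none());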

Any ideas are appreciated!

#2

I haven’t run into this specific issue with TcpStream, but in tests the usual approach seems to be to call incoming_conns.take(1) if you only want one connection. In any case, next does not wait until the connection is closed; it just yields the next item on the stream.

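As I see it, you either do something like:

loop {
    match await!(incoming.next()) {
        // Handle the accepted connection here.
        Some(conn) => { /* do something with conn */ }
        // The stream ended: no more connections.
        None => break,
    }
}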

Or you call take(x) to only get x items…
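
For comparison, the same "accept only x connections" idea can be sketched with the blocking standard library instead of Tokio. This is an analogue, not the code from this thread: the function name accept_exactly_one is made up for illustration, and std's incoming() iterator never ends by itself, which is why take(1) is what stops the loop here.

```rust
use std::io;
use std::net::{TcpListener, TcpStream};

/// Accepts exactly one connection on a loopback listener and returns how many
/// connections were accepted. Hypothetical helper, for illustration only.
fn accept_exactly_one() -> io::Result<usize> {
    // Port 0 asks the OS to pick a free port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    // Keep the client alive until the function returns.
    let _client = TcpStream::connect(addr)?;

    let mut accepted = 0;
    // Without take(1) this loop would block forever waiting for more clients.
    for conn in listener.incoming().take(1) {
        let _stream = conn?;
        accepted += 1;
    }
    Ok(accepted)
}
```

Calling accept_exactly_one() should return after the single pending connection has been accepted, rather than waiting for further clients.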

#3

Stream::split returns a pair of wrappers around the underlying Stream + Sink; these use a lock around the stream to allow both to access it when needed. That means the TcpStream has no way to know when you drop the SplitSink; only once you drop both halves will the TcpStream be dropped.

You could try calling close before dropping the Sink. If everything is hooked up correctly, this should tell the TcpStream to close the sending side. (In general, whenever you’re working with a Sink or AsyncWrite you should explicitly close it rather than just drop it, since closing may require IO to occur, which means dropping must either block on this IO, push it off to a background thread, or just not close properly.)
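
The ownership side of this can be sketched with the standard library only. This is a simplified model, not the futures 0.1 implementation: Conn, SendHalf, RecvHalf, split and drop_order_demo are hypothetical names, and Arc<Mutex<..>> stands in for whatever lock type the real split uses.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// Stand-in for the framed TcpStream; records when it is actually dropped.
struct Conn {
    closed: Arc<AtomicBool>,
}

impl Drop for Conn {
    fn drop(&mut self) {
        self.closed.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical halves: each one holds a shared, locked handle to the connection.
#[allow(dead_code)]
struct SendHalf(Arc<Mutex<Conn>>);
#[allow(dead_code)]
struct RecvHalf(Arc<Mutex<Conn>>);

fn split(conn: Conn) -> (SendHalf, RecvHalf) {
    let shared = Arc::new(Mutex::new(conn));
    (SendHalf(Arc::clone(&shared)), RecvHalf(shared))
}

/// Returns (closed after dropping only the sender, closed after dropping both halves).
fn drop_order_demo() -> (bool, bool) {
    let closed = Arc::new(AtomicBool::new(false));
    let (sender, receiver) = split(Conn { closed: Arc::clone(&closed) });

    // The receiver still holds a reference, so Conn is not dropped yet.
    drop(sender);
    let after_sender = closed.load(Ordering::SeqCst);

    // The last reference goes away here, and only now does Conn::drop run.
    drop(receiver);
    let after_both = closed.load(Ordering::SeqCst);

    (after_sender, after_both)
}
```

In this model drop_order_demo() returns (false, true): the shared connection is released only when the second half is dropped.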

#4

@najamelan: Thanks for your reply! I am confused by your idea: in my experience, a channel that is closed results in getting a None from incoming.next().

@Nemo157: Interesting idea. I just tried to close the receiver side too. It seems like if I close both the sender and the receiver the connection is indeed closed. Maybe I need to add some code that closes both the receiver and the sender if I want to close the connection.

About using close(): I tried adding client_sender.close() to the code I sent above, but I got the same result: the server does not notice that the connection was closed. I thought it might be due to wrapping the original sender with the with combinator, so I also tried calling close() on the original sender that I get directly from the split() call. That didn’t seem to close the connection either.

I still wonder what would be “the correct way” of solving this kind of problem. In my original code the senders and receivers are wrapped in many layers of combinators and abstractions, and I believe that this is what usually happens with Futures code. Eventually I only have access to a pair of (sender, receiver). I expect to be able to close the connection somehow. I imagined that doing something like drop(sender) or sender.close() should be enough.

#5

Ah, it looks like Tokio doesn’t currently support closing only the sender side of a TcpStream: its AsyncWrite::shutdown implementation does nothing. There appears to be a workaround: writing a wrapper that implements the function correctly. (After reading the linked issue I’m still not sure why the actual implementation cannot just be changed to do this.)
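
What closing only the sending side means at the socket level can be sketched with the blocking standard library. This is not Tokio's API and not the wrapper from the linked issue; it only shows the underlying operation such a wrapper would presumably need to perform, which in std is TcpStream::shutdown(Shutdown::Write). The function name bytes_read_after_write_shutdown is made up for illustration.

```rust
use std::io::{self, Read};
use std::net::{Shutdown, TcpListener, TcpStream};

/// Connects a client to a loopback listener, shuts down only the client's
/// write side, and returns how many bytes the server's next read yields.
fn bytes_read_after_write_shutdown() -> io::Result<usize> {
    // Port 0 asks the OS to pick a free port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let client = TcpStream::connect(listener.local_addr()?)?;
    let (mut server, _peer_addr) = listener.accept()?;

    // Half-close: the client stops sending but keeps its socket open.
    client.shutdown(Shutdown::Write)?;

    // A read that returns 0 bytes means the peer closed its sending side.
    let mut buf = [0u8; 16];
    let n = server.read(&mut buf)?;

    // `client` was still alive during the read, so the EOF came from the
    // shutdown call and not from dropping the socket.
    drop(client);
    Ok(n)
}
```

On a typical loopback setup the server's read should report end of stream (0 bytes) even though the client socket has not been dropped, which is the notification the original example was waiting for.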

#6

@Nemo157: Your advice was very helpful! I just managed to apply it and solve the shutdown problem I had in my original code. For future reference, this is the function where I applied the solution:

/// Convert a connection pair (sender Sink, receiver Stream) of Futures 0.1
/// to a pair of (mpsc::Sender, mpsc::Receiver) of Futures 0.3.
fn conn_pair_01_to_03<T, ST, SI, S>(
    conn_pair_01: (SI, ST),
    spawner: &mut S,
) -> (mpsc::Sender<T>, mpsc::Receiver<T>)
where
    T: Send + 'static,
    ST: Stream01<Item = T> + Send + 'static,
    SI: Sink01<SinkItem = T> + Send + 'static,
    S: Spawn + Send,
{
    let (sender_01, receiver_01) = conn_pair_01;

    let (mut user_sender_03, from_user_sender_03) = mpsc::channel::<Result<T, ()>>(0);
    let (to_user_receiver_03, mut user_receiver_03) = mpsc::channel::<Result<T, ()>>(0);

    // Forward messages from user_sender:
    let from_user_sender_01 = Compat::new(from_user_sender_03).map_err(|_| ());

    let sender_01 = sender_01
        .sink_map_err(|_| ())
        .with(|t: T| -> Result<T, ()> { Ok(t) });

    let send_forward_03 = sender_01.send_all(from_user_sender_01).compat().map(|_| ());

    drop(spawner.spawn(send_forward_03));

    // Forward messages to user_receiver:
    let to_user_receiver_01 = Compat::new(to_user_receiver_03)
        .sink_map_err(|_| ())
        .with(|t: T| -> Result<Result<T, ()>, ()> { Ok(Ok(t)) });

    let receiver_01 = receiver_01.map_err(|_| ());

    let recv_forward_01 = to_user_receiver_01
        .send_all(receiver_01)
        .compat()
        .map(|_| ());

    // We keep a handle, to be able to cancel the recv task later:
    let opt_recv_handle = spawner.spawn_with_handle(recv_forward_01);

    // We want to give the user sender and receiver of T (And not Result<T,()>),
    // so another adapting layer is required:

    let (user_sender, mut from_user_sender) = mpsc::channel::<T>(0);
    let (mut to_user_receiver, user_receiver) = mpsc::channel::<T>(0);

    // Forward user_receiver:
    let opt_user_receiver = spawner.spawn_with_handle(async move {
        while let Some(Ok(data)) = await!(user_receiver_03.next()) {
            if let Err(_) = await!(to_user_receiver.send(data)) {
                warn!("conn_pair_01_to_03(): to_user_receiver.send() error");
                return;
            }
        }
    });

    // Forward user_sender:
    let _ = spawner.spawn(async move {
        while let Some(data) = await!(from_user_sender.next()) {
            if let Err(_) = await!(user_sender_03.send(Ok(data))) {
                warn!("Forward user_sender error");
                break;
            }
        }
        // The user closed the sender. We close the connection aggressively.
        // We have to drop all the receiver tasks, because closing the sender is not enough for
        // closing the connection.
        //
        // See also: 
        // https://users.rust-lang.org/t/
        //      tokio-tcp-connection-not-closed-when-sender-is-dropped-futures-0-3-compat-layer/26910/4
        drop(opt_recv_handle);
        drop(opt_user_receiver);
    });

    (user_sender, user_receiver)
}

The line that solves the problem is drop(opt_recv_handle);
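
Why dropping the receiving side as well makes the difference can be sketched with the blocking standard library. This is only a rough model of the situation above: two handles to one socket, created with try_clone, stand in for the sender and receiver halves, which is an assumption and not how Stream::split works internally. The function name and the 200 ms timeout are made up for illustration.

```rust
use std::io::{self, ErrorKind, Read};
use std::net::{TcpListener, TcpStream};
use std::time::Duration;

/// Returns (server read timed out while one client handle was still alive,
/// bytes read by the server after the last client handle was dropped).
fn close_requires_dropping_both_handles() -> io::Result<(bool, usize)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let client_sender = TcpStream::connect(listener.local_addr()?)?;
    // Second handle to the same socket, standing in for the receiver half.
    let client_receiver = client_sender.try_clone()?;
    let (mut server, _peer_addr) = listener.accept()?;
    // Bound the wait so the first read cannot block forever.
    server.set_read_timeout(Some(Duration::from_millis(200)))?;

    let mut buf = [0u8; 16];

    // Dropping only the sender handle leaves the socket open.
    drop(client_sender);
    let timed_out = match server.read(&mut buf) {
        // The timeout is reported as WouldBlock or TimedOut, depending on the platform.
        Err(e) if matches!(e.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) => true,
        Ok(_) => false,
        Err(e) => return Err(e),
    };

    // Dropping the last handle closes the socket; the server now reads EOF.
    drop(client_receiver);
    let n = server.read(&mut buf)?;
    Ok((timed_out, n))
}
```

On a typical loopback setup the first read should time out and the second should return 0 bytes, mirroring the behaviour seen above: the remote side is only notified once nothing on the client keeps the socket alive.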