Strange behaviour of tokio tcp client

I have been playing a bit with tokio to see the changes in the latest version, especially the async fs interface and blocking-code support. I created a simple server which just sends a couple of lines of text, and then a very simple client. The problem is with the client: when run with many concurrent connections (1000 or 2000; more connections hit the open-files limit), it sometimes gets stuck. By stuck I mean it receives the majority of the messages and then stops. If I look at the open connections with lsof -i4 -itp, it shows a number (~50) of connections in the ESTABLISHED state. The client stays stuck in that state forever, even after the server is closed. The problem does not occur on every client run, but in roughly 30-40% of cases.

To confirm that the problem is in the client, I did the following:

  1. Ran the client against a simple netcat loop - while true; do echo Hey | nc -l 12345; done. I did indeed get many connection refused errors, as netcat accepts only one connection and then restarts, but I still saw a similar problem: the client got stuck with several (this time fewer, 5-10) connections pending in the ESTABLISHED state.
  2. Created a similar simple client with Python asyncio (kept as close to the tokio client as possible). The Python client worked as expected: it never got stuck and ran smoothly (the tokio client sometimes paused for half a second or a second). With 1000 concurrent connections it never had an issue; with a few more it only complained about the open-files limit, but never showed anything like the behaviour described above.

To continue the weird story: initially the tokio client only received bytes. I modified it (the change is now commented out in the code) to send a small initial message, and its behaviour changed - I started to get connection refused errors, although there was no reason why the server should refuse them. This happened even with a small number of concurrent connections, like 10: about 20% of the connections were refused, and the rate grew slightly as the number of concurrent connections increased. Again I checked with the Python client - no problems there, even when it sent the same message.

I'm totally confused - what can cause such strange behaviour? It looks like the problem is in the tokio client, but where? I tried to make it as simple as possible, yet the problem is still there. Should I report it as a bug to the tokio project?

So here is the code for the client:

extern crate tokio;
extern crate futures;
extern crate tokio_io;
#[macro_use]
extern crate lazy_static;

use std::env;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::net::SocketAddr;
use tokio::prelude::*;
use tokio::net::TcpStream;
use std::time::{Instant};

const DEFAULT_CONNECTIONS: usize = 100;
const DEFAULT_SERVER: &str = "127.0.0.1:12345";

lazy_static! {
    static ref JOKES: AtomicUsize  = AtomicUsize::new(0);
    static ref BYTES: AtomicUsize = AtomicUsize::new(0);
}

fn main() {

    let count: usize = env::args().nth(1)
        .map(|x| x.parse().unwrap_or(DEFAULT_CONNECTIONS))
        .unwrap_or(DEFAULT_CONNECTIONS);
    let server = env::args().nth(2)
        .unwrap_or(DEFAULT_SERVER.into());
    let addr: SocketAddr = server.parse().unwrap();
    let mut rt = tokio::runtime::Runtime::new().unwrap();

    let start = Instant::now();
    for _i in 0..count {
        let client = TcpStream::connect(&addr)
        .map_err(|e| eprintln!("Connection Error: {:?}",e))
        .and_then(|socket| {
            // tokio::io::write_all(socket, b"hey\n\n")
            // .map_err(|e| eprintln!("Write error: {}",e))
            // .and_then(|(socket, _x)| {
            tokio::io::read_to_end(socket, vec![]).map(|(_, v)| {
                let prev = JOKES.fetch_add(1, Ordering::Relaxed);
                BYTES.fetch_add(v.len(), Ordering::Relaxed);
                println!("Got joke  {}", prev);
                })
                .map_err(|e| eprintln!("Read Error: {:?}",e))
        // })
        });
        rt.spawn(client);
    }

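    // Wait until the runtime has no more work (all spawned clients finished), then shut it down.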
    rt.shutdown_on_idle().wait().unwrap();

    let dur = start.elapsed();

    println!("FINISHED - jokes {}, bytes {}, duration {}.{:03}", 
    JOKES.load(Ordering::Relaxed),
    BYTES.load(Ordering::Relaxed),
    dur.as_secs(),
    dur.subsec_nanos() / 1_000_000
    );
}
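
For completeness, the write variant mentioned above is just the commented lines restored, so that each connection sends the small greeting first and only then reads. The loop body then looks roughly like this (same counters and runtime as above; a sketch, not tested in isolation):

let client = TcpStream::connect(&addr)
    .map_err(|e| eprintln!("Connection Error: {:?}", e))
    .and_then(|socket| {
        // Send the small initial message first ...
        tokio::io::write_all(socket, b"hey\n\n")
            .map_err(|e| eprintln!("Write error: {}", e))
            // ... then read everything the server sends back.
            .and_then(|(socket, _msg)| {
                tokio::io::read_to_end(socket, vec![])
                    .map(|(_, v)| {
                        let prev = JOKES.fetch_add(1, Ordering::Relaxed);
                        BYTES.fetch_add(v.len(), Ordering::Relaxed);
                        println!("Got joke  {}", prev);
                    })
                    .map_err(|e| eprintln!("Read Error: {:?}", e))
            })
    });
rt.spawn(client);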

I've tried with stable and nightly rustc, and tokio 0.1.6. My platform is 64-bit Linux (Ubuntu 16.04). For reference, the whole project is on GitHub here. Here is the server - the server logic is in the first function, prepare_server - and the Python reference client is there too, in python/client.py. (Sorry, I cannot put an additional link as I'm a new user.)

Any ideas are welcome, because I have run out of them.

Yeah, I think there's something funky going on here (I tried this briefly locally). Here's the kicker: adding trace level logging to the client makes the whole test complete faster, and nothing is ever stuck! @izderadicka, I wonder if you see the same?
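
In case you want to reproduce it: tokio logs through the log crate, so a minimal way to enable the trace output (assuming the env_logger crate) would be:

extern crate env_logger;

fn main() {
    // Forward tokio's internal trace!/debug! output to stderr.
    env_logger::init();
    // ... rest of the client unchanged ...
}

and then run with RUST_LOG=trace set in the environment.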

Another interesting thing worth trying is rewriting the client using tokio 0.1, and seeing what happens.

I have not tried more logging. What do you mean by using tokio 0.1? I'm using 0.1.6. I'm thinking of trying the current-thread executor, but I'm not exactly sure how - something like the sketch below, perhaps. Or maybe, just for a start, limit the number of threads to 1. It looks like some nasty race condition?
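
Here is roughly what I have in mind (assuming the tokio::runtime::current_thread API in 0.1.6; a minimal sketch, untested):

extern crate tokio;

use std::net::SocketAddr;
use tokio::net::TcpStream;
use tokio::prelude::*;
use tokio::runtime::current_thread::Runtime;

fn main() {
    let addr: SocketAddr = "127.0.0.1:12345".parse().unwrap();
    // Single-threaded runtime: every client future is driven on this one thread.
    let mut rt = Runtime::new().unwrap();

    for _ in 0..1000 {
        let client = TcpStream::connect(&addr)
            .map_err(|e| eprintln!("Connection Error: {:?}", e))
            .and_then(|socket| {
                tokio::io::read_to_end(socket, vec![])
                    .map(|(_, v)| println!("got {} bytes", v.len()))
                    .map_err(|e| eprintln!("Read Error: {:?}", e))
            });
        rt.spawn(client);
    }

    // run() blocks until all spawned futures have completed.
    rt.run().unwrap();
}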

Sorry, I meant using tokio-core (i.e. its Core loop abstraction), which was the "original" tokio.

If you mean tokio::runtime::current_thread::Runtime, I tried it as well -- it didn't help.

I tried reducing the concurrency of both the server and the client to 1, and although it seems to help somewhat with avoiding hangs, it still hangs occasionally.

Yeah, something like that. The fact that trace-level logging seems to make it complete faster and without hangs certainly suggests it, as logging changes the timing of the code (I hope it doesn't actually have other unintentional side effects).

Rewritten with tokio-core:

extern crate tokio_core;
extern crate futures;
extern crate tokio_io;
#[macro_use]
extern crate lazy_static;

use std::env;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::net::SocketAddr;
use futures::future::Future;
use futures::stream::Stream;
use futures::sync::mpsc::channel;
use tokio_core::net::TcpStream;
use tokio_core::reactor::Core;
use std::time::{Instant};

const DEFAULT_CONNECTIONS: usize = 100;
const DEFAULT_SERVER: &str = "127.0.0.1:12345";

lazy_static! {
    static ref JOKES: AtomicUsize  = AtomicUsize::new(0);
    static ref BYTES: AtomicUsize = AtomicUsize::new(0);
}

fn main() {

    let count: usize = env::args().nth(1)
        .map(|x| x.parse().unwrap_or(DEFAULT_CONNECTIONS))
        .unwrap_or(DEFAULT_CONNECTIONS);
    let server = env::args().nth(2)
        .unwrap_or(DEFAULT_SERVER.into());
    let addr: SocketAddr = server.parse().unwrap();
    let mut rt = Core::new().unwrap();
    let handle = rt.handle();
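    // Completion signal: the client that receives the last joke sends () here so that run() below can return.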
    let (tx, rx) = channel::<()>(1);
    let start = Instant::now();
    for _i in 0..count {
        let mut txi = tx.clone();
        let client = TcpStream::connect(&addr, &handle)
        .map_err(|e| eprintln!("Connection Error: {:?}",e))
        .and_then(move |socket| {
            // tokio::io::write_all(socket, b"hey\n\n")
            // .map_err(|e| eprintln!("Write error: {}",e))
            // .and_then(|(socket, _x)| {
            tokio_io::io::read_to_end(socket, vec![]).map(move |(_, v)| {
                let prev = JOKES.fetch_add(1, Ordering::Relaxed);
                BYTES.fetch_add(v.len(), Ordering::Relaxed);
                println!("Got joke  {}", prev);
                if prev == count-1 {
                    txi.try_send(()).unwrap();
                }
                })
                .map_err(|e| eprintln!("Read Error: {:?}",e))
        // })
        });
        handle.spawn(client);
    }

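    // Drive the event loop until the completion signal arrives on rx.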
    rt.run(rx.into_future()).unwrap();

    let dur = start.elapsed();

    println!("FINISHED - jokes {}, bytes {}, duration {}.{:03}", 
    JOKES.load(Ordering::Relaxed),
    BYTES.load(Ordering::Relaxed),
    dur.as_secs(),
    dur.subsec_nanos() / 1_000_000
    );
}

Same problem, although probably not as often - with 1000 concurrent sockets, about 15-20% got stuck around the 900th reply.

@vitalyd, what now? Log a bug ticket on GitHub?

Yeah, a ticket wouldn’t hurt. I wonder if trace logging unhinges the tokio_core version as well.