Does Tokio::net::TcpStream::connect really block?

I have this code

#![allow(unreachable_code)]

use anyhow::Result;
use futures::{stream::futures_unordered::FuturesUnordered, FutureExt, StreamExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{
    tcp::{OwnedReadHalf, OwnedWriteHalf},
    TcpStream,
};

pub async fn test_function_read(mut reader: OwnedReadHalf) -> Result<()> {
    let mut bytes = [0; 1024];
    loop {
        let num_bytes_read = reader.read(&mut bytes).await?;
        println!("Received {:?}", &bytes[..num_bytes_read]);
    }
    Ok(())
}

//Actually would have this listening to a channel and writing whatever comes but for simplicity sake, will just take what to write as an argument.
//I really don't think this fundamentally changes the problem.
pub async fn test_function_write(mut writer: OwnedWriteHalf, bytes: &[u8]) -> Result<()> {
    loop {
        writer.write_all(bytes).await?;
        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    }
    Ok(())
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
    let will_connect = TcpStream::connect("127.0.0.1:1234").await?;
    let wont_connect = TcpStream::connect("127.0.0.1:2345").await?;
    let bytes1 = [0; 1024];
    let bytes2 = [0; 1024];

    let (will_read, will_write) = will_connect.into_split();
    let (wont_read, wont_write) = wont_connect.into_split();

    //I am doing this as I am only running on a single thread and thus don't have need to share across multiple threads, which forces me to use Arc or &'static references.
    //In some cases it forces me to do use Arc<Mutex<OwnedReadHalf>.
    //I'm actually using tokio Frames but, OwnedReadHalf captures the idea for this example.
    let mut futures_unordered = FuturesUnordered::new();

    futures_unordered.push(test_function_read(will_read).boxed());
    futures_unordered.push(test_function_write(will_write, &bytes1[0..10]).boxed());
    futures_unordered.push(test_function_read(wont_read).boxed());
    futures_unordered.push(test_function_write(wont_write, &bytes2[0..10]).boxed());

    while let Some(result) = futures_unordered.next().await {
        match result {
            Ok(()) => {
                // Handle the successful completion of a future.
            }
            Err(e) => {
                println!("Encountered error {e:?} when executing future");
            }
        }
    }

    Ok(())
}

The behavior I would expect is that the will connect call would succeed as there is a server listening on that address. The won't connect would eventually fail is that is there to simulate a server that died and isn't listening anymore. But that it wouldn't block and would allow the successful connection io to happen. Am I missing something? How can I make tokio::net::TcpStream::connect nonblocking to other async tasks?
As a note, when testing with localhost the connection immediately fails, but when I spin up a vm and have it connect through the VM bridge it takes a little longer, when running the actual version of this code on a real network, it takes much longer to fail and thus, blocks for much longer.

You explicitly tell Tokio that you want to suspend the task until connect has completed, by putting a .await in there. Any .await is a point at which the task can be suspended until the thing you are .awaiting has completed execution.

However, if you change things so that you do TcpStream::connect inside the FuturesUnordered (or in a separate task), you should get concurrency:

#![allow(unreachable_code)]

use anyhow::Result;
use futures::{stream::futures_unordered::FuturesUnordered, FutureExt, StreamExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{
    tcp::{OwnedReadHalf, OwnedWriteHalf},
    TcpStream,
};
use tokio::try_join;

pub async fn test_function_read(mut reader: OwnedReadHalf) -> Result<()> {
    let mut bytes = [0; 1024];
    loop {
        let num_bytes_read = reader.read(&mut bytes).await?;
        println!("Received {:?}", &bytes[..num_bytes_read]);
    }
    Ok(())
}

//Actually would have this listening to a channel and writing whatever comes but for simplicity sake, will just take what to write as an argument.
//I really don't think this fundamentally changes the problem.
pub async fn test_function_write(mut writer: OwnedWriteHalf, bytes: &[u8]) -> Result<()> {
    loop {
        writer.write_all(bytes).await?;
        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    }
    Ok(())
}

async fn connect_and_test(address: &str) -> Result<()> {
    let conn = TcpStream::connect(address).await?;
    
    let bytes = [0;1024];
    
    let (read, write) = conn.into_split();
    
    try_join!(test_function_read(read), test_function_write(write, &bytes[0..10]))?;
    
    Ok(())
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
    let mut futures_unordered = FuturesUnordered::new();

    futures_unordered.push(connect_and_test("127.0.0.1:1234"));
    futures_unordered.push(connect_and_test("127.0.0.1:2345"));

    while let Some(result) = futures_unordered.next().await {
        match result {
            Ok(()) => {
                // Handle the successful completion of a future.
            }
            Err(e) => {
                println!("Encountered error {e:?} when executing future");
            }
        }
    }

    Ok(())
}
1 Like

Thanks! I'll give that a try.

The point of async/await is to make things such as tokio::spawn really cheap. But you still have to use tokio::spawn or a similar utility for things to run at the same time. It's not automatic.

1 Like

It sounds like you're doing something wrong. If you ever write Arc<Mutex<SomeSortOfIOResource>>, you've almost certainly made a mistake somewhere. And the multi-threaded runtime certainly does not force you to write that.

1 Like

My understanding is that the FuturesUnordered also gave me concurrency? I push all the async tasks into the futures_unordered and then just await the next ready future. The reason I wasn't doing tokio::spawn is that it required me to pull in Arc in a lot of references and in some cases Arc<Mutex> in some cases, but my understanding of the current_thread flavor is that it will only ever execute on a single thread, so that seems unnecessary to sprinkle that throughout the codebase when everything will only ever run on a single thread.

Yes, FuturesUnordered is a tool similar to tokio::spawn and will also give you concurrency. But tokio::spawn will usually perform better.

As for Arc, that type is only useful for values that are shared between multiple tasks. Your example has no sharing of values between tasks, so you can write it with tokio::spawn without Arc. As for Mutex, that is only needed when you have values that are not only shared, but also mutable.

Usually, when I see people wrap IO resources in Arc/Mutex, the solution is one of two things:

  • You only ever use it in one task. So stop attempting to share it between several tasks.
  • You legitimately need shared access to the io resource. In that case, you should use the actor pattern instead of Arc/Mutex.

(The situation is different from plain old data such as a HashMap.)

4 Likes

So what still confuses me is that I call TcpStream::connect("").await?;
In the actual code, one level up from this logic is a TcpListener that is listening for incoming connections for a loop and then calling spawn and passing it the function that contains the posted logic.
My understanding is that what I call .await it will attempt to connect and if it cannot make progress will not hold up other tokio tasks spawned one level up.

loop {
        let (stream, addr) = listener.accept().await?;
        info!("Accepted connection from {addr:?}");
        //read stuff from stream then spawn posted logic
        tokio::spawn(posted_logic(/*parameters*/));
}

The behavior I see is that all the other spawned tasks are blocked due to the TcpStream::connect().await? trying to connect to something that is either not there or died.
So the one task wouldn't be able to make progress and would yield to other waiting tasks that may be awaiting on the read part of the function and can make progress

I don't think this is the actual problem, but note that “one level up” doesn't matter; only the spawn does.

  • When a task is spawn()ed it will always execute independently (as far as the executor is not prevented from doing so).
  • Any time you f.await, you're saying "make the future f part of this task” — no concurrency is added that wasn't alreasy present.

The behavior I see is that all the other spawned tasks are blocked due to the TcpStream::connect().await? trying to connect to something that is either not there or died.

As long as you are not accidentally performing a synchronous blocking operation, Tokio tasks should always make progress independently.

Could you post the code you have which uses connect() with spawned tasks? So far we've only seen your original code that uses two connect().awaits in sequence.

1 Like

If tasks are mysteriously stopping at an .await that should complete, then you may be blocking the thread. You can tell whether that is the case by using tokio-console - blocking tasks show up with a busy duration that keeps increasing, but the poll count doesn't change.

Thanks. I'll take a look at that. I did dumber/unscientific test where I just put a tracing statement before and after and would stop seeing messages from other tasks come in and would see the "Attempting to connect" message but would never see the Connected message, instead after the timeout the .await? would return an Err and the task would exit.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.