Tokio: share a stream between threads

Hi!
I am trying to make something like a client-server socket connection with Tokio.

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
    println!("connected to server");
    let (read, mut write) = stream.split();

    tokio::spawn(async move {
        let stdin = stdin();
        let mut line = String::new();
        loop {
            stdin.read_line(&mut line).unwrap();
            write.write_all(&line.as_bytes()).await.unwrap();
            line.clear();
        }
    });

    tokio::spawn(async move {
        loop {
            let mut reader = BufReader::new(read);
            let mut msg = String::new();

            reader.read_line(&mut msg);
            println!("{}", msg);
        }
    });

    loop {}
}

The thing is, `stream` is borrowed by the first thread, and I am not even sure I should use two threads for this. But if I use only one thread, the program blocks on reading the stream while there is nothing to read.

How can I solve that?

Use .into_split() instead.

An aside: when you call tokio::spawn, you're not creating threads, you're creating tasks.

The difference between a thread and a task is that the OS pre-emptively schedules threads; Tokio co-operatively schedules tasks onto threads, which the OS preempts. With threads, you don't need to care what you put in the thread - if it runs too long, and there's something else to do, the OS will forcibly switch to the other thing.

With tasks, however, you need to worry about wall-clock time between .awaits. At a .await, your task can return control to the Tokio scheduler, and Tokio can decide to run a different task. If you don't hit a .await frequently enough, however, Tokio can't let the other task run, because it never gets a chance to reschedule. This exhibits itself as a program that runs fine most of the time, but sometimes has a task blocked and you can't quite see why.

In your case, you've got this problem - you're using std::io::stdin, which can block indefinitely, and not tokio::io::stdin, which needs .await on its read_line variant. You want to use tokio::io::AsyncBufReadExt so that you've got an async read_line, and replace std::io with tokio::io so that you're using the async stdin. Because of the way Tokio works, you'll also need to wrap the async stdin in a BufReader so that you can use read_line on it - otherwise, there's nowhere to store any excess data that's read after you read a line.

What do you think about this code? Do you see any errors?

use std::error::Error;
use tokio::{
    io::{stdin, AsyncBufReadExt, AsyncWriteExt, BufReader},
    net::TcpStream,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let stream = TcpStream::connect("127.0.0.1:8080").await?;
    println!("connected to server");
    let (read, mut write) = stream.into_split();

    // Task 1: forward each line typed on stdin to the server.
    let handler1 = tokio::spawn(async move {
        let stdin = stdin();
        let mut reader = BufReader::new(stdin);
        let mut line = String::new();
        loop {
            reader.read_line(&mut line).await.unwrap();
            write.write_all(line.as_bytes()).await.unwrap();
            line.clear();
        }
    });

    // Task 2: print each line received from the server.
    let handler2 = tokio::spawn(async move {
        let mut reader = BufReader::new(read);
        loop {
            let mut msg = String::new();

            reader.read_line(&mut msg).await.unwrap();
            println!("{}", msg);
        }
    });

    // Keep main alive until the tasks finish.
    handler1.await?;
    handler2.await?;

    Ok(())
}

That looks a lot better, yes. You now only ever hit long delays when you .await something - and that's exactly what you want, because when you .await something, you give the Tokio scheduler a chance to run something else on the same thread.

I love this community!
