How can I send and receive data using a tokio channel?

Hello everyone,

I am trying to figure out how to send and receive data using a tokio channel. I managed to send the data, but the application hangs after all tasks have finished.

This is the code I wrote. Suggestions on how to improve it and make it run faster are welcome.

```rust
use std::time::Instant;
use std::error::Error;
use std::collections::HashMap;

use url::Url;
use futures::future::join_all;
use console::style;
use tokio::net::TcpStream;
use tokio::io::AsyncWriteExt;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncReadExt;
use tokio::io::BufReader;
use regex::Regex;
use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::Sender;

// Fetches `link` with a hand-written HTTP/1.1 GET request and sends the
// number of "/ebooks/<id>" links in the body back over the channel.
async fn len(link: String, tx: Sender<(String, usize)>) {
    let url = Url::parse(link.as_str()).unwrap();
    let host = url.host_str().unwrap();
    let path = url.path();
    // Connect to the port in the URL (8081 below), falling back to 80 for http.
    let port = url.port_or_known_default().unwrap();

    let mut stream = TcpStream::connect(format!("{}:{}", host, port).as_str()).await.unwrap();
    let req = vec![
        format!("GET {} HTTP/1.1", path).as_str(),
        format!("Host: {}", host).as_str(),
        "\r\n",
    ].join("\r\n");

    stream.write_all(req.as_bytes()).await.unwrap();

    let mut line = String::new();
    let mut reader = BufReader::new(stream);
    let mut len = 0;
    // Compile the header regex once instead of on every matching line.
    let content_length = Regex::new(r"Content-Length: (\d+)").unwrap();

    // Read and discard the status line (e.g. "HTTP/1.1 200 OK"); without the
    // clear() it would be glued to the front of the first header line.
    reader.read_line(&mut line).await.unwrap();
    line.clear();

    while let Ok(size) = reader.read_line(&mut line).await {
        if size == 0 {
            break;
        }

        if line.starts_with("Content-Length: ") {
            len = content_length.captures(line.as_str()).unwrap()
                .get(1).unwrap()
                .as_str().parse::<usize>().unwrap();
        }

        // A blank line marks the end of the headers.
        if line == "\r\n" {
            break;
        }

        line.clear();
    }

    let mut content = vec![0; len];
    reader.read_exact(&mut content).await.unwrap();

    let content = String::from_utf8_lossy(&content);
    let re = Regex::new(r"/ebooks/(\d+)").unwrap();
    let len = re.captures_iter(&content).count();

    //println!("{} -> {}", url.as_str(), style(len).bold());
    // send() only fails if the receiver has already been dropped.
    tx.send((link, len)).await.expect("receiver dropped");
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    println!("[START]");

    let now = Instant::now();
    let mut tasks = vec![];
    let mut res = HashMap::new();
    let (tx, mut rx) = channel(100);

    let authors = vec![
        'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n',
        'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
    ];

    for author in authors {
        let tx = tx.clone();
        let url = format!("http://127.0.0.1:8081/authors/{}", author);

        res.insert(url.clone(), 0usize);

        let task = tokio::spawn(async move {
            len(url, tx).await;
        });

        tasks.push(task);
    }

    join_all(tasks).await;

    while let Some((url, len)) = rx.recv().await {
        println!("{} -> {}", url, len);
    }

    println!("\n > {} seconds", style(now.elapsed().as_secs_f64()).bold());

    Ok(())
}
```

The program hangs because one Sender has not been dropped yet: the tx variable in main. rx.recv() only returns None once every Sender has been dropped, so until then the while let loop keeps waiting. Add a drop(tx); call after the for loop to fix this.


Generally, I don't think there's any advantage to using join_all on a vector of join handles for tasks you spawned. The tasks already run concurrently because they were spawned, so a for loop that awaits the handles one by one has the same effect. join_all also does more work than the loop, because it polls all the join handles at once. Finally, the current approach will not notice if any of the spawned tasks panic: the join handle returns an Err in that case, but you ignore it.

You may also want to consider returning the results through the join handles instead of a channel: whatever value the spawned future returns is what awaiting its join handle produces.

Thank you @mbrubeck for your response. Once I dropped tx, the application ran as expected.

@alice I removed join_all and the channel, but I received an error that said "Task is canceled". The error went away after I restored join_all. Can you please explain how I can wait for the tasks to finish without using join_all?

With a plain for loop:

```rust
for task in tasks {
    task.await.unwrap();
}
```

Thank you @alice, that works like a charm :grinning:

As for the channel: I am trying to get back the number of links found on each page, while keeping the order in which the tasks were spawned.


```rust
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    println!("[START]\n");

    let now = Instant::now();
    let mut tasks = vec![];
    let mut link_len = HashMap::new();

    let authors = vec![
        'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n',
        'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
    ];

    for author in authors {
        let url = format!("http://127.0.0.1:8080/authors/{}", author);

        let task = tokio::spawn(async move {
            let (link, len) = len(url).await;
            link_len.insert(link, len);
        });

        tasks.push(task);
    }

    for task in tasks {
        task.await?;
    }

    // Keep the data in the hashmap in the same order of the for loop
    // store the results somewhere

    println!("\n > {} seconds", style(now.elapsed().as_secs_f64()).bold());

    Ok(())
}
```

But the HashMap gets moved into the first spawned task, so the code doesn't compile. When you said to use the return value, what exactly did you mean?

You can return the values like this:

```rust
let task = tokio::spawn(async move {
    let (link, len) = len(url).await;
    (link, len)
});
```

Then in the for loop:

```rust
for task in tasks {
    let (link, len) = task.await?;
    link_len.insert(link, len);
}
```

Generally, you cannot easily share a HashMap between multiple tasks; that would require an Arc<Mutex<HashMap<...>>> and involve locking.


For anyone coming across this thread, this is the final working application.

Suggestions on how to improve it are welcome :grinning:

Thank you @alice for your help.

```rust
use std::time::Instant;
use std::error::Error;

use url::Url;
use console::style;
use tokio::net::TcpStream;
use tokio::io::AsyncWriteExt;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncReadExt;
use tokio::io::BufReader;
use regex::Regex;

// Fetches `link`, counts the "/ebooks/<id>" links in the body and returns
// (author, count, seconds taken).
async fn len(link: String) -> (String, usize, f64) {
    let now = Instant::now();
    let url = Url::parse(link.as_str()).unwrap();
    let host = url.host_str().unwrap();
    let path = url.path();
    // Use the URL's port if it has one, otherwise the http default (80).
    let port = url.port_or_known_default().unwrap();

    let mut stream = TcpStream::connect(format!("{}:{}", host, port).as_str()).await.unwrap();
    let req = vec![
        format!("GET {} HTTP/1.1", path).as_str(),
        format!("Host: {}", host).as_str(),
        "\r\n",
    ].join("\r\n");

    stream.write_all(req.as_bytes()).await.unwrap();

    let mut line = String::new();
    let mut reader = BufReader::new(stream);
    let mut len = 0;
    // Compile the header regex once instead of on every matching line.
    let content_length = Regex::new(r"Content-Length: (\d+)").unwrap();

    // Skip the status line; clear it so it is not glued to the first header.
    reader.read_line(&mut line).await.unwrap();
    line.clear();

    while let Ok(size) = reader.read_line(&mut line).await {
        if size == 0 {
            break;
        }

        if line.starts_with("Content-Length: ") {
            len = content_length.captures(line.as_str()).unwrap()
                .get(1).unwrap()
                .as_str().parse::<usize>().unwrap();
        }

        // A blank line marks the end of the headers.
        if line == "\r\n" {
            break;
        }

        line.clear();
    }

    let mut content = vec![0; len];
    reader.read_exact(&mut content).await.unwrap();

    let content = String::from_utf8_lossy(&content);
    let re = Regex::new(r"/ebooks/(\d+)").unwrap();
    let len = re.captures_iter(&content).count();

    let author = url.path_segments().unwrap().last().unwrap();
    let took = now.elapsed().as_secs_f64();

    println!("{}\t{} seconds", link, took);

    (author.to_string(), len, took)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    println!("[START]\n");

    let now = Instant::now();
    let mut tasks = vec![];
    let mut results = vec![];

    let authors = vec![
        'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n',
        'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
    ];

    for author in authors {
        let url = format!("http://127.0.0.1/authors/{}", author);

        // The join handle resolves to the tuple returned by `len`.
        let task = tokio::spawn(len(url));

        tasks.push(task);
    }

    // Await in spawn order; `?` propagates a JoinError if a task panicked.
    for task in tasks {
        let (author, len, took) = task.await?;

        results.push((author, len, took));
    }

    println!();

    for (author, len, took) in results {
        println!(
            "Author: {}\tNumber of articles: {:>6}\tTook: {} seconds",
            style(author).bold().red(),
            style(len).bold().green(),
            style(took).cyan()
        );
    }

    println!("\n > {} seconds", style(now.elapsed().as_secs_f64()).bold());

    Ok(())
}
```
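One possible improvement to the header parsing is sketched below, not as a drop-in replacement. HTTP header names are case-insensitive, so a server that sends `content-length` would be missed by the `starts_with` check. The regex is also unnecessary for this. `parse_content_length` is a hypothetical helper that uses only the standard library and needs Rust 1.52+ for `str::split_once`. It still does not handle responses that use `Transfer-Encoding: chunked` instead of a Content-Length header.

```rust
/// Hypothetical helper: returns the Content-Length value if `line` is that
/// header, comparing the header name case-insensitively.
fn parse_content_length(line: &str) -> Option<usize> {
    // Split "Name: value\r\n" at the first colon.
    let (name, value) = line.split_once(':')?;
    if name.trim().eq_ignore_ascii_case("content-length") {
        // trim() also strips the trailing "\r\n".
        value.trim().parse().ok()
    } else {
        None
    }
}

fn main() {
    for line in ["Content-Length: 1234\r\n", "content-length:42\r\n", "Host: x\r\n"] {
        println!("{:?} -> {:?}", line, parse_content_length(line));
    }
}
```

Inside the header loop, it could replace the `starts_with`/regex block with `if let Some(n) = parse_content_length(&line) { len = n; }`.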

Remember that the for loop iterates over the join handles in the order you pushed them, so the results already come out in spawn order, regardless of which task finished first.


@alice Yes, that's right; I updated the example.
