How to use tokio::sync::mpsc::channel in async functions?

I'm trying to use mpsc channels to share an http client among a certain number of tasks.

Base Example

use tokio::sync::{oneshot, mpsc};
use hyper::body;
use hyper::{Client, Body, Uri};
use hyper_tls::HttpsConnector;
use std::sync::mpsc::Receiver;    

/// Entry point of the base example.
///
/// Creates a bounded channel of `(url, reply sender)` requests and spawns one background
/// task that owns the HTTPS client. For every request received, the task fetches the URL,
/// decodes the body as UTF-8 and sends the text back over the request's one-shot sender.
#[tokio::main]
async fn main() -> Result<()> {
    let (http_tx, mut http_rx) = mpsc::channel::<(&str, oneshot::Sender<String>)>(100);

    tokio::spawn(async move {
        let client = Client::builder().build::<_, Body>(HttpsConnector::new());

        loop {
            // `recv` yielding `None` ends the task, exactly like a `while let` would.
            let (url, response) = match http_rx.recv().await {
                Some(request) => request,
                None => break,
            };

            // Any transport failure panics inside this task.
            let fetched = client.get(Uri::from_static(url)).await.unwrap();
            let raw = body::to_bytes(fetched.into_body()).await.unwrap();
            let body = String::from_utf8(raw.to_vec()).expect("response was not valid utf-8");

            response.send(body).unwrap();
        }
    });

    // [.....] Some unrelated code omitted

    Ok(())
}

The above code works fine, but I'd like to move the task into a separate function. I tried a couple of times, but I get compile errors.

Attempt

use tokio::sync::{oneshot, mpsc};
use hyper::body;
use hyper::{Client, Body, Uri};
use hyper_tls::HttpsConnector;
use std::sync::mpsc::Receiver;

/// Attempted extraction of the HTTP task into a standalone function (does not compile).
///
/// Takes the receiving half of the request channel, builds an HTTPS client, and for each
/// `(url, response)` request fetches the URL and sends the body text back via `response`.
// NOTE: `Receiver` here is the `std::sync::mpsc::Receiver` brought in by the `use` line
// above, not tokio's receiver. Its `recv()` returns a `Result<T, RecvError>` rather than a
// future, which is why the `.await` in the loop header is rejected with E0277.
async fn http_layer<T>(rx_chan: Receiver<T>) -> Result<()> {
    let https = HttpsConnector::new();
    let client = Client::builder().build::<_, Body>(https);

    // This is the line the first compiler error points at.
    while let Some((url, response)) = rx_chan.recv().await {
        let resp = client.get(Uri::from_static(url)).await;

        // Both unwraps panic on a failed request or a failed body read.
        let body_bytes = body::to_bytes(resp.unwrap().into_body()).await;
        let body = String::from_utf8(body_bytes.unwrap().to_vec()).expect("response was not valid utf-8");
    
        response.send(body).unwrap();
    }

    Ok(())
}

/// Entry point of the failed attempt: creates the tokio channel and hands the receiver to
/// `http_layer`.
#[tokio::main]
async fn main() -> Result<()> {
    let (http_tx, mut http_rx) = mpsc::channel::<(&str, oneshot::Sender<String>)>(100);
    
    // `http_rx` is tokio's receiver, while `http_layer` is declared with the std `Receiver`;
    // this call is where the E0308 "mismatched types" error is reported.
    // The returned future is only bound to a variable here; it is not awaited or spawned in
    // the lines shown.
    let http_layer = http_layer(http_rx);

  // [.....] Some unrelated code omitted

    Ok(())
}

These are the errors that I get when running `cargo build`. I have no experience with tokio, or with Rust in general, so any help from someone more experienced than me would be really appreciated.

error[E0277]: the trait bound std::result::Result<T, std::sync::mpsc::RecvError>: std::future::Future is not satisfied
--> src/main.rs:14:39
|
14 | while let Some((url, response)) = rx_chan.recv().await {
| ^^^^^^^^^^^^^^^^^^^^ the trait std::future::Future is not implemented for std::result::Result<T, std::sync::mpsc::RecvError>

error[E0308]: mismatched types
--> src/main.rs:30:33
|
30 | let http_layer = http_layer(http_rx);
| ^^^^^^^ expected struct std::sync::mpsc::Receiver, found struct tokio::sync::mpsc::bounded::Receiver
|
= note: expected struct std::sync::mpsc::Receiver<_>
found struct tokio::sync::mpsc::bounded::Receiver<(&str, tokio::sync::oneshot::Sender<std::string::String>)>

error: aborting due to 2 previous errors

Some errors have detailed explanations: E0277, E0308.
For more information about an error, try rustc --explain E0277.
error: could not compile hypertest.

To learn more, run the command again with --verbose.

Thanks!

You are using the std receiver (`std::sync::mpsc::Receiver`) instead of tokio's (`tokio::sync::mpsc::Receiver`).

1 Like

Oops, my bad! :smile: It was automatically included by a VS Code extension and I didn't check. I'll change it and see if I can work out a solution.

I finally have a working example. It might be useful to anyone reading this thread. Any advice on possible improvements to this code would be really appreciated.

use hyper::body;
use hyper::{Body, Client, Uri};
use hyper_tls::HttpsConnector;
use tokio::sync::{mpsc, oneshot};

/// Result alias shared by the async functions in this example.
///
/// The error side is a boxed `std::error::Error` trait object (`Send + Sync + 'static`), so
/// the different error types propagated with `?` can all be returned through one signature.
type FutureResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync + 'static>>;

async fn https_layer(
    mut rx_chan: mpsc::Receiver<(&'static str, oneshot::Sender<String>)>,
) -> FutureResult<()> {
    let https = HttpsConnector::new();
    let client = Client::builder().build::<_, Body>(https);

    while let Some((url, response)) = rx_chan.recv().await {
        let resp = client.get(Uri::from_static(url)).await?;

        let body_bytes = body::to_bytes(resp.into_body()).await?;
        let body = String::from_utf8(body_bytes.to_vec())?;

        response.send(body)?;
    }

    Ok(())
}

/// Asks the HTTP task for the page at `url` and prints the text it sends back.
///
/// A fresh one-shot channel is created for the reply; its sending half travels with the URL
/// over `tx_chan`, and the receiving half is awaited for the answer.
///
/// # Errors
///
/// Fails if the request cannot be queued on `tx_chan` or if the reply channel is closed
/// before a value arrives.
async fn get_html(
    url: &'static str,
    tx_chan: &mut mpsc::Sender<(&'static str, oneshot::Sender<String>)>,
) -> FutureResult<()> {
    let (reply_tx, reply_rx) = oneshot::channel();

    // Queue the request together with the handle used to answer it.
    let request = (url, reply_tx);
    tx_chan.send(request).await?;

    let html = reply_rx.await?;
    println!("previous value = {}", html);

    Ok(())
}

/// Entry point of the working example.
///
/// Spawns the HTTP task on the receiving half of a bounded channel, issues two page
/// requests through the sending half, then shuts the task down and reports its outcome.
///
/// # Errors
///
/// Returns the first error from either request, a join error if the HTTP task panicked,
/// or the error the HTTP task itself returned.
#[tokio::main]
async fn main() -> FutureResult<()> {
    let (mut http_tx, http_rx) = mpsc::channel::<(&'static str, oneshot::Sender<String>)>(100);

    let join_handle = tokio::spawn(https_layer(http_rx));

    get_html("https://stackoverflow.com", &mut http_tx).await?;
    get_html("https://google.com", &mut http_tx).await?;

    // Drop the last sender so the task's `recv()` returns `None` and its loop ends.
    // While `http_tx` is alive the task never finishes and the join below waits forever.
    drop(http_tx);

    // The first `?` surfaces a panic or cancellation of the task (the join error); the
    // remaining value is the task's own result, returned as-is instead of being discarded.
    join_handle.await?
}

Be aware that the `&'static str` type can only be used for strings that are literally hard-coded in the source code, apart from some technical workarounds that involve permanently leaking memory. You may want to consider `String` instead.

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.