How to use tokio::sync::mpsc::channel in async functions?

I'm trying to use mpsc channels to share an http client among a certain number of tasks.

Base Example

use tokio::sync::{oneshot, mpsc};
use hyper::body;
use hyper::{Client, Body, Uri};
use hyper_tls::HttpsConnector;
use std::sync::mpsc::Receiver;    

/// Entry point of the base example: creates a bounded channel of
/// `(url, reply sender)` requests and spawns a single task that owns the
/// HTTPS client and answers those requests.
///
/// NOTE(review): `Result<()>` with a single type argument relies on an alias
/// that is not visible in this snippet — confirm where it comes from.
#[tokio::main]
async fn main() -> Result<()> {
    // Channel with capacity 100; every message pairs a URL with the oneshot
    // sender on which the response body is handed back to the requester.
    let (http_tx, mut http_rx) = mpsc::channel::<(&str, oneshot::Sender<String>)>(100);
    
    tokio::spawn(async move {
        // The client is built once inside the task and reused for every request.
        let https = HttpsConnector::new();
        let client = Client::builder().build::<_, Body>(https);


        // Keep serving for as long as `recv()` yields `Some(..)`.
        while let Some((url, response)) = http_rx.recv().await {
            let resp = client.get(Uri::from_static(url)).await;

            // Both `unwrap()` calls and the `expect()` below panic inside the
            // spawned task if the request, the body read, or the UTF-8
            // conversion fails.
            let body_bytes = body::to_bytes(resp.unwrap().into_body()).await;
            let body = String::from_utf8(body_bytes.unwrap().to_vec()).expect("response was not valid utf-8");
        
            // Hand the body back to the requester; panics if `send` returns `Err`.
            response.send(body).unwrap();
        }
     });

  // [.....] Some unrelated code omitted

    Ok(())
}

The above code works fine, but I'd like to move the task into a separate function. I tried a couple of times, but I get compile errors.

Attempt

use tokio::sync::{oneshot, mpsc};
use hyper::body;
use hyper::{Client, Body, Uri};
use hyper_tls::HttpsConnector;
use std::sync::mpsc::Receiver;

/// Attempted extraction of the HTTP task into a standalone function: builds the
/// HTTPS client, then answers each `(url, response)` request received on
/// `rx_chan` with the response body as a `String`.
///
/// NOTE(review): `Receiver` here is the `std::sync::mpsc::Receiver` imported
/// above, not the tokio receiver. The compiler output quoted after this snippet
/// reports that the value returned by its `recv()` does not implement `Future`,
/// which is why the `.await` in the loop header is rejected.
async fn http_layer<T>(rx_chan: Receiver<T>) -> Result<()> {
    // The client is built once and reused for every request.
    let https = HttpsConnector::new();
    let client = Client::builder().build::<_, Body>(https);

    while let Some((url, response)) = rx_chan.recv().await {
        let resp = client.get(Uri::from_static(url)).await;

        // The `unwrap()` calls and the `expect()` panic on a failed request,
        // a failed body read, or a body that is not valid UTF-8.
        let body_bytes = body::to_bytes(resp.unwrap().into_body()).await;
        let body = String::from_utf8(body_bytes.unwrap().to_vec()).expect("response was not valid utf-8");
    
        // Hand the body back to the requester; panics if `send` returns `Err`.
        response.send(body).unwrap();
    }

    Ok(())
}

/// Entry point of the attempt: creates the tokio channel and passes its
/// receiving half to `http_layer`.
#[tokio::main]
async fn main() -> Result<()> {
    // Channel with capacity 100 carrying `(url, reply sender)` requests.
    let (http_tx, mut http_rx) = mpsc::channel::<(&str, oneshot::Sender<String>)>(100);
    
    // `http_layer` is an `async fn`, so this only binds the future it returns;
    // nothing in the visible code awaits or spawns it.
    let http_layer = http_layer(http_rx);

  // [.....] Some unrelated code omitted

    Ok(())
}

These are the errors that I get when running cargo build. I have no experience with tokio, or with Rust in general, so any help from someone more experienced than me would be really appreciated.

error[E0277]: the trait bound std::result::Result<T, std::sync::mpsc::RecvError>: std::future::Future is not satisfied
--> src/main.rs:14:39
|
14 | while let Some((url, response)) = rx_chan.recv().await {
| ^^^^^^^^^^^^^^^^^^^^ the trait std::future::Future is not implemented for std::result::Result<T, std::sync::mpsc::RecvError>

error[E0308]: mismatched types
--> src/main.rs:30:33
|
30 | let http_layer = http_layer(http_rx);
| ^^^^^^^ expected struct std::sync::mpsc::Receiver, found struct tokio::sync::mpsc::bounded::Receiver
|
= note: expected struct std::sync::mpsc::Receiver<_>
found struct tokio::sync::mpsc::bounded::Receiver<(&str, tokio::sync::oneshot::Sender<std::string::String>)>

error: aborting due to 2 previous errors

Some errors have detailed explanations: E0277, E0308.
For more information about an error, try rustc --explain E0277.
error: could not compile hypertest.

To learn more, run the command again with --verbose.

Thanks!

You are using the std receiver (`std::sync::mpsc::Receiver`) instead of the tokio one (`tokio::sync::mpsc::Receiver`).

1 Like

Oops, my bad! :smile: It was automatically included by a vscode extension and I didn't check. I'll change it and see if I can work out a solution.

I finally have a working example. It might be useful to anyone reading this thread. Any advice on possible improvements to this code would be really appreciated.

use hyper::body;
use hyper::{Body, Client, Uri};
use hyper_tls::HttpsConnector;
use tokio::sync::{mpsc, oneshot};

type FutureResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync + 'static>>;

/// Serves HTTP GET requests on behalf of other tasks using one shared client.
///
/// Each message received on `rx_chan` is a `(url, response)` pair: the URL is
/// fetched, the body is decoded as UTF-8, and the resulting `String` is sent
/// back through the `response` oneshot sender. The loop ends, and `Ok(())` is
/// returned, once `rx_chan.recv()` yields `None`.
///
/// # Errors
///
/// Returns the first error encountered and stops serving: a failed request, a
/// failed body read, a body that is not valid UTF-8, or a requester that is no
/// longer able to receive its response.
async fn https_layer(
    mut rx_chan: mpsc::Receiver<(&'static str, oneshot::Sender<String>)>,
) -> FutureResult<()> {
    // Build the client once; every request goes through this single instance.
    let https = HttpsConnector::new();
    let client = Client::builder().build::<_, Body>(https);

    while let Some((url, response)) = rx_chan.recv().await {
        let resp = client.get(Uri::from_static(url)).await?;

        let body_bytes = body::to_bytes(resp.into_body()).await?;
        let body = String::from_utf8(body_bytes.to_vec())?;

        // A failed `send` hands the unsent value back as its `Err`. Propagating
        // that with a bare `?` would make the whole response body the error
        // message, so replace it with a description of what actually happened.
        response.send(body).map_err(|_| {
            format!(
                "requester for {} went away before the response could be delivered",
                url
            )
        })?;
    }

    Ok(())
}

/// Asks the HTTP task, via `tx_chan`, to fetch `url`, then waits for the body
/// to come back on a fresh oneshot channel and prints it.
///
/// # Errors
///
/// Fails if the request cannot be sent on `tx_chan` or if awaiting the reply
/// channel returns an error.
async fn get_html(
    url: &'static str,
    tx_chan: &mut mpsc::Sender<(&'static str, oneshot::Sender<String>)>,
) -> FutureResult<()> {
    // One reply channel per request; the sending half travels with the URL.
    let (reply_tx, reply_rx) = oneshot::channel();
    let request = (url, reply_tx);
    tx_chan.send(request).await?;

    // Wait for the body and print it.
    let html = reply_rx.await?;
    println!("previous value = {}", html);

    Ok(())
}

/// Entry point: spawns the shared HTTP task, issues two requests through it,
/// then shuts it down and reports any error it produced.
///
/// # Errors
///
/// Returns an error if either request fails, if the spawned task cannot be
/// joined, or if the HTTP task itself returned an error.
#[tokio::main]
async fn main() -> FutureResult<()> {
    // Channel with capacity 100 carrying `(url, reply sender)` requests.
    let (mut http_tx, http_rx) = mpsc::channel::<(&'static str, oneshot::Sender<String>)>(100);

    let join_handle = tokio::spawn(https_layer(http_rx));

    get_html("https://stackoverflow.com", &mut http_tx).await?;
    get_html("https://google.com", &mut http_tx).await?;

    // Drop the only sender before joining: the HTTP task's `recv()` loop ends
    // only once every sender is gone, so keeping `http_tx` alive here would
    // make the join below wait forever.
    drop(http_tx);

    // The `?` handles a failed join; the remaining `FutureResult<()>` is the
    // task's own outcome and is returned instead of being silently discarded.
    join_handle.await?
}

Be aware that the &'static str type can only be used for strings you literally hard-coded in the source code (barring some technical workarounds that involve permanently leaking memory). You may want to consider using String instead.