I'm trying to use mpsc channels to share a single HTTP client among several tasks.
Base Example
use tokio::sync::{oneshot, mpsc};
use hyper::body;
use hyper::{Client, Body, Uri};
use hyper_tls::HttpsConnector;
use std::sync::mpsc::Receiver;
/// Base example: a single background task owns the HTTPS client and serves
/// requests that arrive over an mpsc channel.
///
/// Every request is a `(url, reply)` pair; the task fetches `url`, decodes the
/// body as UTF-8 and sends the resulting `String` back through `reply`.
///
/// # Panics
///
/// The spawned task panics if the request fails, the body cannot be read, the
/// body is not valid UTF-8, or the reply cannot be delivered.
#[tokio::main]
async fn main() -> Result<()> {
    let (http_tx, mut http_rx) = mpsc::channel::<(&str, oneshot::Sender<String>)>(100);

    tokio::spawn(async move {
        // The client is built once and reused for every request.
        let client = Client::builder().build::<_, Body>(HttpsConnector::new());

        loop {
            // `None` means every sender has been dropped: stop serving.
            let (target, reply_to) = match http_rx.recv().await {
                Some(request) => request,
                None => break,
            };

            let fetched = client.get(Uri::from_static(target)).await;
            let raw = body::to_bytes(fetched.unwrap().into_body()).await.unwrap();
            let text = String::from_utf8(raw.to_vec()).expect("response was not valid utf-8");
            reply_to.send(text).unwrap();
        }
    });
    // [.....] Some unrelated code omitted
    Ok(())
}
The above code works fine, but I'd like to move the task into a separate function. I tried a couple of times, but I get compile errors.
Attempt
use tokio::sync::{oneshot, mpsc};
use hyper::body;
use hyper::{Client, Body, Uri};
use hyper_tls::HttpsConnector;
use std::sync::mpsc::Receiver;
/// Attempted extraction of the HTTP worker loop into its own async function.
///
/// For every `(url, response)` pair taken from `rx_chan`, issues a GET through a
/// hyper HTTPS client, decodes the body as UTF-8 and sends the text back through
/// `response`. Returns `Ok(())` once the receive loop ends.
///
/// NOTE(review): with the `use std::sync::mpsc::Receiver;` import above, `Receiver`
/// here is the standard-library receiver. Its `recv()` yields a
/// `Result<T, RecvError>`, which is not a future, so `.await` is rejected (E0277),
/// and `main` passes the tokio receiver produced by `mpsc::channel` (E0308).
/// Presumably the parameter should be
/// `mut rx_chan: mpsc::Receiver<(&'static str, oneshot::Sender<String>)>`
/// (the tokio type) — TODO confirm against the tokio version in use.
///
/// # Panics
///
/// Panics if the request fails, the body cannot be read, the body is not valid
/// UTF-8, or the reply cannot be sent (the `unwrap`/`expect` calls below).
async fn http_layer<T>(rx_chan: Receiver<T>) -> Result<()> {
let https = HttpsConnector::new();
// The client is built once and reused for every request.
let client = Client::builder().build::<_, Body>(https);
// NOTE(review): `T` is unconstrained, so destructuring the message as a
// `(url, response)` tuple presumably needs a concrete message type as well.
while let Some((url, response)) = rx_chan.recv().await {
let resp = client.get(Uri::from_static(url)).await;
let body_bytes = body::to_bytes(resp.unwrap().into_body()).await;
let body = String::from_utf8(body_bytes.unwrap().to_vec()).expect("response was not valid utf-8");
// Hand the decoded body back to whoever queued this request.
response.send(body).unwrap();
}
Ok(())
}
/// Entry point of the refactored attempt: creates the request channel and passes
/// its receiving half to `http_layer`.
#[tokio::main]
async fn main() -> Result<()> {
// Channel of `(url, reply sender)` pairs, created with a buffer argument of 100.
let (http_tx, mut http_rx) = mpsc::channel::<(&str, oneshot::Sender<String>)>(100);
// E0308 is reported on this call: `http_rx` is a tokio receiver, while
// `http_layer` is declared to take `std::sync::mpsc::Receiver`.
// NOTE(review): this only builds the future returned by `http_layer`; nothing in
// the visible code awaits or spawns it, whereas the base example wrapped the loop
// in `tokio::spawn` — verify that the omitted code drives it.
let http_layer = http_layer(http_rx);
// [.....] Some unrelated code omitted
Ok(())
}
These are the errors that I get when running `cargo build`. I have no experience with tokio, or with Rust in general, so any help from someone more experienced than me would be really appreciated.
error[E0277]: the trait bound `std::result::Result<T, std::sync::mpsc::RecvError>: std::future::Future` is not satisfied
  --> src/main.rs:14:39
   |
14 |     while let Some((url, response)) = rx_chan.recv().await {
   |                                       ^^^^^^^^^^^^^^^^^^^^ the trait `std::future::Future` is not implemented for `std::result::Result<T, std::sync::mpsc::RecvError>`
error[E0308]: mismatched types
  --> src/main.rs:30:33
   |
30 |     let http_layer = http_layer(http_rx);
   |                                 ^^^^^^^ expected struct `std::sync::mpsc::Receiver`, found struct `tokio::sync::mpsc::bounded::Receiver`
   |
   = note: expected struct `std::sync::mpsc::Receiver<_>`
              found struct `tokio::sync::mpsc::bounded::Receiver<(&str, tokio::sync::oneshot::Sender<std::string::String>)>`
error: aborting due to 2 previous errors
Some errors have detailed explanations: E0277, E0308.
For more information about an error, try `rustc --explain E0277`.
error: could not compile `hypertest`.
To learn more, run the command again with --verbose.
Thanks!