Error no method named 'poll' found for struct when I use tokio::select

what's wrong with my code?

use futures::FutureExt;
use log::{info, warn};
use rand::distributions::uniform::SampleBorrow;
use std::sync::{Arc, Mutex};
use std::thread;
use tokio::{
    select,
    sync::{
        mpsc,
        mpsc::{Receiver, Sender},
        oneshot,
    },
};
use tonic::{transport::Server, Request, Response, Status};
use uuid::Uuid;

pub struct FilWindowPostServer {
    pub server_info: Arc<Mutex<ServerInfo>>,
    pub do_task_signal_tx: Sender<String>,
    pub do_task_signal_rx: Receiver<String>,
}

impl Default for FilWindowPostServer {
    fn default() -> Self {
        let (mut task_signal_tx, mut task_signal_rx) = mpsc::channel(1);
        FilWindowPostServer {
            server_info: Arc::new(Mutex::new(ServerInfo::default())),
            do_task_signal_tx: task_signal_tx,
            do_task_signal_rx: task_signal_rx,
        }
    }
}

impl FilWindowPostServer {
    #[tokio::main]
    pub async fn run_task(&self, mut exit_rx: &oneshot::Receiver<&str>) {
        loop {
            select! {
                val = exit_rx => {
                   break;
                }
                flag = self.do_task_signal_rx => {
                    if flag == "Ok".to_string() {
                        let wi = self.server_info.lock().unwrap();
                        info!("start to do task: {}", wi.task_info.task_id);
                        drop(wi);
                    } else {
                        continue;
                    }
                }
            };
        }
    }
}

[dependencies]
storage-proofs-core = { path = "../storage-proofs-core", version = "^10.0.0", default-features = false }
storage-proofs-porep = { path = "../storage-proofs-porep", version = "^10.0.0", default-features = false }
storage-proofs-post = { path = "../storage-proofs-post", version = "^10.0.0", default-features = false }
filecoin-hashers = { version = "^5.0.0", path = "../filecoin-hashers", default-features = false, features = ["poseidon", "sha256"] }
bitvec = "0.17"
rand = "0.8"
lazy_static = "1.2"
memmap = "0.7"
byteorder = "1"
itertools = "0.9"
serde = { version = "1.0", features = ["rc", "derive"] }
serde_json = "1.0"
ff = "0.11.0"
blake2b_simd = "0.5"
bellperson = "0.18.0"
log = "0.4.7"
fil_logger = "0.1"
rayon = "1.1.0"
blake2s_simd = "0.5.8"
hex = "0.4.0"
merkletree = "0.21.0"
bincode = "1.1.2"
anyhow = "1.0.23"
rand_xorshift = "0.3.0"
sha2 = "0.9.1"
typenum = "1.11.2"
gperftools = { version = "0.2", optional = true }
generic-array = "0.14.4"
group = "0.11.0"
byte-slice-cast = "1.0.0"
fr32 = { path = "../fr32", version = "^3.0.0", default-features = false }
once_cell = "1.8.0"
blstrs = "0.4.0"
tonic = "0.5"
prost = "0.8"
tokio = { version = "1.0", features = ["full"] }
uuid = { version = "0.8", features = ["serde", "v4"] }
signal-hook = "0.3.10"
futures = "0.3"

Perhaps because std::future::Future isn't imported?

This doesn't seem to be the case. After importing this, the same error occurs

I'm not an expert, but my guess is: Future is implemented for Receiver<T>, not &Receiver<T>.

Try taking exit_rx: &mut oneshot::... instead of mut exit_rx: &oneshot::....

like this? still not working~ :rofl:

impl FilWindowPostServer {
    #[tokio::main]
    pub async fn run_task(&self, exit_rx: &mut oneshot::Receiver<&str>) {
        loop {
            select! {
                val = exit_rx => {
                   break;
                }
                flag = self.do_task_signal_rx => {
                    if flag == "Ok".to_string() {
                        let wi = self.server_info.lock().unwrap();
                        info!("start to do task: {}", wi.task_info.task_id);
                        drop(wi);
                    } else {
                        continue;
                    }
                }
            };
        }
    }
}

The first one couldn't work because you passed exit_rx by shared reference which means that it wouldn't have been possible for that function to mutate the receiver to receive the &str. This second version might have the same issue with the second future since self is passed by shared reference. You could try &mut self as the first argument to the function.

Alternatively if you have an error message, it would be helpful to diagnose the issue.

You need to use the recv when its an mpsc receiver.

i did it like this, it workers

pub async fn run_task(exit_rx: oneshot::Receiver<String>, mut do_task_signal_rx: UnboundedReceiver<String>, srv_info: Arc<Mutex<ServerInfo>>) {
    info!("task worker run");
    let mission = async {
        loop {
            match do_task_signal_rx.recv().await {
                Some(value) => if value == "ok".to_string() {
                    let si = srv_info.lock().unwrap();
                    info!("start to do task: {}", si.task_info.task_id);
                    drop(si);
                    use std::thread::sleep;
                    use std::time::Duration;
                    sleep(Duration::from_secs(5));
                    info!("task done");
                    let mut si = srv_info.lock().unwrap();
                    si.status=Free;
                    drop(si);
                },
                None => ()
            }
        }
    };

    select! {
        _ = exit_rx => {
            println!("worker received an exit command,will exit after current task done");
        }
        _ = mission => {
            panic!("task failed unexpected");
        }
    }
}

Using this method is a big no-no. Please read this blog post for why: Async: What is blocking? – Alice Ryhl

:rofl:got

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.