Some tokio nested tasks run several times more than others

Hi,

I am working on a project to manage IoT-like devices, so I need to handle an unknown number of devices in Rust.

I made some setup files that describe the devices, and I initialize my app from that data.
Inside the app I have some procedures that should run for each device, plus some general procedures that communicate with other parts of the project.

Here is a brief sample that shows it very roughly:

    struct XItem {
        // some fields
    }

    impl XItem {
        // do some stuff
    }

    let item_list: Vec<XItem> = Vec::new();
    // item_list is initialized from a TOML config file, e.g.
    // [[xitem]]
    // data fields for item 1
    // [[xitem]]
    // data fields for item 2

    // one task is spawned per item (iterate by value so each task owns its item)
    for item in item_list {
        tokio::spawn(async move {
            loop {
                // check for some outbound data related to `item`
                // update `item`
                // push the result onto a shared queue for other parts of the project
            }
        });
    }

    // general task
    tokio::spawn(async move {
        loop {
            // do some general work (send/receive data to/from other processes)
        }
    }).await;

In the code above, all the Tokio tasks seem to be spawned and start running, but the tasks for some devices loop several times (even hundreds of times) more than the others!

Also, my general task seems to get the opportunity to run just once, although there is a loop inside it.

Do you have any idea why some tasks run so much more than others?

I tried using

    tokio::task::yield_now().await;

and also

    delay_for(dead_delay).await;

inside the tasks to throttle them and slow them down, but I do not understand why there is a difference between the tasks generated inside the for loop.

Do you have tasks that spend a long time between .awaits anywhere? I would generally need to see what the tasks themselves do to give more help.

Hi Alice,

Yes, there are a lot of tasks, and they may seem complicated.

That's why I tried to make a rough sample to show it.
Showing the entire code here would take too much space, and unfortunately it is private code for a client, so I cannot put it on GitHub or a similar code repository.

But to explain more about it: every item (let's call it an IoT device) generates some sensor data.

In the config file I defined, for every device, some jobs that check different data on separate schedules.
So at startup every IoT device has a different job list to do.

Because of that, every task related to a device takes a different amount of time to complete, so I could not use a normal loop (for example a for loop) to run them one by one; every task has to check for its own next job on its own schedule.

Every task for an IoT device that generates data puts that data into a general queue, so my general task can read it and send it to other parts of the project. Also, when a query comes in from outside for any of those devices, the general task is responsible for listening for it and putting it into the same queue, so the related device can find it there and do the job.
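Very roughly, the device-to-general direction of that flow looks like this (illustration only; I am on tokio 0.2, and the mpsc channel here is just a stand-in for my shared queue):

    use std::time::Duration;
    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        // every device task sends into the shared queue, the general task receives
        let (tx, mut rx) = mpsc::channel::<String>(100);

        for id in 0..3 {
            let mut tx = tx.clone();
            tokio::spawn(async move {
                loop {
                    // ... check the device, then publish the result ...
                    let _ = tx.send(format!("sensor data from device {}", id)).await;
                    tokio::time::delay_for(Duration::from_secs(1)).await;
                }
            });
        }
        drop(tx);

        // general task: forwards everything to the rest of the project
        while let Some(data) = rx.recv().await {
            println!("forwarding: {}", data);
        }
    }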

Every single part of this system seems to work normally on its own, and data can flow between all the parts that need it.

But when I run it, the tasks for some devices run repeatedly, much faster, while other devices seem to get less opportunity to run, although their code is similar.
Also, my general task, which is responsible for communicating with other parts of the project, runs just one time!

Please let me know which part seems most problematic, and I will explain it in more detail.

Thanks,

My guess is that you are blocking the thread. Can you talk a bit more about the kind of operations you perform? For example, what library or function do you use to talk to these devices? What kind of message passing channels do you use? Do you use any locks (e.g. mutexes)? Do you perform network requests? Do you have any expensive CPU-bound computations?

Typically you can see if a task blocks the thread by looking at your .awaits. You should not have any code that spends more than a millisecond when going from one .await to another.
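For example, if one of those steps is an expensive computation or a blocking call, here is a rough sketch of keeping it off the async worker threads with spawn_blocking (tokio 0.2; the sleep is just a stand-in for real work):

    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        // expensive or blocking work goes to the dedicated blocking pool,
        // so the async worker threads stay free to drive the other tasks
        let result = tokio::task::spawn_blocking(|| {
            std::thread::sleep(Duration::from_millis(50)); // stand-in for heavy work
            42
        })
        .await
        .unwrap();
        println!("got {}", result);
    }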


Alice, your fast response surprised me.

It seems you really are doing async/await :slight_smile:

For my queue I used a mutex:

    pub struct ImQueue {
        pub mesg: Mutex<Option<VecDeque<IpcMessage>>>,
    }
So I repeatedly have to lock and unlock this queue to check inside it and push new messages to it.
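Simplified, the access pattern looks like this (with String standing in for my real IpcMessage type); the guard goes out of scope before anything is awaited:

    use std::collections::VecDeque;
    use std::sync::Mutex;

    pub struct ImQueue {
        pub mesg: Mutex<Option<VecDeque<String>>>,
    }

    impl ImQueue {
        // lock, pop, and let the guard drop before any .await happens
        pub fn pop_front(&self) -> Option<String> {
            let mut guard = self.mesg.lock().unwrap();
            guard.as_mut().and_then(|q| q.pop_front())
        } // guard dropped here, so the lock is never held across an .await
    }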

and every device may need to do some TCP communication (all TCP communication is done through Tokio), depending on the job.

I mostly used tokio and its related libraries, plus std and chrono.
For communication between the parts of the project I used protobuf.

I also tried to define almost all functions as async to avoid blocking, but for sure I did a lot of stupid, time-consuming things inside functions just to get a result quickly, and I postponed cleaning it up until after I got the result :smiley:

How do you wait for messages on that ImQueue? Please prefer code blocks (```) to quote blocks (>) for code.

Does the queue have multiple receivers?

Here is the implemented function for the queue. It checks for a tag in the queue's messages or sub-messages (optional messages inside a message):

    pub async fn check_mesg_for(&self, callee: &str) -> Option<IpcMessage> {
        debug!("Check messages for {}", callee);
        let mut mesg = self.mesg.lock().unwrap();
        let chk = chrono::offset::Utc::now().timestamp();
        if let Some(queue) = mesg.as_mut() {
            let mut qlen = queue.len() as u32;
            while qlen > 0 {
                match queue.pop_front() {
                    Some(msg) => {
                        if msg.get_callee() == callee {
                            return Some(msg);
                        } else if msg.has_smsg() {
                            let smsg = msg.get_smsg().clone();
                            if smsg.get_subcallee() == callee {
                                return Some(msg);
                            }
                        }
                        if msg.get_valid_until() > chk {
                            queue.push_back(msg);
                        }
                        qlen -= 1;
                    }
                    None => return None,
                }
            }
            return None;
        } else {
            return None;
        }
    }

Yes,

every task in the app has access to this queue,
so all of them can read it or write new messages to it.

Well, there's no .await inside that method, so it should not be marked async. It looks like you are filtering messages somehow? Are you calling check_mesg_for in a loop?

You are right, this method tries to filter the queue, and if it finds a matching message it returns it.

I call this function every time any of the tasks tries to find its related job inside the queue.
So although I do not call it in a tight loop, I call it repeatedly from different tasks.

What do you do when it returns None?

Nothing.

It means there is no job for that device at that moment, so the task just ignores it and continues its work.

Could you show a snippet where you inserted the yield_now calls?

Here is the part of my code that generates a task for every device.

At the end of every task I tested using either delay_for(Duration) or yield_now().

Using delay_for I could throttle those tasks, but yield_now had almost no effect.

    let dt_pool = pool.clone();
    let xduration: u64 = 900;
    let mut periodic_act_list: Vec<PeriodicAct>;
    let dt_msg_tag = dt.msg_tag.clone().unwrap();
    periodic_act_list = match &dt.pr_act {
        None => Vec::new(),
        Some(actmap) => actmap.to_vec(),
    };
    tokio::spawn({
        let msg_queue = msg_queue.clone();
        async move {
            loop {
                for act in &mut *periodic_act_list {
                    if act.act_enabled {
                        match act.next_run {
                            None => {
                                act.next_run = std::time::SystemTime::now().checked_add(
                                    Duration::from_secs(match act.start_delay {
                                        None => 0,
                                        Some(start) => start,
                                    }),
                                )
                            }
                            _ => {}
                        }
                        let target = "stderr";
                        if act.next_run.unwrap().lt(&std::time::SystemTime::now()) {
                            match act.act_func.as_str() {
                                "Act_01" => dt.Act_01(&target).await,
                                "Act_02" => dt.Act_02(&target).await,
                                "Act_03" => dt.Act_03(&target).await,
                                "Act_04" => dt.Act_04(&target).await,
                                "Act_05" => dt.Act_05(&target).await,
                                _ => {
                                    println!("Act is unknown: {}", act.act_func);
                                }
                            }
                            act.next_run = act.next_run.unwrap().checked_add(
                                Duration::from_secs(match act.period_seconds {
                                    None => {
                                        act.act_enabled = false;
                                        0
                                    }
                                    Some(pr) => pr,
                                }),
                            )
                        }
                    }
                    // tokio::task::yield_now().await;
                }
                if let Some(mesg) = msg_queue.check_mesg_for(&dt_msg_tag).await {
                    let msg: String;
                    if mesg.get_callee() == &dt_msg_tag {
                        msg = mesg.get_msg().into();
                    } else {
                        msg = mesg.get_smsg().get_submsg().into();
                    }
                    if msg.starts_with("<XXX") {
                        let end_idx = msg.find("</XXX>");
                        match end_idx {
                            None => {}
                            Some(mut idx) => {
                                idx += 6;
                                let query: String = (msg[..idx]).into();
                                let mut request_struct = ResponseMessage::new();
                                request_struct.full_request = query;
                                request_struct.target = mesg.get_reply_to().into();
                                debug!("Reply to: {}", request_struct.target);
                                dt.command_queue.insert(
                                    request_struct.full_request.clone(),
                                    request_struct,
                                );
                            }
                        }
                    }
                }

                if dt.command_queue.len() > 0 {
                    let mut msg_queue_clone = msg_queue.clone();
                    let _ = dt.send_request_to_xcam(&mut msg_queue_clone).await;
                }
                // delay_for(Duration::from_millis(1000)).await;
                tokio::task::yield_now().await;
            }
        }
    });
}

So if every task has next_run in the future and there is nothing in the channel, you pretty much have a busy loop?

:hot_face: yes it is pretty busy

I would consider computing the minimum next_run and using Tokio's delay_until to wait until it. As for the channel, you could also add some sort of notification for when new messages arrive, e.g. maybe Tokio's watch channel with a () message could work there.
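A rough, untested sketch of that shape with tokio 0.2's delay_until, watch, and select! (next_wakeup here is a placeholder for the minimum next_run you compute):

    use std::time::Duration;
    use tokio::sync::watch;
    use tokio::time::{delay_until, Instant};

    #[tokio::main]
    async fn main() {
        // a watch channel used purely as a "new message arrived" signal
        let (notify_tx, mut notify_rx) = watch::channel(());

        // wherever a message is pushed onto the shared queue, also signal:
        tokio::spawn(async move {
            loop {
                tokio::time::delay_for(Duration::from_secs(3)).await;
                // ... push onto the shared queue, then wake the waiters ...
                let _ = notify_tx.broadcast(());
            }
        });

        // per-device loop: sleep until the earliest scheduled job,
        // but wake up early if a new message arrives
        loop {
            // placeholder for "the minimum next_run over all acts"
            let next_wakeup = Instant::now() + Duration::from_secs(10);
            tokio::select! {
                _ = delay_until(next_wakeup) => {
                    // run whichever acts are now due
                }
                _ = notify_rx.recv() => {
                    // check the queue for a message with our tag
                }
            }
        }
    }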

Let me have a look at delay_until and see how I can use it here.

I will come back with any updates I find.

For now, I appreciate your kind help and hope it solves my problem.

Consider also whether DelayQueue is a better fit than your manual bookkeeping with next_run.
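A rough sketch of that (tokio 0.2's DelayQueue, with the acts reduced to strings for illustration):

    use std::time::Duration;
    use tokio::stream::StreamExt;
    use tokio::time::DelayQueue;

    #[tokio::main]
    async fn main() {
        // each scheduled act goes in with its own delay; the queue hands
        // it back when the delay expires, so there is no polling loop
        let mut jobs: DelayQueue<String> = DelayQueue::new();
        jobs.insert("Act_01".to_string(), Duration::from_secs(2));
        jobs.insert("Act_02".to_string(), Duration::from_secs(5));

        while let Some(Ok(expired)) = jobs.next().await {
            let act = expired.into_inner();
            println!("running {}", act);
            // for a periodic act, re-insert it here with its period:
            // jobs.insert(act, Duration::from_secs(period_seconds));
        }
    }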