I am working on a project to manage some IoT-like devices, and I want to manage an unknown number of devices in Rust.
I wrote some setup files that describe the devices, and my app is initialized from that data.
Inside the app I have some procedures that should run for each device, and some general procedures that communicate with other parts of the project.
Here is a brief sample that shows it very roughly:
```rust
struct XItem {
    // some fields
}

impl XItem {
    // do some stuff
}

#[tokio::main]
async fn main() {
    // filled from a TOML config file (an array of tables), for example:
    // [[xitem]]
    // data fields for item 1
    // [[xitem]]
    // data fields for item 2
    let item_list: Vec<XItem> = Vec::new();

    // `into_iter()` moves each item into its task; `iter()` would only borrow
    // `item_list`, which `tokio::spawn` rejects because tasks must be 'static.
    for item in item_list.into_iter() {
        tokio::spawn(async move { // task generated for each item
            loop {
                // check for some outbound data related to item
                // update item
                // send result to a shared queue for sharing with other parts
            }
        });
    }

    tokio::spawn(async move { // general task
        loop {
            // do some general stuff (send and receive data to/from other process)
        }
    })
    .await
    .unwrap();
}
```
So with the code above, all the Tokio tasks seem to be created and start running, but strangely the tasks for some devices run several times (even hundreds of times) more often than others!
Also, my general task seems to get the chance to run only once, even though there is a loop inside it.
Do you have any idea why some tasks run so much more often than others?
I tried using `tokio::task::yield_now().await;` and also `delay_for(dead_delay).await;` inside the tasks to throttle them and slow them down, but I do not understand why there is a difference between the tasks generated inside the `for` loop.
Yes, there are a lot of tasks, which may seem complicated; that's why I tried to make a rough sample to show it.
Showing the entire code here would take too much space, and unfortunately it is private code for a client, so I cannot put it on GitHub or a similar code repository.
To explain more: every item (let's call it an IoT device) generates some sensor data.
In the config file I defined, for every device, some jobs that check different data, each on its own schedule.
So from the start, every IoT device has a different job list.
Because of that, the task for each device takes a different amount of time to complete, so I could not use a plain loop (for example a `for` loop) to run them one by one; instead, every task has to check for its own next job on its own schedule.
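A per-device job list with independent schedules can be modelled by storing each job's interval and next due time: the task runs whatever is due and then knows exactly when it next needs to wake, instead of looping constantly. This is a minimal std-only sketch; `Job`, `interval`, `next_run`, and `run_due_jobs` are hypothetical names, not taken from the original code.

```rust
use std::time::{Duration, Instant};

/// One scheduled job for a device (hypothetical structure).
struct Job {
    name: &'static str,
    interval: Duration,
    next_run: Instant,
}

/// Runs every job that is due at `now`, reschedules it, and returns the names
/// of the jobs that ran plus the earliest upcoming `next_run` (if any jobs exist).
fn run_due_jobs(jobs: &mut [Job], now: Instant) -> (Vec<&'static str>, Option<Instant>) {
    let mut ran = Vec::new();
    for job in jobs.iter_mut() {
        if job.next_run <= now {
            // The real sensor check for this job would happen here.
            ran.push(job.name);
            job.next_run = now + job.interval;
        }
    }
    let next_wake = jobs.iter().map(|j| j.next_run).min();
    (ran, next_wake)
}

fn main() {
    let start = Instant::now();
    let mut jobs = vec![
        Job { name: "temperature", interval: Duration::from_secs(5), next_run: start },
        Job { name: "humidity", interval: Duration::from_secs(30), next_run: start + Duration::from_secs(10) },
    ];
    let (ran, next_wake) = run_due_jobs(&mut jobs, start);
    println!("ran {:?}, next wake in {:?}", ran, next_wake.map(|t| t - start));
}
```

The returned `next_wake` is what a task would sleep until, so a device with slow schedules naturally runs less often than one with fast schedules, without any busy polling.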
Every device task puts the data it generates into a general queue, so my general task can read it and send it to other parts of the project. Also, when a query related to one of those devices comes in from outside, the general task is responsible for listening for it and putting it into the same queue, so the related device can find it there and do the job.
Every single part of this system seems to work normally, and data can flow between all the parts that need it.
But when I run it, the tasks for some devices run over and over very quickly, while other devices seem to get fewer chances to run, even though their code looks similar.
Also, my general task, which is responsible for communicating with other parts of the project, runs only once!
Please let me know which part seems most problematic, and I can explain it in more detail.
My guess is that you are blocking the thread. Can you talk a bit more about the kind of operations you perform? For example, what library or function do you use to talk to these devices? What kind of message-passing channels do you use? Do you use any locks (e.g. mutexes)? Do you perform network requests? Do you have any expensive CPU-bound computations?
Typically, you can tell whether a task blocks the thread by looking at its `.await`s. You should not have any code that spends more than a millisecond going from one `.await` to the next.
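The shared queue looks roughly like this:

```rust
use std::collections::VecDeque;
use std::sync::Mutex; // assumed std's Mutex; the post doesn't say which Mutex is used

// The queue's element type isn't shown in the post, so it is left generic here.
pub struct ImQueue<T> {
    pub mesg: Mutex<Option<VecDeque<T>>>,
}
```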
So I have to lock and unlock this queue repeatedly to check its contents and push new messages into it.
Every device may also need to do some TCP communication, depending on the job (all TCP communication goes through Tokio).
I mostly used Tokio and its related libraries, plus `std` and `chrono`.
For communication between parts of the project I used protobuf.
I also tried to make almost all functions `async` to avoid blocking, but I certainly did a lot of time-consuming things inside functions just to get a result quickly, and postponed cleaning them up until after it worked.
Well, there's no `.await` inside that method, so it should not be marked `async`. It looks like you are filtering messages somehow? Are you calling `check_mesg_for` in a loop?
You are right: this method filters the queue and, if it finds an answer, sends it back.
I call this function every time any of the tasks tries to find its related job inside the queue.
So although I do not call it in a tight loop, I call it repeatedly from different tasks.
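Since the filtering has no `.await`, it can be a plain synchronous function that holds the lock only for the scan itself. The body of `check_mesg_for` isn't shown in the thread, so this std-only sketch is hypothetical: the `Mesg` type, its `device_id` field, and `take_mesg_for` are invented for illustration, and it assumes `std::sync::Mutex` with the `Option<VecDeque<_>>` layout of `ImQueue`.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Hypothetical message type; the real one is not shown in the thread.
#[derive(Debug, Clone, PartialEq)]
pub struct Mesg {
    pub device_id: u32,
    pub payload: String,
}

pub struct ImQueue {
    pub mesg: Mutex<Option<VecDeque<Mesg>>>,
}

impl ImQueue {
    /// Removes and returns every message addressed to `device_id`, keeping the
    /// others in their original order. Synchronous on purpose: there is no
    /// `.await`, and the lock is released as soon as the function returns.
    fn take_mesg_for(&self, device_id: u32) -> Vec<Mesg> {
        let mut guard = self.mesg.lock().unwrap();
        let queue = match guard.as_mut() {
            Some(q) => q,
            None => return Vec::new(),
        };
        let mut matched = Vec::new();
        let mut rest = VecDeque::with_capacity(queue.len());
        for m in queue.drain(..) {
            if m.device_id == device_id {
                matched.push(m);
            } else {
                rest.push_back(m);
            }
        }
        *queue = rest;
        matched
    }
}

fn main() {
    let q = ImQueue {
        mesg: Mutex::new(Some(VecDeque::from(vec![
            Mesg { device_id: 1, payload: "a".into() },
            Mesg { device_id: 2, payload: "b".into() },
            Mesg { device_id: 1, payload: "c".into() },
        ]))),
    };
    println!("{:?}", q.take_mesg_for(1));
}
```

Calling this from many tasks is still a linear scan under one lock each time, which is why a notification or per-device channel scales better than frequent polling.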
I would consider computing the minimum `next_run` and using Tokio's `delay_until` to wait until it. As for the queue, you could also add some kind of notification for when new messages arrive; for example, Tokio's `watch` channel with a `()` message could work there.