Tokio-beat: A Rust crate for running in-process background tasks periodically

Background

I am working on a Rust project, one of whose requirements is to run in-process background tasks periodically. After some investigation on GitHub, I found that only a few crates are available, and tokio-cron-scheduler looked like the best choice. So I integrated it into my project. Everything was fine until I encountered this issue. I tried to solve it, but the design and source code of tokio-cron-scheduler are a little hard for me to understand. Finally I gave up and decided to write a new crate to satisfy my requirement.

This is the background of why tokio-beat was created. Though it works well in my project now, I think there are still some designs and implementations that can be improved, and a lot of work to be done to make it satisfy broader requirements. So I ask you Rustaceans to do a code review of the initial implementation and leave your precious comments. The repo can be found on GitHub, but before that, I'll try my best to elaborate on the goals of the project, the core concepts, and how it is implemented.

Goals

The goal of tokio-beat is to provide a Rust crate which can be used to run in-process background tasks periodically. To be more specific, it should be:

  • Lightweight with intuitive APIs.
  • Simple and clean design, with readability and performance taken into account.
  • Asynchronous. For simplicity, only the tokio runtime is supported for now.

Core concepts and basic implementations

Timer

A timer is a struct whose responsibility is to produce the nearest upcoming datetime from now based on a schedule. Note that although a timer has a next method, it is not an iterator, as next always returns the nearest upcoming datetime from now.

Here is the timer's basic implementation:

/// An enum representing the various forms of a timer.
pub enum Timer {
    /// A crontab timer.
    ///
    /// This timer is based on a cron schedule.
    Cron(Cron),
    /// A oneshot timer.
    ///
    /// This timer produces only a single datetime in the future.
    /// After that datetime, it produces nothing to indicate the schedule is done.
    Oneshot(Oneshot),
    /// A cycle timer.
    ///
    /// This timer produces upcoming datetimes based on a start datetime and a fixed interval.
    Cycle(Cycle),
}

impl Timer {
    /// Return the nearest upcoming datetime from now for the timer.
    /// None indicates that the schedule on which this timer is based is done.
    pub fn next(&self) -> Option<DateTime<Utc>> {
        use Timer::*;
        match self {
            Cron(cron) => cron.next(),
            Oneshot(oneshot) => oneshot.next(),
            Cycle(cycle) => cycle.next(),
        }
    }
}

The timer is used by a Job to determine when the job should next be scheduled to run.
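For instance, a Cycle timer's next can be derived from the start datetime and the interval. The following is a minimal sketch of that idea, not the crate's actual code; the field names start_at, interval and end_at are assumptions based on the builder API shown later:

use chrono::{DateTime, Duration, Utc};

pub struct Cycle {
    start_at: DateTime<Utc>,
    interval: Duration, // assumed to be positive
    end_at: Option<DateTime<Utc>>,
}

impl Cycle {
    /// Return the nearest datetime at or after now on the
    /// `start_at + k * interval` grid, or None once the optional
    /// end datetime has passed.
    pub fn next(&self) -> Option<DateTime<Utc>> {
        let now = Utc::now();
        let next = if now <= self.start_at {
            self.start_at
        } else {
            // Number of whole intervals elapsed since start, rounded up.
            let interval_ms = self.interval.num_milliseconds();
            let elapsed_ms = (now - self.start_at).num_milliseconds();
            let periods = (elapsed_ms + interval_ms - 1) / interval_ms;
            self.start_at + self.interval * periods as i32
        };
        match self.end_at {
            Some(end) if next > end => None,
            _ => Some(next),
        }
    }
}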

Job

A job is a struct which contains the following fields:

  • id. A UUID to identify the job.
  • timer. A timer to determine when the job should next be scheduled to run.
  • f. (Maybe we can use a more appropriate name, because f is usually used as the name of the formatter argument when implementing Display.) A function object which is the task you want to run. It accepts a context (see below) argument and returns a BoxFuture. Because f is held by the job in the main task and may be held by another task when the job is scheduled to run, it is wrapped in an Arc.

pub struct Job {
    id: Uuid,
    timer: Timer,
    f: Arc<dyn Fn(Context) -> BoxFuture<'static, ()> + Send + Sync>,
}
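To give an idea of how such a field can be built ergonomically, here is a hedged sketch of a helper that wraps a plain async function into that shape; boxed_task is a hypothetical name, and Context is the type defined in the next section:

use std::future::Future;
use std::sync::Arc;

use futures::future::BoxFuture;

/// Wrap an async function into the `Arc<dyn Fn(Context) -> BoxFuture<_, ()>>`
/// shape stored in `Job::f`, so the job can cheaply clone it into spawned tasks.
fn boxed_task<F, Fut>(f: F) -> Arc<dyn Fn(Context) -> BoxFuture<'static, ()> + Send + Sync>
where
    F: Fn(Context) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = ()> + Send + 'static,
{
    Arc::new(move |ctx: Context| -> BoxFuture<'static, ()> { Box::pin(f(ctx)) })
}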

Context

Used to pass information to the running task, such as the corresponding job and when the task was scheduled to run.

pub struct Context {
    pub job_id: Uuid,
    pub when: DateTime<Utc>,
}
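For example, a task function receiving this context could look like the following (log_task is a hypothetical example, not part of the crate):

use futures::future::BoxFuture;

/// A task that just logs which job triggered it and when it was scheduled.
fn log_task(ctx: Context) -> BoxFuture<'static, ()> {
    Box::pin(async move {
        println!("job {} scheduled at {}", ctx.job_id, ctx.when);
    })
}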

Time Event

A time event represents a job run at a specific time. When a time event occurs, the associated job should be scheduled to run: the scheduler retrieves the job by the time event's job_id, then spawns a task onto the tokio executor.

pub struct TimeEvent {
    pub job_id: Uuid,
    pub when: DateTime<Utc>,
}

TimeEvent also implements the Ord trait so it can be pushed onto a BinaryHeap.
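The ordering is presumably by the when field. A sketch of what that could look like follows; the job_id tiebreaker is my assumption, added only to keep the ordering total and consistent with Eq:

use std::cmp::Ordering;

impl Ord for TimeEvent {
    fn cmp(&self, other: &Self) -> Ordering {
        // Order primarily by time, falling back to job id so that the
        // ordering is total, as `Ord` requires.
        self.when
            .cmp(&other.when)
            .then_with(|| self.job_id.cmp(&other.job_id))
    }
}

impl PartialOrd for TimeEvent {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for TimeEvent {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for TimeEvent {}

Since std's BinaryHeap is a max-heap, the scheduler stores Reverse<TimeEvent> (see State below) so that the earliest event sits at the top.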

Scheduler

A scheduler is responsible for the following things:

  1. Adding and removing jobs
  2. Processing arrived time events and maintaining the time events heap

#[derive(Clone)]
pub struct Scheduler {
    shared: Arc<Shared>,
}

struct Shared {
    state: Mutex<State>,
    /// Notifies the background task processing time events. The background
    /// task waits on this to be notified, then processes time events or the
    /// shutdown signal.
    background_task: Notify,
}

struct State {
    /// Mapping from job id to job so that we can retrieve a job by its id.
    job_store: HashMap<Uuid, Job>,
    /// New jobs added to the scheduler. When the scheduler sees new jobs,
    /// it produces time events for them, then drains them into job_store.
    new_jobs: Vec<Job>,
    /// The time events that will occur in the nearest future. Time events are organized as a min-heap.
    time_events: BinaryHeap<Reverse<TimeEvent>>,
    shutdown: bool,
}
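Under this design, add_job presumably only needs to queue the job and wake the background task. A minimal sketch, assuming state is a std::sync::Mutex and background_task is a tokio::sync::Notify:

impl Scheduler {
    pub fn add_job(&self, job: Job) {
        let mut state = self.shared.state.lock().unwrap();
        state.new_jobs.push(job);
        // Release the lock before waking the background task.
        drop(state);
        // The new job's first time event may be earlier than the deadline the
        // background task is currently sleeping on, so wake it to recompute.
        self.shared.background_task.notify_one();
    }
}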

APIs

One of the goals of tokio-beat is to provide intuitive APIs. Below are the public APIs I propose:

// Scheduler
// construct a new scheduler
Scheduler::new()
// add job
scheduler.add_job(job: Job)
// remove job
scheduler.remove_job(job_id: Uuid)

// Job
// construct a cron job
Job::cron(expr: &str).do_(f: Fn(Context) -> BoxFuture<'static, ()> + 'static + Send + Sync)
// construct a oneshot job
Job::oneshot(at: Into<DateTime<Utc>>).do_(f: Fn(Context) -> BoxFuture<'static, ()> + 'static + Send + Sync)
// construct a cycle job
Job::cycle(start_at: Into<DateTime<Utc>>, interval: Duration).end_at(dt: Into<DateTime<Utc>>).do_(f: Fn(Context) -> BoxFuture<'static, ()> + 'static + Send + Sync)

See examples/simple for an example.
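To give a feel for how these pieces fit together, here is a rough end-to-end sketch based on the APIs above; the cron expression format and the exact builder behavior are assumptions, and examples/simple in the repo has the real code:

use std::time::Duration;

use futures::FutureExt;
use tokio_beat::{Job, Scheduler};

#[tokio::main]
async fn main() {
    let scheduler = Scheduler::new();

    // Assuming a seconds-resolution cron expression: run every 5 seconds.
    scheduler.add_job(Job::cron("*/5 * * * * *").do_(|ctx| {
        async move {
            println!("cron job {} fired at {}", ctx.job_id, ctx.when);
        }
        .boxed()
    }));

    // Keep the main task alive so the scheduler can keep running.
    tokio::time::sleep(Duration::from_secs(30)).await;
}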

Algorithm

  1. Produce the next time events for all new jobs and push them onto the time_events heap, then move all new jobs into job_store for later retrieval.
  2. Peek at the nearest time event on the time_events heap. If there is no time event, sleep until notified; otherwise sleep until the time event occurs.
  3. When the time event occurs, retrieve the associated job and spawn a task to run the job's function object, then produce the job's next time event and push it onto the heap.
  4. Go to 1.
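A simplified sketch of that loop, using the types defined above (shutdown wiring, lock poisoning, and the re-check of deadlines after a wakeup are glossed over; the real implementation has to be more careful):

use std::cmp::Reverse;
use std::sync::Arc;

use chrono::Utc;

async fn run_background_task(shared: Arc<Shared>) {
    loop {
        // Steps 1 and 2: produce time events for new jobs, move the jobs into
        // the store, then find the earliest pending time event.
        let next = {
            let mut guard = shared.state.lock().unwrap();
            let state = &mut *guard;
            if state.shutdown {
                return;
            }
            for job in std::mem::take(&mut state.new_jobs) {
                if let Some(when) = job.timer.next() {
                    state.time_events.push(Reverse(TimeEvent { job_id: job.id, when }));
                }
                state.job_store.insert(job.id, job);
            }
            state.time_events.peek().map(|event| event.0.when)
        };

        match next {
            // No pending events: sleep until a new job (or shutdown) notifies us.
            None => shared.background_task.notified().await,
            Some(when) => {
                let delay = (when - Utc::now()).to_std().unwrap_or_default();
                tokio::select! {
                    // Step 3: the time event occurred. Pop it, spawn the job's
                    // function object, and push the job's next time event.
                    _ = tokio::time::sleep(delay) => {
                        let mut guard = shared.state.lock().unwrap();
                        let state = &mut *guard;
                        if let Some(Reverse(event)) = state.time_events.pop() {
                            // Skip events whose job was removed in the meantime.
                            if let Some(job) = state.job_store.get(&event.job_id) {
                                let f = job.f.clone();
                                let ctx = Context { job_id: event.job_id, when: event.when };
                                tokio::spawn(f(ctx));
                                if let Some(when) = job.timer.next() {
                                    state.time_events.push(Reverse(TimeEvent { job_id: event.job_id, when }));
                                }
                            }
                        }
                    }
                    // A new job arrived (or shutdown was requested): loop back
                    // to step 1 and recompute the deadline.
                    _ = shared.background_task.notified() => {}
                }
            }
        }
    }
}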

How did you implement remove_job when you are using a binary heap? The Tokio DelayQueue uses something else precisely because deletions are difficult from a binary heap.

Another thing I note is that you are using DateTime<Utc> for your timestamps. Do you have any code to handle DateTime<Utc> and std::time::Instant getting out of sync (e.g. when the machine goes to sleep or the clock is modified)?

The items of the binary heap are TimeEvents. Jobs are stored in a HashMap, so remove_job just deletes the entry from the hash map.

I haven't considered this situation yet.

Okay, so you don't remove the items from the BinaryHeap. If you want to keep using a heap, then I recommend a strategy like the following:

fn remove_job(&mut self, job_id: Uuid) {
    self.job_store.remove(&job_id);
    if 4 * self.job_store.len() < self.time_events.len() {
        // Take ownership of `self.time_events`, leaving an empty heap in its
        // place.
        let heap = std::mem::take(&mut self.time_events);
        let mut heap_items = heap.into_vec();
        heap_items.retain(|Reverse(event)| self.job_store.contains_key(&event.job_id));
        // After removing some items, put the heap back.
        self.time_events = BinaryHeap::from(heap_items);
    }
}

This removes any items in the heap whose jobs have been deleted from the job store, if the job store becomes significantly smaller than the heap.

Thank you very much! The current strategy is checking the job still in job_store when processing a time event.

Right, you would still need to do that with my strategy. The idea behind my strategy is that removing even a single item from a BinaryHeap requires looking at all of them, which is quite expensive for a single item. However, if more than half of the items are removed whenever you remove any items, then the amortized cost of removing a single item is at most the cost of looking at two items.
