Background
I am working on a Rust project, one of whose requirements is to run in-process background tasks periodically. After some investigation on GitHub, I found that only a few crates are available, and tokio-cron-scheduler seemed to be the best choice, so I integrated it into my project. Everything was fine until I encountered this issue. I tried to solve it, but the design and source code of tokio-cron-scheduler are a little hard for me to understand. In the end I gave up and decided to write a new crate to satisfy my requirement.
This is the background to why tokio-beat was created. Though it works well in my project now, I think some of the design and implementation can still be improved, and a lot of work remains to make it satisfy broader requirements. So I am asking you Rustaceans to do a code review of the initial implementation and leave your precious comments. The repo can be found on GitHub, but before that, I'll try my best to elaborate on the goals of the project, its core concepts, and how it is implemented.
Goals
The goal of tokio-beat is to provide a Rust crate that can be used to run in-process background tasks periodically. To be more specific, it should be:
- Lightweight, with intuitive APIs.
- Simple and clean in design, taking readability and performance into account.
- Asynchronous. For simplicity, only the tokio runtime is supported for now.
Core concepts and basic implementations
Timer
A timer is a struct whose responsibility is to produce the nearest upcoming datetime from now based on a schedule. Note that although a timer has a next method, it is not an iterator: next always returns the nearest upcoming datetime from now.
Here is the timer's basic implementation:
/// An enum representing the various forms of a timer.
pub enum Timer {
    /// A crontab timer.
    ///
    /// This timer is based on a cron schedule.
    Cron(Cron),
    /// A oneshot timer.
    ///
    /// This timer produces only a single datetime in the future.
    /// After that datetime, it produces nothing, indicating that the schedule is done.
    Oneshot(Oneshot),
    /// A cycle timer.
    ///
    /// This timer produces upcoming datetimes based on a start datetime and a fixed interval.
    Cycle(Cycle),
}
impl Timer {
    /// Return the nearest upcoming datetime from now for the timer.
    /// `None` indicates that the schedule on which this timer is based is done.
    pub fn next(&self) -> Option<DateTime<Utc>> {
        use Timer::*;
        match self {
            Cron(cron) => cron.next(),
            Oneshot(oneshot) => oneshot.next(),
            Cycle(cycle) => cycle.next(),
        }
    }
}
The timer is used by a Job to determine when it should be scheduled to run in the nearest future.
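To make the "nearest upcoming datetime from now" behaviour concrete, here is a minimal sketch of how a cycle timer might compute it. This is not the crate's actual code: the field names, the `next_after` method, and the use of plain Unix seconds in place of `DateTime<Utc>` are all assumptions, chosen so the example needs no external crates.

```rust
/// Hypothetical stand-in for a cycle timer. Times are plain Unix seconds
/// (an assumption) instead of `DateTime<Utc>`.
pub struct Cycle {
    /// First tick of the schedule.
    pub start_at: i64,
    /// Fixed interval between ticks, in seconds. Must be positive.
    pub interval: i64,
    /// Optional end of the schedule; no tick is produced after it.
    pub end_at: Option<i64>,
}

impl Cycle {
    /// Return the nearest tick strictly after `now`, or `None` once the
    /// next tick would fall after `end_at`.
    pub fn next_after(&self, now: i64) -> Option<i64> {
        assert!(self.interval > 0, "interval must be positive");
        let next = if now < self.start_at {
            // The schedule has not started yet: the first tick is next.
            self.start_at
        } else {
            // Whole intervals elapsed since the start, plus one.
            let ticks = (now - self.start_at) / self.interval + 1;
            self.start_at + ticks * self.interval
        };
        match self.end_at {
            Some(end) if next > end => None,
            _ => Some(next),
        }
    }
}
```

Calling `next_after` twice with the same `now` returns the same value, which is the sense in which such a timer is not an iterator: it holds no cursor, and the answer depends only on the schedule and the current time.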
Job
A job is a struct which contains the following fields:
- id. A UUID to identify it.
- timer. A timer to determine when it should next be scheduled to run.
- f. A function object which is the task you want to run. It accepts a context (see below) argument and returns a BoxFuture. Because f is held by the job in the main task and may also be held by another task when the job is scheduled to run, it is wrapped in an Arc. (Maybe we can use a more appropriate name, because f is usually used as the name of the Formatter argument when implementing Display.)
pub struct Job {
    id: Uuid,
    timer: Timer,
    f: Arc<dyn Fn(Context) -> BoxFuture<'static, ()> + Send + Sync>,
}
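As an illustration of why the function object is wrapped in an Arc, the sketch below stores a user closure behind `Arc<dyn Fn>` so that a cloned job shares the same closure. It uses only the standard library: the `BoxFuture` alias here is a local stand-in for the one from the futures crate, and `Job::new`, `Job::task`, the `u64` id, and the integer `when` are hypothetical names and simplifications, not the crate's API.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

/// Local stand-in (assumption) for `futures::future::BoxFuture<'static, ()>`.
type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// Simplified context: `u64` and `i64` replace `Uuid` and `DateTime<Utc>`.
#[derive(Clone, Copy, Debug)]
pub struct Context {
    pub job_id: u64,
    pub when: i64,
}

#[derive(Clone)]
pub struct Job {
    id: u64,
    /// Shared so that a spawned task can hold the closure while the
    /// scheduler keeps its own handle.
    f: Arc<dyn Fn(Context) -> BoxFuture + Send + Sync>,
}

impl Job {
    /// Hypothetical constructor: accepts any async closure and boxes the
    /// future it returns.
    pub fn new<F, Fut>(id: u64, f: F) -> Self
    where
        F: Fn(Context) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = ()> + Send + 'static,
    {
        Job {
            id,
            f: Arc::new(move |ctx: Context| -> BoxFuture { Box::pin(f(ctx)) }),
        }
    }

    /// Build the future for one run of the job at time `when`.
    pub fn task(&self, when: i64) -> BoxFuture {
        (self.f)(Context { job_id: self.id, when })
    }
}
```

Cloning such a `Job` only bumps the reference count of the closure, so handing a copy to a spawned task is cheap and does not require the closure itself to be `Clone`.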
Context
Used to pass information to the running task, such as the corresponding job and when the task was scheduled to run.
pub struct Context {
    pub job_id: Uuid,
    pub when: DateTime<Utc>,
}
Time Event
A time event represents a job-run event at a specific time. When a time event occurs, the associated job should be scheduled to run: the scheduler retrieves the job by the job_id of the time event, then spawns a task on the tokio executor.
pub struct TimeEvent {
    pub job_id: Uuid,
    pub when: DateTime<Utc>,
}
TimeEvent also implements the Ord trait so it can be pushed onto a BinaryHeap.
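The ordering could look roughly like the sketch below. It is an assumption about how Ord might be implemented, not the crate's code: `u64` replaces `Uuid` and Unix seconds replace `DateTime<Utc>` so that only the standard library is needed, and `drain_in_order` is a hypothetical helper for demonstration. Because `std::collections::BinaryHeap` is a max-heap, wrapping each event in `Reverse` makes the earliest event pop first.

```rust
use std::cmp::{Ordering, Reverse};
use std::collections::BinaryHeap;

/// Simplified stand-in for `TimeEvent` (field types are assumptions).
#[derive(Debug, PartialEq, Eq)]
pub struct TimeEvent {
    pub job_id: u64,
    pub when: i64,
}

impl Ord for TimeEvent {
    fn cmp(&self, other: &Self) -> Ordering {
        // Order by time first; break ties by job id so that `cmp`
        // stays consistent with the derived `Eq`.
        self.when
            .cmp(&other.when)
            .then_with(|| self.job_id.cmp(&other.job_id))
    }
}

impl PartialOrd for TimeEvent {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Push all events into a min-heap and pop them in chronological order.
pub fn drain_in_order(events: Vec<TimeEvent>) -> Vec<TimeEvent> {
    let mut heap: BinaryHeap<Reverse<TimeEvent>> =
        events.into_iter().map(Reverse).collect();
    let mut ordered = Vec::with_capacity(heap.len());
    while let Some(Reverse(event)) = heap.pop() {
        ordered.push(event);
    }
    ordered
}
```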
Scheduler
A scheduler is responsible for the following:
- Adding and removing jobs
- Processing arrived time events and maintaining the time events heap
#[derive(Clone)]
pub struct Scheduler {
    shared: Arc<Shared>,
}
struct Shared {
    state: Mutex<State>,
    /// Notifies the background task that processes time events. The background
    /// task waits on this to be notified, then processes a time event or the
    /// shutdown signal.
    background_task: Notify,
}
struct State {
    /// Maps a job id to its job so that we can retrieve the job by its id.
    job_store: HashMap<Uuid, Job>,
    /// New jobs added to the scheduler. When the scheduler sees new jobs,
    /// it produces time events for them and then drains them into the job_store.
    new_jobs: Vec<Job>,
    /// The time events that will occur in the nearest future, organized as a min-heap.
    time_events: BinaryHeap<Reverse<TimeEvent>>,
    shutdown: bool,
}
APIs
One of the goals of tokio-beat is to provide intuitive APIs. Below are the public APIs I propose:
// Scheduler
// construct a new scheduler
Scheduler::new()
// add job
scheduler.add_job(job: Job)
// remove job
scheduler.remove_job(job_id: Uuid)
// Job
// construct a cron job
Job::cron(expr: &str).do_(f: Fn(Context) -> BoxFuture<'static, ()> + 'static + Send + Sync)
// construct a oneshot job
Job::oneshot(at: Into<DateTime<Utc>>).do_(f: Fn(Context) -> BoxFuture<'static, ()> + 'static + Send + Sync)
// construct a cycle job
Job::cycle(start_at: Into<DateTime<Utc>>, interval: Duration).end_at(dt: Into<DateTime<Utc>>).do_(f: Fn(Context) -> BoxFuture<'static, ()> + 'static + Send + Sync)
See examples/simple for an example.
Algorithm
1. Produce the next time events for all new jobs and push them into the time_events heap, then move all new jobs to job_store for later retrieval.
2. Retrieve a time event from the time_events heap. If there is no time event, sleep until notified; otherwise sleep until the time event occurs.
3. When the time event occurs, retrieve the associated job and spawn a task to run the job's function object. Push the job's next time event onto the heap.
4. Go to 1.
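The loop above can be sketched without tokio by running it on a simulated clock. This is only an illustration under several assumptions: jobs are fixed-interval only, ids are `u64`, times are integer seconds, "sleeping" is replaced by jumping the clock forward, "spawning a task" is replaced by recording the event, and `run_until` is a hypothetical method. The real scheduler would instead sleep on the runtime and wait on its Notify.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};

/// Hypothetical fixed-interval job (ids and times are simplified).
pub struct Job {
    pub id: u64,
    pub start_at: i64,
    pub interval: i64,
}

impl Job {
    /// Nearest tick strictly after `now`.
    fn next_after(&self, now: i64) -> i64 {
        if now < self.start_at {
            self.start_at
        } else {
            self.start_at + ((now - self.start_at) / self.interval + 1) * self.interval
        }
    }
}

#[derive(Default)]
pub struct State {
    job_store: HashMap<u64, Job>,
    new_jobs: Vec<Job>,
    /// Min-heap of `(when, job_id)` pairs.
    time_events: BinaryHeap<Reverse<(i64, u64)>>,
}

impl State {
    pub fn add_job(&mut self, job: Job) {
        self.new_jobs.push(job);
    }

    /// Run the loop on a simulated clock starting at `now`, stopping when
    /// the next event would be after `deadline`. Returns the
    /// `(when, job_id)` pairs that fired, in order.
    pub fn run_until(&mut self, mut now: i64, deadline: i64) -> Vec<(i64, u64)> {
        let mut fired = Vec::new();
        loop {
            // Step 1: produce time events for new jobs, then store the jobs.
            for job in self.new_jobs.drain(..) {
                self.time_events.push(Reverse((job.next_after(now), job.id)));
                self.job_store.insert(job.id, job);
            }
            // Step 2: look at the earliest event; stop if there is none
            // or it lies beyond the deadline.
            let (when, job_id) = match self.time_events.peek() {
                Some(&Reverse((when, id))) if when <= deadline => (when, id),
                _ => break,
            };
            now = when; // stands in for sleeping until the event occurs
            self.time_events.pop();
            // Step 3: "run" the job and schedule its next event.
            if let Some(job) = self.job_store.get(&job_id) {
                fired.push((when, job_id));
                self.time_events.push(Reverse((job.next_after(now), job_id)));
            }
            // Step 4: loop back to step 1.
        }
        fired
    }
}
```

Looking the job up again in step 3 means that an event whose job has since been removed from the store is simply dropped rather than rescheduled.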