Writing your own cron service

In this post, I will be sharing my thoughts on how to implement task scheduling utilities like the ubiquitous cron or the popular Quartz library.

Approach 1:

  • We can have a main thread which will monitor the crontab file for any job additions.
  • As soon as a new job is added, we can start a new thread dedicated to that job.
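    // Main thread: watches the crontab file and starts one thread per job.
    @Override
    public void run()
    {
        while (true)
        {
            Job newJob = getNewJobFromCrontabFile(); // blocking call
            JobThread newJobThread = new JobThread(newJob);
            newJobThread.start();
        }
    }

    // JobThread: sleeps until its job is due, runs it, then reschedules it.
    @Override
    public void run()
    {
        try
        {
            while (true)
            {
                // Never pass a negative duration to Thread.sleep().
                Thread.sleep(Math.max(0L, job.getNextExecutionTime() - currentTime()));
                job.execute();
                job.setNextExecutionTime(currentTime() + job.getExecutionInterval());
            }
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt(); // stop this job's thread
        }
    }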
  • This is a very simple approach.
  • However, it scales poorly: every thread reserves its own stack and consumes OS resources, so a machine can only sustain a limited number of threads at a time. Moreover, creating and destroying threads is expensive.

Approach 2:

  • To limit the number of threads, we can create a finite-sized thread pool.
  • Similar to the previous approach, we will have a main thread which will monitor the crontab file for any job additions.
  • But instead of starting a new thread for each new job, we will submit the job to the thread pool.
    class JobRunnable implements Runnable
    {
        private final Job job;

        public JobRunnable(Job job)
        {
            this.job = job;
        }

        @Override
        public void run()
        {
            try
            {
                while (true)
                {
                    // The pool thread is held for the entire sleep.
                    Thread.sleep(Math.max(0L, job.getNextExecutionTime() - currentTime()));
                    job.execute();
                    job.setNextExecutionTime(currentTime() + job.getExecutionInterval());
                }
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
            }
        }
    }

    public class CrontabRunnable implements Runnable
    {
        private final ExecutorService executorService = Executors.newFixedThreadPool(4);

        @Override
        public void run()
        {
            while (true)
            {
                Job newJob = getNewJobFromCrontabFile(); // blocking call
                executorService.execute(new JobRunnable(newJob));
            }
        }
    }
  • However, this suffers from an even more serious problem.
  • Imagine that the thread pool is limited to 4 threads, and it is already running 4 jobs that are each due in 1 hour.
  • Just then, another job arrives that is due in 5 minutes.
  • This job will be stuck in the pool's queue for at least an hour, because all 4 threads are occupied by sleeping jobs. (In fact, since each JobRunnable loops forever and never gives its thread back, the new job would never start at all.)
  • So a job that was supposed to run after 5 minutes will instead run after more than an hour, if it runs at all.
  • This makes for a very imprecise scheduler.
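
The starvation described above is easy to reproduce at a small scale. The following is only a sketch for illustration: the class name PoolStarvationDemo, the method measureQueueDelayMillis, and the pool size and sleep durations are all made up, and the sleeping tasks return after a single sleep so that the demo terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class PoolStarvationDemo {

    /**
     * Fills a fixed pool with sleeping tasks, then submits one more task that
     * is ready to run at once. Returns how long that task waited, in milliseconds.
     */
    static long measureQueueDelayMillis(int poolSize, long sleepMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            // Occupy every worker with a task that does nothing but sleep.
            for (int i = 0; i < poolSize; i++) {
                pool.execute(() -> {
                    try {
                        Thread.sleep(sleepMillis);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }

            // This task is due immediately, but no worker is free to take it.
            long submittedAt = System.nanoTime();
            AtomicLong startedAt = new AtomicLong();
            CountDownLatch started = new CountDownLatch(1);
            pool.execute(() -> {
                startedAt.set(System.nanoTime());
                started.countDown();
            });

            if (!started.await(sleepMillis * 10 + 1000, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("late task never started");
            }
            return TimeUnit.NANOSECONDS.toMillis(startedAt.get() - submittedAt);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Assumed numbers: 4 workers, each blocked for 500 ms.
        System.out.println("ready task waited ~" + measureQueueDelayMillis(4, 500) + " ms");
    }
}
```

The ready task waits for roughly as long as the workers sleep, even though it had nothing to wait for itself. Scale the sleep up to an hour and you get the imprecision described above.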

Approach 3:

  • We can have a finite thread pool that executes all the tasks, and keep the pending jobs in a PriorityBlockingQueue (a thread-safe heap) ordered by job.nextExecutionTime().
  • This means that the top element of the heap will always be the job that is due to execute the soonest.
  • We will follow the standard thread pool producer-consumer pattern.
  • One thread will run in an infinite loop, consuming jobs from the queue and submitting them to the thread pool once they are due.
  • Let's call it QueueConsumerThread:
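    // Not due yet: put the job back and sleep until it is.
    void goToSleep(Job job, PriorityBlockingQueue<Job> jobQueue) throws InterruptedException {
        jobQueue.add(job);
        Thread.sleep(job.nextExecutionTime() - getCurrentTime());
    }

    // Due: hand the job to the pool, then queue a copy for its next run.
    void executeJob(Job job, PriorityBlockingQueue<Job> jobQueue) {
        threadpool.submit(job); // async call
        Job nextRun = job.copy();
        nextRun.setNextExecutionTime(getCurrentTime() + nextRun.getExecutionInterval());
        jobQueue.add(nextRun);
    }

    @Override
    public void run() {
        while (true) {
            try {
                Job job = jobQueue.take(); // blocks while the queue is empty
                if (job.nextExecutionTime() > getCurrentTime()) {
                    // Nothing to do yet
                    goToSleep(job, jobQueue);
                } else {
                    executeJob(job, jobQueue);
                }
            } catch (InterruptedException e) {
                // Woken up early: loop around and re-check the head of the queue.
            }
        }
    }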
  • There will be one more thread that monitors the crontab file for new jobs and adds them to the queue.
  • Let's call it QueueProducerThread:
    @Override
    public void run()
    {
        while (true)
        {
            Job newJob = getNewJobFromCrontabFile(); // blocking call
            jobQueue.add(newJob);
        }
    }
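
The whole design leans on the queue always handing back the job that is due soonest, whatever order the producer added them in. Here is a small sketch of just that ordering. The Job class, its fields, and the helper names newJobQueue and drainOrder are hypothetical stand-ins, since the post never defines Job.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class JobQueueOrderingDemo {

    /** Hypothetical minimal job: a name plus an absolute due time in milliseconds. */
    static final class Job {
        final String name;
        final long nextExecutionTime;

        Job(String name, long nextExecutionTime) {
            this.name = name;
            this.nextExecutionTime = nextExecutionTime;
        }
    }

    /** Builds a queue whose head is always the job with the smallest due time. */
    static PriorityBlockingQueue<Job> newJobQueue() {
        return new PriorityBlockingQueue<>(
                11, Comparator.comparingLong((Job j) -> j.nextExecutionTime));
    }

    /** Drains the queue with poll() and returns the job names in removal order. */
    static List<String> drainOrder(PriorityBlockingQueue<Job> queue) {
        List<String> names = new ArrayList<>();
        Job job;
        while ((job = queue.poll()) != null) {
            names.add(job.name);
        }
        return names;
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<Job> queue = newJobQueue();
        queue.add(new Job("hourly", 3_600_000L));
        queue.add(new Job("every-minute", 60_000L));
        queue.add(new Job("every-five-minutes", 300_000L));
        System.out.println(drainOrder(queue)); // [every-minute, every-five-minutes, hourly]
    }
}
```

Note that only the head of the queue is guaranteed to be the minimum. Iterating over a PriorityBlockingQueue does not visit elements in priority order, which is why the sketch drains it with poll().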

  • However, this producer-consumer design has a problem:
    • Imagine that QueueConsumerThread is sleeping and will wake up after an hour.
    • Meanwhile, a new task arrives that is supposed to run every minute.
    • This new task will not be able to start executing until an hour later.
  • To solve this problem, QueueProducerThread can forcefully wake QueueConsumerThread from its sleep whenever the new task has to run sooner than the task at the front of the queue.
    @Override
    public void run()
    {
        while (true)
        {
            Job newJob = getNewJobFromCrontabFile(); // blocking call
            jobQueue.add(newJob);
            if (newJob == jobQueue.peek())
            {
                // The new job is the one that will be scheduled next,
                // so wake up the consumer thread so that it does not oversleep.
                jobQueueConsumerThread.interrupt();
            }
        }
    }
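
To check that the pieces fit together, here is one way the consumer and producer could be combined into a single runnable class. Treat it as a sketch under assumptions rather than a finished scheduler: the names MiniScheduler, Job, schedule, stop and countFastRuns are invented for this example, jobs are plain Runnables with a fixed interval, and the crontab file is replaced by direct calls to schedule().

```java
import java.util.Comparator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class MiniScheduler {

    /** Hypothetical recurring job: a task, a fixed interval, and its next due time. */
    static final class Job {
        final Runnable task;
        final long intervalMillis;
        final long nextExecutionTime;

        Job(Runnable task, long intervalMillis, long nextExecutionTime) {
            this.task = task;
            this.intervalMillis = intervalMillis;
            this.nextExecutionTime = nextExecutionTime;
        }
    }

    private final PriorityBlockingQueue<Job> jobQueue = new PriorityBlockingQueue<>(
            11, Comparator.comparingLong((Job j) -> j.nextExecutionTime));
    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    private final Thread consumer = new Thread(this::consumeLoop, "queue-consumer");
    private volatile boolean running = true;

    void start() {
        consumer.start();
    }

    /** Producer side: enqueue the job and wake the consumer if it became the head. */
    void schedule(Runnable task, long intervalMillis) {
        Job job = new Job(task, intervalMillis, System.currentTimeMillis() + intervalMillis);
        jobQueue.add(job);
        if (job == jobQueue.peek()) {
            consumer.interrupt();
        }
    }

    void stop() {
        running = false;
        consumer.interrupt();
        try {
            consumer.join(1000); // let the consumer exit before the pool is closed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        workers.shutdownNow();
    }

    /** Consumer side: sleep until the head job is due, then hand it to the pool. */
    private void consumeLoop() {
        while (running) {
            try {
                Job job = jobQueue.take(); // blocks while the queue is empty
                long now = System.currentTimeMillis();
                if (job.nextExecutionTime > now) {
                    jobQueue.add(job); // not due yet: put it back before sleeping
                    Thread.sleep(job.nextExecutionTime - now);
                } else {
                    workers.submit(job.task);
                    jobQueue.add(new Job(job.task, job.intervalMillis, now + job.intervalMillis));
                }
            } catch (InterruptedException wakeUp) {
                // Interrupted by schedule() or stop(): loop around and re-check.
            }
        }
    }

    /** Schedules an hourly job, then a fast one, and counts runs of the fast one. */
    static int countFastRuns(long fastIntervalMillis, long observeMillis) {
        MiniScheduler scheduler = new MiniScheduler();
        AtomicInteger fastRuns = new AtomicInteger();
        scheduler.start();
        try {
            scheduler.schedule(() -> { }, 3_600_000L); // consumer now sleeps for an hour
            Thread.sleep(100);
            scheduler.schedule(fastRuns::incrementAndGet, fastIntervalMillis);
            Thread.sleep(observeMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.stop();
        }
        return fastRuns.get();
    }

    public static void main(String[] args) {
        System.out.println("fast job ran " + countFastRuns(50, 400) + " times");
    }
}
```

Because a job is always put back on the queue before the consumer sleeps, an interrupt can never lose a job. The worst case is one extra trip around the loop.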

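Coincidentally, this design is very similar to how ScheduledExecutorService in Java works: its standard implementation, ScheduledThreadPoolExecutor, also keeps pending tasks in a heap ordered by their next execution time.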
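
For comparison, here is roughly what a recurring job looks like when it is handed to the JDK's own scheduler instead of a hand-rolled one. The class name ScheduledExecutorDemo, the helper countRuns, and the period and pool size are assumptions made for this sketch. Executors.newScheduledThreadPool and scheduleAtFixedRate are the standard java.util.concurrent calls.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ScheduledExecutorDemo {

    /** Runs a counter task at a fixed rate for a while and returns how often it fired. */
    static int countRuns(long periodMillis, long observeMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4);
        AtomicInteger runs = new AtomicInteger();
        try {
            // First run immediately, then once every periodMillis.
            scheduler.scheduleAtFixedRate(
                    runs::incrementAndGet, 0, periodMillis, TimeUnit.MILLISECONDS);
            Thread.sleep(observeMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow(); // cancels the recurring task
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("task ran " + countRuns(50, 300) + " times");
    }
}
```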

In the next blog post, I will be trying to answer these questions:

  • How to recover from system crashes?
  • How to handle all the missed recurring jobs?
  • How to support editing the same job?
  • How to make the scheduling service more resilient by making it distributed?