In this post, I will be sharing my thoughts on how to implement task scheduling utilities like the ubiquitous cron or the popular Quartz library.
Approach 1:
- We can have a main thread which will monitor the crontab file for any job additions.
- As soon as a new job is added, we can start a new thread dedicated to that job.
The main thread, which watches the crontab file:

```java
@Override
public void run() {
    while (true) {
        Job newJob = getNewJobFromCrontabFile(); // blocking call
        JobThread newJobThread = new JobThread(newJob);
        newJobThread.start();
    }
}
```

Each JobThread:

```java
@Override
public void run() {
    try {
        while (true) {
            // Thread.sleep() rejects negative values, so clamp at zero if we are already late.
            Thread.sleep(Math.max(0, job.getNextExecutionTime() - currentTime()));
            job.execute();
            job.setNextExecutionTime(currentTime() + job.getExecutionInterval());
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // interrupted: stop this job's thread
    }
}
```

- This is a very simple approach.
- However, it does not scale: a machine can only run so many threads at once, and creating and destroying threads is expensive.
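To make Approach 1 concrete, here is a small self-contained Java sketch of the thread-per-job idea. The names (`ThreadPerJobDemo`, `Job`, `startJobThread`, `runAll`) are hypothetical, and each job is given a run limit only so the demo terminates; a real cron-style job would loop forever, holding its thread the whole time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPerJobDemo {
    // Hypothetical minimal job: a task, a fixed interval and a run limit (so the demo terminates).
    static final class Job {
        final Runnable task;
        final long intervalMillis;
        final int maxRuns;
        long nextExecutionTime; // only touched by this job's own thread after start()

        Job(Runnable task, long intervalMillis, int maxRuns) {
            this.task = task;
            this.intervalMillis = intervalMillis;
            this.maxRuns = maxRuns;
            this.nextExecutionTime = System.currentTimeMillis() + intervalMillis;
        }
    }

    // Approach 1: every job gets its own dedicated thread that sleeps until the job is due.
    static Thread startJobThread(Job job) {
        Thread thread = new Thread(() -> {
            try {
                for (int run = 0; run < job.maxRuns; run++) {
                    long delay = job.nextExecutionTime - System.currentTimeMillis();
                    Thread.sleep(Math.max(0, delay)); // sleep() rejects negative values
                    job.task.run();
                    job.nextExecutionTime = System.currentTimeMillis() + job.intervalMillis;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted: stop this job's thread
            }
        });
        thread.start();
        return thread;
    }

    // Starts one thread per job, waits for all of them and returns how many threads were created.
    static int runAll(List<Job> jobs) {
        List<Thread> threads = new ArrayList<>();
        for (Job job : jobs) {
            threads.add(startJobThread(job));
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for jobs", e);
        }
        return threads.size();
    }

    public static void main(String[] args) {
        AtomicInteger a = new AtomicInteger();
        AtomicInteger b = new AtomicInteger();
        int threads = runAll(List.of(
                new Job(a::incrementAndGet, 20, 3),
                new Job(b::incrementAndGet, 30, 2)));
        System.out.println(threads + " threads, a=" + a.get() + ", b=" + b.get());
        // prints: 2 threads, a=3, b=2
    }
}
```

The thread count grows one-for-one with the number of jobs, which is exactly the scaling problem described above.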
Approach 2:
- To limit the number of threads, we can create a finite-sized thread pool.
- Similar to the previous approach, we will have a main thread which will monitor the crontab file for any job additions.
- But instead of starting a new thread for each new job, we will submit the job to the thread pool.
```java
class JobRunnable implements Runnable {
    private final Job job;

    public JobRunnable(Job job) {
        this.job = job;
    }

    @Override
    public void run() {
        try {
            // Note: this loop never returns, so the pool thread running it is never given back.
            while (true) {
                Thread.sleep(Math.max(0, job.getNextExecutionTime() - currentTime()));
                job.execute();
                job.setNextExecutionTime(currentTime() + job.getExecutionInterval());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

public class CrontabRunnable implements Runnable {
    private final ExecutorService executorService = Executors.newFixedThreadPool(4);

    @Override
    public void run() {
        while (true) {
            Job newJob = getNewJobFromCrontabFile(); // blocking call
            executorService.execute(new JobRunnable(newJob));
        }
    }
}
```

- However, this suffers from an even more serious problem.
- Imagine the thread pool is limited to 4 threads, and 4 jobs are already running, each with an interval of 1 hour.
- Just then, another job arrives with an interval of 5 minutes.
- This job gets stuck in the executor's queue, because all 4 pool threads are busy sleeping on their own jobs.
- Even if those jobs gave their threads back after one run, a job that was supposed to run after 5 minutes would instead run after more than an hour. With the JobRunnable above, which loops forever and never releases its thread, it would never run at all.
- This is a very imprecise scheduler.
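The stall is easy to reproduce with a real `Executors.newFixedThreadPool`. In this hedged sketch (the class and method names are hypothetical), the long-running jobs are simulated by tasks that block on a `CountDownLatch` instead of sleeping for an hour, and the short job is queued behind them.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PoolStarvationDemo {
    // Fills a fixed pool with "long" jobs that block until released, then queues one "short" job.
    // Returns {ran while the pool was full, ran after the long jobs released their threads}.
    static boolean[] demo(int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch release = new CountDownLatch(1);      // holds the long jobs
        CountDownLatch shortJobRan = new CountDownLatch(1);
        try {
            for (int i = 0; i < poolSize; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // occupies a worker, like a sleeping JobRunnable
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            pool.execute(shortJobRan::countDown); // the "5 minute" job, queued behind them

            boolean ranWhileFull = shortJobRan.await(200, TimeUnit.MILLISECONDS);
            release.countDown(); // the long jobs finally give their threads back
            boolean ranAfterRelease = shortJobRan.await(5, TimeUnit.SECONDS);
            return new boolean[] {ranWhileFull, ranAfterRelease};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo(4);
        System.out.println("ran while pool full: " + result[0] + ", ran after release: " + result[1]);
    }
}
```

The first check should report that the short job has not run while all 4 threads are occupied, and the second that it runs once a thread frees up. A JobRunnable that loops forever never frees its thread, so in that design the second check would never succeed.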
Approach 3:
- We can have a finite thread pool that executes all the tasks, picking them up from a PriorityBlockingQueue (a thread-safe heap) ordered by job.nextExecutionTime().
- This means the top element of the heap will always be the job that must execute soonest.
- We will follow the standard thread-pool producer-consumer pattern.
- One thread will run in an infinite loop, taking jobs from the queue and submitting them to the thread pool when they are due.
- Let's call it QueueConsumerThread:
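```java
void goToSleep(Job job, PriorityBlockingQueue<Job> jobQueue) throws InterruptedException {
    jobQueue.put(job); // not due yet: put it back
    // Sleep until the job is due; the producer may interrupt this sleep early.
    Thread.sleep(Math.max(0, job.nextExecutionTime() - getCurrentTime()));
}

void executeJob(Job job, PriorityBlockingQueue<Job> jobQueue) {
    threadpool.submit(job); // async call
    // Reschedule a copy so the running instance is not modified under it.
    Job next = job.copy();
    next.setNextExecutionTime(getCurrentTime() + next.getExecutionInterval());
    jobQueue.put(next);
}

@Override
public void run() {
    while (true) {
        try {
            Job job = jobQueue.take(); // blocks while the queue is empty
            if (job.nextExecutionTime() > getCurrentTime()) {
                // Nothing to do yet
                goToSleep(job, jobQueue);
            } else {
                executeJob(job, jobQueue);
            }
        } catch (InterruptedException e) {
            // Woken up early: loop and re-check the head of the queue.
        }
    }
}
```

- There will be one more thread that monitors the crontab file for new jobs and pushes them onto the queue.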
- Let's call it QueueProducerThread:
```java
@Override
public void run() {
    while (true) {
        Job newJob = getNewJobFromCrontabFile(); // blocking call
        jobQueue.put(newJob);
    }
}
```

- However, there is a problem with this:
- Imagine that the QueueConsumerThread is sleeping and will wake up after an hour.
- Meanwhile a new task arrives which is supposed to run every minute.
- This new task will not be able to start executing until an hour later.
- To solve this problem, the QueueProducerThread can forcefully wake the QueueConsumerThread from its sleep whenever the new job has to run sooner than the job at the head of the queue.
```java
@Override
public void run() {
    while (true) {
        Job newJob = getNewJobFromCrontabFile(); // blocking call
        jobQueue.put(newJob);
        if (jobQueue.peek() == newJob) {
            // The new job is the one that will be scheduled next,
            // so wake up the consumer thread so that it does not oversleep.
            jobQueueConsumerThread.interrupt();
        }
    }
}
```
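Putting the pieces of Approach 3 together, here is a small runnable Java sketch under a few stated assumptions: a job is a hypothetical immutable `Job` record with a fixed interval and a due time in epoch milliseconds, the crontab watcher is replaced by direct calls to a hypothetical `add` method, and `stop()` exists only so the demo can exit. The queue calls (`put`, `take`, `peek`) are the real `PriorityBlockingQueue` API.

```java
import java.util.Comparator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HeapScheduler {
    // Hypothetical immutable job; rescheduling creates a copy with a new due time (epoch millis).
    record Job(String name, long intervalMillis, long nextExecutionTime, Runnable task) {
        Job rescheduled() {
            return new Job(name, intervalMillis, System.currentTimeMillis() + intervalMillis, task);
        }
    }

    // Heap ordered by due time: the head is always the job that must run soonest.
    private final PriorityBlockingQueue<Job> jobQueue =
            new PriorityBlockingQueue<>(11, Comparator.comparingLong(Job::nextExecutionTime));
    private final ExecutorService threadPool = Executors.newFixedThreadPool(2);
    private final Thread consumer = new Thread(this::consumeLoop, "QueueConsumerThread");
    private volatile boolean running = true;

    void start() {
        consumer.start();
    }

    // Producer side: enqueue, and wake the consumer if the new job is now the head.
    void add(Job job) {
        jobQueue.put(job);
        if (jobQueue.peek() == job) {
            consumer.interrupt();
        }
    }

    // Consumer side: run the head job if it is due, otherwise put it back and sleep until it is.
    private void consumeLoop() {
        while (running) {
            try {
                Job job = jobQueue.take(); // blocks while the queue is empty
                long delay = job.nextExecutionTime() - System.currentTimeMillis();
                if (delay > 0) {
                    jobQueue.put(job);
                    Thread.sleep(delay);
                } else {
                    threadPool.execute(job.task()); // async: the pool runs the job
                    jobQueue.put(job.rescheduled());
                }
            } catch (InterruptedException e) {
                // Woken early (a sooner job arrived, or stop() was called): re-check and loop.
            }
        }
    }

    void stop() {
        running = false;
        consumer.interrupt();
        try {
            consumer.join(); // make sure nothing is submitted after the pool shuts down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        threadPool.shutdown();
    }

    // Scenario from the text: the consumer is asleep on an hourly job when a frequent job arrives.
    static boolean frequentJobRunsPromptly() {
        HeapScheduler scheduler = new HeapScheduler();
        CountDownLatch threeRuns = new CountDownLatch(3);
        AtomicInteger hourlyRuns = new AtomicInteger();
        long now = System.currentTimeMillis();
        scheduler.start();
        scheduler.add(new Job("hourly", 3_600_000, now + 3_600_000, hourlyRuns::incrementAndGet));
        scheduler.add(new Job("frequent", 20, now, threeRuns::countDown));
        try {
            return threeRuns.await(5, TimeUnit.SECONDS) && hourlyRuns.get() == 0;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println("frequent job ran promptly: " + frequentJobRunsPromptly());
    }
}
```

Because `take()` and `sleep()` are the only interruptible calls in the loop, a wake-up never loses a job: when the interrupt lands, the job has either not been taken yet or has already been put back.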
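Coincidentally, this is very similar to how Java's ScheduledExecutorService works: its standard implementation, ScheduledThreadPoolExecutor, keeps tasks in a heap ordered by their next trigger time, and when a newly added task lands at the head of that heap, it signals a waiting worker thread so that the worker does not oversleep.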
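For comparison, here is the same scenario handed to the JDK's scheduler. This sketch uses only the real `Executors.newScheduledThreadPool` and `scheduleAtFixedRate` APIs; the class and method names are hypothetical. Even with a single worker thread and an hourly task scheduled first, the frequent task should start promptly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledExecutorDemo {
    // Schedules an hourly task and then a frequent one on a single worker thread, and reports
    // whether the frequent task managed to run 3 times within a few seconds.
    static boolean frequentTaskRunsPromptly() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch threeRuns = new CountDownLatch(3);
        try {
            // First due in one hour, then every hour.
            scheduler.scheduleAtFixedRate(() -> System.out.println("hourly job"), 1, 1, TimeUnit.HOURS);
            // Due immediately, then every 20 ms: it becomes the head of the executor's queue,
            // so the worker does not keep waiting for the hourly task.
            scheduler.scheduleAtFixedRate(threeRuns::countDown, 0, 20, TimeUnit.MILLISECONDS);
            return threeRuns.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow(); // cancels both periodic tasks
        }
    }

    public static void main(String[] args) {
        System.out.println("frequent task ran promptly: " + frequentTaskRunsPromptly());
    }
}
```

If a job's next run should be counted from the end of the previous run rather than from its start, `scheduleWithFixedDelay` is the matching call.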
In the next blog post, I will try to answer these questions:
- How to recover from system crashes?
- How to handle all the missed recurring jobs?
- How to support editing the same job?
- How to make the scheduling service more resilient by making it distributed?