In this post, I will share my thoughts on how to implement task-scheduling utilities like the ubiquitous cron or the popular Quartz library.
Approach 1:
- We can have a main thread which will monitor the crontab file for any job additions.
- As soon as a new job is added, we can spawn a new thread dedicated to that job.
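Main thread:

@Override
public void run() {
    while (true) {
        Job newJob = getNewJobFromCrontabFile(); // blocking call
        JobThread newJobThread = new JobThread(newJob);
        newJobThread.start();
    }
}

JobThread:

@Override
public void run() {
    while (true) {
        try {
            // Sleep until the job is due; sleep() rejects negative values.
            Thread.sleep(Math.max(0, job.getNextExecutionTime() - currentTime()));
        } catch (InterruptedException e) {
            return; // stop this job's thread when interrupted
        }
        job.execute();
        job.setNextExecutionTime(currentTime() + job.getExecutionInterval());
    }
}

- This is a very simple approach.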
- However, it does not scale: every job permanently occupies its own thread, and a machine can only support so many threads at a time. Moreover, creating and destroying threads is expensive.
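To make Approach 1 concrete, here is a minimal runnable sketch in Java. The Job class, its fields, and the maxRuns cap are hypothetical stand-ins for the post's job abstraction (the cap exists only so the demo terminates; a real cron job repeats forever), and reading the crontab file is replaced by constructing jobs directly.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical job: runs `task` every `intervalMillis`, at most `maxRuns` times.
final class Job {
    final Runnable task;
    final long intervalMillis;
    final int maxRuns; // demo-only cap so the threads finish
    long nextExecutionTime;

    Job(Runnable task, long intervalMillis, int maxRuns) {
        this.task = task;
        this.intervalMillis = intervalMillis;
        this.maxRuns = maxRuns;
        this.nextExecutionTime = System.currentTimeMillis() + intervalMillis;
    }
}

// Approach 1: one dedicated thread per job.
final class JobThread extends Thread {
    private final Job job;

    JobThread(Job job) {
        this.job = job;
    }

    @Override
    public void run() {
        for (int run = 0; run < job.maxRuns; run++) {
            long delay = job.nextExecutionTime - System.currentTimeMillis();
            if (delay > 0) {
                try {
                    Thread.sleep(delay); // this thread does nothing else while waiting
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            job.task.run();
            job.nextExecutionTime = System.currentTimeMillis() + job.intervalMillis;
        }
    }
}

final class ThreadPerJobDemo {
    // Starts one thread per job, waits for both, and returns their run counts.
    static int[] runDemo() {
        AtomicInteger a = new AtomicInteger();
        AtomicInteger b = new AtomicInteger();
        JobThread t1 = new JobThread(new Job(a::incrementAndGet, 20, 3));
        JobThread t2 = new JobThread(new Job(b::incrementAndGet, 50, 2));
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {a.get(), b.get()};
    }
}
```

Every additional job costs one more sleeping thread, which is exactly the scaling problem described above.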
Approach 2:
- To limit the number of threads, we can create a fixed-size thread pool.
- Similar to the previous approach, we will have a main thread which will monitor the crontab file for any job additions.
- But instead of firing a new thread for a new job, we will submit the job to the thread pool.
class JobRunnable implements Runnable {
    private final Job job;

    public JobRunnable(Job job) {
        this.job = job;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(Math.max(0, job.getNextExecutionTime() - currentTime()));
            } catch (InterruptedException e) {
                return;
            }
            job.execute();
            job.setNextExecutionTime(currentTime() + job.getExecutionInterval());
        }
    }
}

class CrontabRunnable implements Runnable {
    private final ExecutorService executorService = Executors.newFixedThreadPool(4);

    @Override
    public void run() {
        while (true) {
            Job newJob = getNewJobFromCrontabFile(); // blocking call
            executorService.execute(new JobRunnable(newJob));
        }
    }
}

- However, this suffers from an even more serious problem.
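- Imagine that the thread pool is limited to 4 threads, and 4 jobs are already running, each scheduled to execute once an hour.
- Just then, another job arrives that is scheduled to execute every 5 minutes.
- This job will get stuck in the executor's queue, because all 4 threads are sleeping. Worse, since each JobRunnable loops forever, none of those threads is ever returned to the pool, so the new job never runs at all.
- Even if each JobRunnable executed its job only once and then released its thread, a job that was supposed to run after 5 minutes could still wait up to an hour for a free thread.
- This is a very imprecise scheduler.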
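The starvation described above is easy to reproduce with a real ExecutorService. This sketch scales the pool down to a single thread (an assumption made only to keep the demo small; the effect with 4 threads and 4 long jobs is the same) and uses a CountDownLatch to check whether the short job ever starts. PoolStarvationDemo is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class PoolStarvationDemo {
    // Returns true if the "5-minute" job managed to start within waitMillis.
    static boolean shortJobStartedInTime(long waitMillis) {
        // One thread stands in for the post's pool of 4.
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch shortJobStarted = new CountDownLatch(1);

        // The "hourly" job: sleeps and loops forever, like JobRunnable,
        // so it never gives its worker thread back to the pool.
        pool.execute(() -> {
            while (true) {
                try {
                    Thread.sleep(TimeUnit.HOURS.toMillis(1));
                } catch (InterruptedException e) {
                    return; // shutdownNow() below interrupts this sleep
                }
            }
        });

        // The "5-minute" job can only wait in the executor's internal queue.
        pool.execute(shortJobStarted::countDown);

        boolean started;
        try {
            started = shortJobStarted.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            started = false;
        }
        pool.shutdownNow(); // interrupt the sleeping worker so the JVM can exit
        return started;
    }
}
```

Calling `PoolStarvationDemo.shortJobStartedInTime(300)` returns `false`: the second task never leaves the executor's queue because the only worker is permanently busy sleeping.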
Approach 3:
- We can have a finite thread pool execute all the tasks, which wait in a PriorityBlockingQueue (a thread-safe heap) ordered by job.nextExecutionTime().
- This means that the top element of the heap will always be the job that is due to execute soonest.
- We will follow the standard thread-pool producer-consumer pattern.
- We will have one thread running in an infinite loop that takes jobs from the queue and submits them to the thread pool when they are due.
- Let's call it QueueConsumerThread:
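void goToSleep(Job job, PriorityBlockingQueue<Job> jobQueue) throws InterruptedException {
    jobQueue.add(job); // not due yet: put the job back
    Thread.sleep(Math.max(0, job.nextExecutionTime() - getCurrentTime()));
}

void executeJob(Job job, PriorityBlockingQueue<Job> jobQueue) {
    threadpool.submit(job); // async call
    job = job.copy(); // reschedule a copy; the pool may still be using the original
    job.setNextExecutionTime(getCurrentTime() + job.getExecutionInterval());
    jobQueue.add(job);
}

@Override
public void run() {
    while (true) {
        try {
            Job job = jobQueue.take(); // blocks while the queue is empty
            if (job.nextExecutionTime() > getCurrentTime()) {
                // Nothing to do yet
                goToSleep(job, jobQueue);
            } else {
                executeJob(job, jobQueue);
            }
        } catch (InterruptedException e) {
            // Woken up early: loop around and re-check the head of the queue.
        }
    }
}

- There will be one more thread which will monitor the crontab file for any new job additions and push them to the queue.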
- Let's call it QueueProducerThread:
@Override
public void run() {
    while (true) {
        Job newJob = getNewJobFromCrontabFile(); // blocking call
        jobQueue.add(newJob);
    }
}

- However, there is a problem with this:
- Imagine that QueueConsumerThread is sleeping and will wake up after an hour.
- Meanwhile, a new task arrives that is supposed to run every minute.
- This new task will not be able to start executing until an hour later.
- To solve this problem, we can have QueueProducerThread forcefully wake QueueConsumerThread from its sleep whenever the new task has to run sooner than the task at the front of the queue.
@Override
public void run() {
    while (true) {
        Job newJob = getNewJobFromCrontabFile(); // blocking call
        jobQueue.add(newJob);
        if (newJob == jobQueue.peek()) {
            // The new job is the one that will be scheduled next,
            // so wake up the consumer thread so that it does not oversleep.
            jobQueueConsumerThread.interrupt();
        }
    }
}
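Putting the pieces of Approach 3 together, here is a minimal runnable sketch. It illustrates the design under stated assumptions rather than being a definitive implementation: ScheduledJob, HeapScheduler, and HeapSchedulerDemo are hypothetical names, schedule() plays the role of QueueProducerThread (crontab monitoring is replaced by direct calls), and jobs are immutable, so rescheduling creates a new ScheduledJob instead of calling copy().

```java
import java.util.Comparator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical immutable job: rescheduling creates a new instance.
final class ScheduledJob {
    final Runnable task;
    final long intervalMillis;
    final long nextExecutionTime; // epoch millis

    ScheduledJob(Runnable task, long intervalMillis, long nextExecutionTime) {
        this.task = task;
        this.intervalMillis = intervalMillis;
        this.nextExecutionTime = nextExecutionTime;
    }

    ScheduledJob nextRun(long now) {
        return new ScheduledJob(task, intervalMillis, now + intervalMillis);
    }
}

final class HeapScheduler {
    // Min-heap on nextExecutionTime: the head is always the job due soonest.
    private final PriorityBlockingQueue<ScheduledJob> jobQueue = new PriorityBlockingQueue<>(
            16, Comparator.comparingLong((ScheduledJob j) -> j.nextExecutionTime));
    private final ExecutorService threadpool = Executors.newFixedThreadPool(4);
    private final Thread consumer = new Thread(this::consumeLoop, "QueueConsumerThread");
    private volatile boolean stopped = false;

    void start() {
        consumer.start();
    }

    // Producer side: enqueue, and wake the consumer if the new job is now first.
    void schedule(ScheduledJob job) {
        jobQueue.add(job);
        if (jobQueue.peek() == job) {
            consumer.interrupt();
        }
    }

    private void consumeLoop() {
        while (!stopped) {
            try {
                ScheduledJob job = jobQueue.take(); // blocks while the queue is empty
                long delay = job.nextExecutionTime - System.currentTimeMillis();
                if (delay > 0) {
                    jobQueue.add(job); // not due yet: put it back and sleep
                    Thread.sleep(delay);
                } else {
                    threadpool.execute(job.task); // async
                    jobQueue.add(job.nextRun(System.currentTimeMillis()));
                }
            } catch (InterruptedException e) {
                // Woken early (new head or stop()): re-check the flag and the queue.
            }
        }
    }

    void stop() {
        stopped = true;
        consumer.interrupt();
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        threadpool.shutdownNow();
    }
}

final class HeapSchedulerDemo {
    // True if a 50 ms job added after an hourly job still runs twice within 2 s.
    static boolean soonJobRanPromptly() {
        HeapScheduler scheduler = new HeapScheduler();
        long now = System.currentTimeMillis();
        long hour = TimeUnit.HOURS.toMillis(1);
        scheduler.schedule(new ScheduledJob(() -> { }, hour, now + hour));
        scheduler.start();
        CountDownLatch runs = new CountDownLatch(2);
        scheduler.schedule(new ScheduledJob(runs::countDown, 50, now + 50));
        try {
            return runs.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.stop();
        }
    }
}
```

The demo schedules an hourly job first, so the consumer typically goes to sleep for about an hour; the 50 ms job added afterwards still runs promptly because schedule() interrupts the consumer. Note that the interrupt also has to be handled around take(), since the consumer may be blocked on an empty queue rather than sleeping when a new job arrives.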
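Coincidentally, this is very similar to how Java's ScheduledExecutorService works: its standard implementation, ScheduledThreadPoolExecutor, keeps pending tasks in a heap-based delay queue ordered by their next execution time.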
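In practice, Java code can use ScheduledExecutorService directly instead of hand-rolling the queue. A minimal usage sketch (StdSchedulerDemo is a hypothetical name; the delays are arbitrary): a far-off job is scheduled first, and a frequent job scheduled afterwards still runs on time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class StdSchedulerDemo {
    // Returns true if the recurring task ran `times` times within timeoutMillis.
    static boolean runsRepeatedly(int times, long timeoutMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        CountDownLatch runs = new CountDownLatch(times);
        try {
            // Far-off job, analogous to the "hourly" job in the examples above.
            scheduler.scheduleWithFixedDelay(() -> { }, 1, 1, TimeUnit.HOURS);
            // Frequent job added later: it is not stuck behind the hourly one.
            scheduler.scheduleWithFixedDelay(runs::countDown, 10, 10, TimeUnit.MILLISECONDS);
            return runs.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow(); // cancel both recurring tasks
        }
    }
}
```

scheduleWithFixedDelay waits the given delay after each run finishes, which matches the "current time plus interval" arithmetic used in the approaches above; scheduleAtFixedRate instead aims for a fixed period between start times.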
In the next blog post, I will try to answer these questions:
- How to recover from system crashes?
- How to handle all the missed recurring jobs?
- How to make the scheduling service more resilient by making it distributed?