Let’s say you want to store tasks in an asynchronous queue but these tasks can be skipped if a certain time has elapsed. For example, after a user signs out, you want to clear a certain cache, however if the task is not executed within a specific time range, you would like to skip it because the cache has its own internal expiration configured. A timed queue executor allows you to do that.
So, how do you implement this?
First thing to do is to wrap your tasks with two new classes: CallableWithTTL and RunnableWithTTL. These two classes will filter out tasks after a certain interval, thereby preventing tasks from being executed after the interval has passed.
public class CallableWithTTL<T> implements Callable<T> {
private final LocalDateTime whenAdded = LocalDateTime.now();
private final Callable<T> task;
private final Duration defaultTTL;
@Override
public T call() throws Exception {
long timePassed = Duration.between(whenAdded,
LocalDateTime.now()).toMillis();
if (defaultTTL.toMillis() < timePassed) {
LOG.warn("Task's TTL exceeded");
return null;
}
return task.call();
}
}
public class RunnableWithTTL implements Runnable {
private final LocalDateTime whenAdded = LocalDateTime.now();
private final Runnable runnable;
private final Duration defaultTTL;
@Override
public void run() {
long timePassed = Duration.between(whenAdded,
LocalDateTime.now()).toMillis();
if (defaultTTL.toMillis() < timePassed) {
LOG.warn("Task's TTL exceeded");
} else {
runnable.run();
}
}
}
Then, we proceed to use these new callable and runnable classes in our custom timed queue executor. This executor class is basically another ThreadPoolTaskExecutor class. The only difference is tasks are wrapped with our custom callable and runnable classes.
public class TimedQueueExecutor extends ThreadPoolTaskExecutor {
private final Duration ttl;
@Override
public <T> Future<T> submit(Callable<T> task) {
return super.submit(new CallableWithTTL(task, ttl));
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task)
{
return super.submitListenable(new CallableWithTTL(task, ttl));
}
@Override
public void execute(Runnable task) {
super.execute(new RunnableWithTTL(task, ttl));
}
All that is remaining is to use our new executor like this
ThreadPoolTaskExecutor executor = new
TimedQueueExecutor(Duration.ofSeconds(30));
where the input to the constructor is the time to live duration for each task.
Leave a Reply