延迟队列

定义

延迟队列中的每个元素都有一个过期时间,并且元素按照过期时间进行升序排序,队头元素的过期时间最近,当一个线程尝试从队列获取元素时候,只有队头元素过期才会出队,否则阻塞线程直到队头元素过期。 image

底层数据结构

延迟队列需要方便的获取当前剩余过期时间的元素,因此底层需要一个有序的数据结构来保存元素。优先队列可以满足我们的需要。以元素过期时间的远近作为评定优先的标准,近的元素优先于远的元素。根据优先队列的性质,优先队列队首的元素一定是过期时间最近的元素。

定义元素接口

定义元素的接口,延迟队列的元素必须有一个过期时间,因此元素必须提供一个方法获取剩余过期时间。定义Delayed接口,延迟队列的元素必须实现Delayed

public interface Delayed {
    long getDelay(); // 获取剩余过期时间
}

实现

出队与入队实现参考JUC中的延迟队列

入队与出队操作支持多线程并发调用。使用leader/follower模式(领导者-跟随者)的变体。以最小化线程不需要的等待

  • 出队时,若队首元素尚未过期。只需一个线程作为领导者等待队首元素过期后主动醒来,其余线程只需一直等待被唤醒
  • 入队时,若队首元素发生变化,则唤醒一个正在等待的线程成为新的领导者,等待新的队首元素过期
public class DelayQueue<T extends Delayed>{

    // 优先队列的比较器,剩余时间短的元素优先于剩余时间长的元素
    private Comparator<T> comparator = (o1, o2) -> {
        long diff = o1.getDelay() - o2.getDelay();
        if (diff > 0)
            return -1;
        else if (diff == 0)
            return 0;
        else
            return 1;
    };

    // 锁,用于线程间同步
    private Lock lock = new ReentrantLock();
    // 条件,用于线程间协作
    private Condition av = lock.newCondition();
    // 领导者-跟随者模式
    // 领导者线程,用于监视队头元素的是否过期,其余线程作为跟随者只需等待
    private Thread leader;
    // 优先队列,队头的元素一定是剩余过期时间最短的元素。
    private PriorityQueue<T> priorityQueue = new PriorityQueue<>(comparator);

    protected Lock getLock(){
        return this.lock;
    }

    public void remove(T t){
        lock.lock();
        try{
            priorityQueue.remove(t);
        }finally {
            lock.unlock();
        }
    }

    public void clear(){
        lock.lock();
        try {
            priorityQueue.clear();
        }finally {
            lock.unlock();
        }
    }

    public void offer(T t) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            // 向优先队列添加元素
            priorityQueue.offer(t);
            // 队头元素是刚刚添加的元素,说明一个剩余过期时间更短的元素被加入延迟队列导致队头元素发生变化
            // (或者原本队列就是空的)
            if (priorityQueue.peek() == t) {
                // 原领导者变为跟随者,因为队头元素发生变化,需要一个新的领导者来监听新队头元素的过期事件
                leader = null;
                // 从跟随者线程中唤醒一个成为新的领导者
                av.signal();
            }
        }finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lockInterruptibly();
        try{
            for (;;) {
                // 获取优先队列队头元素(剩余过期时间最短元素)
                T first = priorityQueue.peek();
                if (first == null) {
                    // 队列空,等着被唤醒
                    av.await();
                } else {
                    // 获取队头元素剩余过期时间
                    long delay = first.getDelay();
                    if (delay <= 0)
                        // 过期,返回该元素
                        return priorityQueue.poll();
                    // 队头元素没过期
                    if (leader != null)
                        // 已经存在一个领导者线程监听该队头元素,等着被唤醒
                        av.await();
                    // 没有领导者线程在监听该队头元素,则把当前线程设置为领导者
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 然后等待队头元素过期 
                        av.awaitNanos(delay);
                    }finally {
                        // 等待时间到,被唤醒(新元素入队) 或者 线程中断
                        // 如果当前线程还是领导者(新元素入队时可能会改变领导者)
                        // 设置当前线程为跟随者
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }finally {
            // 最终返回前
            // 再检查一下,如果队头元素存在并且没有领导者在监视,唤醒一个跟随者成为领导
            if (leader == null && priorityQueue.peek() != null)
                av.signal();
            lock.unlock();
        }
    }
}