04-JUC

absent 英 /ˈæbsənt/ adj. 缺席的;不存在;不在的;缺少;心不在焉的;出神的

JDK5.0后提供了多种并发类容器替代同步类容器,提升性能、吞吐量

1. Map

  1. ConcurrentHashMap <= HashMap, HashTable
  2. ConcurrentSkipListMap <= TreeMap
  3. ConcurrentMap缩小锁的粒度,来提高性能
  4. HashMap(线程不安全) > ConcurrentHashMap > Hashtable > synchronizedMap
// ConcurrentMap父接口
class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable

class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> implements ConcurrentNavigableMap<K,V>, Cloneable, Serializable

interface ConcurrentNavigableMap<K,V> extends ConcurrentMap<K,V>, NavigableMap<K,V>

1. ConcurrentHashMap

/**
 * 性能排序
 * HashMap > ConcurrentHashMap > Hashtable > synchronizedMap
 */
public class T1_Map {

    Map<String, Integer> map;
    Map<String, Integer> map2;
    CountDownLatch latch = new CountDownLatch(100);

    @Test
    public void Hashtable() throws InterruptedException {
        map = new Hashtable<>();
        map2 = new Hashtable<>();
        // 耗时: 13363, map.size(): 10000000, map2.size(): 100000
        map_put();
    }

    @Test
    public void HashMap() throws InterruptedException {
        map = new HashMap<>();
        map2 = new HashMap<>();
        // 耗时: 7868, map.size(): 7306175, map2.size(): 162698
        map_put();
    }

    @Test
    public void synchronizedMap() throws InterruptedException {
        map = Collections.synchronizedMap(new HashMap<>());
        map2 = Collections.synchronizedMap(new HashMap<>());
        // 耗时: 19143, map.size(): 10000000, map2.size(): 100000
        map_put();
    }

    /**
     * 1. 底层仍然为hash表(数组 + 链表),懒汉式,数组大小为2的n次幂
     * 2. 同步:CAS + synchronized来保证
     */
    @Test
    public void ConcurrentHashMap() throws InterruptedException {
        map = new ConcurrentHashMap<>();
        map2 = new ConcurrentHashMap<>();
        // 耗时: 17067, map.size(): 10000000, map2.size(): 100000
        map_put();
    }

    /*
     * ConcurrentSkipListMap <= TreeMap
     *      1. TreeMap进化为ConcurrentSkipListMap
     *      2. 自定义sort
     */
    @Test
    public void ConcurrentSkipListMap() {
        ConcurrentMap<Integer, Integer> map = new ConcurrentSkipListMap<>();
        map.put(1, 1);
        map.put(3, 3);
        map.put(2, 2);
        map.put(5, 5);
        System.out.println("map.put(3, 10) = " + map.put(3, 10));
        map.put(4, 4);
        map.put(6, 6);

        System.out.println("map.size() = " + map.size());
        System.out.println("map = " + map);
    }

    // ------------------------------------------------------------------------

    private void map_put() throws InterruptedException {
        long startTime = System.currentTimeMillis();

        for (int i = 0; i < latch.getCount(); i++) {
            new Thread(() -> {
                for (int j = 0; j < 100_000; j++) {
                    map.put(IdUtil.fastSimpleUUID(), j);

                    // HashMap已崩溃,得到的size()居然 > 100_000,key重复了?
                    map2.put(String.valueOf(j), j);
                }
                latch.countDown();
            }).start();
        }

        latch.await();

        long endTime = System.currentTimeMillis();
        System.out.println(
                StrUtil.format("耗时: {}, map.size(): {}, map2.size(): {}",
                        (endTime - startTime), map.size(), map2.size())
        );
    }

}












 







 







 











 












 




















 


 















2. ConcurrentSkipListMap

  • TreeMap进化为ConcurrentSkipListMap,底层为跳表

2. Collection

T2_Collection

  1. COW容器(Copy_On_Write),写时复制容器(读写分离容器)
  2. CopyonWriteArrayList <= ArrayList, LinkedList
  3. CopyOnWriteArraySet <= HashSet, linkedHashSet
  4. ConcurrentSkipListSet <= TreeSet

1. CopyOnWriteArrayList

  1. 底层为数组,ReentrantLock保证同步
  2. 优点
    1. 写时,先copy出一个新容器,将E添加到新容器,最后原容器的引用指向新容器,进行arr额外复制
    2. 并发读不需要锁定容器,性能极高。读写分离思想,提高并发性能
  3. 缺点
    1. 保证数据的最终一致性,不能保证数据实时一致性
    2. 写入的数据,不能马上读到
  4. 适用场合
    1. 读多写少。高并发读使用
    2. 更新会复制新容器。操作频繁对内存消耗很高

1. API

/*
 * 1. 底层为数组,`ReentrantLock`保证同步
 * 2. 优点
 *      1. 写时,先copy出一个新容器,将E添加到新容器,最后原容器的引用指向新容器,进行arr额外复制
 *      2. 并发读不需要锁定容器,性能极高。读写分离思想,提高并发性能
 * 3. 缺点
 *      1. 保证数据的最终一致性,不能保证数据实时一致性
 *      2. 写入的数据,不能马上读到
 * 4. 适用场合
 *      1. 读多写少。高并发读使用
 *      2. 更新会复制新容器。操作频繁对内存消耗很高
 */
@Test
public void CopyOnWriteArrayList() {
    CopyOnWriteArrayList<Integer> cowList = new CopyOnWriteArrayList<>();
    cowList.add(1);
    cowList.add(3);
    cowList.add(3);
    cowList.add(2);
    System.out.println("cowList = " + cowList);

    // 添加不存在元素
    System.out.println("cowList.addIfAbsent(3) = " + cowList.addIfAbsent(3));
    System.out.println("cowList = " + cowList);
}














 







 


2. SC

  1. 底层volatile数组
  2. ReentrantLock保证线程安全。只作用于写
  3. Arrays.copyOf(), Arrays.toString()
class _CopyOnWriteArrayList<E> {

    // 1. 底层volatile数组
    private transient volatile Object[] array;

    // 2. ReentrantLock
    final transient ReentrantLock lock = new ReentrantLock();

    public _CopyOnWriteArrayList() {
        setArray(new Object[0]);
    }

    final Object[] getArray() {
        return array;
    }

    final void setArray(Object[] a) {
        array = a;
    }

    // 3.1. 添加元素。可重复
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;

            // 3.2. 新数组长度+1,并copy老数组数据
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            // 新元素添加数组尾
            newElements[len] = e;
            // 数组指针,切换到新数组
            setArray(newElements);

            return true;
        } finally {
            lock.unlock();
        }
    }

    // 4.1. 添加元素。不可重复
    public boolean addIfAbsent(E e) {
        // 4.2.1. 并发下查询当前数组是否有该元素
        Object[] snapshot = getArray();
        // 4.2.2. 元素存在,false
        return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false : addIfAbsent(e, snapshot);
    }

    // 4.2.3. 元素不存在,添加元素
    private boolean addIfAbsent(E e, Object[] snapshot) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] current = getArray();
            int len = current.length;

            // 4.3.1. 上锁前,数组可能变了,重新判断
            if (snapshot != current) {
                int common = Math.min(snapshot.length, len);

                // 4.3.2. 新老数组交集位置判断。不一样部分和e比较
                for (int i = 0; i < common; i++)
                    if (current[i] != snapshot[i] && eq(e, current[i]))
                        return false;

                // 4.3.3. 新数组扩容部分,元素判断
                if (indexOf(e, current, common, len) >= 0)
                    return false;
            }

            // 添加
            Object[] newElements = Arrays.copyOf(current, len + 1);
            newElements[len] = e;
            setArray(newElements);

            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * 指定范围内查找数组中是否有指定元素
     *
     * @param o        指定元素
     * @param elements 数组
     * @param index    起点
     * @param fence    终点
     * @return int     返回等于o的index,无则返回-1
     */
    private static int indexOf(Object o, Object[] elements, int index, int fence) {
        // 添加元素为null
        if (o == null) {
            for (int i = index; i < fence; i++)
                if (elements[i] == null)
                    return i;
        } else {
            for (int i = index; i < fence; i++)
                if (o.equals(elements[i]))
                    return i;
        }
        return -1;
    }

    private static boolean eq(Object o1, Object o2) {
        return (o1 == null) ? o2 == null : o1.equals(o2);
    }

}



 


 
















 





 

 





 








 





 





 




 



 




 





 




















 










2. CopyOnWriteArraySet

  1. 底层为CopyOnWriteArrayList
  2. add()底层为CopyOnWriteArrayList.addIfAbsent()
  3. COW_Set性能低于COW_List

1. API

/**
 * 1. 底层为CopyOnWriteArrayList
 * 2. `add()`底层为`CopyOnWriteArrayList.addIfAbsent()`
 * 3. COW_Set性能低于COW_List
 */
@Test
public void CopyOnWriteArraySet() {
    CopyOnWriteArraySet<Integer> cowSet = new CopyOnWriteArraySet<>();
    cowSet.add(1);
    cowSet.add(2);
    System.out.println("cowSet.add(2) = " + cowSet.add(2));
    cowSet.add(7);

    // 唯一
    System.out.println("cowSet = " + cowSet);
}

2. SC

// 底层为CopyOnWriteArrayList
class _CopyOnWriteArraySet<E> {

    private final CopyOnWriteArrayList<E> al;

    public _CopyOnWriteArraySet() {
        al = new CopyOnWriteArrayList<E>();
    }

    // 底层CopyOnWriteArrayList.addIfAbsent()
    public boolean add(E e) {
        return al.addIfAbsent(e);
    }
}






 




 


3. ConcurrentSkipListSet

// 底层为ConcurrentSkipListMap
@Test
public void ConcurrentSkipListSet() {
    ConcurrentSkipListSet<Integer> set = new ConcurrentSkipListSet<>();
    set.add(1);
    set.add(2);
    System.out.println("set.add(2) = " + set.add(2));
    set.add(7);

    // 唯一
    System.out.println("set = " + set);
}

1. SC

public class ConcurrentSkipListSet<E>
    extends AbstractSet<E>
    implements NavigableSet<E>, Cloneable, java.io.Serializable {

    @SuppressWarnings("serial") // Conditionally serializable
    private final ConcurrentNavigableMap<E,Object> m;

    public ConcurrentSkipListSet() {
        m = new ConcurrentSkipListMap<E,Object>();
    }

    public ConcurrentSkipListSet(Comparator<? super E> comparator) {
        m = new ConcurrentSkipListMap<E,Object>(comparator);
    }

}








 



 



3. Queue

  1. 先进先出 => FIFO(first-in-first-out)
  2. 公平管理或分配资源
  3. 栈、队列 => 数组、链表都可以实现
  4. offer(), remove(), poll(), element(), peek()
  5. 分类
    1. 队列 image-20220909094414828
    2. 双端队列:两端都可以进队,出队 image-20220909094426313

image-20230320094428381

1. BlockingQueue

  1. put(), take()增加blocking属性,即Queue在empty、full时,thread会blocking
  2. offer(E e, long timeout, TimeUnit unit), poll(long timeout, TimeUnit unit)增加定时功能

2. ArrayBlockingQueue

1. API

public class Q1_ArrayBlockingQueue {

    ArrayBlockingQueue<String> aq = new ArrayBlockingQueue<>(3);

    // 1. `add(), offer(), put()`添加null,NullPointerException
    @Test
    public void _null() throws InterruptedException {
        aq.add(null);
        aq.offer(null);
        aq.put(null);
    }

    /**
     * 2. Queue_full添加元素
     * ----------------------------------------
     * 1. `add()` => 异常
     * 2. `offer(E e, long timeout, TimeUnit unit)` => 阻塞thread时间,尝试添加
     * 3. `put()` => 阻塞thread,尝试添加
     */
    @Test
    public void API_in() throws InterruptedException {
        // 正常添加元素
        System.out.println("aq.add(aaa) = " + aq.add("aaa"));     // true
        System.out.println("aq.offer(bbb) = " + aq.offer("bbb")); // true
        aq.put("ccc");                                            // void

        System.out.println(aq); // [aaa, bbb, ccc]

        // 1. add() => 异常
        aq.add("ddd"); // 出现异常 => IllegalStateException: Queue full

        // 2. offer() => 返回false
        System.out.println(aq.offer("ddd"));
        // 设置阻塞时间
        System.out.println(aq.offer("ddd", 2, TimeUnit.SECONDS));

        // 3. put() => 永远阻塞
        aq.put("ddd");
    }

    /**
     * 3. Queue_empty弹出元素
     * ----------------------------------------
     * 1. `peek()` => null
     * 2. `poll(long timeout, TimeUnit unit)` => 阻塞thread时间,尝试弹出
     * 3. `take()` => 阻塞thread,尝试弹出
     */
    @Test
    public void API_out() throws InterruptedException {
        aq.add("aaa");
        aq.add("bbb");
        aq.add("ccc");

        // 查看
        System.out.println(aq.peek());
        System.out.println(aq);

        // 弹出
        System.out.println(aq.poll());
        System.out.println(aq);

        // 得到头元素并且移除
        System.out.println(aq.take());
        System.out.println(aq);

        aq.clear();

        // null
        System.out.println(aq.peek());

        // null
        System.out.println(aq.poll());
        // 阻塞5s,一直尝试弹出
        System.out.println(aq.poll(5, TimeUnit.SECONDS));

        // 永远阻塞
        System.out.println(aq.take());
    }
}


 


























 


 

 


 






























 




 


 


2. SC

  1. 无noArgsCtor,必须指定数组长度 => 有界
  2. 底层RingArray回环数组
  3. 底层ReentrantLock + Condition来保证线程安全

  1. while不可以换为if。如果notFull被激活瞬间,可能其他线程放入元素,队列又满了
  2. await(),释放锁;signal(),不释放锁
class _ArrayBlockingQueue<E> {

    // 1. 底层数组
    Object[] items;

    // 可取元素index,初始为0
    int takeIndex;
    // 可放元素index,初始为0
    int putIndex;

    // 数组长度
    int count;

    // 2.1. 一把锁
    ReentrantLock lock;
    // 2.2. 锁伴随的一个等待池:notEmpty
    private Condition notEmpty;
    // 2.3. 锁伴随的一个等待池:notFull
    private Condition notFull;

    // 3. argsCtor,没有noArgsCtor
    public _ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    public _ArrayBlockingQueue(int capacity, boolean fair) {
        // 健壮性
        if (capacity <= 0)
            throw new IllegalArgumentException();

        // 初始化数组
        this.items = new Object[capacity];

        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    public void put(E e) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 不能if
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }


    // 4. 入队(画图理解)
    private void enqueue(E x) {
        final Object[] items = this.items;

        // 放入元素
        items[putIndex] = x;

        // RingArray操作
        if (++putIndex == items.length)
            putIndex = 0;

        count++;
        notEmpty.signal();
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    // 5. 出队(画图理解)
    private E dequeue() {
        final Object[] items = this.items;

        // 取出元素,并返回
        E x = (E) items[takeIndex];

        items[takeIndex] = null; // 置null

        // RingArray操作
        if (++takeIndex == items.length)
            takeIndex = 0;

        count--;

        // if (itrs != null)
        //     itrs.elementDequeued();
        notFull.signal();
        return x;
    }
}



 









 
 
 
 
 
 












 

 
 
 




 
 
 
 
 
 
 
 
 








 


 
 


 



 
 
 
 
 
 
 
 
 









 


 
 





 



3. LinkedBlockingQueue

  1. noArgsCtor,无界queue。不会阻塞
  2. argsCtor。和ArrayBlockingQueue一样
  3. ArrayBlockingQueue:put(), take()为一把锁,不支持读写同时操作。底层数组
  4. LinkedBlockingQueue:put(), take()为两把锁,支持读写同时操作,并发情况下,效率高。底层链表

1. API

public class Q2_LinkedBlockingQueue {

    /**
     * 1. noArgsCtor,无界queue。不会阻塞
     * 2. argsCtor。和ArrayBlockingQueue一样
     * 3. ArrayBlockingQueue:`put(), take()`为一把锁,不支持读写同时操作。底层数组
     * 4. LinkedBlockingQueue:`put(), take()`为两把锁,支持读写同时操作,并发情况下,效率高。底层链表
     */
    @Test
    public void API() throws InterruptedException {
        LinkedBlockingQueue<String> lbq = new LinkedBlockingQueue<>(3);

        lbq.put("ooxx");
        String take = lbq.take();
        System.out.println("lbq.take() = " + take);
    }
}

2. SC

/**
 * 1. 单向链表。noArgsCtor无界,argsCtor有界
 * 2. ReentrantLock => Condition notFull, Condition notEmpty
 * 3. 队列第一个node为null,(null, next)
 */
class _LinkedBlockingQueue<E> {

    // 1. 单向链表
    static class Node<E> {
        E item;

        Node<E> next;

        Node(E x) {
            item = x;
        }
    }

    // 链表长度
    private final int capacity;
    // queue元素个数
    private final AtomicInteger count = new AtomicInteger();
    // 链表头结点
    transient Node<E> head;
    // 链表尾结点
    private transient Node<E> last;

    // 2.1. 存锁
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();
    // queue满了。thread去notFull等待,thread => notFull

    // 2.2. 取锁
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    // queue空了。thread去notEmpty等待,thread => notEmpty

    public _LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    public _LinkedBlockingQueue(int capacity) {
        // 健壮性考虑
        if (capacity <= 0) throw new IllegalArgumentException();

        this.capacity = capacity;
        // 3. 初始化last, head
        last = head = new Node<>(null);
    }

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;

        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {   // 4.1. 满了等待
                notFull.await();
            }
            // 5.1. 入队
            enqueue(node);
            c = count.getAndIncrement();        // count++
            if (c + 1 < capacity)               // 4.2. 不满可put()
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)                             // 4.3. 临界非空可take()。count从(0 -> 1)
            signalNotEmpty();
    }

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;

        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();               // 6.1. 空了等待
            }
            // 7.1. 出队
            x = dequeue();
            c = count.getAndDecrement();        // count--
            if (c > 1)
                notEmpty.signal();              // 6.2. 非空可take()
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)                      // 6.3. 临界不满可put()
            signalNotFull();
        return x;
    }

    /**
     * 5.2. 入队
     * (null, next) -> (1, next) -> (2, next) -> (3, null)
     * queue第一个node中的E为null
     */
    private void enqueue(Node<E> node) {
        last = last.next = node;
    }

    /**
     * 7.2. 出队
     * (null, next) -> (1, next) -> (2, next) -> (3, null)
     */
    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h;           // help GC,释放queue第一个node

        head = first;
        E x = first.item;
        first.item = null;    // queue第一个node中的E置为null
        return x;
    }

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

}




























 
 



 
 












 









 
 
 
 
 
 
 
 
 
 
 
 
 
 
 








 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 








 







 
 
 
 
 
 
 
 






 









 






4. SynchronousQueue

无容量,thread间数据精准、高效传递

  1. add()java.lang.IllegalStateException: Queue full
  2. take():阻塞线程,弹出
  3. poll():阻塞线程,指定时间,不断弹出
  4. peek():不能用
public class Q3_SynchronousQueue {

    SynchronousQueue<String> sq = new SynchronousQueue<>();

    /**
     * 无容量,thread间数据精准、高效传递
     * ---------------------------------
     * 1. `add()`:`java.lang.IllegalStateException: Queue full`
     * 2. `take()`:阻塞线程,弹出
     * 3. `poll()`:阻塞线程,指定时间,不断弹出
     * 4. `peek()`:不能用
     */
    @Test
    public void in() throws InterruptedException {
        // java.lang.IllegalStateException: Queue full
        sq.add("ooxx");

        // 阻塞,等待被弹出
        sq.put("ooxx");
    }

    @Test
    public void take() throws InterruptedException {
        put();
        while (true) {
            System.out.println(sq.take());
        }
    }

    @Test
    public void poll() throws InterruptedException {
        put();

        String poll = sq.poll();
        System.out.println("poll() = " + poll);

        // 阻塞线程5s,不断尝试弹出
        poll = sq.poll(5, TimeUnit.SECONDS);
        System.out.println("poll = " + poll);
    }

    private void put() {
        new Thread(() -> {
            try {
                int i = 0;
                while (true) {
                    sq.put(i++ + "");
                    Thread.sleep(1_000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

}


 












 


 






 







 



 








 









5. PriorityBlockingQueue

  1. 无界Queue,底层数组。无参构造(默认长度11)。直到OutOfMemoryError
  2. put(null); => NullPointerException
  3. 元素 => (inner_comparator || outer_comparator)
  4. put()后queue乱序。take()有序
public class Q4_PriorityBlockingQueue {

    // 1. 无界Queue,**底层数组**。无参构造(默认长度11)。直到`OutOfMemoryError`
    PriorityBlockingQueue<Tmp> pbq = new PriorityBlockingQueue<>();

    // 2. NullPointerException
    @Test
    public void _null() {
        pbq.put(null);
    }

    @Test
    public void ooxx() throws InterruptedException {
        pbq.put(new Tmp("a", 18));
        pbq.put(new Tmp("b", 11));
        pbq.put(new Tmp("c", 6));
        pbq.put(new Tmp("d", 21));

        // queue乱序
        System.out.println(pbq);

        // 4. `put()`后queue乱序。`take()`有序
        System.out.println(pbq.take());
        System.out.println(pbq.take());
        System.out.println(pbq.take());
        System.out.println(pbq.take());
    }

    // 3. 元素 => (inner_comparator || outer_comparator)
    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    static class Tmp implements Comparable<Tmp> {
        String name;
        int age;

        @Override
        public int compareTo(Tmp o) {
            return this.age - o.age;
        }
    }

}



 




 




 








 










 










6. DelayQueue

  1. put(null); => NullPointerException
  2. 底层 ReentrantLock + PriorityQueue
  3. 无界queue,存放Delayed接口对象,E必须重写getDelay(), compareTo()
  4. compareTo():确定E的位置策略
  5. getDelay():决定E的弹出时间策略
public class Q5_DelayQueue {

    /*
     * 2. 底层ReentrantLock + PriorityQueue
     *     - class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>
     *     - interface Delayed extends Comparable<Delayed>
     *
     *      private final transient ReentrantLock lock = new ReentrantLock();
     *      private final PriorityQueue<E> q = new PriorityQueue<E>();
     */
    DelayQueue<Tmp> dq = new DelayQueue<>();

    // 1. NullPointerException
    @Test
    public void _null() {
        dq.put(null);
    }

    @Test
    public void delay() throws InterruptedException {

        dq.put(new Tmp(1, System.currentTimeMillis() + 5_000)); // 在queue中,5s后弹出
        dq.put(new Tmp(2, System.currentTimeMillis() + 8_000)); // 在(id = 1)后,3s后弹出

        System.out.println("dq = " + dq);

        do {
            System.out.println("dq.take() = " + dq.take());
        } while (dq.size() > 0);
    }

    // ----------------------------------------------------------------------

    // 3. 无界queue,存放Delayed接口对象,E必须重写`getDelay(), compareTo()`
    @Data
    @AllArgsConstructor
    static class Tmp implements Delayed {
        private int id;
        private long endTime;

        // 4. compareTo():确定E的位置策略。E排序规则
        @Override
        public int compareTo(Delayed o) {
            Tmp other = (Tmp) o;
            return Integer.compare(this.getId(), other.getId());
        }

        // 5. getDelay():决定E的弹出时间策略。E弹出条件
        @Override
        public long getDelay(TimeUnit unit) {
            // <= 0 可以弹出
            return this.getEndTime() - System.currentTimeMillis();
        }
    }

}










 




 





 





 














 






 






1. 用途

  1. 淘宝订单业务,下单之后如果30min之内没有付款就自动取消订单
  2. 饿了吗订餐通知,下单成功后60s之后给用户发送短信通知
  3. 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭
  4. 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出
  5. 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求

7. Deque

public class Q6_Deque {

    /**
     * `offerFirst(), offerLast(), pollFirst(), pollLast()`
     */
    public static void main(String[] args) {

        Deque<String> ll = new LinkedList<>();

        // Deque<E> extends Queue
        Deque<String> dq = new ArrayDeque<>();

        dq.offer("A");
        System.out.println("dq = " + dq);

        dq.offerFirst("B");                     // 1. 头压入
        dq.offerLast("C");                      // 2. 尾压入
        System.out.println("dq = " + dq);

        System.out.println("dq.pollFirst() = " + dq.pollFirst());    // 3. 头弹出
        System.out.println("dq.pollLast() = " + dq.pollLast());      // 4. 尾弹出

        System.out.println("dq = " + dq);
    }
}