06-container

image-20230506135922534
  1. 物理存储两种
    1. 数组
    2. 链表
  2. 容器涉及知识
    1. 算法
    2. 数据结构
    3. 本身的内部结构
    4. 高并发
  3. Queue专门为并发准备的容器

1. Map发展

  1. Hashtable:jdk1.0所有方法都synchronized,设计是不太合理的。所以现在Hashtable、Vector基本不用
  2. HashMap, LinkedHashMap, TreeMap:完全不加锁
  3. Collections.sychronizedXXX:synchronizedMap将线程不安全的容器变为安全的。效率仍然不是特别高。锁有点重
  4. ConcurrentHashMap, ConcurrentSkipListMap:读效率出奇的高,写效率一般
public class T01_Map {
    // 测试数据量(500万)
    public static final int count = 5_000_000;
    // 测试thread数量
    public static final int THREAD_COUNT = 100;

    static Map<UUID, UUID> map = null;

    static UUID[] keys = new UUID[count];
    static UUID[] values = new UUID[count];

    static {
        for (int i = 0; i < count; i++) {
            keys[i] = UUID.randomUUID();
            values[i] = UUID.randomUUID();
        }
    }

    // ---------------------------------------------------------------------

    static class MyThread extends Thread {
        int start;
        int gap = count / THREAD_COUNT;

        public MyThread(int start) {
            this.start = start;
        }

        @Override
        public void run() {
            for (int i = start; i < start + gap; i++) {
                map.put(keys[i], values[i]);
            }
        }
    }

    // ---------------------------------------------------------------------

    private void ooxx() {
        // thread[]准备
        Thread[] threads = new Thread[THREAD_COUNT];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new MyThread(i * (count / THREAD_COUNT));
        }

        long start = System.currentTimeMillis();

        ThreadHelper.start(threads);
        ThreadHelper.join(threads);

        System.out.println("容器装载数据耗时:" + (System.currentTimeMillis() - start));
        System.out.println("Map容器大小:" + map.size());

        // ------------------------- 容器查询效率测试 -------------------------

        for (int i = 0; i < threads.length; i++) {
            // 100个thread,每个thread对容器里一个key查询100万次
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1_000_000; j++) {
                    map.get(keys[10]);
                }
            });
        }

        start = System.currentTimeMillis();

        ThreadHelper.start(threads);
        ThreadHelper.join(threads);

        System.out.println("容器查询数据耗时:" + (System.currentTimeMillis() - start));
    }

    /*
     * 1. Hashtable
     *      线程安全。几乎所有方法synchronized
     *      jdk1.0,已经基本不用Hashtable、vector
     *
     * 容器装载数据耗时:3111
     * Map容器大小:5000000
     * 容器查询数据耗时:5548
     */
    @Test
    public void T1_HashTable() {
        map = new Hashtable<>();
        ooxx();
    }

    /*
     * 2. HashMap
     *      线程不安全。完全不加锁
     *
     * 容器装载数据耗时:2251
     * Map容器大小:4438642
     * 容器查询数据耗时:2408
     */
    @Test
    public void T2_HashMap() {
        map = new HashMap<>();
        ooxx();
    }

    /*
     * 3. synchronizedMap()
     *      Collections容器工具类,锁有点重
     *
     * 容器装载数据耗时:2757
     * Map容器大小:5000000
     * 容器查询数据耗时:6809
     */
    @Test
    public void T3_synchronizedMap() {
        map = Collections.synchronizedMap(new HashMap<>());
        ooxx();
    }

    /*
     * 4. ConcurrentHashMap
     *      读效率出奇的高,写效率一般
     *
     * 容器装载数据耗时:3930
     * Map容器大小:5000000
     * 容器查询数据耗时:572
     */
    @Test
    public void T4_ConcurrentHashMap() {
        map = new ConcurrentHashMap<>();
        ooxx();
    }
}













 
 
















 



























 























 













 













 













 



2. Collection发展

  1. Vector:早期使用synchronized实现
  2. ArrayList, LinkedList, HashSet:未考虑多线程安全(未实现同步)
  3. Collections.synchronizedList()工厂方法使用的也是synchronized
  4. CopyOnWriteList
  5. ConcurrentLinkedQueue
  6. StringBuilder vs StringBuffer
/**
 * 有N张火车票,每张票都有一个编号,同时有10个窗口对外售票
 * 请写一个模拟程序
 */
@Slf4j
public class T02_Collection {
    static CountDownLatch latch = new CountDownLatch(10);

    static Vector<String> vector = new Vector<>();
    static List<String> arrayList = new ArrayList<>();
    static final List<String> linkedList = new LinkedList<>();
    static Queue<String> clq = new ConcurrentLinkedQueue<>();

    static {
        for (int i = 0; i < 1_000; i++) {
            vector.add("票编号:" + i);
            arrayList.add("票编号:" + i);
            linkedList.add("票编号:" + i);
            clq.add("票编号:" + i);
        }
    }

    /*
     * 1. Vector、Collections.synchronizedXXX
     *      1. 线程安全,大多方法自带锁,synchronized
     *      2. 分析一下,这样能解决问题吗?重复销售、超量销售
     */
    @Test
    public void T1_Vector() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                while (vector.size() > 0) {
                    // 执行到这里锁被释放了,多个线程会进来
                    ThreadHelper.sleepMilli(10);
                    // 打印数据条数 = 1000,—— 线程安全
                    log.debug("销售了--" + vector.remove(0));
                }

                latch.countDown();
            }).start();
        }
        latch.await();
    }

    /*
     * 2. ArrayList
     *      1. 线程不安全
     *      2. 分析下面的程序可能会产生哪些问题?重复销售、超量销售
     */
    @Test
    public void T2_ArrayList() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                while (arrayList.size() > 0) {
                    ThreadHelper.sleepMilli(10);
                    // 打印数据条数 > 1000,—— 线程不安全
                    log.debug("销售了--" + arrayList.remove(0));
                }

                latch.countDown();
            }).start();
        }
        latch.await();
    }

    /*
     * 3. synchronized()
     *      size()和进行remove()必须整体为原子操作
     */
    @Test
    public void T3_LinkedList() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                while (true) {
                    synchronized (linkedList) {
                        if (linkedList.size() <= 0) break;
                        ThreadHelper.sleepMilli(10);
                        log.debug("销售了--" + linkedList.remove(0));
                    }
                }

                latch.countDown();
            }).start();
        }
        latch.await();
    }

    /*
     * 4. ConcurrentQueue提高并发性
     *
     * CAS 与 synchronized 效率高低是相对的
     * 1. 单线程:HashMap、ArrayList
     * 2. 低并发,执行时间短:ConcurrentLinkedQueue、ConcurrentHashMap
     * 3. 高并发,执行时间长:synchronized
     * 4. 根据实际压测
     */
    @Test
    public void T4_ConcurrentLinkedQueue() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                while (true) {
                    // 无锁,CAS来实现。效率上高很多
                    // the head of this queue, or {@code null} if this queue is empty
                    String s = clq.poll();
                    if (s == null) break;
                    else {
                        ThreadHelper.sleepMilli(10);
                        log.debug("销售了--" + s);
                    }
                }

                latch.countDown();
            }).start();
        }
        latch.await();
    }

}















 
 
 
 
















 




















 




















 

























 














3. 多线程下容器

1. Map

跳表(SkipList)及ConcurrentSkipListMap源码解析open in new window

  1. Map
    1. HashMap(HashTable) —— 无序
    2. TreeMap(红黑树)—— 有序
  2. ConcurrentMap
    1. ConcurrentHashMap —— 无序
    2. ConcurrentSkipListMap(CAS用tree实现太复杂)—— 有序

跳表结构:在中间关键元素加索引。时间复杂度:O(lgn)

/*
 * 1. Map
 *      1. HashMap —— 无序
 *      2. TreeMap(红黑树)—— 有序
 * 2. ConcurrentMap
 *      1. ConcurrentHashMap —— 无序
 *      2. ConcurrentSkipListMap有序(CAS用tree实现太复杂)—— 有序
 */
public class T03_ConcurrentMap {
    private Map<String, String> map = null;

    private void ooxx() throws InterruptedException {
        Thread[] ts = new Thread[100];
        CountDownLatch latch = new CountDownLatch(ts.length);

        long start = System.currentTimeMillis();
        for (int i = 0; i < ts.length; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < 1_000; j++) {
                    map.put(IdUtil.fastSimpleUUID(), IdUtil.fastSimpleUUID());
                }

                latch.countDown();
            });
        }

        ThreadHelper.start(ts);
        latch.await();

        System.out.println("耗时:" + (System.currentTimeMillis() - start));
        System.out.println("map.size:" + map.size());
    }

    // 耗时:176
    // map.size:100000
    @Test
    public void T1_Hashtable() throws InterruptedException {
        map = new Hashtable<>();
        ooxx();
    }

    // 耗时:110
    // map.size:96898
    @Test
    public void T2_HashMap() throws InterruptedException {
        map = new HashMap<>(); // Collections.synchronizedXXX
        ooxx();
    }

    // 耗时:178
    // map.size:100000
    @Test
    public void T3_ConcurrentHashMap() throws InterruptedException {
        // CAS
        map = new ConcurrentHashMap<>();
        ooxx();
    }

    // 耗时:332
    // map.size:100000
    @Test
    public void T4_ConcurrentSkipListMap() throws InterruptedException {
        // 高并发并且排序
        // TreeMap排序,底层红黑树。treeMap实现CAS复杂,所以SkipListMap来实现
        map = new ConcurrentSkipListMap<>();
        ooxx();
    }

}



















 

















 







 








 









 




2. Collection

  1. Vector
  2. Collections.synchronizedList(new ArrayList<>());
  3. new CopyOnWriteArrayList<>();
public class T04_ConcurrentList {

    static List<String> lists;

    public static void ooxx() {
        Thread[] ts = new Thread[100];

        for (int i = 0; i < ts.length; i++) {
            ts[i] = new Thread(() -> {
                for (int i1 = 0; i1 < 1_000; i1++) {
                    lists.add(IdUtil.fastSimpleUUID());
                }
            });
        }

        long l = System.currentTimeMillis();
        ThreadHelper.start(ts);
        ThreadHelper.join(ts);
        System.out.println("耗时:" + (System.currentTimeMillis() - l));

        System.out.println("lists.size():" + lists.size());
    }

    // 耗时:83
    // lists.size():100000
    @Test
    public void T1_Vector() {
        lists = new Vector<>();
        ooxx();
    }

    // 耗时:111
    // lists.size():100000
    @Test
    public void T2_ArrayList() {
        // 并发问题
        lists = Collections.synchronizedList(new ArrayList<>());
        ooxx();
    }

    /**
     * 写时复制
     * 1. 读多写少。多线程环境下,写时效率低,读时效率高
     * 2. 写加锁,读无锁
     */
    // 耗时:2387
    // lists.size():100000
    @Test
    public void T3_CopyOnWriteArrayList() {
        lists = new CopyOnWriteArrayList<>();
        ooxx();
    }
}
/*
CopyOnWriteArrayList源码分析
写synchronized,读无锁

public boolean add(E e) {
    synchronized (lock) {                // 1. 整体加锁
        Object[] es = getArray();        // 2. 得到原数组
        int len = es.length;
        es = Arrays.copyOf(es, len + 1); // 3. 原数组copy到新数组
        es[len] = e;                     // 4. 新数组添加新元素
        setArray(es);                    // 5. 原数组引用切到新数组
        return true;
    }
}
*/










 
















 








 












 








 









4. Queue

list、queue区别

  1. queue提供对线程友好API,offer(), peek(), poll()
    • ConcurrentLinkedQueue
  2. PriorityQueue
/*
 * 1. list、queue区别
 *      1. queue提供对线程友好API,offer(), peek(), poll()
 *      2. BlockingQueue,put(), take()阻塞
 */
@Slf4j
public class T05_Queue {

    /**
     * offer()、poll()、peek(),线程安全
     */
    @Test
    public void T1_Api() {
        Queue<String> q = new ConcurrentLinkedQueue<>();

        for (int i = 0; i < 10; i++) {
            // list.add()加不进去 exception
            // offer()返回boolean
            q.offer("a" + i);
        }

        // remove
        System.out.println(q.poll());
        System.out.println(q.size());

        // not remove
        System.out.println(q.peek());
        System.out.println(q.size());

        // 双端队列Deque
    }

    /**
     * 底层tree实现
     */
    @Test
    public void T2_PriorityQueue() {
        PriorityQueue<String> q = new PriorityQueue<>();

        q.add("c");
        q.add("e");
        q.add("a");
        q.add("d");
        q.add("z");
        // q.size(), poll()导致size时刻都在变。必须为固定大小
        for (int i = 0; i < 5; i++) {
            System.out.println(q.poll());
        }
    }

}













 























 













1. BlockingQueue

put(), take()阻塞thread体现Blocking

  1. LinkedBQ:无界BQ
  2. ArrayBQ:有界BQ
  3. DelayQueue:按时间进行任务调度,本质上:PriorityQueue
  4. SynchronusQ:线程间任务调度,juc用处最大的queue
  5. TransferQ。LinkedTransferQueue:无界队列,不会阻塞生产者
    • 提供LinkedBlockingQueue功能
    • 提供SynchronousQueue功能
    • LinkedTransferQueue与SynchronousQueue是CAS实现,LinkedBlockingQueue锁来实现的
@Slf4j
public class T06_BlockingQueue {

    /*
     * 1. 无界BQ
     *
     * put(), take(),体现blocking
     * 1. put(): 如果queue满了,就会等待。天生的生产者
     * 2. take(): 如果queue空了,就会等待。天生的消费者
     */
    @Test
    public void T1_LinkedBlockingQueue() {
        // 无界队列
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // product
        new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                try {
                    // queue满了,线程等待(不会满的,容量为Integer.MAX_VALUE)
                    queue.put("a" + i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                ThreadHelper.sleepSeconds(1);
            }
        }, "p").start();

        // consumer
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                for (; ; ) {
                    try {
                        // queue空了,线程等待
                        log.debug(" take -" + queue.take());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "c" + i).start();
        }

        LockSupport.park();
    }

    /*
     * 2. 有界BQ
     *
     * put(), add(), offer()
     */
    @Test
    public void T2_ArrayBlockingQueue() throws InterruptedException {
        // 有界的
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

        for (int i = 0; i < 10; i++) {
            queue.put("a" + i);
        }

        queue.put("aaa");     // 满了。就会等待,程序阻塞
        queue.add("aaa");     // 满了。IllegalStateException: Queue full
        queue.offer("aaa");   // 满了。返回boolean
        queue.offer("aaa", 1, TimeUnit.SECONDS); // 满了。延迟添加

        System.out.println(queue);
    }

    /*
     * 3. DelayQueue本质上:PriorityQueue
     *      场景:按时间进行任务调度
     */
    @Test
    public void T3_DelayQueue() throws InterruptedException {
        // 时间排序
        BlockingQueue<MyTask> bq = new DelayQueue<>();

        long now = System.currentTimeMillis();
        MyTask t1 = new MyTask("t1", now + 1000);
        MyTask t2 = new MyTask("t2", now + 2000);
        MyTask t3 = new MyTask("t3", now + 1500);
        MyTask t4 = new MyTask("t4", now + 2500);
        MyTask t5 = new MyTask("t5", now + 500);

        bq.put(t1);
        bq.put(t2);
        bq.put(t3);
        bq.put(t4);
        bq.put(t5);

        System.out.println(bq);

        for (int i = 0; i < 5; i++) {
            System.out.println(bq.take());
        }
    }

    /*
     * 4. 线程间任务调度
     *      juc用处最大的queue
     *      对比:Exchanger.java
     */
    @Test
    public void T4_SynchronousQueue() throws InterruptedException {

        // 线程间传递任务。容量为0
        BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();

        new Thread(() -> {
            while (true) {
                try {
                    log.error(synchronousQueue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 阻塞main_thread,等待消费者.take()
        synchronousQueue.put("aaa");
        synchronousQueue.put("bbb");

        /*
         * java.lang.IllegalStateException: Queue full
         * 容量为0,不可以装元素
         * queue.add("aaa");
         */

        log.error(synchronousQueue.size() + "");
    }


    /*
     * 5. LinkedTransferQueue:无界队列,不会阻塞生产者
     *      1. 提供LinkedBlockingQueue功能
     *      2. 提供SynchronousQueue功能
     *      3. LinkedTransferQueue与SynchronousQueue是CAS实现,LinkedBlockingQueue锁来实现的
     */
    @Test
    public void T5_LinkedTransferQueue() throws InterruptedException {
        // TransferQueue<E> extends BlockingQueue<E>
        TransferQueue<String> queue = new LinkedTransferQueue<>();

        // customer_thread
        new Thread(() -> {
            try {
                System.out.println(queue.take());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        // 等价于 => synchronousQueue.put("aaa");
        queue.transfer("aaa");

        // put()和其他queue没区别
        // queue.put("aaa");
    }
}

@ToString
@AllArgsConstructor
class MyTask implements Delayed {
    String name;
    long runningTime;

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
}