06-container
- 物理存储两种
- 数组
- 链表
- 容器涉及知识
- 算法
- 数据结构
- 本身的内部结构
- 高并发
- Queue专门为并发准备的容器
1. Map发展
Hashtable
:jdk1.0所有方法都synchronized,设计是不太合理的。所以现在Hashtable、Vector基本不用HashMap, LinkedHashMap, TreeMap
:完全不加锁Collections.sychronizedXXX
:synchronizedMap将线程不安全的容器变为安全的。效率仍然不是特别高。锁有点重ConcurrentHashMap, ConcurrentSkipListMap
:读效率出奇的高,写效率一般
public class T01_Map {
// 测试数据量(500万)
public static final int count = 5_000_000;
// 测试thread数量
public static final int THREAD_COUNT = 100;
static Map<UUID, UUID> map = null;
static UUID[] keys = new UUID[count];
static UUID[] values = new UUID[count];
static {
for (int i = 0; i < count; i++) {
keys[i] = UUID.randomUUID();
values[i] = UUID.randomUUID();
}
}
// ---------------------------------------------------------------------
static class MyThread extends Thread {
int start;
int gap = count / THREAD_COUNT;
public MyThread(int start) {
this.start = start;
}
@Override
public void run() {
for (int i = start; i < start + gap; i++) {
map.put(keys[i], values[i]);
}
}
}
// ---------------------------------------------------------------------
private void ooxx() {
// thread[]准备
Thread[] threads = new Thread[THREAD_COUNT];
for (int i = 0; i < threads.length; i++) {
threads[i] = new MyThread(i * (count / THREAD_COUNT));
}
long start = System.currentTimeMillis();
ThreadHelper.start(threads);
ThreadHelper.join(threads);
System.out.println("容器装载数据耗时:" + (System.currentTimeMillis() - start));
System.out.println("Map容器大小:" + map.size());
// ------------------------- 容器查询效率测试 -------------------------
for (int i = 0; i < threads.length; i++) {
// 100个thread,每个thread对容器里一个key查询100万次
threads[i] = new Thread(() -> {
for (int j = 0; j < 1_000_000; j++) {
map.get(keys[10]);
}
});
}
start = System.currentTimeMillis();
ThreadHelper.start(threads);
ThreadHelper.join(threads);
System.out.println("容器查询数据耗时:" + (System.currentTimeMillis() - start));
}
/*
* 1. Hashtable
* 线程安全。几乎所有方法synchronized
* jdk1.0,已经基本不用Hashtable、vector
*
* 容器装载数据耗时:3111
* Map容器大小:5000000
* 容器查询数据耗时:5548
*/
@Test
public void T1_HashTable() {
map = new Hashtable<>();
ooxx();
}
/*
* 2. HashMap
* 线程不安全。完全不加锁
*
* 容器装载数据耗时:2251
* Map容器大小:4438642
* 容器查询数据耗时:2408
*/
@Test
public void T2_HashMap() {
map = new HashMap<>();
ooxx();
}
/*
* 3. synchronizedMap()
* Collections容器工具类,锁有点重
*
* 容器装载数据耗时:2757
* Map容器大小:5000000
* 容器查询数据耗时:6809
*/
@Test
public void T3_synchronizedMap() {
map = Collections.synchronizedMap(new HashMap<>());
ooxx();
}
/*
* 4. ConcurrentHashMap
* 读效率出奇的高,写效率一般
*
* 容器装载数据耗时:3930
* Map容器大小:5000000
* 容器查询数据耗时:572
*/
@Test
public void T4_ConcurrentHashMap() {
map = new ConcurrentHashMap<>();
ooxx();
}
}
2. Collection发展
Vector
:早期使用synchronized实现ArrayList, LinkedList, HashSet
:未考虑多线程安全(未实现同步)Collections.synchronizedList()
:工厂方法使用的也是synchronizedCopyOnWriteList
ConcurrentLinkedQueue
StringBuilder
vsStringBuffer
- 使用早期的同步容器以及
Collections.synchronized()
的不足之处,请阅读: - java集合框架【3】 java1.5新特性 ConcurrentHashMap、Collections.synchronizedMap、Hashtable讨论
/**
* 有N张火车票,每张票都有一个编号,同时有10个窗口对外售票
* 请写一个模拟程序
*/
@Slf4j
public class T02_Collection {
static CountDownLatch latch = new CountDownLatch(10);
static Vector<String> vector = new Vector<>();
static List<String> arrayList = new ArrayList<>();
static final List<String> linkedList = new LinkedList<>();
static Queue<String> clq = new ConcurrentLinkedQueue<>();
static {
for (int i = 0; i < 1_000; i++) {
vector.add("票编号:" + i);
arrayList.add("票编号:" + i);
linkedList.add("票编号:" + i);
clq.add("票编号:" + i);
}
}
/*
* 1. Vector、Collections.synchronizedXXX
* 1. 线程安全,大多方法自带锁,synchronized
* 2. 分析一下,这样能解决问题吗?重复销售、超量销售
*/
@Test
public void T1_Vector() throws InterruptedException {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
while (vector.size() > 0) {
// 执行到这里锁被释放了,多个线程会进来
ThreadHelper.sleepMilli(10);
// 打印数据条数 = 1000,—— 线程安全
log.debug("销售了--" + vector.remove(0));
}
latch.countDown();
}).start();
}
latch.await();
}
/*
* 2. ArrayList
* 1. 线程不安全
* 2. 分析下面的程序可能会产生哪些问题?重复销售、超量销售
*/
@Test
public void T2_ArrayList() throws InterruptedException {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
while (arrayList.size() > 0) {
ThreadHelper.sleepMilli(10);
// 打印数据条数 > 1000,—— 线程不安全
log.debug("销售了--" + arrayList.remove(0));
}
latch.countDown();
}).start();
}
latch.await();
}
/*
* 3. synchronized()
* size()和进行remove()必须整体为原子操作
*/
@Test
public void T3_LinkedList() throws InterruptedException {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
while (true) {
synchronized (linkedList) {
if (linkedList.size() <= 0) break;
ThreadHelper.sleepMilli(10);
log.debug("销售了--" + linkedList.remove(0));
}
}
latch.countDown();
}).start();
}
latch.await();
}
/*
* 4. ConcurrentQueue提高并发性
*
* CAS 与 synchronized 效率高低是相对的
* 1. 单线程:HashMap、ArrayList
* 2. 低并发,执行时间短:ConcurrentLinkedQueue、ConcurrentHashMap
* 3. 高并发,执行时间长:synchronized
* 4. 根据实际压测
*/
@Test
public void T4_ConcurrentLinkedQueue() throws InterruptedException {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
while (true) {
// 无锁,CAS来实现。效率上高很多
// the head of this queue, or {@code null} if this queue is empty
String s = clq.poll();
if (s == null) break;
else {
ThreadHelper.sleepMilli(10);
log.debug("销售了--" + s);
}
}
latch.countDown();
}).start();
}
latch.await();
}
}
3. 多线程下容器
1. Map
跳表(SkipList)及ConcurrentSkipListMap源码解析
- Map
HashMap
(HashTable) —— 无序TreeMap
(红黑树)—— 有序
- ConcurrentMap
ConcurrentHashMap
—— 无序ConcurrentSkipListMap
(CAS用tree实现太复杂)—— 有序
跳表结构:在中间关键元素加索引。时间复杂度:O(lgn)
/*
* 1. Map
* 1. HashMap —— 无序
* 2. TreeMap(红黑树)—— 有序
* 2. ConcurrentMap
* 1. ConcurrentHashMap —— 无序
* 2. ConcurrentSkipListMap有序(CAS用tree实现太复杂)—— 有序
*/
public class T03_ConcurrentMap {
private Map<String, String> map = null;
private void ooxx() throws InterruptedException {
Thread[] ts = new Thread[100];
CountDownLatch latch = new CountDownLatch(ts.length);
long start = System.currentTimeMillis();
for (int i = 0; i < ts.length; i++) {
ts[i] = new Thread(() -> {
for (int j = 0; j < 1_000; j++) {
map.put(IdUtil.fastSimpleUUID(), IdUtil.fastSimpleUUID());
}
latch.countDown();
});
}
ThreadHelper.start(ts);
latch.await();
System.out.println("耗时:" + (System.currentTimeMillis() - start));
System.out.println("map.size:" + map.size());
}
// 耗时:176
// map.size:100000
@Test
public void T1_Hashtable() throws InterruptedException {
map = new Hashtable<>();
ooxx();
}
// 耗时:110
// map.size:96898
@Test
public void T2_HashMap() throws InterruptedException {
map = new HashMap<>(); // Collections.synchronizedXXX
ooxx();
}
// 耗时:178
// map.size:100000
@Test
public void T3_ConcurrentHashMap() throws InterruptedException {
// CAS
map = new ConcurrentHashMap<>();
ooxx();
}
// 耗时:332
// map.size:100000
@Test
public void T4_ConcurrentSkipListMap() throws InterruptedException {
// 高并发并且排序
// TreeMap排序,底层红黑树。treeMap实现CAS复杂,所以SkipListMap来实现
map = new ConcurrentSkipListMap<>();
ooxx();
}
}
2. Collection
Vector
Collections.synchronizedList(new ArrayList<>());
new CopyOnWriteArrayList<>();
public class T04_ConcurrentList {
static List<String> lists;
public static void ooxx() {
Thread[] ts = new Thread[100];
for (int i = 0; i < ts.length; i++) {
ts[i] = new Thread(() -> {
for (int i1 = 0; i1 < 1_000; i1++) {
lists.add(IdUtil.fastSimpleUUID());
}
});
}
long l = System.currentTimeMillis();
ThreadHelper.start(ts);
ThreadHelper.join(ts);
System.out.println("耗时:" + (System.currentTimeMillis() - l));
System.out.println("lists.size():" + lists.size());
}
// 耗时:83
// lists.size():100000
@Test
public void T1_Vector() {
lists = new Vector<>();
ooxx();
}
// 耗时:111
// lists.size():100000
@Test
public void T2_ArrayList() {
// 并发问题
lists = Collections.synchronizedList(new ArrayList<>());
ooxx();
}
/**
* 写时复制
* 1. 读多写少。多线程环境下,写时效率低,读时效率高
* 2. 写加锁,读无锁
*/
// 耗时:2387
// lists.size():100000
@Test
public void T3_CopyOnWriteArrayList() {
lists = new CopyOnWriteArrayList<>();
ooxx();
}
}
/*
CopyOnWriteArrayList源码分析
写synchronized,读无锁
public boolean add(E e) {
synchronized (lock) { // 1. 整体加锁
Object[] es = getArray(); // 2. 得到原数组
int len = es.length;
es = Arrays.copyOf(es, len + 1); // 3. 原数组copy到新数组
es[len] = e; // 4. 新数组添加新元素
setArray(es); // 5. 原数组引用切到新数组
return true;
}
}
*/
4. Queue
list、queue区别
- queue提供对线程友好API,
offer(), peek(), poll()
ConcurrentLinkedQueue
PriorityQueue
/*
* 1. list、queue区别
* 1. queue提供对线程友好API,offer(), peek(), poll()
* 2. BlockingQueue,put(), take()阻塞
*/
@Slf4j
public class T05_Queue {
/**
* offer()、poll()、peek(),线程安全
*/
@Test
public void T1_Api() {
Queue<String> q = new ConcurrentLinkedQueue<>();
for (int i = 0; i < 10; i++) {
// list.add()加不进去 exception
// offer()返回boolean
q.offer("a" + i);
}
// remove
System.out.println(q.poll());
System.out.println(q.size());
// not remove
System.out.println(q.peek());
System.out.println(q.size());
// 双端队列Deque
}
/**
* 底层tree实现
*/
@Test
public void T2_PriorityQueue() {
PriorityQueue<String> q = new PriorityQueue<>();
q.add("c");
q.add("e");
q.add("a");
q.add("d");
q.add("z");
// q.size(), poll()导致size时刻都在变。必须为固定大小
for (int i = 0; i < 5; i++) {
System.out.println(q.poll());
}
}
}
1. BlockingQueue
put(), take()
阻塞thread体现Blocking
- LinkedBQ:无界BQ
- ArrayBQ:有界BQ
- DelayQueue:按时间进行任务调度,本质上:PriorityQueue
- SynchronusQ:线程间任务调度,juc用处最大的queue
- TransferQ。LinkedTransferQueue:无界队列,不会阻塞生产者
- 提供LinkedBlockingQueue功能
- 提供SynchronousQueue功能
- LinkedTransferQueue与SynchronousQueue是CAS实现,LinkedBlockingQueue锁来实现的
@Slf4j
public class T06_BlockingQueue {
/*
* 1. 无界BQ
*
* put(), take(),体现blocking
* 1. put(): 如果queue满了,就会等待。天生的生产者
* 2. take(): 如果queue空了,就会等待。天生的消费者
*/
@Test
public void T1_LinkedBlockingQueue() {
// 无界队列
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// product
new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
// queue满了,线程等待(不会满的,容量为Integer.MAX_VALUE)
queue.put("a" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
ThreadHelper.sleepSeconds(1);
}
}, "p").start();
// consumer
for (int i = 0; i < 5; i++) {
new Thread(() -> {
for (; ; ) {
try {
// queue空了,线程等待
log.debug(" take -" + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "c" + i).start();
}
LockSupport.park();
}
/*
* 2. 有界BQ
*
* put(), add(), offer()
*/
@Test
public void T2_ArrayBlockingQueue() throws InterruptedException {
// 有界的
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
for (int i = 0; i < 10; i++) {
queue.put("a" + i);
}
queue.put("aaa"); // 满了。就会等待,程序阻塞
queue.add("aaa"); // 满了。IllegalStateException: Queue full
queue.offer("aaa"); // 满了。返回boolean
queue.offer("aaa", 1, TimeUnit.SECONDS); // 满了。延迟添加
System.out.println(queue);
}
/*
* 3. DelayQueue本质上:PriorityQueue
* 场景:按时间进行任务调度
*/
@Test
public void T3_DelayQueue() throws InterruptedException {
// 时间排序
BlockingQueue<MyTask> bq = new DelayQueue<>();
long now = System.currentTimeMillis();
MyTask t1 = new MyTask("t1", now + 1000);
MyTask t2 = new MyTask("t2", now + 2000);
MyTask t3 = new MyTask("t3", now + 1500);
MyTask t4 = new MyTask("t4", now + 2500);
MyTask t5 = new MyTask("t5", now + 500);
bq.put(t1);
bq.put(t2);
bq.put(t3);
bq.put(t4);
bq.put(t5);
System.out.println(bq);
for (int i = 0; i < 5; i++) {
System.out.println(bq.take());
}
}
/*
* 4. 线程间任务调度
* juc用处最大的queue
* 对比:Exchanger.java
*/
@Test
public void T4_SynchronousQueue() throws InterruptedException {
// 线程间传递任务。容量为0
BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();
new Thread(() -> {
while (true) {
try {
log.error(synchronousQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// 阻塞main_thread,等待消费者.take()
synchronousQueue.put("aaa");
synchronousQueue.put("bbb");
/*
* java.lang.IllegalStateException: Queue full
* 容量为0,不可以装元素
* queue.add("aaa");
*/
log.error(synchronousQueue.size() + "");
}
/*
* 5. LinkedTransferQueue:无界队列,不会阻塞生产者
* 1. 提供LinkedBlockingQueue功能
* 2. 提供SynchronousQueue功能
* 3. LinkedTransferQueue与SynchronousQueue是CAS实现,LinkedBlockingQueue锁来实现的
*/
@Test
public void T5_LinkedTransferQueue() throws InterruptedException {
// TransferQueue<E> extends BlockingQueue<E>
TransferQueue<String> queue = new LinkedTransferQueue<>();
// customer_thread
new Thread(() -> {
try {
System.out.println(queue.take());
} catch (Exception e) {
e.printStackTrace();
}
}).start();
// 等价于 => synchronousQueue.put("aaa");
queue.transfer("aaa");
// put()和其他queue没区别
// queue.put("aaa");
}
}
@ToString
@AllArgsConstructor
class MyTask implements Delayed {
String name;
long runningTime;
@Override
public int compareTo(Delayed o) {
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
}