04-JUC
latch 英 [lætʃ] n. 插销;门闩;弹簧锁;碰锁 vi. 占有,抓住;闭锁
semaphore 英 [ˈseməfɔː(r)] n. 信号标,旗语;臂板信号装置
cyclic 英 [ˈsaɪklɪk] adj. 循环的;周期的
phaser 英 [ˈfeɪzə(r)] n. [电子] 移相器;相位器
1. ReentrantLock
- sync自动加解锁,ReentrantLock手动
- ReentrantLock可以condition,不同等待队列
- RL底层CAS,sync锁升级
- synchronized可重入性
- t调用`m1()`,`m1()`调用`m2()`,t线程对this对象的同一把锁重入了两次(重入计数为2,并非两把不同的锁)
- ReentrantLock替代synchronized
- sync遇到异常,jvm自动释放锁
- ReentrantLock必须手动释放锁,因此经常在`finally{}`中进行锁的释放
- `tryLock()`:尝试锁定 || 指定时间内尝试锁定,线程可以决定是否继续等待
- `lockInterruptibly()`:等待锁的过程可以被`interrupt()`打断
- 公平锁(fairLock) => `new ReentrantLock(true);`
- Thread-2、Thread-1交替获得锁
- Condition本质:锁资源上不同的等待队列
- 3个API,必须都在`lock()`后调用。否则抛出`java.lang.IllegalMonitorStateException`
- `await()`:释放lock。thread进入WAITING
- `signal()`:不释放lock。唤醒该Condition上等待最久的一个thread,它重新获得lock后从`await()`位置继续执行
- `signalAll()`:不释放lock。唤醒该Condition上全部thread,它们依次重新获得lock后从`await()`位置继续执行
/**
 * Demos comparing {@code synchronized} with {@link ReentrantLock}: reentrancy, manual unlock in
 * {@code finally}, timed {@code tryLock}, interruptible lock acquisition, fair vs. non-fair
 * ordering, and {@link Condition} wait queues.
 */
@Slf4j
public class T01_ReentrantLock {

    /** Prints a counter 0..7 and re-enters the same monitor by calling {@link #m2()} at i == 2. */
    synchronized void m1() {
        System.out.println(Thread.currentThread().getName() + ": m1() ...");
        for (int i = 0; i < 8; i++) {
            ThreadHelper.sleepSeconds(1);
            System.out.println(i);
            if (i == 2) {
                m2();
            }
        }
    }

    /** Synchronized on the same object as {@link #m1()}; callable from m1 thanks to reentrancy. */
    synchronized void m2() {
        System.out.println(Thread.currentThread().getName() + ": m2() ...");
    }

    /*
     * 1. synchronized is reentrant:
     *    t calls m1(), m1() calls m2(); t acquires the monitor of `this` twice.
     */
    @Test
    public void T1_sync_reentrant() {
        T01_ReentrantLock rl = new T01_ReentrantLock();
        Thread t = new Thread(rl::m1);
        t.start();
        ThreadHelper.join(t);
    }

    // ------------------------------------------------------------------------------------------------

    /** Lock shared by m3/m4/m5 and by the Condition demo. */
    Lock lock = new ReentrantLock();

    /** Holds {@link #lock} while printing three lines, pausing between them. */
    void m3() {
        // Acquire BEFORE the try: if lock() itself failed, the finally block must not unlock.
        lock.lock(); // plays the role of synchronized(this)
        try {
            for (int i = 0; i < 3; i++) {
                ThreadHelper.sleepSeconds(1);
                System.out.println("m3: " + i);
            }
        } finally {
            lock.unlock();
        }
    }

    /** Takes {@link #lock}, prints one line and releases it. */
    void m4() {
        lock.lock();
        try {
            System.out.println("m4 ...");
        } finally {
            lock.unlock();
        }
    }

    /*
     * 2. ReentrantLock as a replacement for synchronized
     *    1. With synchronized the JVM releases the monitor automatically when an exception escapes.
     *    2. A ReentrantLock must be released by hand, hence unlock() normally lives in finally{}.
     */
    @Test
    public void T2_ReentrantLock() {
        T01_ReentrantLock rl = new T01_ReentrantLock();
        Thread t1 = new Thread(rl::m3);
        t1.start();
        ThreadHelper.sleepSeconds(1);
        Thread t2 = new Thread(rl::m4);
        t2.start();
        ThreadHelper.join(t1, t2);
    }

    // ------------------------------------------------------------------------------------------------

    /**
     * Tries to take {@link #lock} for at most one second and reports whether it succeeded.
     * The timed tryLock may throw InterruptedException while waiting.
     */
    void m5() {
        boolean locked = false;
        try {
            locked = lock.tryLock(1, TimeUnit.SECONDS);
            if (locked) {
                System.out.println("m5 ..." + locked + ", 业务...");
            } else {
                System.out.println("m5 ..." + locked);
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever runs this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Only unlock when the lock was really obtained.
            if (locked) {
                lock.unlock();
            }
        }
    }

    /**
     * 3. tryLock(): try to lock, optionally within a time limit; the thread decides whether to
     * keep waiting.
     */
    @Test
    public void T3_tryLock() {
        T01_ReentrantLock rl = new T01_ReentrantLock();
        Thread t1 = new Thread(rl::m3);
        t1.start();
        ThreadHelper.sleepSeconds(1);
        Thread t2 = new Thread(rl::m5);
        t2.start();
        ThreadHelper.join(t1, t2);
    }

    // ------------------------------------------------------------------------------------------------

    /*
     * 4. lockInterruptibly() can be interrupted by interrupt() while waiting for the lock.
     */
    @Test
    public void T4_lockInterruptibly() {
        Lock lock = new ReentrantLock(); // local lock, deliberately separate from the field
        Thread t1 = new Thread(() -> {
            lock.lock();
            try {
                System.out.println("t1, start");
                TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
                System.out.println("t1, end");
            } catch (InterruptedException e) {
                System.out.println("t1, interrupted!");
            } finally {
                // Even when interrupted, the lock must be released.
                lock.unlock();
            }
        });
        Thread t2 = new Thread(() -> {
            try {
                // lock.lock() would ignore the interrupt; lockInterruptibly() responds to it.
                lock.lockInterruptibly();
                try {
                    System.out.println("t2, start");
                    TimeUnit.SECONDS.sleep(5);
                    System.out.println("t2,end");
                } finally {
                    // Release even if the sleep is interrupted, otherwise t1 could block forever.
                    lock.unlock();
                }
            } catch (InterruptedException e) {
                System.out.println("t2, interrupted!");
            }
        });
        ThreadHelper.start(t1, t2);
        ThreadHelper.sleepSeconds(1);
        t2.interrupt(); // break t2 out of its wait
        ThreadHelper.join(t2);
        // t1 sleeps practically forever; interrupt it so the test can terminate.
        t1.interrupt();
        ThreadHelper.join(t1);
    }

    // ------------------------------------------------------------------------------------------------

    /** Lock used by {@link #m6()}; each fairness test assigns it before calling m6(). */
    ReentrantLock fireLock = null;

    /*
     * 5. Fair lock => `new ReentrantLock(true);`
     *    With a fair lock the two threads tend to alternate when acquiring it.
     */
    public void m6() {
        Runnable r = () -> {
            for (int i = 0; i < 100; i++) {
                fireLock.lock();
                try {
                    log.error("获得锁");
                } finally {
                    fireLock.unlock();
                }
            }
        };
        Thread t1 = new Thread(r);
        Thread t2 = new Thread(r);
        ThreadHelper.start(t1, t2);
        ThreadHelper.join(t1, t2);
    }

    /** Runs {@link #m6()} with a fair lock. */
    @Test
    public void T5_fireLock() {
        fireLock = new ReentrantLock(true);
        m6();
    }

    /** Runs {@link #m6()} with a non-fair lock. */
    @Test
    public void T6_fireLock() {
        fireLock = new ReentrantLock(false);
        m6();
    }

    // ------------------------------------------------------------------------------------------------

    /*
     * 6. A Condition is a separate wait queue on one lock.
     *
     * All three calls must happen while holding the lock, otherwise
     * `java.lang.IllegalMonitorStateException` is thrown.
     *    1. await():     releases the lock; the thread waits.
     *    2. signal():    does not release the lock; wakes one thread waiting on the Condition,
     *                    which continues after its await() once it re-acquires the lock.
     *    3. signalAll(): does not release the lock; wakes every thread waiting on the Condition;
     *                    each continues after its await() once it re-acquires the lock.
     */
    @Test
    public void T7_await_signal() {
        char[] aI = "1234567".toCharArray();
        char[] aC = "ABCDEFG".toCharArray();
        Condition conditionT1 = lock.newCondition();
        Condition conditionT2 = lock.newCondition();
        Thread t1 = new Thread(() -> {
            lock.lock();
            try {
                for (char c : aI) {
                    System.out.print(c);
                    conditionT2.signal();
                    conditionT1.await();
                }
                // Wake the peer one last time so it is not left waiting.
                conditionT2.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        });
        Thread t2 = new Thread(() -> {
            lock.lock();
            try {
                for (char c : aC) {
                    System.out.print(c);
                    conditionT1.signal();
                    conditionT2.await();
                }
                conditionT1.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        });
        ThreadHelper.start(t1, t2);
        // Wait for both printers, as the other tests do, so the test does not return early.
        ThreadHelper.join(t1, t2);
    }
}
2. ReadWriteLock
读共享、写互斥
/**
 * Compares a plain exclusive lock with a read/write lock, using 10 reader threads and
 * 2 writer threads.
 */
@Slf4j
public class T02_ReadWriteLock {

    /**
     * Simulated read: holds {@code lock} for a random pause, then logs completion.
     *
     * @param lock the lock guarding the read
     */
    public static void read(Lock lock) {
        // Acquire before the try so that a failed lock() never reaches unlock().
        lock.lock();
        try {
            ThreadHelper.sleepMilli(RandomUtil.randomInt(500, 1000));
            log.error("read over! ");
        } finally {
            lock.unlock();
        }
    }

    /**
     * Simulated write: holds {@code lock} for a random pause, then logs completion.
     *
     * @param lock the lock guarding the write
     * @param v    value to write; not used by this demo
     */
    public static void write(Lock lock, int v) {
        lock.lock();
        try {
            ThreadHelper.sleepMilli(RandomUtil.randomInt(200, 1000));
            log.error("write over! ");
        } finally {
            lock.unlock();
        }
    }

    static Lock lock = new ReentrantLock();

    /** Threads created by the current test. */
    List<Thread> ts = new ArrayList<>(12);

    /**
     * 1. Plain lock: every read and every write excludes all others.
     */
    @Test
    public void T1_local() {
        for (int i = 0; i < 10; i++) {
            ts.add(new Thread(() -> read(lock), "r" + i));
        }
        for (int i = 0; i < 2; i++) {
            ts.add(new Thread(() -> write(lock, new Random().nextInt()), "w" + i));
        }
        Collections.shuffle(ts);
        ThreadHelper.start(ts);
        ThreadHelper.join(ts);
    }

    static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    static Lock readLock = readWriteLock.readLock();
    static Lock writeLock = readWriteLock.writeLock();

    /*
     * 2. Read/write lock
     *    read lock:  shared
     *    write lock: exclusive
     */
    @Test
    public void T2_read_write() {
        // Reads share the lock, writes exclude everything else.
        for (int i = 0; i < 10; i++) {
            ts.add(new Thread(() -> read(readLock), "r" + i));
        }
        for (int i = 0; i < 2; i++) {
            ts.add(new Thread(() -> write(writeLock, new Random().nextInt()), "w" + i));
        }
        Collections.shuffle(ts);
        ThreadHelper.start(ts);
        ThreadHelper.join(ts);
    }
}
3. CountDownLatch
/**
 * Waits for 100 worker threads in two ways: with a {@link CountDownLatch} and with
 * {@link Thread#join()}.
 */
public class T03_CountDownLatch {
    static Thread[] threads = new Thread[100];
    static AtomicInteger atomicInt = new AtomicInteger(0);

    /**
     * Fills {@link #threads} with fresh, not yet started threads that all run {@code runnable}.
     *
     * @param runnable the task each thread executes
     */
    public void threads(Runnable runnable) {
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(runnable);
        }
    }

    /** Each worker increments the counter and counts the latch down; the test awaits the latch. */
    @Test
    public void T1_CountDownLatch() throws InterruptedException {
        // The counter is static; reset it so the printed total does not depend on test order.
        atomicInt.set(0);
        CountDownLatch latch = new CountDownLatch(threads.length);
        Runnable runnable = () -> {
            atomicInt.incrementAndGet();
            latch.countDown();
        };
        threads(runnable);
        ThreadHelper.start(threads);
        latch.await();
        System.out.println("end latch: " + atomicInt.get());
    }

    /** Each worker increments the counter; the test joins every thread before printing. */
    @Test
    public void T2_join() throws InterruptedException {
        // Same reset as above: without it a previous test's increments would be included.
        atomicInt.set(0);
        Runnable runnable = () -> {
            atomicInt.incrementAndGet();
        };
        threads(runnable);
        ThreadHelper.start(threads);
        for (Thread thread : threads) {
            thread.join();
        }
        System.out.println("end join: " + atomicInt);
    }
}
4. Semaphore
限流(公平、非公平)
/**
 * Rate limiting with a {@link Semaphore}: at most n threads run the guarded section at once.
 */
public class T04_Semaphore {

    /** Starts two workers, T1 and T2, that compete for a single permit. */
    public static void main(String[] args) {
        // new Semaphore(2) would allow both workers into the guarded section together.
        Semaphore s = new Semaphore(1, true); // second argument: fair or not
        new Thread(() -> runWithPermit(s, "T1")).start();
        new Thread(() -> runWithPermit(s, "T2")).start();
    }

    /**
     * Acquires one permit, prints two progress lines 200 ms apart, then returns the permit.
     *
     * <p>The permit is released only if it was actually acquired: an interrupt while waiting in
     * {@code acquire()} must not add an extra permit to the semaphore.
     *
     * @param s    semaphore limiting concurrent workers
     * @param name label printed in front of the progress lines
     */
    static void runWithPermit(Semaphore s, String name) {
        try {
            s.acquire();
        } catch (InterruptedException e) {
            // No permit is held here, so there is nothing to release.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return;
        }
        try {
            System.out.println(name + " running...");
            Thread.sleep(200);
            System.out.println(name + " running...");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            s.release();
        }
    }
}
5. CyclicBarrier
- 人满,发车
- 限流框架:Guava RateLimiter
/**
 * CyclicBarrier ("the bus leaves once it is full").
 * Use case: several threads work in parallel and the next step runs only after all of them
 * have reached the barrier.
 */
public class T05_CyclicBarrier {

    /**
     * Starts 100 threads against a barrier of 20 parties, so the barrier action runs once per
     * group of 20, and waits for all threads before returning.
     */
    @Test
    public void T1() {
        // new CyclicBarrier(20) would behave the same without the barrier action.
        CyclicBarrier barrier = new CyclicBarrier(20, () -> System.out.println("人满,发车"));
        Thread[] passengers = new Thread[100];
        for (int i = 0; i < passengers.length; i++) {
            passengers[i] = new Thread(() -> {
                try {
                    barrier.await();
                    // business logic goes here
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
            passengers[i].start();
        }
        // Join the workers; otherwise the test can return before any barrier trip happens.
        for (Thread passenger : passengers) {
            try {
                passenger.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
6. Phaser
人满,下一步
/**
 * Phaser ("once everyone is here, move on to the next step"): seven parties walk through the
 * phases of a wedding; the guests deregister before the final phase.
 */
public class T06_Phaser {
    static Random r = new Random();
    static MarriagePhaser phaser = new MarriagePhaser();

    /** Registers seven parties and starts five guests followed by the groom and the bride. */
    public static void main(String[] args) {
        phaser.bulkRegister(7);
        for (int guest = 0; guest < 5; guest++) {
            new Thread(new Person("p" + guest)).start();
        }
        for (String couple : new String[] {"新郎", "新娘"}) {
            new Thread(new Person(couple)).start();
        }
    }

    /**
     * Phaser subclass that announces the end of each phase through onAdvance.
     */
    static class MarriagePhaser extends Phaser {
        /**
         * Called when all registered parties have arrived at the current phase.
         *
         * @param phase             the phase that just completed
         * @param registeredParties number of parties registered in that phase
         * @return {@code true} to terminate the phaser, {@code false} to continue
         */
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            if (phase == 0) {
                System.out.println("所有人到齐了!线程数: " + registeredParties + "\n");
                return false;
            }
            if (phase == 1) {
                System.out.println("所有人吃完了!线程数: " + registeredParties + "\n");
                return false;
            }
            if (phase == 2) {
                System.out.println("所有人离开了!线程数: " + registeredParties + "\n");
                return false;
            }
            if (phase == 3) {
                System.out.println("婚礼结束!新郎新娘抱抱!线程数: " + registeredParties);
            }
            // Phase 3 and any unexpected phase both end the phaser.
            return true;
        }
    }

    /*
     * 1. arriveAndAwaitAdvance(): arrive and wait for the others before advancing
     * 2. arriveAndDeregister():   arrive and leave the phaser
     */
    @AllArgsConstructor
    static class Person implements Runnable {
        String name;

        /** Random pause, print the formatted line for this person, then wait at the phaser. */
        private void actThenWait(String format) {
            ThreadHelper.sleepMilli(r.nextInt(1000));
            System.out.printf(format, name);
            phaser.arriveAndAwaitAdvance();
        }

        public void arrive() {
            actThenWait("%s 到达现场!\n");
        }

        public void eat() {
            actThenWait("%s 吃完!\n");
        }

        public void leave() {
            actThenWait("%s 离开!\n");
        }

        /** Only the groom and the bride take part in the last phase; everyone else deregisters. */
        private void hug() {
            boolean isCouple = "新郎".equals(name) || "新娘".equals(name);
            if (!isCouple) {
                phaser.arriveAndDeregister();
                return;
            }
            actThenWait("%s 洞房!\n");
        }

        @Override
        public void run() {
            arrive();
            eat();
            leave();
            hug();
        }
    }
}
7. Exchanger
两thread数据交换
/**
* 两thread数据交换(阻塞)
*/
@Slf4j
public class T07_Exchanger {
static Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args) {
new Thread(() -> {
String s = "T1";
try {
// 1. 数据放入Exchanger,t1线程阻塞,等待第2个线程放入交换数据
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.error(s);
}, "t1").start();
new Thread(() -> {
String s = "T2";
try {
// 2. 数据放入Exchanger,进行交换。交换完成t1、t2线程继续运行
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.error(s);
}, "t2").start();
}
}
8. LockSupport
- `park()`:阻塞当前thread
- `unpark(t)`:唤醒指定的thread
- `Thread.start()`后,`unpark(t)`可以在`park()`前执行
/**
 * LockSupport demos: parking a thread and unparking it, either afterwards or in advance.
 */
@Slf4j
public class T08_LockSupport {

    /** Builds the worker used by both tests: prints 0..9 and calls park() when it reaches 5. */
    private static Thread countingThread() {
        Runnable counter = () -> {
            int step = 0;
            while (step < 10) {
                System.out.println(step);
                if (step == 5) {
                    LockSupport.park();
                }
                ThreadHelper.sleepSeconds(1);
                step++;
            }
        };
        return new Thread(counter);
    }

    /*
     * 1. park():    blocks the calling thread
     * 2. unpark(t): lets thread t continue
     */
    @Test
    public void T1_park() {
        Thread worker = countingThread();
        worker.start();
        ThreadHelper.sleepSeconds(8);
        System.out.println("after 8 seconds!");
        LockSupport.unpark(worker);
        ThreadHelper.join(worker);
    }

    /*
     * 3. After Thread.start(), unpark(t) may be issued before the thread calls park().
     */
    @Test
    public void T2_unpark_advance() {
        Thread worker = countingThread();
        worker.start();
        LockSupport.unpark(worker);
        ThreadHelper.join(worker);
    }
}