03-JUC(55)
1. 什么是线程安全
线程安全一般指在多线程情况下,一段代码或某个对象可以正确地应对多线程访问,不会导致数据丢失、错乱等不一致问题
- 多线程场景下,来执行 1000 次
i++
逻辑,不一定等于 1000 synchronized
加锁,线程安全,等于 1000
synchronized(lock){
i++;
}
1. 实现方式
- 原子操作:不可分割的操作,避免了多线程同时修改数据的问题。eg:CAS 就是一个原子操作
- 锁机制:如互斥锁、读写锁等,保证同时只有一个线程能够访问共享资源。eg:
synchronized
、ReentrantLock
- 并发控制:如信号量、条件变量等,控制线程的执行顺序和时机。eg:
Semaphore
、CountDownLatch
2. 线程、进程区别
1. 进程
OS分配资源的基本单位,进程都有自己的内存空间,它由OS调度
- 进程是程序在OS中的一个执行实例。它包含了程序代码、数据、和资源(如文件句柄、内存)等
- 也由于进程有独立的地址空间,所以进程间切换(上下文切换)开销较大
- 进程间通信(IPC)复杂,需要借助OS提供的机制。eg:管道、消息队列、共享内存等
2. 线程
CPU 调度执行的基本单位,它由 CPU 调度
- 线程是进程中的一个执行单元,属于轻量级进程。一个进程可以包含多个线程,它们共享进程的地址空间和资源,但是它们各自都有自己的程序计数器和栈空间,线程是属于进程的
- 因为共享进程中的资源,所以线程的创建和销毁比进程更快,线程间切换(上下文切换)开销小
- 线程间通信简单,因为它们共享地址空间,可以直接使用共享变量
3. 线程池
线程池是一种线程管理技术,通过复用线程来减少线程创建和销毁的开销,提高系统的响应速度
4. 并行、并发
- 并行(Parallelism):指在多处理器系统中同时执行多个任务
- 并发(Concurrency):指在单处理器系统中通过任务切换来实现多个任务的“同时”进行
5. 同步、异步
- 同步操作:需要等待任务完成后才能继续执行
- 异步操作:则不需要等待任务完成,可以继续执行其他任务,提高系统的效率
3. 协程
协程(Coroutine):是一个比线程轻量级的执行单位。尤其适合于高并发场景和异步编程。协程在某些语言中被称为绿色线程(Green Threads)
- 线程确实是 CPU 层面的最小执行单元,但是这个协程是在程序层面的定义,它不属于OS的概念,可以理解一个线程在执行中可切换执行不同的协程的任务,对 CPU 而言还是在运行同一个线程,不涉及系统上下文的切换,所以它的性能相比线程切换会更高
- 协程暂停执行后会保存当前的状态,当协程恢复执行时,可以从之前保存的状态开始执行,也就是程序层面来进行协程的调度管理
- 像 Java 之前也是有协程的,但是后面废除了,好像是因为难度比较高
- 典型的像 Go 语言是原生支持协程的
综上:协程是程序调度的基本单位。线程可以包含多个协程,协程可以在单个线程中运行,也可以在多个线程之间共享
特征:
- 协程的切换是由程序显式控制的,而不是由OS调度
- 协程的开销非常小,比线程更轻量,因为它们不需要OS内核的干预
- 协程之间的切换是非抢占式的,也就是说,协程只有在显式调用挂起操作时才会切换
优点:
- 由于不需要内核的参与,协程切换的开销非常低,可以显著提高程序的性能
- 协程通过协作式调度避免了许多传统多线程编程中的复杂问题。eg:锁和竞态条件
- 协程更容易实现大量并发操作,因为它们消耗的资源(内存、CPU)更少
1. 使用场景
- 非常适合处理大量 I/O 操作(如网络请求、文件读写等),因为它们可以在等待 I/O 完成时释放 CPU 资源
- 非常适合高并发场景。eg:可以使用协程来处理大量并发连接,以提高吞吐量和响应时间
2. 扩展关联协程库和框架
除了 Go 语言的 goroutine,还有许多其他语言和框架支持协程:
- Python 的 asyncio
- Kotlin 的 协程
- JavaScript 的 async/await
- C# 的 async/await
4. 线程的生命周期
在 Java 中,线程的生命周期可以细化为以下几个状态:
New
(初始状态):线程对象创建后,但未调用start()
Runnable
(可运行状态):调用start()
后,线程进入就绪状态,等待 CPU 调度Running
(运行):线程获得CPU资源,开始执行run()
中的代码
Blocked
(阻塞状态):线程试图获取一个对象锁而被阻塞Waiting
(等待状态):线程进入等待状态,需要被显式唤醒才能继续执行Object.wait
、Thread.join
、LockSupport.park
Timed Waiting
(含等待时间的等待状态):线程进入等待状态,但指定了等待时间,超时后会被唤醒Terminated
(终止状态):线程执行完成或因异常退出
而 Blocked
、Waiting
、Timed Waiting
其实都属于休眠状态
5. 线程之间通信
线程之间的通信(Inter-Thread Communication, ITC)主要依赖于共享内存。由于线程共享同一个进程的内存空间,因此可以直接通过共享变量进行通信
- 共享变量
- 线程可以通过访问共享内存变量来交换信息(需要注意同步问题,防止数据竞争和不一致)
- 共享的也可以是文件,eg:写入同一个文件来进行通信
- 同步机制
synchronized
:Java 中的同步关键字,用于确保同一时刻只有一个线程可以访问共享资源Lock
:Java 中的java.util.concurrent.locks
包提供了更灵活的锁机制。eg:ReentrantLock
volatile
:Java 中的关键字,确保变量的可见性,防止指令重排- 信号量
- 等待/通知机制
wait()
:使线程进入等待状态,释放锁notify()
:唤醒单个等待线程notifyAll()
:唤醒所有等待线程
6. 进程之间通信
进程之间的通信(Inter-Process Communication, IPC)比较复杂,因为它们有各自独立的内存空间
- 管道(Pipes)
- 单向通信机制,数据以字节流的形式从一个进程传递到另一个进程。在Unix/Linux系统中常用,如命令行中的管道
|
- 单向通信机制,数据以字节流的形式从一个进程传递到另一个进程。在Unix/Linux系统中常用,如命令行中的管道
- 命名管道(Named Pipes, FIFO)
- 类似于管道,但具有名称,可以在不相关的进程之间通信。跨网络的通信也可以使用命名管道
- 消息队列(Message Queues)
- 通过消息传递进行进程间通信,允许进程以消息的形式发送和接收数据
- 共享内存(Shared Memory)
- 多个进程共享同一块内存区域,实现高速通信。注意需要同步机制(如信号量)来防止数据竞争
- 信号量(Semaphores)
- 一种用于进程同步的计数器,控制多个进程对共享资源的访问
- 套接字(Sockets)
- 通过网络进行进程间通信,支持本地和远程进程通信
7. 线程池原理
关键配置:核心线程数、最大线程数、空闲存活时间、工作队列、拒绝策略
按照下面的顺序来回答:
- 默认情况下线程不会预创建,所以来任务之后才会创建线程(设置
prestartAllCoreThreads
可以预创建核心线程) - 当核心线程满了之后不会新建线程,而是把任务堆积到工作队列中
- 如果工作队列放不下了,然后才会新增线程,直至达到最大线程数
- 如果工作队列满了,然后也已经达到最大线程数了,这时候来任务会执行拒绝策略
- 如果线程空闲时间超过空闲存活时间,并且线程线程数是大于核心线程数的则会销毁线程,直到线程数等于核心线程数(设置
allowCoreThreadTimeOut
可以回收核心线程)
1. 线程池优势
- 复用线程,减少线程创建、销毁的开销
- 限制同时运行的线程数量,避免资源耗尽
- 统一管理线程生命周期
8. 线程池线程数设置
任务类型可以分:CPU 密集型任务、I/O 密集型任务
- CPU 密集型任务:就好比单纯的数学计算任务,它不会涉及 I/O 操作,也就是说它可以充分利用 CPU 资源(如果涉及 I/O,在进行 I/O 的时候 CPU 是空闲的),不会因为 I/O 操作被阻塞,因此不需要很多线程,线程多了上下文开销反而会变多
- 根据经验法则,
CPU 密集型任务线程数 = CPU 核心数 * 1.5
- 根据经验法则,
- I/O 密集型任务:有很多 I/O 操作,例如文件的读取、数据库的读取等,任务在读取这些数据的时候,是无法利用 CPU 的,对应的线程会被阻塞等待 I/O 读取完成,因此如果任务比较多,就需要有更多的线程来执行任务,来提高等待 I/O 时候的 CPU 利用率
- 根据经验法则,
I/O 密集型任务线程数 = CPU 核心数 * 2 + 1
- 根据经验法则,
注意,实际的最佳线程数还是需要具体应用压测分析的,以上公式仅供参考!
9. 线程池的拒绝策略
看源码,一共提供了 4 种(其中的 blockPolicy 是 hutool 的不算 ThreadPoolExecutor):
- AbortPolicy
- 当任务队列满且没有线程空闲,此时添加任务会直接抛出
RejectedExecutionException
错误,这也是默认的拒绝策略。适用于必须通知调用者任务未能被执行的场景
- 当任务队列满且没有线程空闲,此时添加任务会直接抛出
- CallerRunsPolicy
- 当任务队列满且没有线程空闲,此时添加任务由即调用者线程执行。适用于希望通过减缓任务提交速度来稳定系统的场景
- DiscardOldestPolicy
- 当任务队列满且没有线程空闲,会删除最早的任务,然后重新提交当前任务。适用于希望丢弃最旧的任务以保证新的重要任务能够被处理的场景
- DiscardPolicy
- 直接丢弃当前提交的任务,不会执行任何操作,也不会抛出异常。适用于对部分任务丢弃没有影响的场景,或系统负载较高时不需要处理所有任务
10. 并发类库的线程池
1. newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
核心线程数、最大线程数是一致的,然后 keepAliveTime 的时间是 0 ,队列是无界队列
- 按照这几个设定可以得知它任务线程数是固定
- 然后可能出现 OOM 的现象,因为队列是无界的,所以任务可能挤爆内存
- 特性:就固定这么多线程,多余的任务就排队,队伍排爆了也不管
- 因此不建议用这个方式来创建线程池
2. newWorkStealingPool
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
1.8才有,返回的就是 ForkJoinPool
,1.8用的并行流就是这个线程池
- eg:
users.parallelStream().filter(...).sum();
用的就是 ForkJoinPool - 线程数会参照当前服务器可用的处理核心数,并行数是
核心数-1
- 每个线程都有自己的双端队列,当队列的任务处理完毕之后,会去别的线程的任务队列尾部拿任务来执行(Stealing,会窃取任务),加快任务的执行速率
- 至于 ForkJoin 的话,就是分而治之,把大任务分解成一个个小任务,然后分配执行之后再总和结果
3. newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 一个线程池就一个线程,配备的是无界队列
- 特性:保证任务是按顺序执行的
4. newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心线程数是 0,最大线程数看作无限,然后任务队列是没有存储空间的,来个任务就找个线程接着,不然就阻塞了
- cached 意思就是会缓存之前执行过的线程,缓存时间是 60 秒,这时如果有任务进来就用之前的线程来执行
- 它适合用在短时间内有大量短任务的场景。如果暂无可用线程,那么来个任务就会新启一个线程去执行这个任务,快速响应任务
- 但是如果任务的时间很长,那存在的线程就很多,上下文切换就很频繁,切换的消耗就很明显,并且存在太多线程在内存中,也有 OOM 的风险
5. newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
- 定时执行任务,重点是那个延时队列
11. DelayQ、ScheduledTP区别
DelayQueue 是一个阻塞队列,而 ScheduledThreadPool 是线程池,内部核心原理差不多
DelayQueue
:利用优先队列存储元素,当从队列中获取任务时,如果最老的任务已经到了执行时间,可以从队列中出队一个任务,反之可以获得 null 或阻塞等待任务来临ScheduledThreadPool
:内部也使用的一个优先队列DelayedWorkQueue
且可以内部多线程执行任务,支持定时执行的任务,即每隔一段时间执行一次的任务
12. Timer
Timer 可以实现延时任务,也可以实现周期性任务,它的核心就是一个优先队列和封装的执行任务的线程
实现原理:维持一个小顶堆,即最快需要执行的任务排在优先队列的第一个,根据堆的特性插入、删除的时间复杂度都是
- 然后有个
TimerThread
线程不断地拿排着的第一个任务的执行时间和当前时间做对比- 如果时间到了(先看看这个任务是不是周期性执行的任务。如果是则修改当前任务时间为下次执行的时间,如果不是周期性任务则将任务从优先队列中移除),最后执行任务
- 如果时间还未到,则调用
wait()
等待
弊端:
- 首先优先队列的插入、删除时间复杂度是,当数据量大时,频繁的入堆出堆性能有待考虑
- 并且是单线程执行,那么如果一个任务执行的时间过久则会影响下一个任务的执行时间(任务的run是异步执行也行)
- 对异常没有做什么处理,那么一个任务出错的时候会导致之后的任务都无法执行
13. 时间轮
时间轮和手表时钟很相似的存在,用环形数组实现,数组的每个元素可以称为槽,和 HashMap一样称呼
- 槽的内部用双向链表存着待执行的任务,添加和删除的链表操作时间复杂度都是 ,槽位本身也指代时间精度,比如一秒扫一个槽,那么这个时间轮的最高精度就是 1 秒
- 也就是说延迟 1.2 秒的任务和 1.5 秒的任务会被加入到同一个槽中,然后在 1 秒的时候遍历这个槽中的链表执行任务
此时指针指向的是第一个槽,一共有八个槽(0~7),假设槽的时间单位为 1 秒,现在要加入一个延时 5 秒的任务,计算方式就是 5 % 8 + 1 = 6
,即放在槽位为 6,下标为 5 的那个槽中。再拼到槽的双向链表的尾部
- 然后每秒指针顺时针移动一格,这样就扫到了下一格,遍历这格中的双向链表执行任务。然后再循环继续
- 可以看到插入任务从计算槽位到插入链表,时间复杂度都是
假设现在要加入一个50秒后执行的任务怎么办?这槽不够啊?
- 增加轮次的概念。
50 % 8 + 1 = 3
,即应该放在槽位是 3,下标是 2 的位置。然后(50 - 1) / 8 = 6
,即轮数记为 6。也就是说当循环 6 轮之后扫到下标的 2 的这个槽位会触发这个任务。Netty 中的 HashedWheelTimer 使用的就是这种方式 - 还有一种是通过多层次的时间轮,这个和手表就更像了,像秒针走一圈,分针走一格,分针走一圈,时针走一格
- 多层次时间轮就是这样实现的。假设上图就是第一层,那么第一层走了一圈,第二层就走一格
- 第二层的一格就是8秒,假设第二层也是 8 个槽,那么第二层走一圈,第三层走一格,第三层一格就是 64 秒
- 那么一格三层,每层8个槽,一共 24 个槽时间轮就可以处理最多延迟 512 秒的任务
- 降级,假设一个任务延迟 500 秒执行,那么刚开始加进来肯定是放在第三层的
- 当时间过了 436 秒后,此时还需要 64 秒就会触发任务的执行,而此时相对而言它就是个延迟 64 秒后的任务,因此它会被降低放在第二层中,第一层还放不下它
- 再过个 56 秒,相对而言它就是个延迟 8 秒后执行的任务,因此它会再被降级放在第一层中,等待执行
降级是为了保证时间精度一致性。Kafka内部用的就是多层次的时间轮算法
14. 并发工具类
- ConcurrentHashMap
- AtomicInteger
- Semaphore
- CyclicBarrier
- CountDownLatch
- BlockingQueue
15. Semaphore
信号量:广泛应用于各种OS中,相对于平日只允许一个线程访问临界区的 lock 和 synchronized 来说,信号量允许多线程同时访问一个临界区
原理:初始化一个数,如果来了一个线程则把数减一,如果减一之后数的值小于 0 则阻塞当前线程,移入一个阻塞队列中,否则允许执行
- 当一个线程执行完毕之后将数加一,并唤醒阻塞队列中的一个等待线程
- 实际是内部有个继承自 AQS 的 Sync 类,通过依托 AQS 的封装来实现功能
主要用于流量的控制。eg:停车场只允许停一定数量的车位
int count;
final Semaphore semaphore = new Semaphore(1); // 初始化信号量
// 用信号量保证互斥
void addOne() {
try {
semaphore.acquire(); // 对应down,计数减一
count+=1;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 对应up,计数加一
}
}
16. CyclicBarrier
屏障:让一组线程都运行到同一个屏障点之后,线程会阻塞等待所有线程都达到这个屏障点,然后所有线程才得以继续执行
用法:
它实际上是基于 ReentrantLock 和 Condition 的封装来实现这一功能
口述原理
- 首先设置了达到屏障的线程数量,当线程调用 await 时计数器会减一,如果计数器减一不等于 0,线程会调用
condition.await
进行阻塞等待 - 如果计数器减一的值等于 0,说明最后一个线程也到达了屏障,于是如果有就执行
barrierCommand
,然后调用condition.signalAll
唤醒之前等待的线程,并且重置计数器,然后开启下一代
循环源码:
- 当规定数量的线程到达屏障之后会把计数重置回去,开启下一代,所以 CyclicBarrier 是可以循环使用的
17. CountDownLatch
- CyclicBarrier 是各个线程等待阻塞所有线程都达到一个节点之后,所有线程继续执行
- CountDownLatch 是一个线程阻塞着等待其他线程到达一个节点之后才能继续执行,这个过程中其他线程是不会阻塞的
实现原理:内部又一个继承自 AQS 的 Sync 类,核心其实就是围绕一个整数 state
- 初始化 state 的值,当调用一次 countDown 会把 state 的值减一,当 state 的值减到 0 的时候就会唤醒之前调用 await 等待的线程
- 主要是依靠 AQS 封装的好,所以代码很少,原理也很清晰简单
18. StampedLock
- 可以认为是读写锁的“改进”版本
- 读写锁读写是互斥的,而 StampedLock 搞了个悲观读和乐观读,悲观读和写是互斥的,乐观读则不会
官方示例:
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
void move(double deltaX, double deltaY) { // an exclusively locked method
long stamp = sl.writeLock(); // 获取写锁
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp); // 释放写锁
}
}
double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead(); // 乐观读
double currentX = x, currentY = y;
if (!sl.validate(stamp)) { // 判断共享变量是否已经被其他线程写过
stamp = sl.readLock(); // 如果被写过则升级为悲观读锁
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp); // 释放悲观读锁
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock(); // 获取读锁
try {
while (x == 0.0 && y == 0.0) {
long ws = sl.tryConvertToWriteLock(stamp); // 升级为写锁
if (ws != 0L) {
stamp = ws;
x = newX;
y = newY;
break;
}
else {
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
sl.unlock(stamp);
}
}
}
- 乐观锁就是获取判断一下,如果被修改了那么就升级为悲观锁
- 但是 StampedLock 是不可重入锁,而且也不支持 condition 。并且如果线程使用
writeLock()
或者readLock()
获得锁之后,线程还没执行完就被interrupt()
,会导致CPU飙升,需要用readLockInterruptibly
或writeLockInterruptibly
19. CompletableFuture
Java8 引入的一个类,主要用于异步编程处理
更简洁地编写异步代码,直接在方法里面就可以写回调方法,非常简便。eg:提供了一些链式操作
thenApply()
: 获取返回结果thenAccept()
: 不获取返回结果thenRun()
: 不获取返回结果
组合方法:
thenCompose()
:连接两个CompletableFuture
对象,并且可以将第一个任务的返回结果作为下一个任务的参数,有先后顺序thenCombine()
:两个任务执行完毕后,将结果合并,没有先后顺序,两个任务是并行执行
20. ForkJoinPool
其实就是一个线程池,在 Java8 才提供,在 Java8 里用到的 stream 内的并行流,默认用的就是 ForkjoinPool 来实现并发的
核心思想:利用分治算法将一个大任务拆分成多个小任务,然后将这些小任务并行执行
- 它内部实现不同的地方在于每个线程都有自己的双端队列,当自己队列的任务处理完毕之后,会去别的线程的任务队列尾部拿任务来执行,加快任务的执行速率
- 自适应线程数。根据当前 CPU 核数和任务量来判断是否需要创建新的线程,且如果任务少了,就会关闭一些线程来节省资源
21. 多线程并发执行,控制顺序
- CompletableFuture 就可以实现任务按序执行
- 现在有三个任务T1、T2、T3 需要按序执行
CompletableFuture.runAsync(() -> {do t1 sth}) .thenRun(()-> {do t2 sth}) .thenRun(()-> {do t3 sth});
- 利用线程池,内部仅设置一个线程来执行任务,按序的将任务提交到线程池中就可以了
- 利用 CountDownLatch、CyclicBarrier、信号量等
22. 阻塞队列哪些
阻塞队列主要用来阻塞队列的插入和获取操作,当队列满了的时候阻塞队列的插入操作,直到队列有空位。当队列为空时阻塞队列的获取操作,直到队列有值
ArrayBlockingQueue 和 LinkedBlockingQueue 都是有界阻塞队列两者有什么区别?
- 两者原理都是基于 ReentrantLock 和 Condition
- ArrayBlockingQueue:基于数组,内部实现只用了一把锁,可以指定公平或非公平锁
- LinkedBlockingQueue:基于链表,内部实现用了两把锁,take 一把、put 一把,所以入队和出队这两个操作是可以并行的,并发度应该比 ArrayBlockingQueue 高
- PriorityBlockingQueue:支持优先级的无界阻塞队列
- DelayQueue:支持延时获取的无界阻塞队列,内部用的是 PriorityQueue
- SynchronousQueue:不占空间的,入队必须等待一个出队,生产者必须等待消费者拿货,无法把先把货存在队列
- LinkedBlockingDeque:是双端阻塞无界队列,就是队列的头尾都能操作,头尾都能插入和移除
- LinkedTransferQueue:相对于其他阻塞队列从名字来看它有 Transfer 功能,其实也不是什么神奇功能,一般阻塞队列都是将元素入队,然后消费者从队列中获取元素。而其 transfer 是元素入队时看看是否已经有消费者在等了,如果有在等了直接给消费者即可,所以就是这里少了一层,没有锁操作
23. 原子类
原子类是 JUC 封装的通过无锁的方式实现的一系列线程安全的原子操作类
原子类主要分为五大类,脑图汇总一下:
核心原理:基于 CAS(Compare And Swap)
- 给予一个共享变量的内存地址,然后内存中应该的值(预期值)和新值,然后通过一条 CPU 指令来比较此内存地址。也就是说硬件层面支持一条指令来实现这么几个操作,一条指令是不会被打断的,所以保证了原子性
1. 基本类型
AtomicBoolean、AtomicInteger、AtomicLong 就可线程安全地、原子地更新这几个基本类型
2. 数组类型
AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray 可以原子化地更新数组内的每个元素,几个的差别无非就是数组里面存储的数据是什么类型
3. 引用类型
AtomicReference、AtomicStampedReference、AtomicMarkableReference 就是对象引用的原子化更新
- AtomicStampedReference:通过版本号 stamp,来避免 ABA 问题
- AtomicMarkableReference:通过一个布尔值 mark,来避免 ABA 问题
4. 属性更新类型
AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater 是通过反射,原子化的更新对象的属性,不过要求属性必须用 volatile 修饰来保证可见性。源码很直观
5. 累加器
DoubleAccumulator、DoubleAdder、LongAccumulator、LongAdder 主要用来累加数据
- AtomicLong 也能累加,而 LongAdder 是专业累加,也只能累加,并发度更高,它通过分多个 cells 来减少线程的竞争,提高了并发度
xxxAccumulator 和 xxxAdder 两者的区别?
xxxAccumulator
的功能比xxxAdder
丰富,可以自定义累加方法,也可以设置初始值,按照注释上的解释xxxAdder
等价于new xxxAccumulator((x, y) -> x + y, 0L}
xxxAdder
是xxxAccumulator
的一个特例
24. 用过Java的累加器吗
Java8 引入的 LongAdder 新类,主要是应对高并发场景下的累加操作,相比于 AtomicLong 它更高效
- LongAdder 引入了分段的思想,利用多个 cell 将并发更新分散开了,本质就是内部弄了一个 cell 数组和 base 字段
- 如果 cas 更新 base 成功,则直接更新结束,如果更新失败说明当前并发高,那么就根据线程计算 hash 得到一个 cell,在 cell 进行计数修改
- 最后统计的时候,会把 base 和 cells 的值都加一起,这样更新的操作被分散了而最终的结果是对的,分而治之的思想
25. Sync、ReentrantLock区别
Synchronized 和 ReentrantLock 都是可重入锁,ReentrantLock 需要手动解锁,而 Synchronized 不需要
- ReentrantLock 支持设置超时时间,可以避免死锁,比较灵活,并且支持公平锁,可中断,支持条件判断
- Synchronized 不支持超时,非公平,不可中断,不支持条件
总的而言,一般情况下用 Synchronized 足矣,比较简单,而 ReentrantLock 比较灵活,支持的功能比较多,所以复杂的情况用 ReentrantLock
26. Synchronized原理
原理:基于一个锁对象和锁对象相关联的一个 monitor 对象
- 偏向锁、轻量级锁:只需要利用 CAS 来操控锁对象头即可完成加解锁动作
- 在升级为重量级锁之后还需要利用 monitor 对象,利用 CAS 和 mutex 来作为底层实现
- monitor 对象有等待队列和条件等待队列,未竞争到锁的线程存储到等待队列中,获得锁的线程调用 wait 后便存放在条件等待队列中,解锁和 notify 都会唤醒相应队列中的等待线程来争抢锁
- 重量级锁:由于阻塞和唤醒依赖于底层的OS实现,系统调用存在用户态与内核态之间的切换,所以有较高的开销
- 所以才会有偏向锁和轻量级锁的优化,并且引入自适应自旋机制,来提高锁的性能
27. Sync轻量级锁升级会自旋
关于 Synchronized 专门翻阅 HotSpot 1.8 的源码来研究,就会发现有一点
当轻量级锁 CAS 失败,则当前线程会尝试使用自旋来获取锁。但是看了源码之后发现并不是这样的,这段代码在
synchronizer.cpp
中
从源码来看,如果轻量级锁 CAS 成功就直接 return,CAS 失败并不会自旋而是直接膨胀成重量级锁
- 锁膨胀的代码
ObjectSynchronizer::inflate
翻了翻 - 不过为了优化性能,自旋操作在 Synchronized 中确实却有。在已经升级成重量级锁之后,线程如果没有争抢到锁,会进行一段自旋等待锁的释放
- 阻塞线程入队再唤醒开销还是有点大的
- 再来看看 TrySpin 的操作,里面有自适应自旋,其实从实际函数名就
TrySpin_VaryDuration
就可以反映出自旋是变化的
至此,有关 Synchronized 自旋问题就完结了,重量级锁竞争失败会有自旋操作,轻量级锁没有这个动作(至少 1.8 源码是这样的)
28. Synchronized重量级锁
synchronized 的锁升级,也听过锁升级之后不会降级
- 先看下无锁的对象布局
- 然后多线程争抢,此时应该是重量级锁
- 然后 sleep 等待所有线程执行完毕释放锁,然后再看看此时的锁布局
- 最后再加一次锁看看对象布局
1.8 的偏向锁是会延迟生效的,得在 JVM 启动 4 秒后生效,通过
-XX:BiasedLockingStartupDelay=0
关闭偏向锁延迟
没搞这个参数,因为不是重点,所以等下结果里面不会有偏向锁
- 初始是无锁的
- 然后4个线程同时竞争变成了重量级锁
- 4个线程执行完毕之后,锁对象变成了无锁
- 此时再有一个线程去争抢锁,就从无锁变成了轻量级锁
所以当重量级锁释放了之后,锁对象是无锁的!有新的线程来竞争的话又会从轻量级锁开始!
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<version>0.14</version>
</dependency>
public class YesLockTest {
static Object yesLock;
public static void main(String[] args) throws InterruptedException {
yesLock = new Object();
System.out.println("无锁时对象布局:" + ClassLayout.parseInstance(yesLock).toPrintable());
IntStream.rangeClosed(1,4).forEach(i->{getYesLock();});
Thread.sleep(5000L);
System.out.println("无竞争之后,此时的对象布局:" + ClassLayout.parseInstance(yesLock).toPrintable());
getYesLock(); // 此时再来一次加锁
}
private static void getYesLock() {
new Thread(() -> {
try {
synchronized (yesLock) {
System.out.println("线程[" + Thread.currentThread().getName() + "]" +
":重量级锁状态对象布局:" + ClassLayout.parseInstance(yesLock).toPrintable());
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
29. 锁的自适应自旋
在重量级锁时,一个线程如果竞争锁失败会进行自旋操作,说白了就是执行一些无意义的执行,空转 CPU 等着锁的释放
- 锁的自适应自旋:结合经验值来看,如果上次自旋一会儿就拿到锁,那这次多自旋几次,如果上次自旋很久都拿不到,这次就少自旋
30. 锁如何优化
锁的优化主要有两种方式:
- 减少锁的持有时间
- 并发资源已操作完成后,里面释放锁,不然别的线程就会阻塞等待,这样就不高效了
- 减少锁的粒度
- 以前用 Synchronized 修饰整个方法,可以优化下用代码块仅包括需要抢占的逻辑,减少整体锁定码逻辑
- 以前使用的是 HashTable,可以替换成 ConcurrentHashMap,是因为 HashTable 虽然是线程安全的,但是它太粗暴了,它为所有的方法都上了同一把锁!
31. ReentrantLock原理
ReentrantLock 其实就是基于 AQS 实现的一个可重入锁,支持公平和非公平两种方式
- 内部实现依靠一个 state 变量和两个等待队列:同步队列、等待队列
- 利用 CAS 修改 state 来争抢锁
- 争抢不到则入同步队列等待,同步队列是一个双向链表
- 条件 condition 不满足时候则入等待队列等待,也是个双向链表
- 是否是公平锁的区别在于:线程获取锁时是加入到同步队列尾部还是直接利用 CAS 争抢锁
32. AQS
如果面试官问你为什么需要 AQS ,不要长篇大论,容易都绕进去
- AQS 起到了一个抽象,封装的作用,将一些排队、入队、加锁、中断等方法封装起来,便于其他相关 JUC 锁的使用,具体加锁时机、入队时机等都需要实现类控制
- eg:ReentrantLock、CountDownLatch、Semaphore 等
AQS 整体架构图:
33. 读写锁
读写锁在 Java 中一般默认指的是 ReentrantReadWriteLock
- 读写锁是有两把锁,分别是读锁、写锁
- 除了读读操作不互斥之外,其他都互斥
- 所以读很多写比较少的情况,用读写锁比较合适。如果不是这种情况不要用读写锁,因为读写锁需要额外维护读锁的状态,所以如果读读操作不多还不如一般的锁
读写锁也是基于 AQS 实现的,再具体点就是将 state 分为了两部分,高16bit 用于标识读状态、低16bit 标识写状态,这样灵巧的通过一个 state 实现了两把锁
34. CAS知道不
CAS(compare and swap),即比较并交换
- CAS 需要三个操作数,分别是旧的预期值,变量内存地址,新值
- 指令是根据变量地址拿到值,比较是否和预期值相等,如果是的话则替换成新值,如果不是则不替换
35. JMM
JMM(Java Memory Model),Java 内存模型。屏蔽了各大底层硬件的细节,抽象出来的一套 JVM 层面的内存规范
JMM 其实是一组规则,规定了一个线程的写操作何时会对另一个线程可见(JSR133)
- 抽象的来看 JMM 会把内存分为本地内存和主存,每个线程都有自己的私有化的本地内存,然后还有个存储共享数据的主存
- 注意:本地内存只是一种抽象的说法,实际指代:寄存器、CPU 缓存等
- 由 JMM 来定义这两个内存之间的交互规则
36. 原子性、可见性、有序性
- 原子性
- 一个操作不会被中断,要么这个操作执行完毕,要么不会执行,不会有执行一半的存在
- 可见性
- 一个线程对某个共享变量进行了修改,则其他线程能立刻获取到最新值
- 有序性
- 编译器或处理器会将指令进行重排,这种操作会影响多线程的执行顺序导致错误
37. happens-before
这个问题估计应该都来自《深入理解Java虚拟机》这本书
happens-before: 定义的一些规则,在一些特定场景下,一些操作会先行发生于另一些操作
- A先行发生于B,其实含义就是 A 操作得到的结果在 B 操作开始时可以得到,重点不在于 A 执行的时间比 B 早,而是 A 的结果是可以在 B 开始时候被 B 读取的
- JVM 规定的有序性,也可以认为写 JVM 的程序员需要按照这样的规则来实现 JVM
操作符合以下规则,就会按照下面的定义动作先行发生:
- 程序次序规则:在一个线程内,按照程序代码顺序,书写在前面的操作先行发生于书写在后面的操作。准确地说,应该是控制流顺序而不是程序代码顺序,因为要考虑分支、循环等结构
- 管程锁定规则:一个 unlock 操作先行发生于后面对同一个锁的 lock 操作。这里必须强调的是同一个锁,而“后面”是指时间上的先后顺序
- volatile 变量规则:对一个 volatile 变量的写操作先行发生于后面对这个变量的读操作,这里的“后面”同样是指时间上的先后顺序
- 传递性规则:如果操作A先行发生于操作B,操作B先行发生于操作C,那就可以得出操作A先行发生于操作C的结论
- 线程启动规则:Thread 对象的
start()
方法先行发生于此线程的每一个动作 - 线程中断规则:对线程
interrupt()
方法的调用先行发生于被中断线程的代码检测到中断事件的发生,可以通过Thread.interrupted()
方法检测到是否有中断发生 - 线程终止规则:线程中的所有操作都先行发生于对此线程的终止检测,可以通过
Thread.join()
方法结束、Thread.isAlive()
的返回值等手段检测到线程已经终止执行 - 对象终结规则:一个对象的初始化完成(构造函数执行结束)先行发生于它的
finalize()
方法的开始
38. 指令重排
为了提高程序执行的效率,CPU或编译器就将执行命令重排序
- 因为内存访问的速度比 CPU 运行速度慢很多,因此需要编排一下执行的顺序,防止因为访问内存的比较慢的指令而使得 CPU 闲置着
- CPU 执行有个指令流水线的概念,还有分支预测等
- 总之为了提高效率就会有指令重排的情况,导致指令乱序执行的情况发生,不过会保证结果肯定是与单线程执行结果一致的,这叫 as-if-serial
不过多线程就无法保证了,在 Java 中的 volatile 关键字可以禁止修饰变量前后的指令重排
39. final和可以保证可见性?
- 提到的 final 可以保证可见性,其实指的是 final 修饰的字段在构造方法初始化完成,并且期间没有把 this 传递出去,那么当构造器执行完毕之后,其他线程就能看见 final 字段的值
- 如果不用 final 修饰,那么有可能在构造函数里面对字段的写操作被排序到外部,这样别的线程就拿不到写操作后的值
public class YesFinalTest {
final int a;
int b;
static YesFinalTest testObj;
public void YesFinalTest () { // 对字段赋值
a = 1;
b = 2;
}
public static void newTestObj () { // 此时线程 A 调用这个方法
testObj = new YesFinalTest ();
}
public static void getTestObj () { // 此时线程 B 执行这个方法
YesFinalTest object = obj;
int a = object.a; // 这里读到的肯定是 1
int b = object.b; // 这里读到的可能是 2
}
}
对于 final 域,编译器和处理器要遵守两个重排序规则:
- 在构造函数内对一个 final 域的写入,与随后把这个被构造对象的引用赋值给一个引用变量,这两个操作之间不能重排序。 初次读一个包含
- final 域的对象的引用,与随后初次读这个 final 域,这两个操作之间不能重排序
所以 final 无法保证可见性!final 的可见性和在并发中常说的可见性不是一个概念!
40. 为什么需要ThreadLocal
通过本地化资源来避免共享,避免了多线程竞争导致的锁等消耗
public class YesThreadLocal {
private static final ThreadLocal<String> threadLocalName = ThreadLocal.withInitial(() -> Thread.currentThread().getName());
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new Thread(() -> {
System.out.println("threadName: " + threadLocalName.get());
}, "yes-thread-" + i).start();
}
}
}
在 new 线程时,设置了每个线程名,每个线程都操作同一个 ThreadLocal 对象的 get 却返回的各自的线程名
41. 如何设计ThreadLocal
用 ThreadLocal 变量来实现线程隔离
- 在线程对象内部搞个 map,把 ThreadLocal 对象自身作为 key,把它的值作为 map 的值
ThreadLocal<String> threadLocal1 = new ThreadLocal<>();
ThreadLocal<Integer> threadLocal2 = new ThreadLocal<>();
ThreadLocal<Integer> threadLocal3 = new ThreadLocal<>();
此时 ThreadLocal 对象和线程的关系,如图:
满足了本地化资源的需求,每个线程维护自己的变量,互不干扰,实现了变量的线程隔离,同时也满足存储多个本地变量的需求
42. ThreadLocal原理
public class Thread implements Runnable {
// ThreadLocal 的静态内部类
ThreadLocal.ThreadLocalMap threadLocals = null;
}
竟然这个 map 是放在 Thread 里面使用,那为什么要定义成 ThreadLocal 的静态内部类呢?
- 首先内部类这个东西是编译层面的概念,就像语法糖一样,经过编译器之后其实内部类会提升为外部顶级类,和平日里外部定义的类没有区别,也就是说在 JVM 中是没有内部类这个概念
- 一般情况下非静态内部类用在内部类,跟其他类无任何关联,专属于这个外部类使用,并且也便于调用外部类的成员变量和方法,比较方便
- 静态外部类其实就等于一个顶级类,可以独立于外部类使用,所以更多的只是表明类结构和命名空间
这样定义的用意,就是说明 ThreadLocalMap
是和 ThreadLocal
强相关的,专用于保存线程本地变量
ThreadLocalMap
的定义:
Entry 继承了 WeakReference 即弱引用。注意:不是说 Entry 是弱引用,构造函数的 super(k)
,这个 key 才是弱引用
- 所以 ThreadLocalMap 里有个 Entry 数组,这个 Entry 的 key 就是 ThreadLocal 对象,value 就是需要保存的值
1. get()
那是如何通过 key 在数组中找到 Entry 然后得到 value 的呢 ?
不同的线程对同一个 ThreadLocal 对象调用 get()
能得到不同的值
key 是如何从 ThreadLocalMap 中找到 Entry 的,即 map.getEntry(this)
是如何实现的,其实很简单
可以看到 ThreadLocalMap 虽然和 HashMap 一样,都是基于数组实现的,但是它们对于 Hash 冲突的解决方法不一样,HashMap 是通过链表(红黑树)法来解决冲突,而 ThreadLocalMap 是通过开放寻址法来解决冲突
2. 开放寻址法
如果通过 key 的哈希值得到的下标无法直接命中,则会将下标 +1,即继续往后遍历数组查找 Entry ,直到找到或者返回 null
- 这种解决 hash 冲突效率其实不高,但是一般 ThreadLocal 也不会太多,所以用这种简单的办法解决即可
3. set()
先通过 key 的 hash 值计算出一个数组下标,然后看看这个下标是否被占用了,如果被占了看看是否就是要找的 Entry ,如果是则进行更新,如果不是则下标++,即往后遍历数组,查找下一个位置,找到空位就 new 个 Entry 然后把坑给占用了
当然,这种数组操作一般免不了阈值的判断,如果超过阈值则需要进行扩容
4. key的哈希值
- 可以看到
key.threadLocalHashCode
其实就是调用nextHashCode
进行一个原子类的累加 - 注意看上面都是静态变量和静态方法,所以在 ThreadLocal 对象之间是共享的,然后通过固定累加一个奇怪的数字
0x61c88647
来分配 hash 值 - 这个数字当然不是乱写的,是实验证明的一个值,即通过
0x61c88647
累加生成的值与 2 的幂取模的结果,可以较为均匀地分布在 2 的幂长度的数组中,这样可以减少 hash 冲突
43. ThreadLocal弱引用
Entry 对 key 是弱引用,那为什么要弱引用呢?
- 为了让当外部没有对 ThreadLocal 对象有强引用时,可以将 ThreadLocal 对象给清理掉
假设 Entry 对 key 的引用是强引用,看一下这个引用链:
线程常常是以线程池的方式来使用的,线程的生命周期就很长,一般是不会被清理掉的,所以这个引用链就会一直存在,那么 ThreadLocal 对象即使没有用了,也会随着线程的存在,而一直存在着!产生内存泄漏
- 引用链:
- 随着方法的执行完毕,相应的栈帧也出栈了,此时上条强引用链就没了,如果没有别的栈有对 ThreadLocal 对象的引用,那么说明 ThreadLocal 对象无法再被访问到(定义成静态变量的另说)
- 此时 ThreadLocal 只存在于 Entry 之间的弱引用,那此时发生 GC 它就可以被清除了,因为它无法被外部使用了,那就等于没用了,是个垃圾,应该被处理来节省空间
所以为了能让已经没用 ThreadLocal 对象得以回收,所以 Entry 和 key 要设计成弱引用,不然 Entry 和 key是强引用的话,ThreadLocal 对象就会一直在内存中存在
1. 内存泄漏
内存泄漏:程序中已经无用的内存无法被释放,造成系统内存的浪费
- 当 Entry 中的 key 即 ThreadLocal 对象被回收了之后,会发生 Entry 中 key 为 null 的情况,其实这个 Entry 就已经没用了,但是又无法被回收,因为有
Thread -> ThreadLocalMap -> Entry
这条强引用在,这样没用的内存无法被回收造成内存泄露
关于 expungeStaleEntry()
即清理过期的 Entry 的操作
- 设计者当然知道会出现这种情况,所以在多个地方都做了清理无用 Entry ,即 key 已经被回收的 Entry 的操作
- 通过 key 查找 Entry 时,如果下标无法直接命中,那么就会向后遍历数组,此时遇到 key 为 null 的 Entry 就会清理掉
- 扩容时,也会清理无用的 Entry
其它还有,反正知晓设计者是做了一些操作来回收无用的 Entry 的即可
44. ThreadLocal#remove
可能会出现内存泄露问题
- 所以,最佳实践是用完了之后,调用一下
remove()
,手工把 Entry 清理掉,这样就不会发生内存泄漏了!
void yesDosth {
threadlocal.set(xxx);
try {
// do sth
} finally {
threadlocal.remove();
}
}
如果不是线程池使用方式的话,其实不用关系内存泄漏,反正线程执行完了就都回收了,但是一般我们都是使用线程池的,可能只是你没感觉到
- eg:用了 tomcat ,其实请求的执行用的就是 tomcat 的线程池,这就是隐式使用。即线程第一次调用执行 Threadlocal 之后,如果没有显示调用
remove()
,则这个 Entry 还是存在的,那么下次这个线程再执行任务,不会再调用withInitial()
,也就是说会拿到上一次执行的值 - 但是你以为执行任务的是新线程,会初始化值,然而它是线程池里面的老线程,这就和预期不一致了
45. 父子线程之间传递数据
InheritableThreadLocal 类
- InheritableThreadLocal 相比 ThreadLocal 它可以在父线程创建子线程时,将 InheritableThreadLocal 变量复制给子线程而实现数据的传递
- 使用方式和 ThreadLocal 一致,无非就是将类替换成 InheritableThreadLocal 即可
- 原理:父线程在创建一个新的线程时,会将 InheritableThreadLocal 值传递给子线程。因为 InheritableThreadLocal 重写了 ThreadLocal 类的
createMap()
,它会在创建新线程时将父线程的 ThreadLocalMap 复制到子线程中,从而使得子线程可以继承父线程的 InheritableThreadLocal 值
46. ThreadLocal缺点
1)hash 冲突用的是线性探测法,效率低
- 可以看到,图上显示的是经过两个遍历找到了空位,假设冲突多了,需要遍历的次数就多了。并且下次 get 的时候,hash 直接命中的位置发现不是要找的 Entry ,于是就接着遍历向后找,所以说这个效率低
- 而像 HashMap 是通过链表法来解决冲突,并且为了防止链表过长遍历的开销变大,在一定条件之后又会转变成红黑树来查找,这样的解决方案在频繁冲突的条件下,肯定是优于线性探测法,所以这是一个优化方向
2)ThreadLocal 使用了 WeakReference 以保证资源可以被释放,但是这可能会产生一些 Entry 的 key 为 null,即无用的 Entry 存在
- 调用 ThreadLocal 的
get()
或set()
时,会主动清理无用的 Entry,减轻内存泄漏的发生。把清理的开销弄到了get()
和set()
上,万一要清理的无用 Entry 特别多,那这次调用相对而言就比较慢了
3)内存泄漏问题
- 线程池使用时,并且上面也提到了 get 和 set 的时候也能清理一些无用的 Key,所以没有那么的夸张,只要记得用完后调用
ThreadLocal#remove
就不会有内存泄漏问题了
47. ThreadLocal缺点改进
- ThreadLocal hash 冲突的线性探测法不好,还有 Entry 的弱引用可能会发生内存泄漏,这些都和 ThreadLocalMap 有关,所以需要搞个新的 map 来替换 ThreadLocalMap
- 而这个 ThreadLocalMap 又是 Thread 里面的一个成员变量,这么一看 Thread 也得动一动,但是我们又无法修改 Thread 的代码,所以配套的还得弄个新的 Thread
对应到 Netty 的实现就是 FastThreadLocal、InternalThreadLocalMap、FastThreadLocalThread
- 既然 Hash 冲突的线性探测法效果不好,可能比较容易想到的就是上面提到的链表法,然后再基于链表法说个改成红黑树,这个确实是一方面
- 每往 InternalThreadLocalMap 中塞入一个新的 FastThreadLocal 对象,就给这个对象发个唯一的下标,然后让这个对象记住这个下标,到时候去 InternalThreadLocalMap 找 value 的时候,直接通过下标去取对应的 value
所以关于 ThreadLocal 的优化 FastThreadLocal 给出了答案
48. FastThreadLocal原理
以下 Netty 基于 4.1 版本分析
ThreadLocalMap 定义:
- 它是个 Entry 数组,然后 Entry 里面弱引用了 ThreadLocal 作为 Key
InternalThreadLocalMap 定义:
- 放弃了 map 的形式,没用定义 key 和 value,而是一个 Object 数组,来存储 FastThreadLocal 和对应的 value
1. FastThreadLocal
index
,在 FastThreadLocal 构造时就被赋值了,且也被 final 修饰,所以也不可变。给每个新 FastThreadLocal 都生成唯一的下标,这样每个 index 就都知道自己的位置了InternalThreadLocalMap.nextVariableIndex()
进行赋值的,是用原子类递增实现的
public class FastThreadLocal<V> {
private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
private final int index;
public final void set(V value) {
if (value != InternalThreadLocalMap.UNSET) {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
setKnownNotUnset(threadLocalMap, value);
}
// 默认值
else {
remove();
}
}
private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
if (threadLocalMap.setIndexedVariable(index, value)) {
addToVariablesToRemove(threadLocalMap, this);
}
}
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
Set<FastThreadLocal<?>> variablesToRemove;
if (v == InternalThreadLocalMap.UNSET || v == null) {
variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
} else {
variablesToRemove = (Set<FastThreadLocal<?>>) v;
}
variablesToRemove.add(variable);
}
// -----------------------------------------------------------------------
@SuppressWarnings("unchecked")
public final V get() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
return initialize(threadLocalMap);
}
public static void removeAll() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
if (threadLocalMap == null) {
return;
}
try {
// 1. v 就是 set
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
if (v != null && v != InternalThreadLocalMap.UNSET) {
@SuppressWarnings("unchecked")
Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
// 2. 转换成数组遍历
FastThreadLocal<?>[] variablesToRemoveArray =
variablesToRemove.toArray(new FastThreadLocal[0]);
for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
// 3. 分别调用 remove()
tlv.remove(threadLocalMap);
}
}
} finally {
// 4. 将线程里 map 置空,完成整体移除
InternalThreadLocalMap.remove();
}
}
// --------------------------------------------------------------------
public final void remove() {
remove(InternalThreadLocalMap.getIfSet());
}
@SuppressWarnings("unchecked")
public final void remove(InternalThreadLocalMap threadLocalMap) {
if (threadLocalMap == null) {
return;
}
Object v = threadLocalMap.removeIndexedVariable(index);
removeFromVariablesToRemove(threadLocalMap, this);
if (v != InternalThreadLocalMap.UNSET) {
try {
onRemoval((V) v);
} catch (Exception e) {
PlatformDependent.throwException(e);
}
}
}
private static void removeFromVariablesToRemove(
InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
if (v == InternalThreadLocalMap.UNSET || v == null) {
return;
}
@SuppressWarnings("unchecked")
Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
variablesToRemove.remove(variable);
}
}
2. InternalThreadLocalMap
- 在 InternalThreadLocalMap 也定义了一个静态原子类,每次调用
nextVariableIndex
就返回且递增,没有什么别的赋值操作,从这里也可以得知variablesToRemoveIndex
的值为 0,因为它属于常量赋值,第一次调用时nextIndex
的值为 0
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
private static final int DEFAULT_ARRAY_LIST_INITIAL_CAPACITY = 8;
public static final Object UNSET = new Object();
private static final AtomicInteger nextIndex = new AtomicInteger();
private Object[] indexedVariables;
public static int nextVariableIndex() {
int index = nextIndex.getAndIncrement();
if (index < 0) {
nextIndex.decrementAndGet();
throw new IllegalStateException("too many thread-local indexed variables");
}
return index;
}
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
return fastGet((FastThreadLocalThread) thread);
} else {
return slowGet();
}
}
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
if (threadLocalMap == null) {
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}
// ------------------------------------------------------------------------------------------
public boolean setIndexedVariable(int index, Object value) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object oldValue = lookup[index];
lookup[index] = value;
return oldValue == UNSET;
} else {
expandIndexedVariableTableAndSet(index, value);
return true;
}
}
// 直接基于 index 的向上 2 次幂取整
private void expandIndexedVariableTableAndSet(int index, Object value) {
Object[] oldArray = indexedVariables;
final int oldCapacity = oldArray.length;
int newCapacity = index;
newCapacity |= newCapacity >>> 1;
newCapacity |= newCapacity >>> 2;
newCapacity |= newCapacity >>> 4;
newCapacity |= newCapacity >>> 8;
newCapacity |= newCapacity >>> 16;
newCapacity ++;
// 直接数组copy,不需要rehash,优于 ThreadLocalMap
Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
newArray[index] = value;
indexedVariables = newArray;
}
// -----------------------------------------------------------------------
public Object indexedVariable(int index) {
Object[] lookup = indexedVariables;
return index < lookup.length? lookup[index] : UNSET;
}
// -----------------------------------------------------------------------
public Object removeIndexedVariable(int index) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object v = lookup[index];
lookup[index] = UNSET;
return v;
} else {
return UNSET;
}
}
}
- 首先 InternalThreadLocalMap 没有采用 ThreadLocalMap k-v 形式的存储方式,而是用 Object 数组来存储 FastThreadLocal 对象和其 value,具体是在第一个位置存放了一个包含所有 FastThreadLocal 对象的 set,然后后面存储所有的 value
- 之所以需要个 set 是为了存储所有使用的 FastThreadLocal 对象,这样就能方便找到这些对象,便于后面的删除工作
- 之所以数组其他位置可以直接存储 value ,是因为每个 FastThreadLocal 构造的时候已经被分配了一个唯一的下标,这个下标对应的就是 value 所处的下标
- 扩容传进去的参数是 index,全有空间浪费。Netty 就是特意这样设计的,用多余的空间去换取不会冲突的 set 和 get ,这样写入和获取的速度就更快了,典型的空间换时间
- fastGet 和 slowGet 是为了做一个兼容,用了 FastThreadLocal 但是没有配套使用 FastThreadLocalThread ,然后调用 FastThreadLocal#get 时,去 Thread 里面找 InternalThreadLocalMap 会报错
- 所以就再弄了个 slowThreadLocalMap ,它是个 ThreadLocal ,里面保存 InternalThreadLocalMap 来兼容一下这个情况
3. FastThreadLocalThread
- 和 ThreadLocal 一致,InternalThreadLocalMap 肯定是 FastThreadLocalThread 里面的一个变量
public class FastThreadLocalThread extends Thread {
private InternalThreadLocalMap threadLocalMap;
public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {
this.threadLocalMap = threadLocalMap;
}
}
5. FastThreadLocalRunnable
内存泄漏
- FastThreadLocal 就没用弱引用,所以它把无用 FastThreadLocal 的清理就寄托到规范使用上,即没用了就主动调用 remove 方法
- 但是它曲线救国了一下,看一下 FastThreadLocalRunnable 类。Runnable 执行完毕之后,会主动调用 FastThreadLocal.removeAll() 来清理所有的 FastThreadLocal
- 前提是你不能用 Runnable 而是用 FastThreadLocalRunnable
final class FastThreadLocalRunnable implements Runnable {
private final Runnable runnable;
private FastThreadLocalRunnable(Runnable runnable) {
this.runnable = ObjectUtil.checkNotNull(runnable, "runnable");
}
@Override
public void run() {
try {
runnable.run();
} finally {
FastThreadLocal.removeAll();
}
}
static Runnable wrap(Runnable runnable) {
return runnable instanceof FastThreadLocalRunnable ? runnable : new FastThreadLocalRunnable(runnable);
}
}
6. DefaultThreadFactory
- Netty 实现了一个 DefaultThreadFactory 工厂类来创建线程
public class DefaultThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread t = this.newThread(FastThreadLocalRunnable.wrap(r), this.prefix + this.nextId.incrementAndGet());
try {
if (t.isDaemon() != this.daemon) {
t.setDaemon(this.daemon);
}
if (t.getPriority() != this.priority) {
t.setPriority(this.priority);
}
} catch (Exception var4) {
}
return t;
}
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(this.threadGroup, r, name);
}
}
49. TransmittableThreadLocal
阿里开源的一个组件,原生的 ThreadLocal 不支持在线程池中传递本地变量,所以实现这个需求
总结下来的核心操作就是 CRR(Capture/Replay/Restore),拷贝快照、重放快照、复原上下文。
为什么需要复原,线程池的线程每次执行的时候,如果用了 TTL 那执行的线程都会被覆盖上下文,没必要复原对吧?
作者回答是:
- 线程池满了且线程池拒绝策略使用的是『CallerRunsPolicy』,这样执行的线程就变成当前线程了,那肯定是要复原的,不然上下文就没了。 使用ForkJoinPool(包含并行执行Stream与CompletableFuture,底层使用ForkJoinPool)的场景,展开的ForkJoinTask会在调用线程中直接执行
1. 背景
- ThreadLocal 的出现就是为了本地化线程资源,防止不必要的多线程之间的竞争
- 在有些场景,当父线程 new 一个子线程时,希望把它的 ThreadLocal 继承给子线程
- InheritableThreadLocal 就是为了父子线程传递本地化资源
- 具体实现:子线程对象被 new,即
Thread.init
,如果查看到父线程内部有 InheritableThreadLocal 的数据,那就在子 Thread 初始化时,把父线程的 InheritableThreadLocal 拷贝给子线程
手动创建线程ok,线程池就有问题,线程池里面的线程都预创建好的。如何往线程池内的线程传递 ThreadLocal?
2. 原理
以下的 ThreadLocal 泛指线程本地数据,不是指 ThreadLocal 这个类
- 先把当前线程的 ThreadLocal 保存到这个 task 中
- 然后当线程池里的某个线程,比如线程 A 获取这个任务要执行的时候,看看 task 里面是否有存储着的 ThreadLocal
- 如果存着那就把这个 ThreadLocal 放到线程 A 的本地变量里,这样就完成了传递
- 最后,恢复线程池内部执行线程的上下文,任务执行完毕之后,把任务带来的本地数据给删了,把线程以前的本地数据复原
逻辑很清晰的四步骤:
- 拿到父类本地变量拷贝
- 赋值给当前线程(线程池内的某线程),并保存之前的本地变量
- 执行逻辑
- 复原当前线程之前的本地变量
4. 源码
这篇只讲 TTL 核心思想(关键路径)
@Test
public void ooxx() {
TransmittableThreadLocal<String> ttl = new TransmittableThreadLocal<>();
ExecutorService executorService = Executors.newFixedThreadPool(1);
ttl.set("ooxx");
Runnable task = () -> System.out.println("ttl.get() = " + ttl.get());
executorService.execute(TtlRunnable.get(task));
}
public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T> {
private static final InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder = new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
public final T get() {
// ThreadLocal 方法
T value = super.get();
if (this.disableIgnoreNullValueSemantics || null != value) {
this.addThisToHolder();
}
return value;
}
public final void set(T value) {
if (!this.disableIgnoreNullValueSemantics && null == value) {
this.remove();
} else {
// ThreadLocal 方法
super.set(value);
this.addThisToHolder();
}
}
private void addThisToHolder() {
if (!((WeakHashMap)holder.get()).containsKey(this)) {
((WeakHashMap)holder.get()).put(this, (Object)null);
}
}
public static class Transmitter {
private static volatile WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>> threadLocalHolder = new WeakHashMap<>();
@NonNull
public static Object capture() {
return new Snapshot(captureTtlValues(), captureThreadLocalValues());
}
private static HashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new HashMap<>();
for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
ttl2Value.put(threadLocal, threadLocal.copyValue());
}
return ttl2Value;
}
@NonNull
public static Object replay(@NonNull Object captured) {
final Snapshot capturedSnapshot = (Snapshot) captured;
return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
}
@NonNull
private static HashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> captured) {
HashMap<TransmittableThreadLocal<Object>, Object> backup = new HashMap<>();
for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
TransmittableThreadLocal<Object> threadLocal = iterator.next();
// backup
backup.put(threadLocal, threadLocal.get());
// clear the TTL values that is not in captured
// avoid the extra TTL values after replay when run task
if (!captured.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// set TTL values to captured
setTtlValuesTo(captured);
// call beforeExecute callback
doExecuteCallback(true);
return backup;
}
private static void setTtlValuesTo(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> ttlValues) {
for (Map.Entry<TransmittableThreadLocal<Object>, Object> entry : ttlValues.entrySet()) {
TransmittableThreadLocal<Object> threadLocal = entry.getKey();
threadLocal.set(entry.getValue());
}
}
public static void restore(@NonNull Object backup) {
final Snapshot backupSnapshot = (Snapshot) backup;
restoreTtlValues(backupSnapshot.ttl2Value);
restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
}
private static void restoreTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> backup) {
// call afterExecute callback
doExecuteCallback(false);
for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
TransmittableThreadLocal<Object> threadLocal = iterator.next();
// clear the TTL values that is not in backup
// avoid the extra TTL values after restore
if (!backup.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// restore TTL values
setTtlValuesTo(backup);
}
private static class Snapshot {
final HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value;
final HashMap<ThreadLocal<Object>, Object> threadLocal2Value;
private Snapshot(HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value, HashMap<ThreadLocal<Object>, Object> threadLocal2Value) {
this.ttl2Value = ttl2Value;
this.threadLocal2Value = threadLocal2Value;
}
}
}
}
captureThreadLocalValues ,这个是为兼容那些无法将 ThreadLocal 类变更至 TTL ,但是又想复制传递 ThreadLocal 的值而使用的,可以先忽略
public final class TtlRunnable implements Runnable, TtlWrapper<Runnable>, TtlEnhanced, TtlAttachments {
private final AtomicReference<Object> capturedRef;
private final Runnable runnable;
private final boolean releaseTtlValueReferenceAfterRun;
private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
this.capturedRef = new AtomicReference<>(capture());
this.runnable = runnable;
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
@Nullable
@Contract(value = "null -> null; !null -> !null", pure = true)
public static TtlRunnable get(@Nullable Runnable runnable) {
return get(runnable, false, false);
}
@Nullable
@Contract(value = "null, _, _ -> null; !null, _, _ -> !null", pure = true)
public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
if (null == runnable) return null;
if (runnable instanceof TtlEnhanced) {
// avoid redundant decoration, and ensure idempotency
if (idempotent) return (TtlRunnable) runnable;
else throw new IllegalStateException("Already TtlRunnable!");
}
return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
}
/**
* wrap method {@link Runnable#run()}.
*/
@Override
public void run() {
final Object captured = capturedRef.get();
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}
final Object backup = replay(captured);
try {
runnable.run();
} finally {
restore(backup);
}
}
}
5. TtlExecutors
- 在 Java 的启动参数加上:
-javaagent:path/to/transmittable-thread-local-2.x.y.jar
即可,然后就正常的使用就行,原生的线程池实现类已经悄悄的被改了!
<!-- https://mvnrepository.com/artifact/com.alibaba/transmittable-thread-local -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.13.2</version>
</dependency>
ExecutorService ttlExecutorService = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(1));
// ExecutorServiceTtlWrapper
ttlExecutorService.execute(task);
ttlExecutorService.submit(task);
@SuppressFBWarnings({"EQ_DOESNT_OVERRIDE_EQUALS"})
class ExecutorServiceTtlWrapper extends ExecutorTtlWrapper implements ExecutorService, TtlEnhanced {
private final ExecutorService executorService;
@NonNull
@Override
public <T> Future<T> submit(@NonNull Callable<T> task) {
return executorService.submit(TtlCallable.get(task, false, idempotent));
}
@NonNull
@Override
public <T> Future<T> submit(@NonNull Runnable task, T result) {
return executorService.submit(TtlRunnable.get(task, false, idempotent), result);
}
@NonNull
@Override
public Future<?> submit(@NonNull Runnable task) {
return executorService.submit(TtlRunnable.get(task, false, idempotent));
}
}
class ExecutorTtlWrapper implements Executor, TtlWrapper<Executor>, TtlEnhanced {
private final Executor executor;
protected final boolean idempotent;
@Override
public void execute(@NonNull Runnable command) {
executor.execute(TtlRunnable.get(command, false, idempotent));
}
}
50. Thread.Sleep(0)作用
Thread.sleep(0)
没有睡眠,但当前的线程会暂时出让 CPU ,这使得 CPU 的资源短暂的空闲出来别的线程有机会得到 CPU 资源- 所以,在一些大循环场景,如果害怕这段逻辑一直占用 CPU 资源,则可以调用
Thread.sleep(0)
让别的线程有机会使用 CPU
51. wait、notify、notifyall
Object 内定义的方法,主要用于线程之间的通信和同步,且需要在 synchronized 修饰的方法或同步块中使用
wait()
:使得当前线程进入等待状态,且会释放锁notify()
: 会随机唤醒一个调用 wait 后等待的线程notifyAll()
: 会唤醒所有调用 wait 等待的线程
52. 死锁
非常典型的八股文:
- 互斥条件:每个资源只能被一个线程占用
- 占有和等待:线程在持有至少一个资源的同时,等待获取其他资源
- 不可抢占:线程所获得的资源在未使用完毕之前不能被其他线程抢占
- 循环等待:多个线程形成一种头尾相接的循环等待资源关系
- 只要我们打破上述的一个条件,就能避免死锁的发生
- eg:按序申请资源,这样就能破坏循环等待,比如有 A、B 两个资源,抢占的逻辑都是先抢 A, 抢到 A 之后才能抢 B
- 设置超时等待时间,避免一直占有资源等
53. volatile作用
作用:保证线程可见性和禁止指令重排序
- 可见性:被它修饰的值一旦被修改,立即会被其他线程看到
- 指令重排序:在多线程环境中,编译器和处理器可能会对代码进行优化,使得指令重排序。禁止指令重排序,确保变量的读写操作按照代码的顺序执行
54. ABA问题
现在有一个变量的值是 A,然后一个线程将它改成了 B,然后又被人改回了 A,此时有个线程它之前访问过变量,得到的值是 A,此时它又来访问变量,发现值还是 A,因此当前这个线程认为这个变量期间没有被修改过
如何解决?
可以通过版本号解决,比如修改一次变量版本号加一,这样虽然线程访问发现值没变,但是版本是变更了
55. 线程同步
线程同步是一种在多线程环境中用于控制不同线程对共享资源访问的机制
- 目的:确保在任何时刻,共享资源只能由一个线程访问,从而保证数据的完整性和一致性
- 在 Java 中,线程同步可以通过以下几种机制实现
1. synchronized
Java 提供的加锁关键字,用于在方法或代码块上加锁,以确保同一时刻只有一个线程能够执行被同步的方法或代码块
在 synchronized 可以使用 wait()
、notify()
、notifyAll()
实现条件等待通知
wait()
:当前线程进入等待状态,直到被其他线程唤醒。必须在同步块或同步方法中调用notify()
:唤醒一个等待的线程。如果有多个线程在等待,同一时刻只能唤醒一个notifyAll()
:唤醒所有等待的线程
eg:在 synchronized 块或方法中,可以使用 wait()
方法使线程等待某个条件满足,可使用 notify()
或 notifyAll()
方法唤醒等待的线程
2. ReentrantLock
是 JUC(java.util.concurrent
)提供的可重入锁,相比 synchronized 它更加灵活
- ReentrantLock 使用 Condition 对象来提供了更灵活的等待/通知机制。每个 ReentrantLock 可以创建一个或多个 Condition 对象,通过
newCondition()
方法创建await()
:使当前线程等待,直到收到信号或被中断signal()
:唤醒一个等待线程signalAll()
:唤醒所有等待线程
- 相比于 synchronized,ReentrantLock 还提供了公平锁和非公平锁机制
3. JUC其他同步工具类
Java 提供了一些高级的并发工具类,eg:CountDownLatch、CyclicBarrier、Semaphore 等,它用于实现一些复杂的同步需求