02-thread
1. 线程历史
CPU性能压榨的血泪史
- 单进程人工切换
- 纸带机
- 多进程批处理
- 多个任务批量执行。进程串行
- 多进程并行处理
- 把程序写在不同的内存位置上来回切换。进程并行,cpu切换
- 多线程
- 一个程序内部不同任务的来回切换
- selector - epoll
- 纤程/协程
- 绿色线程,用户管理(不是OS管理)线程
2. 进程, 线程, 程序
- 什么是程序?
- 什么是进程?
- 什么是线程?
- 什么是纤程/协程?
1. 程序、进程
- 程序:OS可以执行的文件。eg:QQ.exe
- 进程:OS进行资源分配的基本单位。静态概念
- eg:分配内存、硬盘……
OS将程序load到内存中运行。一个程序可以有多个进程。eg:QQ双开
2. 线程
- 线程:调度执行的基本单位。动态概念
- 一个程序里的不同的执行路径
- 多个线程共享同一个进程的资源。大多问题就是出现在共享资源里
- 程序进程开始执行,以线程为单位来执行。OS会找到主线程,给cpu执行,主线程中间开启了其他线程,再进行线程间的切换
- 单线程,主线程。eg:
main()
- 多线程:主线程产生了分支,同时进行
- 单线程,主线程。eg:
3. 线程切换
CPU组成单元:
- ALU计算单元
- 寄存器组,用来存储数据
- PC(program_counter)也是一种寄存器,用来存储执行到哪条指令
- T1运行,将其指令和data放入cpu,cpu执行
- OS线程调度算法,将T1(指令和data)存入cache。运行T2(cpu很傻,只会根据指令、数据计算结果)
- 线程切换(context_switch)需要消耗资源,由OS进行
4. interview
1. 单核CPU多线程是否有意义?
- 一个核cpu在同一时间只能跑一个thread
- 为了充分利用cpu。一个thread可能在sleep或进行IO,可以先切到另一个thread执行
- cpu密集型线程:大量做计算,cpu利用率高
- IO密集型线程:大量等待IO
2. 线程数是不是越大越好?
线程间的切换要消耗资源
/**
* 100_000_000个随机double的和
* 单线程、2线程、10000线程
*/
@Slf4j
public class T01_MultiVsSingle_Thread {
private static final double[] doubles = new double[100_000_000];
private static final Random r = new Random();
private static final DecimalFormat df = new DecimalFormat("0.00");
static {
for (int i = 0; i < doubles.length; i++) {
doubles[i] = r.nextDouble();
}
}
/**
* 1. 单线程
*/
private void _single() {
long start = System.currentTimeMillis();
double result = 0.0;
for (double num : doubles) {
result += num;
}
long end = System.currentTimeMillis();
System.out.println("m1: " + (end - start) + " result = " + df.format(result));
}
static double result1 = 0.0, result2 = 0.0, result = 0.0;
/**
* 2. 线程
*/
private void _two() throws Exception {
Thread t1 = new Thread(() -> {
for (int i = 0; i < doubles.length / 2; i++) {
result1 += doubles[i];
}
});
Thread t2 = new Thread(() -> {
for (int i = doubles.length / 2; i < doubles.length; i++) {
result2 += doubles[i];
}
});
long start = System.currentTimeMillis();
ThreadHelper.start(t1, t2);
ThreadHelper.join(t1, t2);
result = result1 + result2;
long end = System.currentTimeMillis();
System.out.println("m2: " + (end - start) + " result = " + df.format(result));
}
/**
* 3. custom线程
* n必须能被doubles.length整除,即segmentCount必须为整数
*/
private void _custom(int n) throws Exception {
Thread[] threads = new Thread[n];
double[] results = new double[n];
final int segmentCount = doubles.length / n;
CountDownLatch latch = new CountDownLatch(n);
for (int i = 0; i < n; i++) {
int m = i;
threads[i] = new Thread(() -> {
for (int j = m * segmentCount; j < (m + 1) * segmentCount; j++) {
results[m] += doubles[j];
}
latch.countDown();
});
}
double resultM3 = 0.0;
long start = System.currentTimeMillis();
ThreadHelper.start(threads);
latch.await();
for (double v : results) {
resultM3 += v;
}
long end = System.currentTimeMillis();
System.out.println("m3: " + (end - start) + " result = " + df.format(resultM3));
}
@Test
public void T1_single_vs_multi() throws Exception {
// m1: 107 result = 49997084.77
_single();
// m2: 64 result = 49997084.77
_two();
// n必须能被doubles.length整除
// m3: 780 result = 49996797.68
int n = 10_000;
_custom(n);
}
}
3. 线程数多少最合适?
# 1. cpu型号
# machdep.cpu.brand_string: Intel(R) Core(TM) i7-8700B CPU @ 3.20GHz
sysctl machdep.cpu.brand_string
# 2. cpu物理核心数
# hw.physicalcpu: 6
sysctl hw.physicalcpu
# 3. cpu逻辑核心数
# hw.logicalcpu: 12
sysctl hw.logicalcpu
- 服务器时时刻刻都有线程在跑,所以你设置的线程不能把所有的core都利用上
- 服务安全的角度,cpu性能利用到80%
- 压测来确定
- 性能测试工具(profiler统称)=> eg:Jprofiler
- 生产环境,用阿里arthas分析 => arthas官网
- 测试出
W/C
,等待时间与计算时间的比率——《Java并发编程实战》
5. Thread
1. Concept
- 同步:
new T1().run();
- 异步:
new T1().start();
/**
* 线程启动的方式
* 1. 同步:new T1().run();
* 2. 异步:new T1().start();
*/
public class T00_WhatIsThread {
/**
* start()异步执行
*/
@Test
public void T1_start() {
new T().start();
for (int i = 0; i < 10; i++) {
ThreadHelper.sleepSeconds(1);
System.out.println("main: " + i);
}
}
/**
* run()同步执行
*/
@Test
public void T2_run() {
new T().run();
for (int i = 0; i < 10; i++) {
ThreadHelper.sleepSeconds(1);
System.out.println("main: " + i);
}
}
}
/**
* T1线程
*/
class T extends Thread {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
ThreadHelper.sleepSeconds(1);
System.out.println("T1: " + i);
}
}
}
2. Create
extends Thread
implements Runnable
- lambda
implements Callable
=> FutureTask- ThreadPool(是否return)
ExecutorService.execute()
ExecutorService.submit()
@Slf4j
public class T02_Thread_Create {
/**
* 1. extends Thread
*/
@Test
public void T1_extends() {
MyThread t = new MyThread();
t.start();
ThreadHelper.join(t);
}
/**
* 2. implements Runnable
* 比extends更合理,java单继承
*/
@Test
public void T2_Runnable() {
Thread t = new Thread(new MyRunnable());
t.start();
ThreadHelper.join(t);
}
/**
* 3. lambda
*/
@Test
public void T3_lambda() {
Thread t = new Thread(() -> {
log.error("Hello Lambda!");
});
t.start();
ThreadHelper.join(t);
}
/**
* 4. implements Callable
*/
@Test
public void T4_Callable() throws ExecutionException, InterruptedException {
/*
* FutureTask,既是Runnable,也是Future
* FutureTask<V> implements RunnableFuture<V>
* RunnableFuture<V> extends Runnable, Future<V>
*/
FutureTask<String> task = new FutureTask<>(new MyCallable());
new Thread(task).start();
// ...业务操作
log.error(task.get()); // 阻塞类型,等线程返回值
}
/**
* 5. execute(),无返回值
*/
@Test
public void T5_ThreadPool_execute() throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(() -> {
ThreadHelper.sleepSeconds(3);
log.error("Hello ThreadPool!");
});
// ExecutorService停止接收task,等待已经提交的task执行完成。当所有提交task执行完毕,线程池即被关闭
es.shutdown();
// 等待timeout后,监测ExecutorService是否已经关闭,若关闭true,否则false
while (!es.awaitTermination(1, TimeUnit.SECONDS)) {
System.out.println("ThreadPool ==>> 运行");
}
System.out.println("ThreadPool ==>> 关闭");
}
/**
* 6. submit(),有返回值
*/
@Test
public void T6_ThreadPool_submit() throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
// 可以执行runnable、callable
Future<String> f = es.submit(new MyCallable());
// ...业务操作
log.error(f.get());// 阻塞类型,等线程返回值
es.shutdown();
}
}
@Slf4j
class MyThread extends Thread {
@Override
public void run() {
log.error("Hello MyThread!");
}
}
@Slf4j
class MyRunnable implements Runnable {
@Override
public void run() {
log.error("Hello MyRunnable!");
}
}
@Slf4j
class MyCallable implements Callable<String> {
@Override
public String call() {
log.error("Hello MyCallable!");
return "success";
}
}
3. API
sleep()
:线程状态:timed waitingyield()
:线程状态:running => ready。当前线程停止,随机切到下个线程join()
:当前thread进行waiting,等待join_thread结束
public class T03_Thread_API {
/**
* 1. sleep():线程状态:timed waiting
*/
@Test
public void T1_sleep() {
Thread t = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println("A" + i);
try {
Thread.sleep(500);
// TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
ThreadHelper.join(t);
}
/**
* 2. yield():线程状态:running => ready。当前线程停止,随机切到下个线程
*/
@Test
public void T2_yield() {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 100; i++) {
System.out.println("A" + i);
if (i % 10 == 0) {
// 让出cpu
Thread.yield();
}
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 100; i++) {
System.out.println("---- >>>> B" + i);
if (i % 10 == 0) {
Thread.yield();
}
}
});
ThreadHelper.start(t1, t2);
ThreadHelper.join(t1, t2);
}
/**
* 3. join():当前thread进行waiting,等待join_thread结束
*/
@Test
public void T3_join() {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println("t1: " + i);
ThreadHelper.sleepMilli(500);
}
});
Thread t2 = new Thread(() -> {
// t1线程,运行结束。t2继续
try {
t1.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 10; i++) {
System.out.println("T2: " + i);
ThreadHelper.sleepMilli(500);
}
});
t1.start();
ThreadHelper.sleepSeconds(1);
t2.start();
ThreadHelper.join(t1, t2);
}
}
4. State
Linux中线程和进程区别不大
NEW
:线程刚刚创建,还没有启动RUNNABLE
:可运行状态,由线程调度器可以安排执行READY
:准备被调用(等待被cpu占用)RUNNING
:正在运行(正在被cpu占用)
WAITING
:等待被唤醒——ReentrantLock、LockSupportTIMED WAITING
:隔一段时间后自动唤醒BLOCKED
:被阻塞,正在等待锁——synchronizedTERMINATED
:线程结束
线程状态迁移图
linux中,thread理解为轻量的process
/**
* ThreadState
*/
@Slf4j
public class T04_ThreadState {
@Test
public void T1_NEW_RUNNABLE_TERMINATED() throws InterruptedException {
Thread t = new Thread(() -> {
log.debug("2: " + Thread.currentThread().getState()); // 2: RUNNABLE
for (int i = 0; i < 3; i++) {
ThreadHelper.sleepSeconds(1);
System.out.print(i + " ");
}
System.out.println();
});
log.debug("1: " + t.getState()); // 1: NEW
t.start();
t.join(); // 等待t结束
log.debug("3: " + t.getState()); // 3: TERMINATED
}
@Test
public void T2_WAITING_TIMED_WAITING() {
Thread t = new Thread(() -> {
LockSupport.park(); // 阻塞当前线程
log.debug("t go on!");
ThreadHelper.sleepSeconds(5);
});
t.start();
ThreadHelper.sleepSeconds(1);
log.debug("1: " + t.getState());// 1: WAITING,无期阻塞
LockSupport.unpark(t); // 解放t线程
ThreadHelper.sleepSeconds(1);
log.debug("2: " + t.getState()); // 2: TIMED_WAITING,定时阻塞
ThreadHelper.join(t);
}
/*
* 1. synchronized是BLOCKED状态
* 只有经过OS调度的才是BLOCKED状态
*/
@Test
public void T3_sync_BLOCKED() {
final Object o = new Object();
Thread t = new Thread(() -> {
synchronized (o) { // BLOCKED
log.debug("t 得到了锁 o");
}
});
new Thread(() -> {
synchronized (o) {
ThreadHelper.sleepSeconds(5);
}
}).start();
ThreadHelper.sleepSeconds(1);
t.start();
ThreadHelper.sleepSeconds(1);
// BLOCKED
log.debug("1: " + t.getState());
ThreadHelper.join(t);
}
/*
* 2. juc的锁cas来实现,进入忙等待,是waiting状态,非blocked状态
* 除了synchronized锁是blocked状态,其他都是waiting状态
*/
@Test
public void T4_ReentrantLock_WAITING() {
Lock lock = new ReentrantLock();
Thread t = new Thread(() -> {
lock.lock(); // 省略try finally
log.debug("t 得到了锁");
lock.unlock();
});
new Thread(() -> {
lock.lock();
ThreadHelper.sleepSeconds(5);
lock.unlock(); // 手动释放锁
}).start();
ThreadHelper.sleepSeconds(1);
t.start();
ThreadHelper.sleepSeconds(1);
log.debug("1: " + t.getState()); // WAITING
ThreadHelper.join(t);
}
@Test
public void T5_LockSupport_WAITING() {
Thread t = new Thread(() -> {
LockSupport.park();
log.debug("t go on!");
});
t.start();
ThreadHelper.sleepSeconds(1);
log.debug("1: " + t.getState()); // WAITING
LockSupport.unpark(t);
ThreadHelper.join(t);
}
}
5. interrupt
interrupt()
:设置打断标志位isInterrupted()
:查询打断标志位interrupted()
:清空打断标志位,并返回标志情况
sleep(), wait(), join()
与标志位冲突。抛InterruptedException,未设置上标志位lockInterruptibly()
,抛InterruptedException,未设置上标志位LockSupport.park()
失效,设置上标志位- 不能打断BLOCKED状态线程,设置上标志位
- 不能打断ReentrantLock_lock,设置上标志位
/**
* 1. interrupt():设置打断标志位
* 2. isInterrupted():查询打断标志位
* 3. interrupted():清空打断标志位,并返回标志情况
*/
@Slf4j
public class T05_Interrupt {
@Test
public void T1_interrupt_isInterrupted() {
Thread t = new Thread(() -> {
for (; ; ) {
// 1. isInterrupted() 查询标志位
if (Thread.currentThread().isInterrupted()) {
System.out.println("Thread is interrupted!");
break;
} else {
System.out.println("Thread is not interrupted!");
}
}
});
t.start();
ThreadHelper.sleepSeconds(1);
// 设置标志位
t.interrupt();
ThreadHelper.join(t);
}
/*
* 2. Thread.interrupted()查询并重置标志位
*
* t.interrupted() == Thread.interrupted()
* 查询当前thread标志位
* public static boolean interrupted() {
* return currentThread().isInterrupted(true);
* }
*/
@Test
public void T2_interrupted() {
Thread t = new Thread(() -> {
for (; ; ) {
// 查询当前线程标志位,并清空:true
if (Thread.interrupted()) {
log.debug("t: {}", Thread.interrupted());
break;
}
}
});
t.start();
ThreadHelper.sleepSeconds(1);
t.interrupt();
ThreadHelper.join(t);
}
/*
* 1. sleep(), wait(), join()与标志位冲突。抛InterruptedException异常
* 2. 未设置上标志位
*/
@Test
public void T3_sleep() {
Thread t = new Thread(() -> {
try {
Thread.sleep(5_000);
} catch (InterruptedException e) {
System.out.println("Thread is interrupted!");
// false
log.debug("t :" + Thread.currentThread().isInterrupted());
}
});
t.start();
ThreadHelper.sleepSeconds(1);
System.out.println("t.getState() = " + t.getState());
t.interrupt();
ThreadHelper.join(t);
}
@Test
public void T4_synchronized() {
final Object o = new Object();
Thread t = new Thread(() -> {
synchronized (o) {
try {
o.wait();
} catch (InterruptedException e) {
System.out.println("Thread is interrupted!");
// false
log.debug("t :" + Thread.currentThread().isInterrupted());
}
}
});
t.start();
ThreadHelper.sleepSeconds(1);
System.out.println("t.getState() = " + t.getState());
t.interrupt();
ThreadHelper.join(t);
}
Thread t;
/**
* 1. LockSupport.park()失效
* 2. 设置上标志位
*/
@Test
public void T5_LockSupport() {
t = new Thread(() -> {
System.out.println("t start!");
LockSupport.park(t);
System.out.println("t end! 是否Interrupted: " + Thread.currentThread().isInterrupted());
});
t.start();
ThreadHelper.sleepSeconds(2);
System.out.println("t.getState() = " + t.getState());
t.interrupt();
// LockSupport.unpark(t);
ThreadHelper.join(t);
}
/**
* 1. 不能打断BLOCKED状态线程
* 2. 设置上标志位
*/
@Test
public void T6_tryLock_synchronized() {
final Object o = new Object();
Thread t1 = new Thread(() -> {
synchronized (o) {
ThreadHelper.sleepSeconds(5);
log.debug("t1 continue!");
}
});
t1.start();
ThreadHelper.sleepSeconds(1);
// BLOCKED
Thread t2 = new Thread(() -> {
synchronized (o) {
}
log.debug("t2 end! 是否Interrupted: " + Thread.currentThread().isInterrupted());
});
t2.start();
ThreadHelper.sleepSeconds(1);
System.out.println("t2.getState() = " + t2.getState());
t2.interrupt();
ThreadHelper.join(t1, t2);
}
/**
* 1. 不能打断ReentrantLock_lock
* 2. 设置标志位
*/
@Test
public void T7_tryLock_ReentrantLock() {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
lock.lock();
ThreadHelper.sleepSeconds(5);
log.debug("t1 continue!");
lock.unlock();
});
t1.start();
ThreadHelper.sleepSeconds(1);
// WAITING
Thread t2 = new Thread(() -> {
lock.lock();
log.debug("t2 end! 是否Interrupted: " + Thread.currentThread().isInterrupted());
lock.unlock();
});
t2.start();
ThreadHelper.sleepSeconds(1);
System.out.println("t2.getState() = " + t2.getState());
t2.interrupt();
ThreadHelper.join(t1, t2);
}
/**
* lockInterruptibly()
* 1. 抛InterruptedException
* 2. 未设置上标志位
*/
@Test
public void T8_tryLock_lockInterruptibly() {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
System.out.println("t1 start!");
lock.lock();
ThreadHelper.sleepSeconds(5);
System.out.println("t1 end!");
});
t1.start();
ThreadHelper.sleepSeconds(1);
Thread t2 = new Thread(() -> {
System.out.println("t2 start!");
try {
// 可被interrupt的锁争抢
lock.lockInterruptibly();
System.out.println("t2 continue!");
lock.unlock();
} catch (InterruptedException e) {
// lock争抢被打断后,业务处理
e.printStackTrace();
System.out.println("t2 WAITING,被打断");
}
System.out.println("t2 end! 是否Interrupted: " + Thread.currentThread().isInterrupted());
});
t2.start();
ThreadHelper.sleepSeconds(1);
System.out.println("t2.getState() = " + t2.getState());
t2.interrupt();
ThreadHelper.join(t1, t2);
}
}
6. end
- 自然结束(能自然结束就尽量自然结束)
stop()
,@Deprecatededsuspend(), resume()
,@Deprecated- volatile
wait(), sleep(), recv(), accept()
线程阻塞,没有办法停止- 停止时刻难以控制
- eg:一个阻塞容器,容量为5的时候结束生产者,但是,volatile同步线程标志位的时间控制不是很精确,可能生产者还继续生产
interrupt(), isInterrupted()
(比较优雅)- 停止时刻难以控制
- 比volatile更优雅。
wait(), sleep()
线程状态会抛InterruptedException
。正确处理异常即可
/**
* 结束thread
*/
@Slf4j
public class T06_end {
/*
* 1. stop():@Deprecated
* 释放所有锁(可能进行数据同步)。容易产生数据不一致的问题
*/
@Test
public void T1_stop() {
Thread t = new Thread(() -> {
while (true) {
System.out.println("uploading file");
ThreadHelper.sleepSeconds(1);
}
});
t.start();
ThreadHelper.sleepSeconds(5);
// @Deprecated
t.stop();
ThreadHelper.join(t);
}
/*
* 2. suspend(), resume():@Deprecated
* 暂停时持有锁,锁可能会永远不释放。产生死锁
*/
@Test
public void T2_suspend_resume() {
Thread t = new Thread(() -> {
int i = 1;
while (true) {
System.out.println("uploading file: " + i++);
ThreadHelper.sleepSeconds(1);
}
});
t.start();
ThreadHelper.sleepSeconds(5);
t.suspend();
ThreadHelper.sleepSeconds(3);
t.resume();
ThreadHelper.join(t);
}
private static volatile boolean running = true;
/*
* 3. volatile
* 1. 停止时刻难以控制。eg:一个阻塞容器,容量为5的时候结束生产者
* 2. `wait(), sleep(), recv(), accept()`线程阻塞,没有办法停止
*/
@Test
public void T3_volatile() {
Thread t = new Thread(() -> {
long i = 0L;
while (running) {
// wait(), sleep(), recv(), accept()
i++;
}
// 很难控制循环了多少次:4168806262 4163032200……
System.out.println("t end and i = " + i);
});
t.start();
ThreadHelper.sleepSeconds(1);
running = false;
ThreadHelper.join(t);
}
/*
* 4. interrupt()
* 1. 停止时刻难以控制
* 2. 比volatile更优雅。wait(), sleep()线程状态会抛InterruptedException。正确处理异常即可
*/
@Test
public void T4_interrupt() {
Thread t = new Thread(() -> {
long i = 0L;
while (!Thread.interrupted()) {
// sleep() wait()
i++;
}
System.out.println("t end i = " + i);
});
t.start();
ThreadHelper.sleepSeconds(1);
t.interrupt();
ThreadHelper.join(t);
}
}
6. Reference
1. normalRef
- 强引用,不会被GC。引用为null才会GC
2. softRef
- 内存不够用了,才回收
3. weakRef
- GC立即回收
- ThreadLocal原理
- WeakHashMap
4. phantomRef
- 作用:回收堆外内存
- 直接内存(堆外内存):
- 不被JVM管理的memory,memory不在堆上,GC回收不了。引用被干掉,加入到queue中。手动清除queue指定的memory(java堆外内存的回收用unsafe类)
/**
* 强、软、弱、虚
*/
public class T1_Reference {
/**
* 1. 强引用,不会被GC。引用为null才会GC
*/
@Test
public void T1_normalRef() {
M m = new M();
System.gc(); // DisableExplicitGC
// 延缓main线程,给GC时间执行
LockSupport.park();
}
/*
* 2. 软引用
* -Xms20M -Xmx20M
* 1. 内存不够用了,才回收。回收后还不够,抛内存溢出异常
* 2. 描述一些还有用,但并非必须的对象。软引用非常适合缓存使用
*
* 应用场景(缓存):
* 1. 大图片
* 2. 大数据
*/
@Test
public void T2_softRef() {
// sr大小10M
SoftReference<byte[]> sr = new SoftReference<>(new byte[1024 * 1024 * 10]);
System.out.println(sr.get());
System.gc();
ThreadHelper.sleepSeconds(1);
System.out.println(sr.get());
// 再分配一个12M数组,heap装不下
byte[] b = new byte[1024 * 1024 * 12];
ThreadHelper.sleepSeconds(1);
// GC一次,还不够,把SoftReference干掉,再GC
System.out.println(sr.get());
}
/*
* 3. 弱引用
* 1. GC立即回收
* 2. 内存泄漏、内存溢出,概念不同
*
* 应用场景:容器
* 作业:1. weakHashMap
*/
@Test
public void T3_weakRef() {
WeakReference<M> wr = new WeakReference<>(new M());
System.out.println(wr.get());
System.gc(); // GC回收
ThreadHelper.sleepSeconds(1);
System.out.println(wr.get());
// 弱引用的典型应用,也是ThreadLocal问到的最深的地方
ThreadLocal<M> tl = new ThreadLocal<>();
// set() => ThreadLocalMap.set() => Entry extends WeakReference<ThreadLocal<?>>
tl.set(new M()); // 本质是current_thread的ThreadLocalMap赋值
// remove()务必。key为null,value也会导致memory泄漏。
tl.remove();
}
private static final List<Object> LIST = new LinkedList<>();
// 装得都是引用
private static final ReferenceQueue<M> QUEUE = new ReferenceQueue<>();
/*
* 4. 虚引用(清理堆外内存)不是给程序员用的,给写JVM人用的
* -Xms50M -Xmx50M
* 1. 唯一作用:对象被GC时,收到一个系统通知(ReferenceQueue)
* 2. get()无法获取对象实例,GC立即回收
*
* 1. jdk中直接内存的回收就用到虚引用,直接内存是堆外内存
* 2. GC范围是堆内存,直接内存的分配和回收都是Unsafe类去操作
* 3. java申请直接内存,堆内存分配一个Obj保存这个堆外内存的引用,被GC管理,一旦这个对象被回收,相应的用户线程会收到通知并对直接内存进行清理工作
*
* DirectByteBuffer就是通过虚引用来实现堆外内存的释放
*/
@Test
public void T4_phantomRef() {
PhantomReference<M> phantomReference = new PhantomReference<>(new M(), QUEUE);
// System.out.println(phantomReference.get());
ByteBuffer b = ByteBuffer.allocateDirect(1024);
new Thread(() -> {
while (true) {
LIST.add(new byte[2 * 1024 * 1024]); // 2M
ThreadHelper.sleepSeconds(1);
// 虚引用,get()不到对象
System.out.println(phantomReference.get());
}
}).start();
// 垃圾回收线程
new Thread(() -> {
while (true) {
Reference<? extends M> poll = QUEUE.poll();
if (poll != null) {
System.out.println("--- 虚引用对象被JVM回收了 ---- " + JSONUtil.toJsonStr(poll));
}
}
}).start();
ThreadHelper.sleepSeconds(50);
}
@Test
public void T5_phantom_gc() {
ReferenceQueue<M> referenceQueue = new ReferenceQueue<>();
PhantomReference<M> phantomReference = new PhantomReference<>(new M(), referenceQueue);
System.out.println(phantomReference.get()); // null
new Thread(() -> {
while (true) {
Reference<? extends M> poll = referenceQueue.poll();
if (poll != null) {
System.out.println("--- 虚引用对象被jvm回收了 ---- " + poll); // java.lang.ref.PhantomReference@108beb45
System.out.println("--- 回收对象 ---- " + poll.get()); // null
}
}
}).start();
ThreadHelper.sleepSeconds(1);
System.gc();
ThreadHelper.sleepSeconds(1);
// 第二次gc()才被感知到
System.gc();
ThreadHelper.sleepSeconds(50);
}
}
/**
* GC_FreeMemory。垃圾回收,释放内存
*/
class M {
// 默认给3M的空间
private int[] bytes = new int[1024 * 1024 * 3];
/**
* GC释放对象内存时,会调用finalize()。该方法永远不应该重写、调用
*/
@Override
protected void finalize() {
System.out.println("GC将对象清除!!");
}
}
// T4_phantomRef
null
null
null
null
null
null
null
null
null
GC将对象清除!!
null
--- 虚引用对象被JVM回收了 ---- {}
null
null
// ...
// T5_phantom_gc
null
GC将对象清除!!
--- 虚引用对象被jvm回收了 ---- java.lang.ref.PhantomReference@108beb45
--- 回收对象 ---- null
7. ThreadLocal
- ThreadLocal给每个线程建立单独变量副本,从而不影响其他线程
- ThreadLocal不存储值,只是提供能查到value的key
JDK1.2 解决共享参数的频繁传递与线程安全等问题,Thread的局部变量
/*
* ThreadLocal:当前线程局部变量
* 需求:两个线程变量独享,互不影响
*
* 1. ThreadLocal是使用空间换时间
* 2. synchronized是使用时间换空间
*
* 用途:
* 声明式事务。数据库连接对象存于ThreadLocal中,一个事务中每个sql都使用同一个连接对象
* hibernate_session就存在与ThreadLocal中,避免synchronized的使用
*/
public class T2_TL_Basic {
volatile static Person p = new Person();
/**
* t1线程变量被t2修改
*/
@Test
public void T1_common() {
Thread t1 = new Thread(() -> {
ThreadHelper.sleepSeconds(2);
System.out.println(p);
}, "t1");
Thread t2 = new Thread(() -> {
ThreadHelper.sleepSeconds(1);
p.name = "listao";
}, "t2");
ThreadHelper.start(t1, t2);
ThreadHelper.join(t1, t2);
}
static ThreadLocal<Person> tl = new ThreadLocal<>();
/*
* threadLocal => 设到了当前线程的map中
* .set() => Thread.currentThread... map(ThreadLocal, person)
*/
@Test
public void T2_threadLocal() {
Thread t1 = new Thread(() -> {
tl.set(new Person("listao"));
ThreadHelper.sleepSeconds(2);
System.out.println(tl.get());
}, "t1");
Thread t2 = new Thread(() -> {
ThreadHelper.sleepSeconds(1);
tl.set(new Person());
}, "t2");
ThreadHelper.start(t1, t2);
ThreadHelper.join(t1, t2);
}
}
@ToString
@NoArgsConstructor
@AllArgsConstructor
class Person {
String name = "zhangsan";
}
1. Multi_Thread
multi_thread都独享一个对象
/**
* multi_thread都独享一个对象
*/
public class T3_TL_copy {
public static ExecutorService es = Executors.newFixedThreadPool(10);
private String m1(int seconds) {
// 参数的单位是毫秒,从1970.1.1 00:00:00 GMT计时
Date date = new Date(1_000L * seconds);
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return dateFormat.format(date);
}
/**
* 1. 每个thread都创建SimpleDateFormat,消耗内存,GC有压力
*/
@Test
public void T1_normal() {
for (int i = 0; i < 1_000; i++) {
int finalI = i;
es.submit(() -> {
String date = m1(finalI);
System.out.println(date);
});
}
es.shutdown();
}
// -----------------------------------------------------------------------------------------------
static SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private String m2(int seconds) {
Date date = new Date(1_000L * seconds);
return DATE_FORMAT.format(date);
}
/**
* 2. 所有thread共享一个对象static SimpleDateFormat,thread_unsafe
*/
@Test
public void T1_static() {
for (int i = 0; i < 1_000; i++) {
int finalI = i;
es.submit(() -> {
String date = m2(finalI);
System.out.println(date);
});
}
es.shutdown();
}
// -----------------------------------------------------------------------------------------------
private String m3(int seconds) {
Date date = new Date(1_000L * seconds);
synchronized (this) {
return DATE_FORMAT.format(date);
}
}
/**
* 3. synchronized(),thread_safe不过性能下降
*/
@Test
public void T3_static_sync() {
for (int i = 0; i < 1_000; i++) {
int finalI = i;
es.submit(() -> {
String date = m3(finalI);
System.out.println(date);
});
}
es.shutdown();
}
// -----------------------------------------------------------------------------------------------
private String m4(int seconds) {
// 参数的单位是毫秒,从1970.1.1 00:00:00 GMT计时
Date date = new Date(1_000L * seconds);
SimpleDateFormat simpleDateFormat = TL_DataFormat.dateFormat_TL.get();
return simpleDateFormat.format(date);
}
/**
* 4. ThreadLocal<SimpleDateFormat>,每个thread都有一个对象副本
* 副本只能被当前线程使用,是当前线程独享的成员变量
*/
@Test
public void T4_ThreadLocal() {
for (int i = 0; i < 1_000; i++) {
int finalI = i;
es.submit(() -> {
String date = m4(finalI);
System.out.println(date);
});
}
es.shutdown();
}
}
class TL_DataFormat {
public static ThreadLocal<SimpleDateFormat> dateFormat_TL = ThreadLocal.withInitial(
() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
);
}
2. Thread_phaser
同一个thread,不同阶段同一个变量
/**
* 同一个thread,不同阶段同一个变量
*/
public class T4_TL_phaser {
private static C0_Student init() {
final C0_Student build = C0_Student.builder()
.name("listao")
.sex("male")
.score(100).build();
TL_Student.studentTL.set(build);
return build;
}
@Test
public void T1_tradition() {
C0_Student student = init();
new C1_Name().getName(student);
new C2_Sex().getSex(student);
new C3_Score().getScore(student);
}
@Test
public void T2_ThreadLocal() {
init();
new C1_Name().getName();
new C2_Sex().getSex();
new C3_Score().getScore();
TL_Student.studentTL.remove();
}
}
class TL_Student {
public static ThreadLocal<C0_Student> studentTL = new ThreadLocal<>();
}
@Builder
@Getter
class C0_Student {
String name;
String sex;
int score;
}
// 获取name
class C1_Name {
public void getName(C0_Student student) {
System.out.println(student.name);
}
public void getName() {
System.out.println(TL_Student.studentTL.get().name);
}
}
// 获取sex
class C2_Sex {
public void getSex(C0_Student student) {
System.out.println(student.sex);
}
public void getSex() {
System.out.println(TL_Student.studentTL.get().sex);
}
}
// 获取score
class C3_Score {
public void getScore(C0_Student student) {
System.out.println(student.score);
}
public void getScore() {
System.out.println(TL_Student.studentTL.get().score);
}
}
3. SourceCode
为什么Entry要使用弱引用?
tl = null
- Entry中key为强引用:依然指向ThreadLocal对象,ThreadLocal始终不GC,内存泄漏
- key为弱引用:ThreadLocal被回收,
key = null
的Entry会在get(), set()
时进行回收
- Thread一直存在,并不在调用
get(), set()
。key = null
的Entry的value会内存泄漏。因此手动remove()
协助回收
- Thread内部持有ThreadLocalMap成员变量
- ThreadLocalMap是ThreadLocal的内部类,Entry是ThreadLocalMap内部类
- ThreadLocal操作了ThreadLocalMap对象内部的数据,对外暴露的都是ThreadLocal的API,隐藏了ThreadLocalMap的具体实现
public class Thread implements Runnable {
ThreadLocal.ThreadLocalMap threadLocals = null;
@HotSpotIntrinsicCandidate
public static native Thread currentThread();
}
public class ThreadLocal<T> {
static class ThreadLocalMap {
private Entry[] table;
private static final int INITIAL_CAPACITY = 16;
// Entry,key为弱引用,value为强引用
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
// ------------------------------------------------------------------------
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
// ThreadLocal对象作为key在Entry[]中的下标索引
int i = key.threadLocalHashCode & (len-1);
// 3.2. 获取指定下标的Entry对象,不为空,则进入for
for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { // 调用nextIndex()搜寻下一个合适的位置
ThreadLocal<?> k = e.get();
// 3.2.1. 判断Entry的key和当前的ThreadLocal对象是否是同一个对象
if (k == key) {
// 是,进行值替换,并结束方法
e.value = value;
return;
}
// 3.2.2. Entry的key是否失效,如果失效,则直接将失效的key和值进行替换
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
// 3.3. Entry第一次
tab[i] = new Entry(key, value);
int sz = ++size;
// 判断是否满足扩容的条件,进行扩容
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
// 开放寻址法
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
e.clear();
expungeStaleEntry(i);
return;
}
}
}
}
// ------------------------------------------------------------------------
public void set(T value) {
// 获取当前线程
Thread t = Thread.currentThread();
// 1.1.
ThreadLocalMap map = getMap(t);
if (map != null) {
// 3.1.
// 将本ThreadLocal对象作为key,value作为值存储到ThreadLocalMap中
map.set(this, value);
} else {
// 2.1.
createMap(t, value);
}
}
ThreadLocalMap getMap(Thread t) {
// 1.2.
return t.threadLocals;
}
void createMap(Thread t, T firstValue) {
// 2.2.
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
// 4.
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// 5.1.
return setInitialValue();
}
/*
* 5.2. Map为空 || Entry[]中没有以当前ThreadLocal为key的Entry
* 进行初始化,key为ThreadLocal对象,value为null
*/
private T setInitialValue() {
T value = initialValue(); // null
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
map.set(this, value);
} else {
createMap(t, value);
}
if (this instanceof TerminatingThreadLocal) {
TerminatingThreadLocal.register((TerminatingThreadLocal<?>) this);
}
return value;
}
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null) {
// 6.
m.remove(this);
}
}
}