07-ThreadPool
prime 英 [praɪm] 质数
concurrent 英 [kənˈkʌrənt] adj. 并存的,同时发生的
parallel 英 [ˈpærəlel] adj. 平行的;相似的,同时发生的;(计算机)并行的;并联的
steal 英 [stiːl] v. 偷窃,盗窃;剽窃,窃取(观点)
policy 英 /ˈpɒləsi/ n. 政策;方针;原则;保险单;为人之道
1. ExecutorService
完善了线程池的生命周期
- FutureTask,既是Runnable,也是Future
- Callable,jdk1.5后,增加Callable有返回值,对Runnable扩展
ExecutorService.execute()
无返回值ExecutorService.submit()
有返回值- 完善了整个任务执行器的执行周期
shutdown()
:ThreadPool中task全执行完了才关闭shutdownNow()
:立刻关闭isShutdown()
:shutdown()
执行过了,即为trueisTerminated()
:task全完了,才为true
- Executor接口,任务的定义、运行分开
public class T01_ExecutorService {
/*
* 1. FutureTask,既是Runnable,也是Future
* Callable,jdk1.5后,增加Callable有返回值,对Runnable扩展
*/
@Test
public void T2_Callable_FutureTask() throws ExecutionException, InterruptedException {
FutureTask<Integer> ft = new FutureTask<>(() -> {
ThreadHelper.sleepSeconds(1);
return 1_000;
});
new Thread(ft).start();
System.out.println(ft.get()); // 阻塞
}
/*
* 2. ExecutorService.execute() 无返回值
*/
@Test
public void T3_ExecutorService_execute() {
ExecutorService es = Executors.newCachedThreadPool();
Runnable runnable = () -> System.out.println("hello executor");
es.execute(runnable);
es.shutdown();
}
/*
* 3. ExecutorService.submit() 有返回值
*/
@Test
public void T4_ExecutorService_submit() throws InterruptedException, ExecutionException {
ExecutorService es = Executors.newCachedThreadPool();
Callable<String> callable = () -> "Hello Callable";
Future<String> future = es.submit(callable); // 异步
System.out.println(future.get()); // 阻塞
es.shutdown();
}
/*
* 4. 完善了整个任务执行器的执行周期
* 1. shutdown():ThreadPool中task全执行完了才关闭
* 2. shutdownNow():立刻关闭
* 3. isShutdown():shutdown()执行过了,即为true
* 4. isTerminated():task全完了,才为true
*/
@Test
public void T5_ExecutorService() {
ExecutorService es = Executors.newFixedThreadPool(5); //execute submit
for (int i = 0; i < 6; i++) {
es.submit(() -> {
ThreadHelper.sleepMilli(500);
System.out.println(Thread.currentThread().getName());
});
}
System.out.println(es);
es.shutdown();
System.out.println("es.isTerminated() = " + es.isTerminated()); // false
System.out.println("es.isShutdown() = " + es.isShutdown()); // true
System.out.println(es);
ThreadHelper.sleepSeconds(5);
System.out.println("es.isTerminated() = " + es.isTerminated()); // true
System.out.println("es.isShutdown() = " + es.isShutdown()); // true
System.out.println(es);
}
}
/*
* 5. Executor接口
* 任务的定义、运行分开
*/
class MyExecutor implements Executor {
@Test
public void T1_Executor() {
new MyExecutor().execute(() -> System.out.println("hello executor"));
}
/**
* execute可以任意指定
* 采用threadFactory……
*/
@Override
public void execute(Runnable cmd) {
new Thread(cmd).start();
// cmd.run();
}
}
2. ThreadPoolExecutor
- ThreadPoolExecutor
- Thread共享一个task_queue
- ForkJoinPool
- 每个Thread有一个task_queue
- 分解、汇总任务
1. ctor
7个重要参数
new ThreadPoolExecutor(
2, // 1. 核心thread数。pool始终保持的thread数
4, // 2. pool最大thread数
60, // 3. 生存时间。非core_thread闲置,归还OS
TimeUnit.SECONDS, // 4. 生存单位
new ArrayBlockingQueue<>(4), // 5. 任务队列。各种各样存task的blockingQueue
Executors.defaultThreadFactory(), // 6. thread_factory。可自定义
new ThreadPoolExecutor.AbortPolicy()); // 7. 拒绝策略。thread_pool忙,队列满,进行拒绝
2. 拒绝策略
Abort
:抛异常CallerRuns
:调用者执行DiscardOldest
:放弃排队最久的Discard
:放弃执行
3. DefaultThreadFactory
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0); // 1. 线程名称自定义
if (t.isDaemon())
t.setDaemon(false); // 2. 设置非守护线程
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY); // 3. 设置优先级。设置了也没用
return t;
}
4. 自定义拒绝策略
/*
* 线程池两种:
* 1. ThreadPoolExecutor(多个thread共享同一个task_queue)
* 2. ForkJoinPool(每个thread有自己的task_queue)
* 1. 分解汇总的任务
* 2. 用很少的thread可以执行很多任务(子任务)TPE做不到先执行子任务
* 3. CPU密集型
*/
@Slf4j
public class T02_ThreadPoolExecutor {
ThreadPoolExecutor tpe = null;
private void ooxx() {
for (int i = 0; i < 8; i++) {
tpe.execute(new Task(i));
}
System.out.println(tpe.getQueue());
tpe.execute(new Task(100));
System.out.println(tpe.getQueue());
tpe.shutdown();
}
/*
* 拒绝策略
* 1. Abort:抛异常
* 2. CallerRuns:调用者执行
* 3. DiscardOldest:放弃排队最久的
* 4. Discard:放弃执行
*/
@Test
public void T1_AbortPolicy() {
/*
* 阿里手册里线程池要自定义
* 定义thread_pool七个参数
* thread_pool维护两个集合,thread、task
*
* ThreadPool工作流程
* 1. 默认pool为空
* 2. corePoolSize => queue => maxPoolSize
* 3. RejectedExecutionHandler,拒绝策略
*/
tpe = new ThreadPoolExecutor(
2, // 1. 核心thread数。pool始终保持的thread数
4, // 2. pool最大thread数
60, // 3. 生存时间。非core_thread闲置,归还OS
TimeUnit.SECONDS, // 4. 生存单位
new ArrayBlockingQueue<>(4), // 5. 任务队列。各种各样存task的blockingQueue
Executors.defaultThreadFactory(), // 6. thread_factory。可自定义
new ThreadPoolExecutor.AbortPolicy()); // 7. 拒绝策略。thread_pool忙,队列满,进行拒绝
ooxx();
}
/**
* 2. CallerRunsPolicy,调用者执行
*/
@Test
public void T2_CallerRunsPolicy() {
tpe = new ThreadPoolExecutor(
2,
4,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(4),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
ooxx();
}
/**
* 3. DiscardOldestPolicy,放弃排队最久的
*/
@Test
public void T3_DiscardOldestPolicy() {
tpe = new ThreadPoolExecutor(
2,
4,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(4),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
ooxx();
}
/**
* 4. DiscardPolicy,放弃执行
*/
@Test
public void T4_DiscardPolicy() {
tpe = new ThreadPoolExecutor(
2,
4,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(4),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy());
ooxx();
}
/**
* 5. 自定义拒绝策略
*/
@Test
public void T5_MyRejectedHandler() {
tpe = new ThreadPoolExecutor(
4,
4,
0,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(4),
Executors.defaultThreadFactory(),
new MyHandler());
ooxx();
}
}
@Slf4j
@ToString
@AllArgsConstructor
class Task implements Runnable {
private final int i;
@Override
public void run() {
log.debug("Task:" + i);
LockSupport.park();
}
}
/**
* 自定义拒绝策略
*/
class MyHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor tpe) {
// log("r rejected") 日志记录
// save r => kafka、mysql、redis
// try 3 times
if (tpe.getQueue().size() < 10000) {
// try put again();
System.out.println("try put again");
}
}
}
3. Executors
线程池工厂,Executors线程池工具类
1. SingleThreadExecutor
- 单线程线程池?
- 维护任务顺序队列
- 提供thread生命周期管理
2. CachedThreadPool
- 没有核心thread,来一个task开辟一个线程
3. FixedThreadPool
固定数量线程的ThreadPool
4. ScheduledThreadPool
- 定时任务线程池
- quartz:定时任务框架
- cron:需要shell脚本功底
- interview:假如提供一个闹钟服务,订阅这个服务的人特别多,10亿人,怎么优化?
5. WorkStealingPool
- thread_pool本质:thread_queue不断执行task_queue中的task
- WorkStealingPool:每个thread都有单独的task_queue,task_queue执行空了,去其他task_queue取task
/*
* jdk提供的默认的ThreadPool,Executors线程工具类
*/
@Slf4j
public class T03_Executors {
/*
* 1. newSingleThreadExecutor()
* 单thread串行执行,加入的task一直排队(LinkedBlockingQueue)等待,同一个core_thread来执行
*
* 单线程线程池?
* 1. 维护任务顺序队列
* 2. 提供thread生命周期管理
*
* new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
*/
@Test
public void T1_SingleThreadExecutor() {
ExecutorService es = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
final int j = i;
es.execute(() -> log.error(String.valueOf(j)));
}
}
/*
* 2. newCachedThreadPool()
* 一个task,一个thread。没有core_thread,来一个task开辟一个线程
*
* new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
*/
@Test
public void T2_CachedThreadPool() {
ExecutorService es = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
final int j = i;
es.execute(() -> log.error(String.valueOf(j)));
}
System.out.println(es);
}
/*
* 2. newFixedThreadPool()
* 固定数量线程的ThreadPool
*
* new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
*/
@Test
public void T3_FixedThreadPool() throws InterruptedException, ExecutionException {
long start = System.currentTimeMillis();
List<Integer> primes = MyTask.getPrime(1, 200_000);
System.out.println("serial_prime个数:" + primes.size());
long end = System.currentTimeMillis();
System.out.println("serial耗时:" + (end - start));
// 4核cpu
ExecutorService es = Executors.newFixedThreadPool(4);
MyTask t1 = new MyTask(1, 80000); // 1-5 5-10 10-15 15-20
MyTask t2 = new MyTask(80001, 130000);
MyTask t3 = new MyTask(130001, 170000);
MyTask t4 = new MyTask(170001, 200000);
Future<List<Integer>> f1 = es.submit(t1);
Future<List<Integer>> f2 = es.submit(t2);
Future<List<Integer>> f3 = es.submit(t3);
Future<List<Integer>> f4 = es.submit(t4);
start = System.currentTimeMillis();
System.out.println("ThreadPool_prime个数:" + (f1.get().size() + f2.get().size() + f3.get().size() + f4.get().size()));
end = System.currentTimeMillis();
System.out.println("ThreadPool耗时:" + (end - start));
}
/*
* 3. newScheduledThreadPool(),定时任务线程池
*
* public ScheduledThreadPoolExecutor(int corePoolSize) {
* super(corePoolSize, Integer.MAX_VALUE, 10L, TimeUnit.MILLISECONDS, new DelayedWorkQueue());
* }
*
* 1. schedule()
* 2. scheduleAtFixedRate()
* 3. scheduleWithFixedDelay()
*/
@Test
public void T4_ScheduledExecutorService() {
ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);
ses.scheduleAtFixedRate(() -> {
ThreadHelper.sleepSeconds(3);
log.debug("ooxx");
}, // Runnable()
0, // 初始延迟
500, // 周期
TimeUnit.MILLISECONDS); // 周期单位
LockSupport.park();
}
/**
* 4. newWorkStealingPool()
*/
@Test
public void T5_WorkStealingPool() {
// 指定并行thread数
ExecutorService es = Executors.newWorkStealingPool(2);
for (int i = 1; i < 10; i++) {
int finalI = i;
es.execute(() -> {
System.out.println(finalI);
ThreadHelper.sleepSeconds(1);
});
}
LockSupport.park();
}
}
@AllArgsConstructor
class MyTask implements Callable<List<Integer>> {
int startPos, endPos;
@Override
public List<Integer> call() {
return getPrime(startPos, endPos);
}
/**
* 判断是否是质数
*/
static boolean isPrime(int num) {
for (int i = 2; i <= num / 2; i++) {
if (num % i == 0) return false;
}
return true;
}
/**
* 获取指定范围内的质数
*/
static List<Integer> getPrime(int start, int end) {
List<Integer> results = new ArrayList<>();
for (int i = start; i <= end; i++) {
if (isPrime(i)) results.add(i);
}
return results;
}
}
6. jconsole
- 使用JDK自带的监控工具来监控创建的线程数量,运行一个不终止的线程,创建指定量的线程,来观察:
- 工具目录:
C:\Program Files\Java\jdk1.6.0_06\bin\jconsole.exe
4. ForkJoinPool
- 无返回值,
extends RecursiveAction
- 有返回值,
extends RecursiveTask
Executors.newWorkStealingPool(2);
List.parallelStream().forEach();
/**
* ForkJoinPool
* 1_000_000个随机数相加
*/
@Slf4j
public class T04_ForkJoinPool {
static int[] nums = new int[1_000_000];
static final int MAX_NUM = 50_000;
static Random r = new Random();
static AtomicInteger atomicInt = new AtomicInteger(0);
static {
for (int i = 0; i < nums.length; i++) {
nums[i] = r.nextInt(100);
}
// single_thread求和,stream_api
System.out.println("serial_result: " + Arrays.stream(nums).sum());
}
/*
* 1.1. extends RecursiveAction, extends ForkJoinTask<Void>
*/
@AllArgsConstructor
static class TaskVoid extends RecursiveAction {
int start, end;
@Override
protected void compute() {
if (end - start <= MAX_NUM) {
for (int i = start; i < end; i++) atomicInt.addAndGet(nums[i]);
} else {
int middle = start + (end - start) / 2;
TaskVoid subTask1 = new TaskVoid(start, middle);
TaskVoid subTask2 = new TaskVoid(middle, end);
subTask1.fork();
subTask2.fork();
}
}
}
/**
* 1. 无返回值
*/
@Test
public void T1_fjp_void() {
ForkJoinPool fjp = new ForkJoinPool();
TaskVoid task = new TaskVoid(0, nums.length);
// 守护线程
fjp.execute(task);
ThreadHelper.sleepSeconds(3);
System.out.println("fjp_void: " + atomicInt.get());
}
// ---------------------------------------------------------------------
/*
* 2.2. extends RecursiveTask<Long>, extends ForkJoinTask<V>
*/
@AllArgsConstructor
static class TaskReturn extends RecursiveTask<Long> {
int start, end;
@Override
protected Long compute() {
if (end - start <= MAX_NUM) {
long sum = 0L;
for (int i = start; i < end; i++) sum += nums[i];
return sum;
}
int middle = start + (end - start) / 2;
TaskReturn subTask1 = new TaskReturn(start, middle);
TaskReturn subTask2 = new TaskReturn(middle, end);
subTask1.fork();
subTask2.fork();
return subTask1.join() + subTask2.join();
}
}
/**
* 2.1. 有返回值
*/
@Test
public void T2_fjp_return() {
ForkJoinPool fjp = new ForkJoinPool();
TaskReturn task = new TaskReturn(0, nums.length);
fjp.execute(task);
long result = task.join();
System.out.println("fjp_return: " + result);
}
// ---------------------------------------------------------------------
/*
* 3. WorkStealingPool:每个thread都有一个task_queue,task_queue执行空了,去其他task_queue取task
*/
@Test
public void T3_WorkStealingPool() {
// ExecutorService es = Executors.newWorkStealingPool();
System.out.println(Runtime.getRuntime().availableProcessors()); // 硬件线程数
// 指定并行thread数
ExecutorService es = Executors.newWorkStealingPool(2);
for (int i = 1; i < 10; i++) {
int finalI = i;
es.execute(() -> {
System.out.println(finalI);
ThreadHelper.sleepSeconds(1);
});
}
LockSupport.park();
}
/*
本质:ForkJoinPool
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
*/
/**
* 底层ForkJoinPool
*/
@Test
public void T4_ParallelStream_API() {
final List<Integer> integers = new ArrayList<>(1_000_000);
for (int i = 0; i < 1_000_000; i++) {
integers.add(RandomUtil.randomInt(1_000));
}
long start, end;
start = System.currentTimeMillis();
// 1. forEach
for (int one : integers) {
isPrime(one);
}
end = System.currentTimeMillis();
System.out.println("forEach: " + (end - start));
// ---------------------------------------------------------------------
start = System.currentTimeMillis();
// 2. parallelStream()
integers.parallelStream().forEach(this::isPrime);
end = System.currentTimeMillis();
System.out.println("parallelStream: " + (end - start));
}
void isPrime(Integer num) {
for (int i = 2; i <= num / 2; i++) {
if (num % i == 0) return;
}
}
}
1. 并发、并行
- 并发是指任务提交
- 并行指任务执行。并行是并发的子集。多个cpu同一时刻执行任务
2. WorkStealingPool
- 每个thread都有自己的WorkQueue,WorkQueue是一个双端队列,它是线程私有的
- ForkJoinTask中fork的子任务,将放入运行该任务的工作线程的队头,工作线程将以LIFO的顺序来处理工作队列中的任务
- 最大化地利用CPU,空闲thread从其它thread的WorkQueue尾部窃取任务,减少竞争
- 双端队列的操作:
push()/pop()
仅在其所有者工作线程中调用,poll()
由其它线程窃取任务时调用的 - 当只剩下最后一个任务时,会存在竞争,是通过CAS来分配
3. CompletableFuture
CompletableFuture.supplyAsync()
CompletableFuture.allOf().join();
/**
* 提供一个服务,查询各大电商网站同一类产品的价格并汇总展示
*/
class T05_CompletableFuture {
static long start, end;
/**
* 同步执行
* After 183 sleep!
* After 391 sleep!
* After 255 sleep!
* use serial method call! 506
*/
@Test
public void T1_serial() {
start = System.currentTimeMillis();
priceOfTM();
priceOfTB();
priceOfJD();
end = System.currentTimeMillis();
System.out.println("use serial method call! " + (end - start));
}
/**
* 异步执行
* After 52 sleep!
* After 235 sleep!
* After 442 sleep!
* use completable future! 490
*/
@Test
public void T2_async() {
start = System.currentTimeMillis();
CompletableFuture<Double> futureTM = CompletableFuture.supplyAsync(T05_CompletableFuture::priceOfTM);
CompletableFuture<Double> futureTB = CompletableFuture.supplyAsync(T05_CompletableFuture::priceOfTB);
CompletableFuture<Double> futureJD = CompletableFuture.supplyAsync(T05_CompletableFuture::priceOfJD);
// 任务堆管理 allOf(), anyOf()
CompletableFuture.allOf(futureTM, futureTB, futureJD).join();
end = System.currentTimeMillis();
System.out.println("use completable future! " + (end - start));
}
/**
* API学习
*/
@Test
public void T3_lambda() {
CompletableFuture.supplyAsync(T05_CompletableFuture::priceOfTM)
.thenApply(String::valueOf)
.thenApply(str -> "price: " + str)
.thenAccept(System.out::println);
LockSupport.park();
}
// ----------------------------------------------------------------------
private static double priceOfTM() {
delay();
return 1.00;
}
private static double priceOfTB() {
delay();
return 2.00;
}
private static double priceOfJD() {
delay();
return 3.00;
}
private static double priceOfAmazon() {
delay();
throw new RuntimeException("product not exist!");
}
private static void delay() {
int time = new Random().nextInt(500);
ThreadHelper.sleepMilli(time);
System.out.printf("After %s sleep!\n", time);
}
}
5. ThreadPool-SC
@Slf4j
public class T03_ThreadPoolExecutor_SC {
@Test
public void T1_ThreadPoolExecutor() {
ThreadPoolExecutor tpl = new ThreadPoolExecutor(
2, // 1. 核心thread数。pool始终保持的thread数
4, // 2. pool最大thread数
60, // 3. 生存时间。非core_thread闲置,归还OS
TimeUnit.SECONDS, // 4. 生存单位
new ArrayBlockingQueue<>(4), // 5. 任务队列。各种各样存task的blockingQueue
Executors.defaultThreadFactory(), // 6. thread_factory。可自定义
new ThreadPoolExecutor.AbortPolicy()); // 7. 拒绝策略。thread_pool忙,队列满,进行拒绝
tpl.execute(() -> {
System.out.println("tpl = " + tpl);
});
}
// 双重循环。continue, break后加标签,直接控制外层循环
@Test
public void T2_fori() {
label:
for (int i = 0; i < 10; i++) {
System.out.println("i ===>>> " + i);
for (int j = 0; j < 5; j++) {
System.out.println("j = " + j);
if (j == 2) {
continue label;
}
}
}
System.out.println("ooxx");
}
}
- 构造方法
ThreadPoolExecutor
- 提交执行task,
execute()
- 新增worker,
addWorker()
- 核心线程执行逻辑,
runworker()
- 线程池worker任务单元
class Worker extends AbstractQueuedSynchronizer implements Runnable
public class ThreadPoolExecutor extends AbstractExecutorService {
// 1.1. 线程池状态,可以看做一个int类型的数字,高3位表示线程池状态,低29位表示worker数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 1.2. `Integer.SIZE`为32,所以`COUNT_BITS`为29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 1.3. 线程池允许的最大线程数。1左移29位,然后减1,即为 2^29 - 1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 1.4. 线程池有5种状态,大小排序如下:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 1.5. 获取线程池状态,通过按位与操作,低29位将全部变成0
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 1.6. 获取worker数量,通过按位与操作,高3位将全部变成0
private static int workerCountOf(int c) { return c & CAPACITY; }
// 1.7. 根据线程池状态、worker数量,生成ctl值
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 1.8. 线程池状态小于xx
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// 1.9. 线程池状态大于等于xx
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private final BlockingQueue<Runnable> workQueue;
private volatile long keepAliveTime;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<>();
// 构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 2.1. 基本类型参数校验
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
// 2.2. 空指针校验
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
// 2.3. 根据传入参数`unit`和`keepAliveTime`,将存活时间转换为纳秒存到变量`keepAliveTime`中
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
// 提交执行task
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 3.1. worker数量比核心线程数小,直接创建worker执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 3.2. worker数量超过核心线程数,任务直接进入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 线程池状态不是RUNNING状态,说明执行过shutdown命令,需要对新加入的任务执行reject()
// 这儿为什么需要recheck,是因为任务入队列前后,线程池的状态可能会发生变化
if (! isRunning(recheck) && remove(command))
reject(command);
// 核心线程数允许为0
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3.3. 线程池不是运行状态,或者任务进入队列失败,则尝试创建worker执行任务
// 3点需要注意:
// 1. 线程池不是运行状态时,addWorker内部会判断线程池状态
// 2. addWorker第2个参数表示是否创建核心线程
// 3. addWorker返回false,则说明任务执行失败,需要执行reject操作
else if (!addWorker(command, false))
reject(command);
}
// 新增worker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 4.1. 外层自旋
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 这个条件比较难懂,对其进行了调整,和下面的条件等价
// (rs > SHUTDOWN) || (rs == SHUTDOWN && firstTask != null) || (rs == SHUTDOWN && workQueue.isEmpty()
// 1. 线程池状态大于SHUTDOWN时,直接返回false
// 2. 线程池状态等于SHUTDOWN,且firstTask不为null,直接返回false
// 3. 线程池状态等于SHUTDOWN,且队列为空,直接返回false
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
// 4.2. 内层自旋
for (;;) {
int wc = workerCountOf(c);
// worker数量超过容量,直接返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 使用CAS的方式增加worker数量
// 若增加成功,则直接跳出外层循环进入到第二部分
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 线程池状态发生变化,对外层循环进行自旋
if (runStateOf(c) != rs)
continue retry;
// 其他情况,直接内层循环进行自旋即可
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// worker的添加必须是串行的,因此需要加锁
mainLock.lock();
try {
// 重新检查线程池状态
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
// worker已经调用过了start()方法,则不再创建worker
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 4.3. 增加workers
workers.add(w);
// 更新`largestPoolSize`
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 4.4. 启动worker线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 4.5. worker线程启动失败。说明线程池状态发生了变化(关闭操作被执行),需要进行shutdown相关操作
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
// 5.1. 线程池worker任务单元
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 5.2. Worker的关键所在,使用线程工厂创建了一个线程。传入的参数为当前worker
this.thread = getThreadFactory().newThread(this);
}
public void run() {
// 6.1.. 核心线程执行逻辑
runWorker(this);
}
// 省略代码...
}
// 6.2. 核心线程执行逻辑`runworker()`
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// unlock()允许外部中断
w.unlock(); // allow interrupts
// 判断是否进入过自旋(while循环)
boolean completedAbruptly = true;
try {
// 自旋
// 1. 如果firstTask不为null,则执行firstTask
// 2. 如果firstTask为null,则调用getTask()从队列获取任务
// 3. 阻塞队列的特性就是:当队列为空时,当前线程会被阻塞等待
while (task != null || (task = getTask()) != null) {
// worker进行加锁
// 1. 降低锁范围,提升性能
// 2. 保证每个worker执行的任务是串行的
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果线程池正在停止,则对当前线程进行中断操作
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted())
wt.interrupt();
try {
// 6.2. 执行任务,前置操作
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 6.3. 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 6.4. 执行任务,后置操作
afterExecute(task, thrown);
}
} finally {
// 帮助gc
task = null;
// 已完成任务数加一
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 自旋操作被退出,说明线程池正在结束
processWorkerExit(w, completedAbruptly);
}
}
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
}