07-ThreadPool

prime 英 [praɪm] 质数

concurrent 英 [kənˈkʌrənt] adj. 并存的,同时发生的

parallel 英 [ˈpærəlel] adj. 平行的;相似的,同时发生的;(计算机)并行的;并联的

steal 英 [stiːl] v. 偷窃,盗窃;剽窃,窃取(观点)

policy 英 /ˈpɒləsi/ n. 政策;方针;原则;保险单;为人之道

1. ExecutorService

image-20230506204833200
image-20230515203543309

完善了线程池的生命周期

image-20230506204924924
  1. FutureTask,既是Runnable,也是Future
    • Callable,jdk1.5后,增加Callable有返回值,对Runnable扩展
  2. ExecutorService.execute()无返回值
  3. ExecutorService.submit()有返回值
  4. 完善了整个任务执行器的执行周期
    1. shutdown():ThreadPool中task全执行完了才关闭
    2. shutdownNow():立刻关闭
    3. isShutdown()shutdown()执行过了,即为true
    4. isTerminated():task全完了,才为true
  5. Executor接口,任务的定义、运行分开
public class T01_ExecutorService {

    /*
     * 1. FutureTask,既是Runnable,也是Future
     *      Callable,jdk1.5后,增加Callable有返回值,对Runnable扩展
     */
    @Test
    public void T2_Callable_FutureTask() throws ExecutionException, InterruptedException {
        FutureTask<Integer> ft = new FutureTask<>(() -> {
            ThreadHelper.sleepSeconds(1);
            return 1_000;
        });

        new Thread(ft).start();
        System.out.println(ft.get()); // 阻塞
    }

    /*
     * 2. ExecutorService.execute() 无返回值
     */
    @Test
    public void T3_ExecutorService_execute() {
        ExecutorService es = Executors.newCachedThreadPool();

        Runnable runnable = () -> System.out.println("hello executor");
        es.execute(runnable);

        es.shutdown();
    }

    /*
     * 3. ExecutorService.submit() 有返回值
     */
    @Test
    public void T4_ExecutorService_submit() throws InterruptedException, ExecutionException {
        ExecutorService es = Executors.newCachedThreadPool();

        Callable<String> callable = () -> "Hello Callable";
        Future<String> future = es.submit(callable);    // 异步
        System.out.println(future.get());               // 阻塞

        es.shutdown();
    }

    /*
     * 4. 完善了整个任务执行器的执行周期
     *      1. shutdown():ThreadPool中task全执行完了才关闭
     *      2. shutdownNow():立刻关闭
     *      3. isShutdown():shutdown()执行过了,即为true
     *      4. isTerminated():task全完了,才为true
     */
    @Test
    public void T5_ExecutorService() {
        ExecutorService es = Executors.newFixedThreadPool(5); //execute submit
        for (int i = 0; i < 6; i++) {
            es.submit(() -> {
                ThreadHelper.sleepMilli(500);
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println(es);

        es.shutdown();
        System.out.println("es.isTerminated() = " + es.isTerminated()); // false
        System.out.println("es.isShutdown() = " + es.isShutdown());     // true
        System.out.println(es);

        ThreadHelper.sleepSeconds(5);
        System.out.println("es.isTerminated() = " + es.isTerminated()); // true
        System.out.println("es.isShutdown() = " + es.isShutdown());     // true
        System.out.println(es);
    }
}

/*
 * 5. Executor接口
 *      任务的定义、运行分开
 */
class MyExecutor implements Executor {

    @Test
    public void T1_Executor() {
        new MyExecutor().execute(() -> System.out.println("hello executor"));
    }

    /**
     * execute可以任意指定
     * 采用threadFactory……
     */
    @Override
    public void execute(Runnable cmd) {
        new Thread(cmd).start();
        // cmd.run();
    }

}








 
 
 
 


 









 
 











 
 
 






















 
 
 

















 







 
 
 
 


2. ThreadPoolExecutor

  1. ThreadPoolExecutor
    • Thread共享一个task_queue
  2. ForkJoinPool
    • 每个Thread有一个task_queue
    • 分解、汇总任务

1. ctor

7个重要参数

new ThreadPoolExecutor(
    2,                                     // 1. 核心thread数。pool始终保持的thread数
    4,                                     // 2. pool最大thread数
    60,                                    // 3. 生存时间。非core_thread闲置,归还OS
    TimeUnit.SECONDS,                      // 4. 生存单位
    new ArrayBlockingQueue<>(4),           // 5. 任务队列。各种各样存task的blockingQueue
    Executors.defaultThreadFactory(),      // 6. thread_factory。可自定义
    new ThreadPoolExecutor.AbortPolicy()); // 7. 拒绝策略。thread_pool忙,队列满,进行拒绝

 
 
 
 
 
 
 

2. 拒绝策略

  1. Abort:抛异常
  2. CallerRuns:调用者执行
  3. DiscardOldest:放弃排队最久的
  4. Discard:放弃执行

3. DefaultThreadFactory

public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r,
                          namePrefix + threadNumber.getAndIncrement(),
                          0);                       // 1. 线程名称自定义
    if (t.isDaemon())
        t.setDaemon(false);                         // 2. 设置非守护线程
    if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);        // 3. 设置优先级。设置了也没用
    return t;
}



 

 

 


4. 自定义拒绝策略

/*
 * 线程池两种:
 * 1. ThreadPoolExecutor(多个thread共享同一个task_queue)
 * 2. ForkJoinPool(每个thread有自己的task_queue)
 *      1. 分解汇总的任务
 *      2. 用很少的thread可以执行很多任务(子任务)TPE做不到先执行子任务
 *      3. CPU密集型
 */
@Slf4j
public class T02_ThreadPoolExecutor {
    ThreadPoolExecutor tpe = null;

    private void ooxx() {
        for (int i = 0; i < 8; i++) {
            tpe.execute(new Task(i));
        }
        System.out.println(tpe.getQueue());
        tpe.execute(new Task(100));
        System.out.println(tpe.getQueue());
        tpe.shutdown();
    }

    /*
     * 拒绝策略
     * 1. Abort:抛异常
     * 2. CallerRuns:调用者执行
     * 3. DiscardOldest:放弃排队最久的
     * 4. Discard:放弃执行
     */
    @Test
    public void T1_AbortPolicy() {
        /*
         * 阿里手册里线程池要自定义
         * 定义thread_pool七个参数
         * thread_pool维护两个集合,thread、task
         *
         * ThreadPool工作流程
         *      1. 默认pool为空
         *      2. corePoolSize => queue => maxPoolSize
         *      3. RejectedExecutionHandler,拒绝策略
         */
        tpe = new ThreadPoolExecutor(
                2,                                      // 1. 核心thread数。pool始终保持的thread数
                4,                                      // 2. pool最大thread数
                60,                                     // 3. 生存时间。非core_thread闲置,归还OS
                TimeUnit.SECONDS,                       // 4. 生存单位
                new ArrayBlockingQueue<>(4),            // 5. 任务队列。各种各样存task的blockingQueue
                Executors.defaultThreadFactory(),       // 6. thread_factory。可自定义
                new ThreadPoolExecutor.AbortPolicy());  // 7. 拒绝策略。thread_pool忙,队列满,进行拒绝
        ooxx();
    }

    /**
     * 2. CallerRunsPolicy,调用者执行
     */
    @Test
    public void T2_CallerRunsPolicy() {
        tpe = new ThreadPoolExecutor(
                2,
                4,
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(4),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        ooxx();
    }

    /**
     * 3. DiscardOldestPolicy,放弃排队最久的
     */
    @Test
    public void T3_DiscardOldestPolicy() {
        tpe = new ThreadPoolExecutor(
                2,
                4,
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(4),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy());
        ooxx();
    }

    /**
     * 4. DiscardPolicy,放弃执行
     */
    @Test
    public void T4_DiscardPolicy() {
        tpe = new ThreadPoolExecutor(
                2,
                4,
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(4),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardPolicy());
        ooxx();
    }

    /**
     * 5. 自定义拒绝策略
     */
    @Test
    public void T5_MyRejectedHandler() {
        tpe = new ThreadPoolExecutor(
                4,
                4,
                0,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(4),
                Executors.defaultThreadFactory(),
                new MyHandler());
        ooxx();
    }
}

@Slf4j
@ToString
@AllArgsConstructor
class Task implements Runnable {
    private final int i;

    @Override
    public void run() {
        log.debug("Task:" + i);
        LockSupport.park();
    }
}

/**
 * 自定义拒绝策略
 */
class MyHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor tpe) {
        // log("r rejected") 日志记录
        // save r => kafka、mysql、redis
        // try 3 times
        if (tpe.getQueue().size() < 10000) {
            // try put again();
            System.out.println("try put again");
        }
    }
}
















































 















 















 















 















 























 
 
 
 
 
 
 
 
 

3. Executors

线程池工厂,Executors线程池工具类

1. SingleThreadExecutor

  • 单线程线程池?
    1. 维护任务顺序队列
    2. 提供thread生命周期管理

2. CachedThreadPool

  • 没有核心thread,来一个task开辟一个线程

3. FixedThreadPool

固定数量线程的ThreadPool

image-20220705213228675

4. ScheduledThreadPool

  • 定时任务线程池
    1. quartz:定时任务框架
    2. cron:需要shell脚本功底
  • interview:假如提供一个闹钟服务,订阅这个服务的人特别多,10亿人,怎么优化?

5. WorkStealingPool

  1. thread_pool本质:thread_queue不断执行task_queue中的task
  2. WorkStealingPool:每个thread都有单独的task_queue,task_queue执行空了,去其他task_queue取task
/*
 * jdk提供的默认的ThreadPool,Executors线程工具类
 */
@Slf4j
public class T03_Executors {

    /*
     * 1. newSingleThreadExecutor()
     *      单thread串行执行,加入的task一直排队(LinkedBlockingQueue)等待,同一个core_thread来执行
     *
     *    单线程线程池?
     *      1. 维护任务顺序队列
     *      2. 提供thread生命周期管理
     *
     * new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
     */
    @Test
    public void T1_SingleThreadExecutor() {
        ExecutorService es = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            final int j = i;
            es.execute(() -> log.error(String.valueOf(j)));
        }
    }

    /*
     * 2. newCachedThreadPool()
     *      一个task,一个thread。没有core_thread,来一个task开辟一个线程
     *
     * new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
     */
    @Test
    public void T2_CachedThreadPool() {
        ExecutorService es = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            final int j = i;
            es.execute(() -> log.error(String.valueOf(j)));
        }
        System.out.println(es);
    }

    /*
     * 2. newFixedThreadPool()
     *      固定数量线程的ThreadPool
     *
     * new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
     */
    @Test
    public void T3_FixedThreadPool() throws InterruptedException, ExecutionException {
        long start = System.currentTimeMillis();
        List<Integer> primes = MyTask.getPrime(1, 200_000);
        System.out.println("serial_prime个数:" + primes.size());
        long end = System.currentTimeMillis();
        System.out.println("serial耗时:" + (end - start));

        // 4核cpu
        ExecutorService es = Executors.newFixedThreadPool(4);

        MyTask t1 = new MyTask(1, 80000);        // 1-5 5-10 10-15 15-20
        MyTask t2 = new MyTask(80001, 130000);
        MyTask t3 = new MyTask(130001, 170000);
        MyTask t4 = new MyTask(170001, 200000);

        Future<List<Integer>> f1 = es.submit(t1);
        Future<List<Integer>> f2 = es.submit(t2);
        Future<List<Integer>> f3 = es.submit(t3);
        Future<List<Integer>> f4 = es.submit(t4);

        start = System.currentTimeMillis();
        System.out.println("ThreadPool_prime个数:" + (f1.get().size() + f2.get().size() + f3.get().size() + f4.get().size()));
        end = System.currentTimeMillis();
        System.out.println("ThreadPool耗时:" + (end - start));
    }

    /*
     * 3. newScheduledThreadPool(),定时任务线程池
     *
     *  public ScheduledThreadPoolExecutor(int corePoolSize) {
     *      super(corePoolSize, Integer.MAX_VALUE, 10L, TimeUnit.MILLISECONDS, new DelayedWorkQueue());
     *  }
     *
     *      1. schedule()
     *      2. scheduleAtFixedRate()
     *      3. scheduleWithFixedDelay()
     */
    @Test
    public void T4_ScheduledExecutorService() {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);
        ses.scheduleAtFixedRate(() -> {
                    ThreadHelper.sleepSeconds(3);
                    log.debug("ooxx");
                },                       // Runnable()
                0,                       // 初始延迟
                500,                     // 周期
                TimeUnit.MILLISECONDS);  // 周期单位

        LockSupport.park();
    }

    /**
     * 4. newWorkStealingPool()
     */
    @Test
    public void T5_WorkStealingPool() {
        // 指定并行thread数
        ExecutorService es = Executors.newWorkStealingPool(2);
        for (int i = 1; i < 10; i++) {
            int finalI = i;
            es.execute(() -> {
                System.out.println(finalI);
                ThreadHelper.sleepSeconds(1);
            });
        }
        LockSupport.park();
    }
}

@AllArgsConstructor
class MyTask implements Callable<List<Integer>> {
    int startPos, endPos;

    @Override
    public List<Integer> call() {
        return getPrime(startPos, endPos);
    }

    /**
     * 判断是否是质数
     */
    static boolean isPrime(int num) {
        for (int i = 2; i <= num / 2; i++) {
            if (num % i == 0) return false;
        }
        return true;
    }

    /**
     * 获取指定范围内的质数
     */
    static List<Integer> getPrime(int start, int end) {
        List<Integer> results = new ArrayList<>();
        for (int i = start; i <= end; i++) {
            if (isPrime(i)) results.add(i);
        }
        return results;
    }
}


















 














 






















 






























 
 
 
 
 
 
 
 










 

















 























6. jconsole

  • 使用JDK自带的监控工具来监控创建的线程数量,运行一个不终止的线程,创建指定量的线程,来观察:
  • 工具目录:C:\Program Files\Java\jdk1.6.0_06\bin\jconsole.exe
image-20230911104106478
image-20230911104122596

4. ForkJoinPool

  1. 无返回值,extends RecursiveAction
  2. 有返回值,extends RecursiveTask
  3. Executors.newWorkStealingPool(2);
  4. List.parallelStream().forEach();
/**
 * ForkJoinPool
 * 1_000_000个随机数相加
 */
@Slf4j
public class T04_ForkJoinPool {
    static int[] nums = new int[1_000_000];
    static final int MAX_NUM = 50_000;
    static Random r = new Random();
    static AtomicInteger atomicInt = new AtomicInteger(0);

    static {
        for (int i = 0; i < nums.length; i++) {
            nums[i] = r.nextInt(100);
        }
        // single_thread求和,stream_api
        System.out.println("serial_result: " + Arrays.stream(nums).sum());
    }

    /*
     * 1.1. extends RecursiveAction, extends ForkJoinTask<Void>
     */
    @AllArgsConstructor
    static class TaskVoid extends RecursiveAction {
        int start, end;

        @Override
        protected void compute() {
            if (end - start <= MAX_NUM) {
                for (int i = start; i < end; i++) atomicInt.addAndGet(nums[i]);
            } else {
                int middle = start + (end - start) / 2;
                TaskVoid subTask1 = new TaskVoid(start, middle);
                TaskVoid subTask2 = new TaskVoid(middle, end);
                subTask1.fork();
                subTask2.fork();
            }
        }
    }

    /**
     * 1. 无返回值
     */
    @Test
    public void T1_fjp_void() {
        ForkJoinPool fjp = new ForkJoinPool();
        TaskVoid task = new TaskVoid(0, nums.length);
        // 守护线程
        fjp.execute(task);
        ThreadHelper.sleepSeconds(3);
        System.out.println("fjp_void: " + atomicInt.get());
    }

    // ---------------------------------------------------------------------

    /*
     * 2.2. extends RecursiveTask<Long>, extends ForkJoinTask<V>
     */
    @AllArgsConstructor
    static class TaskReturn extends RecursiveTask<Long> {
        int start, end;

        @Override
        protected Long compute() {
            if (end - start <= MAX_NUM) {
                long sum = 0L;
                for (int i = start; i < end; i++) sum += nums[i];
                return sum;
            }
            int middle = start + (end - start) / 2;
            TaskReturn subTask1 = new TaskReturn(start, middle);
            TaskReturn subTask2 = new TaskReturn(middle, end);
            subTask1.fork();
            subTask2.fork();
            return subTask1.join() + subTask2.join();
        }
    }

    /**
     * 2.1. 有返回值
     */
    @Test
    public void T2_fjp_return() {
        ForkJoinPool fjp = new ForkJoinPool();
        TaskReturn task = new TaskReturn(0, nums.length);
        fjp.execute(task);
        long result = task.join();
        System.out.println("fjp_return: " + result);
    }

    // ---------------------------------------------------------------------

    /*
     * 3. WorkStealingPool:每个thread都有一个task_queue,task_queue执行空了,去其他task_queue取task
     */
    @Test
    public void T3_WorkStealingPool() {
        // ExecutorService es = Executors.newWorkStealingPool();
        System.out.println(Runtime.getRuntime().availableProcessors()); // 硬件线程数

        // 指定并行thread数
        ExecutorService es = Executors.newWorkStealingPool(2);
        for (int i = 1; i < 10; i++) {
            int finalI = i;
            es.execute(() -> {
                System.out.println(finalI);
                ThreadHelper.sleepSeconds(1);
            });
        }
        LockSupport.park();
    }
/*
 本质:ForkJoinPool
 public static ExecutorService newWorkStealingPool() {
     return new ForkJoinPool
         (Runtime.getRuntime().availableProcessors(),
          ForkJoinPool.defaultForkJoinWorkerThreadFactory,
          null, true);
 }
 */

    /**
     * 底层ForkJoinPool
     */
    @Test
    public void T4_ParallelStream_API() {
        final List<Integer> integers = new ArrayList<>(1_000_000);
        for (int i = 0; i < 1_000_000; i++) {
            integers.add(RandomUtil.randomInt(1_000));
        }

        long start, end;
        start = System.currentTimeMillis();
        // 1. forEach
        for (int one : integers) {
            isPrime(one);
        }
        end = System.currentTimeMillis();
        System.out.println("forEach: " + (end - start));

        // ---------------------------------------------------------------------

        start = System.currentTimeMillis();

        // 2. parallelStream()
        integers.parallelStream().forEach(this::isPrime);
        end = System.currentTimeMillis();

        System.out.println("parallelStream: " + (end - start));
    }

    void isPrime(Integer num) {
        for (int i = 2; i <= num / 2; i++) {
            if (num % i == 0) return;
        }
    }
}
















 















 
 
 
 









 
 

 





















 
 
 
 












 














 











































 











1. 并发、并行

  1. 并发是指任务提交
  2. 并行指任务执行。并行是并发的子集。多个cpu同一时刻执行任务

2. WorkStealingPool

  1. 每个thread都有自己的WorkQueue,WorkQueue是一个双端队列,它是线程私有的
  2. ForkJoinTask中fork的子任务,将放入运行该任务的工作线程的队头,工作线程将以LIFO的顺序来处理工作队列中的任务
  3. 最大化地利用CPU,空闲thread从其它thread的WorkQueue尾部窃取任务,减少竞争
  4. 双端队列的操作:push()/pop()仅在其所有者工作线程中调用,poll()由其它线程窃取任务时调用的
  5. 当只剩下最后一个任务时,会存在竞争,是通过CAS来分配

3. CompletableFuture

  1. CompletableFuture.supplyAsync()
  2. CompletableFuture.allOf().join();
/**
 * 提供一个服务,查询各大电商网站同一类产品的价格并汇总展示
 */
class T05_CompletableFuture {
    static long start, end;

    /**
     * 同步执行
     * After 183 sleep!
     * After 391 sleep!
     * After 255 sleep!
     * use serial method call! 506
     */
    @Test
    public void T1_serial() {
        start = System.currentTimeMillis();

        priceOfTM();
        priceOfTB();
        priceOfJD();

        end = System.currentTimeMillis();
        System.out.println("use serial method call! " + (end - start));
    }

    /**
     * 异步执行
     * After 52 sleep!
     * After 235 sleep!
     * After 442 sleep!
     * use completable future! 490
     */
    @Test
    public void T2_async() {
        start = System.currentTimeMillis();

        CompletableFuture<Double> futureTM = CompletableFuture.supplyAsync(T05_CompletableFuture::priceOfTM);
        CompletableFuture<Double> futureTB = CompletableFuture.supplyAsync(T05_CompletableFuture::priceOfTB);
        CompletableFuture<Double> futureJD = CompletableFuture.supplyAsync(T05_CompletableFuture::priceOfJD);

        // 任务堆管理 allOf(), anyOf()
        CompletableFuture.allOf(futureTM, futureTB, futureJD).join();

        end = System.currentTimeMillis();
        System.out.println("use completable future! " + (end - start));
    }

    /**
     * API学习
     */
    @Test
    public void T3_lambda() {
        CompletableFuture.supplyAsync(T05_CompletableFuture::priceOfTM)
                .thenApply(String::valueOf)
                .thenApply(str -> "price: " + str)
                .thenAccept(System.out::println);

        LockSupport.park();
    }

    // ----------------------------------------------------------------------

    private static double priceOfTM() {
        delay();
        return 1.00;
    }

    private static double priceOfTB() {
        delay();
        return 2.00;
    }

    private static double priceOfJD() {
        delay();
        return 3.00;
    }

    private static double priceOfAmazon() {
        delay();
        throw new RuntimeException("product not exist!");
    }

    private static void delay() {
        int time = new Random().nextInt(500);
        ThreadHelper.sleepMilli(time);
        System.out.printf("After %s sleep!\n", time);
    }

}




































 
 
 


 










 
 
 
 

































5. ThreadPool-SC

@Slf4j
public class T03_ThreadPoolExecutor_SC {

    @Test
    public void T1_ThreadPoolExecutor() {
        ThreadPoolExecutor tpl = new ThreadPoolExecutor(
                2,                                      // 1. 核心thread数。pool始终保持的thread数
                4,                                      // 2. pool最大thread数
                60,                                     // 3. 生存时间。非core_thread闲置,归还OS
                TimeUnit.SECONDS,                       // 4. 生存单位
                new ArrayBlockingQueue<>(4),            // 5. 任务队列。各种各样存task的blockingQueue
                Executors.defaultThreadFactory(),       // 6. thread_factory。可自定义
                new ThreadPoolExecutor.AbortPolicy());  // 7. 拒绝策略。thread_pool忙,队列满,进行拒绝

        tpl.execute(() -> {
            System.out.println("tpl = " + tpl);
        });
    }

    // 双重循环。continue, break后加标签,直接控制外层循环
    @Test
    public void T2_fori() {
        label:
        for (int i = 0; i < 10; i++) {
            System.out.println("i ===>>> " + i);
            for (int j = 0; j < 5; j++) {
                System.out.println("j = " + j);
                if (j == 2) {
                    continue label;
                }
            }
        }
        System.out.println("ooxx");
    }

}






















 





 







  1. 构造方法ThreadPoolExecutor
  2. 提交执行task,execute()
  3. 新增worker,addWorker()
  4. 核心线程执行逻辑,runworker()
  5. 线程池worker任务单元
    • class Worker extends AbstractQueuedSynchronizer implements Runnable
public class ThreadPoolExecutor extends AbstractExecutorService {

    // 1.1. 线程池状态,可以看做一个int类型的数字,高3位表示线程池状态,低29位表示worker数量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 1.2. `Integer.SIZE`为32,所以`COUNT_BITS`为29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 1.3. 线程池允许的最大线程数。1左移29位,然后减1,即为 2^29 - 1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    // 1.4. 线程池有5种状态,大小排序如下:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 1.5. 获取线程池状态,通过按位与操作,低29位将全部变成0
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 1.6. 获取worker数量,通过按位与操作,高3位将全部变成0
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 1.7. 根据线程池状态、worker数量,生成ctl值
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    // 1.8. 线程池状态小于xx
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
    // 1.9. 线程池状态大于等于xx
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private volatile int corePoolSize;
    private volatile int maximumPoolSize;
    private final BlockingQueue<Runnable> workQueue;
    private volatile long keepAliveTime;
    private volatile ThreadFactory threadFactory;
    private volatile RejectedExecutionHandler handler;

	private final ReentrantLock mainLock = new ReentrantLock();
    private final HashSet<Worker> workers = new HashSet<>();

  	// 构造方法
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        // 2.1. 基本类型参数校验
        if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
            throw new IllegalArgumentException();
        // 2.2. 空指针校验
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        // 2.3. 根据传入参数`unit`和`keepAliveTime`,将存活时间转换为纳秒存到变量`keepAliveTime`中
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }


  	// 提交执行task
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        // 3.1. worker数量比核心线程数小,直接创建worker执行任务
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 3.2. worker数量超过核心线程数,任务直接进入队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 线程池状态不是RUNNING状态,说明执行过shutdown命令,需要对新加入的任务执行reject()
            // 这儿为什么需要recheck,是因为任务入队列前后,线程池的状态可能会发生变化
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 核心线程数允许为0
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 3.3. 线程池不是运行状态,或者任务进入队列失败,则尝试创建worker执行任务
        // 	    3点需要注意:
        // 			1. 线程池不是运行状态时,addWorker内部会判断线程池状态
        // 			2. addWorker第2个参数表示是否创建核心线程
        // 			3. addWorker返回false,则说明任务执行失败,需要执行reject操作
        else if (!addWorker(command, false))
            reject(command);
    }

  	// 新增worker
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        // 4.1. 外层自旋
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 这个条件比较难懂,对其进行了调整,和下面的条件等价
            // (rs > SHUTDOWN) || (rs == SHUTDOWN && firstTask != null) || (rs == SHUTDOWN && workQueue.isEmpty()
            // 1. 线程池状态大于SHUTDOWN时,直接返回false
            // 2. 线程池状态等于SHUTDOWN,且firstTask不为null,直接返回false
            // 3. 线程池状态等于SHUTDOWN,且队列为空,直接返回false
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                return false;

            // 4.2. 内层自旋
            for (;;) {
                int wc = workerCountOf(c);
                // worker数量超过容量,直接返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 使用CAS的方式增加worker数量
                // 若增加成功,则直接跳出外层循环进入到第二部分
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 线程池状态发生变化,对外层循环进行自旋
                if (runStateOf(c) != rs)
                    continue retry;
                // 其他情况,直接内层循环进行自旋即可
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // worker的添加必须是串行的,因此需要加锁
                mainLock.lock();
                try {
                    // 重新检查线程池状态
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                        // worker已经调用过了start()方法,则不再创建worker
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();

						// 4.3. 增加workers
                        workers.add(w);
                        // 更新`largestPoolSize`
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 4.4. 启动worker线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 4.5. worker线程启动失败。说明线程池状态发生了变化(关闭操作被执行),需要进行shutdown相关操作
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

  	// 5.1. 线程池worker任务单元
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            // 5.2. Worker的关键所在,使用线程工厂创建了一个线程。传入的参数为当前worker
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
          	// 6.1.. 核心线程执行逻辑
            runWorker(this);
        }
        // 省略代码...
    }

  	// 6.2. 核心线程执行逻辑`runworker()`
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // unlock()允许外部中断
        w.unlock(); // allow interrupts
        // 判断是否进入过自旋(while循环)
        boolean completedAbruptly = true;
        try {
            // 自旋
            // 1. 如果firstTask不为null,则执行firstTask
            // 2. 如果firstTask为null,则调用getTask()从队列获取任务
            // 3. 阻塞队列的特性就是:当队列为空时,当前线程会被阻塞等待
            while (task != null || (task = getTask()) != null) {
                // worker进行加锁
                // 1. 降低锁范围,提升性能
                // 2. 保证每个worker执行的任务是串行的
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // 如果线程池正在停止,则对当前线程进行中断操作
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                        && !wt.isInterrupted())
                    wt.interrupt();

                try {
                  	// 6.2. 执行任务,前置操作
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                      	// 6.3. 执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                      	// 6.4. 执行任务,后置操作
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 帮助gc
                    task = null;
                    // 已完成任务数加一
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 自旋操作被退出,说明线程池正在结束
            processWorkerExit(w, completedAbruptly);
        }
    }

    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

}