02-thread

1. 线程历史

CPU性能压榨的血泪史

  1. 单进程人工切换
    • 纸带机
  2. 多进程批处理
    • 多个任务批量执行。进程串行
  3. 多进程并行处理
    • 把程序写在不同的内存位置上来回切换。进程并行,cpu切换
  4. 多线程
    • 一个程序内部不同任务的来回切换
    • selector - epoll
  5. 纤程/协程
    • 绿色线程,用户管理(不是OS管理)线程

2. 进程, 线程, 程序

  1. 什么是程序?
  2. 什么是进程?
  3. 什么是线程?
  4. 什么是纤程/协程?

1. 程序、进程

  1. 程序:OS可以执行的文件。eg:QQ.exe
  2. 进程:OS进行资源分配的基本单位。静态概念
    • eg:分配内存、硬盘……

OS将程序load到内存中运行。一个程序可以有多个进程。eg:QQ双开

image-20220705193650549

2. 线程

  1. 线程调度执行的基本单位。动态概念
    • 一个程序里的不同的执行路径
    • 多个线程共享同一个进程的资源。大多问题就是出现在共享资源里
  2. 程序进程开始执行,以线程为单位来执行。OS会找到主线程,给cpu执行,主线程中间开启了其他线程,再进行线程间的切换
    • 单线程,主线程。eg:main()
    • 多线程:主线程产生了分支,同时进行

3. 线程切换

CPU组成单元:

  1. ALU计算单元
  2. 寄存器组,用来存储数据
  3. PC(program_counter)也是一种寄存器,用来存储执行到哪条指令

  1. T1运行,将其指令和data放入cpu,cpu执行
  2. OS线程调度算法,将T1(指令和data)存入cache。运行T2(cpu很傻,只会根据指令、数据计算结果)
  3. 线程切换(context_switch)需要消耗资源,由OS进行
image-20220705193828744

4. interview

1. 单核CPU多线程是否有意义?

  • 一个核cpu在同一时间只能跑一个thread
  • 为了充分利用cpu。一个thread可能在sleep或进行IO,可以先切到另一个thread执行
  • cpu密集型线程:大量做计算,cpu利用率高
  • IO密集型线程:大量等待IO

2. 线程数是不是越大越好?

线程间的切换要消耗资源

/**
 * 100_000_000个随机double的和
 * 单线程、2线程、10000线程
 */
@Slf4j
public class T01_MultiVsSingle_Thread {
    private static final double[] doubles = new double[100_000_000];
    private static final Random r = new Random();
    private static final DecimalFormat df = new DecimalFormat("0.00");

    static {
        for (int i = 0; i < doubles.length; i++) {
            doubles[i] = r.nextDouble();
        }
    }

    /**
     * 1. 单线程
     */
    private void _single() {
        long start = System.currentTimeMillis();

        double result = 0.0;
        for (double num : doubles) {
            result += num;
        }

        long end = System.currentTimeMillis();
        System.out.println("m1: " + (end - start) + " result = " + df.format(result));
    }

    static double result1 = 0.0, result2 = 0.0, result = 0.0;

    /**
     * 2. 线程
     */
    private void _two() throws Exception {

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < doubles.length / 2; i++) {
                result1 += doubles[i];
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = doubles.length / 2; i < doubles.length; i++) {
                result2 += doubles[i];
            }
        });

        long start = System.currentTimeMillis();
        ThreadHelper.start(t1, t2);
        ThreadHelper.join(t1, t2);

        result = result1 + result2;

        long end = System.currentTimeMillis();
        System.out.println("m2: " + (end - start) + " result = " + df.format(result));
    }

    /**
     * 3. custom线程
     * n必须能被doubles.length整除,即segmentCount必须为整数
     */
    private void _custom(int n) throws Exception {
        Thread[] threads = new Thread[n];
        double[] results = new double[n];
        final int segmentCount = doubles.length / n;
        CountDownLatch latch = new CountDownLatch(n);

        for (int i = 0; i < n; i++) {
            int m = i;
            threads[i] = new Thread(() -> {
                for (int j = m * segmentCount; j < (m + 1) * segmentCount; j++) {
                    results[m] += doubles[j];
                }
                latch.countDown();
            });
        }

        double resultM3 = 0.0;

        long start = System.currentTimeMillis();

        ThreadHelper.start(threads);
        latch.await();

        for (double v : results) {
            resultM3 += v;
        }
        long end = System.currentTimeMillis();
        System.out.println("m3: " + (end - start) + " result = " + df.format(resultM3));
    }

    @Test
    public void T1_single_vs_multi() throws Exception {
        // m1: 107 result = 49997084.77
        _single();

        // m2: 64 result = 49997084.77
        _two();

        // n必须能被doubles.length整除
        // m3: 780 result = 49996797.68
        int n = 10_000;
        _custom(n);
    }
}






































 
 
 
 
 
 
 
 
 
 























 
 
 
 
 
 






























3. 线程数多少最合适?

# 1. cpu型号
# machdep.cpu.brand_string: Intel(R) Core(TM) i7-8700B CPU @ 3.20GHz
sysctl machdep.cpu.brand_string

# 2. cpu物理核心数
# hw.physicalcpu: 6
sysctl hw.physicalcpu

# 3. cpu逻辑核心数
# hw.logicalcpu: 12
sysctl hw.logicalcpu
  1. 服务器时时刻刻都有线程在跑,所以你设置的线程不能把所有的core都利用上
  2. 服务安全的角度,cpu性能利用到80%
  3. 压测来确定
    1. 性能测试工具(profiler统称)=> eg:Jprofiler
    2. 生产环境,用阿里arthas分析 => arthas官网open in new window
  4. 测试出W/C,等待时间与计算时间的比率——《Java并发编程实战》
image-20230501095653104

5. Thread

1. Concept

  1. 同步:new T1().run();
  2. 异步:new T1().start();
/**
 * 线程启动的方式
 * 1. 同步:new T1().run();
 * 2. 异步:new T1().start();
 */
public class T00_WhatIsThread {

    /**
     * start()异步执行
     */
    @Test
    public void T1_start() {
        new T().start();
        for (int i = 0; i < 10; i++) {
            ThreadHelper.sleepSeconds(1);
            System.out.println("main: " + i);
        }
    }

    /**
     * run()同步执行
     */
    @Test
    public void T2_run() {
        new T().run();
        for (int i = 0; i < 10; i++) {
            ThreadHelper.sleepSeconds(1);
            System.out.println("main: " + i);
        }
    }
}

/**
 * T1线程
 */
class T extends Thread {
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            ThreadHelper.sleepSeconds(1);
            System.out.println("T1: " + i);
        }
    }
}












 











 










 








2. Create

  1. extends Thread
  2. implements Runnable
  3. lambda
  4. implements Callable => FutureTask
  5. ThreadPool(是否return)
    • ExecutorService.execute()
    • ExecutorService.submit()
@Slf4j
public class T02_Thread_Create {

    /**
     * 1. extends Thread
     */
    @Test
    public void T1_extends() {
        MyThread t = new MyThread();
        t.start();
        ThreadHelper.join(t);
    }

    /**
     * 2. implements Runnable
     *      比extends更合理,java单继承
     */
    @Test
    public void T2_Runnable() {
        Thread t = new Thread(new MyRunnable());
        t.start();
        ThreadHelper.join(t);
    }

    /**
     * 3. lambda
     */
    @Test
    public void T3_lambda() {
        Thread t = new Thread(() -> {
            log.error("Hello Lambda!");
        });
        t.start();
        ThreadHelper.join(t);
    }

    /**
     * 4. implements Callable
     */
    @Test
    public void T4_Callable() throws ExecutionException, InterruptedException {
        /*
         * FutureTask,既是Runnable,也是Future
         *      FutureTask<V> implements RunnableFuture<V>
         *      RunnableFuture<V> extends Runnable, Future<V>
         */
        FutureTask<String> task = new FutureTask<>(new MyCallable());

        new Thread(task).start();

        // ...业务操作

        log.error(task.get()); // 阻塞类型,等线程返回值
    }

    /**
     * 5. execute(),无返回值
     */
    @Test
    public void T5_ThreadPool_execute() throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();
        es.execute(() -> {
            ThreadHelper.sleepSeconds(3);
            log.error("Hello ThreadPool!");
        });

        // ExecutorService停止接收task,等待已经提交的task执行完成。当所有提交task执行完毕,线程池即被关闭
        es.shutdown();
        // 等待timeout后,监测ExecutorService是否已经关闭,若关闭true,否则false
        while (!es.awaitTermination(1, TimeUnit.SECONDS)) {
            System.out.println("ThreadPool ==>> 运行");
        }
        System.out.println("ThreadPool ==>> 关闭");
    }

    /**
     * 6. submit(),有返回值
     */
    @Test
    public void T6_ThreadPool_submit() throws ExecutionException, InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();
        // 可以执行runnable、callable
        Future<String> f = es.submit(new MyCallable());

        // ...业务操作

        log.error(f.get());// 阻塞类型,等线程返回值
        es.shutdown();
    }
}

@Slf4j
class MyThread extends Thread {
    @Override
    public void run() {
        log.error("Hello MyThread!");
    }
}

@Slf4j
class MyRunnable implements Runnable {
    @Override
    public void run() {
        log.error("Hello MyRunnable!");
    }
}

@Slf4j
class MyCallable implements Callable<String> {
    @Override
    public String call() {
        log.error("Hello MyCallable!");
        return "success";
    }
}








 










 









 
 
 














 














 
 
 
 

















 









 







 







 






3. API

  1. sleep():线程状态:timed waiting
  2. yield():线程状态:running => ready。当前线程停止,随机切到下个线程
  3. join():当前thread进行waiting,等待join_thread结束
public class T03_Thread_API {

    /**
     * 1. sleep():线程状态:timed waiting
     */
    @Test
    public void T1_sleep() {
        Thread t = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                System.out.println("A" + i);
                try {
                    Thread.sleep(500);
                    // TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        t.start();
        ThreadHelper.join(t);
    }

    /**
     * 2. yield():线程状态:running => ready。当前线程停止,随机切到下个线程
     */
    @Test
    public void T2_yield() {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                System.out.println("A" + i);
                if (i % 10 == 0) {
                    // 让出cpu
                    Thread.yield();
                }
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                System.out.println("---- >>>> B" + i);
                if (i % 10 == 0) {
                    Thread.yield();
                }
            }
        });

        ThreadHelper.start(t1, t2);
        ThreadHelper.join(t1, t2);
    }

    /**
     * 3. join():当前thread进行waiting,等待join_thread结束
     */
    @Test
    public void T3_join() {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                System.out.println("t1: " + i);
                ThreadHelper.sleepMilli(500);
            }
        });

        Thread t2 = new Thread(() -> {
            // t1线程,运行结束。t2继续
            try {
                t1.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            for (int i = 0; i < 10; i++) {
                System.out.println("T2: " + i);
                ThreadHelper.sleepMilli(500);
            }
        });

        t1.start();
        ThreadHelper.sleepSeconds(1);
        t2.start();

        ThreadHelper.join(t1, t2);
    }
}











 





















 








 























 

















4. State

Linux中线程和进程区别不大

  1. NEW:线程刚刚创建,还没有启动
  2. RUNNABLE:可运行状态,由线程调度器可以安排执行
    1. READY:准备被调用(等待被cpu占用)
    2. RUNNING:正在运行(正在被cpu占用)
  3. WAITING:等待被唤醒——ReentrantLock、LockSupport
  4. TIMED WAITING:隔一段时间后自动唤醒
  5. BLOCKED:被阻塞,正在等待锁——synchronized
  6. TERMINATED:线程结束

线程状态迁移图

image-20230509201816438

linux中,thread理解为轻量的process

/**
 * ThreadState
 */
@Slf4j
public class T04_ThreadState {

    @Test
    public void T1_NEW_RUNNABLE_TERMINATED() throws InterruptedException {
        Thread t = new Thread(() -> {
            log.debug("2: " + Thread.currentThread().getState()); // 2: RUNNABLE
            for (int i = 0; i < 3; i++) {
                ThreadHelper.sleepSeconds(1);
                System.out.print(i + " ");
            }
            System.out.println();
        });
        log.debug("1: " + t.getState()); // 1: NEW
        t.start();
        t.join(); // 等待t结束
        log.debug("3: " + t.getState()); // 3: TERMINATED
    }

    @Test
    public void T2_WAITING_TIMED_WAITING() {
        Thread t = new Thread(() -> {
            LockSupport.park(); // 阻塞当前线程
            log.debug("t go on!");
            ThreadHelper.sleepSeconds(5);
        });

        t.start();
        ThreadHelper.sleepSeconds(1);
        log.debug("1: " + t.getState());// 1: WAITING,无期阻塞

        LockSupport.unpark(t); // 解放t线程
        ThreadHelper.sleepSeconds(1);
        log.debug("2: " + t.getState()); // 2: TIMED_WAITING,定时阻塞

        ThreadHelper.join(t);
    }

    /*
     * 1. synchronized是BLOCKED状态
     *      只有经过OS调度的才是BLOCKED状态
     */
    @Test
    public void T3_sync_BLOCKED() {
        final Object o = new Object();
        Thread t = new Thread(() -> {
            synchronized (o) {            // BLOCKED
                log.debug("t 得到了锁 o");
            }
        });

        new Thread(() -> {
            synchronized (o) {
                ThreadHelper.sleepSeconds(5);
            }
        }).start();
        ThreadHelper.sleepSeconds(1);

        t.start();
        ThreadHelper.sleepSeconds(1);

        // BLOCKED
        log.debug("1: " + t.getState());

        ThreadHelper.join(t);
    }

    /*
     * 2. juc的锁cas来实现,进入忙等待,是waiting状态,非blocked状态
     *      除了synchronized锁是blocked状态,其他都是waiting状态
     */
    @Test
    public void T4_ReentrantLock_WAITING() {
        Lock lock = new ReentrantLock();
        Thread t = new Thread(() -> {
            lock.lock(); // 省略try finally
            log.debug("t 得到了锁");
            lock.unlock();
        });

        new Thread(() -> {
            lock.lock();
            ThreadHelper.sleepSeconds(5);
            lock.unlock(); // 手动释放锁
        }).start();
        ThreadHelper.sleepSeconds(1);

        t.start();
        ThreadHelper.sleepSeconds(1);
        log.debug("1: " + t.getState()); // WAITING

        ThreadHelper.join(t);
    }

    @Test
    public void T5_LockSupport_WAITING() {
        Thread t = new Thread(() -> {
            LockSupport.park();
            log.debug("t go on!");
        });

        t.start();
        ThreadHelper.sleepSeconds(1);

        log.debug("1: " + t.getState()); // WAITING

        LockSupport.unpark(t);
        ThreadHelper.join(t);
    }
}









 






 


 












 



 












 




























 





















 












5. interrupt

  1. interrupt():设置打断标志位
  2. isInterrupted():查询打断标志位
  3. interrupted():清空打断标志位,并返回标志情况

  1. sleep(), wait(), join()与标志位冲突。抛InterruptedException,未设置上标志位
  2. lockInterruptibly(),抛InterruptedException,未设置上标志位
  3. LockSupport.park()失效,设置上标志位
  4. 不能打断BLOCKED状态线程,设置上标志位
  5. 不能打断ReentrantLock_lock,设置上标志位
/**
 * 1. interrupt():设置打断标志位
 * 2. isInterrupted():查询打断标志位
 * 3. interrupted():清空打断标志位,并返回标志情况
 */
@Slf4j
public class T05_Interrupt {

    @Test
    public void T1_interrupt_isInterrupted() {
        Thread t = new Thread(() -> {
            for (; ; ) {
                // 1. isInterrupted() 查询标志位
                if (Thread.currentThread().isInterrupted()) {
                    System.out.println("Thread is interrupted!");
                    break;
                } else {
                    System.out.println("Thread is not interrupted!");
                }
            }
        });

        t.start();
        ThreadHelper.sleepSeconds(1);

        // 设置标志位
        t.interrupt();

        ThreadHelper.join(t);
    }

    /*
     * 2. Thread.interrupted()查询并重置标志位
     *
     * t.interrupted() == Thread.interrupted()
     * 查询当前thread标志位
     * public static boolean interrupted() {
     *     return currentThread().isInterrupted(true);
     * }
     */
    @Test
    public void T2_interrupted() {
        Thread t = new Thread(() -> {
            for (; ; ) {
                // 查询当前线程标志位,并清空:true
                if (Thread.interrupted()) {
                    log.debug("t: {}", Thread.interrupted());
                    break;
                }
            }
        });

        t.start();
        ThreadHelper.sleepSeconds(1);

        t.interrupt();

        ThreadHelper.join(t);
    }

    /*
     * 1. sleep(), wait(), join()与标志位冲突。抛InterruptedException异常
     * 2. 未设置上标志位
     */
    @Test
    public void T3_sleep() {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(5_000);
            } catch (InterruptedException e) {
                System.out.println("Thread is interrupted!");
                // false
                log.debug("t :" + Thread.currentThread().isInterrupted());
            }
        });

        t.start();
        ThreadHelper.sleepSeconds(1);

        System.out.println("t.getState() = " + t.getState());
        t.interrupt();
        ThreadHelper.join(t);
    }

    @Test
    public void T4_synchronized() {
        final Object o = new Object();

        Thread t = new Thread(() -> {
            synchronized (o) {
                try {
                    o.wait();
                } catch (InterruptedException e) {
                    System.out.println("Thread is interrupted!");
                    // false
                    log.debug("t :" + Thread.currentThread().isInterrupted());
                }
            }
        });

        t.start();
        ThreadHelper.sleepSeconds(1);

        System.out.println("t.getState() = " + t.getState());
        t.interrupt();
        ThreadHelper.join(t);
    }

    Thread t;

    /**
     * 1. LockSupport.park()失效
     * 2. 设置上标志位
     */
    @Test
    public void T5_LockSupport() {
        t = new Thread(() -> {
            System.out.println("t start!");
            LockSupport.park(t);
            System.out.println("t end! 是否Interrupted: " + Thread.currentThread().isInterrupted());
        });
        t.start();
        ThreadHelper.sleepSeconds(2);

        System.out.println("t.getState() = " + t.getState());
        t.interrupt();
        // LockSupport.unpark(t);

        ThreadHelper.join(t);
    }

    /**
     * 1. 不能打断BLOCKED状态线程
     * 2. 设置上标志位
     */
    @Test
    public void T6_tryLock_synchronized() {
        final Object o = new Object();

        Thread t1 = new Thread(() -> {
            synchronized (o) {
                ThreadHelper.sleepSeconds(5);
                log.debug("t1 continue!");
            }
        });
        t1.start();
        ThreadHelper.sleepSeconds(1);

        // BLOCKED
        Thread t2 = new Thread(() -> {
            synchronized (o) {
            }
            log.debug("t2 end! 是否Interrupted: " + Thread.currentThread().isInterrupted());
        });
        t2.start();
        ThreadHelper.sleepSeconds(1);

        System.out.println("t2.getState() = " + t2.getState());
        t2.interrupt();
        ThreadHelper.join(t1, t2);
    }

    /**
     * 1. 不能打断ReentrantLock_lock
     * 2. 设置标志位
     */
    @Test
    public void T7_tryLock_ReentrantLock() {
        ReentrantLock lock = new ReentrantLock();

        Thread t1 = new Thread(() -> {
            lock.lock();
            ThreadHelper.sleepSeconds(5);
            log.debug("t1 continue!");
            lock.unlock();
        });
        t1.start();
        ThreadHelper.sleepSeconds(1);

        // WAITING
        Thread t2 = new Thread(() -> {
            lock.lock();
            log.debug("t2 end! 是否Interrupted: " + Thread.currentThread().isInterrupted());
            lock.unlock();
        });

        t2.start();
        ThreadHelper.sleepSeconds(1);

        System.out.println("t2.getState() = " + t2.getState());
        t2.interrupt();
        ThreadHelper.join(t1, t2);
    }

    /**
     * lockInterruptibly()
     * 1. 抛InterruptedException
     * 2. 未设置上标志位
     */
    @Test
    public void T8_tryLock_lockInterruptibly() {
        ReentrantLock lock = new ReentrantLock();

        Thread t1 = new Thread(() -> {
            System.out.println("t1 start!");
            lock.lock();
            ThreadHelper.sleepSeconds(5);
            System.out.println("t1 end!");
        });
        t1.start();
        ThreadHelper.sleepSeconds(1);

        Thread t2 = new Thread(() -> {
            System.out.println("t2 start!");
            try {
                // 可被interrupt的锁争抢
                lock.lockInterruptibly();
                System.out.println("t2 continue!");
                lock.unlock();
            } catch (InterruptedException e) {
                // lock争抢被打断后,业务处理
                e.printStackTrace();
                System.out.println("t2 WAITING,被打断");
            }
            System.out.println("t2 end! 是否Interrupted: " + Thread.currentThread().isInterrupted());
        });
        t2.start();
        ThreadHelper.sleepSeconds(1);

        System.out.println("t2.getState() = " + t2.getState());
        t2.interrupt();

        ThreadHelper.join(t1, t2);
    }
}













 












 


















 









 












 



 







 










 



 








 













 






 
























 







 






















 








 

























 













 




6. end

  1. 自然结束(能自然结束就尽量自然结束)
  2. stop(),@Deprecateded
  3. suspend(), resume(),@Deprecated
  4. volatile
    1. wait(), sleep(), recv(), accept()线程阻塞,没有办法停止
    2. 停止时刻难以控制
      • eg:一个阻塞容器,容量为5的时候结束生产者,但是,volatile同步线程标志位的时间控制不是很精确,可能生产者还继续生产
  5. interrupt(), isInterrupted()(比较优雅)
    • 停止时刻难以控制
    • 比volatile更优雅。wait(), sleep()线程状态会抛InterruptedException。正确处理异常即可
/**
 * 结束thread
 */
@Slf4j
public class T06_end {

    /*
     * 1. stop():@Deprecated
     *      释放所有锁(可能进行数据同步)。容易产生数据不一致的问题
     */
    @Test
    public void T1_stop() {
        Thread t = new Thread(() -> {
            while (true) {
                System.out.println("uploading file");
                ThreadHelper.sleepSeconds(1);
            }
        });

        t.start();
        ThreadHelper.sleepSeconds(5);

        // @Deprecated
        t.stop();
        ThreadHelper.join(t);
    }

    /*
     * 2. suspend(), resume():@Deprecated
     *      暂停时持有锁,锁可能会永远不释放。产生死锁
     */
    @Test
    public void T2_suspend_resume() {
        Thread t = new Thread(() -> {
            int i = 1;
            while (true) {
                System.out.println("uploading file: " + i++);
                ThreadHelper.sleepSeconds(1);
            }
        });
        t.start();
        ThreadHelper.sleepSeconds(5);

        t.suspend();
        ThreadHelper.sleepSeconds(3);
        t.resume();

        ThreadHelper.join(t);
    }

    private static volatile boolean running = true;

    /*
     * 3. volatile
     *      1. 停止时刻难以控制。eg:一个阻塞容器,容量为5的时候结束生产者
     *      2. `wait(), sleep(), recv(), accept()`线程阻塞,没有办法停止
     */
    @Test
    public void T3_volatile() {
        Thread t = new Thread(() -> {
            long i = 0L;
            while (running) {
                // wait(), sleep(), recv(), accept()
                i++;
            }
            // 很难控制循环了多少次:4168806262 4163032200……
            System.out.println("t end and i = " + i);
        });
        t.start();
        ThreadHelper.sleepSeconds(1);

        running = false;
        ThreadHelper.join(t);
    }

    /*
     * 4. interrupt()
     *      1. 停止时刻难以控制
     *      2. 比volatile更优雅。wait(), sleep()线程状态会抛InterruptedException。正确处理异常即可
     */
    @Test
    public void T4_interrupt() {
        Thread t = new Thread(() -> {
            long i = 0L;
            while (!Thread.interrupted()) {
                // sleep() wait()
                i++;
            }
            System.out.println("t end i = " + i);
        });
        t.start();
        ThreadHelper.sleepSeconds(1);

        t.interrupt();
        ThreadHelper.join(t);
    }
}























 



















 

 




 










 









 












 








 



6. Reference

jvm之强软弱虚引用open in new window

1. normalRef

  • 强引用,不会被GC。引用为null才会GC

2. softRef

  • 内存不够用了,才回收

3. weakRef

  1. GC立即回收
  2. ThreadLocal原理
  3. WeakHashMap

4. phantomRef

  1. 作用:回收堆外内存
  2. 直接内存(堆外内存)
    • 不被JVM管理的memory,memory不在堆上,GC回收不了。引用被干掉,加入到queue中。手动清除queue指定的memory(java堆外内存的回收用unsafe类)
/**
 * 强、软、弱、虚
 */
public class T1_Reference {

    /**
     * 1. 强引用,不会被GC。引用为null才会GC
     */
    @Test
    public void T1_normalRef() {
        M m = new M();
        System.gc(); // DisableExplicitGC

        // 延缓main线程,给GC时间执行
        LockSupport.park();
    }

    /*
     * 2. 软引用
     *      -Xms20M -Xmx20M
     *      1. 内存不够用了,才回收。回收后还不够,抛内存溢出异常
     *      2. 描述一些还有用,但并非必须的对象。软引用非常适合缓存使用
     *
     *  应用场景(缓存):
     *      1. 大图片
     *      2. 大数据
     */
    @Test
    public void T2_softRef() {
        // sr大小10M
        SoftReference<byte[]> sr = new SoftReference<>(new byte[1024 * 1024 * 10]);

        System.out.println(sr.get());
        System.gc();
        ThreadHelper.sleepSeconds(1);
        System.out.println(sr.get());

        // 再分配一个12M数组,heap装不下
        byte[] b = new byte[1024 * 1024 * 12];
        ThreadHelper.sleepSeconds(1);

        // GC一次,还不够,把SoftReference干掉,再GC
        System.out.println(sr.get());
    }

    /*
     * 3. 弱引用
     *      1. GC立即回收
     *      2. 内存泄漏、内存溢出,概念不同
     *
     *  应用场景:容器
     *      作业:1. weakHashMap
     */
    @Test
    public void T3_weakRef() {
        WeakReference<M> wr = new WeakReference<>(new M());

        System.out.println(wr.get());
        System.gc(); // GC回收

        ThreadHelper.sleepSeconds(1);
        System.out.println(wr.get());

        // 弱引用的典型应用,也是ThreadLocal问到的最深的地方
        ThreadLocal<M> tl = new ThreadLocal<>();
        // set() => ThreadLocalMap.set() => Entry extends WeakReference<ThreadLocal<?>>
        tl.set(new M()); // 本质是current_thread的ThreadLocalMap赋值
        // remove()务必。key为null,value也会导致memory泄漏。
        tl.remove();
    }

    private static final List<Object> LIST = new LinkedList<>();
    // 装得都是引用
    private static final ReferenceQueue<M> QUEUE = new ReferenceQueue<>();

    /*
     * 4. 虚引用(清理堆外内存)不是给程序员用的,给写JVM人用的
     *      -Xms50M -Xmx50M
     *      1. 唯一作用:对象被GC时,收到一个系统通知(ReferenceQueue)
     *      2. get()无法获取对象实例,GC立即回收
     *
     *      1. jdk中直接内存的回收就用到虚引用,直接内存是堆外内存
     *      2. GC范围是堆内存,直接内存的分配和回收都是Unsafe类去操作
     *      3. java申请直接内存,堆内存分配一个Obj保存这个堆外内存的引用,被GC管理,一旦这个对象被回收,相应的用户线程会收到通知并对直接内存进行清理工作
     *
     * DirectByteBuffer就是通过虚引用来实现堆外内存的释放
     */
    @Test
    public void T4_phantomRef() {
        PhantomReference<M> phantomReference = new PhantomReference<>(new M(), QUEUE);
        // System.out.println(phantomReference.get());

        ByteBuffer b = ByteBuffer.allocateDirect(1024);

        new Thread(() -> {
            while (true) {
                LIST.add(new byte[2 * 1024 * 1024]); // 2M
                ThreadHelper.sleepSeconds(1);
                // 虚引用,get()不到对象
                System.out.println(phantomReference.get());
            }
        }).start();

        // 垃圾回收线程
        new Thread(() -> {
            while (true) {
                Reference<? extends M> poll = QUEUE.poll();
                if (poll != null) {
                    System.out.println("--- 虚引用对象被JVM回收了 ---- " + JSONUtil.toJsonStr(poll));
                }
            }
        }).start();

        ThreadHelper.sleepSeconds(50);
    }

    @Test
    public void T5_phantom_gc() {
        ReferenceQueue<M> referenceQueue = new ReferenceQueue<>();
        PhantomReference<M> phantomReference = new PhantomReference<>(new M(), referenceQueue);

        System.out.println(phantomReference.get()); // null

        new Thread(() -> {
            while (true) {
                Reference<? extends M> poll = referenceQueue.poll();
                if (poll != null) {
                    System.out.println("--- 虚引用对象被jvm回收了 ---- " + poll); // java.lang.ref.PhantomReference@108beb45
                    System.out.println("--- 回收对象 ---- " + poll.get()); // null
                }
            }
        }).start();
        ThreadHelper.sleepSeconds(1);

        System.gc();
        ThreadHelper.sleepSeconds(1);

        // 第二次gc()才被感知到
        System.gc();

        ThreadHelper.sleepSeconds(50);
    }
}

/**
 * GC_FreeMemory。垃圾回收,释放内存
 */
class M {

    // 默认给3M的空间
    private int[] bytes = new int[1024 * 1024 * 3];

    /**
     * GC释放对象内存时,会调用finalize()。该方法永远不应该重写、调用
     */
    @Override
    protected void finalize() {
        System.out.println("GC将对象清除!!");
    }
}










 



















 
























 

































 




























 
 





 
























 






 


// T4_phantomRef
null
null
null
null
null
null
null
null
null
GC将对象清除!!
null
--- 虚引用对象被JVM回收了 ---- {}
null
null
// ...

// T5_phantom_gc
null
GC将对象清除!!
--- 虚引用对象被jvm回收了 ---- java.lang.ref.PhantomReference@108beb45
--- 回收对象 ---- null

7. ThreadLocal

【真实工作场景】中怎么用ThreadLocalopen in new window

  1. ThreadLocal给每个线程建立单独变量副本,从而不影响其他线程
  2. ThreadLocal不存储值,只是提供能查到value的key

JDK1.2 解决共享参数的频繁传递与线程安全等问题,Thread的局部变量

/*
 * ThreadLocal:当前线程局部变量
 * 需求:两个线程变量独享,互不影响
 *
 * 1. ThreadLocal是使用空间换时间
 * 2. synchronized是使用时间换空间
 *
 * 用途:
 *      声明式事务。数据库连接对象存于ThreadLocal中,一个事务中每个sql都使用同一个连接对象
 *      hibernate_session就存在与ThreadLocal中,避免synchronized的使用
 */
public class T2_TL_Basic {
    volatile static Person p = new Person();

    /**
     * t1线程变量被t2修改
     */
    @Test
    public void T1_common() {
        Thread t1 = new Thread(() -> {
            ThreadHelper.sleepSeconds(2);
            System.out.println(p);

        }, "t1");

        Thread t2 = new Thread(() -> {
            ThreadHelper.sleepSeconds(1);
            p.name = "listao";
        }, "t2");

        ThreadHelper.start(t1, t2);
        ThreadHelper.join(t1, t2);
    }

    static ThreadLocal<Person> tl = new ThreadLocal<>();

    /*
     * threadLocal => 设到了当前线程的map中
     * .set() => Thread.currentThread... map(ThreadLocal, person)
     */
    @Test
    public void T2_threadLocal() {
        Thread t1 = new Thread(() -> {
            tl.set(new Person("listao"));
            ThreadHelper.sleepSeconds(2);
            System.out.println(tl.get());

        }, "t1");

        Thread t2 = new Thread(() -> {
            ThreadHelper.sleepSeconds(1);
            tl.set(new Person());
        }, "t2");

        ThreadHelper.start(t1, t2);
        ThreadHelper.join(t1, t2);
    }
}

@ToString
@NoArgsConstructor
@AllArgsConstructor
class Person {
    String name = "zhangsan";
}












 








 





 















 







 













1. Multi_Thread

multi_thread都独享一个对象

/**
 * multi_thread都独享一个对象
 */
public class T3_TL_copy {
    public static ExecutorService es = Executors.newFixedThreadPool(10);

    private String m1(int seconds) {
        // 参数的单位是毫秒,从1970.1.1 00:00:00 GMT计时
        Date date = new Date(1_000L * seconds);
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return dateFormat.format(date);
    }

    /**
     * 1. 每个thread都创建SimpleDateFormat,消耗内存,GC有压力
     */
    @Test
    public void T1_normal() {
        for (int i = 0; i < 1_000; i++) {
            int finalI = i;
            es.submit(() -> {
                String date = m1(finalI);
                System.out.println(date);
            });
        }
        es.shutdown();
    }

// -----------------------------------------------------------------------------------------------

    static SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    private String m2(int seconds) {
        Date date = new Date(1_000L * seconds);
        return DATE_FORMAT.format(date);
    }

    /**
     * 2. 所有thread共享一个对象static SimpleDateFormat,thread_unsafe
     */
    @Test
    public void T1_static() {
        for (int i = 0; i < 1_000; i++) {
            int finalI = i;
            es.submit(() -> {
                String date = m2(finalI);
                System.out.println(date);
            });
        }
        es.shutdown();
    }

// -----------------------------------------------------------------------------------------------

    private String m3(int seconds) {
        Date date = new Date(1_000L * seconds);
        synchronized (this) {
            return DATE_FORMAT.format(date);
        }
    }

    /**
     * 3. synchronized(),thread_safe不过性能下降
     */
    @Test
    public void T3_static_sync() {
        for (int i = 0; i < 1_000; i++) {
            int finalI = i;
            es.submit(() -> {
                String date = m3(finalI);
                System.out.println(date);
            });
        }
        es.shutdown();
    }

// -----------------------------------------------------------------------------------------------

    private String m4(int seconds) {
        // 参数的单位是毫秒,从1970.1.1 00:00:00 GMT计时
        Date date = new Date(1_000L * seconds);
        SimpleDateFormat simpleDateFormat = TL_DataFormat.dateFormat_TL.get();
        return simpleDateFormat.format(date);
    }

    /**
     * 4. ThreadLocal<SimpleDateFormat>,每个thread都有一个对象副本
     * 副本只能被当前线程使用,是当前线程独享的成员变量
     */
    @Test
    public void T4_ThreadLocal() {
        for (int i = 0; i < 1_000; i++) {
            int finalI = i;
            es.submit(() -> {
                String date = m4(finalI);
                System.out.println(date);
            });
        }
        es.shutdown();
    }

}

class TL_DataFormat {
    public static ThreadLocal<SimpleDateFormat> dateFormat_TL = ThreadLocal.withInitial(
            () -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    );
}









 
 



















 



 





















 
 
 






















 
 





















 
 
 

2. Thread_phaser

同一个thread,不同阶段同一个变量

/**
 * 同一个thread,不同阶段同一个变量
 */
public class T4_TL_phaser {

    private static C0_Student init() {
        final C0_Student build = C0_Student.builder()
                .name("listao")
                .sex("male")
                .score(100).build();
        TL_Student.studentTL.set(build);
        return build;
    }

    @Test
    public void T1_tradition() {
        C0_Student student = init();
        new C1_Name().getName(student);
        new C2_Sex().getSex(student);
        new C3_Score().getScore(student);
    }

    @Test
    public void T2_ThreadLocal() {
        init();
        new C1_Name().getName();
        new C2_Sex().getSex();
        new C3_Score().getScore();
        TL_Student.studentTL.remove();
    }
}

class TL_Student {
    public static ThreadLocal<C0_Student> studentTL = new ThreadLocal<>();
}

@Builder
@Getter
class C0_Student {
    String name;
    String sex;
    int score;
}

// 获取name
class C1_Name {
    public void getName(C0_Student student) {
        System.out.println(student.name);
    }

    public void getName() {
        System.out.println(TL_Student.studentTL.get().name);
    }
}

// 获取sex
class C2_Sex {
    public void getSex(C0_Student student) {
        System.out.println(student.sex);
    }

    public void getSex() {
        System.out.println(TL_Student.studentTL.get().sex);
    }
}

// 获取score
class C3_Score {
    public void getScore(C0_Student student) {
        System.out.println(student.score);
    }

    public void getScore() {
        System.out.println(TL_Student.studentTL.get().score);
    }
}










 

















 




 

















 










 










 


3. SourceCode

为什么Entry要使用弱引用?

  1. tl = null
    • Entry中key为强引用:依然指向ThreadLocal对象,ThreadLocal始终不GC,内存泄漏
    • key为弱引用:ThreadLocal被回收,key = null的Entry会在get(), set()时进行回收
  2. Thread一直存在,并不在调用get(), set()key = null的Entry的value会内存泄漏。因此手动remove()协助回收

  1. Thread内部持有ThreadLocalMap成员变量
  2. ThreadLocalMap是ThreadLocal的内部类,Entry是ThreadLocalMap内部类
  3. ThreadLocal操作了ThreadLocalMap对象内部的数据,对外暴露的都是ThreadLocal的API,隐藏了ThreadLocalMap的具体实现
image-20230503145500627
image-20230503150436654
public class Thread implements Runnable {

    ThreadLocal.ThreadLocalMap threadLocals = null;

    @HotSpotIntrinsicCandidate
    public static native Thread currentThread();
}


 


 

public class ThreadLocal<T> {

    static class ThreadLocalMap {

        private Entry[] table;
        private static final int INITIAL_CAPACITY = 16;

        // Entry,key为弱引用,value为强引用
        static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }

        // ------------------------------------------------------------------------

        ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
            table = new Entry[INITIAL_CAPACITY];
            int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
            table[i] = new Entry(firstKey, firstValue);
            size = 1;
            setThreshold(INITIAL_CAPACITY);
        }

        private void set(ThreadLocal<?> key, Object value) {

            Entry[] tab = table;
            int len = tab.length;
          	// ThreadLocal对象作为key在Entry[]中的下标索引
            int i = key.threadLocalHashCode & (len-1);

          	// 3.2. 获取指定下标的Entry对象,不为空,则进入for
            for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { // 调用nextIndex()搜寻下一个合适的位置
                ThreadLocal<?> k = e.get();

              	// 3.2.1. 判断Entry的key和当前的ThreadLocal对象是否是同一个对象
                if (k == key) {
                  	// 是,进行值替换,并结束方法
                    e.value = value;
                    return;
                }

              	// 3.2.2. Entry的key是否失效,如果失效,则直接将失效的key和值进行替换
                if (k == null) {
                    replaceStaleEntry(key, value, i);
                    return;
                }
            }

          	// 3.3. Entry第一次
            tab[i] = new Entry(key, value);
            int sz = ++size;
          	// 判断是否满足扩容的条件,进行扩容
            if (!cleanSomeSlots(i, sz) && sz >= threshold)
                rehash();
        }

      	// 开放寻址法
        private static int nextIndex(int i, int len) {
            return ((i + 1 < len) ? i + 1 : 0);
        }

        private Entry getEntry(ThreadLocal<?> key) {
            int i = key.threadLocalHashCode & (table.length - 1);
            Entry e = table[i];
            if (e != null && e.get() == key)
                return e;
            else
                return getEntryAfterMiss(key, i, e);
        }

        private void remove(ThreadLocal<?> key) {
            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);
            for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
                if (e.get() == key) {
                    e.clear();
                    expungeStaleEntry(i);
                    return;
                }
            }
        }

    }

    // ------------------------------------------------------------------------

    public void set(T value) {
      	// 获取当前线程
        Thread t = Thread.currentThread();
      	// 1.1.
        ThreadLocalMap map = getMap(t);
        if (map != null) {
          	// 3.1.
          	// 将本ThreadLocal对象作为key,value作为值存储到ThreadLocalMap中
            map.set(this, value);
        } else {
          	// 2.1.
            createMap(t, value);
        }
    }

    ThreadLocalMap getMap(Thread t) {
        // 1.2.
        return t.threadLocals;
    }

    void createMap(Thread t, T firstValue) {
      	// 2.2.
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }

    public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
          	// 4.
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
      	// 5.1.
        return setInitialValue();
    }

    /*
     * 5.2. Map为空 || Entry[]中没有以当前ThreadLocal为key的Entry
     * 				进行初始化,key为ThreadLocal对象,value为null
     */
    private T setInitialValue() {
        T value = initialValue(); // null
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            map.set(this, value);
        } else {
            createMap(t, value);
        }
        if (this instanceof TerminatingThreadLocal) {
            TerminatingThreadLocal.register((TerminatingThreadLocal<?>) this);
        }
        return value;
    }

     public void remove() {
         ThreadLocalMap m = getMap(Thread.currentThread());
         if (m != null) {
           	 // 6.
             m.remove(this);
         }
     }

}