04-JUC

latch 英 [lætʃ] n. 插销;门闩;弹簧锁;碰锁 vi. 占有,抓住;闭锁

semaphore 英 [ˈseməfɔː(r)] n. 信号标,旗语;臂板信号装置

cyclic 英 [ˈsaɪklɪk] adj. 循环的;周期的

phaser 英 [ˈfeɪzə(r)] n. [电子] 移相器;相位器

1. ReentrantLock

  1. sync自动加解锁,ReentrantLock手动
  2. ReentrantLock可以condition,不同等待队列
  3. RL底层CAS,sync锁升级

  1. synchronized可重入性
    • t调用m1(),m1()调用m2(),t线程为this对象加了两把锁(同一把锁重入两次)
  2. ReentrantLock替代synchronized
    • sync遇到异常,jvm自动释放锁
    • ReentrantLock必须手动释放锁,因此经常在finally{}中进行锁的释放
  3. tryLock(),尝试锁定 || 指定时间内尝试锁定,线程可以决定是否继续等待
  4. lockInterruptibly()可以被interrupt()打断
  5. 公平锁(fairLock) => new ReentrantLock(true);
    • Thread-2、Thread-1交替获得锁
  6. Condition本质:锁资源上不同的等待队列
    • 3个API,必须都在lock()后。否则java.lang.IllegalMonitorStateException
    1. await():释放lock。thread进入WAITING
    2. signal():不释放lock。唤醒Condition上的一个等待thread(ReentrantLock的实现按FIFO唤醒等待最久的thread,并非随机),该thread重新获得lock后从await()位置继续执行
    3. signalAll():不释放lock。激活Condition全部thread,只有一个获得lock,从await()位置继续执行
@Slf4j
public class T01_ReentrantLock {

    /**
     * Prints the current thread's name, then prints the numbers 0 through 7, calling
     * {@code ThreadHelper.sleepSeconds(1)} before each one. Right after printing 2 it calls
     * {@link #m2()}, which is synchronized on the same instance, so the calling thread
     * re-enters a monitor it already holds.
     */
    synchronized void m1() {
        final String threadName = Thread.currentThread().getName();
        System.out.println(threadName + ": m1() ...");

        int step = 0;
        while (step < 8) {
            ThreadHelper.sleepSeconds(1);
            System.out.println(step);
            if (step == 2) {
                m2();
            }
            step++;
        }
    }

    /**
     * Prints the current thread's name followed by ": m2() ...". The method is synchronized
     * on this instance, just like m1().
     */
    synchronized void m2() {
        final String threadName = Thread.currentThread().getName();
        System.out.println(threadName + ": m2() ...");
    }

    /**
     * 1. Reentrancy of synchronized.
     * <p>
     * One thread runs m1(), and m1() calls m2(). Both methods are synchronized on the same
     * instance, so the thread takes that monitor a second time while already holding it.
     */
    @Test
    public void T1_sync_reentrant() {
        final T01_ReentrantLock target = new T01_ReentrantLock();

        final Thread worker = new Thread(() -> target.m1());
        worker.start();

        ThreadHelper.join(worker);
    }

// ------------------------------------------------------------------------------------------------

    // Per-instance lock used by m3(), m4(), m5() and T7_await_signal().
    Lock lock = new ReentrantLock();

    /**
     * Holds {@code lock} while printing "m3: 0" through "m3: 2", calling
     * {@code ThreadHelper.sleepSeconds(1)} before each line. The lock is always released in
     * {@code finally}.
     */
    void m3() {
        // Acquire BEFORE entering try: if lock() itself failed, a finally-block unlock() would
        // run on a lock this thread does not hold and throw IllegalMonitorStateException,
        // masking the original failure.
        lock.lock(); // plays the role of synchronized(this)
        try {
            for (int i = 0; i < 3; i++) {
                ThreadHelper.sleepSeconds(1);
                System.out.println("m3: " + i);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Acquires {@code lock}, prints "m4 ...", and releases the lock in {@code finally}.
     */
    void m4() {
        // Acquire outside try so that finally only unlocks a lock that was actually obtained.
        lock.lock();
        try {
            System.out.println("m4 ...");
        } finally {
            lock.unlock();
        }
    }

    /**
     * 2. ReentrantLock as a replacement for synchronized.
     * <ol>
     *   <li>With synchronized, the JVM releases the monitor automatically when an exception
     *       is thrown.</li>
     *   <li>A ReentrantLock must be released by hand, so unlock() normally lives in
     *       finally{}.</li>
     * </ol>
     * Starts m3() on one thread, calls ThreadHelper.sleepSeconds(1), starts m4() on a second
     * thread against the same instance, then joins both.
     */
    @Test
    public void T2_ReentrantLock() {
        final T01_ReentrantLock shared = new T01_ReentrantLock();

        final Thread holder = new Thread(() -> shared.m3());
        holder.start();
        ThreadHelper.sleepSeconds(1);

        final Thread waiter = new Thread(() -> shared.m4());
        waiter.start();

        ThreadHelper.join(holder, waiter);
    }

// ------------------------------------------------------------------------------------------------

    /**
     * Tries to acquire {@code lock} for up to one second and prints whether it succeeded.
     * <p>
     * The timed tryLock() may throw InterruptedException while waiting. In that case the
     * stack trace is printed and the thread's interrupt flag is restored. The lock is released
     * only if it was actually acquired.
     */
    void m5() {
        boolean locked = false;
        try {
            locked = lock.tryLock(1, TimeUnit.SECONDS);
            if (locked) {
                System.out.println("m5 ..." + locked + ", 业务...");
            } else {
                System.out.println("m5 ..." + locked);
            }
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again so code further up
            // the stack can still see that this thread was interrupted.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Unlock only when tryLock() returned true; otherwise unlock() would throw
            // IllegalMonitorStateException.
            if (locked) {
                lock.unlock();
            }
        }
    }

    /**
     * 3. tryLock(): attempt to lock, optionally within a time limit, so the thread can decide
     * for itself whether to keep waiting.
     * <p>
     * Starts m3() on one thread, calls ThreadHelper.sleepSeconds(1), starts m5() on a second
     * thread against the same instance, then joins both.
     */
    @Test
    public void T3_tryLock() {
        final T01_ReentrantLock shared = new T01_ReentrantLock();

        final Thread holder = new Thread(() -> shared.m3());
        holder.start();
        ThreadHelper.sleepSeconds(1);

        final Thread contender = new Thread(() -> shared.m5());
        contender.start();

        ThreadHelper.join(holder, contender);
    }

// ------------------------------------------------------------------------------------------------

    /**
     * 4. lockInterruptibly() can be interrupted by interrupt().
     * <p>
     * t1 takes the lock and sleeps; t2 waits for the same lock with lockInterruptibly() and is
     * interrupted by the test thread, which makes it give up waiting. Afterwards t1 is
     * interrupted too, so that its sleep ends and the test can terminate.
     */
    @Test
    public void T4_lockInterruptibly() {
        Lock lock = new ReentrantLock();

        Thread t1 = new Thread(() -> {
            // Acquire outside try so finally never unlocks a lock that was not obtained.
            lock.lock();
            try {
                System.out.println("t1, start");
                TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
                System.out.println("t1, end");
            } catch (InterruptedException e) {
                System.out.println("t1, interrupted!");
            } finally {
                // Release the lock even when interrupted.
                lock.unlock();
            }
        });

        Thread t2 = new Thread(() -> {
            try {
                // Unlike lock.lock(), this wait responds to interrupt().
                lock.lockInterruptibly();
            } catch (InterruptedException e) {
                // Interrupted while waiting: the lock was never acquired, nothing to release.
                System.out.println("t2, interrupted!");
                return;
            }
            try {
                System.out.println("t2, start");
                TimeUnit.SECONDS.sleep(5);
                System.out.println("t2,end");
            } catch (InterruptedException e) {
                System.out.println("t2, interrupted!");
            } finally {
                // Reached only when the lock is held; release it even if the sleep was
                // interrupted (previously the unlock was skipped on that path).
                lock.unlock();
            }
        });

        ThreadHelper.start(t1, t2);
        ThreadHelper.sleepSeconds(1);

        t2.interrupt(); // interrupt t2's wait
        ThreadHelper.join(t2);

        // t1 would otherwise sleep for Integer.MAX_VALUE seconds and the join below would
        // never return.
        t1.interrupt();
        ThreadHelper.join(t1);
    }

// ------------------------------------------------------------------------------------------------

    // Lock contended in m6(); assigned by T5_fireLock() (fair) or T6_fireLock() (non-fair)
    // before m6() is called.
    ReentrantLock fireLock = null;

    /**
     * 5. Fair lock: {@code new ReentrantLock(true)}.
     * <p>
     * Two threads each acquire {@code fireLock} 100 times and log one line per acquisition.
     * With a fair lock the two threads (e.g. Thread-2 and Thread-1) are expected to take
     * turns. The method returns after both threads have been joined.
     */
    public void m6() {
        final Runnable contender = () -> {
            int remaining = 100;
            while (remaining-- > 0) {
                fireLock.lock();
                try {
                    log.error("获得锁");
                } finally {
                    fireLock.unlock();
                }
            }
        };

        final Thread first = new Thread(contender);
        final Thread second = new Thread(contender);
        ThreadHelper.start(first, second);
        ThreadHelper.join(first, second);
    }

    /** Runs the m6() contention demo with a fair lock ({@code new ReentrantLock(true)}). */
    @Test
    public void T5_fireLock() {
        final boolean fair = true;
        this.fireLock = new ReentrantLock(fair);
        this.m6();
    }

    /** Runs the m6() contention demo with a non-fair lock ({@code new ReentrantLock(false)}). */
    @Test
    public void T6_fireLock() {
        final boolean fair = false;
        this.fireLock = new ReentrantLock(fair);
        this.m6();
    }

// ------------------------------------------------------------------------------------------------

    /**
     * 6. A Condition is, in essence, a separate wait queue on the same lock.
     * <p>
     * All three calls below must be made while holding the lock, otherwise they throw
     * java.lang.IllegalMonitorStateException:
     * <ol>
     *   <li>await(): releases the lock; the thread goes to WAITING.</li>
     *   <li>signal(): does not release the lock; wakes one thread waiting on the Condition
     *       (not a random one), which resumes from its await() once it re-acquires the
     *       lock.</li>
     *   <li>signalAll(): does not release the lock; wakes every thread waiting on the
     *       Condition; they re-acquire the lock one at a time and resume from await().</li>
     * </ol>
     * Two threads print digits and letters alternately by signalling each other's Condition.
     * The test joins both threads so the output is complete before it returns.
     * <p>
     * NOTE(review): await() is not wrapped in a predicate loop, so a spurious wakeup could
     * break the alternation; acceptable for a demo, not for production code.
     */
    @Test
    public void T7_await_signal() {
        char[] aI = "1234567".toCharArray();
        char[] aC = "ABCDEFG".toCharArray();

        Condition conditionT1 = lock.newCondition();
        Condition conditionT2 = lock.newCondition();

        Thread t1 = new Thread(() -> {
            // Acquire outside try so finally only unlocks a lock that is actually held.
            lock.lock();
            try {
                for (char c : aI) {
                    System.out.print(c);
                    conditionT2.signal();
                    conditionT1.await();
                }
                // Wake the peer one last time so it is not left waiting forever.
                conditionT2.signal();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        });

        Thread t2 = new Thread(() -> {
            lock.lock();
            try {
                for (char c : aC) {
                    System.out.print(c);
                    conditionT1.signal();
                    conditionT2.await();
                }
                // Wake the peer one last time so it is not left waiting forever.
                conditionT1.signal();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        });

        ThreadHelper.start(t1, t2);
        // Without the join the test method could return while the workers are still printing.
        ThreadHelper.join(t1, t2);
    }

}



 




 



 






















 





 





 


 





























 
 







 






























 







 






 



 








 















 



 











 





 


















 
 






 
 














 
 













2. ReadWriteLock

读共享、写互斥

/**
 * Runs 10 reader threads and 2 writer threads, once against a plain exclusive lock and once
 * against a read/write lock.
 */
@Slf4j
public class T02_ReadWriteLock {

    /**
     * Simulated read: holds the given lock while calling ThreadHelper.sleepMilli with a value
     * from RandomUtil.randomInt(500, 1000), then logs "read over! ".
     *
     * @param lock lock to hold for the duration of the read
     */
    public static void read(Lock lock) {
        // Acquire before try so finally never unlocks a lock that was not obtained.
        lock.lock();
        try {
            ThreadHelper.sleepMilli(RandomUtil.randomInt(500, 1000));
            log.error("read over! ");
        } finally {
            lock.unlock();
        }
    }

    /**
     * Simulated write: holds the given lock while calling ThreadHelper.sleepMilli with a value
     * from RandomUtil.randomInt(200, 1000), then logs "write over! ".
     *
     * @param lock lock to hold for the duration of the write
     * @param v    value to "write"; currently unused by the simulation
     */
    public static void write(Lock lock, int v) {
        // Acquire before try so finally never unlocks a lock that was not obtained.
        lock.lock();
        try {
            ThreadHelper.sleepMilli(RandomUtil.randomInt(200, 1000));
            log.error("write over! ");
        } finally {
            lock.unlock();
        }
    }

    static Lock lock = new ReentrantLock();
    List<Thread> ts = new ArrayList<>(12);

    /**
     * 1. Plain lock: reads and writes all exclude each other.
     */
    @Test
    public void T1_local() {
        for (int i = 0; i < 10; i++) {
            ts.add(new Thread(() -> read(lock), "r" + i));
        }

        for (int i = 0; i < 2; i++) {
            ts.add(new Thread(() -> write(lock, new Random().nextInt()), "w" + i));
        }

        Collections.shuffle(ts);
        ThreadHelper.start(ts);
        ThreadHelper.join(ts);
    }

    static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    static Lock readLock = readWriteLock.readLock();
    static Lock writeLock = readWriteLock.writeLock();

    /**
     * 2. Read/write lock.
     * <ul>
     *   <li>read lock: shared</li>
     *   <li>write lock: exclusive</li>
     * </ul>
     */
    @Test
    public void T2_read_write() {
        // Reads share the lock, writes are mutually exclusive.
        for (int i = 0; i < 10; i++) {
            ts.add(new Thread(() -> read(readLock), "r" + i));
        }

        for (int i = 0; i < 2; i++) {
            ts.add(new Thread(() -> write(writeLock, new Random().nextInt()), "w" + i));
        }

        Collections.shuffle(ts);
        ThreadHelper.start(ts);
        ThreadHelper.join(ts);
    }

}
































 



 














 
 
 








 












3. CountDownLatch

/**
 * Waits for 100 worker threads in two ways: with a CountDownLatch and with Thread.join().
 * Each worker increments a shared counter once, so both tests should print 100.
 */
public class T03_CountDownLatch {
    static Thread[] threads = new Thread[100];
    static AtomicInteger atomicInt = new AtomicInteger(0);

    /**
     * Fills {@code threads} with new, not-yet-started threads that all run the given task.
     *
     * @param runnable task executed by every thread
     */
    public void threads(Runnable runnable) {
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(runnable);
        }
    }

    /**
     * Main thread blocks on latch.await() until every worker has called countDown().
     *
     * @throws InterruptedException if the test thread is interrupted while awaiting the latch
     */
    @Test
    public void T1_CountDownLatch() throws InterruptedException {
        // The counter is static and shared by both tests; reset it so the printed total does
        // not include increments from a test that ran earlier in the same JVM.
        atomicInt.set(0);

        CountDownLatch latch = new CountDownLatch(threads.length);

        Runnable runnable = () -> {
            try {
                atomicInt.incrementAndGet();
            } finally {
                // Always count down, otherwise a failing worker would block await() forever.
                latch.countDown();
            }
        };

        threads(runnable);
        ThreadHelper.start(threads);

        latch.await();

        System.out.println("end latch: " + atomicInt.get());
    }

    /**
     * Main thread joins every worker one by one instead of using a latch.
     *
     * @throws InterruptedException if the test thread is interrupted while joining
     */
    @Test
    public void T2_join() throws InterruptedException {
        // See T1_CountDownLatch: the static counter must start from zero for each test.
        atomicInt.set(0);

        Runnable runnable = () -> {
            atomicInt.incrementAndGet();
        };

        threads(runnable);
        ThreadHelper.start(threads);

        for (Thread thread : threads) {
            thread.join();
        }

        System.out.println("end join: " + atomicInt);
    }

}












 



 





 














 






4. Semaphore

限流(公平、非公平)

/**
 * Throttling with a Semaphore: at most n threads (n = number of permits) run the guarded
 * section at the same time.
 */
public class T04_Semaphore {

    /**
     * Starts two threads that compete for a single-permit, fair semaphore. Each thread prints
     * its "running..." line twice with a 200 ms sleep in between, so the two lines of one
     * thread are never interleaved with the other thread's lines.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Alternative: new Semaphore(2) gives two permits (non-fair), letting both threads run at once.
        Semaphore s = new Semaphore(1, true); // second argument: fair or not

        new Thread(() -> runGuarded(s, "T1")).start();
        new Thread(() -> runGuarded(s, "T2")).start();
    }

    /** Acquires one permit, prints the two progress lines for {@code name}, releases the permit. */
    private static void runGuarded(Semaphore s, String name) {
        try {
            s.acquire();
        } catch (InterruptedException e) {
            // No permit was obtained, so none may be released: releasing here would silently
            // raise the number of available permits.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return;
        }
        try {
            System.out.println(name + " running...");
            Thread.sleep(200);
            System.out.println(name + " running...");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            s.release();
        }
    }

}







 



 






 





 






 





5. CyclicBarrier

  1. 人满,发车
  2. 限流框架:Guava RateLimiter
/**
 * CyclicBarrier ("bus is full, depart").
 * Use case: a complex operation is split across several parallel threads, and the follow-up
 * step runs only after all of them have arrived.
 */
public class T05_CyclicBarrier {

    /**
     * Starts 100 threads against a barrier of 20 parties, so the barrier action runs once per
     * group of 20 arrivals. The test joins every thread so that all barrier trips have
     * happened before the method returns.
     */
    @Test
    public void T1() {
        // Alternative: new CyclicBarrier(20) creates the barrier without a barrier action.
        CyclicBarrier barrier = new CyclicBarrier(20, () -> System.out.println("人满,发车"));

        Thread[] threads = new Thread[100];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                try {
                    barrier.await();
                    // business logic goes here
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
            threads[i].start();
        }

        // Without joining, the test method could return before the workers reach the barrier.
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

}









 




 









6. Phaser

人满,下一步

/**
 * Phaser ("everyone is here, move on to the next step"), modelled as a wedding with five
 * guests plus groom and bride going through several phases together.
 */
public class T06_Phaser {
    static Random r = new Random();
    static MarriagePhaser phaser = new MarriagePhaser();

    /**
     * Registers 7 parties and starts one thread per person: guests p0..p4 first, then the
     * groom, then the bride.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        phaser.bulkRegister(7);

        final String[] names = new String[7];
        for (int guest = 0; guest < 5; guest++) {
            names[guest] = "p" + guest;
        }
        names[5] = "新郎";
        names[6] = "新娘";

        for (String personName : names) {
            new Thread(new Person(personName)).start();
        }
    }

    /**
     * Phaser subclass that overrides onAdvance to announce the end of each phase.
     */
    static class MarriagePhaser extends Phaser {
        /**
         * Called when all registered parties have arrived at the current phase.
         *
         * @param phase             number of the phase that just completed
         * @param registeredParties number of parties registered in that phase
         * @return {@code false} after phases 0 to 2; {@code true} after phase 3 and for any
         *         other phase number
         */
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            if (phase == 0) {
                System.out.println("所有人到齐了!线程数: " + registeredParties + "\n");
                return false;
            }
            if (phase == 1) {
                System.out.println("所有人吃完了!线程数: " + registeredParties + "\n");
                return false;
            }
            if (phase == 2) {
                System.out.println("所有人离开了!线程数: " + registeredParties + "\n");
                return false;
            }
            if (phase == 3) {
                System.out.println("婚礼结束!新郎新娘抱抱!线程数: " + registeredParties);
            }
            return true;
        }
    }

    /**
     * One wedding participant. Uses two Phaser calls:
     * <ol>
     *   <li>arriveAndAwaitAdvance(): arrive and wait until everyone advances together</li>
     *   <li>arriveAndDeregister(): arrive and cancel the registration</li>
     * </ol>
     */
    @AllArgsConstructor
    static class Person implements Runnable {
        String name;

        /** Sleeps up to a random bound below 1000 (ThreadHelper.sleepMilli), reports arrival, waits for the phase. */
        public void arrive() {
            completeStage("%s 到达现场!\n");
        }

        /** Same pattern as arrive(), for the meal. */
        public void eat() {
            completeStage("%s 吃完!\n");
        }

        /** Same pattern as arrive(), for leaving. */
        public void leave() {
            completeStage("%s 离开!\n");
        }

        /**
         * Only groom and bride take part in the last phase; every other person arrives and
         * deregisters immediately.
         */
        private void hug() {
            final boolean isCouple = "新郎".equals(name) || "新娘".equals(name);
            if (!isCouple) {
                phaser.arriveAndDeregister();
                // a party could be added again with phaser.register()
                return;
            }
            completeStage("%s 洞房!\n");
        }

        /** Shared body of every stage: sleep, print the formatted line, arrive and wait. */
        private void completeStage(String format) {
            ThreadHelper.sleepMilli(r.nextInt(1000));
            System.out.printf(format, name);
            phaser.arriveAndAwaitAdvance();
        }

        /** Runs the four stages in order: arrive, eat, leave, hug. */
        @Override
        public void run() {
            arrive();
            eat();
            leave();
            hug();
        }
    }

}





 


 











 



 
 
 

































 





 





 






 

 














7. Exchanger

两thread数据交换

/**
 * Exchanging data between two threads (blocking) with an Exchanger.
 */
@Slf4j
public class T07_Exchanger {

    static Exchanger<String> exchanger = new Exchanger<>();

    /**
     * Starts threads "t1" and "t2"; each offers its own string to the exchanger and logs the
     * value it ends up with.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 1. The first thread to arrive puts its data into the Exchanger and blocks until a
        //    second thread offers data to swap.
        new Thread(() -> swapAndLog("T1"), "t1").start();
        // 2. The second thread puts its data in, the swap happens, and both threads continue.
        new Thread(() -> swapAndLog("T2"), "t2").start();
    }

    /**
     * Offers {@code own} to the exchanger and logs the value received in return. If the thread
     * is interrupted while waiting, the interrupt flag is restored and {@code own} is logged.
     */
    private static void swapAndLog(String own) {
        String s = own;
        try {
            s = exchanger.exchange(s);
        } catch (InterruptedException e) {
            // Catching the exception clears the flag; set it again for code further up.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        log.error(s);
    }

}






 






 










 








8. LockSupport

  1. park():阻塞当前thread(只能阻塞调用它的thread自己)
  2. unpark(t):唤醒指定的thread t(为t发放许可)
  3. Thread.start()后。unpark(t),可以在park()前执行
/**
 * LockSupport demo: blocking a thread with park() and releasing it with unpark(Thread).
 */
@Slf4j
public class T08_LockSupport {

    /**
     * Counter task shared by both tests: prints 0 through 9, calls LockSupport.park() right
     * after printing 5, and calls ThreadHelper.sleepSeconds(1) after every number.
     */
    private static void countAndParkAtFive() {
        int current = 0;
        while (current < 10) {
            System.out.println(current);
            if (current == 5) {
                LockSupport.park();
            }

            ThreadHelper.sleepSeconds(1);
            current++;
        }
    }

    /**
     * 1. park(): blocks the thread that calls it.
     * 2. unpark(t): lets thread t continue.
     * <p>
     * The test thread calls ThreadHelper.sleepSeconds(8) before calling unpark on the worker.
     */
    @Test
    public void T1_park() {
        final Thread worker = new Thread(T08_LockSupport::countAndParkAtFive);
        worker.start();

        ThreadHelper.sleepSeconds(8);
        System.out.println("after 8 seconds!");

        LockSupport.unpark(worker);

        ThreadHelper.join(worker);
    }

    /**
     * 3. Once Thread.start() has been called, unpark(t) may run before the thread's park().
     * <p>
     * Here unpark is issued immediately after start(), i.e. before the worker has printed 5.
     */
    @Test
    public void T2_unpark_advance() {
        final Thread worker = new Thread(T08_LockSupport::countAndParkAtFive);
        worker.start();
        LockSupport.unpark(worker);

        ThreadHelper.join(worker);
    }

}