05-AQS

1. 读源码方法

1. 原则

  1. 跑不起来不读
  2. 解决问题就好 —— 目的性(6手的源码)
  3. 一条线索到底(debug)
  4. 无关细节略过(读源码先读骨架)
  5. 一般不读静态
  6. 一般动态读法

2. 读源码很难

理解别人的思路!

  1. 数据结构基础
  2. 设计模式

2. Idea_Structure

1. Class_StructureJMHTest

image-20230505135553198

2. 类的继承关系

ctl + h

3. 类UML

cmd + opt + u

4. 类声明

ctl + shift + q:弹出一个提示,显示当前类的声明 / 上下文信息

5. Idea_plugins

  • 安装idea_plugins:《PlantUML integration》

3. ReentrantLock源码

java.util.concurrent.locks.ReentrantLock$NonfairSync@3dd3bcd[State = 0, empty queue]

$NonfairSync:$代表内部类


  1. Java之AQS代码原理解析
  2. java并发编程中最难的AQS框架源码解析
  3. JUC AQS ReentrantLock源码分析

  1. AQS的等待队列是CLH队列的变体;CLH是三位作者(Craig、Landin、Hagersten)名字的首字母
  2. 底层为CAS + volatile
    • volatile:state被volatile修饰
    • CAS:node通过CAS加入queue,而不是对queue加锁
  3. AbstractQueuedSynchronizer是JUC的核心
image-20230505141208617
  1. state:子类不同,实现意义不同
    • ReentrantLock用来记录线程重入次数
    • CountDownLatch用来记录剩余的计数(count):countDown()每次减1,减到0时唤醒等待的线程
  2. AQS => Node => volatile Thread

1. UML

image-20230505135816581

2. 泳道图

image-20231125110353753

3. 公平理解

  1. 公平:new_thread先检查等待队列中是否已有线程在排队,有则直接排到队尾;没有才尝试抢锁
  2. 非公平:new_thread上来就抢锁,抢不到进行排队

juc1.8 => T1_ReentrantLock_SC

/**
 * Debugging harness for stepping through the {@code ReentrantLock} source.
 *
 * <p>ReentrantLock, ReentrantReadWriteLock, CountDownLatch and Semaphore are
 * all built on top of AQS.
 *
 * <p>A background thread takes the lock and then parks without ever calling
 * {@code unlock()}, so the {@code lock()} call made by the main thread is
 * contended. Presumably the intent is to set a breakpoint on that call and
 * follow it into the wait queue.
 */
public class T1_ReentrantLock_SC {
    private static volatile int i = 0;

    public static void main(String[] args) {
        final ReentrantLock lock = new ReentrantLock();

        // Holder: grabs the lock, then parks while still owning it.
        Runnable holdAndPark = () -> {
            lock.lock();
            LockSupport.park();
        };
        Thread holder = new Thread(holdAndPark);
        holder.start();

        // ThreadHelper is not shown here; presumably this sleeps for one second
        // so that the holder thread gets the lock first.
        ThreadHelper.sleepSeconds(1);

        // Contended acquire on the main thread.
        lock.lock();
        i++;
        lock.unlock();
    }

}

4. ReentrantLock

/**
 * Abridged study copy of {@code java.util.concurrent.locks.ReentrantLock},
 * cut down to the non-fair {@code lock()} / {@code unlock()} path.
 *
 * <p>The numbered comments (1, 2, 4, 5, ... 21) are walkthrough markers; each
 * appears to name the class that the call on the following line lands in.
 *
 * <p>NOTE(review): the excerpt is not self-contained. {@code Sync} as shown
 * declares no {@code lock()} method even though {@link #lock()} calls
 * {@code sync.lock()}, and only two of {@code Lock}'s methods are present.
 */
public class ReentrantLock implements Lock {

    // Every lock operation is delegated to this AQS subclass.
    private final Sync sync;

    /** Creates a lock backed by {@link NonfairSync}. */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /** Acquires the lock by delegating to {@code sync.lock()}. */
    public void lock() {
        // 1... -> NonfairSync
        sync.lock();
    }

    /** Releases one hold on the lock by delegating to {@code sync.release(1)}. */
    public void unlock() {
        // 21... -> AQS
        sync.release(1);
    }

    // -----------------------------------------------------------------------

    /** Non-fair variant: tries to grab the lock immediately before queueing. */
    static final class NonfairSync extends Sync {

        /**
         * Fast path: CAS the state from 0 to 1 and, on success, record the
         * calling thread as owner. Otherwise fall back to {@code acquire(1)}.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // 2... -> AQS
                acquire(1);
        }

        /** Forwards to {@link Sync#nonfairTryAcquire(int)}. */
        protected final boolean tryAcquire(int acquires) {
            // 4... -> Sync
            return nonfairTryAcquire(acquires);
        }
    }

    // ----------------------------------------------------------------------

    /** Shared base for the lock's synchronizer; holds the try-acquire/try-release logic. */
    abstract static class Sync extends AbstractQueuedSynchronizer {

        /**
         * Single, non-blocking attempt to take the lock.
         *
         * @param acquires amount to add to the state
         * @return {@code true} if the state was 0 and the CAS succeeded, or if
         *         the caller already owns the lock (reentrant acquire);
         *         {@code false} otherwise
         * @throws Error if the reentrant count overflows {@code int}
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            // 5... -> AQS
            int c = getState();
            if (c == 0) {
                // 6... -> AQS. CAS the state; on success the current thread
                // becomes the exclusive owner.
                if (compareAndSetState(0, acquires)) {
                    // 7... -> AbstractOwnableSynchronizer
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 7. The current thread already holds the lock: reentrant acquire.
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // Plain setState (no CAS) is used on this owner-only branch.
                setState(nextc);
                return true;
            }
            return false;
        }

        /**
         * Subtracts {@code releases} from the state.
         *
         * @param releases amount to subtract from the state
         * @return {@code true} only when the resulting state is 0, in which
         *         case the owner is cleared before the state is written
         * @throws IllegalMonitorStateException if the caller is not the
         *         recorded owner thread
         */
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    }

}





 




 




 











 




 










 


 










 






 





 

 





5. AbstractOwnableSynchronizer

/**
 * Abridged study copy of
 * {@code java.util.concurrent.locks.AbstractOwnableSynchronizer}: it only
 * remembers which thread currently owns a synchronizer exclusively.
 *
 * <p>The field is neither {@code volatile} nor guarded by a lock in this class.
 */
public abstract class AbstractOwnableSynchronizer {

    /** Thread recorded as the exclusive owner; {@code null} when none is set. */
    private transient Thread exclusiveOwnerThread;

    /**
     * Records the thread that currently owns exclusive access.
     *
     * @param thread the owner thread, or {@code null} to clear ownership
     */
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    /**
     * Returns the thread last passed to {@link #setExclusiveOwnerThread}.
     * The ReentrantLock {@code Sync} excerpt calls this to detect reentrant
     * acquires and to validate the releasing thread.
     *
     * @return the recorded owner thread, or {@code null} if never set or cleared
     */
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }

}


 


 



6. AQS

/**
 * Abridged study copy of
 * {@code java.util.concurrent.locks.AbstractQueuedSynchronizer}, limited to
 * the exclusive acquire/release path.
 *
 * <p>The step markers (3, 9.x, 10.x, 11, 22.x) are walkthrough numbering.
 *
 * <p>NOTE(review): several members used below are not part of this excerpt:
 * {@code tryAcquire}, {@code tryRelease}, {@code setState}, {@code setHead},
 * {@code cancelAcquire}, {@code compareAndSetHead/Tail/WaitStatus},
 * {@code unsafe}/{@code stateOffset} and the no-arg {@code Node} constructor.
 */
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer {

    // Queue head (volatile); enq() installs a fresh `new Node()` here on first use.
    private transient volatile Node head; // volatile
    // Queue tail (volatile); new waiters are CASed in here.
    private transient volatile Node tail;
    // Lock state. As ReentrantLock uses it: 0 when unlocked, 1 after a
    // successful lock, +1 for every reentrant acquire.
    private volatile int state;

    /** Wait-queue node: links to its neighbours and carries the waiting thread. */
    static final class Node {
        // Mode markers; addWaiter passes one as `mode`, which the constructor
        // stores in nextWaiter.
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;
        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        // One of the constants above, or 0. The code below treats > 0 as cancelled.
        volatile int waitStatus;
        Node nextWaiter;

        volatile Node prev;
        volatile Node next;
        volatile Thread thread;

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        /**
         * Returns the previous node.
         *
         * @throws NullPointerException if {@code prev} is null
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
    }

    /**
     * Exclusive acquire: one {@code tryAcquire} attempt, then enqueue and wait.
     * If {@code acquireQueued} reports an interrupt, the interrupt is
     * re-asserted through {@code selfInterrupt()}.
     *
     * @param arg value handed through to {@code tryAcquire}
     */
    public final void acquire(int arg) {
        // 3... -> NonfairSync: try to take the lock
        if (!tryAcquire(arg) &&
            // 9.1.. acquire failed: addWaiter() queues a node for this thread
            // 10.1.. acquireQueued() then waits in the queue for the lock
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 11.. acquireQueued returned true (interrupted while waiting)
            selfInterrupt();
    }

    /**
     * Creates a node for the current thread and appends it at the queue tail.
     *
     * @param mode stored in the new node's {@code nextWaiter}
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // 9.2. CAS the tail from pred to node
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 9.3.. tail was null (queue not initialized) or the CAS above failed:
        // enq() loops until the node is linked in.
        enq(node);
        return node;
    }

    /**
     * Links {@code node} at the tail, retrying in a loop and creating the
     * initial head node first when the queue is empty.
     *
     * @return the node that was tail before the insert (node's predecessor)
     */
    private Node enq(final Node node) {
        for (;;) {
            // Initialize the queue if it has no tail yet.
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            }
            // After init: head -> new Node(); the caller's node is CASed in as tail.
          	else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    /**
     * Loops until the already-queued {@code node} acquires, parking between
     * attempts when {@code shouldParkAfterFailedAcquire} allows it.
     *
     * @return {@code true} if {@code parkAndCheckInterrupt} reported an
     *         interrupt at least once while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // Only a node whose predecessor is head attempts tryAcquire.
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 10.2.. acquire failed: decide whether to park
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // 10.3.. park; after waking, note whether we were interrupted
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // Reached with failed == true only when the loop exits abnormally.
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Decides whether the thread of {@code node} may park now.
     *
     * @return {@code true} only if {@code pred.waitStatus} is already SIGNAL;
     *         {@code false} after unlinking cancelled predecessors or after
     *         requesting SIGNAL, so that the caller retries first
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /* 1. The predecessor is already marked SIGNAL, so this node may park.
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /* 2. The predecessor was cancelled: pick an earlier predecessor.
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /* 3. Set the predecessor's waitStatus to Node.SIGNAL via CAS.
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    /**
     * Parks the current thread, then returns {@code Thread.interrupted()},
     * which also clears the thread's interrupt flag.
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

    /** Sets the interrupt flag of the current thread. */
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

    // ----------------------------------------------------------------------

    /**
     * Exclusive release.
     *
     * @param arg value handed through to {@code tryRelease}
     * @return the result of {@code tryRelease(arg)}
     */
    public final boolean release(int arg) {
        // 22.1... -> Sync.tryRelease
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // 22.2.. head exists with a non-zero waitStatus: wake its successor
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    /**
     * Unparks the thread of the first non-cancelled node after {@code node},
     * if there is one.
     */
    private void unparkSuccessor(Node node) {
        // Reset a negative status on node to 0; the CAS result is ignored.
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        // Normally the successor is node.next. If that is null or cancelled,
        // walk backwards from tail; the last match kept is the non-cancelled
        // node closest to `node`.
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

    // --------------------------------------------------------------------------------

    /** Returns the current value of the volatile {@code state} field. */
    protected final int getState() {
        return state;
    }

    /**
     * Atomically sets {@code state} to {@code update} if it currently equals
     * {@code expect}.
     *
     * @return {@code true} if the swap happened
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

}



 
 
 



















 
 
 







 









 


 
 





 




 





 








 





 














 
 
 




 

 





































 
 



 






 
 


 








 









 














4. VarHandle

  1. 普通属性原子性操作
  2. 比反射快,直接操纵二进制码
  3. native底层C、C++
/**
 * Demonstrates {@link VarHandle} access to an ordinary instance field.
 *
 * <p>Requires JDK 9 or later: {@code java.lang.invoke.VarHandle} was added in
 * Java 9 and is not available on JDK 8.
 */
public class T02_VarHandle {

    /** Ordinary (non-volatile) field that the handle reads and updates. */
    int x = 8;

    /** Typed reference to field {@code x}, resolved once during class initialization. */
    private static final VarHandle X_HANDLE;

    static {
        try {
            X_HANDLE = MethodHandles.lookup().findVarHandle(T02_VarHandle.class, "x", int.class);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            // Fail class initialization with the real cause instead of printing
            // it and leaving the handle null for main() to trip over.
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * Runs plain and atomic accesses against a fresh instance and prints the
     * field after each step (8, 9, 10, 20).
     *
     * <p>Notes carried over from the original comment: a VarHandle provides
     * atomic operations on an ordinary field; it is said to be faster than
     * reflection (not measured here).
     *
     * @param args unused
     */
    public static void main(String[] args) {
        T02_VarHandle t = new T02_VarHandle();

        // Plain read / write.
        System.out.println("get() => " + X_HANDLE.get(t));
        X_HANDLE.set(t, 9);
        System.out.println("set() => " + t.x);

        // Atomic compare-and-set: 9 -> 10.
        X_HANDLE.compareAndSet(t, 9, 10);
        System.out.println("compareAndSet() => " + t.x);

        // Atomic add of 10; the returned previous value is ignored.
        X_HANDLE.getAndAdd(t, 10);
        System.out.println("getAndAdd() => " + t.x);
    }

}