05-AQS
1. 读源码方法
1. 原则
- 跑不起来不读
- 解决问题就好 —— 目的性(6手的源码)
- 一条线索到底(debug)
- 无关细节略过(读源码先读骨架)
- 一般不读静态
- 一般动态读法
2. 读源码很难
理解别人的思路!
- 数据结构基础
- 设计模式
2. Idea_Structure
1. Class_StructureJMHTest
2. 类的继承关系
ctl + h
3. 类UML
cmd + opt + u
4. 类声明
ctl + shift + q
:弹出一个提示,显示当前类的声明 / 上下文信息
5. Idea_plugins
- 安装idea_plugins:《PlantUML integration》
3. ReentrantLock源码
java.util.concurrent.locks.ReentrantLock$NonfairSync@3dd3bcd[State = 0, empty queue]
$NonfairSync
:$代表内部类
- AQS = CLH,三个人的名字的首字母
- 底层为CAS + volatile
- volatile:state被volatile修饰
- CAS:node通过CAS加入queue,而不是对queue加锁
AbstractQueuedSynchronizer
是JUC的核心
- state:子类不同,实现意义不同
- ReentrantLock用来记录线程重入次数
- CountDownLatch记录等待线程数
- AQS => Node => volatile Thread
1. UML
2. 泳道图
3. 公平理解
- 公平:new_thread直接排到等待队列的最后
- 非公平:new_thread上来就抢锁,抢不到进行排队
juc1.8 => T1_ReentrantLock_SC
public class T1_ReentrantLock_SC {
private static volatile int i = 0;
/**
* ReentrantLock, ReentrantReadWriteLock, CountDownLatch, Semaphore底层都为AQS
*/
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
new Thread(() -> {
lock.lock();
LockSupport.park();
}).start();
ThreadHelper.sleepSeconds(1);
lock.lock();
i++;
lock.unlock();
}
}
4. ReentrantLock
public class ReentrantLock implements Lock {
private final Sync sync;
public ReentrantLock() {
sync = new NonfairSync();
}
public void lock() {
// 1... NonfairSync
sync.lock();
}
public void unlock() {
// 21... AQS
sync.release(1);
}
// -----------------------------------------------------------------------
static final class NonfairSync extends Sync {
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 2... AQS
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
// 4... Sync
return nonfairTryAcquire(acquires);
}
}
// ----------------------------------------------------------------------
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 5... AQS
int c = getState();
if (c == 0) {
// 6... AQS。 CAS修改state,将当前thread设置为排他线程
if (compareAndSetState(0, acquires)) {
// 7... AbstractOwnableSynchronizer
setExclusiveOwnerThread(current);
return true;
}
}
// 7. 当前线程已经占有该锁,重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
}
5. AbsOwnableSynchronizer
public abstract class AbstractOwnableSynchronizer {
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
}
6. AQS
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer {
private transient volatile Node head; // volatile
private transient volatile Node tail;
private volatile int state; // 锁状态。解锁为0,加锁成功为1,重入+1
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
Node nextWaiter;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
}
public final void acquire(int arg) {
// 3... NonfairSync 尝试请求锁
if (!tryAcquire(arg) &&
// 9.1.. 获取锁失败,addWaiter()
// 10.1.. 获取队列,acquireQueued()
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 11..
selfInterrupt();
}
// 增加node节点,到队尾
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 9.2. CAS增加tail
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 9.3.. 队列没有初始化,加入队列
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
// 初始化队列
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
}
// head => new node(), tail => new node(Thread)
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 前node尝试获取lock
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 10.2.. 获取失败,是否park
if (shouldParkAfterFailedAcquire(p, node) &&
// 10.3..
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/* 1. 前node进入park。本node进入park
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/* 2. 前node被取消了,切换前node
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/* 3. 前node等待状态设置。Node.SIGNAL => pred.waitStatus
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
// ----------------------------------------------------------------------
public final boolean release(int arg) {
// 22.1... Sync
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 22.2..
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
// --------------------------------------------------------------------------------
protected final int getState() {
return state;
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
}
4. VarHandle
- 普通属性原子性操作
- 比反射快,直接操纵二进制码
- native底层C、C++
/**
* jdk1.8以上
*/
public class T02_VarHandle {
int x = 8;
// 指针
private static VarHandle handle;
static {
try {
handle = MethodHandles.lookup().findVarHandle(T02_VarHandle.class, "x", int.class);
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
}
/*
* 1. 普通属性原子性操作
* 2. 比反射快,直接操纵二进制码
* 3. native底层C、C++
*/
public static void main(String[] args) {
T02_VarHandle t = new T02_VarHandle();
// plain read / write
System.out.println("get() => " + handle.get(t));
handle.set(t, 9);
System.out.println("set() => " + t.x);
// CAS原子性操作
handle.compareAndSet(t, 9, 10);
System.out.println("compareAndSet() => " + t.x);
handle.getAndAdd(t, 10);
System.out.println("getAndAdd() => " + t.x);
}
}