0%

java多线程--AQS源码解析

框架

FIFO图解

1612860334591

NonfairSync(同步对象的非公平锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Sync object for non-fair locks
* 同步对象的非公平锁
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
* 执行锁定。尝试立即进行驳船,并在出现故障时备份到正常状态.
*/
final void lock() {
// 自旋一次
if (compareAndSetState(0, 1))
// 自旋成功,设置当前线程占有锁
setExclusiveOwnerThread(Thread.currentThread());
else
// 自旋未获取锁
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

acquire(1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* 在独占模式下获取,忽略中断。
* 通过至少调用一次{@link tryAcquire}并返回成功来实现。
* 否则,线程将排队,并可能反复阻塞和解除阻塞,并调用{@link tryAcquire}直到成功。
* 此方法可用于实现方法{@link Locklock}
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @param arg获取参数。
* 此值会传送到{@link tryAcquire},但否则不会被解释,可以代表您喜欢的任何内容。
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
  1. tryAcquire() 尝试直接去获取资源,如果成功则直接返回(这里体现了非公平锁,每个线程获取锁时会尝试直接抢占加塞一次,而 CLH 队列中可能还有别的线程在等待);
  2. addWaiter() 将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquireQueued() 使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断 selfInterrupt() ,将中断补上。

tryAcquire(arg)

1
2
3
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

addWaiter(Node.EXCLUSIVE)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* Creates and enqueues node for current thread and given mode.
* 为当前线程和给定模式创建并排队节点.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @参数模式 Node.EXCLUSIVE 表示独占,Node.SHARED 表示共享
*
* @return the new node
*/
private Node addWaiter(Node mode) {
// 根据当前线程及节点模式 创建节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 尝试快速方式直接放到队尾。上一步失败则通过enq入队。

// 获取尾节点引用
Node pred = tail;
// 尾节点不为空,说明已经初始化过了
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 通过enq入队。
enq(node);
return node;
}

enq(node)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* Inserts node into queue, initializing if necessary. See picture above.
* 将节点插入队列,必要时进行初始化
*
* @param node the node to insert
* @param 要插入的节点
*
* @return node's predecessor
* @return节点的前任节点的节点
*/
private Node enq(final Node node) {
// cas 自旋,知道成功的加入队尾
for (;;) {
Node t = tail;
if (t == null) { // Must initialize 必须初始化
// 如果tial为空,则新建一个节点,并将tail指向head
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 放入队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
* 以排他的不间断模式获取已在队列中的线程。
* 用于条件等待方法以及获取。
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
// 标记是否成功获取锁
boolean failed = true;
try {
// 标记线程等待过程中是否被中断过
boolean interrupted = false;
for (;;) {
// 拿到前驱节点
final Node p = node.predecessor();
// 如果前驱节点是head,即该节点为老二,那么便有资格去尝试获取锁
if (p == head && tryAcquire(arg)) {
// 获取锁成功,当前节点设置成 head
setHead(node);
// 原 head 节点出队,出队后不存在任何引用,会被GC
p.next = null; // help GC
// 获取锁成功
failed = false;
// 返回是否被中断过
return interrupted;
}
// 判断获取锁失败后是否可以挂起,如果可以挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果线程被中断过,就将 interrupted 标记为 true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

setHead(node)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

/**
* Sets head of queue to be node, thus dequeuing. Called only by
* acquire methods. Also nulls out unused fields for sake of GC
* and to suppress unnecessary signals and traversals.
*
* 将队列头设置为节点,从而出队。
* 仅通过 acquire 方法调用。
* 出于 GC 和抑制不必要的信号和遍历的目的,还应清空未使用的字段。
*
* @param node the node
*/
private void setHead(Node node) {
head = node; // head指向当前节点
node.thread = null; // 当前节点的 thread 设置为空
node.prev = null; // 当前节点的 头节点设置为空
}

shouldParkAfterFailedAcquire(p, node)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* 检查并更新无法获取的节点的状态。
* 如果线程应阻塞,则返回true。
* 这是所有采集循环中的主要信号控制。
* 要求 pred == node.prev。
*
* @param pred node's predecessor holding status
* @param pred 节点的前任保持状态
* @param node the node
* @return {@code true} if thread should block
* @return {@code true} 如果线程应该阻塞
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 拿到前驱节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// //如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
* 该节点已经设置了状态,要求释放以发出信号,以便可以安全地停放。
*/
return true;
if (ws > 0) {
/*
* 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
* 注意:那些放弃的结点,由于被自己“加塞”到它们前边,
* 它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!
*/
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
* 前任已取消。跳过前任并指示重试。
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
* waitStatus 必须为 0 或 PROPAGATE 。表示我们需要一个信号,但不要停车。
* 呼叫者将需要重试以确保在停车之前无法获取。
*/
// 如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。
// 有可能失败,人家说不定刚刚释放完呢!
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心去休息,需要去找个安心的休息点,同时可以再尝试下看有没有机会轮到自己拿号。

parkAndCheckInterrupt()

1
2
3
4
5
6
7
8
9
10
11
12
13

/**
* Convenience method to park and then check if interrupted
* 停车的便捷方法,然后检查是否中断
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
// 调用 park() 使线程进入 waiting 状态
LockSupport.park(this);
return Thread.interrupted();
}

nonfairTryAcquire(acquires)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*
* 执行不公平的 tryLock。
* tryAcquire 是在子类中实现的,但是都需要对 trylock 方法进行不公平的尝试。
*/
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取 state 变量的值
int c = getState();
if (c == 0) { // 没有线程占用锁
if (compareAndSetState(0, acquires)) {
// 自旋一次,(compareAndSetState原子操作,替换)
// 占用锁成功,设置独占线程为当前线程
setExclusiveOwnerThread(current);
// 获取锁成功
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 当前线程已经占用该锁
// 重入次数 + acquires (acquires=1)
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 更新重入次数
setState(nextc);
// 获取锁成功
return true;
}
// 获取锁失败
return false;
}

小结:

  1. 调用自定义同步器的 tryAcquire() 尝试直接去获取资源,如果成功则直接返回;
  2. 没成功,则 addWaiter() 将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquireQueued() 使线程在等待队列中休息,有机会时(轮到自己,会被 unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回 true ,否则返回 false 。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断 selfInterrupt() ,将中断补上。

1612923550174

sync.release(1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* 以独占模式发布。
* 如果{@link tryRelease}返回true,则通过解锁一个或多个线程来实现。
* 此方法可用于实现方法{@link Lockunlock}。
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @param arg释放参数。
* 该值会传送到{@link tryRelease},但否则不会被解释,可以代表您喜欢的任何内容。
*
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 找到头结点
Node h = head;
if (h != null && h.waitStatus != 0)
// //唤醒等待队列里的下一个线程
unparkSuccessor(h);
return true;
}
return false;
}

tryRelease(arg)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/** 该方法不比考虑线程安全,
* 因为如果有其他线程修改 state 值后, state对该方法是可见的,
* 最多不唤醒下一个线程,而是由新的线程获取state的状态
*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// c = 0 时,代表没有锁(可重入锁计数为0)
free = true;
setExclusiveOwnerThread(null);
}
// 设置 state 的状态
setState(c);
return free;
}

unparkSuccessor(h)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*
* 如果状态是否定的(即可能需要信号),请尝试清除以预期发出信号。
* 如果失败或等待线程更改状态,则可以。
*/
// node一般为当前线程所在的结点。
int ws = node.waitStatus;
if (ws < 0)
// 置零当前线程所在的结点状态,允许失败。
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*
* 释放线程保留在后续线程中,该线程通常只是下一个节点。
* 但是,如果已取消或明显为空,请从尾部向后移动以找到实际的未取消后继。
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

FairSync

tryAcquire(arg)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

hasQueuedPredecessors()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* Queries whether any threads have been waiting to acquire longer
* than the current thread.
* 查询是否有任何线程在等待获取比当前线程更长的时间。
*
* <p>An invocation of this method is equivalent to (but may be
* more efficient than):
* <pre> {@code
* getFirstQueuedThread() != Thread.currentThread() &&
* hasQueuedThreads()}</pre>
* 对此方法的调用等效于(但可能比它更有效):
* <pre> {@code getFirstQueuedThread()!= Thread.currentThread()&& hasQueuedThreads()} <pre>
*
* <p>Note that because cancellations due to interrupts and
* timeouts may occur at any time, a {@code true} return does not
* guarantee that some other thread will acquire before the current
* thread. Likewise, it is possible for another thread to win a
* race to enqueue after this method has returned {@code false},
* due to the queue being empty.
* 请注意,由于中断和超时引起的取消可能随时发生,
* 因此{@code true}返回值不能保证某些其他线程将在当前线程之前获取。
* 同样,由于队列为空,因此此方法返回{@code false}后,另一个线程也有可能赢得竞争。
*
* <p>This method is designed to be used by a fair synchronizer to
* avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>.
* Such a synchronizer's {@link #tryAcquire} method should return
* {@code false}, and its {@link #tryAcquireShared} method should
* return a negative value, if this method returns {@code true}
* (unless this is a reentrant acquire). For example, the {@code
* tryAcquire} method for a fair, reentrant, exclusive mode
* synchronizer might look like this:
* 方法设计为由公平同步器使用,以避免<a href="AbstractQueuedSynchronizerbarging">插入<a>。
* 此类同步器的{@link tryAcquire}方法应返回{@code false},并且如果此方法返回{@code true}(除非这是可重入获取),则其{@link tryAcquireShared}方法应返回负值。
* 例如,用于公平,可重入,互斥模式同步器的{@code tryAcquire}方法可能如下所示:
*
* <pre> {@code
* protected boolean tryAcquire(int arg) {
* if (isHeldExclusively()) {
* // A reentrant acquire; increment hold count
* return true;
* } else if (hasQueuedPredecessors()) {
* return false;
* } else {
* // try to acquire normally
* }
* }}</pre>
*
* @return {@code true} if there is a queued thread preceding the
* current thread, and {@code false} if the current thread
* is at the head of the queue or the queue is empty
* @since 1.7
*/
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// 正确性取决于 head 被初始化
// before tail and on head.next being accurate if the current
// thread is first in queue.
// 如果当前线程是第一个入队,则在tail和head.next之前是准确的。
Node t = tail; // Read fields in reverse initialization order
// 以相反的初始化顺序读取字段
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

参考:

https://www.cnblogs.com/waterystone/p/4920797.html


----------- 本文结束啦感谢您阅读 -----------