Java Mutual-Exclusion Locks (synchronized & Lock): Behavior Analysis and Source Code

The JVM source code contains the following comment:

[code lang=”java”]
// The base-class, PlatformEvent, is platform-specific while the ParkEvent is
// platform-independent. PlatformEvent provides park(), unpark(), etc., and
// is abstract — that is, a PlatformEvent should never be instantiated except
// as part of a ParkEvent.
// Equivalently we could have defined a platform-independent base-class that
// exported Allocate(), Release(), etc. The platform-specific class would extend
// that base-class, adding park(), unpark(), etc.
//
// A word of caution: The JVM uses 2 very similar constructs:
// 1. ParkEvent are used for Java-level "monitor" synchronization.
// 2. Parkers are used by JSR166-JUC park-unpark.
//
// We’ll want to eventually merge these redundant facilities and use ParkEvent.
[/code]

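The comment says that ParkEvent is used for the language-level keyword synchronized. Parker is used by the park/unpark mechanism of JSR 166, the origin of java.util.concurrent (JUC), which reaches it through LockSupport. The comment also says the two facilities are nearly redundant and that eventually only ParkEvent will be used. So what exactly is the difference between them? First, a rough look at the interfaces of the two classes (ParkEvent):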

[code lang=”java”]
class ParkEvent : public os::PlatformEvent {
private:
ParkEvent * FreeNext ;

// Current association
Thread * AssociatedWith ;
intptr_t RawThreadIdentity ; // LWPID etc
volatile int Incarnation ;
// ......
} ;

class PlatformEvent : public CHeapObj<mtInternal> {
// Use caution with reset() and fired() — they may require MEMBARs
void reset() { _Event = 0 ; }
int fired() { return _Event; }
void park () ;
void unpark () ;
int TryPark () ;
int park (jlong millis) ; // relative timed-wait only
// ......
} ;
[/code]

(Parker)

[code lang=”java”]
class Parker : public os::PlatformParker {
public:
// For simplicity of interface with Java, all forms of park (indefinite,
// relative, and absolute) are multiplexed into one call.
void park(bool isAbsolute, jlong time);
void unpark();

// Lifecycle operators
static Parker * Allocate (JavaThread * t) ;
static void Release (Parker * e) ;
private:
static Parker * volatile FreeList ;
static volatile int ListLock ;
// ......
} ;
[/code]
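At the Java level, the Parker side is reached through java.util.concurrent.locks.LockSupport. The sketch below is based on the documented LockSupport contract and illustrates two points:

- The permit is remembered. An unpark issued before park is kept as a single permit, so the later park returns at once.
- park may return spuriously. Callers therefore re-check a condition in a loop.

ParkUnparkDemo and its method names are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class illustrating LockSupport's permit semantics.
class ParkUnparkDemo {

    // unpark() before park(): the permit is already available,
    // so park() consumes it and returns immediately instead of blocking.
    static boolean unparkThenPark() {
        LockSupport.unpark(Thread.currentThread());
        LockSupport.park();
        return true;
    }

    // Hand-off between two threads. park() may return spuriously,
    // so the waiter re-checks its condition in a loop.
    static boolean handOff() {
        AtomicBoolean ready = new AtomicBoolean(false);
        AtomicBoolean woke = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            while (!ready.get()) {
                LockSupport.park();
            }
            woke.set(true);
        });
        waiter.start();
        try {
            Thread.sleep(50); // give the waiter a chance to park first (not needed for correctness)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        ready.set(true);
        LockSupport.unpark(waiter); // also works if the waiter has not parked yet
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return woke.get();
    }

    public static void main(String[] args) {
        System.out.println(unparkThenPark()); // true
        System.out.println(handOff());        // true
    }
}
```

Because the permit is sticky, the unpark in handOff() is not lost even if it happens before the waiter reaches park().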

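As we can see, both classes offer essentially the same interface, park and unpark, and this underpins concurrency control in Java. So how do they actually differ? Let's start by running two similar programs.

Blocked threads acquire the lock in opposite orders

First, the locking provided by synchronized. We use an arbitrary Object lock = new Object() as the object associated with the lock. The code below puts 10 threads into the blocked state, then releases the lock and observes the order in which the threads subsequently acquire it (runnable code):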

[code lang=”java”]
package com.psly.testLocks;

public class TestLockSynchronized {

private static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
// TODO Auto-generated method stub
int N = 10;
Thread[] threads = new Thread[N];
for(int i = 0; i < N; ++i){
threads[i] = new Thread(new Runnable(){
public void run() {
synchronized(lock){
System.out.println(Thread.currentThread().getName() + " get synch lock!");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

});
}
synchronized(lock){
for(int i = 0; i < N; ++i){
threads[i].start();
Thread.sleep(200);
}
}

for(int i = 0; i < N; ++i)
threads[i].join();
}
}
[/code]

Each thread is started 0.2 seconds after the previous one, so that threads created earlier also block earlier. The output is:

[code lang=”java”]
Thread-9 get synch lock!
Thread-8 get synch lock!
Thread-7 get synch lock!
Thread-6 get synch lock!
Thread-5 get synch lock!
Thread-4 get synch lock!
Thread-3 get synch lock!
Thread-2 get synch lock!
Thread-1 get synch lock!
Thread-0 get synch lock!

[/code]

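This is a bit odd: the threads that tried to acquire the lock first actually got it last! Setting that aside for now, let's redo the example with a JSR 166 Lock (runnable code):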

[code lang=”java”]
package com.psly.testLocks;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestLockSynchronized {

private static Lock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
// TODO Auto-generated method stub
int N = 10;
Thread[] threads = new Thread[N];
for(int i = 0; i < N; ++i){
threads[i] = new Thread(new Runnable(){
public void run() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " get JSR166 lock!");
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); // always release, even if an exception is thrown
}
}

});
}
lock.lock();
for(int i = 0; i < N; ++i){
threads[i].start();
Thread.sleep(200);
}
lock.unlock();

for(int i = 0; i < N; ++i)
threads[i].join();
}
}
[/code]

The output is:

[code lang=”java”]
Thread-0 get JSR166 lock!
Thread-1 get JSR166 lock!
Thread-2 get JSR166 lock!
Thread-3 get JSR166 lock!
Thread-4 get JSR166 lock!
Thread-5 get JSR166 lock!
Thread-6 get JSR166 lock!
Thread-7 get JSR166 lock!
Thread-8 get JSR166 lock!
Thread-9 get JSR166 lock!
[/code]

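This output matches our expectations better: the threads that tried first really did get the lock first. Why do the two implementations behave so differently? Let's look at their respective blocking-queue implementations. We start with the Java-level one, ReentrantLock, which is built on AbstractQueuedSynchronizer: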

[code lang=”java”]
public void lock() {
sync.lock();
}

final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
[/code]
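The FIFO behavior of addWaiter/enq can be made concrete with a minimal Java model. It assumes a single AtomicReference tail in place of AQS's own compareAndSetTail. FifoWaitQueue and its methods are hypothetical names. Only the ordering logic follows the excerpt: link prev, CAS the tail, then set the old tail's next.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Toy model of the addWaiter/enq tail append (not the real AQS code).
class FifoWaitQueue {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    // Like AQS, use a dummy head node (AQS creates it lazily in enq).
    private final Node head = new Node("dummy-head");
    private final AtomicReference<Node> tail = new AtomicReference<>(head);

    // Append at the tail: set prev, CAS the tail, then link the old tail's next.
    void enqueue(String name) {
        Node node = new Node(name);
        for (;;) {
            Node t = tail.get();
            node.prev = t;
            if (tail.compareAndSet(t, node)) {
                t.next = node;
                return;
            }
            // CAS lost to a concurrent enqueue: re-read the tail and retry.
        }
    }

    // Order in which waiters would be woken: head's successor first.
    // Assumes no enqueue is in flight (next is written after the CAS).
    List<String> wakeOrder() {
        List<String> order = new ArrayList<>();
        for (Node n = head.next; n != null; n = n.next) {
            order.add(n.name);
        }
        return order;
    }

    public static void main(String[] args) {
        FifoWaitQueue q = new FifoWaitQueue();
        for (int i = 0; i < 10; i++) {
            q.enqueue("Thread-" + i);
        }
        System.out.println(q.wakeOrder()); // Thread-0 first, Thread-9 last
    }
}
```

Because next is written only after the CAS succeeds, a concurrent reader may briefly see a null next. That is why unparkSuccessor in AQS falls back to scanning backwards from tail through prev.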

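The part to focus on is addWaiter(Node mode). The node built for the blocked thread is added through the tail field and becomes the next node of the old tail. This is a first-in-first-out doubly linked queue, so when the lock is released, blocked threads acquire it in the same order in which they entered the queue.

Next, let's look at how synchronized is implemented. This involves the JVM's native source code, so again only the code related to entering the blocking queue is shown: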

[code lang=”java”]
class ObjectMonitor;

class ObjectSynchronizer : AllStatic {
static void fast_enter (Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS);
static void slow_enter (Handle obj, BasicLock* lock, TRAPS);
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
if (UseBiasedLocking) {
if (!SafepointSynchronize::is_at_safepoint()) {
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
return;
}
} else {
assert(!attempt_rebias, "can not rebias toward VM thread");
BiasedLocking::revoke_at_safepoint(obj);
}
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}

slow_enter (obj, lock, THREAD) ;
}
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
markOop mark = obj->mark();
assert(!mark->has_bias_pattern(), "should not see bias pattern here");
…………….
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}
void ATTR ObjectMonitor::enter(TRAPS) {
// The following code is ordered to check the most common cases first
// and to reduce RTS->RTO cache line upgrades on SPARC and IA32 processors.
Thread * const Self = THREAD ;
void * cur ;
·········
for (;;) {
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition()
// or java_suspend_self()

EnterI (THREAD) ;

if (!ExitSuspendEquivalent(jt)) break ;

//
// We have acquired the contended monitor, but while we were
// waiting another thread suspended us. We don’t want to enter
// the monitor while suspended because that would surprise the
// thread that suspended us.
//
_recursions = 0 ;
······
}

void ATTR ObjectMonitor::EnterI (TRAPS) {
Thread * Self = THREAD ;
assert (Self->is_Java_thread(), "invariant") ;
assert (((JavaThread *) Self)->thread_state() == _thread_blocked , "invariant") ;

// Try the lock – TATAS
if (TryLock (Self) > 0) {
……
}

if (TrySpin (Self) > 0) {
……
}

ObjectWaiter node(Self) ;
Self->_ParkEvent->reset() ;
node._prev = (ObjectWaiter *) 0xBAD ;
node.TState = ObjectWaiter::TS_CXQ ;

// Push "Self" onto the front of the _cxq.
// Once on cxq/EntryList, Self stays on-queue until it acquires the lock.
// Note that spinning tends to reduce the rate at which threads
// enqueue and dequeue on EntryList|cxq.
ObjectWaiter * nxt ;
for (;;) {
node._next = nxt = _cxq ;
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;

// Interference – the CAS failed because _cxq changed. Just retry.
// As an optional optimization we retry the lock.
if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
}
[/code]

The key point here is ObjectWaiter node(Self) and the loop that pushes it onto _cxq:

[code lang=”java”]
ObjectWaiter node(Self) ;
……..
for (;;) {
node._next = nxt = _cxq ;
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;

// Interference – the CAS failed because _cxq changed. Just retry.
// As an optional optimization we retry the lock.
if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
}

[/code]
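The push loop above is a lock-free, Treiber-style stack push. The minimal Java model below shows why draining such a stack yields the most recent waiter first. It substitutes AtomicReference.compareAndSet for Atomic::cmpxchg_ptr, and CxqStack is a hypothetical class name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Toy model of the _cxq push in ObjectMonitor::EnterI: a Treiber-style lock-free stack.
class CxqStack {
    static final class Waiter {
        final String name;
        Waiter next;
        Waiter(String name) { this.name = name; }
    }

    private final AtomicReference<Waiter> cxq = new AtomicReference<>();

    // Mirrors: node._next = nxt = _cxq; if (cmpxchg(&node, &_cxq, nxt) == nxt) break;
    void push(String name) {
        Waiter node = new Waiter(name);
        for (;;) {
            Waiter nxt = cxq.get();
            node.next = nxt;
            if (cxq.compareAndSet(nxt, node)) {
                return;
            }
            // Interference: another thread pushed first, so retry
            // (HotSpot also retries TryLock at this point).
        }
    }

    // Detach the whole list at once (exit() does this with a cmpxchg loop)
    // and walk it from the front: the newest waiter comes out first.
    List<String> drain() {
        Waiter w = cxq.getAndSet(null);
        List<String> order = new ArrayList<>();
        for (; w != null; w = w.next) {
            order.add(w.name);
        }
        return order;
    }

    public static void main(String[] args) {
        CxqStack s = new CxqStack();
        for (int i = 0; i < 10; i++) {
            s.push("Thread-" + i);
        }
        System.out.println(s.drain()); // Thread-9 first, Thread-0 last
    }
}
```

Pushing Thread-0 through Thread-9 and then draining returns Thread-9 first and Thread-0 last. This is the same reversal seen in the synchronized output.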

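The loop updates _cxq with an atomic compare-and-swap instruction. Before the swap, it stores the old value of _cxq in the node's _next field. This builds a stack on _cxq, that is, a last-in-first-out list. The next time _cxq is taken, the first node obtained is therefore the one pushed last. This explains the run above: blocked threads acquired the lock in exactly the reverse of the order in which they queued.

Next, a somewhat more complex example. The scenario runs as follows:

1. We start 10 threads one after another. Each acquires the lock, prints its name, and then calls a wait-style method, which blocks it.
2. Once all 10 are waiting, the main thread acquires the lock and starts another 10 threads that do not wait. Their only job is to acquire the lock, so they enter the blocking queue as described above.
3. The main thread then sends 4 notify-style signals, sleeps 0.2 seconds, and releases the lock.

What interests us is the notified threads. Relative to the threads already in the blocking queue, which get the lock first, and in what order? We run the JSR 166 version first (runnable code):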

[code lang=”java”]
package com.psly.testLocks;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestLockSynchronized {

private static Lock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
int N = 10;
Thread[] threads = new Thread[N];
Thread[] threadsForWaits = new Thread[N];
for(int i = 0; i < N; ++i){
threads[i] = new Thread(new Runnable(){
@Override
public void run() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " nowait get lock");
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

});
}
for(int i = 0; i < N; ++i){
threadsForWaits[i] = new Thread(new Runnable(){
@Override
public void run() {
lock.lock(); // plays the role of synchronized (lock)
try {
System.out.println(Thread.currentThread().getName() + " wait first get lock");
condition.await(); // releases the lock while waiting and re-acquires it before returning
System.out.println(Thread.currentThread().getName() + " wait second get lock");
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

});
}

for(int i = 0; i < N; ++i){
threadsForWaits[i].start();
Thread.sleep(200);
}
lock.lock(); //synchronized(lock){
for(int i = 0; i < N; ++i){
threads[i].start();
Thread.sleep(200);
}
for(int i = 0; i < 4 ; ++i){
condition.signal();
}
Thread.sleep(200);
lock.unlock();

for(int i = 0; i < N; ++i)
threads[i].join();

for(int i = 0; i < N; ++i)
threadsForWaits[i].join();

}

}

[/code]

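Thread-10 to Thread-19 are the threads that block by calling await, and Thread-0 to Thread-9 only acquire the lock. Only 4 of the 10 waiting threads are ever signaled, so the other six never wake up and the program does not exit on its own. The output is: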

[code lang=”java”]
Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-0 nowait get lock
Thread-1 nowait get lock
Thread-2 nowait get lock
Thread-3 nowait get lock
Thread-4 nowait get lock
Thread-5 nowait get lock
Thread-6 nowait get lock
Thread-7 nowait get lock
Thread-8 nowait get lock
Thread-9 nowait get lock
Thread-10 wait second get lock
Thread-11 wait second get lock
Thread-12 wait second get lock
Thread-13 wait second get lock

[/code]

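The JSR 166 implementation is still first-in-first-out, even though Thread-10 to Thread-13 had already held the lock once before waiting. After being signaled, they line up behind Thread-0 to Thread-9. Let's see how this is achieved; note that JSR 166 is implemented entirely at the Java level. Three calls matter: await, signal and unlock. await: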

[code lang=”java”]
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
[/code]

addConditionWaiter adds a node to the condition's wait queue. acquireQueued tries to acquire the lock again after the thread has been woken. Let's look at addConditionWaiter:

[code lang=”java”]

/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
[/code]

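The new node becomes the nextWaiter of lastWaiter and then becomes lastWaiter itself, so this queue is also first-in-first-out. The caller already holds the lock here, so no extra synchronization is needed. Next, signal: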

[code lang=”java”]
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
[/code]

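At this point doSignal has taken the first node of the wait queue. transferForSignal then calls enq to transfer it to the lock's blocking queue: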

[code lang=”java”]
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
[/code]

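enq appends the node at the tail of the blocking queue and returns its predecessor. This completes the move of the waiting thread from the wait state into the blocked state of a thread that has not yet acquired the lock. Finally, let's see what happens when the main thread releases the lock. unlock: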

[code lang=”java”]
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
[/code]

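The thread calling unlock wakes the first waiting thread in the blocking queue, the successor of head. Both the blocking queue and the wait queue are first-in-first-out, so the overall behavior is well ordered. That is why the output above matched our expectations.

Finally, let's see what the synchronized version of this example prints (runnable code):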

[code lang=”java”]
package com.psly.testLocks;

public class TestLockSynchronized {

private static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
int N = 10;
Thread[] threads = new Thread[N];
Thread[] threadsForWaits = new Thread[N];
for(int i = 0; i < N; ++i){
threads[i] = new Thread(new Runnable(){
@Override
public void run() {
synchronized(lock){
System.out.println(Thread.currentThread().getName() + " nowait get lock");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

});
}
for(int i = 0; i < N; ++i){
threadsForWaits[i] = new Thread(new Runnable(){
@Override
public void run() {
// TODO Auto-generated method stub
synchronized(lock){
System.out.println(Thread.currentThread().getName() + " wait first get lock");
try {
lock.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

System.out.println(Thread.currentThread().getName() + " wait second get lock");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

});
}

for(int i = 0; i < N; ++i){
threadsForWaits[i].start();
Thread.sleep(200);
}
synchronized(lock){
for(int i = 0; i < N; ++i){
threads[i].start();
Thread.sleep(200);
}
for(int i = 0; i < 4 ; ++i){
lock.notify();
}
Thread.sleep(200);
}

for(int i = 0; i < N; ++i)
threads[i].join();

for(int i = 0; i < N; ++i)
threadsForWaits[i].join();

}

}
[/code]

The output is:

[code lang=”java”]

Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-10 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
Thread-9 nowait get lock
Thread-8 nowait get lock
Thread-7 nowait get lock
Thread-6 nowait get lock
Thread-5 nowait get lock
Thread-4 nowait get lock
Thread-3 nowait get lock
Thread-2 nowait get lock
Thread-1 nowait get lock
Thread-0 nowait get lock
[/code]

This result is strange. The strangest part is that Thread-10, Thread-13, Thread-12 and Thread-11 come out consecutively:
[code lang="java"]
Thread-10 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
[/code]

Let's change the number of notify calls to N, one for every waiting thread:

[code lang=”java”]
for(int i = 0; i < N ; ++i){
lock.notify();
}
[/code]

The output is:

[code lang=”java”]
Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-10 wait second get lock
Thread-19 wait second get lock
Thread-18 wait second get lock
Thread-17 wait second get lock
Thread-16 wait second get lock
Thread-15 wait second get lock
Thread-14 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
Thread-9 nowait get lock
Thread-8 nowait get lock
Thread-7 nowait get lock
Thread-6 nowait get lock
Thread-5 nowait get lock
Thread-4 nowait get lock
Thread-3 nowait get lock
Thread-2 nowait get lock
Thread-1 nowait get lock
Thread-0 nowait get lock
[/code]

Once again Thread-10 inexplicably comes first. It is followed by Thread-19 down to Thread-11 in reverse order. Next we try a different call, notifyAll():

[code lang=”java”]
synchronized(lock){
for(int i = 0; i < N; ++i){
threads[i].start();
Thread.sleep(200);
}
lock.notifyAll();
Thread.sleep(200);
}
[/code]

The output is:

[code lang=”java”]
Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-19 wait second get lock
Thread-18 wait second get lock
Thread-17 wait second get lock
Thread-16 wait second get lock
Thread-15 wait second get lock
Thread-14 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
Thread-10 wait second get lock
Thread-9 nowait get lock
Thread-8 nowait get lock
Thread-7 nowait get lock
Thread-6 nowait get lock
Thread-5 nowait get lock
Thread-4 nowait get lock
Thread-3 nowait get lock
Thread-2 nowait get lock
Thread-1 nowait get lock
Thread-0 nowait get lock
[/code]

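The order changed again: Thread-10 is now last, and the waiting threads acquire the lock in completely reverse order. Let's go into the JVM to see what is going on.

As before, we first look at how thread nodes are organized after they wait. After some digging, the relevant code appears to be the following. wait: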

[code lang=”java”]
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
Thread * const Self = THREAD ;

TEVENT (Wait) ;

assert (Self->_Stalled == 0, "invariant") ;
Self->_Stalled = intptr_t(this) ;
jt->set_current_waiting_monitor(this);

ObjectWaiter node(Self);
node.TState = ObjectWaiter::TS_WAIT ;
Self->_ParkEvent->reset() ;
OrderAccess::fence(); // ST into Event; membar ; LD interrupted-flag

// Enter the waiting queue, which is a circular doubly linked list in this case
// but it could be a priority queue or any data structure.
// _WaitSetLock protects the wait queue. Normally the wait queue is accessed only
// by the the owner of the monitor *except* in the case where park()
// returns because of a timeout of interrupt. Contention is exceptionally rare
// so we use a simple spin-lock instead of a heavier-weight blocking lock.

Thread::SpinAcquire (&_WaitSetLock, "WaitSet – add") ;
AddWaiter (&node) ;
Thread::SpinRelease (&_WaitSetLock) ;
// ...... (rest of wait() elided)

inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
assert(node != NULL, "should not dequeue NULL node");
assert(node->_prev == NULL, "node already in list");
assert(node->_next == NULL, "node already in list");
// put node at end of queue (circular doubly linked list)
if (_WaitSet == NULL) {
_WaitSet = node;
node->_prev = node;
node->_next = node;
} else {
ObjectWaiter* head = _WaitSet ;
ObjectWaiter* tail = head->_prev;
assert(tail->_next == head, "invariant check");
tail->_next = node;
head->_prev = node;
node->_next = head;
node->_prev = tail;
}
}
[/code]
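The AddWaiter and DequeueSpecificWaiter logic translates almost line for line into Java. Below is a toy model (hypothetical CircularWaitSet class). It does no locking of its own; HotSpot protects these operations with _WaitSetLock. It shows that appending before the head and dequeuing the head gives FIFO order, and that removing a waiter from the middle is also constant-time.

```java
// Toy model of ObjectMonitor's _WaitSet: a circular doubly linked list.
// Not thread-safe; HotSpot guards these operations with _WaitSetLock.
class CircularWaitSet {
    static final class Waiter {
        final String name;
        Waiter prev, next;
        Waiter(String name) { this.name = name; }
    }

    private Waiter waitSet; // head of the circular list, or null when empty

    // AddWaiter: put the node at the end, i.e. between the tail and the head.
    void addWaiter(Waiter node) {
        if (waitSet == null) {
            waitSet = node;
            node.prev = node;
            node.next = node;
        } else {
            Waiter head = waitSet;
            Waiter tail = head.prev;
            tail.next = node;
            head.prev = node;
            node.next = head;
            node.prev = tail;
        }
    }

    // DequeueWaiter: remove and return the first waiter (FIFO).
    Waiter dequeueWaiter() {
        Waiter waiter = waitSet;
        if (waiter != null) {
            dequeueSpecificWaiter(waiter);
        }
        return waiter;
    }

    // DequeueSpecificWaiter: unlink any node (also used after timeout or interrupt).
    void dequeueSpecificWaiter(Waiter node) {
        Waiter next = node.next;
        if (next == node) {
            waitSet = null;              // it was the only element
        } else {
            Waiter prev = node.prev;
            next.prev = prev;
            prev.next = next;
            if (waitSet == node) {
                waitSet = next;          // the head moves to the next waiter
            }
        }
        node.next = null;
        node.prev = null;
    }

    public static void main(String[] args) {
        CircularWaitSet ws = new CircularWaitSet();
        for (int i = 10; i < 13; i++) {
            ws.addWaiter(new Waiter("Thread-" + i));
        }
        Waiter w;
        while ((w = ws.dequeueWaiter()) != null) {
            System.out.println(w.name); // Thread-10, then Thread-11, then Thread-12
        }
    }
}
```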

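If _WaitSet is empty, the node becomes _WaitSet and is its own predecessor and successor. Otherwise, the node is linked in between the current tail (head->_prev) and the head, which appends it to the end of the circular list. This also covers the case where _WaitSet holds a single node. So the wait set is a first-in-first-out queue. notify: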

[code lang=”java”]
void ObjectMonitor::notify(TRAPS) {
CHECK_OWNER();
if (_WaitSet == NULL) {
TEVENT (Empty-Notify) ;
return ;
}
DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);

int Policy = Knob_MoveNotifyee ;

Thread::SpinAcquire (&_WaitSetLock, "WaitSet – notify") ;
ObjectWaiter * iterator = DequeueWaiter() ;
// ......

// Helpers used by notify():
inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
// dequeue the very first waiter
ObjectWaiter* waiter = _WaitSet;
if (waiter) {
DequeueSpecificWaiter(waiter);
}
return waiter;
}

inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) {
assert(node != NULL, "should not dequeue NULL node");
assert(node->_prev != NULL, "node already removed from list");
assert(node->_next != NULL, "node already removed from list");
// when the waiter has woken up because of interrupt,
// timeout or other spurious wake-up, dequeue the
// waiter from waiting list
ObjectWaiter* next = node->_next;
if (next == node) {
assert(node->_prev == node, "invariant check");
_WaitSet = NULL;
} else {
ObjectWaiter* prev = node->_prev;
assert(prev->_next == node, "invariant check");
assert(next->_prev == node, "invariant check");
next->_prev = prev;
prev->_next = next;
if (_WaitSet == node) {
_WaitSet = next;
}
}
node->_next = NULL;
node->_prev = NULL;
}
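// Default value of the Policy knob read by notify():
static int Knob_MoveNotifyee = 2 ; // notify() – disposition of notifyee

// Back in notify(), after DequeueWaiter(); List holds the current _EntryList (declaration elided):
if (Policy == 2) { // prepend to cxq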
// prepend to cxq
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
}
}
}
} else
if (Policy == 3) { // append to cxq
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Tail ;
Tail = _cxq ;
if (Tail == NULL) {

[/code]

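DequeueWaiter calls DequeueSpecificWaiter, which removes one element from the front of the queue, so _WaitSet->_next becomes the new _WaitSet. Two more data structures are involved here, _EntryList and _cxq.

With the default Knob_MoveNotifyee = 2 we take the Policy == 2 branch. Note that, despite its comment, this branch does not put every node on _cxq:

- If _EntryList is NULL, the node is placed directly into _EntryList.
- Otherwise, the node is pushed onto the front of the _cxq stack.

Imagine the first notified node arrives, finds _EntryList empty, and becomes _EntryList. From the second node on, every node is pushed onto the stack, so when they are taken out, the second and later nodes come out reversed. If nodes are taken from _EntryList first and only then from the stack, we get exactly the out-of-order result in the example, with Thread-10 jumping ahead.

However, the earlier notifyAll did not produce this effect, so let's look at the notifyAll code: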

[code lang=”java”]
if (Policy == 2) { // prepend to cxq
// prepend to cxq
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
}
}
} else
[/code]

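Just as suspected! notifyAll's logic is mostly the same as notify's, except that it pushes every node onto _cxq. That is why the nodes acquire the lock in reverse order after notifyAll.

unlock: next, let's check whether releasing the monitor first takes the elements of _EntryList and only then looks at _cxq, as we guessed.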

[code lang=”java”]
void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {
fast_exit (object, lock, THREAD) ;
}
void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
assert(!object->mark()->has_bias_pattern(), "should not see bias pattern here");
// if displaced header is null, the previous enter is recursive enter, no-op
………

ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ;
}
void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
……………….
for (;;) {
assert (THREAD == _owner, "invariant") ;

if (Knob_ExitPolicy == 0) {

………
} else {
……….
} else {
TEVENT (Inflated exit – complex egress) ;
}
}

guarantee (_owner == THREAD, "invariant") ;

ObjectWaiter * w = NULL ;
int QMode = Knob_QMode ;

w = _EntryList ;
if (w != NULL) {

assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;
return ;
}

w = _cxq ;
if (w == NULL) continue ;

for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}

if (QMode == 1) {
…………..
} else {
_EntryList = w ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
}

if (_succ != NULL) continue;

w = _EntryList ;
if (w != NULL) {
guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;
return ;
}
}
static int Knob_QMode = 0 ; // EntryList-cxq policy – queue discipline
static int Knob_ExitPolicy = 0 ;
[/code]
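The ordering story for synchronized can be checked with a small simulation. This toy, single-threaded model makes these assumptions:

- The defaults Knob_MoveNotifyee = 2 and Knob_QMode = 0 apply.
- Spinning, _succ and _Responsible are ignored.

MonitorOrderModel and its methods are hypothetical names. The model replays the experiment: Thread-10 to Thread-19 wait, then Thread-0 to Thread-9 block while the main thread holds the monitor. It returns the order in which exit would hand the monitor to successors.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy single-threaded model of ObjectMonitor queue ordering, assuming
// Knob_MoveNotifyee = 2 and Knob_QMode = 0. Ordering only: no real threads.
class MonitorOrderModel {
    private final Deque<String> waitSet = new ArrayDeque<>();   // FIFO (AddWaiter / DequeueWaiter)
    private final Deque<String> cxq = new ArrayDeque<>();       // LIFO: push at the front
    private final Deque<String> entryList = new ArrayDeque<>(); // successor taken from the front

    void waitOn(String t) { waitSet.addLast(t); }          // wait(): AddWaiter
    void enterContended(String t) { cxq.addFirst(t); }     // EnterI: push onto _cxq

    // notify(), Policy == 2: straight into an empty _EntryList, otherwise prepend to _cxq.
    void notifyOne() {
        String w = waitSet.pollFirst();
        if (w == null) {
            return;
        }
        if (entryList.isEmpty()) {
            entryList.addLast(w);
        } else {
            cxq.addFirst(w);
        }
    }

    // notifyAll(), Policy == 2: every waiter, in wait-set order, is prepended to _cxq.
    void notifyAllWaiters() {
        String w;
        while ((w = waitSet.pollFirst()) != null) {
            cxq.addFirst(w);
        }
    }

    // exit(): successor is the head of _EntryList; if it is empty, first move
    // all of _cxq into _EntryList, keeping _cxq's order (QMode 0).
    String exitAndPickSuccessor() {
        if (entryList.isEmpty()) {
            while (!cxq.isEmpty()) {
                entryList.addLast(cxq.pollFirst());
            }
        }
        return entryList.pollFirst(); // the successor later unlinks itself (UnlinkAfterAcquire)
    }

    static List<String> run(int n, int notifies, boolean useNotifyAll) {
        MonitorOrderModel m = new MonitorOrderModel();
        for (int i = 0; i < n; i++) {
            m.waitOn("Thread-" + (n + i));
        }
        for (int i = 0; i < n; i++) {
            m.enterContended("Thread-" + i);
        }
        if (useNotifyAll) {
            m.notifyAllWaiters();
        } else {
            for (int i = 0; i < notifies; i++) {
                m.notifyOne();
            }
        }
        List<String> order = new ArrayList<>();
        String next;
        while ((next = m.exitAndPickSuccessor()) != null) {
            order.add(next);
        }
        return order;
    }
}
```

The model reproduces all three outputs observed above:

- run(10, 4, false) starts with Thread-10, Thread-13, Thread-12, Thread-11, followed by Thread-9 down to Thread-0.
- run(10, 10, false) gives Thread-10, then Thread-19 down to Thread-11.
- run(10, 0, true) gives Thread-19 down to Thread-10, then Thread-9 down to Thread-0.

This supports, but does not prove, the reading of the source above.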

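exit first checks _EntryList. If it is non-empty, exit calls ExitEpilog on it and returns. Otherwise, exit:

1. atomically detaches _cxq, setting it to NULL;
2. assigns that list to _EntryList (with the default QMode 0 the order is kept and the _prev links are filled in);
3. calls ExitEpilog.

In short, the successor always comes from _EntryList. If _EntryList already held nodes, those are used first, and the nodes from _cxq only afterwards. Let's see what ExitEpilog does: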

[code lang=”java”]
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
assert (_owner == Self, "invariant") ;

// Exit protocol:
// 1. ST _succ = wakee
// 2. membar #loadstore|#storestore;
// 2. ST _owner = NULL
// 3. unpark(wakee)

_succ = Knob_SuccEnabled ? Wakee->_thread : NULL ;
ParkEvent * Trigger = Wakee->_event ;

Wakee = NULL ;

// Drop the lock
OrderAccess::release_store_ptr (&_owner, NULL) ;
OrderAccess::fence() ; // ST _owner vs LD in unpark()

if (SafepointSynchronize::do_call_back()) {
TEVENT (unpark before SAFEPOINT) ;
}

Trigger->unpark() ;

// Maintain stats and report events to JVMTI
if (ObjectMonitor::_sync_Parks != NULL) {
ObjectMonitor::_sync_Parks->inc() ;
}
}
[/code]

Sure enough, ExitEpilog ends by calling unpark, which wakes the chosen thread. How does _EntryList change after that? Finally, let's look at what a waiting thread does when it wakes up from wait:

[code lang=”java”]
// Note: a subset of changes to ObjectMonitor::wait()
// will need to be replicated in complete_exit above
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
……….
ObjectWaiter node(Self);
node.TState = ObjectWaiter::TS_WAIT ;

………….
Thread::SpinAcquire (&_WaitSetLock, "WaitSet – add") ;
AddWaiter (&node) ;
Thread::SpinRelease (&_WaitSetLock) ;
…………
if (node.TState == ObjectWaiter::TS_WAIT) {
Thread::SpinAcquire (&_WaitSetLock, "WaitSet – unlink") ;
if (node.TState == ObjectWaiter::TS_WAIT) {
DequeueSpecificWaiter (&node) ; // unlink from WaitSet
assert(node._notified == 0, "invariant");
node.TState = ObjectWaiter::TS_RUN ;
}
…………..
assert (_owner != Self, "invariant") ;
ObjectWaiter::TStates v = node.TState ;
if (v == ObjectWaiter::TS_RUN) {
enter (Self) ;
} else {
[/code]

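The thread then re-acquires the monitor through enter(Self). Strictly, this is the TS_RUN branch, taken when the wait ends by timeout or interrupt. A notified thread is in state TS_ENTER or TS_CXQ and takes the else branch, which in HotSpot calls ReenterI. Either way, the thread spins or parks until it owns the monitor and then calls UnlinkAfterAcquire. enter: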

[code lang=”java”]
void ATTR ObjectMonitor::enter(TRAPS) {
for (;;) {
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition()
// or java_suspend_self()

EnterI (THREAD) ;

// ...... (rest of enter() elided)

// enter() calls EnterI(THREAD):
void ATTR ObjectMonitor::EnterI (TRAPS) {
Thread * Self = THREAD ;
assert (Self->is_Java_thread(), "invariant") ;
assert (((JavaThread *) Self)->thread_state() == _thread_blocked , "invariant") ;

if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}

DeferredInitialize () ;

if (TrySpin (Self) > 0) {
assert (_owner == Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}

// The Spin failed — Enqueue and park the thread …
assert (_succ != Self , "invariant") ;
assert (_owner != Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;

ObjectWaiter node(Self) ;
Self->_ParkEvent->reset() ;
node._prev = (ObjectWaiter *) 0xBAD ;
node.TState = ObjectWaiter::TS_CXQ ;

ObjectWaiter * nxt ;
for (;;) {
node._next = nxt = _cxq ;
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;

// Interference – the CAS failed because _cxq changed. Just retry.
// As an optional optimization we retry the lock.
if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
}

if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {
// Try to assume the role of responsible thread for the monitor.
// CONSIDER: ST vs CAS vs { if (Responsible==null) Responsible=Self }
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}

TEVENT (Inflated enter – Contention) ;
int nWakeups = 0 ;
int RecheckInterval = 1 ;

for (;;) {

if (TryLock (Self) > 0) break ;
assert (_owner != Self, "invariant") ;

if ((SyncFlags & 2) && _Responsible == NULL) {
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}

if (_Responsible == Self || (SyncFlags & 1)) {
TEVENT (Inflated enter – park TIMED) ;
Self->_ParkEvent->park ((jlong) RecheckInterval) ;
RecheckInterval *= 8 ;
if (RecheckInterval > 1000) RecheckInterval = 1000 ;
} else {
TEVENT (Inflated enter – park UNTIMED) ;
Self->_ParkEvent->park() ;
}

if (TryLock(Self) > 0) break ;

TEVENT (Inflated enter – Futile wakeup) ;
if (ObjectMonitor::_sync_FutileWakeups != NULL) {
ObjectMonitor::_sync_FutileWakeups->inc() ;
}
++ nWakeups ;

if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) {
Self->_ParkEvent->reset() ;
OrderAccess::fence() ;
}
if (_succ == Self) _succ = NULL ;

// Invariant: after clearing _succ a thread *must* retry _owner before parking.
OrderAccess::fence() ;
}

assert (_owner == Self , "invariant") ;
assert (object() != NULL , "invariant") ;
// I’d like to write:
// guarantee (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
// but as we’re at a safepoint that’s not safe.

UnlinkAfterAcquire (Self, &node) ;
if (_succ == Self) _succ = NULL ;

assert (_succ != Self, "invariant") ;
if (_Responsible == Self) {
_Responsible = NULL ;
OrderAccess::fence(); // Dekker pivot-point
………
[/code]

这里的重点是UnlinkAfterAcquire,

[code lang=”java”]
void ObjectMonitor::UnlinkAfterAcquire (Thread * Self, ObjectWaiter * SelfNode)
{
assert (_owner == Self, "invariant") ;
assert (SelfNode->_thread == Self, "invariant") ;

if (SelfNode->TState == ObjectWaiter::TS_ENTER) {
// Normal case: remove Self from the DLL EntryList .
// This is a constant-time operation.
ObjectWaiter * nxt = SelfNode->_next ;
ObjectWaiter * prv = SelfNode->_prev ;
if (nxt != NULL) nxt->_prev = prv ;
if (prv != NULL) prv->_next = nxt ;
if (SelfNode == _EntryList ) _EntryList = nxt ;
assert (nxt == NULL || nxt->TState == ObjectWaiter::TS_ENTER, "invariant") ;
assert (prv == NULL || prv->TState == ObjectWaiter::TS_ENTER, "invariant") ;
TEVENT (Unlink from EntryList) ;
} else {
[/code]
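The TS_ENTER branch of UnlinkAfterAcquire is a constant-time unlink from a doubly linked list whose head is _EntryList. Below is a minimal Java model of it. EntryListModel is a hypothetical class, and its append helper exists only to build a demo list.

```java
// Toy model of the TS_ENTER branch of ObjectMonitor::UnlinkAfterAcquire:
// O(1) removal from a doubly linked list whose head plays the role of _EntryList.
class EntryListModel {
    static final class Waiter {
        final String name;
        Waiter prev, next;
        Waiter(String name) { this.name = name; }
    }

    Waiter entryList; // head of the doubly linked list, or null

    // Demo helper only: append at the end to build a list.
    void append(Waiter w) {
        if (entryList == null) {
            entryList = w;
            return;
        }
        Waiter t = entryList;
        while (t.next != null) {
            t = t.next;
        }
        t.next = w;
        w.prev = t;
    }

    // Unlink the thread that just acquired the lock: patch both neighbours,
    // and advance the head if the node was at the front.
    void unlinkAfterAcquire(Waiter self) {
        Waiter nxt = self.next;
        Waiter prv = self.prev;
        if (nxt != null) nxt.prev = prv;
        if (prv != null) prv.next = nxt;
        if (self == entryList) entryList = nxt;
        self.next = null; // clear the node's own links (a modeling choice)
        self.prev = null;
    }

    String names() {
        StringBuilder sb = new StringBuilder();
        for (Waiter w = entryList; w != null; w = w.next) {
            if (sb.length() > 0) sb.append(',');
            sb.append(w.name);
        }
        return sb.toString();
    }
}
```

No traversal is needed: the node's own prev and next links are enough, which is why the HotSpot comment calls this a constant-time operation.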

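UnlinkAfterAcquire updates _EntryList so that lock acquisition can proceed correctly. With that, we have roughly walked through how the synchronized lock and the Lock lock are implemented, in the JVM and in JUC respectively.

One question remains: how does the pthread mutex on Linux behave? Let's run both examples with pthreads.

Lock acquisition only. Compile it into the executable testLock with gcc -pthread testLock.c -o testLock, then run ./testLock: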

[code lang=”c”]
#include <stdio.h>
#include <stdint.h>
#include <pthread.h>
#include <unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
#define N 10
void* runTask(void* pm){
pthread_mutex_lock(&mutex);
printf("%d get lock\n", (int)(intptr_t)pm); // intptr_t round-trips the int through void* safely
usleep(100000);
pthread_mutex_unlock(&mutex);
return NULL;
}

int main(){
pthread_t threads[N];
int i = 0;
pthread_mutex_lock(&mutex);
for(i = 0; i < N; ++i){
pthread_create(&threads[i], 0, runTask, (void*)(intptr_t)i);
usleep(100000);
}
pthread_mutex_unlock(&mutex);
for(i = 0; i < N; ++i){
pthread_join(threads[i], NULL);
}
return 0;

}
[/code]

Output:

[code lang=”java”]
0 get lock
1 get lock
2 get lock
3 get lock
4 get lock
5 get lock
6 get lock
7 get lock
8 get lock
9 get lock
[/code]

Lock acquisition plus condition wait:

[code lang=”c”]
#include <stdio.h>
#include <stdint.h>
#include <pthread.h>
#include <unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
#define N 10
void* runTask(void* pm){
pthread_mutex_lock(&mutex);
printf("%d get lock\n", (int)(intptr_t)pm);
usleep(100000);
pthread_mutex_unlock(&mutex);
return NULL;
}

void* runTaskWithWait(void* pm){
pthread_mutex_lock(&mutex);
printf("%d wait first get lock\n", (int)(intptr_t)pm);
pthread_cond_wait(&cond, &mutex); // demo only: no predicate loop, so a spurious wakeup would end the wait early
printf("%d wait second get lock\n", (int)(intptr_t)pm);
usleep(300000);
pthread_mutex_unlock(&mutex);
return NULL;
}
int main(){
pthread_t threads[N];
pthread_t threadsForWaits[N];
int i = 0;
for(; i < N; ++i){
pthread_create(&threadsForWaits[i], 0, runTaskWithWait, (void*)(intptr_t)i);
usleep(100000);
}

pthread_mutex_lock(&mutex);
for(i = 0; i < N; ++i){
pthread_create(&threads[i], 0, runTask, (void*)(intptr_t)i);
usleep(100000);
}
//pthread_cond_broadcast(&cond);
for(i = 0; i < N; ++i)
pthread_cond_signal(&cond);
usleep(100000);
pthread_mutex_unlock(&mutex);
for(i = 0; i < N; ++i){
pthread_join(threads[i], NULL);
}
for(i = 0; i < N; ++i){
pthread_join(threadsForWaits[i], NULL);
}
return 0;

}
[/code]

Output:

[code lang=”java”]
0 wait first get lock
1 wait first get lock
2 wait first get lock
3 wait first get lock
4 wait first get lock
5 wait first get lock
6 wait first get lock
7 wait first get lock
8 wait first get lock
9 wait first get lock
0 get lock
1 get lock
2 get lock
3 get lock
4 get lock
5 get lock
6 get lock
7 get lock
8 get lock
9 get lock
1 wait second get lock
2 wait second get lock
3 wait second get lock
0 wait second get lock
4 wait second get lock
5 wait second get lock
6 wait second get lock
7 wait second get lock
8 wait second get lock
9 wait second get lock
[/code]

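All I can say is that once the waiting threads are moved to the blocked state, their order shows no obvious pattern, at least in this run (=@__@=)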

Original article. When reposting, please credit: reposted from 并发编程网 (ifeve.com). Article link: JAVA互斥锁(synchronized&Lock):行为分析及源码