前文 Java 多线程与并发 - synchronized 中介绍了关键字 synchronized
来实现同步访问,在 Java 5 之后,java.util.concurrent.locks 包下提供了另外一种方式来实现同步访问,那就是 Lock
。
本文通过 Lock
的常用实现类 ReentrantLock
,介绍了 Lock
的核心概念、ReentrantLock
的核心机制以及其公平锁和非公平锁实现的原理。
Lock 核心概念
Lock
和 synchronized
都可以实现线程同步,Lock
提供的功能更多,能应对更复杂的场景,使用也更复杂。它们的功能对比如下:
功能 |
synchronized |
Lock |
加锁(等待) |
✅ |
✅ |
尝试加锁(不等待) |
❌ |
✅ |
尝试加锁(限时等待) |
❌ |
✅ |
让出锁(等待) |
✅ |
✅ |
唤醒 |
✅ |
✅ |
多条件 |
❌ |
✅ |
公平锁 |
❌ |
✅ |
释放锁 |
✅ |
✅ |
Lock 组成结构
Lock and Condition
Lock 锁
JUC 中的锁由 Lock
和 Condition
组成,Lock
负责加锁和释放锁,确保锁同一时刻只能被 一个线程持有。与 synchronized
类似,当 Lock
锁被一个线程占用时,其他获取该锁的线程会被阻塞,直到锁被释放。不同之处在于 synchronized
针对的是其后跟的对象,而 Lock
是锁对象本身。
Condition 条件
Lock
锁可以基于业务构建多个 Condition
,当线程获取锁之后有权操作 Condition
,没有获取锁时如果操作 Condition
会和 synchronized
代码块之外调用 Object.wait() 一样报 IllegalMonitorStateException
。
用一个例子来理解 Condition
:
假设有一个产品容器,5 个生产者线程负责生产产品到容器中,5 个消费者线程负责从容器中取走产品。生产者线程运行的的条件是 容器未满,消费者运行的条件是 容器非空,否则阻塞等待。
Condition sample 1
如果使用 synchronized
实现,当某一个线程获取锁之后,如果其他线程不满足运行条件,调用 Object.wait()
进入等待集,线程释放锁之后使用 Object.notify()
唤醒等待集中的线程。此时无法确定唤醒的是生产者线程还是消费者线程,因为他们都会放入同一个等待集中,Object.notify()
之后和等待获取锁的阻塞线程一起竞争锁。如果等待集中的线程获得了锁,还要继续检测是否满足其运行条件,不满足继续 wait。
Condition sample 2
上述可知,在这种场景下 synchronized
存在局限性。
Lock
最大不同是可以有多个条件,每个条件都有一个等待集,可唤醒指定等待集中的线程。在上例中当生产完成之后,指定唤醒消费者的条件队列,而当消费完成后指定唤醒生产者的条件队列。
示例代码如下:
public class LockTest { private final Lock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); private final Object[] items = new Object[20];
int putIndex; int takeIndex; int count;
public void put(Object o) throws InterruptedException { lock.lock(); try { while (count == items.length) { notFull.await(); } if (++putIndex == items.length) { putIndex = 0; } ++count; notEmpty.signal(); } finally { lock.unlock(); } }
public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) { notEmpty.await(); } final Object item = items[takeIndex]; if (++takeIndex == items.length) { takeIndex = 0; } --count; notFull.signal(); return item; } finally { lock.unlock(); } }
}
|
写个测试测试一下:
@Test public void test() throws InterruptedException { final LockTest lockTest = new LockTest(); final ExecutorService producer = Executors.newFixedThreadPool(5); final ExecutorService consumer = Executors.newFixedThreadPool(5); for (int i = 0; i < 25; i++) { String name = "put-" + i; producer.submit(() -> { try { System.out.println("生产:" + name); lockTest.put(name); } catch (InterruptedException e) { e.printStackTrace(); } }); } Thread.sleep(10); System.out.println("容器中有 20 个产品,生产者线程 5 个等待中"); for (int i = 0; i < 25; i++) { consumer.submit(() -> { try { System.out.printf("消费:%s\n", lockTest.take()); } catch (InterruptedException e) { e.printStackTrace(); } }); } Thread.sleep(10); System.out.println("容器中有 0 个产品,消费者线程 5 个等待中"); }
|
ReentrantLock 核心机制
核心机制
ReentrantLock
称为可重入锁,是指一个线程可以重复获取该锁,即调用 lock()
方法后,没有 unlock()
之前再次调用 lock()
可取锁。可重入机制可以防止方法递归时发生死锁现象。
死锁发生
假设我们自定义一个不可重用的 Lock
,下面的代码就会发生死锁
public void method() { lock.lock(); try { lock.lock(); } finally { lock.unlock(); } }
|
第一次调用 lock()
状态为占用,第二次获取锁时因为被占用,将会进入阻塞队列,导致永远无法调用 unlock()
解锁。
代码示例:
@Test public void test1() { final MyLock myLock = new MyLock(); lock.lock(); try { lock.lock(); } finally { lock.unlock(); } }
private static class MyLock extends AbstractQueuedSynchronizer { public void lock() { acquire(1); }
public void unlock() { release(1); }
@Override protected boolean tryAcquire(int arg) { return compareAndSetState(0, 1); }
@Override protected boolean tryRelease(int arg) { setState(0); return true; } }
|
ReentrantLock
可以解决这个问题,它在 lock()
会把当前线程设为锁的当前占有者,当前线程如果继续调用 lock()
即使是占有状态,也不会阻塞,而只是状态 +1,调用 unlock()
状态 -1。 通过调用 AQS 的 setExclusiveOwnerThread()
为设置锁的当前拥有者。
公平锁和非公平锁机制
ReentrantLock
不仅实现了重入锁机制,同时也实现了公平锁和非公平锁机制。
非公平锁
ReentrantLock
并发获取锁时,内部机制依赖 CAS 修改状态获取,并不能确定哪个线程能成功修改。这种情况下锁的获取与其调用 lock()
顺序无关,这就是非公平锁,但由于这种不用排队的竞争机制性能最高,所以默认情况下 ReentrantLock
是使用的公平锁。
公平锁
公平锁指严格按照顺序获取锁,后来者不能直接参与锁的竞争。其强制获取锁时先判断在其之前是否有在等待的锁。
下面的代码演示公平锁和非公平锁:
@Test public void test2() { final ReentrantLock lock = new ReentrantLock(true); ExecutorService executorService = Executors.newFixedThreadPool(3); for (int i = 0; i < 3; i++) { executorService.submit(() -> { for (int k = 0; k < 2; k++) { lock.lock(); System.out.printf("线程%s 获得锁\n", Thread.currentThread().getName()); lock.unlock(); } }); } }
|
公平锁打印结果严格按照调用顺序:
线程 pool-1-thread-1 获得锁
线程 pool-1-thread-2 获得锁
线程 pool-1-thread-3 获得锁
线程 pool-1-thread-1 获得锁
线程 pool-1-thread-2 获得锁
线程 pool-1-thread-3 获得锁
非公平锁结果如下,当线程 1 获得锁以后,其它线程将进入阻塞队列,而当线程 1 释放锁时线程 1 可以和其它线程真接竞争,因为其它线程还需要唤醒,所以线程 1 获得锁的顺在其他线程之前:
线程 pool-1-thread-1 获得锁
线程 pool-1-thread-1 获得锁
线程 pool-1-thread-3 获得锁
线程 pool-1-thread-3 获得锁
线程 pool-1-thread-2 获得锁
线程 pool-1-thread-2 获得锁
ReentrantLock 底层原理
ReentrantLock
底层基于 AQS 实现同步。基本原理是修改 AQS 中的状态值,等于 0 表示锁未占用,修改状态为 1 表示已经占用,如果同一线程重入加锁,状态值会累加。解锁后状态值为-1,为 0 时表示锁已释放。
基本结构
ReentrantLock
AbstractQueuedSynchronizer
:AQS 同步器(抽象类)
Sync
:ReentrantLock
中的内部抽象类,用于实现基础同步,继承自 AQS
NonfairSync
:非公平锁同步器实现,继承自 Sync
FairSync
:公平锁同步器实现,继承自 Sync
加锁流程
非公平锁
先看代码
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L;
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
|
接下来看 NonfairSync
中的 tryAcquire()
方法,即 nonfairTryAcquire(acquires)
。
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
|
总结
非公平锁加锁流程
公平锁
主要特性:
- 加锁时不会尝试修改,直接走
acquire()
流程
- 在
tryAcquire()
中修改状态前先判断自己是否处于队列头部
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L;
final void lock() { acquire(1); }
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
|
总结
公平锁加锁流程
释放锁
公平锁和非公平锁的释放锁相同,是 Sync
中的 tryRelease()
方法。
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
|