从自旋锁的优化到 AQS 的设计
- lock指令的几个作用:
- Lock 前缀指令可以通过对总线或者处理器内部缓存加锁,使得其他处理器无法读写该指令要访问的内存区域,因此能保存指令执行的原子性。
- Lock 前缀指令将禁止该指令与之前和之后的读和写指令重排序。
- Lock 前缀指令将会把写缓冲区中的所有数据立即刷新到主内存中(多核处理器中,每个 cpu 会通过嗅探总线上传播的数据来检查自己的缓存是否过期,当一个 cpu 刷新自己的数据到内存时,其它 cpu 会自动过期自己的缓存)。
- 关于 锁总线 和 锁内部缓存 的解释 :
- 早期的处理器只支持通过总线锁保证原子性。所谓总线锁就是使用处理器提供的一个 LOCK#信号,当一个处理器在总线上输出此信号时,其他处理器的请求将被阻塞住,那么该处理器可以独占使用共享内存。很显然,这会带来昂贵的开销。
- 缓存锁定是改进后的方案。在同一时刻我们只需保证对某个内存地址的操作是原子性即可,但总线锁定把 CPU 和内存之间通信锁住了,这使得锁定期间,其他处理器不能操作其他内存地址的数据,所以总线锁定的开销比较大,最近的处理器在某些场合下使用缓存锁定代替总线锁定来进行优化。缓存锁定是指当两个 CPU 的缓存行同时指向一片内存区域时,如果 A CPU 希望对该内存区域进行修改并使用了缓存锁定,那么 B CPU 将无法访问自己缓存中相应的缓存行,自然也没法访问对应的内存区域,这样就 A CPU 就实现了独享内存。
- 但是有两种情况下处理器不会使用缓存锁定。第一种情况是:当操作的数据不能被缓存在处理器内部,或操作的数据跨多个缓存行(cache line),则处理器会调用总线锁定。第二种情况是:有些处理器不支持缓存锁定。对于 Inter486 和奔腾处理器,就算锁定的内存区域在处理器的缓存行中也会调用总线锁定。
- 总线风暴:
- 在多处理架构中,所有处理器会共享一条总线,靠此总线连接主存,每个处理器核心都有自己的高速缓存,各核相对于 BUS 对称分布,这种结构称为“对称多处理器”即 SMP。当主存中的数据同时存在于多个处理器高速缓存的时候,某一个处理器的高速缓存中相应的数据更新之后,会通过总线使其它处理器的高速缓存中相应的数据失效,从而使其重新通过总线从主存中加载最新的数据,大家通过总线的来回通信称为“Cache 一致性流量”,因为总线被设计为固定的“通信能力”,如果 Cache 一致性流量过大,总线将成为瓶颈。
一个自旋锁的例子:
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
public class BusyLock {
private int count;
// 自旋锁状态
private AtomicInteger state;
private BusyLock() {
this.count = 0;
this.state = new AtomicInteger(0);
}
/**
* 利用 CAS 实现自旋锁
*/
private void lock() {
while (!state.compareAndSet(0, 1))
;
}
/**
* 解锁
*/
private void unlock() {
state.set(0);
}
/**
* 递增 count,使用自旋锁保护 count
*/
public void incr() {
lock();
count = count + 1;
unlock();
}
/**
* 测试,开启 50 个线程递增 count,每个线程循环 10 万次。
*
* @param warnUp
* @throws Exception
*/
public static void runTests(boolean warnUp) throws Exception {
int threads = 50;
int n = 100000;
BusyLock bl = new BusyLock();
Thread[] ts = new Thread[threads];
long start = System.nanoTime();
CyclicBarrier barrier = new CyclicBarrier(threads + 1);
for (int i = 0; i < threads; i++) {
ts[i] = new Thread(new Runnable() {
@Override
public void run() {
try {
barrier.await();
for (int j = 0; j < n; j++)
bl.incr();
barrier.await();
} catch (Exception e) {
}
}
});
ts[i].start();
}
barrier.await();
barrier.await();
for (Thread t : ts) {
t.join();
}
long duration = (System.nanoTime() - start) / 1000000;
if (!warnUp)
System.out.println("count= " + bl.count + ",cost:" + duration
+ "ms");
}
public static void main(String[] args) throws Exception {
// 测试,先 warm up
runTests(true);
// 实际测试
runTests(false);
}
}
问题所在:
当考虑到多核状态下的高速缓存的时候,如果对于锁的争用非常激烈,那么会有很多的线程在 while 上的 compareAndSet
原子操作上等待,保存 state 的缓存行就会在多个 CPU 之间『颠簸』,导致了不必要的总线争用。
为了减少总线争用,我们可以改进下 lock:
private void lock() {
while (!state.compareAndSet(0, 1))
//对于未能取得锁所有权的线程,在内层循环上等待
//因为获取了 state 一份共享的高速缓存副本,
//不会再进一步产生总线通信量
while (state.get() == 1)
;
}
通过增加一个内循环 while (state.get() == 1)
,让没有获得锁的线程在 state 状态上等待,避免原子操作,可以减少总线争用。每个忙等待的 CPU 都将获得 state 状态的共享副本,以后的循环都将在共享副本上执行,不会再产生总线通信。
当占有锁的线程释放锁(原子地设置 state 为 0)的时候,这些共享的副本会失效(write-invalidate 策略,比如常见的 MESI 协议)或者被更新到最新状态(write-update 策略),这时候才会产生总线通信。
测试两个版本的 lock
方法,在我的机器上有比较明显的差异,没有内层循环的 lock
版本耗时在 6-8 秒,而增加了内层循环的 lock
耗时在 2.5 -5 秒。
volatile 为什么不能保证原子性
volatile 和 cas 都是基于 lock 前缀实现,但 volatile 却无法保证原子性这是因为:Lock 前缀只能保证缓存一致性,但不能保证寄存器中数据的一致性,如果指令在 lock 的缓存刷新生效之前把数据写入了寄存器,那么寄存器中的数据不会因此失效而是继续被使用,就好像数据库中的事务执行失败却没有回滚,原子性就被破坏了。以被 volatile 修饰的 i 作 i++为例,实际上分为 4 个步骤:
mov 0xc(%r10),%r8d ; 把 i 的值赋给寄存器
inc %r8d ; 寄存器的值+1
mov %r8d,0xc(%r10) ; 把寄存器的值写回
lock addl $0x0,(%rsp) ; 内存屏障,禁止指令重排序,并同步所有缓存
如果两个线程 AB 同时把 i 读进自己的寄存器,此时 B 线程等待,A 线程继续工作,把 i++后放回内存。按照原子性的性质,此时 B 应该回滚,重新从内存中读取 i,但因为此时 i 已经拷贝到寄存器中,所以 B 线程会继续运行,原子性被破坏。 而 cas 没有这个问题,因为 cas 操作对应指令只有一个:
lock cmpxchg dword ptr [edx], ecx ;
该指令确保了直接从内存拿数据(ptr [edx]),然后放回内存这一系列操作都在 lock 状态下,所以是原子性的。 总结:volatile 之所以不是原子性的原因是 jvm 对 volatile 语义的实现只是在 volatile 写后面加一个内存屏障,而内存屏障前的操作不在 lock 状态下,这些操作可能会把数据放入寄存器从而导致无法有效同步;cas 能保证原子性是因为 cas 指令只有一个,这个指令从头到尾都是在 lock 状态下而且从内存到内存,所以它是原子性的。
进一步的思考:
自旋锁在这个例子中之所以性能不佳,原因就在于递增 +1 这个操作非常快,导致自旋锁版本的代码线程争抢锁非常激烈,但是任务却没有多大进展,空耗了大量 CPU。因此,自旋锁的优化有一些思路:
比如分散争抢时间
比如排队。
首先是退让(Back off),每个线程在争抢锁之前等待一小会,通常第一次不退让,但是之后就按照某个规则变化这个退让时间,我们修改下原来的 BusyLock 例子来演示这个优化,这里采用指数退让:
//最大退让时间 10 毫秒
private static final int max_backoff_msg=10;
/**
* 利用 CAS 实现自旋锁
*/
private void lock() {
//其实退让时间 1 毫秒。
int backoff=1;
while (!state.compareAndSet(0, 1)){
while (state.get() == 1)
;
//不是第一次,退让一下
if(backoff>1){
try{
Thread.sleep(backoff);
}catch(Exception InterruptedException){
Thread.currentThread().interrupt();
}
}
//还没有超过最大退让时间,指数增加退让时间。
if(backoff<max_backoff_msg){
backoff*=2;
}
}
}
通过引入一个退让机制,我们重新跑下原来的测试,整体执行时间下降到了 120~200 毫秒了
但是,这个实现依然有很多问题,首先没有处理可重入锁,也不支持 tryLock 超时、取消等;其次,用的 sleep 做退让,最小单位是毫秒,并且 sleep 精度无法保证。另外,没有任何公平性可言,等待最久的线程可能退让最多,新来的很可能先抢到锁等等。
排队机制:
我们可以利用 Java 给我们提供的 AQS 机制,事实上,ReentrantLock 是这么做的,synchronized 从轻量级锁升级到重量级锁也是这么做的。
先尝试用 CAS 获取锁,如果成功了,那就设置独占线程为自己,如果失败,进入 AQS 框架的 aquire 方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
aquire
里同样先尝试下能不能获取,不行就去调用 addWaiter
和 acquireQueued
去尝试排队,如果还是不行,就中断取消。
排队过程也是一个 lock-free 的循环过程。acquireQueued
仍然会去自旋尝试获取自旋锁,如果仍然继续失败,就调用 park
让出 CPU 不再参与调度,等锁被释放的时候被前驱节点的线程(释放锁的线程)唤醒再次参与争抢。最后,通过引入队列禁止抢占,也可以支持严格的公平锁了。