• lock指令的几个作用:
    • Lock 前缀指令可以通过对总线或者处理器内部缓存加锁,使得其他处理器无法读写该指令要访问的内存区域,因此能保存指令执行的原子性。
    • Lock 前缀指令将禁止该指令与之前和之后的读和写指令重排序。
    • Lock 前缀指令将会把写缓冲区中的所有数据立即刷新到主内存中(多核处理器中,每个 cpu 会通过嗅探总线上传播的数据来检查自己的缓存是否过期,当一个 cpu 刷新自己的数据到内存时,其它 cpu 会自动过期自己的缓存)。
  • 关于 锁总线 和 锁内部缓存 的解释 :
    • 早期的处理器只支持通过总线锁保证原子性。所谓总线锁就是使用处理器提供的一个 LOCK#信号,当一个处理器在总线上输出此信号时,其他处理器的请求将被阻塞住,那么该处理器可以独占使用共享内存。很显然,这会带来昂贵的开销。
    • 缓存锁定是改进后的方案。在同一时刻我们只需保证对某个内存地址的操作是原子性即可,但总线锁定把 CPU 和内存之间通信锁住了,这使得锁定期间,其他处理器不能操作其他内存地址的数据,所以总线锁定的开销比较大,最近的处理器在某些场合下使用缓存锁定代替总线锁定来进行优化。缓存锁定是指当两个 CPU 的缓存行同时指向一片内存区域时,如果 A CPU 希望对该内存区域进行修改并使用了缓存锁定,那么 B CPU 将无法访问自己缓存中相应的缓存行,自然也没法访问对应的内存区域,这样就 A CPU 就实现了独享内存。
    • 但是有两种情况下处理器不会使用缓存锁定。第一种情况是:当操作的数据不能被缓存在处理器内部,或操作的数据跨多个缓存行(cache line),则处理器会调用总线锁定。第二种情况是:有些处理器不支持缓存锁定。对于 Inter486 和奔腾处理器,就算锁定的内存区域在处理器的缓存行中也会调用总线锁定。
  • 总线风暴:
    • 在多处理架构中,所有处理器会共享一条总线,靠此总线连接主存,每个处理器核心都有自己的高速缓存,各核相对于 BUS 对称分布,这种结构称为“对称多处理器”即 SMP。当主存中的数据同时存在于多个处理器高速缓存的时候,某一个处理器的高速缓存中相应的数据更新之后,会通过总线使其它处理器的高速缓存中相应的数据失效,从而使其重新通过总线从主存中加载最新的数据,大家通过总线的来回通信称为“Cache 一致性流量”,因为总线被设计为固定的“通信能力”,如果 Cache 一致性流量过大,总线将成为瓶颈。

一个自旋锁的例子:

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class BusyLock {

  private int count;
  // 自旋锁状态
  private AtomicInteger state;

  private BusyLock() {
      this.count = 0;
      this.state = new AtomicInteger(0);
  }

  /**
  * 利用 CAS 实现自旋锁
  */
  private void lock() {
      while (!state.compareAndSet(0, 1))
          ;
  }

  /**
  * 解锁
  */
  private void unlock() {
      state.set(0);
  }

  /**
  * 递增 count,使用自旋锁保护 count
  */
  public void incr() {
      lock();
      count = count + 1;
      unlock();
  }

  /**
  * 测试,开启 50 个线程递增 count,每个线程循环 10 万次。
  * 
  * @param warnUp
  * @throws Exception
  */
  public static void runTests(boolean warnUp) throws Exception {
      int threads = 50;
      int n = 100000;
      BusyLock bl = new BusyLock();
      Thread[] ts = new Thread[threads];
      long start = System.nanoTime();
      CyclicBarrier barrier = new CyclicBarrier(threads + 1);
      for (int i = 0; i < threads; i++) {
          ts[i] = new Thread(new Runnable() {

              @Override
              public void run() {
                  try {
                      barrier.await();
                      for (int j = 0; j < n; j++)
                          bl.incr();
                      barrier.await();
                  } catch (Exception e) {

                  }
              }
          });
          ts[i].start();
      }
      barrier.await();
      barrier.await();
      for (Thread t : ts) {
          t.join();
      }
      long duration = (System.nanoTime() - start) / 1000000;
      if (!warnUp)
          System.out.println("count= " + bl.count + ",cost:" + duration
                  + "ms");

  }

  public static void main(String[] args) throws Exception {
      // 测试,先 warm up
      runTests(true);
      // 实际测试
      runTests(false);
  }

}

问题所在:

当考虑到多核状态下的高速缓存的时候,如果对于锁的争用非常激烈,那么会有很多的线程在 while 上的 compareAndSet 原子操作上等待,保存 state 的缓存行就会在多个 CPU 之间『颠簸』,导致了不必要的总线争用。

为了减少总线争用,我们可以改进下 lock:

private void lock() {
  while (!state.compareAndSet(0, 1))
      //对于未能取得锁所有权的线程,在内层循环上等待
      //因为获取了 state 一份共享的高速缓存副本,
      //不会再进一步产生总线通信量
      while (state.get() == 1)
          ;
}

通过增加一个内循环 while (state.get() == 1),让没有获得锁的线程在 state 状态上等待,避免原子操作,可以减少总线争用。每个忙等待的 CPU 都将获得 state 状态的共享副本,以后的循环都将在共享副本上执行,不会再产生总线通信。

当占有锁的线程释放锁(原子地设置 state 为 0)的时候,这些共享的副本会失效(write-invalidate 策略,比如常见的 MESI 协议)或者被更新到最新状态(write-update 策略),这时候才会产生总线通信。

测试两个版本的 lock 方法,在我的机器上有比较明显的差异,没有内层循环的 lock 版本耗时在 6-8 秒,而增加了内层循环的 lock 耗时在 2.5 -5 秒。


volatile 为什么不能保证原子性

volatile 和 cas 都是基于 lock 前缀实现,但 volatile 却无法保证原子性这是因为:Lock 前缀只能保证缓存一致性,但不能保证寄存器中数据的一致性,如果指令在 lock 的缓存刷新生效之前把数据写入了寄存器,那么寄存器中的数据不会因此失效而是继续被使用,就好像数据库中的事务执行失败却没有回滚,原子性就被破坏了。以被 volatile 修饰的 i 作 i++为例,实际上分为 4 个步骤:

mov     0xc(%r10),%r8d ; 把 i 的值赋给寄存器
inc     %r8d      ; 寄存器的值+1
mov     %r8d,0xc(%r10) ; 把寄存器的值写回
lock addl  $0x0,(%rsp)  ; 内存屏障,禁止指令重排序,并同步所有缓存

如果两个线程 AB 同时把 i 读进自己的寄存器,此时 B 线程等待,A 线程继续工作,把 i++后放回内存。按照原子性的性质,此时 B 应该回滚,重新从内存中读取 i,但因为此时 i 已经拷贝到寄存器中,所以 B 线程会继续运行,原子性被破坏。   而 cas 没有这个问题,因为 cas 操作对应指令只有一个:

lock cmpxchg dword ptr [edx], ecx ;

该指令确保了直接从内存拿数据(ptr [edx]),然后放回内存这一系列操作都在 lock 状态下,所以是原子性的。   总结:volatile 之所以不是原子性的原因是 jvm 对 volatile 语义的实现只是在 volatile 写后面加一个内存屏障,而内存屏障前的操作不在 lock 状态下,这些操作可能会把数据放入寄存器从而导致无法有效同步;cas 能保证原子性是因为 cas 指令只有一个,这个指令从头到尾都是在 lock 状态下而且从内存到内存,所以它是原子性的。


进一步的思考:

自旋锁在这个例子中之所以性能不佳,原因就在于递增 +1 这个操作非常快,导致自旋锁版本的代码线程争抢锁非常激烈,但是任务却没有多大进展,空耗了大量 CPU。因此,自旋锁的优化有一些思路:

比如分散争抢时间

比如排队。


首先是退让(Back off),每个线程在争抢锁之前等待一小会,通常第一次不退让,但是之后就按照某个规则变化这个退让时间,我们修改下原来的 BusyLock 例子来演示这个优化,这里采用指数退让:

//最大退让时间 10 毫秒
 private static final int max_backoff_msg=10;

     /**
     * 利用 CAS 实现自旋锁
     */
    private void lock() {
        //其实退让时间 1 毫秒。
        int backoff=1;
        while (!state.compareAndSet(0, 1)){
            while (state.get() == 1)
                ;
            //不是第一次,退让一下
            if(backoff>1){
                try{
                    Thread.sleep(backoff);
                }catch(Exception InterruptedException){
                    Thread.currentThread().interrupt();
                }
            }
            //还没有超过最大退让时间,指数增加退让时间。
            if(backoff<max_backoff_msg){
                backoff*=2;
            }
        }
    }

通过引入一个退让机制,我们重新跑下原来的测试,整体执行时间下降到了 120~200 毫秒了

但是,这个实现依然有很多问题,首先没有处理可重入锁,也不支持 tryLock 超时、取消等;其次,用的 sleep 做退让,最小单位是毫秒,并且 sleep 精度无法保证。另外,没有任何公平性可言,等待最久的线程可能退让最多,新来的很可能先抢到锁等等。


排队机制:

我们可以利用 Java 给我们提供的 AQS 机制,事实上,ReentrantLock 是这么做的,synchronized 从轻量级锁升级到重量级锁也是这么做的。

先尝试用 CAS 获取锁,如果成功了,那就设置独占线程为自己,如果失败,进入 AQS 框架的 aquire 方法:

  public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

aquire 里同样先尝试下能不能获取,不行就去调用 addWaiteracquireQueued 去尝试排队,如果还是不行,就中断取消。

排队过程也是一个 lock-free 的循环过程。acquireQueued 仍然会去自旋尝试获取自旋锁,如果仍然继续失败,就调用 park 让出 CPU 不再参与调度,等锁被释放的时候被前驱节点的线程(释放锁的线程)唤醒再次参与争抢。最后,通过引入队列禁止抢占,也可以支持严格的公平锁了。