playground

CAS

版本

CAS,全称CompareAndSwap(或CompareAndSet),即比较并交换,是并发编程中经常用到的一种技术,它可以实现对一个变量读取-修改-写回操作的原子性。

原理

我们经常在多线程环境下对一个字段进行自增操作,比如以下代码。

public class Test {

    private int number;

    public void inc() {
        number++;
    }
}

但是由于自增操作不是一个原子操作,它分为读取-修改-写回三个步骤,在多线程环境下就有可能导致这三个步骤互相交错的执行,从而导致结果不正确。比如有两个线程同时执行自增操作,我们期望的执行过程和结果是这样的。

但实际上它有可能是这样的。

由于线程1和线程2的代码交错执行,导致最后的结果不正确。

通常我们可以用synchronized和简单的解决这个问题。

public class Test {

    private int number;

    public synchronized void inc() {
        number++;
    }
}

但是synchronized关键字需要进行加锁操作,效率不高,因此JDK1.5引入了CAS,它不需要加锁,效率更高并且易于使用。

public class Test {

    private AtomicInteger number = new AtomicInteger();

    public void inc() {
        number.incrementAndGet();
    }
}

现在我们来看看它是如何实现的。

// java.util.concurrent.atomic.AtomicInteger#incrementAndGet
private static final long VALUE;

public final int incrementAndGet() {
    return U.getAndAddInt(this, VALUE, 1) + 1;
}

代码中的U是一个Unsafe类的实例,之所以叫Unsafe是因为它提供了一些比较底层的操作,比如可以直接读写内存地址。

private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();

代码中的VALUE是一个long型的静态常量,它的值是在静态块中被初始化的。

private static final long VALUE;

static {
    try {
        VALUE = U.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
    } catch (ReflectiveOperationException e) {
        throw new Error(e);
    }
}

方法jdk.internal.misc.Unsafe#objectFieldOffset用来获取一个字段相对于对象的起始内存地址的字节偏移量。这里的偏移量是一个相对值,因此对于同一个类型下的不同对象,同一个字段的偏移量是一样的。

下面我们进入jdk.internal.misc.Unsafe#getAndAddInt方法。

// jdk.internal.misc.Unsafe#getAndAddInt
public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        // jdk.internal.misc.Unsafe#getIntVolatile是一个本地方法,直接从主存中获取对象指定偏移量处的int值。
        // 本例中获取到的是Test对象中number字段的最新值。
        v = getIntVolatile(o, offset);
    } while (!weakCompareAndSetInt(o, offset, v, v + delta));
    return v;
}

public final boolean weakCompareAndSetInt(Object o, long offset,
                                          int expected,
                                          int x) {
    // 调用本地方法jdk.internal.misc.Unsafe#compareAndSetInt。
    return compareAndSetInt(o, offset, expected, x);
}

方法jdk.internal.misc.Unsafe#weakCompareAndSetInt又调用了一个本地方法jdk.internal.misc.Unsafe#compareAndSetInt

// jdk.internal.misc.Unsafe#compareAndSetInt
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSetInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) {
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *)index_oop_from_field_offset_long(p, offset);

  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
} UNSAFE_END

主要是通过Atomic::cmpxchg这个方法来进行比较和交换,该方法在不同硬件平台下有不同的实现,比如在Windows x86、Solaris x86、Linux ARM等平台上都有不同的实现。

这里我们只看下Linux在x86架构上的实现,你可以在这里找到源码。

inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value, cmpxchg_memory_order order) {
  // 判断是不是多处理器环境。
  int mp = os::is_MP();
  // __asm__表示在C++中嵌入汇编语言。
  // volatile表示禁止指令重排序。
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
}

#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "

C++中嵌入汇编语言的格式如下。

asm ( assembler template
      : output operands                  /* optional */
      : input operands                   /* optional */
      : list of clobbered registers      /* optional */
    );

cmpxchgl %1,(%3)其实就是cmpxchgl exchange_value,dest,它的原型是CMPXCHG <dest>,<src>cmpxchgl会比较eax寄存器中的值和dest的值,如果相等那么就把exchange_value赋值给dest,否则把dest的值赋值给eax寄存器。通过"a" (compare_value)可以知道,eax寄存器里的值就是compare_value,因此这里就会判断compare_valuedest的值是否相等,这其实就是汇编层面的CAS操作。

LOCK_IF_MP是C++中的宏,它的作用是,如果当前程序运行在一个多核环境中,那么就在cmpxchgl指令前加上lock指令。

那么为什么这里需要lock呢?这要从CPU的缓存说起。

现代的CPU通常会有多个核心,每个核心有独立的L1、L2缓存以及共享的L3缓存。当CPU访问内存时,它会首先查询缓存,如果缓存中存在那么就直接返回,否则从内存中读取并载入到缓存中,最后返回给CPU。

cmpxchgl指令为例,假设有2个核心同时执行,那么每个核心的缓存中都有和内存中dest值相同的副本dest',如果两次调用的compare_value也一样,那么对于上层来说这两个CAS操作都成功了,但是我们要的是只有一个能成功。

解决方法就是加上lock指令,它可以保证在多核心环境下让某个核心独占使用一部分共享内存。 在早期的CPU中是通过锁定总线的方式实现的。总线是CPU和其它组件进行数据传输的通道,当某个核心要锁定总线时,它会发出一个信号,总线仲裁器会以轮流的策略选择一个核心独占,一但某个核心锁定了总线,那么其余的核心就无法在总线上和内存之间进行数据传输,从而达到原子性的目的。这种做法效率较低,真正做到“一核有难,九核围观”。

现代处理器通过缓存一致性协议来实现同样的功能,比如X86使用基于MESI协议改进后的MESIF协议。其主要思路是每个核心都知道其它核心的缓存状态,当多个核心同时执行cmpxchgl指令时,它们都会试图去修改同一内存地址在各自缓存中的副本,这时它们都会向环形总线(Ring Bus)发送消息,让其它核心这部分的缓存失效。环形总线会进行仲裁,决定是哪个核心可以使其它核心的缓存失效,没有被选中的那个核心会让自己的缓存失效,并且读取被选中的核心修改后的值。

存在的问题

A-B-A问题

CAS操作需要先进行对比,然后进行交换,但是如果一个值从A变成B又立刻变回A那么就有可能存在问题。

通常的解决方法是增加一个版本号,每次值变化都要更新版本号,并且在对比时也要对比版本号。

Java中可以使用java.util.concurrent.atomic.AtomicStampedReference类解决。

CPU负载问题

在高并发环境下,由于CAS操作不断的自旋,可能会引起CPU负载过高。

参考

  1. 《Java CAS底层实现详解》
  2. 《Java CAS 原理剖析》
  3. 《Java CAS 原理分析》
  4. 《浅论Lock 与X86 Cache 一致性》
  5. 《理解 CPU Cache》
  6. 《JAVA中CAS-ABA的问题解决方案》