CAS,全称CompareAndSwap(或CompareAndSet),即比较并交换,是并发编程中经常用到的一种技术,它可以实现对一个变量读取-修改-写回
操作的原子性。
我们经常在多线程环境下对一个字段进行自增操作,比如以下代码。
public class Test {
private int number;
public void inc() {
number++;
}
}
但是由于自增操作不是一个原子操作,它分为读取-修改-写回
三个步骤,在多线程环境下就有可能导致这三个步骤互相交错的执行,从而导致结果不正确。比如有两个线程同时执行自增操作,我们期望的执行过程和结果是这样的。
但实际上它有可能是这样的。
由于线程1和线程2的代码交错执行,导致最后的结果不正确。
通常我们可以用synchronized
和简单的解决这个问题。
public class Test {
private int number;
public synchronized void inc() {
number++;
}
}
但是synchronized
关键字需要进行加锁操作,效率不高,因此JDK1.5引入了CAS,它不需要加锁,效率更高并且易于使用。
public class Test {
private AtomicInteger number = new AtomicInteger();
public void inc() {
number.incrementAndGet();
}
}
现在我们来看看它是如何实现的。
// java.util.concurrent.atomic.AtomicInteger#incrementAndGet
private static final long VALUE;
public final int incrementAndGet() {
return U.getAndAddInt(this, VALUE, 1) + 1;
}
代码中的U
是一个Unsafe
类的实例,之所以叫Unsafe
是因为它提供了一些比较底层的操作,比如可以直接读写内存地址。
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
代码中的VALUE
是一个long
型的静态常量,它的值是在静态块中被初始化的。
private static final long VALUE;
static {
try {
VALUE = U.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
方法jdk.internal.misc.Unsafe#objectFieldOffset
用来获取一个字段相对于对象的起始内存地址的字节偏移量。这里的偏移量是一个相对值,因此对于同一个类型下的不同对象,同一个字段的偏移量是一样的。
下面我们进入jdk.internal.misc.Unsafe#getAndAddInt
方法。
// jdk.internal.misc.Unsafe#getAndAddInt
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
// jdk.internal.misc.Unsafe#getIntVolatile是一个本地方法,直接从主存中获取对象指定偏移量处的int值。
// 本例中获取到的是Test对象中number字段的最新值。
v = getIntVolatile(o, offset);
} while (!weakCompareAndSetInt(o, offset, v, v + delta));
return v;
}
public final boolean weakCompareAndSetInt(Object o, long offset,
int expected,
int x) {
// 调用本地方法jdk.internal.misc.Unsafe#compareAndSetInt。
return compareAndSetInt(o, offset, expected, x);
}
方法jdk.internal.misc.Unsafe#weakCompareAndSetInt
又调用了一个本地方法jdk.internal.misc.Unsafe#compareAndSetInt
。
// jdk.internal.misc.Unsafe#compareAndSetInt
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSetInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) {
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *)index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
} UNSAFE_END
主要是通过Atomic::cmpxchg
这个方法来进行比较和交换,该方法在不同硬件平台下有不同的实现,比如在Windows x86、Solaris x86、Linux ARM等平台上都有不同的实现。
这里我们只看下Linux在x86架构上的实现,你可以在这里找到源码。
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value, cmpxchg_memory_order order) {
// 判断是不是多处理器环境。
int mp = os::is_MP();
// __asm__表示在C++中嵌入汇编语言。
// volatile表示禁止指令重排序。
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}
#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "
C++中嵌入汇编语言的格式如下。
asm ( assembler template
: output operands /* optional */
: input operands /* optional */
: list of clobbered registers /* optional */
);
cmpxchgl %1,(%3)
。%1
是指第2个操作数,%3
是指第4个输入操作数。操作数包括输出操作数和输入操作数,编号从0
开始,按照先输出操作数,后输入操作数的顺序进行编号。=a
,对应eax
寄存器。exchange_value
,第2个是compare_value
,第3个是dest
,第4个是mp
。r
代表任意寄存器,a
代表eax
寄存器。cc
参数表示编译器cmpxchgl
的执行将影响到标志寄存器;memory
参数是让编译器重新从内存中读取变量的值。cmpxchgl %1,(%3)
其实就是cmpxchgl exchange_value,dest
,它的原型是CMPXCHG <dest>,<src>
。cmpxchgl
会比较eax
寄存器中的值和dest
的值,如果相等那么就把exchange_value
赋值给dest
,否则把dest
的值赋值给eax
寄存器。通过"a" (compare_value)
可以知道,eax
寄存器里的值就是compare_value
,因此这里就会判断compare_value
和dest
的值是否相等,这其实就是汇编层面的CAS操作。
LOCK_IF_MP
是C++中的宏,它的作用是,如果当前程序运行在一个多核环境中,那么就在cmpxchgl
指令前加上lock
指令。
那么为什么这里需要lock
呢?这要从CPU的缓存说起。
现代的CPU通常会有多个核心,每个核心有独立的L1、L2缓存以及共享的L3缓存。当CPU访问内存时,它会首先查询缓存,如果缓存中存在那么就直接返回,否则从内存中读取并载入到缓存中,最后返回给CPU。
以cmpxchgl
指令为例,假设有2个核心同时执行,那么每个核心的缓存中都有和内存中dest
值相同的副本dest'
,如果两次调用的compare_value
也一样,那么对于上层来说这两个CAS操作都成功了,但是我们要的是只有一个能成功。
解决方法就是加上lock
指令,它可以保证在多核心环境下让某个核心独占使用一部分共享内存。
在早期的CPU中是通过锁定总线的方式实现的。总线是CPU和其它组件进行数据传输的通道,当某个核心要锁定总线时,它会发出一个信号,总线仲裁器会以轮流的策略选择一个核心独占,一但某个核心锁定了总线,那么其余的核心就无法在总线上和内存之间进行数据传输,从而达到原子性的目的。这种做法效率较低,真正做到“一核有难,九核围观”。
现代处理器通过缓存一致性协议来实现同样的功能,比如X86使用基于MESI
协议改进后的MESIF
协议。其主要思路是每个核心都知道其它核心的缓存状态,当多个核心同时执行cmpxchgl
指令时,它们都会试图去修改同一内存地址在各自缓存中的副本,这时它们都会向环形总线(Ring Bus)发送消息,让其它核心这部分的缓存失效。环形总线会进行仲裁,决定是哪个核心可以使其它核心的缓存失效,没有被选中的那个核心会让自己的缓存失效,并且读取被选中的核心修改后的值。
CAS操作需要先进行对比,然后进行交换,但是如果一个值从A变成B又立刻变回A那么就有可能存在问题。
通常的解决方法是增加一个版本号,每次值变化都要更新版本号,并且在对比时也要对比版本号。
Java中可以使用java.util.concurrent.atomic.AtomicStampedReference
类解决。
在高并发环境下,由于CAS操作不断的自旋,可能会引起CPU负载过高。