CAS 是什么?

简单点讲, compare and swap (CAS) 是原子操作的一种, 可用于多线程编程中实现不被打断的数据交换操作, 解决数据不一致的问题.

大致的过程 > 假设内存有个数值 Value, 旧的数值记录为 Old, 需要修改的值是 New > 1. 比较 Value 是否等于 Old (compare) > 1. 如果相等, New 写入 Value (swap, 如果内存数据没有被修改过) > 1. 返回操作是否成功

从上面简要的过程, 可以很简单的看出, CAS有个ABA问题. > 就是当一个值从A->B->A, 然后使用CAS操作, 发现跟记录的Old数值一样, 没有变化, 则会认为他没有改变, 因此会导致错误的结果.

从 Java 的 AtomicInteger.java 入手

源码 (Java 10)

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    /*
     * This class intended to be implemented using VarHandles, but there
     * are unresolved cyclic startup dependencies.
     */
    // 获取 Unsafe 的实例
    private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
    // AtomicInteger 内 value 在内存里面的偏移量
    private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value");
    // 上面提到的 value
    private volatile int value;

    /* ignore **/

    public final int incrementAndGet() {
        return U.getAndAddInt(this, VALUE, 1) + 1;
    }
    /* ignore **/
}

可以看到, 主要是调用 getAndAddInt 方法.

@HotSpotIntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!weakCompareAndSetInt(o, offset, v, v + delta));
    return v;
}

跟到最后, 是调用一个native方法 public final native boolean compareAndSetInt(Object o, long offset,int expected,int x);

到此就看完了. (不深入jdk) 其实, 底层是依靠CPU(多核心)硬件层提供的lock信号保证原子性.

那么, 我们回到 This class intended to be implemented using VarHandles, but there are unresolved cyclic startup dependencies. 这句注释. 现在就用VarHandles实现 AtomicInteger. 因为我们自己实现的, 不存在cyclic startup dependencies

AtomicInt

简单只实(chao)现(xi)两个方法吧.

public class AtomicInt {

    private static final VarHandle VH_VALUE_FIELD;

    static {
        try {
            VH_VALUE_FIELD = MethodHandles.lookup().in(AtomicInt.class).findVarHandle(AtomicInt.class, "value", int.class);
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    private volatile int value;

    public AtomicInt() {
    }

    public AtomicInt(int value) {
        this.value = value;
    }

    public final int get() {
        return value;
    }

    public final void set(int newValue) {
        value = newValue;
    }

    public final int getAndIncrement() {
        return (int) VH_VALUE_FIELD.getAndAdd(this, 1);
    }

    public final int incrementAndGet() {
        return (int) VH_VALUE_FIELD.getAndAdd(this, 1) + 1;
    }
}

简单的测试用例

class AtomicIntTest extends Specification {

    def "GetAndIncrement"() {
        setup:
        def atomicInt = new AtomicInt(0)

        expect:
        atomicInt.getAndIncrement() == 0

    }

    def "IncrementAndGet"() {
        setup:
        def atomicInt = new AtomicInt(0)

        expect:
        atomicInt.incrementAndGet() == 1
    }

    def "parallel IncrementAndGet"() {
        setup:
        def atomicInt = new AtomicInt(0)
        def threads = (0..<10).collect({
            new Thread({
                for (int i = 0; i < 100; i++) {
                    atomicInt.incrementAndGet()
                }
            })
        })

        expect:
        atomicInt.get() == 0
        threads.size() == 10

        when:
        threads.each { it.start() }
        threads.each { it.join() }

        then:
        atomicInt.get() == 1000
    }

    def "parallel simple int"() {
        setup:
        def simpleInt = 0
        def threads = (0..<10).collect({
            new Thread({
                for (int i = 0; i < 100; i++) {
                    simpleInt++
                }
            })
        })

        expect:
        simpleInt == 0
        threads.size() == 10

        when:
        threads.each { it.start() }
        threads.each { it.join() }

        then:
        simpleInt == 1000
    }
}