【深入浅出多线程】无锁编程

【深入浅出多线程】无锁编程

目录一、并发相关概念二、并发下的原子操作三、并发的原子性问题三、并发控制策略四、锁带来的问题五、无锁编程实现六、性能比较

多线程编程中,锁是大家比较熟悉的概念,但对无锁编程则没有太多了解。无锁编程是指不用加锁的方式去解决原本需要加锁才能解决的问题,从而使程序具有更高的性能,降低硬件成本。我们从并发开始说起。

一、并发相关概念

并发数:服务器同时并行处理的请求数量。

QPS:每秒处理完成的请求数量,是衡量系统吞吐量的一种方式。例如:Tomcat接收了200个请求,但1秒内处理完毕的请求为20个,则QPS为20。

高并发:高并发是一个相对的概念,假设机器配置差劲(例如1核1G),如果在这个机器上部署一个项目,这时200个请求就已经到达该机器的上限了,那么这时面临的场景就是高并发的场景。处理高并发并不一定是并发量非常高,我们才会用到这样的技术。可能你们公司的并发量只有1000,15万的用户量,依然可能会用到高并发的技术,为什么——降低成本(用户少,公司没钱__)。

二、并发下的原子操作

Oricle官网上Java入门手册中关于原子操作(Atomic Variables)的定义如下:

In programming, an atomic action is one that effectively happens all at once. An atomic action cannot stop in the middle: it either happens completely, or it doesn't happen at all. No side effects of an atomic action are visible until the action is complete.

原子操作可以是一个步骤或多个操作步骤,要么完全发生,要么根本不发生,不能停在中间状态。将整个操作视作一个整体,资源在该次操作中保持一致,这是线程安全中原子性的核心特征。

三、并发的原子性问题

来看一个计数器的例子:

class Counter {

private int i = 0;

public void increment() {

i++;

}

public void decrement() {

i--;

}

public int value() {

return i;

}

}

我们为它配上启动器,启动器中开启两个线程,每个线程执行10000次increment()函数调用,所以执行完预期计数器结果为20000。

public class Starter {

public static void main(String[] args) throws InterruptedException {

var counter = new Counter();

for (int i = 0; i < 2; i++) {

new Thread(() -> {

for (int j = 0; j < 10000; j++) {

counter.increment();

}

}).start();

}

Thread.sleep(5000);

System.out.println(counter.value());

}

}

运行结果一般是不正确的。

这显然是一个不正确的结果,为什么不正确?——i++并没有保证原子性。我们可以通过分析JVM执行的字节码(byte code)来一探究竟(IDEA可以通过安装插件来实现,具体参考:IDEA下查看Java字节码插件):

0 aload_0

1 dup

2 getfield #2

5 iconst_1

6 iadd

7 putfield #2

10 return

可以看到字节码包括以下关键的三个步骤:

取值:2 getfield #2

相加: 6 iadd

放回:7 putfield #2

接下来结合多线程来进行分析为什么i++没有实现原子性。

做了两次计算,得出同样的值,很明显这不是我们期望的结果,不符合我们编写程序的期望——操作失败。这就是原子性问题的来源。

这里举一个计数器的应用——限流。限流的目的是超过一定数量的请求,不予操作。例如,有一个Controller方法:

@RestController

class XXXController {

final Counter counter = new Counter();

@GetController(...)

public void index() {

// 统计正在处理的请求数量

counter.increase();

try {

if (counter.value() > 3000) {

return;

}

// TODO: 很多业务逻辑,消耗CPU、内存,资源有限

} finally {

counter.decrease();

}

}

}

类似的限流在Redis、SpringCloud、Nginx、微服务、网关中都有,其最核心技术一定是计数器。

三、并发控制策略

并发导致了上述问题,我们需要控制并发,以达到我们想要的效果。并发控制有两种策略:悲观策略(Pessimistic concurrency control)和乐观策略(Optimistic concurrency control)。这是两种解决并发问题的思路。

悲观策略是一种悲观的思想,即认为写较多,遇到并发的可能性高,每次拿数据时,都会认为别人会修改数据,所以同一时间只允许一个线程执行。

乐观策略是一种乐观的思想,即认为读多写少,遇到并发的可能性低,每次拿数据时都认为别人不会修改,所以不会上锁。但是在更新的时候会判断判断值有没有发生变化,如果发生变化了,意味着其他线程已经更新,则此次计算更新失效,需要重新取值计算。

适用场景:

重试成本小;

并发数量小;

无锁编程要先了解有锁编程的问题。

四、锁带来的问题

按照悲观策略的思想,我们当然可以通过加锁的方式解决并发不一致问题,计数器例子的加锁版本代码如下:

class SynchronizedCounter {

private int i = 0;

public synchronized void increment() {

i++;

}

public synchronized void decrement() {

i--;

}

public synchronized int value() {

return i;

}

}

但是加锁存在一些问题:

我们知道,线程的数量一定的远远大于CPU数量的,如上图所示,CPU只有3个,而线程数则可能成百上千。如何把这么多的线程分配到有限的CPU上,这就是操作系统调度程序做的事情。JVM进程中的线程也是由操作系统中的调度器负责将其调度到指定的CPU上执行。

再来分析上面加锁版本的计数器。线程-1和线程-2都在抢锁,假设线程-1抢到了锁,线程2没有抢到锁,线程2变为阻塞状态。那么,下一次线程-2什么时候?可能会在1ms之后,也可能10ms之后执行。也就是说,由于操作系统调度存在,所以这是不确定的。线程-1执行完毕。

假设执行一次i++需要0.05ms,则可能会发生以下场景:

​ 第一次i++ 0.05ms

​ 线程切换 --- 10ms

​ 第二次i++ 0.05ms

这就是说,每发生一次枪锁失败导致的线程切换,都会增加一定的线程切换时间、等待时间,从而导致请求的处理时间变长,降低了系统的吞吐量。

五、无锁编程实现

通过分析计数器例子中计数器累加操作i++,我们发现:

读取值:没有问题——不会破坏数据的状态;

计算值:没有问题——不会破坏数据的状态;

赋值:会修改内存数据,导致数据结果不符合程序的原子性要求——会破坏数据的状态,即:存在并发安全问题

按照乐观并发控制策略,假设不存在多线程竞争,我们不进行加锁操作,那么第1步和第2步都没有问题。第3步,写结果到内存时,需要判断在第1步和第2步中,刚才的假设是否成立,如果成立,则直接将结果写入内存,否则刚才计算结果不正确,需要重新计算。也就是说,最后一步是无锁编程的核心,如何确保判断并根据结果写回内存这一操作具备原子性?

这需要硬件的支持。

最后一步判断并根据结果写回内存本质是操作内存中一个值,而内存正好提供了这么一种机制——CAS(Compare and swap)机制,属于硬件同步原语。该方法需要三个参数:内存地址、当前值A、新值B。

CAS的具体操作:先比较输入的当前值和内存中的值是否一致,不一致则代表内存数据发生了变化,CAS失败。如果输入的旧值A和内存中的值一致,则将值A替换为值B,CAS操作成功。

Java提供了一种直接操作内存的工具类sun.misc.Unsafe,顾名思义,这个类对于普通程序员来说是”危险“的,一般应用开发者不会用到这个类。在Unsafe类中就有一系列的compareAndSwapXXX()方法,例如,本文要用到的compareAndSwapInt函数:

@ForceInline

public final boolean compareAndSwapInt(Object o, long offset,

int expected,

int x) {

return theInternalUnsafe.compareAndSetInt(o, offset, expected, x);

}

这个函数的四个参数分别代表要操作的对象、对象属性相对该对象的偏移量、期望内存中的值以及计算后的值。参数1、3、4都很容易获取,第2个成员变量相对该对象的偏移量怎么获取?我们也可以通过这个工具类获取:

@ForceInline

public long objectFieldOffset(Field f) {

return theInternalUnsafe.objectFieldOffset(f);

}

最后我们要调用这些方法,我们首先要获取Unsafe的实例变量。然而,这个类的构造函数时私有的,应用程序也无法通过其静态成员方法getUnsafe()获取其实例。那么,怎么获取Unsafe的实例呢?这就要使用强大的反射机制了(反射就像一面镜子,我们往JVM中放了什么东西,我们就可以从JVM里获取到这些东西,也许这是反射名字的由来吧!)。

public static Unsafe getUnsafe() throws IllegalAccessException {

Field unsafeField = Unsafe.class.getDeclaredFields()[0];

unsafeField.setAccessible(true);

return (Unsafe) unsafeField.get(null);

}

最后,基于CAS机制的Counter实现如下:

public class CASCounter {

private int i = 0;

private static Unsafe unsafe; // Unsafe实例

private static long offset; // i相对CASCounter对象实例的偏移

static {

try {

// 通过反射机制获取Unsafe实例

var unsafeField = Unsafe.class.getDeclaredField("theUnsafe");

unsafeField.setAccessible(true);

unsafe = (Unsafe) unsafeField.get(null);

// 获取i相对CASCounter对象实例的偏移

Field fc = CASCounter.class.getDeclaredField("i");

offset = unsafe.objectFieldOffset(fc);

} catch (NoSuchFieldException | IllegalAccessException e) {

e.printStackTrace();

}

}

public void increment() {

while (true) {

int currentValue = i;

int newValue = currentValue + 1;

// 原子方式的比较写回操作,如果成功则返回,否则重新获取并计算(重试)

if (unsafe.compareAndSwapInt(this, offset, currentValue, newValue)) {

return;

}

}

}

public void decrement() {

while (true) {

int currentValue = i;

int newValue = currentValue - 1;

// 原子方式的比较写回操作,如果成功则返回,否则重新获取并计算(重试)

if (unsafe.compareAndSwapInt(this, offset, currentValue, newValue)) {

return;

}

}

}

public int value() {

return i;

}

}

六、性能比较

我们在启动器开启的线程中加入计时逻辑,如下:

for (int i = 0; i < 2; i++) {

new Thread(() -> {

var begin = System.nanoTime();

for (int j = 0; j < 10000; j++) {

counter.increment();

}

System.out.println(System.nanoTime() - begin);

}).start();

}

运行结果如下:

使用SynchronizeCounter的运行结果

使用CASCounter的运行结果

可以看到两者性能差距在10倍以上!

相关推荐

再到海上去:大航海与流行文化
365bet线上攻略

再到海上去:大航海与流行文化

📅 10-07 👁️ 3168
苹果手机怎么重启
国内在365投注

苹果手机怎么重启

📅 08-21 👁️ 6685
掐的笔顺
国内在365投注

掐的笔顺

📅 08-03 👁️ 3211