文章

Java 多线程与锁那些事

多线程

什么是线程

一个非常古老,但好多人都没有明确答案的问题:进程、CPU 线程、操作系统线程 有什么区别?

[进程]

进程是操作系统为程序开辟的一块独立的,隔离的内存空间。科班出身的同学应该了解过 Linux Process Address Space,对于一个程序来说,它可以视为自己独享全部内存。至于这些内存与实际物理内存的映射,是操作系统管理的。进程是操作系统资源分配的最小单位。

一个进程就像一个家庭,与外界是相对隔离的。

[线程]

一个进程可以同时做许多事情,这些就是线程。就像家庭中的多个人,他们平常互不影响,但可以相互通讯与协作,也可能会争夺/共享同一个资源(只有一个卫生间的痛😭)。

[CPU 线程]

常说的 8 核 16 线程,这里的线程指的是 CPU 在物理上可以同时做多少事。操作系统会把这些物理资源进一步拆分(大致是按照时间片的形式)来提高利用率。假设某系统一共有 100 个线程在运行,但物理 CPU 只可能执行 16 线程,那么操作系统就是在这 100 个线程之间快速切换,给人一种同时执行的假象。所以单核单线程 CPU 照样可以运行多线程程序。操作系统线程与系统线程没有之间关系。

那为什么 CPU 1 个核心可以运行 2 个线程?这其实是操作系统那套调度机制的硬件化,当然,具体来说是非常专业与高级的东西,这里也没有必要深究。

线程池

Java 中 Thread 对象基本与操作系统中的线程对应,追踪 Thread.start() 的源码可以看到最终调用的是 private native void start0(),后面就是与平台相关的实现了。

我们知道线程的创建相对比较昂贵,所以更希望把执行完的线程暂时缓存起来以便复用。退一步讲,即使不复用,也希望能够统一管理所创建的线程,避免资源泄漏或其他生命周期问题。所以通常不直接使用 Thread 类,而是创建一个线程池。

创建线程池很简单, new ThreadPoolExecutor() 就可以了。它有几个参数:

  • corePoolSize: 最小线程数量。线程池创建时就会创建这些线程,即使用不到或执行完也不会回收。
  • maximumPoolSize: 最大线程数量。达到最大限制后如果有新任务则只能排队,直到有线程执行完毕。
  • keepAliveTime: 回收额外线程前等待的时间。多余的线程执行完毕后可以配置为等一会再回收。
  • unit: 上一个参数的时间单位。
  • workQueue: 保存排队的任务的队列实现。

如果没有特别的定制需求,还可以使用 Executors 提供的静态便捷函数:

  • newCachedThreadPool(): 创建一个无最小线程,无最大限制的线程池。适用场景广泛。
  • newSingleThreadExecutor(): 永远有且只有一个线程的线程池。用于后台处理大量轻量任务。
  • newFixedThreadPool(): 永远有固定数量线程的线程池。同于一次性提交大量任务,通常提交完毕后立即调用 ExecutorService.shutDown() 关闭,不再接受新任务。

线程安全

volatile

来看一个例子:

public static void main(String[] args) throws InterruptedException {
  new Thread(() -> {
    while (!stop) {
    }
  }).start();

  Thread.sleep(1000);
  stop = true;
}

猛一看程序应该在大约 1 秒后退出,但实际上会一直执行下去。

Java 中只有除了守护线程之外的所有线程全部结束后程序才会结束。

虽然线程之间可以共享变量,但是 java 会把用到的变量拷贝一份到自己线程的内存区域来提高读写性能,如此一来线程内对共享变量的修改不会直接反映到原变量那里,而是需要时间把修改同步回去。同理,外部线程对变量的修改也不会反映到子线程,并且不会自动同步。所以子线程内的 stop 变量一直是最初的值,我们的程序不会结束。

解决方案是给对应变量加上 volatile 关键字:

private static volatile Boolean stop = false;

有了 volatile 关键字的变量每次修改都会通知其他线程的缓存失效,下一次使用时重新读取。

同步方法

有一个经典的例子:

private static volatile int x = 0;
private static void increase() {
  x++;
}

public static void main(String[] args) throws InterruptedException {
  new Thread(() -> {
    for (int i = 0; i < 10000; i++)
      increase();
    System.out.println("x=" + x);
  }).start();
  new Thread(() -> {
    for (int i = 0; i < 10000; i++)
      increase();
    System.out.println("x=" + x);
  }).start();
}

期望中应该有一个线程打印出 20000,但实际上很可能不是这样,并且每次执行打印的结果都不一样。

我们已经知道线程被操作系统调度,一个线程的执行随时可能被中断。而自增运算并不是一个原子操作,它实际上是三步操作:

  1. 读取 x 的值
  2. 加一
  3. 写回 x

那就有可能 A 线程读取 x 后被打断,B 线程完成自增操作后回到 A 线程。此时 A 不会重新读取,而是从打断的地方继续执行,从而覆盖了刚才 B 线程的结果。所以最终答案很可能小于预期。

一个解决方案是给 increase() 方法加上 synchronized 关键字使之变成同步方法,同步方法同时提供了原子性与可见性,此时 x 的 volatile 可以去掉了。

private static synchronized void increase() {
  x++;
}

从原理来讲,java 为同步方法创建一个监视器 (Monitor),也就是所谓的「锁」。对于同步方法来讲,这个锁就是对象自身。如果是静态方法,这个锁就是 Class。当尝试进入这个方法时:

  1. 若 monitor 计数器为 0,则允许进入,计数器 +1;
  2. 若计数器不为 0,但当前线程已获得这个锁,则允许进入,计数器 +1;
  3. 若计数器不为 0,且当前线程没有获得锁,则阻塞,直到计数器为 0 时重试进入。

退出同步方法时 monitor 计数器 -1,若结果是 0 则此线程不再持有这个锁。

上面的原理隐含了一个事实:一个对象的所有同步方法共享同一个监视器(锁)。

同步代码块

刚刚说到,所有同步方法共享同一个锁,这有什么问题呢?

说到线程同步,其实我们想保护的是数据,而不是方法本身。方法的每次调用在内存中都有自己的调用栈,多线程同时调用没有任何问题。如果两个方法都修改了共享的变量,自然都要加上锁。但如果它们修改的是不同的变量,那就没有理由不允许这个两个方法同时执行。使用同步方法这种语法限制太过严格,会降低执行效率。

同步代码块的写法允许我们手动指定 monitor,任何对象都可以作为 monitor,比如这样:

private static int age = 0;
private static String name;

private static final Object ageLock = new Object;
private static final Object nameLock = new Object();

private static void setAge(int newAge) {
  synchronized (ageLock){
    age = newAge;
  }
}

private static void setName(String newName) {
  synchronized (nameLock){
    name = newName;
  }
}

如此一来,setAge()setName() 就允许同时执行了,但它们俩各自只能同时被一个线程调用。

原子变量

对于简单的单变量多线程操作,除了笨重的同步代码块之外,可以直接使用 java 提供的原子版本的变量。对于基本类型,有对应的 AtomicXxxx 实现,否则可使用 AtomicReference<T> 泛型类。

比如下面的例子,即使 increase() 方法不加锁也能得到正确的结果:

private static final AtomicInteger x = new AtomicInteger();
private static  void increase() {
  x.incrementAndGet();
}

public static void main(String[] args) throws InterruptedException {
  new Thread(() -> {
    for (int i = 0; i < 10000; i++)
      increase();
    System.out.println("x=" + x.get());
  }).start();
  new Thread(() -> {
    for (int i = 0; i < 10000; i++)
      increase();
    System.out.println("x=" + x.get());
  }).start();
}

单例

说到单例,学过的人应该可以随手写出一个双重检查锁:

public class SingleMan {
    private static SingleMan instance = null; // 需要 volatile

    public static SingleMan getInstance() {
        if (instance == null) {
            synchronized (SingleMan.class) {
                if (instance == null)
                    instance = new SingleMan("Bob");
            }
        }
        return instance;
    }
}

这个实现还有问题:对象初始化不是一个原子操作,真实的对象创建分为三步:

  1. 分配内存
  2. 调用构造函数,完成初始化
  3. 赋值给变量

实际运行时 2/3 两步可能发生重排序。于是线程 A 先完成赋值,还没有初始化,此时线程 B 进行检查得到的就不是空,于是返回了一个还没有初始化的对象,例如这个对象的 name 可能不是 "Bob" 而是 null。

所以记得双重检查锁的实例变量要加 volatile,就能解决这个问题。因为 volatile 除了保证可见性之外还禁止指令重排序优化。

在 JDK5 之前 volatile 有 bug 无法禁止重排序。不过在 2022 时代,这样写已经足够安全了。

读写分离锁

我们已经知道,线程安全的本质是资源安全。同一个变量不能被多个线程同时修改,也不能一个线程修改,另一个线程读取。那多个线程同时读取呢?其实是可以的。只要不涉及写(更新)操作就不会出现线程安全问题。

但是用同步代码块的写法,为了保证不会同时读写,就必须把读和写的代码加上同一个锁,这样只读操作也被禁止了,发生了无意义的等待。为了解决这个问题,可以使用 Lock 对象手动加锁。

在我们这个用例中可使用 ReentrantReadWriteLock

private static int x = 0;
private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();

private static void setX(int newX) {
  writeLock.lock();
  try {
    x = newX;
  } finally {
    writeLock.unlock();
  }
}

private static int getX() {
  readLock.lock();
  try {
    return x;
  } finally {
    readLock.unlock();
  }
}

注意:手动加锁的解锁代码要写在 finally 块中,防止中间抛异常引发后续死锁。

ReentrantLock 是可重入锁,这意味着得到锁的线程可以再次获得锁而不会被阻塞。

各种锁

这里的各种锁其实不是锁的类型,而是一种并发控制思想。

悲观锁/乐观锁与 CAS

乐观锁假定不会有其他线程修改数据,也不会加锁,从而提高性能。在最后更新之前会检查较自己读取时数据是否发生了改变,如果改变了就不再写入,根据具体代码逻辑可能会重试或抛异常,以此来保证线程安全。乐观锁适合不太可能多线程修改同一数据的场景。

悲观锁假定会有其他线程来捣乱,所以在读取之前就加锁,直到写入完毕后才释放。这样只要获得了锁就一定可以成功更新数据,不过加锁本身也有性能损耗,所以悲观锁适合经常多线程操作数据的场景。

在 java 中,乐观锁的实现是 CAS (Compare And Swap) 算法,之前提到的原子类 AtomicInteger 等内部也是使用 CAS。所谓 CAS 算法,和上面乐观锁的描述的一模一样,就是只有在数据没有发生改变时才更新,这里「比较与更新」是一个整体的原子操作。

跟踪 AtomicInteger.incrementAndGet() 源码看到最终实现如下:

// Unsafe.java
public final int getAndAddInt(Object o, long offset, int delta) {
  int v;
  do {
    v = getIntVolatile(o, offset);
  } while (!weakCompareAndSetInt(o, offset, v, v + delta));
  return v;
}

offset 是真正的 int 值在 AtomicInteger 对象中的偏移。先取出当前 int 的值,完成计算并更新之前,比较一下内存里的值是否改变,只有没变才继续赋值(weakCompareAndSetInt 的原子操作),否则就继续尝试。

通过源码可以看到乐观锁有如下问题

  • 多次重试会占用 CPU,开销大。所以只适合不太可能出现线程冲突的场合。
  • 只保证一个变量操作的原子性。不能解决转账问题(减去一个变量加到另一个变量)。
  • ABA 问题。如果中间变量被多次更改,最后值恰好与开始的值一样,这种 CAS 算法是监测不出来的。不过通常也不需要检测出来,这个问题不会影响数据的准确性。要是确有需要可以使用 AtomicStampedReference 类,它的算法是额外维护一个类似版本号的东西,每次更新数据都自增,以此来监测数据的修改与否。

自旋锁

Java 从 jdk1.6 开始默认启用自旋锁,并引入适应性自旋锁。

自旋锁是竞争传统重量级锁的算法。所谓「自旋」就是执行无意义的循环代码来等待,而不挂起线程让出 CPU。等等,这不是浪费么?的确浪费,但线程切换本来也挺复杂。很多时候竞争不是很激烈,只要稍微重试几次就能得到锁,这种情况下自旋要比切换线程来得更高效。

自旋:

graph LR
Get[获取锁] --> If{结果}
If --成功--> 执行
If --失败--> Get

传统重量级锁:

graph LR
Get[获取锁] --> If{结果}
If --成功--> 执行
If --失败--> 挂起 --> 占用资源的线程释放了锁 --> Get

在 JDK 1.8 中,synchronized 的具体实现是:

  1. 首先用 CAS 算法,成功了就直接获得锁。
  2. 否则升级为重量级锁(使用操作系统的 Mutex Lock)
  3. 若重量级锁竞争失败,不立即挂起,而是自旋

至于自旋的次数,就得提到「适应性自旋锁」。为了防止设置自旋 10 次,偏偏 11 次才能获得锁的尴尬情况,适应性自旋锁会根据上一次自旋的次数来调整本次自旋的次数,尽可能降低开销。

也许有朋友发现了,自旋锁与乐观锁实现起来很像,都是循环重试。它们的区别是乐观锁本质是无锁,自旋锁是使用乐观锁的思想去竞争一个锁,它最后还是要获得重量级锁的,所以无法避免 Mutex 的开销。

偏向锁

上面提到,java synchronized 的实现是先使用 CAS 算法实现的轻量级锁尝试占有资源。这个锁不牵扯操作系统的线程切换,速度较快。虽然如此,如果一个同步代码块总是被同一个线程执行,即使是轻量级锁也是一种浪费,所以引入了 「偏向锁」

当一个线程获得锁就记录一下这个线程 ID,之后每次执行同步代码,先比较一下 ID 是不是相等,如果相等连 CAS 都免了,直接授权执行。轻量锁在进入和退出时都需要 CAS 原子操作,而偏向锁只需在进入时进行一次原子比较,性能更高。

线程等待

wait/notify

有这样一个场景:线程 A 在修改变量 name,线程 B 需要在 A 修改完后再继续执行来读取 name。其中一个实现方法就是借助 Object.wait()Object.notify()需要强调的是 wait/notify 都是 monitor 执行的操作,所以必须在同步代码块中调用对应的 monitor 的方法

private static String name;

private static synchronized void initName() {
  name = "chenhe";
  Main.class.notify(); // 唤醒
}

private static synchronized void printName() {
  while (name == null) {
    try {
      Main.class.wait(); // 等待。Main.class 是静态方法的默认 monitor
    } catch (InterruptedException ignored) {
    }
  }
  System.out.println(name);
}

public static void main(String[] args) {
  new Thread(() -> {
    try {
      Thread.sleep(2000);
    } catch (InterruptedException ignored) {
    }
    initName();
  }).start();
  new Thread(() -> {
    printName();
  }).start();
}
// 最终会在约 2 秒后输出 chenhe

wait 内部的原理是:

  1. 调用 wait() 的线程进入等待区,直到被唤醒。
  2. notify() 方法从等待区随机取出一个线程放入队列准备竞争锁。
  3. 被唤醒的线程竞争到锁后继续执行。

所以,有如下结论:

  • 有 wait 就必须有 notify,否则这个线程会一直等下去。
  • notify() 只能唤醒一个线程,所以更常用的方法是 notifyAll(),它会唤醒等待区的所有线程。

join

不难看出 wait/notify 比较松散,而 join 就是一个关系更紧密的等待。它要求拿到要等待线程的对象,调用 Thread.join() 后会阻塞当前线程,直到等待的线程结束。

上面的例子用 join 可以这么写:

private static String name;

private static synchronized void initName() {
  name = "chenhe";
}

private static synchronized void printName() {
  System.out.println(name);
}

public static void main(String[] args) {
  Thread initThread = new Thread(() -> {
    try {
      Thread.sleep(2000);
    } catch (InterruptedException ignored) {
    }
    initName();
  });
  initThread.start();
  new Thread(() -> {
    try {
      initThread.join(); // 等待初始化线程完成
    } catch (InterruptedException ignored) {
    }
    printName();
  }).start();
}

与 wait 不同,join 不需要唤醒,如果等待的线程没有启动就视为已经完成。所以 join 的代码相对好维护一点。

InterruptedException

Java 中和线程等待有关的方法都要求捕获 InterruptedException 异常,包括

  • Thread.sleep()
  • Thread.wait()
  • Thread.join()

最佳实践是,捕获到这个异常后应该终止任务,执行收尾工作,及时关闭线程。

它们的本质原因是一样的。要搞清这个,就到追溯到停止线程为什么都用 Thread.interrupt()Thread.stop() 被弃用。

stop() 粗暴地停止线程,无论线程内部状态如何。比如一个线程内部在处理转账问题,可能刚刚从 A 帐户扣款还没来得及增加 B 帐户的余额就被停止了,导致数据不一致问题。又或者线程内部打开了一个文件,没来及关闭就被干掉了,那么这个文件句柄就一直存在(别忘了进程才是操作系统分配资源的最小单位,线程结束系统是不会自动回收它打开的资源的)。

因此线程的结束必须是协作式的interrupt() 就是这么一个实现,它其实不会结束线程,而只是修改一个标记,线程内部需要自己在适当的时候检查这个标记,进行收尾操作,再退出。如果线程忽略标记呢?那自然就不会提前退出了。虽然这样有一些浪费资源,但至少可以保证我们程序的正确性。

一个合理的线程实现应该是这样的:

new Thread(){
  @Override
  public void run() {
    for (int i = 0; i < 5; i++) {
      if (isInterrupted()){
        // 收尾操作
        return; // 响应关闭请求,及时退出线程。
      }
      heavyProcess();
    }
  }
};

但是,interrupt() 不是在任何时候都只是修改一个标记。如果线程正处于等待状态,就可以放心地打断,因为这个等待不再有意义,打断也没有什么副作用——反正线程都要结束了。但还得给它个机会来处理后事,那么就抛一个异常,InterruptedException 由此就出现了。

注意,在等待时被打断不会修改 isInterrupted 标记,无法主动判断线程是否应该停止,只能用异常捕获。

很多时候我们偷懒没有处理这个异常,这和不主动检查 isInterrupted 性质一样,除了进行了一些无用的计算,通常不会导致程序出 bug。所以即使不处理程序也跑的好好的。

参考