如无特殊说明,本系列中「协程」默认特指 Kotlin 协程的 JVM 实现。

什么是协程

好吧...这真是一个极度刁钻困难的问题。哪怕限定于 Kotlin@JVM 中,也难以回答。困难到两位 GDE bennyhuo 霍丙乾与扔物线朱凯竟给出了矛盾的答案。当然,只是字面描述上的矛盾,思想上还是一致的。

我目前的答案是:协程就是程序自己调度的线程:它有一个函数体作为协程体,由应用程序自己决定何时执行、在哪个线程执行。

当然,这个答案漏掉百出,但我认为至少它揭示或者说隐含了这几个要点:

  • 协程是一段代码,可以被执行、暂停(挂起)、恢复。
  • 协程的调度权在我们自己,而不是操作系统。
  • 所以我们(Kotlin)要自己实现协程调用状态的保存。

一个 Kotlin 协程

创建

使用标准库函数 createCoroutine() 可以创建一个协程,它有一个 Receiver 作为协程体,同时还需要一个 Continuation 类型的参数作为完成回调。(playground)

val coroutine = suspend {
  println("In coroutine")
  "OK" // 协程体的返回值,类型要和完成回调的类型对应。
}.createCoroutine(object : Continuation<String> {
  override val context: CoroutineContext
  get() = EmptyCoroutineContext

  override fun resumeWith(result: Result<String>) {
    println("Coroutine END: $result")
  }
})
coroutine.resume(Unit) // 启动这个协程

// 输出:
// In coroutine
// Coroutine END: Success(OK)

其中比较迷惑的是 Continuation,它不仅是完成回调的类型,同时也是 createCoroutine() 返回的类型。所以...它既是回调又是协程本身?事实上,它只是一个接口,协程本身恰好实现了而已,但不能反过来说它就是协程。

不要把 Continuation 与线程中的 Thread 类比。后者实打实地代表了一个线程,前者只是代表从上次挂起(或从头)开始的一段代码。非要对比的话,Continuation 只能算作线程中的一次「挂起-恢复」 。Continuation 是「继续」的意思,也侧面反映了它只是协程中某一段。

线程由操作系统调度,java 无需也无法给我们提供单一「挂起-恢复」的 API。但协程不一样,整个生命周期都是我们自己控制的,所以才出现了这种细粒度的 API。

Thread 同级别的东西是 Job,它是 Kotlin 基于协程低级 API 所封装的东西。

协程创建之后需要手动启动。鉴于这个这个操作非常常见,Kotlin 提供了另一个 API startCoroutine() 创建之后顺手启动了,用法和 createCoroutine() 完全一样。

Continuation

刚刚说过,Continuation 只是一个接口,并不是协程本身。跟踪源码发现 createCoroutine() 返回的实际是 SafeContinuation 对象,而这个东西也只是一个包装,一个不到 100 行的迷你类,目的是做一些额外的状态检查,防止外部调用者(也就是我们)不懂协程原理瞎搞,弄出什么奇怪的问题 🐶

作为对比,startCoroutine() 内部就没有做 Safe 包装,因为它不返回给外部,可以看出 Kotlin 的程序猿对自己十分自信 🤓

Safe 是包装,那包装了啥?通过断点调试看到其 delegate 属性是一个类型为 MainKt$main$coroutine$1 的鬼玩意。从字节码的命名看出这是一个匿名内部类。哈?哪来的内部类?这就是协程体(那个 suspend 匿名函数)编译后的结果啦,Kotlin 把它搞成了一个类好能被 JVM 识别。它的字节码如下:

final class MainKt$main$coroutine$1 extends kotlin/coroutines/jvm/internal/SuspendLambda implements kotlin/jvm/functions/Function1

继承了 SuspendLambda,并实现它的抽象函数 invokeSuspend(),把我们协程体编译进去咯。当然,除了我们的协程体之外,还插入了一个 switch 语句,本质是一个状态机,用于实现挂起恢复,不过这是后话了。

至于实现的 Function1 接口,定义如下:

public interface Function1<in P1, out R> : Function<R> {
    /** Invokes the function with the specified argument. */
    public operator fun invoke(p1: P1): R
}

即:这是一个有一个参数和一个返回值的函数。恩?哪来的参数?这就是 suspend 函数的魔法所在,Kotlin 编译器会给它自动加一个类型为 Continuation 的协程实例参数来满足协程的各种需求,后面再仔细研究。

带有 Receiver 的协程体

Kotlin 还有另一组 API 来创建(启动)协程:

public fun <R, T> (suspend R.() -> T).startCoroutine(
    receiver: R,
    completion: Continuation<T>
)

与之前唯一的不同是这个协程体带有 Receiver。但是 Kotlin 并没有声明带有 Receiver 的 Lambda 的语法,下面这种写法是错误的:

// 语法错误
suspend MyScope.{ /*do something*/ }

所以为了愉快地使用这个 API,可以把它封装一下:

fun <R, T> launchCoroutine(receiver: R, block: suspend R.() -> T) {
    block.startCoroutine(receiver, object : Continuation<T> {
        override val context: CoroutineContext
            get() = EmptyCoroutineContext

        override fun resumeWith(result: Result<T>) {
            println("Coroutine END: $result")
        }
    })
}

// 愉快地使用
launchCoroutine(MyScope()){
  println("In coroutine")
}

那么有什么意义?

  1. Receiver 的方法可以在协程体内直接调用了。
  2. 可以使用注解 @RestrictsSuspension 限制协程体只能调用 Receiver 里的挂起方法避免意料之外的挂起。特定场景下比较有用。

挂起版 main 函数

从 Kotlin 1.3 开始,可以直接把 main 声明为挂起函数。显然,我们应该认识到这肯定是 Kotlin 编译的花招,因为挂起函数是 Kotlin 的语法,JVM 无法识别这种东西。Kotlin 一定是帮我们生成了一个普通的 main 函数,然后在内部调用挂起版的 main。

看看字节码,证实我们的猜想:

  public static synthetic main([Ljava/lang/String;)V
    NEW MainKt$$$main
    DUP
    ALOAD 0
    INVOKESPECIAL MainKt$$$main.<init> ([Ljava/lang/String;)V
    INVOKESTATIC kotlin/coroutines/jvm/internal/RunSuspendKt.runSuspend (Lkotlin/jvm/functions/Function1;)V
    RETURN

L5 从真正的 main 把启动参数传递给了挂起版本的 main(被编译为一个 Lambda 对象),然后使用 runSuspend 启动它。后者源码如下:

internal fun runSuspend(block: suspend () -> Unit) {
    val run = RunSuspend()
    block.startCoroutine(run)
    run.await()
}

和前面我们自己写的一样,RunSuspend 是协程的完成回调,也在上面的源码里,作用是利用 java 的 wait/notify 机制让 main 等待协程执行完毕,避免程序提前结束。

协程初探

隐含的传参

上面已经提到,协程编译后会自动添加一个类型为协程实例的参数,这也是无参协程体编译后实现了 Function1 接口(拥有一个参数)的原因。为了更形象地理解这一点,可以尝试从 java 侧调用挂起函数。

// Main.kt
suspend fun suspendFunction(): Int {
    // 挂起函数中可通过 suspendCoroutine 拿到当前协程实例 (it)
    return suspendCoroutine { thread { it.resume(5) } }
}
// Test.java
public static void test() {
  Object result = MainKt.suspendFunction(new Continuation<>() {
    @NotNull
    @Override
    public CoroutineContext getContext() {
      return EmptyCoroutineContext.INSTANCE;
    }

    @Override
    public void resumeWith(@NotNull Object o) {
      // coroutine END
    }
  });
}

在 java 中清晰地看到,需要额外传递一个 Continuation 参数,其除了提供协程上下文之外,也承担完成回调的作用,这很像 java 传统的异步写法。那么它的直接返回值是个啥?这要分两种情况讨论:

  1. 若函数没有挂起直接返回了结果,那么返回值就是这个结果,可以强转。此时不回调 resumeWith()
  2. 若挂起了,那么返回的就是一个标记(实际是一个枚举值 CoroutineSingletons.COROUTINE_SUSPENDED),表明已经挂起,需要在回调中获取真正的结果。

那为什么回调的类型也是 Object?Kotlin 中应该是 Result<Int>,但是 Result 这个东西比较特殊,好像是 Kotlin 和 java 中间的一个东西,没有对 java 暴露。java 实际接收到的是其包装的值 —— 可能是 int 也可能是 Exception,取公共父类变成 Object 了。

PS: 没错,挂起函数不一定真的挂起,它只是有能力挂起。只有切换了调用栈帧才实打实地挂起。说人话就是,只有在挂起函数返回之后 Continuation.resume() 再被调用才挂起。

协程上下文

协程上下文本质是一个集合类,储存了协程有关的元数据,被保存在协程实例 Continuation 中。用法上类似于 Map,每一个数据都有一个 Key。不同点在于 Key 定义在数据内部,也就是说每一条数据在加入上下文集合之前就已经确定了自己的 Key。

一个典型的上下文 item 类型就是 CoroutineName

public data class CoroutineName(val name: String) : AbstractCoroutineContextElement(CoroutineName) {
    public companion object Key : CoroutineContext.Key<CoroutineName>
    override fun toString(): String = "CoroutineName($name)"
}

// Key 的定义
public interface Key<E : Element>

不难看出,方便起见,Key 被设计成一个泛型接口,它的实参一般就是 item 的类型。如此一来就不必维护一堆常量用来索引了。假如用 String 作 Key,可以想象,每一个上下文类型都得写一个常量,比如 CoroutineName.KEY = "COROUTINE_NAME" 非常多此一举。

既然上下文是个集合,那么使用自然也和集合类似。在挂起函数中使用 coroutineContext 可获得当前协程的上下文对象,使用 Map 语法取值 (playground):

suspend {
  println("In coroutine: " + (coroutineContext[CoroutineName] as CoroutineName).name)
}.startCoroutine(object : Continuation<Unit> {
  override val context: CoroutineContext
    get() = CoroutineName("MyCo") // 设置上下文数据

  override fun resumeWith(result: Result<Unit>) {
  }
})

// 输出:
// In coroutine: MyCo

不难得出结论,除了内置的上下文数据类型,我们可以任意定义自己的数据,把它设置给协程并在其中获取、使用。

同样类似 Map,CoroutineContext 重载了 + 运算符,默认实现中相同 Key 的数据会被覆盖。

拦截器

拦截器是实现 AOP 的关键组件,在许多框架中都有它的概念,比如 web 框架中拦截器通常可以一层层过滤 web 请求,再一层层处理响应结果,例如编码成 json 等等。在 Kotlin 协程中,拦截器作用类似,它用于拦截协程的恢复,执行一些操作(例如把它调度到其他线程运行)。

下面我们实现一个简单的拦截器,只打印一下 log:

val logInterceptor = object : ContinuationInterceptor {
  override val key: CoroutineContext.Key<*> = ContinuationInterceptor

  override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
    object : Continuation<T> {
      override val context: CoroutineContext = continuation.context

      override fun resumeWith(result: Result<T>) {
        println("Before resume: $result")
        continuation.resumeWith(result)
        println("After resume")
      }
    }
}

拦截器也是协程上下文的一个类型(实现 CoroutineContext.Element),它的核心方法 interceptContinuation() 接收要恢复的协程,把它包装成一个新的实例返回,在这个新协程中可以执行特定的操作发挥拦截器的价值。注意在新的协程中要调用传入协程的 resume 相关方法,否则就成了真正意义上的拦截器了 —— 协程的恢复被拦截,原协程无法继续运行。

既然拦截器也是一个上下文,那么使用方法自然也和上下文一样 (playground):

suspend {
  println("In coroutine: " + (coroutineContext[CoroutineName] as CoroutineName).name)
}.startCoroutine(object : Continuation<Unit> {
  override val context: CoroutineContext
      get() = CoroutineName("MyCo") + logInterceptor

  override fun resumeWith(result: Result<Unit>) {
  }
})

// 输出:
// Before resume: Success(kotlin.Unit)
// In coroutine: MyCo
// After resume

协程体中没有调用其他挂起函数,因此只在首次启动时触发一次恢复,之后就一直运行到结束了。

那么拦截器包装函数是怎样执行的? 它在创建协程的时候就执行了!也就是说协程创建出来,SafeContinuation 里面包裹的不再是协程体,而是经过拦截器包装的 Continuation。查看创建协程的代码,发现它二话不说就调了一下 Continuation.intercepted(),它的 JVM 实现在这里,最终调用的是 ContinuationImpl 的实现:

public fun intercepted(): Continuation<Any?> =
  intercepted
    ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
      .also { intercepted = it }

和我们自己写的手动使用协程上下文几乎一样:通过 Key 从 context 中读取拦截器,返回包装后的结果。若没有拦截器就返回自身。

总结

这一篇我们初步探索了协程的实现。估计不少小伙伴看完了之后和没看一样 —— 协程到底怎么用?难道还要手动创建 Continuation 回调?

非也。今天我们使用到的函数都属于 Kotlin 核心库(而不是 kotlinx),它们叫协程的基础设施。之所以那么讲就是因为它们压根就不是给开发者用的。这些 API 足够底层与灵活来用于构建上层 API。如果直接用于生产环境,那无异于用汇编写项目。

Last modification:November 26, 2022