.gist-file
.gist-data {max-height: 400px;max-width: width:100%;}

上一节我们已经了解的 Kotlin 协程的基础设施,同时也强调了这些不是面向开发者的,需要经过一些封装才适用于生产。诚然,一般情况下开发者不用学习这些,只需要知道 kotlinx 为我们封装好的框架如何使用就行了。

不过既然叫深入 Kotlin 协程,当然不能那么浮于表面。我们将尝试自己实现一套类似官方 kotlinx 的协程框架以加深对标准库协程的理解。这可是个大工程,为了降低入门难度,一开始先尝试实现一些其他语言中较为简单的协程 API。

⚠️ 警告:「协程」是很混乱的概念,不同语言不同平台的协程本质可能不一样。这里我们只谈「形式上」「风格上」的实现,而不关心本质。

⚠️ 警告:看上面

⚠️ 警告:看上面

Python 风格

Python 实现

传统的 Python 协程是使用 yield() 实现的生成器 (Generator),看一个例子 (playground):

def nums():
    i = 0
    while i < 3:
        yield(i)
        i += 1

gen = nums()
for i in gen:
    print(f"[{i}]")

[分析]

函数 nums() 看起来没有返回值,实际上类似于 Kotlin suspend,Python 解释器也有一些魔法操作:带有 yield() 调用的函数将返回 Generator。for-in 是一个缩写,等价于不断调用标准库函数 next(gen),请求下一个值。

sequenceDiagram
autonumber

loop 
forin->>nums: next(gen)
activate nums
nums->>forin: yield(i)
deactivate nums
end

需要明确,gen = nums() 执行时 nums() 不会真正执行(只创建了 Generator),首先 for-in 隐式调用了 next(gen) 此时调度权转移到 nums(),for-in 挂起。nums 执行到 yiled() 时调度权转移回去,nums 挂起,for-in 从 next(gen) 处恢复执行,并取得值。

Kotin 实现

先看效果:

val numsGen = generator<Int> {
  repeat(3){i->
    yield(i)
  }
}

for (i in numsGen) {
  println("[$i]")
}

经过上面的分析得知这类实现的核心是 Generator,Python 中由解释器自动生成,Kotlin 中就得自己写了。Generator 有以下要点:

  • 实现迭代器操作重载来支持相关便捷语法。
  • 「懒」的:消费者不请求就不生产数据。
  • Caller 通过 next() 请求数据,同时转移调度。
  • 有个 yield() 方法生产数据,同时转移调度。
  • 实现一个状态机,这也是协程的核心。

关于状态机,实际上有两个。一是 Kotlin 编译器维护的协程状态机,负责挂起恢复等。二是我们自己实现的状态机,仅用于 Generator 内部确保数据不混乱,主要因为 java 的迭代器有个 hasNext() 函数,这要求 Generator 生成但不消耗数据,从而在「生产-挂起」的简单模式中引入了第三个状态,暂且称之为 READY,把事情变的复杂起来。

(playground)

[分析]

因 kotlin(java) 的特性,Generator 只是一个壳,具体逻辑都在迭代器 GeneratorIterator 里。

我们创建了一个 GeneratorScope,它有两个作用:

  1. 提供 yield() 函数。
  2. 借助 @RestrictsSuspension 注解限制挂起函数调用,避免无意中弄乱状态。

与 Python 一样,默认只创建一个协程但不启动它。当需要新的值时(调用 hasNext()next())启动(或恢复)之。

yield() 实现了新值的保存并挂起自身(看一下 suspendCoroutine{} 的注释就明白了,它挂起了协程,只要内部不调用 continuation.resumt() 就一直保持挂起),至于把调度转回到 Caller 是 Kotlin 编译器实现的。

Generator 内部状态流转如下:

stateDiagram-v2
direction LR

[*] --> Not_Ready
Not_Ready --> Ready: yield(i)
Ready --> Not_Ready: next()
Not_Ready --> Done: resume
Done --> [*]
这里印证了一开始的说法,Continuation 并不是「协程」的代表,而只是「一段挂起-恢复」的代表,所以 yield() 函数中要保存新的 continuation,这样下一次才能从刚才中断(挂起)的地方继续执行。

官方实现

其实,Kotlin 已经有了一个 sequence,它和刚才我们自己实现的 Generator 很类似,并且得到了集合操作符的加持。

简单使用与我们的 Generator 是一样的,更典型的用例是对较大的集合进行操作,比如 filter, map, first 等,比 List 有更好的性能。不过这不是本文的重点,就不深入说了。

JS 风格

JS 实现

JS 采用的是目前比较流行的 Promise 模型,也就是 async/await。async 标记一个函数是异步的,异步函数返回的是 Promise 对象。await 用于等待一个异步完成,在此之前保持阻塞,它是把异步代码形式上同步化的关键。await 会阻塞,但不必担心,因为它只能在 await 函数中被调用,所以不会阻塞“主线程”。

例子:

// javascript
function heavyJob() {
  return new Promise((resolve) => {
    setTimeout(function () {
      resolve("slow");
    }, 2000);
  });
}

async function asyncCall(){
    result = await heavyJob()
    console.log(result)
}

asyncCall()
console.log("first")

这个例子会先输出 "first",大概 2 秒后再输出 "slow"。由此可见没有阻塞“主线程”。

[分析]

async/await 可以看作是 Promise 的语法糖。而 Promise 简单理解是对 Callback 的封装,主要目的是解决回调地狱问题。需要强调的是,传统来讲,js 对于开发者来说是单线程的,Promise 没有开启新线程的能力,验证方法是在 Promise 内执行死循环会卡死程序。 但广义的 js 运行时的的确确有多个线程来承载特定的任务,比如上面的 setTimeout 以及 http 请求等,实际上由其他线程承载,通过消息机制在合适的时候回调到主线程(也就是 js 执行线程)。

私以为,Promise 不能算协程,它只是一套 API 来帮助更好地书写异步程序。这也是我们的目的 —— 我们不是创造协程,而是把 Kotlin 的协程封装为易用的 API。

Kotlin 实现

效果:

// kotlin
fun heavyJob(): Promise<String> {
    return Promise { callback ->
        thread {
            Thread.sleep(2000)
            callback("slow")
        }
    }
}

fun asyncCall() {
    async {
        val r = await { heavyJob() }
        println(r)
    }
}

fun main() {
    asyncCall()
    println("first")
}

这段代码和 JS 版本的一样,先输出 "first",约 2 秒后输出 "slow"。

模仿 JS,我们造了一套基于 Promise 的 API,然后又把它封装成了 async/await 的风格。源码如下:

[分析]

首先是简单的 Promise:它包装了执行体并提供一个 then(callback) 方法。Promise 也是「懒」的,只有 then 被调用才真正执行任务,并通过传入的回调传出结果。Promise 本身不具备线程切换能力。

heavyJob() 创建了一个 Promise,其内部启动一个线程并延时,模拟耗时的任务。它等价于 js 中的 setTimeout(),只不过前者是我们显式切换了协程,后者是 js 内部隐式切了线程。

async() 启动了一个自定义作用域的协程,这个作用域拥有 await() 挂起函数。await() 会挂起当前协程(async 创建的),等待复杂的任务回调后再恢复。因此等待的过程不会阻塞 async 外其他代码的执行。

也许有同学好奇,明明讲协程,咋又创建了一个线程呢? 线程和协程并不冲突。事实上在真实工程中(例如 Android App)大部分的协程会被调度到后台线程上执行。协程没有魔法,一个任务一定需要一个线程承载,如果这个任务持续等待或占用 CPU 那么就会阻塞线程。当线程被阻塞,那么这个线程上的所有协程都不能执行,比如这个例子:

fun main() {
    GlobalScope.launch(Dispatchers.Unconfined) {
        Thread.sleep(2000)
        println("coroutine ok")
    }
    println("main ok")
}

虽然 sleep 发生在协程中,但它阻塞了线程,而我们又强制这个协程在主线程上调度。于是 "main ok" 无法执行,被迫等待 2 秒。而我们日常大部分数据操作都会阻塞线程,或 IO 阻塞或 CPU 跑满阻塞,因此这里新开一个线程去跑 heavyJob 符合实际情况。

对于「阻塞」「非阻塞」有疑问的同学,可以看看扔物线的视频「到底什么是「非阻塞式」挂起?协程真的比线程更轻量级吗?」

那协程体现在哪? 体现在我们构造了一套 API 让主线程可以非阻塞地等待后台线程的完成,并且(形式上)无需回调。

我们实现的 async/await 与 suspend 有啥区别? 没有区别。suspend 本来就扮演了 async/await 的角色。我们的这套设计属于脱裤子放屁,没有任何实用价值。这不是为了加深对 Kotlin 协程基础设施的理解才写的么 🐶

Last modification:November 23, 2022