使用协程解决实际问题

前两章重点研究了协程如何简化代码,在 Android 中提供主线程安全,以及如何避免协程泄露。在此基础上,协程是一个在 Android 中进行后台处理以及以及简化回调的优秀方案。

到目前为止,我们主要关注的是什么是协程以及如何管理它们。在这篇文章中,我们将看看如何使用它们来完成实战任务。协程是一个通用的语言特性,与函数在同一个级别——你可以用它实现任何可以用函数或对象来实现的东西。当然,实际工作中有两种情况是非常适合使用协程解决的:

  1. 一次性的请求。一旦得到结果那个这次请求就结束了。
  2. 流式请求。它持续地观察数据改变并通知调用者——即使得到结果也会持续执行。

这两种任务都非常适合协程。这篇文章中,我们将深入研究一次性请求,并探索如何在 Android 中用协程实现它们。

一次性请求

一次性请求当被调用时执行,当得到结果时返回并结束。就像普通的函数那样——调用它,执行一些工作,然后返回。由于与函数调用的相似性,它比流式请求更容易理解。

一次性请求当被调用时执行,得到结果时结束。

A one shot request is performed each time it’s called. It stops executing as soon as a result is ready.

举个例子,想想浏览器是如何加载网页的。当你点击这篇文章的链接,浏览器会向服务器发送一个请求来加载页面。一旦页面成功传输到浏览器,便会停止与服务器的通讯——浏览器已经得到了所有它需要的数据。如果服务器修改了文章,除非你刷新网页,否则浏览器不会及时修改。

尽管这缺少流式请求的实时推送特性,但一次性请求还是非常强大。一个应用的许多功能都可以通过一次性请求来实现,比如获取、保存、更新数据。对于排序列表之类的事情,它也是一种很好的模式。

需求:显示一个排序的列表

让我们通过探索如何显示排序列表来研究一次性请求。为了更加具体,让我们构建一个库存管理应用,供商店的员工使用。它可以根据最后一次进货的时间查找商品——他们希望能够对列表进行升序和降序排序。但是商品太多了,排序需要几乎1秒才能完成——所以我们使用协程来避免阻塞主线程!

在这个应用中,所有的商品都存储在一个数据库中,使用了 Room 框架。这是一个很好的用例,因为它不需要涉及网络请求,我们可以专注于一次性请求模式本身,麻雀虽小,五脏俱全。

要实现此功能,你需要将协程引入 ViewModel, Repository 以及 Dao。让我们逐个看看怎么与协程进行结合。

class ProductsViewModel(val productsRepository: ProductsRepository): ViewModel() {
   private val _sortedProducts = MutableLiveData<List<ProductListing>>()
   val sortedProducts: LiveData<List<ProductListing>> = _sortedProducts

   /**
    * 当用户点击排序按钮时调用这些函数。
    */
   fun onSortAscending() = sortPricesBy(ascending = true)
   fun onSortDescending() = sortPricesBy(ascending = false)

   private fun sortPricesBy(ascending: Boolean) {
       viewModelScope.launch {
           // suspend and resume make this database request main-safe
           // so our ViewModel doesn't need to worry about threading
           _sortedProducts.value =
                   productsRepository.loadSortedProducts(ascending)
       }
   }
}

ProductsViewModel 负责接收 UI 层的事件,然后请求 repository 更新数据。它使用 LiveData 保存一个已排序的列表供 UI 显示。但用户操作时,sortProductsBy 启动一个新的协程来排序,并在完成后更新 LiveData。在这个框架中,ViewModel 通常是启动协程的好地方,因为它可以在 onCleared 时自动取消协程。如果用户离开了界面,那么这些排序工作通常是多余的。

如果你不经常使用 LiveData,可以看看这篇 @CeruleanOtter 写的非常棒的文章,它介绍了 LiveData 怎样为 UI 提供数据。

这是 Android 中使用协程的场景模式。因为 Android 系统本身不会调用 suspend 函数,你需要协调协程来响应 UI 事件。最简单的方法是在事件发生时启动一个新的协程——最自然的做法当然是在 ViewModel 中。

在 ViewModel 中启动协程是最常见的模式。

As a general pattern, start coroutines in the ViewModel.

ViewModel 实际上使用 ProductsRepository 来获取数据,它是这样的:

class ProductsRepository(val productsDao: ProductsDao) {

  /**
    * 这是一个常规的 suspend 函数,必须在协程环境中调用。
    * 因为 repository 不具备生命周期,所以它不负责协程的启动或取消。
    *
    * 它*可能*被 Dispatchers.Main 调用,
    * 并且是主线程安全的,因为 Room 框架确保了这一点。
    */
   suspend fun loadSortedProducts(ascending: Boolean): List<ProductListing> {
       return if (ascending) {
           productsDao.loadProductsByDateStockedAscending()
       } else {
           productsDao.loadProductsByDateStockedDescending()
       }
   }
}

ProductsRepository 提供了一个接口用于获取商品数据。在这个应用中因为所有数据都在本地数据库中,这里只需要一个接口来对接 @Dao 的两个不同的排序函数。

在 Android 架构指南中,repository 是可选的,但如果你在应用中引入它或类似的层,这里更适合公开(提供) suspend 函数。因为 repository 没有生命周期——它只是一个对象——无法取消不必要的工作,因此默认情况下任何 repository 中启动的协程都会泄露。

为了避免这种情况,这里只提供常规的 suspend 函数,很容易在不同的组件中重用,任何启动了协程的地方都可以调用 loadSortedProducts。例如一个 WorkManager 启动的后台服务器就可以调用这个接口。

repository 应当提供常规的主线程安全的 suspend 函数。(而不是自己启动协程)

A repository should prefer to expose regular suspend functions that are main-safe.

注意: 一些后台保存操作可能需要在用户离开屏幕后继续执行,这种情况下适合使用没有生命周期的 scope,但是大多情况下 viewModelScope 是更合理的选择。

再来看看 ProductsDao

@Dao
interface ProductsDao {
   // 因为被标记为 suspend, Room 会使用内置的 dispatcher
   // 在主线程安全的情况下运行这个查询。
   @Query("select * from ProductListing ORDER BY dateStocked ASC")
   suspend fun loadProductsByDateStockedAscending(): List<ProductListing>

   // Because this is marked suspend, Room will use it's own dispatcher
   //  to run this query in a main-safe way.
   @Query("select * from ProductListing ORDER BY dateStocked DESC")
   suspend fun loadProductsByDateStockedDescending(): List<ProductListing>
}

ProductsDao 是一个 Room 组件的 @Dao公开了两个 suspend 函数。因为它们被标记为 suspend,Room 框架会确保他们是主线程安全的。这意味着你可以直接在 Dispatchers.Main 中调用它们。

如果你还不了解 Room 中的协程,可以看看 @FMuntenescu 写的 Room 🔗 Coroutines

不过还是警告一下,调用它的协程将位于主线程,如果你对返回的结果进行复杂的操作,例如转换为一个新的 list,你需要确保这不会阻塞线程。

注意: Room 使用自己的 dispatcher 在后台线程上运行查询。你不应该在 withContext(Dispatchers.IO) 中调用 suspend 的 room 查询。这将使代码变得复杂并降低运行效率。

Room 中的 suspend 函数是主线程安全的,并且运行在一个自定义的 dispatcher 中。

Suspend functions in Room are main-safe and run on a custom dispatcher.

一次性请求模式

这就是在 Android 架构组件中应用协程进行一次性请求的完整框架。我们在 ViewModel, Repository, Room 中引入了协程,每一个层都有不同的职责。

  • ViewModel 在主线程启动一个协程,当得到结果时便结束运行。
  • Repository 提供了常规的 suspend 函数并确保是主线程安全的。
  • Room(数据库)或其他数据源(例如网络)提供了常规的 suspend 函数并确保是主线程安全的。

ViewModel 的职责是启动协程,并确保当用户离开屏幕时取消他们,这里不执行复杂的代码,而是依赖其他两个层来执行这些工作。一旦得到了结果就通过 LiveData 通知 UI。

因为 ViewModel 不执行复杂的操作,所以它在主线程上启动协程,这样它可以更加方便与快速地响应 UI 事件。

Repository 提供了常规的 suspend 函数来获取数据。因为它无法及时取消协程,所以通常不会在一个不可控的生命周期内启动协程。当 Repository 需要进行繁重操作例如转置一个列表时,应当使用 withContext 提供一个主线程安全的接口函数。

数据层(Room 或网络)总是提供 suspend 函数。在使用 Kotlin 协程时这些挂起函数是主线程安全的,这一点很重要。RoomRetrofit 都遵守了这一规范。

在一次请求中,data 层只提供 suspend 函数。当调用者需要刷新数据时就需要再次调用它,就像点击浏览器的刷新按钮那样。

值得花点时间来确保你理解一次性请求的模式。这是 Android 上协程的常见模式,你将一直使用它。

我们的首个 Bug 报告!

在测试这套方案后,你发布了应用,一切都非常棒直到你收到了一个很奇怪的 Bug 反馈:

主题:🐞 - 排序错误!

报告:当我快速点击排序按钮时,有时结果是错误的。这不是经常发生🙃。

你看了看,挠挠头。有什么可能出错呢?程序逻辑看起来非常简单:

  1. 用户点击时开始排序。
  2. 在 Room dispatcher 中进行排序。
  3. 显示结果。

你想要关闭这个 issue “不处理-别那么快点击按钮就好了”,但是你担心肯定有代码出错了。在添加 log 并编写一个测试来同时调用多个排序之后,终于明白了!

最终显示的并不是真正的排序结果,而是最后一次排序完成的结果。当用户疯狂点击按钮,同时开始了多个排序任务,但可能会以任意一个顺序结束!

当启动一个新的协程来响应 UI 事件,注意考虑如果在完成之前又启动了一个新的,会有什么后果。

When starting a new coroutine in response to a UI event, consider what happens if the user starts another before this one completes.

这是一个并发造成的 bug,实际上与协程本身并没有关系。当使用回调、Rx 甚至 ExecutorService 都会遇到相同的问题。

ViewModelRepository 中都有许多方案来修复。让我们来探索如何确保一次性请求完成的顺序与用户期望的一致。

最佳方案:禁用按钮

最根本的问题在于我们做了两次排序,如果只做一次的话就修复这个问题了!最简单的办法就是直接禁用按钮来阻止新的 UI 事件。

也许看起来这个方案和简陋,但是非常实用。实现这一点的代码很简单,易于测试,只要能控制 UI,就可以完全解决问题!

为了禁用按钮,要通知 UI 排序请求正在 sortPricesBy 中执行,如下所示:

// Solution 0: Disable the sort buttons when any sort is running

class ProductsViewModel(val productsRepository: ProductsRepository): ViewModel() {
   private val _sortedProducts = MutableLiveData<List<ProductListing>>()
   val sortedProducts: LiveData<List<ProductListing>> = _sortedProducts
  
   private val _sortButtonsEnabled = MutableLiveData<Boolean>()
   val sortButtonsEnabled: LiveData<Boolean> = _sortButtonsEnabled
  
   init {
       _sortButtonsEnabled.value = true
   }

   /**
    * Called by the UI when the user clicks the appropriate sort button
    */
   fun onSortAscending() = sortPricesBy(ascending = true)
   fun onSortDescending() = sortPricesBy(ascending = false)

   private fun sortPricesBy(ascending: Boolean) {
       viewModelScope.launch {
           // disable the sort buttons whenever a sort is running
           _sortButtonsEnabled.value = false
           try {
               _sortedProducts.value =
                       productsRepository.loadSortedProducts(ascending)
           } finally {
               // re-enable the sort buttons after the sort is complete
               _sortButtonsEnabled.value = true
           }
       }
   }
}

还不错,只需要围绕 sortPricesBy 代码禁用按钮就行了。

在大多数情况下,这是解决这个问题的正确方法。但如果我们想让按钮保存可用并修复 bug 呢?这有点难,我们将用剩下的篇幅寻找几个可用方案。

注意: 这段代码展示了从主线程启动的一个重要优势——当响应 UI 事件后可以立即禁用按钮。如果在这里发生线程切换,手速快的人依然可以触发多次点击!

并发模式

接下来的几节将探讨进阶话题——如果你只是开始学习协程,不需要马上理解它们。简单地禁用按钮在大多数时候是最佳方案。

在剩余篇幅中,我们将探讨如何使用协程在保持按钮可用的同时,确保一次性请求的执行顺序不会让用户感到意外。在启动协程时我们可以控制多余的并发来解决这个问题。

可以使用三种基本模式来确保每次只运行一个请求:

  1. 在启动新任务之前取消之前的
  2. 排队执行任务,在启动新的之前等待其他任务执行完毕。
  3. 合并任务。如果已经存在一个任务,那么就直接返回它的结果而不是启动新的任务。

当学习这些解决方案时,也许感觉它们的实现有些复杂。为了关注如何使用这些模式而不是实现细节,我创建了一个代码块,用可以复用的抽象方案实现了这三个模式。

1:取消之前的任务

在这个排序的用例中,当收到用户最新的点击事件,往往意味着可以取消最后一次排序任务。毕竟如果用户已经告诉应用他们不想要这个结果,那么继续下去又有什么意义呢?

为了取消之前的请求,我们需要跟踪它。代码块中的 cancelPreviousThenRun 就是做了这件事。

来看看如何修复 bug:


// Solution #1: Cancel previous work

// 对于排序和筛选之类的任务,这是一个很好的解决方案
// 如果有了新的请求,就取消之前的请求。

class ProductsRepository(val productsDao: ProductsDao, val productsApi: ProductsService) {
   var controlledRunner = ControlledRunner<List<ProductListing>>()

   suspend fun loadSortedProducts(ascending: Boolean): List<ProductListing> {
       // 在启动新的之前取消旧的任务
       return controlledRunner.cancelPreviousThenRun {
           if (ascending) {
               productsDao.loadProductsByDateStockedAscending()
           } else {
               productsDao.loadProductsByDateStockedDescending()
           }
       }
   }
}

你可以查看 gist 中 cancelPreviousThenRun实现来了解如何跟踪正在执行的任务。


// see the complete implementation at
// https://gist.github.com/objcode/7ab4e7b1df8acd88696cb0ccecad16f7
suspend fun cancelPreviousThenRun(block: suspend () -> T): T {
   // If there is an activeTask, cancel it because it's result is no longer needed
   activeTask?.cancelAndJoin()

   // ...

简而言之,它利用 activeTask 成员变量保存对正在执行的排序工作的追踪。当新的排序启动时,它会立即对 activeTask 所保存的正在执行的任务执行 cancelAndJoin。效果就是在开始一个新的排序之前,取消任何正在进行的排序。

应当使用类似 ControlledRunner<T> 的抽象来封装这样的逻辑,而不是将具体的协程与应用程序逻辑混合在一起。

尽量构建抽象方法,避免将特定的并发模式与应用逻辑糅杂在一起。

Consider building abstractions to avoid mixing ad-hoc concurrency patterns with application code.

注意: 这种模式不适合在全局单例中使用,因为不相关的调用者不应该相互取消。

2:队列执行

有一种解决并发 bug 的方法总是有效的,那就是把请求排队,一次只能执行一个任务!请求将按开始的顺序一次执行一个。

对于这个具体的排序问题取消可能比排队要好,但排队依然值得讨论,因为它总是有效的。

// Solution #2: Add a Mutex

// 注意: 注意对于排序这种用例此方案不是最优的。
// 但对于网络请求来说就很适合。

class ProductsRepository(val productsDao: ProductsDao, val productsApi: ProductsService) {
   val singleRunner = SingleRunner()

   suspend fun loadSortedProducts(ascending: Boolean): List<ProductListing> {
       // 启动新之前等待旧的任务完成
       return singleRunner.afterPrevious {
           if (ascending) {
               productsDao.loadProductsByDateStockedAscending()
           } else {
               productsDao.loadProductsByDateStockedDescending()
           }
       }
   }
}

每当有新的排序时,它使用 SingleRunner 实例来确保一次只运行一个排序任务。

它使用了 Mutex,类似于一个线程锁,为了进入代码块,协程必须持有锁才可以。如果在协程运行的过程中另一个协程尝试执行,它将挂起直到持有锁的协程执行完毕。

Mutex 允许确保一次只运行一个协程——并且它们将按开始的顺序结束。

A Mutex lets you ensure only one coroutine runs at a time — and they will finish in the order they started.

3:合并任务

第三个解决方案是合并前面的工作。如果新请求将重新启动已经完成了一半的相同工作,这将是一个好办法。

这种模式对于排序需求没有多大意义,但对于加载网络数据之类的需求,是非常自然的选择。

用于我们的商品管理应用,用户将需要从服务器获取新商品目录。一个简单的方式是提供一个刷新按钮,点击时会启动新的网络请求。

就像排序按钮一样,简单地在请求执行时禁用按钮就可以完全解决这个问题。但是如果我们不想或不能这样做,就可以合并现有的请求。

让我们看看 gist 中 joinPreviousOrRun 部分的代码来了解这个方案:

class ProductsRepository(val productsDao: ProductsDao, val productsApi: ProductsService) {
   var controlledRunner = ControlledRunner<List<ProductListing>>()

   suspend fun fetchProductsFromBackend(): List<ProductListing> {
       // 如果已经有了一个正在执行的请求,就返回这个已经存在的请求的结果。
       // 否则启动一个新的请求。
       return controlledRunner.joinPreviousOrRun {
           val result = productsApi.getProducts()
           productsDao.insertAll(result)
           result
       }
   }
}

cancelPreviousAndRun 的做法相反,不是丢弃前一个请求,而是丢弃新的请求不去执行它。如果已经有一个请求在运行,则会等待它并返回结果,不再启动新的。只有当没有正在执行的请求时才会执行启动代码块。

joinPreviousOrRun 开头你可以看见具体是如何操作的——如果 activeTask 保存了任务,那么只是简单地返回它的结果:


// see the complete implementation at
// https://gist.github.com/objcode/7ab4e7b1df8acd88696cb0ccecad16f7#file-concurrencyhelpers-kt-L124

suspend fun joinPreviousOrRun(block: suspend () -> T): T {
    // if there is an activeTask, return it's result and don't run the block
    activeTask?.let {
        return it.await()
    }
    // ...

这种模式适用于通过 id 获取商品之类的请求。你可以创建一个 Map 保存 idDeferred 的映射,然后使用上述方案来跟踪对相同商品的请求。

合并任务是避免重复网络请求的一个很好的解决方案。

Join previous work is a great solution to avoiding repeated network requests.

总结

这篇文章中我们研究了如何使用 Kotlin 协程实现一次性请求。首先我们实现了一个完整的模式,展示了如何在 ViewModel 启动协程,然后在 Repository 和 Room Dao 中提供常规的 suspend 函数。

对于大多数情况,这就是在 Android 中使用协程所需的全部工作。这种模式可以应用于许多常见的任务,比如我们在这里展示的排序列表。您还可以使用它来获取、保存或更新网络上的数据。

然后我们研究了一个可能出现的 bug 和可能的解决方案。修复这个问题最简单(通常也是最好)的方法是在排序过程中禁用 UI 按钮。

最后,我们研究了一些高级并发模式以及如何在协程中实现它们。这方面的代码有些困难,但为进阶的协程应用提供了很好的介绍。

Last modification:July 2, 2019