发出请求

我们从 Call.enqueue 开始研究源码,宏观上了解一下 okhttp 如何把请求发出去。

轻松发现对于异步请求,统一转换成了 AsyncCall 交给 Dispatcher 来执行。Dispatcher 会控制最大并发数与针对某一 host 的最大并发数。对于第二个需求,猜测可能需要一个类似 Map 的数据结构来存储 host 与当前已有的请求数,但实际上 okhttp 直接把这个数据存在请求本身里了。

// Dispatcher.promoteAndExecute()
private fun promoteAndExecute(): Boolean {
  this.assertThreadDoesntHoldLock()
  val executableCalls = mutableListOf<AsyncCall>()
  val isRunning: Boolean
  synchronized(this) {
    val i = readyAsyncCalls.iterator()
    while (i.hasNext()) {
      val asyncCall = i.next()

      if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
      if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

      i.remove()
      asyncCall.callsPerHost.incrementAndGet()
      executableCalls.add(asyncCall)
      runningAsyncCalls.add(asyncCall)
    }
    isRunning = runningCallsCount() > 0
  }

  for (i in 0 until executableCalls.size) {
    val asyncCall = executableCalls[i]
    asyncCall.executeOn(executorService)
  }
  return isRunning
}

可以看到 Dispatcher 从就绪列表(readyAsyncCalls)里取出请求,判断它们是否符合限制条件,对 callsPerHost 进行自增操作,最后放到 executorService 里执行。既然 Call 是新创建的,那这里自增请求数有什么用呢?

其实在 Dispatcher.enqueue() 里会根据主机名查找是否有正在排队或正在执行的请求,若有的话就同步它们的 callsPerHost 字段,它是一个 AtomicInteger 数据,callsPerHost 持有的只是一个引用,因此所有同一个 host 的请求实例总是有相同的值。

AsyncCall 实现了 Runnable 接口,run 方法自然也就是真正执行请求了。最核心的代码只有一行:

// AsyncCall.run()
val response = getResponseWithInterceptorChain()

对于同步请求则简单多了,不需要包装,不需要 Dispatcher,非常粗暴地直接调用核心代码完事:

// RealCall.execute()
override fun execute(): Response {
  // ...
  try {
    client.dispatcher.executed(this)
    return getResponseWithInterceptorChain() // <-- 直接请求
  } finally {
    client.dispatcher.finished(this)
  }
}

OkHttp 在 Call 中储存了 OkHttpClient 实例来取得全局化的配置(超时/证书/跳转等),并在 Call 中保存请求信息(host/path/arg 等),无论是同步还是异步,殊途同归,调用 RealCall.getResponseWithInterceptorChain() 来构造并发出请求,取得返回值。

graph LR
Call -->|异步| Dispatcher
Call -->|同步| G["getResponseWithInterceptorChain()"]
Dispatcher --> G

处理链

和很多大型开源库类似,OkHttp 也使用了处理链,跟踪进上面提到的最关键的函数 RealCall.getResponseWithInterceptorChain(),可以明显看到「链」的设计:

internal fun getResponseWithInterceptorChain(): Response {
  // Build a full stack of interceptors.
  val interceptors = mutableListOf<Interceptor>()
  interceptors += client.interceptors // <-- 可自定义拦截器(一般放这里)
  interceptors += RetryAndFollowUpInterceptor(client)
  interceptors += BridgeInterceptor(client.cookieJar)
  interceptors += CacheInterceptor(client.cache)
  interceptors += ConnectInterceptor
  if (!forWebSocket) {
    interceptors += client.networkInterceptors // <-- 可自定义拦截器
  }
  interceptors += CallServerInterceptor(forWebSocket)

  val chain = RealInterceptorChain(
      call = this,
      interceptors = interceptors,
      index = 0,
      exchange = null,
      request = originalRequest,
      connectTimeoutMillis = client.connectTimeoutMillis,
      readTimeoutMillis = client.readTimeoutMillis,
      writeTimeoutMillis = client.writeTimeoutMillis
  )
  // ...
}

构造发送请求时原始 Call 将依次经过这些拦截器(interceptors),得的响应也会逆序依次被它们处理。

具体来讲,这些链并不是链表结构,OkHttp 选择把这些拦截器放进数组,包装成一个对象 Interceptor.Chain,这个对象包含 index 字段,指明应该执行第几个拦截器了。每执行一个拦截器,会传入一个新的 Chain 对象,其内部的 index 已自增 1,拦截器内部将执行新的 Chainproceed() 方法,从而执行下一个拦截器。

即:拦截器没有直接调用下一个拦截器,它只负责调用 Chain.proceed(),至于下一步究竟要做什么,取决于 Chain 内部的状态。

除了最后一个拦截器外,每个拦截器的 intercept() 方法都要完成三个工作:

  1. 处理请求
  2. 继续执行链(调用 Chain.proceed()
  3. 处理响应

最后一个拦截器则省略第二步。

RetryAndFollowUpInterceptor

用于错误重试以及 301 等重定向处理。

一个细节是若请求出错,会把 newExchangeFinder 置为 false。那么进入下一轮循环时候就不会为这个 Call 创建一个新的 ExchangeFinder,那么它内部的状态(尝试到第几个路由等)就会保留。

// RetryAndFollowUpInterceptor
override fun intercept(chain: Interceptor.Chain): Response {
  val realChain = chain as RealInterceptorChain
  var request = chain.request
  val call = realChain.call
  var followUpCount = 0
  var priorResponse: Response? = null
  var newExchangeFinder = true
  while (true) {
    // prepare for connection
    call.enterNetworkInterceptorExchange(request, newExchangeFinder)

    var response: Response
    var closeActiveExchange = true
    try { // ...
      try {
        response = realChain.proceed(request)
        newExchangeFinder = true
      } catch (e: XxxxxxException) {
        // ...
        newExchangeFinder = false
        continue // retry
      }
      // ...
      val exchange = call.interceptorScopedExchange
      val followUp = followUpRequest(response, exchange)
      // ...
      request = followUp // redirect
      priorResponse = response
    } finally {
      call.exitNetworkInterceptorExchange(closeActiveExchange)
    }
  }
}

BridgeInterceptor

根据请求的数据构造请求(Content-Type/gzip/cookie 等),也负责处理响应数据(解压 gzip,保存 cookie 等)。

// BridgeInterceptor
override fun intercept(chain: Interceptor.Chain): Response {
  // 处理请求
  // ...

  // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing the transfer stream.
  var transparentGzip = false
  if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
    transparentGzip = true
    requestBuilder.header("Accept-Encoding", "gzip")
  }

  // 继续执行链
  val networkResponse = chain.proceed(requestBuilder.build())
  // 处理响应
  cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
  // ...
  return responseBuilder.build()
}

CacheInterceptor

与之前的类似,以 chain.proceed(networkRequest) 语句为分割点,之前的代码用于处理请求,即查看是否有缓存,若有则直接构造 Response 返回;之后的代码用于保存缓存。

ConnectInterceptor

最关键的一个拦截器,负责建立连接。看起来很人畜无害,那时因为实在太复杂,所以放到其他函数中了。

override fun intercept(chain: Interceptor.Chain): Response {
  val realChain = chain as RealInterceptorChain
  val exchange = realChain.call.initExchange(chain) // <-- 关键
  val connectedChain = realChain.copy(exchange = exchange)
  return connectedChain.proceed(realChain.request)
}

Exchange 这里可以理解为数据交换(发送/接收)即:http 连接,上面那一行关键代码主要做了两件事:

val codec = exchangeFinder!!.find(client, chain)
val result = Exchange(this, eventListener, exchangeFinder!!, codec)
  1. 获取一个编码解码器(codec = coder & decoder)。找到一个健康的(http)连接,根据这个连接挑选合适的编解码器(http1.1 / 2 etc.)。
graph LR
initExchange --> exchangeFinder.find --> findHealthyConnection --> findConnection
initExchange --> Exchange
  1. 通过获取的 codec 创建一个 Exchange 负责管理数据读写。

findConnection

此方法内部通过 4 种不同的方式尝试获得一个连接:

  1. 一个新的请求:从连接池获取一个可用连接。「可用」指 ①连接还愿意接受请求并且请求数不超载 ②port/protocol/proxy等匹配 ③host匹配 ④或虽然host不匹配但ip一致且符合http2连接合并要求
  2. 若获取连接失败则再试一次。这一次会传入 routes 参数以便判断是否满足 1.④ 的条件。

    Route 由三个数据组成:proxy, ip, port,一个目标地址的路由个数是它们的组合。这些路由按照代理类型分组,每一组是一个 Selection,多个 Selection 是一个 Selector。同个Selection 内的路由只有 ip 不同。
  3. 若再次获取失败则创建一个连接,最后再从连接池获取一个连接,若还是获取不到则把刚刚创建的连接放入连接池,否则复用连接池中的,释放刚刚建立的连接。

    这一步是为了应对一个极端情况:同时有两个 http2 请求同一个新的域名,则它们各自建立连接,但实际上它们可以共享一个连接,于是后建立连接的那一方释放多余连接。
  4. 一个旧的请求(出错重试/重定向):若连接不能复用(不再接受数据或host/port已变更)则释放它;否则复用之。

对于一个新的请求,最多会 3 次尝试从连接池中取连接:

  1. 只取不可多路复用的连接。(无需构建路由集合,省资源)
  2. 取所有可用连接。
  3. 只取可多路复用的连接。(用于共享同时创建的新连接)

局部变量 nextRouteToTry 用于加速下一次连接:

存在一个极端情况:刚刚建立连接,结果发现有可以复用的,于是释放它。但是刚刚释放,那个可以复用的连接又关闭了。这种情况下若下一次尝试的时候依然没有可以复用的连接,就不必遍历所有路由,可以直接尝试上一成功连上的路由。

CallServerInterceptor

比较简单的一个拦截器,用来真正发送请求。调用上一步创建好的对象(exchange)发送数据,并读取响应。

因为这是最后一个拦截器了,因此不会再调用 proceed() 而是直接返回自己得到的响应,再由上游拦截器进一步处理。

建立连接

上面说到,若真的无法从连接池取得可用连接,就只好创建一个。现在就仔细看看它是怎么连接的,入口函数是 RealConnection.connect()

// RealConnection.connect()
fun connect(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    call: Call,
    eventListener: EventListener
) {
  //...
  while (true) {
    try {
      if (route.requiresTunnel()) {
        // 创建 HTTP Tunnel
        connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
        if (rawSocket == null) {
          // We were unable to connect the tunnel but properly closed down our resources.
          break
        }
      } else {
        // 创建普通 TCP 连接
        connectSocket(connectTimeout, readTimeout, call, eventListener)
      }
      // 建立 HTTP 连接
      establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
      eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)
      break
    } catch (e: IOException) {
      // ...
    }
  }
  // ...
}

HTTP Tunnel

根据规范,若我们访问的是 https 但使用的代理是 http,就需要建立 HTTP Tunnel。这块相对简单,只需和 HTTP 代理服务器建立 TCP 连接,根据规范发送数据就好。

建立 TCP 连接

TCP 连接比较底层,大部分都是标准库实现,OkHttp 也只是简单调用一下:

import java.io.IOException
import java.net.ConnectException
import java.net.Proxy
import java.net.Socket

private fun connectSocket(connectTimeout: Int, readTimeout: Int, call: Call, eventListener: EventListener) {
  val proxy = route.proxy
  val address = route.address

  val rawSocket = when (proxy.type()) {
    Proxy.Type.DIRECT, Proxy.Type.HTTP -> address.socketFactory.createSocket()!!
    else -> Socket(proxy)
  }
  this.rawSocket = rawSocket  // rawSocket 就是建立的底层 TCP 连接了

  eventListener.connectStart(call, route.socketAddress, proxy)
  rawSocket.soTimeout = readTimeout
  try {
    Platform.get().connectSocket(rawSocket, route.socketAddress, connectTimeout)
  }
  // ...
}

建立 HTTP 连接

建立 HTTP 连接是 OkHttp 自己写的,根据 HTTP 规范使用 TCP 连接发送握手数据。有三种情况:

  1. 明文 HTTP2: 直接建立 HTTP2 连接。
  2. 加密 HTTP1: 建立 TLS 连接
  3. 加密 HTTP2: 先建立 TLS 连接,再建立 HTTP2 连接。
  4. 明文 HTTP1: 无需建立连接

建立 TLS 连接

OkHttp 按照规范自己实现了 TLS 的握手,方法为 RealConnection.connectTls()。除了按规范对证书进行验证,也会遵循我们的配置(CertificatePinner 等)。

这个方法会得到一个 SSLSocket,并赋值给 RealConnection.socket,若是明文请求,则 RealConnection.socket 是原始的 rawSocket

Last modification:October 8, 2022