OkHttp 源码初探
本文使用 okhttp:4.4.1
发出请求
我们从 Call.enqueue
开始研究源码,宏观上了解一下 okhttp 如何把请求发出去。
轻松发现对于异步请求,统一转换成了 AsyncCall
交给 Dispatcher
来执行。Dispatcher 会控制最大并发数与针对某一 host 的最大并发数。对于第二个需求,猜测可能需要一个类似 Map
的数据结构来存储 host 与当前已有的请求数,但实际上 okhttp 直接把这个数据存在请求本身里了。
// Dispatcher.promoteAndExecute()
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
可以看到 Dispatcher 从就绪列表(readyAsyncCalls
)里取出请求,判断它们是否符合限制条件,对 callsPerHost
进行自增操作,最后放到 executorService
里执行。既然 Call 是新创建的,那这里自增请求数有什么用呢?
其实在 Dispatcher.enqueue()
里会根据主机名查找是否有正在排队或正在执行的请求,若有的话就同步它们的 callsPerHost
字段,它是一个 AtomicInteger
数据,callsPerHost
持有的只是一个引用,因此所有同一个 host 的请求实例总是有相同的值。
AsyncCall
实现了 Runnable
接口,run
方法自然也就是真正执行请求了。最核心的代码只有一行:
// AsyncCall.run()
val response = getResponseWithInterceptorChain()
对于同步请求则简单多了,不需要包装,不需要 Dispatcher,非常粗暴地直接调用核心代码完事:
// RealCall.execute()
override fun execute(): Response {
// ...
try {
client.dispatcher.executed(this)
return getResponseWithInterceptorChain() // <-- 直接请求
} finally {
client.dispatcher.finished(this)
}
}
OkHttp 在 Call
中储存了 OkHttpClient
实例来取得全局化的配置(超时/证书/跳转等),并在 Call
中保存请求信息(host/path/arg 等),无论是同步还是异步,殊途同归,调用 RealCall.getResponseWithInterceptorChain()
来构造并发出请求,取得返回值。
graph LR
Call -->|异步| Dispatcher
Call -->|同步| G["getResponseWithInterceptorChain()"]
Dispatcher --> G
处理链
和很多大型开源库类似,OkHttp 也使用了处理链,跟踪进上面提到的最关键的函数 RealCall.getResponseWithInterceptorChain()
,可以明显看到「链」的设计:
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors // <-- 可自定义拦截器(一般放这里)
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors // <-- 可自定义拦截器
}
interceptors += CallServerInterceptor(forWebSocket)
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
// ...
}
构造发送请求时原始 Call 将依次经过这些拦截器(interceptors),得的响应也会逆序依次被它们处理。
具体来讲,这些链并不是链表结构,OkHttp 选择把这些拦截器放进数组,包装成一个对象 Interceptor.Chain
,这个对象包含 index
字段,指明应该执行第几个拦截器了。每执行一个拦截器,会传入一个新的 Chain
对象,其内部的 index
已自增 1,拦截器内部将执行新的 Chain
的 proceed()
方法,从而执行下一个拦截器。
即:拦截器没有直接调用下一个拦截器,它只负责调用 Chain.proceed()
,至于下一步究竟要做什么,取决于 Chain
内部的状态。
除了最后一个拦截器外,每个拦截器的 intercept()
方法都要完成三个工作:
- 处理请求
- 继续执行链(调用
Chain.proceed()
) - 处理响应
最后一个拦截器则省略第二步。
RetryAndFollowUpInterceptor
用于错误重试以及 301 等重定向处理。
一个细节是若请求出错,会把 newExchangeFinder
置为 false
。那么进入下一轮循环时候就不会为这个 Call
创建一个新的 ExchangeFinder
,那么它内部的状态(尝试到第几个路由等)就会保留。
// RetryAndFollowUpInterceptor
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
var request = chain.request
val call = realChain.call
var followUpCount = 0
var priorResponse: Response? = null
var newExchangeFinder = true
while (true) {
// prepare for connection
call.enterNetworkInterceptorExchange(request, newExchangeFinder)
var response: Response
var closeActiveExchange = true
try { // ...
try {
response = realChain.proceed(request)
newExchangeFinder = true
} catch (e: XxxxxxException) {
// ...
newExchangeFinder = false
continue // retry
}
// ...
val exchange = call.interceptorScopedExchange
val followUp = followUpRequest(response, exchange)
// ...
request = followUp // redirect
priorResponse = response
} finally {
call.exitNetworkInterceptorExchange(closeActiveExchange)
}
}
}
BridgeInterceptor
根据请求的数据构造请求(Content-Type/gzip/cookie 等),也负责处理响应数据(解压 gzip,保存 cookie 等)。
// BridgeInterceptor
override fun intercept(chain: Interceptor.Chain): Response {
// 处理请求
// ...
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing the transfer stream.
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}
// 继续执行链
val networkResponse = chain.proceed(requestBuilder.build())
// 处理响应
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
// ...
return responseBuilder.build()
}
CacheInterceptor
与之前的类似,以 chain.proceed(networkRequest)
语句为分割点,之前的代码用于处理请求,即查看是否有缓存,若有则直接构造 Response 返回;之后的代码用于保存缓存。
ConnectInterceptor
最关键的一个拦截器,负责建立连接。看起来很人畜无害,那时因为实在太复杂,所以放到其他函数中了。
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.call.initExchange(chain) // <-- 关键
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}
Exchange
这里可以理解为数据交换(发送/接收)即:http 连接,上面那一行关键代码主要做了两件事:
val codec = exchangeFinder!!.find(client, chain)
val result = Exchange(this, eventListener, exchangeFinder!!, codec)
- 获取一个编码解码器(codec = coder & decoder)。找到一个健康的(http)连接,根据这个连接挑选合适的编解码器(http1.1 / 2 etc.)。
graph LR
initExchange --> exchangeFinder.find --> findHealthyConnection --> findConnection
initExchange --> Exchange
- 通过获取的 codec 创建一个
Exchange
负责管理数据读写。
findConnection
此方法内部通过 4 种不同的方式尝试获得一个连接:
-
一个新的请求:从连接池获取一个可用连接。「可用」指 ①连接还愿意接受请求并且请求数不超载 ②port/protocol/proxy等匹配 ③host匹配 ④或虽然host不匹配但ip一致且符合http2连接合并要求
-
若获取连接失败则再试一次。这一次会传入
routes
参数以便判断是否满足1.④
的条件。Route 由三个数据组成:proxy, ip, port,一个目标地址的路由个数是它们的组合。这些路由按照代理类型分组,每一组是一个
Selection
,多个Selection
是一个Selector
。同个Selection
内的路由只有 ip 不同。 -
若再次获取失败则创建一个连接,最后再从连接池获取一个连接,若还是获取不到则把刚刚创建的连接放入连接池,否则复用连接池中的,释放刚刚建立的连接。
这一步是为了应对一个极端情况:同时有两个 http2 请求同一个新的域名,则它们各自建立连接,但实际上它们可以共享一个连接,于是后建立连接的那一方释放多余连接。
-
一个旧的请求(出错重试/重定向):若连接不能复用(不再接受数据或host/port已变更)则释放它;否则复用之。
对于一个新的请求,最多会 3 次尝试从连接池中取连接:
- 只取不可多路复用的连接。(无需构建路由集合,省资源)
- 取所有可用连接。
- 只取可多路复用的连接。(用于共享同时创建的新连接)
局部变量
nextRouteToTry
用于加速下一次连接:存在一个极端情况:刚刚建立连接,结果发现有可以复用的,于是释放它。但是刚刚释放,那个可以复用的连接又关闭了。这种情况下若下一次尝试的时候依然没有可以复用的连接,就不必遍历所有路由,可以直接尝试上一成功连上的路由。
CallServerInterceptor
比较简单的一个拦截器,用来真正发送请求。调用上一步创建好的对象(exchange
)发送数据,并读取响应。
因为这是最后一个拦截器了,因此不会再调用 proceed()
而是直接返回自己得到的响应,再由上游拦截器进一步处理。
建立连接
上面说到,若真的无法从连接池取得可用连接,就只好创建一个。现在就仔细看看它是怎么连接的,入口函数是 RealConnection.connect()
// RealConnection.connect()
fun connect(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
call: Call,
eventListener: EventListener
) {
//...
while (true) {
try {
if (route.requiresTunnel()) {
// 创建 HTTP Tunnel
connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
if (rawSocket == null) {
// We were unable to connect the tunnel but properly closed down our resources.
break
}
} else {
// 创建普通 TCP 连接
connectSocket(connectTimeout, readTimeout, call, eventListener)
}
// 建立 HTTP 连接
establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)
break
} catch (e: IOException) {
// ...
}
}
// ...
}
HTTP Tunnel
根据规范,若我们访问的是 https 但使用的代理是 http,就需要建立 HTTP Tunnel。这块相对简单,只需和 HTTP 代理服务器建立 TCP 连接,根据规范发送数据就好。
建立 TCP 连接
TCP 连接比较底层,大部分都是标准库实现,OkHttp 也只是简单调用一下:
import java.io.IOException
import java.net.ConnectException
import java.net.Proxy
import java.net.Socket
private fun connectSocket(connectTimeout: Int, readTimeout: Int, call: Call, eventListener: EventListener) {
val proxy = route.proxy
val address = route.address
val rawSocket = when (proxy.type()) {
Proxy.Type.DIRECT, Proxy.Type.HTTP -> address.socketFactory.createSocket()!!
else -> Socket(proxy)
}
this.rawSocket = rawSocket // rawSocket 就是建立的底层 TCP 连接了
eventListener.connectStart(call, route.socketAddress, proxy)
rawSocket.soTimeout = readTimeout
try {
Platform.get().connectSocket(rawSocket, route.socketAddress, connectTimeout)
}
// ...
}
建立 HTTP 连接
建立 HTTP 连接是 OkHttp 自己写的,根据 HTTP 规范使用 TCP 连接发送握手数据。有三种情况:
- 明文 HTTP2: 直接建立 HTTP2 连接。
- 加密 HTTP1: 建立 TLS 连接
- 加密 HTTP2: 先建立 TLS 连接,再建立 HTTP2 连接。
- 明文 HTTP1: 无需建立连接
建立 TLS 连接
OkHttp 按照规范自己实现了 TLS 的握手,方法为 RealConnection.connectTls()
。除了按规范对证书进行验证,也会遵循我们的配置(CertificatePinner 等)。
这个方法会得到一个 SSLSocket
,并赋值给 RealConnection.socket
,若是明文请求,则 RealConnection.socket
是原始的 rawSocket
。