OkHttp Source Code Analysis (Part 1): Interceptors

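This article is an overview of, and a reading guide to, the OkHttp source code. By the end you should understand OkHttp's overall design and some of the details behind it. Before reading on, keep these questions in mind:

  • How is a request carried out?
  • Is a request retried when it times out or fails?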

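Overall flow

The overall flow of an OkHttp request is shown in the figure:

(Image from Piasy's blog)

Whether a request is executed synchronously or asynchronously, it ends up in RealCall.getResponseWithInterceptorChain: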

@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
  // Build a full stack of interceptors.
  val interceptors = mutableListOf<Interceptor>()
  interceptors += client.interceptors
  interceptors += RetryAndFollowUpInterceptor(client)
  interceptors += BridgeInterceptor(client.cookieJar)
  interceptors += CacheInterceptor(client.cache)
  interceptors += ConnectInterceptor
  if (!forWebSocket) {
    interceptors += client.networkInterceptors
  }
  interceptors += CallServerInterceptor(forWebSocket)

  val chain = RealInterceptorChain(
      call = this,
      interceptors = interceptors,
      index = 0,
      exchange = null,
      request = originalRequest,
      connectTimeoutMillis = client.connectTimeoutMillis,
      readTimeoutMillis = client.readTimeoutMillis,
      writeTimeoutMillis = client.writeTimeoutMillis
  )

  var calledNoMoreExchanges = false
  try {
    val response = chain.proceed(originalRequest)
    if (isCanceled()) {
      response.closeQuietly()
      throw IOException("Canceled")
    }
    return response
  } catch (e: IOException) {
    calledNoMoreExchanges = true
    throw noMoreExchanges(e) as Throwable
  } finally {
    if (!calledNoMoreExchanges) {
      noMoreExchanges(null)
    }
  }
}

As you can see, it calls RealInterceptorChain.proceed:

@Throws(IOException::class)
override fun proceed(request: Request): Response {
  check(index < interceptors.size)

  calls++

  if (exchange != null) {
    check(exchange.connection.supportsUrl(request.url)) {
      "network interceptor ${interceptors[index - 1]} must retain the same host and port"
    }
    check(calls == 1) {
      "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
    }
  }

  // Call the next interceptor in the chain.
  val next = copy(index = index + 1, request = request)
  val interceptor = interceptors[index]

  @Suppress("USELESS_ELVIS")
  val response = interceptor.intercept(next) ?: throw NullPointerException(
      "interceptor $interceptor returned null")

  if (exchange != null) {
    check(index + 1 >= interceptors.size || next.calls == 1) {
      "network interceptor $interceptor must call proceed() exactly once"
    }
  }

  check(response.body != null) { "interceptor $interceptor returned a response with no body" }

  return response
}

internal fun copy(
  index: Int = this.index,
  exchange: Exchange? = this.exchange,
  request: Request = this.request,
  connectTimeoutMillis: Int = this.connectTimeoutMillis,
  readTimeoutMillis: Int = this.readTimeoutMillis,
  writeTimeoutMillis: Int = this.writeTimeoutMillis
) = RealInterceptorChain(call, interceptors, index, exchange, request, connectTimeoutMillis,
    readTimeoutMillis, writeTimeoutMillis)

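This is the chain-of-responsibility pattern. Each proceed call copies the chain with index + 1 and hands that copy to the interceptor at the current index, so the intercept methods run in order: application interceptors -> RetryAndFollowUpInterceptor -> BridgeInterceptor -> CacheInterceptor -> ConnectInterceptor -> networkInterceptors -> CallServerInterceptor. Together they complete the request. So what does each interceptor actually do?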
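To make the pattern concrete, here is a minimal sketch of such a chain in plain Kotlin. It is a simplified model under my own assumptions, not OkHttp's classes: MiniInterceptor, MiniChain and runMiniChain are hypothetical names, and requests and responses are plain strings.

```kotlin
// Simplified model of an interceptor chain; these are NOT OkHttp's real classes.
fun interface MiniInterceptor {
  fun intercept(chain: MiniChain): String
}

class MiniChain(
  private val interceptors: List<MiniInterceptor>,
  private val index: Int,
  val request: String
) {
  // Hands the request to the interceptor at `index`, giving it a chain that points at index + 1.
  fun proceed(request: String): String {
    check(index < interceptors.size) { "no interceptor left to handle the request" }
    val next = MiniChain(interceptors, index + 1, request)
    return interceptors[index].intercept(next)
  }
}

fun runMiniChain(request: String): String {
  val interceptors = listOf(
    // Each of the first two rewrites the request on the way down and wraps the response on the way up.
    MiniInterceptor { chain -> "A(" + chain.proceed(chain.request + "+a") + ")" },
    MiniInterceptor { chain -> "B(" + chain.proceed(chain.request + "+b") + ")" },
    // The last interceptor produces the response and never calls proceed().
    MiniInterceptor { chain -> "response to [" + chain.request + "]" }
  )
  return MiniChain(interceptors, 0, request).proceed(request)
}
```

Calling runMiniChain("GET /") returns "A(B(response to [GET /+a+b]))": the request is modified on the way down the chain and the response is wrapped on the way back up, which is the same shape as the real interceptors.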

RetryAndFollowUpInterceptor

The interceptor responsible for retries and redirects.

@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
  val realChain = chain as RealInterceptorChain
  var request = chain.request
  val call = realChain.call
  var followUpCount = 0
  var priorResponse: Response? = null
  var newExchangeFinder = true
  while (true) {
    call.enterNetworkInterceptorExchange(request, newExchangeFinder)

    var response: Response
    var closeActiveExchange = true
    try {
      if (call.isCanceled()) {
        throw IOException("Canceled")
      }

      try {
        response = realChain.proceed(request)
        newExchangeFinder = true
      } catch (e: RouteException) {
        // The attempt to connect via a route failed. The request will not have been sent.
        if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
          throw e.firstConnectException
        }
        newExchangeFinder = false
        continue
      } catch (e: IOException) {
        // An attempt to communicate with a server failed. The request may have been sent.
        if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
          throw e
        }
        newExchangeFinder = false
        continue
      }

      // Attach the prior response if it exists. Such responses never have a body.
      if (priorResponse != null) {
        response = response.newBuilder()
            .priorResponse(priorResponse.newBuilder()
                .body(null)
                .build())
            .build()
      }

      val exchange = call.interceptorScopedExchange
      val followUp = followUpRequest(response, exchange)

      if (followUp == null) {
        if (exchange != null && exchange.isDuplex) {
          call.timeoutEarlyExit()
        }
        closeActiveExchange = false
        return response
      }

      val followUpBody = followUp.body
      if (followUpBody != null && followUpBody.isOneShot()) {
        closeActiveExchange = false
        return response
      }

      response.body?.closeQuietly()

      if (++followUpCount > MAX_FOLLOW_UPS) {
        throw ProtocolException("Too many follow-up requests: $followUpCount")
      }

      request = followUp
      priorResponse = response
    } finally {
      call.exitNetworkInterceptorExchange(closeActiveExchange)
    }
  }
}

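When a request fails, either with a RouteException (connecting via a route failed, so the request was not sent) or with an IOException while communicating with the server (the request may already have been sent), the recover method decides whether to retry.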

/**
 * Report and attempt to recover from a failure to communicate with a server. Returns true if
 * `e` is recoverable, or false if the failure is permanent. Requests with a body can only
 * be recovered if the body is buffered or if the failure occurred before the request has been
 * sent.
 */
private fun recover(
  e: IOException,
  call: RealCall,
  userRequest: Request,
  requestSendStarted: Boolean
): Boolean {
  // The application layer has forbidden retries.
  if (!client.retryOnConnectionFailure) return false

  // We can't send the request body again.
  if (requestSendStarted && requestIsOneShot(e, userRequest)) return false

  // This exception is fatal.
  if (!isRecoverable(e, requestSendStarted)) return false

  // No more routes to attempt.
  if (!call.retryAfterFailure()) return false

  // For failure recovery, use the same route selector with a new connection.
  return true
}

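recover checks a series of conditions. Going by the official documentation for retryOnConnectionFailure, the main retry scenarios are:

  • Unreachable IP addresses: the host resolves to several IP addresses and some of them cannot be reached.
  • Stale pooled connections: an existing connection is reused, but it hits a socket timeout.
  • Unreachable proxy servers: when a proxy is unavailable, a direct connection is attempted.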
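The first scenario can be sketched as a loop over candidate addresses. This is only an illustration under my own assumptions: connectWithFallback and its parameters are hypothetical, and OkHttp's real logic lives in its route selection and recover code rather than in a single helper like this.

```kotlin
import java.io.IOException

// Hypothetical helper: tries each candidate address in order and returns the first success.
fun <T> connectWithFallback(
  addresses: List<String>,
  retryOnConnectionFailure: Boolean,
  connect: (String) -> T
): T {
  require(addresses.isNotEmpty()) { "at least one address is required" }
  var firstFailure: IOException? = null
  for (address in addresses) {
    try {
      return connect(address)
    } catch (e: IOException) {
      // Remember the first failure and attach later ones to it.
      if (firstFailure == null) firstFailure = e else firstFailure.addSuppressed(e)
      // With retries disabled, the first failure is final.
      if (!retryOnConnectionFailure) break
    }
  }
  throw firstFailure!!
}
```

With retries enabled, one unreachable address does not fail the whole call as long as a later address connects. With retries disabled, the first IOException is rethrown immediately.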

Redirects (and other follow-up requests) are decided mainly from the status code the server returns:

/**
 * Figures out the HTTP request to make in response to receiving [userResponse]. This will
 * either add authentication headers, follow redirects or handle a client request timeout. If a
 * follow-up is either unnecessary or not applicable, this returns null.
 */
@Throws(IOException::class)
private fun followUpRequest(userResponse: Response, exchange: Exchange?): Request? {
  val route = exchange?.connection?.route()
  val responseCode = userResponse.code

  val method = userResponse.request.method
  when (responseCode) {
    HTTP_PROXY_AUTH -> {
      val selectedProxy = route!!.proxy
      if (selectedProxy.type() != Proxy.Type.HTTP) {
        throw ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy")
      }
      return client.proxyAuthenticator.authenticate(route, userResponse)
    }

    HTTP_UNAUTHORIZED -> return client.authenticator.authenticate(route, userResponse)

    HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT -> {
      // "If the 307 or 308 status code is received in response to a request other than GET
      // or HEAD, the user agent MUST NOT automatically redirect the request"
      if (method != "GET" && method != "HEAD") {
        return null
      }
      return buildRedirectRequest(userResponse, method)
    }

    HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> {
      return buildRedirectRequest(userResponse, method)
    }

    HTTP_CLIENT_TIMEOUT -> {
      // 408's are rare in practice, but some servers like HAProxy use this response code. The
      // spec says that we may repeat the request without modifications. Modern browsers also
      // repeat the request (even non-idempotent ones.)
      if (!client.retryOnConnectionFailure) {
        // The application layer has directed us not to retry the request.
        return null
      }

      val requestBody = userResponse.request.body
      if (requestBody != null && requestBody.isOneShot()) {
        return null
      }
      val priorResponse = userResponse.priorResponse
      if (priorResponse != null && priorResponse.code == HTTP_CLIENT_TIMEOUT) {
        // We attempted to retry and got another timeout. Give up.
        return null
      }

      if (retryAfter(userResponse, 0) > 0) {
        return null
      }

      return userResponse.request
    }

    HTTP_UNAVAILABLE -> {
      val priorResponse = userResponse.priorResponse
      if (priorResponse != null && priorResponse.code == HTTP_UNAVAILABLE) {
        // We attempted to retry and got another timeout. Give up.
        return null
      }

      if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) {
        // specifically received an instruction to retry without delay
        return userResponse.request
      }

      return null
    }

    HTTP_MISDIRECTED_REQUEST -> {
      // OkHttp can coalesce HTTP/2 connections even if the domain names are different. See
      // RealConnection.isEligible(). If we attempted this and the server returned HTTP 421, then
      // we can retry on a different connection.
      val requestBody = userResponse.request.body
      if (requestBody != null && requestBody.isOneShot()) {
        return null
      }

      if (exchange == null || !exchange.isCoalescedConnection) {
        return null
      }

      exchange.connection.noCoalescedConnections()
      return userResponse.request
    }

    else -> return null
  }
}

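Going through them one by one:

  • 407/401: ask the proxyAuthenticator or authenticator for a request that carries the authentication headers, and send that.
  • 307/308: redirect only if the request is GET or HEAD. Note that OkHttp's implementation here seems to differ from the current RFC; see rfc7231#section-6.4.7:

The server SHOULD generate a Location header field in the response containing a URI reference for the different URI. The user agent MAY use the Location field value for automatic redirection. The server’s response payload usually contains a short hypertext note with a hyperlink to the different URI(s).

So under RFC 7231, 307/308 redirects are not limited to GET/HEAD. The restriction quoted in the code comment matches the wording of the older RFC 2616, which said a user agent must not automatically redirect a non-GET/HEAD request on a 307 unless the user can confirm it. (Also, 307 differs from 302 in that 302 allows a POST to be changed into a GET, depending on the user agent's implementation.)

  • 300/301/302/303: redirect directly.
  • 408: repeat the original request once, provided retryOnConnectionFailure is enabled, the request body is not one-shot, and the response carries no positive Retry-After. If the retry also returns 408, give up.
  • 503: repeat the original request once, but only if the response carries Retry-After: 0. If the retry also returns 503, give up.
  • 421: Misdirected Request. If the request went over a coalesced HTTP/2 connection, retry on a different connection.

The redirect target comes from the "Location" response header. OkHttp follows at most 20 follow-up requests per call.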

/**
 * How many redirects and auth challenges should we attempt? Chrome follows 21 redirects; Firefox,
 * curl, and wget follow 20; Safari follows 16; and HTTP/1.0 recommends 5.
 */
private const val MAX_FOLLOW_UPS = 20
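The way this limit bounds a redirect loop can be sketched as follows. This is a simplified model, not OkHttp's code: SketchResponse, fetchFollowingRedirects and MAX_FOLLOW_UPS_SKETCH are hypothetical names, and only the Location-based redirect codes are modeled (no auth challenges, 408, 503 or 421).

```kotlin
import java.net.ProtocolException

const val MAX_FOLLOW_UPS_SKETCH = 20

// Hypothetical response model: a status code plus an optional Location header value.
data class SketchResponse(val code: Int, val location: String? = null)

// Follows redirects until a non-redirect response arrives or the follow-up limit is exceeded.
fun fetchFollowingRedirects(startUrl: String, fetch: (String) -> SketchResponse): SketchResponse {
  var url = startUrl
  var followUpCount = 0
  while (true) {
    val response = fetch(url)
    val isRedirect = response.code in setOf(300, 301, 302, 303, 307, 308)
    val location = response.location
    // A redirect without a Location header cannot be followed, so return it as-is.
    if (!isRedirect || location == null) return response
    if (++followUpCount > MAX_FOLLOW_UPS_SKETCH) {
      throw ProtocolException("Too many follow-up requests: $followUpCount")
    }
    url = location
  }
}
```

A server that keeps redirecting a URL to itself makes this loop throw ProtocolException on the 21st follow-up instead of spinning forever.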

BridgeInterceptor

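BridgeInterceptor acts as an adapter between the application-level API and the lower-level network code. Based on the Request it adds request headers such as "Content-Type", "Host", "Cookie" and "User-Agent", sends the request, hands the cookies in the response to the CookieJar, and decompresses gzip response bodies.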

@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
  val userRequest = chain.request()
  val requestBuilder = userRequest.newBuilder()

  val body = userRequest.body
  if (body != null) {
    val contentType = body.contentType()
    if (contentType != null) {
      requestBuilder.header("Content-Type", contentType.toString())
    }

    val contentLength = body.contentLength()
    // A contentLength of -1 means the body length is unknown, so the
    // "Transfer-Encoding: chunked" header is sent instead of Content-Length.
    if (contentLength != -1L) {
      requestBuilder.header("Content-Length", contentLength.toString())
      requestBuilder.removeHeader("Transfer-Encoding")
    } else {
      requestBuilder.header("Transfer-Encoding", "chunked")
      requestBuilder.removeHeader("Content-Length")
    }
  }

  if (userRequest.header("Host") == null) {
    requestBuilder.header("Host", userRequest.url.toHostHeader())
  }

  if (userRequest.header("Connection") == null) {
    requestBuilder.header("Connection", "Keep-Alive")
  }

  // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
  // the transfer stream.
  var transparentGzip = false
  if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
    transparentGzip = true
    requestBuilder.header("Accept-Encoding", "gzip")
  }
  // Add the Cookie header, formatted as "a=b; c=d".
  val cookies = cookieJar.loadForRequest(userRequest.url)
  if (cookies.isNotEmpty()) {
    requestBuilder.header("Cookie", cookieHeader(cookies))
  }

  if (userRequest.header("User-Agent") == null) {
    requestBuilder.header("User-Agent", userAgent)
  }

  val networkResponse = chain.proceed(requestBuilder.build())
  // Hand the cookies in the response to the CookieJar.
  cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)

  val responseBuilder = networkResponse.newBuilder()
      .request(userRequest)

  // Decompress a gzip-encoded response body.
  if (transparentGzip &&
      "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
      networkResponse.promisesBody()) {
    val responseBody = networkResponse.body
    if (responseBody != null) {
      val gzipSource = GzipSource(responseBody.source())
      val strippedHeaders = networkResponse.headers.newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build()
      responseBuilder.headers(strippedHeaders)
      val contentType = networkResponse.header("Content-Type")
      responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
    }
  }

  return responseBuilder.build()
}
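The request-side defaults above can be condensed into a small pure function. This is a sketch under simplifying assumptions: applyBridgeDefaults is a hypothetical name, headers are a plain case-sensitive map rather than OkHttp's Headers, and Content-Type and User-Agent handling are left out.

```kotlin
// Hypothetical, simplified model of the header defaults; not OkHttp's API.
fun applyBridgeDefaults(
  userHeaders: Map<String, String>,
  host: String,
  bodyLength: Long?, // null = no request body, -1 = body of unknown length
  cookies: List<Pair<String, String>>
): Map<String, String> {
  val headers = LinkedHashMap<String, String>(userHeaders)
  if (bodyLength != null) {
    if (bodyLength != -1L) {
      headers["Content-Length"] = bodyLength.toString()
      headers.remove("Transfer-Encoding")
    } else {
      headers["Transfer-Encoding"] = "chunked"
      headers.remove("Content-Length")
    }
  }
  headers.putIfAbsent("Host", host)
  headers.putIfAbsent("Connection", "Keep-Alive")
  // Only ask for gzip when the caller expressed no preference and is not requesting a byte range.
  if ("Accept-Encoding" !in userHeaders && "Range" !in userHeaders) {
    headers["Accept-Encoding"] = "gzip"
  }
  if (cookies.isNotEmpty()) {
    headers["Cookie"] = cookies.joinToString("; ") { (name, value) -> "$name=$value" }
  }
  return headers
}
```

For a body of unknown length and two cookies a=b and c=d, the result contains "Transfer-Encoding: chunked", no Content-Length, "Accept-Encoding: gzip" and "Cookie: a=b; c=d". A request that already carries a Range header is left without Accept-Encoding, because a byte range of a compressed stream could not be decompressed transparently.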

CacheInterceptor

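Handles the caching logic for requests, including reading and updating the cache. First, some background on how HTTP controls caching: https://developers.google.com/web/fundamentals/performance/optimizing-content-efficiency/http-caching?hl=zh-cn

It can be summed up in this diagram:

Now for the intercept code: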

@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
  val cacheCandidate = cache?.get(chain.request())

  val now = System.currentTimeMillis()

  val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
  val networkRequest = strategy.networkRequest
  val cacheResponse = strategy.cacheResponse

  cache?.trackResponse(strategy)

  if (cacheCandidate != null && cacheResponse == null) {
    // The cache candidate wasn't applicable. Close it.
    cacheCandidate.body?.closeQuietly()
  }

  // If we're forbidden from using the network and the cache is insufficient, fail.
  if (networkRequest == null && cacheResponse == null) {
    return Response.Builder()
        .request(chain.request())
        .protocol(Protocol.HTTP_1_1)
        .code(HTTP_GATEWAY_TIMEOUT)
        .message("Unsatisfiable Request (only-if-cached)")
        .body(EMPTY_RESPONSE)
        .sentRequestAtMillis(-1L)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build()
  }

  // If we don't need the network, we're done.
  if (networkRequest == null) {
    return cacheResponse!!.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .build()
  }

  var networkResponse: Response? = null
  try {
    networkResponse = chain.proceed(networkRequest)
  } finally {
    // If we're crashing on I/O or otherwise, don't leak the cache body.
    if (networkResponse == null && cacheCandidate != null) {
      cacheCandidate.body?.closeQuietly()
    }
  }

  // If we have a cache response too, then we're doing a conditional get.
  if (cacheResponse != null) {
    if (networkResponse?.code == HTTP_NOT_MODIFIED) {
      val response = cacheResponse.newBuilder()
          .headers(combine(cacheResponse.headers, networkResponse.headers))
          .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
          .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
          .cacheResponse(stripBody(cacheResponse))
          .networkResponse(stripBody(networkResponse))
          .build()

      networkResponse.body!!.close()

      // Update the cache after combining headers but before stripping the
      // Content-Encoding header (as performed by initContentStream()).
      cache!!.trackConditionalCacheHit()
      cache.update(cacheResponse, response)
      return response
    } else {
      cacheResponse.body?.closeQuietly()
    }
  }

  val response = networkResponse!!.newBuilder()
      .cacheResponse(stripBody(cacheResponse))
      .networkResponse(stripBody(networkResponse))
      .build()

  if (cache != null) {
    if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
      // Offer this request to the cache.
      val cacheRequest = cache.put(response)
      return cacheWritingResponse(cacheRequest, response)
    }

    if (HttpMethod.invalidatesCache(networkRequest.method)) {
      try {
        cache.remove(networkRequest)
      } catch (_: IOException) {
        // The cache cannot be written.
      }
    }
  }

  return response
}

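The cache is backed by an LRU cache (DiskLruCache). The strategy pattern is used here: CacheStrategy decides, from the request and the cached response, whether to use the cache, the network, or both. See the computeCandidate method: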

/** Returns a strategy to use assuming the request can use the network. */
private fun computeCandidate(): CacheStrategy {
  // No cached response.
  if (cacheResponse == null) {
    return CacheStrategy(request, null)
  }

  // Drop the cached response if it's missing a required handshake.
  if (request.isHttps && cacheResponse.handshake == null) {
    return CacheStrategy(request, null)
  }

  // If this response shouldn't have been stored, it should never be used as a response source.
  // This check should be redundant as long as the persistence store is well-behaved and the
  // rules are constant.
  if (!isCacheable(cacheResponse, request)) {
    return CacheStrategy(request, null)
  }

  val requestCaching = request.cacheControl
  // If the request's Cache-Control is no-cache, or the request already carries
  // If-Modified-Since or If-None-Match, go to the network first.
  if (requestCaching.noCache || hasConditions(request)) {
    return CacheStrategy(request, null)
  }

  val responseCaching = cacheResponse.cacheControl

  val ageMillis = cacheResponseAge()
  var freshMillis = computeFreshnessLifetime()

  if (requestCaching.maxAgeSeconds != -1) {
    freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong()))
  }

  var minFreshMillis: Long = 0
  if (requestCaching.minFreshSeconds != -1) {
    minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds.toLong())
  }

  var maxStaleMillis: Long = 0
  if (!responseCaching.mustRevalidate && requestCaching.maxStaleSeconds != -1) {
    maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds.toLong())
  }

  if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
    val builder = cacheResponse.newBuilder()
    if (ageMillis + minFreshMillis >= freshMillis) {
      builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"")
    }
    val oneDayMillis = 24 * 60 * 60 * 1000L
    if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
      builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"")
    }
    return CacheStrategy(null, builder.build())
  }

  // Find a condition to add to the request. If the condition is satisfied, the response body
  // will not be transmitted.
  val conditionName: String
  val conditionValue: String?
  when {
    etag != null -> {
      conditionName = "If-None-Match"
      conditionValue = etag
    }

    lastModified != null -> {
      conditionName = "If-Modified-Since"
      conditionValue = lastModifiedString
    }

    servedDate != null -> {
      conditionName = "If-Modified-Since"
      conditionValue = servedDateString
    }

    else -> return CacheStrategy(request, null) // No condition! Make a regular request.
  }

  val conditionalRequestHeaders = request.headers.newBuilder()
  conditionalRequestHeaders.addLenient(conditionName, conditionValue!!)

  val conditionalRequest = request.newBuilder()
      .headers(conditionalRequestHeaders.build())
      .build()
  return CacheStrategy(conditionalRequest, cacheResponse)
}

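The main steps are:

  • If cacheResponse is null, there is no usable local cache entry, so go straight to the network.
  • If the request is HTTPS but the cached response has no handshake, drop the cached response and go to the network.
  • isCacheable checks whether both cacheResponse and request allow caching; if either does not, go straight to the network.
  • If the request's Cache-Control is no-cache, or the request already carries If-Modified-Since or If-None-Match, go to the network first.
  • Check whether cacheResponse has expired (taking min-fresh and max-stale into account); if it has not, use cacheResponse.
  • If cacheResponse has expired, make the request conditional: add If-None-Match when the cached response has an ETag, otherwise If-Modified-Since based on Last-Modified or Date. With none of these available, make a regular request.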
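The expiry step boils down to comparing two sums, which a small worked sketch may make clearer. CacheDecision and decideFreshness are hypothetical names, all values are in milliseconds, and the response-side no-cache and must-revalidate checks are left out.

```kotlin
// Hypothetical model of the freshness comparison in computeCandidate; not OkHttp's API.
enum class CacheDecision { USE_CACHE, USE_CACHE_WITH_STALE_WARNING, REVALIDATE_OR_FETCH }

fun decideFreshness(
  ageMillis: Long,
  freshMillis: Long,
  minFreshMillis: Long = 0,
  maxStaleMillis: Long = 0
): CacheDecision = when {
  // Too old even after allowing for max-stale: the network is needed.
  ageMillis + minFreshMillis >= freshMillis + maxStaleMillis -> CacheDecision.REVALIDATE_OR_FETCH
  // Past its freshness lifetime but within max-stale: served with a "Response is stale" warning.
  ageMillis + minFreshMillis >= freshMillis -> CacheDecision.USE_CACHE_WITH_STALE_WARNING
  else -> CacheDecision.USE_CACHE
}
```

For example, a response that is 90 seconds old with a 60-second freshness lifetime needs the network by default (90 >= 60), but a request that tolerates 60 seconds of staleness still gets the cached copy, because 90 < 60 + 60.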

The isCacheable method works as follows:

/** Returns true if [response] can be stored to later serve another request. */
fun isCacheable(response: Response, request: Request): Boolean {
  // Always go to network for uncacheable response codes (RFC 7231 section 6.1), This
  // implementation doesn't support caching partial content.
  when (response.code) {
    HTTP_OK,
    HTTP_NOT_AUTHORITATIVE,
    HTTP_NO_CONTENT,
    HTTP_MULT_CHOICE,
    HTTP_MOVED_PERM,
    HTTP_NOT_FOUND,
    HTTP_BAD_METHOD,
    HTTP_GONE,
    HTTP_REQ_TOO_LONG,
    HTTP_NOT_IMPLEMENTED,
    StatusLine.HTTP_PERM_REDIRECT -> {
      // These codes can be cached unless headers forbid it.
    }

    HTTP_MOVED_TEMP,
    StatusLine.HTTP_TEMP_REDIRECT -> {
      // These codes can only be cached with the right response headers.
      // http://tools.ietf.org/html/rfc7234#section-3
      // s-maxage is not checked because OkHttp is a private cache that should ignore s-maxage.
      if (response.header("Expires") == null &&
          response.cacheControl.maxAgeSeconds == -1 &&
          !response.cacheControl.isPublic &&
          !response.cacheControl.isPrivate) {
        return false
      }
    }

    else -> {
      // All other codes cannot be cached.
      return false
    }
  }

  // A 'no-store' directive on request or response prevents the response from being cached.
  return !response.cacheControl.noStore && !request.cacheControl.noStore
}

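In short, a response can be cached when:

  1. its status code is one of those listed above, and
  2. for 302 and 307 only, the response has an Expires header, or a Cache-Control header with max-age, public or private, and
  3. neither the request's nor the response's Cache-Control contains no-store.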
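These three conditions can be restated as a small pure function with the numeric status codes spelled out. It is a sketch only: CacheableInput, ALWAYS_CACHEABLE_CODES and isCacheableSketch are hypothetical names standing in for OkHttp's Response and CacheControl.

```kotlin
// Hypothetical inputs standing in for a parsed request/response pair; not OkHttp's types.
data class CacheableInput(
  val code: Int,
  val hasExpires: Boolean = false,
  val maxAgeSeconds: Int = -1, // -1 means no max-age directive
  val isPublic: Boolean = false,
  val isPrivate: Boolean = false,
  val responseNoStore: Boolean = false,
  val requestNoStore: Boolean = false
)

// Status codes that are cacheable unless a header forbids it.
val ALWAYS_CACHEABLE_CODES = setOf(200, 203, 204, 300, 301, 308, 404, 405, 410, 414, 501)

fun isCacheableSketch(input: CacheableInput): Boolean {
  val codeAllowsCaching = when (input.code) {
    in ALWAYS_CACHEABLE_CODES -> true
    // 302 and 307 need explicit freshness or visibility information.
    302, 307 -> input.hasExpires || input.maxAgeSeconds != -1 || input.isPublic || input.isPrivate
    else -> false
  }
  return codeAllowsCaching && !input.responseNoStore && !input.requestNoStore
}
```

A plain 200 is cacheable, a bare 302 is not, a 302 with max-age=60 is, and no-store on either side always wins.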

ConnectInterceptor

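Responsible for establishing the HTTP(S) connection. By default a ConnectionPool holds at most 5 idle connections, and a connection is evicted after 5 minutes of inactivity. Reuse is considered when establishing a connection: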

/**
 * Returns a connection to host a new stream. This prefers the existing connection if it exists,
 * then the pool, finally building a new connection.
 */
@Throws(IOException::class)
private fun findConnection(
  connectTimeout: Int,
  readTimeout: Int,
  writeTimeout: Int,
  pingIntervalMillis: Int,
  connectionRetryEnabled: Boolean
): RealConnection {
  var foundPooledConnection = false
  var result: RealConnection? = null
  var selectedRoute: Route? = null
  var releasedConnection: RealConnection?
  val toClose: Socket?
  synchronized(connectionPool) {
    if (call.isCanceled()) throw IOException("Canceled")

    releasedConnection = call.connection
    // Close the call's previous connection if it cannot be reused. It is reusable when the
    // port matches and either the host matches or the certificate covers the host.
    toClose = if (call.connection != null &&
        (call.connection!!.noNewExchanges || !call.connection!!.supportsUrl(address.url))) {
      call.releaseConnectionNoEvents()
    } else {
      null
    }

    if (call.connection != null) {
      // We had an already-allocated connection and it's good.
      result = call.connection
      releasedConnection = null
    }

    if (result == null) {
      // The connection hasn't had any problems for this call.
      refusedStreamCount = 0
      connectionShutdownCount = 0
      otherFailureCount = 0

      // Attempt to get a connection from the pool. Both conditions must hold:
      // 1. If the request requires multiplexing (HTTP/2), the connection must support it.
      // 2. The connection is eligible for reuse, judged by host, route IP and certificate.
      if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
        foundPooledConnection = true
        result = call.connection
      } else if (nextRouteToTry != null) {
        selectedRoute = nextRouteToTry
        nextRouteToTry = null
      }
    }
  }
  toClose?.closeQuietly()

  if (releasedConnection != null) {
    eventListener.connectionReleased(call, releasedConnection!!)
  }
  if (foundPooledConnection) {
    eventListener.connectionAcquired(call, result!!)
  }
  if (result != null) {
    // If we found an already-allocated or pooled connection, we're done.
    return result!!
  }

  // If we need a route selection, make one. This is a blocking operation.
  var newRouteSelection = false
  if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
    var localRouteSelector = routeSelector
    if (localRouteSelector == null) {
      localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
      this.routeSelector = localRouteSelector
    }
    newRouteSelection = true
    routeSelection = localRouteSelector.next()
  }

  var routes: List<Route>? = null
  synchronized(connectionPool) {
    if (call.isCanceled()) throw IOException("Canceled")

    if (newRouteSelection) {
      // Now that we have a set of IP addresses, make another attempt at getting a connection from
      // the pool. This could match due to connection coalescing.
      routes = routeSelection!!.routes
      if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
        foundPooledConnection = true
        result = call.connection
      }
    }

    if (!foundPooledConnection) {
      if (selectedRoute == null) {
        selectedRoute = routeSelection!!.next()
      }

      // Create a connection and assign it to this allocation immediately. This makes it possible
      // for an asynchronous cancel() to interrupt the handshake we're about to do.
      result = RealConnection(connectionPool, selectedRoute!!)
      connectingConnection = result
    }
  }

  // If we found a pooled connection on the 2nd time around, we're done.
  if (foundPooledConnection) {
    eventListener.connectionAcquired(call, result!!)
    return result!!
  }

  // Do TCP + TLS handshakes. This is a blocking operation.
  result!!.connect(
      connectTimeout,
      readTimeout,
      writeTimeout,
      pingIntervalMillis,
      connectionRetryEnabled,
      call,
      eventListener
  )
  call.client.routeDatabase.connected(result!!.route())

  var socket: Socket? = null
  synchronized(connectionPool) {
    connectingConnection = null
    // Last attempt at connection coalescing, which only occurs if we attempted multiple
    // concurrent connections to the same host.
    if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
      // We lost the race! Close the connection we created and return the pooled connection.
      result!!.noNewExchanges = true
      socket = result!!.socket()
      result = call.connection

      // It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
      // that case we will retry the route we just successfully connected with.
      nextRouteToTry = selectedRoute
    } else {
      connectionPool.put(result!!)
      call.acquireConnectionNoEvents(result!!)
    }
  }
  socket?.closeQuietly()

  eventListener.connectionAcquired(call, result!!)
  return result!!
}
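The pool limits mentioned at the start of this section (5 idle connections, 5 minutes) can be pictured with the following sketch. It is a simplified model under my own assumptions: IdleConnection and connectionsToEvict are hypothetical names, and the function computes the end result in one pass, whereas a real pool would clean up incrementally in the background.

```kotlin
// Hypothetical pool entry: an id plus the time at which the connection became idle.
data class IdleConnection(val id: String, val idleSinceMillis: Long)

// Returns the connections to evict: those idle for at least keepAliveMillis, then the
// longest-idle survivors until at most maxIdleConnections remain.
fun connectionsToEvict(
  idle: List<IdleConnection>,
  nowMillis: Long,
  maxIdleConnections: Int = 5,
  keepAliveMillis: Long = 5 * 60 * 1000L
): List<IdleConnection> {
  val oldestFirst = idle.sortedBy { it.idleSinceMillis }
  val expired = oldestFirst.filter { nowMillis - it.idleSinceMillis >= keepAliveMillis }
  val survivors = oldestFirst - expired.toSet()
  val overflow = survivors.take(maxOf(0, survivors.size - maxIdleConnections))
  return expired + overflow
}
```

With seven idle connections of which two have been idle for more than five minutes, only those two are evicted. With six connections that all went idle recently, the single longest-idle one is evicted to get back down to five.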

networkInterceptors

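These are similar to the interceptors at the very start of the chain (application interceptors), but not identical, otherwise there would be no need for two kinds. For details see: https://square.github.io/okhttp/interceptors/

In summary:

  • Application interceptors sit higher up and do not care about intermediate results such as redirects. They run only once per call, even when the response is served from the cache. They see the original request and need not worry about headers that OkHttp may inject, such as If-None-Match.
  • Network interceptors can operate directly on intermediate responses such as redirects and retries, so they may run several times per call. They do not run when the response comes from the cache, and they have access to the connection object.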
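The difference in invocation counts can be shown with a toy model. Nothing here is OkHttp's API: CallCounter and executeWithCounters are hypothetical names, the redirects map stands in for a server that redirects one URL to another, and cachedBodies stands in for a cache hit.

```kotlin
// Simplified model of where the two kinds of interceptors sit; not OkHttp's API.
class CallCounter {
  var applicationCalls = 0
  var networkCalls = 0
}

fun executeWithCounters(
  url: String,
  redirects: Map<String, String>,
  cachedBodies: Map<String, String>,
  counter: CallCounter
): String {
  // An application interceptor wraps the whole call, so it runs once.
  counter.applicationCalls++
  var current = url
  // This loop stands in for RetryAndFollowUpInterceptor.
  while (true) {
    // Stands in for CacheInterceptor: a cache hit returns before any network interceptor runs.
    cachedBodies[current]?.let { return it }
    // A network interceptor runs once for every request that actually goes to the network.
    counter.networkCalls++
    current = redirects[current] ?: return "body of $current"
  }
}
```

A call to "/a" that is redirected once to "/b" leaves applicationCalls at 1 and networkCalls at 2, while a call answered from the cache leaves networkCalls at 0.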

CallServerInterceptor

The last interceptor. It is the one that actually sends the request to the server. We will not go through it in detail here.

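Unit tests

The OkHttp source contains a large number of fine-grained unit tests, and many classes have a corresponding test. Because it is a base library with no UI or other Android-environment dependencies, it is well suited to JUnit tests. MockWebServer starts a local server socket and lets you build response data by hand, which makes it possible to test the whole request flow.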
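The idea behind that kind of test can be sketched with JDK classes alone. This is not MockWebServer's API, only an illustration of the approach: fetchFromCannedServer is a hypothetical helper that serves one hand-built response on a local port and returns what an HTTP client read from it.

```kotlin
import java.net.HttpURLConnection
import java.net.Proxy
import java.net.ServerSocket
import java.net.URI
import kotlin.concurrent.thread

// Serves exactly one canned HTTP response on an ephemeral local port and returns the
// status code and body that the client observed.
fun fetchFromCannedServer(cannedBody: String): Pair<Int, String> =
  ServerSocket(0).use { server ->
    val serverThread = thread {
      server.accept().use { socket ->
        val reader = socket.getInputStream().bufferedReader()
        // Consume the request head (up to the blank line) before answering.
        while (true) {
          val line = reader.readLine() ?: break
          if (line.isEmpty()) break
        }
        val bytes = cannedBody.toByteArray()
        val head = "HTTP/1.1 200 OK\r\nContent-Length: ${bytes.size}\r\nConnection: close\r\n\r\n"
        socket.getOutputStream().apply {
          write(head.toByteArray())
          write(bytes)
          flush()
        }
      }
    }
    val url = URI("http://127.0.0.1:${server.localPort}/").toURL()
    val connection = url.openConnection(Proxy.NO_PROXY) as HttpURLConnection
    try {
      connection.responseCode to connection.inputStream.bufferedReader().readText()
    } finally {
      connection.disconnect()
      serverThread.join()
    }
  }
```

A test can then assert on both sides of the exchange, for example that fetchFromCannedServer("hello") yields status 200 and the body "hello". MockWebServer offers the same kind of control with far more features, such as queued responses and recorded requests.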


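This article gave a brief, high-level look at how OkHttp issues a request and at its main interceptors. My understanding of the OkHttp source is not yet deep, so I will stop here; a follow-up may come after further study. If you spot errors or omissions, corrections are welcome.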