OkHttp Source Code Analysis

Following the previous article on the Volley source code: most of our current projects use OkHttpUtils and OkHttp, so it is worth understanding how OkHttp works so that network-related problems can be located and fixed quickly. Let's get started.

The outline of this article is roughly as follows:

  • Basic usage of OkHttp
  • OkHttp source code analysis (v3.5.0)
  • OkHttp connection pool reuse
  • Pros and cons of OkHttp

Basic Usage of OkHttp

Add the dependency in Gradle:

compile 'com.squareup.okhttp3:okhttp:3.5.0'

1. First, create an OkHttpClient

OkHttpClient client = new OkHttpClient();

2. Build a Request object

Request request = new Request.Builder()
.get()
.url("https://www.baidu.com")
.build();

3. Wrap the Request into a Call

Call call = client.newCall(request);

4. Invoke either the synchronous or the asynchronous request method as needed

//Synchronous call: returns a Response and may throw an IOException
Response response = call.execute();

//Asynchronous call with a callback
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {

}

@Override
public void onResponse(Call call, Response response) throws IOException {
Log.e("=====Younger==", "===" + (Looper.myLooper() == Looper.getMainLooper()));
//This prints false: the callback is not delivered on the main thread, so we have to handle the thread switch ourselves
}
});

A synchronous call blocks the calling thread (typically the main thread), so it is rarely used.

The asynchronous callback runs on a worker thread. Since the UI cannot be updated from a worker thread, we have to switch back to the main thread with runOnUiThread() or a Handler, as in the sketch below.
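
Here is a minimal sketch of switching back to the main thread inside onResponse() with a Handler; the log tag and the way the body is used are illustrative only, not part of OkHttp:

@Override
public void onResponse(Call call, final Response response) throws IOException {
    // Read the body on the worker thread; it may only be consumed once
    final String body = response.body().string();
    new Handler(Looper.getMainLooper()).post(new Runnable() {
        @Override
        public void run() {
            // Now on the main thread: safe to touch the UI here
            Log.d("OkHttp", "body length = " + body.length());
        }
    });
}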

POST requests follow the same pattern (a hedged sketch is included below); I'm sure everyone knows how to use them. With the basics covered, let's move on to the main event: the source code.
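
For completeness, a minimal POST sketch; the URL, the JSON payload, and the callback variable are placeholders, not values from this article:

MediaType JSON = MediaType.parse("application/json; charset=utf-8");
RequestBody body = RequestBody.create(JSON, "{\"name\":\"okhttp\"}");
Request request = new Request.Builder()
        .url("https://example.com/api")   // placeholder URL
        .post(body)
        .build();
client.newCall(request).enqueue(callback); // reuse a Callback like the one in the GET example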

OkHttp Source Code Analysis

OkHttpClient

First, let's look at the method we use to make a network request:

Call call = okHttpClient.newCall(request);

This actually calls:

@Override public Call newCall(Request request) {
return new RealCall(this, request, false /* for web socket */);
}


It simply creates a new RealCall; here is its constructor:

RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}

RealCall

The enqueue() we call on Call actually runs RealCall's enqueue() method:

call.enqueue(new ...);

Let's see how RealCall's enqueue() is implemented:

@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

We can see that the request is ultimately handed over to the dispatcher, so let's look at the Dispatcher next.

Dispatcher

 //Maximum number of concurrent requests
private int maxRequests = 64;
//Maximum number of requests per host
private int maxRequestsPerHost = 5;
private Runnable idleCallback;

/** Executor service that runs the calls. Created lazily. */
private ExecutorService executorService;

//Queue of async calls waiting to run
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

//Queue of async calls currently running
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//Queue of sync calls currently running
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();


public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}

public Dispatcher() {
}

public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}

Dispatcher has two constructors: you can supply your own thread pool, and if you don't, a default one is created lazily. Note that its core pool size is 0 and its maximum is effectively unbounded, which suits running a large number of short-lived tasks.
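
If the defaults do not fit, a custom executor and different limits can be supplied when building the client. A minimal sketch (the pool size and the limits are arbitrary example values, not recommendations):

Dispatcher dispatcher = new Dispatcher(Executors.newFixedThreadPool(8));
dispatcher.setMaxRequests(32);        // overall concurrency cap (default 64)
dispatcher.setMaxRequestsPerHost(4);  // per-host cap (default 5)

OkHttpClient client = new OkHttpClient.Builder()
        .dispatcher(dispatcher)
        .build();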

Next, let's see how enqueue() is implemented:


synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}

When the number of running async calls is below 64 and the number of running calls to the same host is below 5, the call is added to runningAsyncCalls and executed on the thread pool; otherwise it is added to readyAsyncCalls to wait.

How is runningCallsForHost() implemented?


/** Returns the number of running calls that share a host with {@code call}. */
private int runningCallsForHost(AsyncCall call) {
int result = 0;
for (AsyncCall c : runningAsyncCalls) {
if (c.host().equals(call.host())) result++;
}
return result;
}

Among the running requests, at most five may target the same host.

As seen above, an AsyncCall is passed in and executed, so let's look at the AsyncCall class.

final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;

AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}

String host() {
return originalRequest.url().host();
}

Request request() {
return originalRequest;
}

RealCall get() {
return RealCall.this;
}

@Override protected void execute() {
boolean signalledCallback = false;
try {
//Get the response via the interceptor chain
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}

NamedRunnable implements Runnable, and AsyncCall's execute() contains the actual handling of the network request.
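
For reference, NamedRunnable looks roughly like this (simplified from the OkHttp source): its final run() renames the current thread for easier debugging and delegates to the abstract execute() that AsyncCall overrides.

public abstract class NamedRunnable implements Runnable {
    protected final String name;

    public NamedRunnable(String format, Object... args) {
        this.name = Util.format(format, args);
    }

    @Override public final void run() {
        String oldName = Thread.currentThread().getName();
        Thread.currentThread().setName(name);
        try {
            execute();
        } finally {
            Thread.currentThread().setName(oldName);
        }
    }

    protected abstract void execute();
}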

Response response = getResponseWithInterceptorChain();

This is clearly where the request is actually processed. Before digging into its implementation, let's first look at client.dispatcher().finished().

  /** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}

/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}

// Both overloads end up calling this
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}

if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}

Since promoteCalls is true for async calls, let's look at how promoteCalls() is implemented:


private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();

if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}

if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}

From the code it is clear that when a call finishes, finished() is invoked, which eventually reaches promoteCalls(): it moves calls from the waiting queue (readyAsyncCalls) into the running queue (runningAsyncCalls).

  • If the running queue is already at capacity, nothing is promoted; return.
  • If the waiting queue has no calls, there is nothing to promote; return.
  • Otherwise, iterate over the waiting queue from the head. For each call whose host is still under the per-host limit (at most five running calls per host), remove it from the waiting queue, add it to the running queue, and submit it to the thread pool; otherwise keep iterating.

Interceptors

Next, let's look at RealCall's getResponseWithInterceptorChain() method:


Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
//Application interceptors configured by the user
interceptors.addAll(client.interceptors());
//Built-in interceptor that retries after failures and follows redirects
interceptors.add(retryAndFollowUpInterceptor);
//Bridges application code and network code: it turns the user's request into a network request
//(filling in headers etc.), calls the network, then turns the network response into a user-friendly response
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//Handles caching: returns a cached response when one exists and is still valid,
//and sets conditional headers (If-None-Match, If-Modified-Since, ...) so the server may answer 304 Not Modified
//A user-configured cache is plugged in here
interceptors.add(new CacheInterceptor(client.internalCache()));
//Connect interceptor: this is where the real network connection happens
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
//networkInterceptors configured on the OkHttpClient:
//an immutable list of interceptors that observe a single network request and response
interceptors.addAll(client.networkInterceptors());
}
//Performs the stream I/O: writes the request body to the server and reads the response data,
//i.e. builds the HTTP request message and parses the response message
interceptors.add(new CallServerInterceptor(forWebSocket));
//Create the interceptor chain (chain of responsibility)
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
//Kick off the chain
return chain.proceed(originalRequest);
}

Now let's look at the implementation of RealInterceptorChain.proceed():



public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
Connection connection) throws IOException {

if (index >= interceptors.size()) throw new AssertionError();

calls++;
//Create the next chain, pointing at index + 1 in the interceptor list
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpCodec, connection, index + 1, request);
// Get the current interceptor
Interceptor interceptor = interceptors.get(index);
// Run it, passing in the next chain so it can call proceed()
Response response = interceptor.intercept(next);

if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}

// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}

return response;
}

From the code above we can see that a new RealInterceptorChain is created with index + 1, the interceptor at the current index is fetched, and its intercept() is invoked with that next chain, eventually returning a Response.

Each interceptor in the chain runs the code placed before its call to chain.proceed(); once the last interceptor has produced the final response, chain.proceed() returns that response to each caller in turn, and the code placed after chain.proceed() runs, which is where each interceptor post-processes the response. A concrete example of this pattern is sketched below.
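
To make this before/after pattern concrete, here is a hedged sketch of a custom application interceptor; the class name, the timing logic, and the log tag are illustrative and not part of OkHttp:

import java.io.IOException;
import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;
import android.util.Log;

class TimingInterceptor implements Interceptor {
    @Override public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();           // code before proceed(): inspect or modify the request
        long start = System.nanoTime();

        Response response = chain.proceed(request);  // hand off to the rest of the chain

        long tookMs = (System.nanoTime() - start) / 1000000L;  // code after proceed(): inspect the response
        Log.d("TimingInterceptor", request.url() + " took " + tookMs + "ms, code=" + response.code());
        return response;
    }
}

It would be registered when building the client, e.g. new OkHttpClient.Builder().addInterceptor(new TimingInterceptor()).build().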

Next, let's look at each interceptor in turn.

RetryAndFollowUpInterceptor

@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();

streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(request.url()), callStackTrace);

int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}

Response response = null;
boolean releaseConnection = true;
try {
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), false, request)) {
throw e.getLastConnectException();
}
//The failure is recoverable: keep the connection and retry
releaseConnection = false;
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
//If the failure is recoverable, keep the connection and retry; otherwise rethrow
if (!recover(e, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}

// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}

Request followUp = followUpRequest(response);

if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}

closeQuietly(response.body());

//Give up after more than 20 follow-ups: release the connection and fail
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}

if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}

if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(followUp.url()), callStackTrace);
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}

request = followUp;
priorResponse = response;
}
}

Whenever a RouteException or IOException occurs, recover() decides whether the call can be retried; if not, the exception is rethrown. The surrounding while (true) loop also handles follow-up requests (such as redirects) via followUpRequest(), giving up and releasing the connection after 20 attempts.

BridgeInterceptor

@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();

RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}

long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}

if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}

if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}

// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}

List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}

if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}

Response networkResponse = chain.proceed(requestBuilder.build());

HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);

if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
}

return responseBuilder.build();
}

As we can see, BridgeInterceptor mainly does two things:
Before the request goes out, it takes the request information and completes the headers via Request.Builder (Content-Type, Content-Length, Host, Connection, Accept-Encoding, Cookie, User-Agent) and sends the request on.
After the response comes back, it processes the response headers, stores cookies via the CookieJar, and transparently un-gzips the body when it added the gzip Accept-Encoding header itself.

The cache interceptor

CacheInterceptor

 @Override public Response intercept(Chain chain) throws IOException {
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;

long now = System.currentTimeMillis();

CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
//If networkRequest == null, the strategy says the network must not be used
//cacheResponse is the cached Response chosen by the CacheStrategy
if (cache != null) {
cache.trackResponse(strategy);
}

//The cached candidate is not usable: close it to free resources
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}

// If we're forbidden from using the network and the cache is insufficient, fail.
//Network forbidden (networkRequest == null) and no usable cache (cacheResponse == null): fail with 504
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}

// If we don't need the network, we're done.
//No network needed: build the response from the cache and return it
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}

Response networkResponse = null;
try {
//Go to the network
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}

//The network response is back; update the cache as appropriate
// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();

// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}

Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();

if (HttpHeaders.hasBody(response)) {
CacheRequest cacheRequest = maybeCache(response, networkResponse.request(), cache);
response = cacheWritingResponse(cacheRequest, response);
}

return response;
}

If a cache has been configured on the client, cacheCandidate is the stored Response fetched from that cache (cache.get(request)); otherwise cacheCandidate is null. A CacheStrategy is then built, from which we obtain cacheResponse and networkRequest.

If cacheCandidate != null but cacheResponse == null, the cached entry is not usable, so cacheCandidate is closed.

If networkRequest == null (the strategy forbids using the network) and cacheResponse == null (no usable cache), a 504 failure response is returned and the chain stops here; nothing further is executed.

If networkRequest == null but cacheResponse != null, the cached response is returned and the chain likewise stops here.

Otherwise, the next interceptor is executed, i.e. the request actually goes to the network.

After the rest of the chain returns the final response, the cache is updated: if a cached entry exists it is refreshed, and if not, a cacheable response is written into the cache. (A sketch of configuring such a cache follows.)
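
For CacheInterceptor to have anything to serve from, a cache has to be configured on the client; otherwise cache and cacheCandidate above are simply null. A minimal sketch, assuming an Android Context named context is in scope (the directory name and size are arbitrary):

File cacheDir = new File(context.getCacheDir(), "http_cache");  // "context" is an assumed Android Context
Cache cache = new Cache(cacheDir, 10 * 1024 * 1024);            // 10 MiB on-disk cache

OkHttpClient client = new OkHttpClient.Builder()
        .cache(cache)
        .build();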

ConnectInterceptor

@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();

// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();

return realChain.proceed(request, streamAllocation, httpCodec, connection);
}

The connection-reuse logic lives in here: it looks for a usable connection and reuses it. We will analyze this in a moment.

networkInterceptors

These are the user-defined network interceptors. Because they are added after ConnectInterceptor and before CallServerInterceptor, they observe each individual network request and response (including redirects and retries), and they are registered with addNetworkInterceptor(), as sketched below.
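
A hedged registration sketch; the anonymous interceptor and its logging are illustrative only:

OkHttpClient client = new OkHttpClient.Builder()
        .addNetworkInterceptor(new Interceptor() {
            @Override public Response intercept(Interceptor.Chain chain) throws IOException {
                Response response = chain.proceed(chain.request());
                // The connection is already established here, so network-level
                // details such as the negotiated protocol are visible
                Log.d("NetInterceptor", "protocol=" + response.protocol());
                return response;
            }
        })
        .build();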

CallServerInterceptor

@Override public Response intercept(Chain chain) throws IOException {
//The HttpCodec (stream) was created earlier in ConnectInterceptor
HttpCodec httpCodec = ((RealInterceptorChain) chain).httpStream();
StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
Request request = chain.request();
//Timestamp when the request is sent
long sentRequestMillis = System.currentTimeMillis();
//Write the request headers
httpCodec.writeRequestHeaders(request);
//Write the request body (if there is one)
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
}
//Finish sending the request
httpCodec.finishRequest();
//Read the response headers
Response response = httpCodec.readResponseHeaders()
.request(request)
//Handshake (TLS) information of the underlying connection, if any
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
//openResponseBody() below attaches the response body
int code = response.code();
//Regular (non-WebSocket) calls do not take this branch
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}

if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}

if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}

return response;
}

To summarize so far: OkHttpClient implements Call.Factory and is responsible for creating a Call for each Request.

RealCall is the concrete implementation of Call. Its asynchronous enqueue() is scheduled through the Dispatcher onto an ExecutorService, and when the request is finally executed it takes the same path as the synchronous execute(): both go through getResponseWithInterceptorChain().

getResponseWithInterceptorChain() builds a chain of Interceptors (the chain-of-responsibility pattern) that layers in retries, caching, transparent compression, network I/O, and so on, and finally returns the response data to the user.

OkHttp Connection Pool Reuse

We know that OkHttp's connection pool keeps up to five idle socket connections, each kept alive for five minutes by default. How is this actually achieved?

In ConnectInterceptor we saw newStream():

public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
...

try {
// Find a "healthy" RealConnection
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);

HttpCodec resultCodec;
if (resultConnection.http2Connection != null) {
resultCodec = new Http2Codec(client, this, resultConnection.http2Connection);
} else {
// Create the HttpCodec from the RealConnection
resultConnection.socket().setSoTimeout(readTimeout);
resultConnection.source.timeout().timeout(readTimeout, MILLISECONDS);
resultConnection.sink.timeout().timeout(writeTimeout, MILLISECONDS);
resultCodec = new Http1Codec(
client, this, resultConnection.source, resultConnection.sink);
}

synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}


private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws IOException {
while (true) {
// Loop until a suitable connection is found and returned
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);

...

return candidate;
}
}




private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
synchronized (connectionPool) {
...
// Key step: if no connection is held yet, try to reuse one from the pool; the third argument is this StreamAllocation, and a suitable RealConnection, if found, is assigned to this.connection

// Attempt to get a connection from the pool.
RealConnection pooledConnection = Internal.instance.get(connectionPool, address, this);
if (pooledConnection != null) {
this.connection = pooledConnection;
return pooledConnection;
}

selectedRoute = route;
}


if (selectedRoute == null) {
selectedRoute = routeSelector.next();
synchronized (connectionPool) {
route = selectedRoute;
refusedStreamCount = 0;
}
}
// Nothing reusable was found, so create a new connection
RealConnection newConnection = new RealConnection(selectedRoute);

synchronized (connectionPool) {
acquire(newConnection);
Internal.instance.put(connectionPool, newConnection);
this.connection = newConnection;
if (canceled) throw new IOException("Canceled");
}
// Perform the actual network connection (TCP and, if needed, TLS)
newConnection.connect(connectTimeout, readTimeout, writeTimeout, address.connectionSpecs(),
connectionRetryEnabled);
routeDatabase().connected(newConnection.route());

return newConnection;
}

From the analysis above, the flow for obtaining a RealConnection can be summarized as follows:

In ConnectInterceptor, a StreamAllocation reference is obtained, and the StreamAllocation is used to look for a RealConnection.

If the StreamAllocation already holds a RealConnection, it is returned directly. Otherwise the connection pool is searched; if nothing suitable is found there, a new connection is created, registered in the pool, and then actually connected.

In the actual connect() call, the connection is established in one of two ways: as a tunnel through a proxy, or as a direct socket connection.
The RealConnection and the HttpCodec are then passed on to the next interceptor.

When a connection is fetched from the pool, Internal's get() method is used. Internal has a static instance that is initialized in OkHttpClient's static initializer block, and its get() delegates to the pool's get() to obtain a connection. This also shows the benefit of connection reuse: the TCP and TLS handshakes are skipped. Since establishing a connection takes time, reusing one improves the efficiency of network access.

ConnectionPool

The connection pool lives in the ConnectionPool class:

/** Maximum number of idle socket connections */
private final int maxIdleConnections;
/** keepAlive duration for idle sockets */
private final long keepAliveDurationNs;
private final Deque<RealConnection> connections = new ArrayDeque<>();
final RouteDatabase routeDatabase = new RouteDatabase();
boolean cleanupRunning;


public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}

public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.maxIdleConnections = maxIdleConnections;
this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

// Put a floor on the keep alive duration, otherwise cleanup will spin loop.
if (keepAliveDuration <= 0) {
throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
}
}

From the constructor we can see that at most five idle sockets are kept, and that the ConnectionPool is created when the OkHttpClient is instantiated.

RealConnection get(Address address, StreamAllocation streamAllocation) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.allocations.size() < connection.allocationLimit
&& address.equals(connection.route().address)
&& !connection.noNewStreams) {
streamAllocation.acquire(connection);
return connection;
}
}
return null;
}

void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
// The cleanup task above is kicked off before the connection is added to the deque
connections.add(connection);
}

Looking at put() and get(): get() walks the cached connections, and when a connection's allocation count is below its limit, its address exactly matches the request's address, and it still allows new streams, that connection is reused directly for the request.

As seen above, put() also kicks off the cleanup task.

private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};

The cleanup thread repeatedly calls cleanup(), which returns how long to wait before the next cleanup pass; it then wait()s for that interval and cleans up again, and so on.

Here is the cleanup() method it calls:


long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;

// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();

// If the connection is in use, keep searching.
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}

idleConnectionCount++;

// If the connection is ready to be evicted, we're done.
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}

if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// We've found a connection to evict. Remove it from the list, then close it below (outside
// of the synchronized block).
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
cleanupRunning = false;
return -1;
}
}

closeQuietly(longestIdleConnection.socket());

// Cleanup again immediately.
return 0;
}

cleanup() uses the reference counts on each connection to tally in-use and idle connections, and tracks the connection that has been idle the longest.
If that connection has been idle longer than the keepAlive limit, or there are more than the allowed number of idle connections, it is removed from the deque and its socket is closed (and cleanup runs again immediately).
Otherwise, if there are idle connections, the method returns the time until the next one expires; if all connections are in use, it returns the full keepAlive duration; and if there are no connections at all, it returns -1 and the cleanup loop stops.

The eviction logic resembles reference counting in a GC: when a connection's list of weakly referenced StreamAllocations drops to zero, the connection is idle and becomes a candidate for reclamation.

In short, the core of connection-pool reuse is a deque that stores the connections and is manipulated through put() and get(), plus automatic reclamation driven by the StreamAllocation counts held by each connection. A configuration sketch follows.
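
If the defaults (5 idle connections, 5-minute keepAlive) are not appropriate, a custom pool can be supplied when building the client. A minimal sketch (the numbers are arbitrary example values):

ConnectionPool pool = new ConnectionPool(10, 2, TimeUnit.MINUTES); // 10 idle connections, 2-minute keepAlive

OkHttpClient client = new OkHttpClient.Builder()
        .connectionPool(pool)
        .build();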

Pros and Cons of OkHttp

Advantages:

  • 1. HTTP/2 support allows all requests to the same host to share a single socket; when HTTP/2 is unavailable, a connection pool reduces request latency.
  • 2. Downloads are compressed with GZIP, and the decompression is transparent to the user.
  • 3. Response caching avoids repeated network requests.
  • 4. If your server has multiple IP addresses and the first one fails to connect, OkHttp tries the others; this matters for IPv4/IPv6 setups and for services hosted across multiple data centers.
  • 5. Users can plug in their own interceptors to implement whatever request/response handling they need.
  • 6. Large file uploads and downloads are supported.
  • 7. Cookie persistence is supported.
  • 8. Self-signed HTTPS connections are supported once the corresponding certificate is configured.
  • 9. Header-based cache policies reduce repeated network requests.

Disadvantages:

  • 1. Network callbacks arrive on a worker thread, so the user has to post results back to the main thread manually.
  • 2. There are many parameters, and configuration can get complicated.

Because of these drawbacks, wrappers such as OkHttpUtils came into being. In the next article we will go through the OkHttpUtils source code to see how it wraps OkHttp and addresses these issues.
