publicfinalclassDispatcher{ privateint maxRequests = 64; privateint maxRequestsPerHost = 5; /** Ready async calls in the order they'll be run. */ privatefinal Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */ privatefinal Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); /** Running synchronous calls. Includes canceled calls that haven't finished yet. */ privatefinal Deque<RealCall> runningSyncCalls = new ArrayDeque<>(); /** Used by {@code Call#execute} to signal it is in-flight. */ synchronizedvoidexecuted(RealCall call){//同步调用 runningSyncCalls.add(call); }
publicfinalclassRealInterceptorChainimplementsInterceptor.Chain{ public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection)throws IOException { if (index >= interceptors.size()) thrownew AssertionError(); calls++; // If we already have a stream, confirm that the incoming request will use it. if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) { thrownew IllegalStateException("network interceptor " + interceptors.get(index - 1) + " must retain the same host and port"); } // If we already have a stream, confirm that this is the only call to chain.proceed(). if (this.httpCodec != null && calls > 1) { thrownew IllegalStateException("network interceptor " + interceptors.get(index - 1) + " must call proceed() exactly once"); } // Call the next interceptor in the chain. RealInterceptorChain next = new RealInterceptorChain( interceptors, streamAllocation, httpCodec, connection, index + 1, request); Interceptor interceptor = interceptors.get(index); Response response = interceptor.intercept(next); // Confirm that the next interceptor made its required call to chain.proceed(). if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) { thrownew IllegalStateException("network interceptor " + interceptor + " must call proceed() exactly once"); } // Confirm that the intercepted response isn't null. if (response == null) { thrownew NullPointerException("interceptor " + interceptor + " returned null"); } return response; } }
publicfinalclassRetryAndFollowUpInterceptorimplementsInterceptor{ private Request followUpRequest(Response userResponse)throws IOException { if (userResponse == null) thrownew IllegalStateException(); Connection connection = streamAllocation.connection(); Route route = connection != null ? connection.route() : null; int responseCode = userResponse.code(); final String method = userResponse.request().method(); switch (responseCode) { //407,代理认证 case HTTP_PROXY_AUTH: Proxy selectedProxy = route != null ? route.proxy() : client.proxy(); if (selectedProxy.type() != Proxy.Type.HTTP) { thrownew ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy"); } return client.proxyAuthenticator().authenticate(route, userResponse); //401,未经认证 case HTTP_UNAUTHORIZED: return client.authenticator().authenticate(route, userResponse); //307,308 case HTTP_PERM_REDIRECT: case HTTP_TEMP_REDIRECT: // "If the 307 or 308 status code is received in response to a request other than GET // or HEAD, the user agent MUST NOT automatically redirect the request" if (!method.equals("GET") && !method.equals("HEAD")) { returnnull; } // fall-through //300,301,302,303 case HTTP_MULT_CHOICE: case HTTP_MOVED_PERM: case HTTP_MOVED_TEMP: case HTTP_SEE_OTHER: //客户端在配置中是否允许重定向 if (!client.followRedirects()) returnnull; String location = userResponse.header("Location"); if (location == null) returnnull; HttpUrl url = userResponse.request().url().resolve(location); // url为null,不允许重定向 if (url == null) returnnull; //查询是否存在http与https之间的重定向 boolean sameScheme = url.scheme().equals(userResponse.request().url().scheme()); if (!sameScheme && !client.followSslRedirects()) returnnull; // Most redirects don't include a request body. Request.Builder requestBuilder = userResponse.request().newBuilder(); if (HttpMethod.permitsRequestBody(method)) { finalboolean maintainBody = HttpMethod.redirectsWithBody(method); if (HttpMethod.redirectsToGet(method)) { requestBuilder.method("GET", null); } else { RequestBody requestBody = maintainBody ? userResponse.request().body() : null; requestBuilder.method(method, requestBody); } if (!maintainBody) { requestBuilder.removeHeader("Transfer-Encoding"); requestBuilder.removeHeader("Content-Length"); requestBuilder.removeHeader("Content-Type"); } } // When redirecting across hosts, drop all authentication headers. This // is potentially annoying to the application layer since they have no // way to retain them. if (!sameConnection(userResponse, url)) { requestBuilder.removeHeader("Authorization"); } return requestBuilder.url(url).build(); //408,超时 case HTTP_CLIENT_TIMEOUT: // 408's are rare in practice, but some servers like HAProxy use this response code. The // spec says that we may repeat the request without modifications. Modern browsers also // repeat the request (even non-idempotent ones.) if (userResponse.request().body() instanceof UnrepeatableRequestBody) { returnnull; } return userResponse.request(); default: returnnull; } } }
publicfinalclassStreamAllocation{ /** * Returns a connection to host a new stream. This prefers the existing connection if it exists, * then the pool, finally building a new connection. */ private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled)throws IOException { Route selectedRoute; synchronized (connectionPool) { if (released) thrownew IllegalStateException("released"); if (codec != null) thrownew IllegalStateException("codec != null"); if (canceled) thrownew IOException("Canceled"); //1 查看是否有完好的连接 RealConnection allocatedConnection = this.connection; if (allocatedConnection != null && !allocatedConnection.noNewStreams) { return allocatedConnection; } //2 连接池中是否用可用的连接,有则使用 Internal.instance.get(connectionPool, address, this, null); if (connection != null) { return connection; } selectedRoute = route; } //线程的选择,多IP操作 if (selectedRoute == null) { selectedRoute = routeSelector.next(); } //3 如果没有可用连接,则自己创建一个 RealConnection result; synchronized (connectionPool) { if (canceled) thrownew IOException("Canceled"); // Now that we have an IP address, make another attempt at getting a connection from the pool. // This could match due to connection coalescing. Internal.instance.get(connectionPool, address, this, selectedRoute); if (connection != null) { route = selectedRoute; return connection; } // Create a connection and assign it to this allocation immediately. This makes it possible // for an asynchronous cancel() to interrupt the handshake we're about to do. route = selectedRoute; refusedStreamCount = 0; result = new RealConnection(connectionPool, selectedRoute); acquire(result); } //4 开始TCP以及TLS握手操作 result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled); routeDatabase().connected(result.route()); //5 将新创建的连接,放在连接池中 Socket socket = null; synchronized (connectionPool) { // Pool the connection. Internal.instance.put(connectionPool, result); // If another multiplexed connection to the same address was created concurrently, then // release this connection and acquire that one. if (result.isMultiplexed()) { socket = Internal.instance.deduplicate(connectionPool, address, this); result = connection; } } closeQuietly(socket); return result; } }
publicfinalclassRealConnectionextendsHttp2Connection.ListenerimplementsConnection{ publicvoidconnect( int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled){ if (protocol != null) thrownew IllegalStateException("already connected"); //线路选择 RouteException routeException = null; List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs(); ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs); if (route.address().sslSocketFactory() == null) { if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) { thrownew RouteException(new UnknownServiceException( "CLEARTEXT communication not enabled for client")); } String host = route.address().url().host(); if (!Platform.get().isCleartextTrafficPermitted(host)) { thrownew RouteException(new UnknownServiceException( "CLEARTEXT communication to " + host + " not permitted by network security policy")); } } //开始连接 while (true) { try { //如果是通道模式,则建立通道连接 if (route.requiresTunnel()) { connectTunnel(connectTimeout, readTimeout, writeTimeout); } //否则进行Socket连接,一般都是属于这种情况 else { connectSocket(connectTimeout, readTimeout); } //建立https连接 establishProtocol(connectionSpecSelector); break; } catch (IOException e) { closeQuietly(socket); closeQuietly(rawSocket); socket = null; rawSocket = null; source = null; sink = null; handshake = null; protocol = null; http2Connection = null; if (routeException == null) { routeException = new RouteException(e); } else { routeException.addConnectException(e); } if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) { throw routeException; } } } if (http2Connection != null) { synchronized (connectionPool) { allocationLimit = http2Connection.maxConcurrentStreams(); } } }
/** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket. */ privatevoidconnectSocket(int connectTimeout, int readTimeout)throws IOException { Proxy proxy = route.proxy(); Address address = route.address(); //根据代理类型的不同处理Socket rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP ? address.socketFactory().createSocket() : new Socket(proxy); rawSocket.setSoTimeout(readTimeout); try { //建立Socket连接 Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout); } catch (ConnectException e) { ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress()); ce.initCause(e); throw ce; } // The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0 // More details: // https://github.com/square/okhttp/issues/3245 // https://android-review.googlesource.com/#/c/271775/ try { //获取输入/输出流 source = Okio.buffer(Okio.source(rawSocket)); sink = Okio.buffer(Okio.sink(rawSocket)); } catch (NullPointerException npe) { if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) { thrownew IOException(npe); } } } }
评论加载中