The Lifecycle of a Single Request in the JDK HttpClient

Date: 2022-01-03  Author: cavern-builder-zyx

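  • The lifecycle of a single HttpClient request
    • 1. Overview
    • 2. UML diagram
    • 3. Establishing, reusing and downgrading HTTP connections
      • 3.1 Call flow, connection establishment and reuse
      • 3.2 Connection downgrade and upgrade
    • 4. Connecting the reactive read/write flows
      • 4.1 Structure and function of the socket tube
      • 4.2 Establishing the socket connection
      • 4.3 Establishing the bidirectional read/write relationship
      • 4.4 A brief look at the sequential scheduler
    • 5. Sending the request headers and the request body
      • 5.1 The publishers and subscribers involved
      • 5.2 How sending the request headers is started
      • 5.3 Writing data to the NIO socket channel
      • 5.4 Sending the request body
      • 5.5 Summary of sending
    • 6. Creating the response and parsing the response headers
      • 6.1 The publishers and subscribers involved
      • 6.2 Parsing the response headers
    • 7. Parsing the response body
    • 8. Summary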

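1. Overview



Throughout this article, HttpClient refers to the HttpClient built into the JDK since JDK 11, together with its supporting classes; the source analysis is based on Java 17. To follow along, you need to understand the Reactive Streams specification and how the corresponding Java Flow API works and is used.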

2. UML diagram



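3. Establishing, reusing and downgrading HTTP connections


  • Depending on the request type, a different ExchangeImpl is instantiated to carry out the actual request/response exchange
  • The exchange type determines which HTTP connection version to instantiate; depending on the request type, the client first tries to obtain an existing connection for the matching route from the connection pool
  • If the pool has no such connection, the corresponding HTTP connection is instantiated (for the most basic HTTP/1.1 connection, this opens an NIO SocketChannel and wraps it in a tube)
  • If the connection being set up is HTTP/2 but negotiation shows that it is not supported, the client downgrades and establishes an HTTP/1.1 connection instead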


3.1 Call flow, connection establishment and reuse


    private CompletableFuture<Response> responseAsyncImpl() {
        CompletableFuture<Response> cf;
            Exchange<T> exch = getExchange();
            // 2. get response
            // A single Exchange object handles this one request and returns the response asynchronously
            cf = exch.responseAsync()
                     .thenCompose((Response response) -> {
        return cf;


 * One request/response exchange (handles 100/101 intermediate response also).
 * depth field used to track number of times a new request is being sent
 * for a given API request. If limit exceeded exception is thrown.
 * Security check is performed here:
 * - uses AccessControlContext captured at API level
 * - checks for appropriate URLPermission for request
 * - if permission allowed, grants equivalent SocketPermission to call
 * - in case of direct HTTP proxy, checks additionally for access to proxy
 *    (CONNECT proxying uses its own Exchange, so check done there)
final class Exchange<T> {   
    final HttpRequestImpl request;
    final HttpClientImpl client;
    volatile ExchangeImpl<T> exchImpl;
    volatile CompletableFuture<? extends ExchangeImpl<T>> exchangeCF;
    volatile CompletableFuture<Void> bodyIgnored;

    // used to record possible cancellation raised before the exchImpl
    // has been established.
    private volatile IOException failed;
    final AccessControlContext acc;
    final MultiExchange<T> multi;
    final Executor parentExecutor;
    volatile boolean upgrading; // to HTTP/2
    volatile boolean upgraded;  // to HTTP/2
    final PushGroup<T> pushGroup;
    final String dbgTag;
     // Completed HttpResponse will be null if response succeeded
    // will be a non null responseAsync if expect continue returns an error
    public CompletableFuture<Response> responseAsync() {
        return responseAsyncImpl(null);
    }

    CompletableFuture<Response> responseAsyncImpl(HttpConnection connection) {
        SecurityException e = checkPermissions();
        if (e != null) {
            return MinimalFuture.failedFuture(e);
        } else {
            return responseAsyncImpl0(connection);
        }
    }

    CompletableFuture<Response> responseAsyncImpl0(HttpConnection connection) {
        Function<ExchangeImpl<T>, CompletableFuture<Response>> after407Check;
        bodyIgnored = null;
        if (request.expectContinue()) {
            request.addSystemHeader("Expect", "100-Continue");
            Log.logTrace("Sending Expect: 100-Continue");
            // wait for 100-Continue before sending body
            // If the request was built with expectContinue(), then once the 407 check has passed
            // a confirmation request that waits for the 100 status code is sent first
            after407Check = this::expectContinue;
        } else {
            // send request body and proceed. In the vast majority of cases the
            // request body is sent right after the 407 check has passed
            after407Check = this::sendRequestBody;
        }
        // The ProxyAuthorizationRequired can be triggered either by
        // establishExchange (case of HTTP/2 SSL tunneling through HTTP/1.1 proxy
        // or by sendHeaderAsync (case of HTTP/1.1 SSL tunneling through HTTP/1.1 proxy
        // Therefore we handle it with a call to this checkFor407(...) after these
        // two places.
        Function<ExchangeImpl<T>, CompletableFuture<Response>> afterExch407Check =
                (ex) -> ex.sendHeadersAsync()
                        .handle((r,t) -> this.checkFor407(r, t, after407Check))
                        .thenCompose(Function.identity());
        return establishExchange(connection)    // first, establish the connection
                .handle((r,t) -> this.checkFor407(r,t, afterExch407Check))
                .thenCompose(Function.identity());
    }



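  1. Try to establish the connection
  2. Check whether a 407 error occurred
  3. Send the request headers
  4. Check again whether a 407 error occurred
  5. Send the 100-continue confirmation request, or send the request body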
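
The five steps above are chained with CompletableFuture.handle(...) followed by thenCompose(Function.identity()). The following is a minimal, self-contained sketch of that chaining pattern only. FakeExchange, checkThenContinue and the step methods are hypothetical stand-ins invented for this illustration; they are not JDK classes, and the real checkFor407 does considerably more than propagate a failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class ExchangePipelineSketch {

    // Hypothetical stand-in for ExchangeImpl: it only records which steps ran.
    static final class FakeExchange {
        final List<String> steps = new ArrayList<>();
    }

    // Plays the role of a check stage: propagate a failure, otherwise run the next stage.
    static CompletableFuture<String> checkThenContinue(
            FakeExchange ex, Throwable error,
            Function<FakeExchange, CompletableFuture<String>> next) {
        if (error != null) {
            return CompletableFuture.failedFuture(error);
        }
        return next.apply(ex);
    }

    static CompletableFuture<FakeExchange> establish(FakeExchange ex) {
        ex.steps.add("establish");
        return CompletableFuture.completedFuture(ex);
    }

    static CompletableFuture<FakeExchange> sendHeaders(FakeExchange ex) {
        ex.steps.add("sendHeaders");
        return CompletableFuture.completedFuture(ex);
    }

    static CompletableFuture<String> sendBody(FakeExchange ex) {
        ex.steps.add("sendBody");
        return CompletableFuture.completedFuture("response");
    }

    static CompletableFuture<String> run(FakeExchange ex) {
        // Stage that runs after the second check (after the headers were sent).
        Function<FakeExchange, CompletableFuture<String>> afterHeadersCheck =
                ExchangePipelineSketch::sendBody;
        // Stage that runs after the first check (after the exchange was established).
        Function<FakeExchange, CompletableFuture<String>> afterEstablishCheck =
                e -> sendHeaders(e)
                        .handle((r, t) -> checkThenContinue(r, t, afterHeadersCheck))
                        .thenCompose(Function.identity());
        // handle() yields a future of a future; thenCompose(identity) flattens it.
        return establish(ex)
                .handle((r, t) -> checkThenContinue(r, t, afterEstablishCheck))
                .thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        FakeExchange ex = new FakeExchange();
        String result = run(ex).join();
        System.out.println(ex.steps + " -> " + result);
    }
}
```

Using handle rather than thenCompose directly lets each check stage see both the value and the exception of the previous stage, which is what a 407 check needs.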


    // get/set the exchange impl, solving race condition issues with
    // potential concurrent calls to cancel() or cancel(IOException)
    private CompletableFuture<? extends ExchangeImpl<T>>
    establishExchange(HttpConnection connection) {
        if (debug.on()) {
            debug.log("establishing exchange for %s,%n\t proxy=%s",
                      request, request.proxy());
        }
        // check if we have been cancelled first.
        Throwable t = getCancelCause();
        if (t != null) {
            if (debug.on()) {
                debug.log("exchange was cancelled: returned failed cf (%s)", String.valueOf(t));
            }
            return exchangeCF = MinimalFuture.failedFuture(t);
        }

        CompletableFuture<? extends ExchangeImpl<T>> cf, res;
        cf = ExchangeImpl.get(this, connection);
        // We should probably use a VarHandle to get/set exchangeCF
        // instead - as we need CAS semantics.
        synchronized (this) { exchangeCF = cf; };
        res = cf.whenComplete((r,x) -> {
            synchronized(Exchange.this) {
                if (exchangeCF == cf) exchangeCF = null;
            }
        });
        return res.thenCompose((eimpl) -> {
                    // recheck for cancelled, in case of race conditions
                    exchImpl = eimpl;
                    IOException tt = getCancelCause();
                    if (tt != null) {
                        return MinimalFuture.failedFuture(tt);
                    } else {
                        // Now we're good to go. Because exchImpl is no longer
                        // null cancel() will be able to propagate directly to
                        // the impl after this point ( if needed ).
                        return MinimalFuture.completedFuture(eimpl);
                    } });
    }

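As we can see, the static method get(Exchange, HttpConnection) of ExchangeImpl asynchronously returns a concrete implementation of ExchangeImpl (an instance of one of its subclasses).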


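HttpClient client = HttpClient.newBuilder()
    .version(HttpClient.Version.HTTP_1_1) // pin the client to HTTP/1.1
    .build();
HttpRequest request = HttpRequest.newBuilder(URI.create(url))
    .version(HttpClient.Version.HTTP_1_1)  // or pin just this request to HTTP/1.1
    .build();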


    /**
     * Initiates a new exchange and assigns it to a connection if one exists
     * already. connection usually null.
     */
    static <U> CompletableFuture<? extends ExchangeImpl<U>>
    get(Exchange<U> exchange, HttpConnection connection)
    {
        if (exchange.version() == HTTP_1_1) {
            if (debug.on())
                debug.log("get: HTTP/1.1: new Http1Exchange");
            return createHttp1Exchange(exchange, connection);
        } else {
            Http2ClientImpl c2 = exchange.client().client2(); // #### improve
            HttpRequestImpl request = exchange.request();
            CompletableFuture<Http2Connection> c2f = c2.getConnectionFor(request, exchange);
            if (debug.on())
                debug.log("get: Trying to get HTTP/2 connection");
            // local variable required here; see JDK-8223553
            CompletableFuture<CompletableFuture<? extends ExchangeImpl<U>>> fxi =
                c2f.handle((h2c, t) -> createExchangeImpl(h2c, t, exchange, connection));
            return fxi.thenCompose(x->x);
        }
    }


    Http1Exchange(Exchange<T> exchange, HttpConnection connection)
        throws IOException
    {
        super(exchange);
        this.request = exchange.request();
        this.client = exchange.client();
        this.executor = exchange.executor();
        this.operations = new LinkedList<>();
        if (connection != null) {
            this.connection = connection;
        } else {
            InetSocketAddress addr = request.getAddress();
            this.connection = HttpConnection.getConnection(addr, client, request, HTTP_1_1);
        }
        this.requestAction = new Http1Request(request, this);
        this.asyncReceiver = new Http1AsyncReceiver(executor, this);
    }



     * Factory for retrieving HttpConnections. A connection can be retrieved
     * from the connection pool, or a new one created if none available.
     * The given {@code addr} is the ultimate destination. Any proxies,
     * etc, are determined from the request. Returns a concrete instance which
     * is one of the following:
     *      {@link PlainHttpConnection}
     *      {@link PlainTunnelingConnection}
     * The returned connection, if not from the connection pool, must have its,
     * connect() or connectAsync() method invoked, which ( when it completes
     * successfully ) renders the connection usable for requests.
    public static HttpConnection getConnection(InetSocketAddress addr,
                                               HttpClientImpl client,
                                               HttpRequestImpl request,
                                               Version version) {
        // The default proxy selector may select a proxy whose  address is
        // unresolved. We must resolve the address before connecting to it.
        InetSocketAddress proxy = Utils.resolveAddress(request.proxy());
        HttpConnection c = null;
        boolean secure = request.secure();
        ConnectionPool pool = client.connectionPool();

        if (!secure) {
            c = pool.getConnection(false, addr, proxy);
            if (c != null && c.checkOpen() /* may have been eof/closed when in the pool */) {
                final HttpConnection conn = c;
                if (DEBUG_LOGGER.on())
                    DEBUG_LOGGER.log(conn.getConnectionFlow()
                                     + ": plain connection retrieved from HTTP/1.1 pool");
                return c;
            } else {
                return getPlainConnection(addr, proxy, request, client);
            }
        } else {  // secure
            if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool
                c = pool.getConnection(true, addr, proxy);
            }
            if (c != null && c.isOpen()) {
                final HttpConnection conn = c;
                if (DEBUG_LOGGER.on())
                    DEBUG_LOGGER.log(conn.getConnectionFlow()
                                     + ": SSL connection retrieved from HTTP/1.1 pool");
                return c;
            } else {
                String[] alpn = null;
                if (version == HTTP_2 && hasRequiredHTTP2TLSVersion(client)) {
                    alpn = new String[] { "h2", "http/1.1" };
                }
                return getSSLConnection(addr, proxy, alpn, request, client);
            }
        }
    }



/**
 * Http 1.1 connection pool.
 */
final class ConnectionPool {

    static final long KEEP_ALIVE = Utils.getIntegerNetProperty(
            "jdk.httpclient.keepalive.timeout", 1200); // seconds
    static final long MAX_POOL_SIZE = Utils.getIntegerNetProperty(
            "jdk.httpclient.connectionPoolSize", 0); // unbounded
    final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);

    // Pools of idle connections
    private final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
    private final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
    private final ExpiryList expiryList;
    private final String dbgTag; // used for debug
    boolean stopped;

    /**
     * Entries in connection pool are keyed by destination address and/or
     * proxy address:
     * case 1: plain TCP not via proxy (destination only)
     * case 2: plain TCP via proxy (proxy only)
     * case 3: SSL not via proxy (destination only)
     * case 4: SSL over tunnel (destination and proxy)
     */
    static class CacheKey {
        final InetSocketAddress proxy;
        final InetSocketAddress destination;

        CacheKey(InetSocketAddress destination, InetSocketAddress proxy) {
            this.proxy = proxy;
            this.destination = destination;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == null) {
                return false;
            }
            if (getClass() != obj.getClass()) {
                return false;
            }
            final CacheKey other = (CacheKey) obj;
            if (!Objects.equals(this.proxy, other.proxy)) {
                return false;
            }
            if (!Objects.equals(this.destination, other.destination)) {
                return false;
            }
            return true;
        }

        @Override
        public int hashCode() {
            return Objects.hash(proxy, destination);
        }
    }

    ConnectionPool(long clientId) {
        this("ConnectionPool("+clientId+")");
    }

    /**
     * There should be one of these per HttpClient.
     */
    private ConnectionPool(String tag) {
        dbgTag = tag;
        plainPool = new HashMap<>();
        sslPool = new HashMap<>();
        expiryList = new ExpiryList();
    }


    synchronized HttpConnection getConnection(boolean secure,
                                              InetSocketAddress addr,
                                              InetSocketAddress proxy) {
        if (stopped) return null;
        // for plain (unsecure) proxy connection the destination address is irrelevant.
        addr = secure || proxy == null ? addr : null;
        CacheKey key = new CacheKey(addr, proxy);
        HttpConnection c = secure ? findConnection(key, sslPool)
                                  : findConnection(key, plainPool);
        //System.out.println ("getConnection returning: " + c);
        assert c == null || c.isSecure() == secure;
        return c;
    }

    private HttpConnection findConnection(CacheKey key,
                   HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
        LinkedList<HttpConnection> l = pool.get(key);
        if (l == null || l.isEmpty()) {
            return null;
        } else {
            HttpConnection c = l.removeFirst();
            expiryList.remove(c);
            return c;
        }
    }
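
To make the keying rule in getConnection concrete, here is a small standalone sketch of a pool keyed by (destination, proxy). It is only an illustration under simplifying assumptions: PoolKeySketch, offer, take and keyFor are hypothetical names, connections are represented by plain strings, and expiry handling is left out entirely.

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

public class PoolKeySketch {

    // Hypothetical key type; a record supplies equals() and hashCode() over both fields.
    record CacheKey(InetSocketAddress destination, InetSocketAddress proxy) {}

    private final Map<CacheKey, LinkedList<String>> plainPool = new HashMap<>();
    private final Map<CacheKey, LinkedList<String>> sslPool = new HashMap<>();

    private static CacheKey keyFor(boolean secure, InetSocketAddress addr,
                                   InetSocketAddress proxy) {
        // For a plain connection through a proxy, the destination is irrelevant.
        InetSocketAddress destination = (secure || proxy == null) ? addr : null;
        return new CacheKey(destination, proxy);
    }

    // Puts an idle connection (here just a label) back into the matching pool.
    synchronized void offer(boolean secure, InetSocketAddress addr,
                            InetSocketAddress proxy, String conn) {
        Map<CacheKey, LinkedList<String>> pool = secure ? sslPool : plainPool;
        pool.computeIfAbsent(keyFor(secure, addr, proxy), k -> new LinkedList<>())
            .addFirst(conn);
    }

    // Removes and returns an idle connection for the route, or null if there is none.
    synchronized String take(boolean secure, InetSocketAddress addr,
                             InetSocketAddress proxy) {
        Map<CacheKey, LinkedList<String>> pool = secure ? sslPool : plainPool;
        LinkedList<String> idle = pool.get(keyFor(secure, addr, proxy));
        return (idle == null || idle.isEmpty()) ? null : idle.removeFirst();
    }

    public static void main(String[] args) {
        InetSocketAddress a = InetSocketAddress.createUnresolved("a.example", 80);
        InetSocketAddress b = InetSocketAddress.createUnresolved("b.example", 80);
        InetSocketAddress proxy = InetSocketAddress.createUnresolved("proxy.example", 3128);
        PoolKeySketch pool = new PoolKeySketch();

        pool.offer(false, a, proxy, "plain-1");
        // Same proxy, different destination: the plain connection is still a match.
        System.out.println(pool.take(false, b, proxy));
        // The pool is now empty for that key.
        System.out.println(pool.take(false, b, proxy));

        pool.offer(true, a, proxy, "ssl-1");
        // For SSL over a tunnel the destination is part of the key.
        System.out.println(pool.take(true, b, proxy));
        System.out.println(pool.take(true, a, proxy));
    }
}
```

The design point this shows is that a plain proxied connection can serve any destination behind the same proxy, while a tunnelled SSL connection is bound to one destination.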




    private static HttpConnection getPlainConnection(InetSocketAddress addr,
                                                     InetSocketAddress proxy,
                                                     HttpRequestImpl request,
                                                     HttpClientImpl client) {
        if (request.isWebSocket() && proxy != null)
            return new PlainTunnelingConnection(addr, proxy, client,
                                                proxyTunnelHeaders(request));

        if (proxy == null)
            return new PlainHttpConnection(addr, client);
        else
            return new PlainProxyConnection(proxy, client);
    }


 * Plain raw TCP connection direct to destination.
 * The connection operates in asynchronous non-blocking mode.
 * All reads and writes are done non-blocking.
class PlainHttpConnection extends HttpConnection {

    private final Object reading = new Object();
    protected final SocketChannel chan;
    private final SocketTube tube; // need SocketTube to call signalClosed().
    private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading);
    private volatile boolean connected;
    private boolean closed;
    private volatile ConnectTimerEvent connectTimerEvent;  // may be null
    private volatile int unsuccessfulAttempts;

    // Indicates whether a connection attempt has succeeded or should be retried.
    // If the attempt failed, and shouldn't be retried, there will be an exception
    // instead.
    private enum ConnectState { SUCCESS, RETRY }

    PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) {
        super(addr, client);
        try {
            this.chan = SocketChannel.open();
            chan.configureBlocking(false);
            if (debug.on()) {
                int bufsize = getSoReceiveBufferSize();
                debug.log("Initial receive buffer size is: %d", bufsize);
                bufsize = getSoSendBufferSize();
                debug.log("Initial send buffer size is: %d", bufsize);
            }
            if (trySetReceiveBufferSize(client.getReceiveBufferSize())) {
                if (debug.on()) {
                    int bufsize = getSoReceiveBufferSize();
                    debug.log("Receive buffer size configured: %d", bufsize);
                }
            }
            if (trySetSendBufferSize(client.getSendBufferSize())) {
                if (debug.on()) {
                    int bufsize = getSoSendBufferSize();
                    debug.log("Send buffer size configured: %d", bufsize);
                }
            }
            chan.setOption(StandardSocketOptions.TCP_NODELAY, true);
            // wrap the channel in a Tube for async reading and writing
            // i.e. the NIO socket channel is wrapped in the SocketTube member created here
            tube = new SocketTube(client(), chan, Utils::getBuffer);
        } catch (IOException e) {
            throw new InternalError(e);
        }
    }


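3.2 Connection downgrade and upgrade


In fact, the HTTP/2 specification does not require HTTP/2 to run over SSL (TLS). Now that HTTP/2 is widely deployed, HttpClient naturally tries HTTP/2 first. When a secure connection is established, the client and server use ALPN (Application-Layer Protocol Negotiation) during the TLS handshake to agree on the type of connection to set up. When the server indicates that it only supports HTTP/1.1, HttpClient has to downgrade the connection.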
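
As a rough illustration of the negotiation described above, the sketch below offers "h2" and "http/1.1" through the standard javax.net.ssl API and maps a negotiated value to an HTTP version. The class AlpnDowngradeSketch and the helper versionFor are hypothetical names for this example; the mapping is a simplification of what the JDK does internally, and no real handshake is performed here.

```java
import java.net.http.HttpClient.Version;
import java.util.Arrays;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;

public class AlpnDowngradeSketch {

    // Offer both protocols, in order of preference, before the handshake starts.
    static SSLEngine newClientEngine() throws Exception {
        SSLEngine engine = SSLContext.getDefault().createSSLEngine();
        engine.setUseClientMode(true);
        SSLParameters params = engine.getSSLParameters();
        params.setApplicationProtocols(new String[] { "h2", "http/1.1" });
        engine.setSSLParameters(params);
        return engine;
    }

    // Hypothetical helper: decide which HTTP version to use from the ALPN result.
    // After a real handshake the argument would come from engine.getApplicationProtocol().
    static Version versionFor(String negotiated) {
        if ("h2".equals(negotiated)) {
            return Version.HTTP_2;
        }
        if (negotiated == null || negotiated.isEmpty() || negotiated.equals("http/1.1")) {
            // ALPN unsupported, nothing negotiated, or HTTP/1.1 chosen: downgrade.
            return Version.HTTP_1_1;
        }
        throw new IllegalStateException("Unexpected ALPN: " + negotiated);
    }

    public static void main(String[] args) throws Exception {
        SSLEngine engine = newClientEngine();
        System.out.println("offered: "
                + Arrays.toString(engine.getSSLParameters().getApplicationProtocols()));
        System.out.println("h2 -> " + versionFor("h2"));
        System.out.println("http/1.1 -> " + versionFor("http/1.1"));
    }
}
```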


    static <U> CompletableFuture<? extends ExchangeImpl<U>>
    get(Exchange<U> exchange, HttpConnection connection)
    {
        if (exchange.version() == HTTP_1_1) {
            if (debug.on())
                debug.log("get: HTTP/1.1: new Http1Exchange");
            return createHttp1Exchange(exchange, connection);
        } else {
            Http2ClientImpl c2 = exchange.client().client2(); // #### improve
            HttpRequestImpl request = exchange.request();
            CompletableFuture<Http2Connection> c2f = c2.getConnectionFor(request, exchange);
            if (debug.on())
                debug.log("get: Trying to get HTTP/2 connection");
            // local variable required here; see JDK-8223553
            CompletableFuture<CompletableFuture<? extends ExchangeImpl<U>>> fxi =
                c2f.handle((h2c, t) -> createExchangeImpl(h2c, t, exchange, connection));
            return fxi.thenCompose(x->x);
        }
    }


    CompletableFuture<Http2Connection> getConnectionFor(HttpRequestImpl req,
                                                        Exchange<?> exchange) {
        URI uri = req.uri();
        InetSocketAddress proxy = req.proxy();
        String key = Http2Connection.keyFor(uri, proxy);

        synchronized (this) {
            Http2Connection connection = connections.get(key);
            if (connection != null) {
                try {
                    if (connection.closed || !connection.reserveStream(true)) {
                        if (debug.on())
                            debug.log("removing found closed or closing connection: %s", connection);
                        deleteConnection(connection);
                    } else {
                        // fast path if connection already exists
                        if (debug.on())
                            debug.log("found connection in the pool: %s", connection);
                        return MinimalFuture.completedFuture(connection);
                    }
                } catch (IOException e) {
                    // thrown by connection.reserveStream()
                    return MinimalFuture.failedFuture(e);
                }
            }

            if (!req.secure() || failures.contains(key)) {
                // secure: negotiate failed before. Use http/1.1
                // !secure: no connection available in cache. Attempt upgrade
                if (debug.on()) debug.log("not found in connection pool");
                return MinimalFuture.completedFuture(null);
            }
        }
        return Http2Connection
                .createAsync(req, this, exchange)
                .whenComplete((conn, t) -> {
                    synchronized (Http2ClientImpl.this) {
                        if (conn != null) {
                            try {
                                conn.reserveStream(true);
                            } catch (IOException e) {
                                throw new UncheckedIOException(e); // shouldn't happen
                            }
                            offerConnection(conn);
                        } else {
                            Throwable cause = Utils.getCompletionCause(t);
                            if (cause instanceof Http2Connection.ALPNException)
                                failures.add(key);
                        }
                    }
                });
    }


    private static CompletableFuture<?> checkSSLConfig(AbstractAsyncSSLConnection aconn) {
        assert aconn.isSecure();

        Function<String, CompletableFuture<Void>> checkAlpnCF = (alpn) -> {
            CompletableFuture<Void> cf = new MinimalFuture<>();
            SSLEngine engine = aconn.getEngine();
            String engineAlpn = engine.getApplicationProtocol();
            assert Objects.equals(alpn, engineAlpn)
                    : "alpn: %s, engine: %s".formatted(alpn, engineAlpn);

            DEBUG_LOGGER.log("checkSSLConfig: alpn: %s", alpn );

            if (alpn == null || !alpn.equals("h2")) {
                String msg;
                if (alpn == null) {
                    Log.logSSL("ALPN not supported");
                    msg = "ALPN not supported";
                } else {
                    switch (alpn) {
                        case "":
                            Log.logSSL(msg = "No ALPN negotiated");
                            break;
                        case "http/1.1":
                            Log.logSSL( msg = "HTTP/1.1 ALPN returned");
                            break;
                        default:
                            Log.logSSL(msg = "Unexpected ALPN: " + alpn);
                            cf.completeExceptionally(new IOException(msg));
                    }
                }
                cf.completeExceptionally(new ALPNException(msg, aconn));
                return cf;
            }
            cf.complete(null);
            return cf;
        };

        return aconn.getALPN()
                .whenComplete((r,t) -> {
                    if (t != null && t instanceof SSLException) {
                        // something went wrong during the initial handshake
                        // close the connection
                        aconn.close();
                    }
                })
                .thenCompose(checkAlpnCF);
    }




    private static <U> CompletableFuture<? extends ExchangeImpl<U>>
    createExchangeImpl(Http2Connection c,
                       Throwable t,
                       Exchange<U> exchange,
                       HttpConnection connection)
    {
        if (debug.on())
            debug.log("handling HTTP/2 connection creation result");
        boolean secure = exchange.request().secure();
        if (t != null) {
            if (debug.on())
                debug.log("handling HTTP/2 connection creation failed: %s",
                                 (Object)t);
            t = Utils.getCompletionCause(t);
            if (t instanceof Http2Connection.ALPNException) {
                Http2Connection.ALPNException ee = (Http2Connection.ALPNException)t;
                AbstractAsyncSSLConnection as = ee.getConnection();
                if (debug.on())
                    debug.log("downgrading to HTTP/1.1 with: %s", as);
                CompletableFuture<? extends ExchangeImpl<U>> ex =
                        createHttp1Exchange(exchange, as);
                return ex;
            } else {
                if (debug.on())
                    debug.log("HTTP/2 connection creation failed "
                                     + "with unexpected exception: %s", (Object)t);
                return MinimalFuture.failedFuture(t);
            }
        }
        if (secure && c== null) {
            if (debug.on())
                debug.log("downgrading to HTTP/1.1 ");
            CompletableFuture<? extends ExchangeImpl<U>> ex =
                    createHttp1Exchange(exchange, null);
            return ex;
        }
        if (c == null) {
            // no existing connection. Send request with HTTP 1 and then
            // upgrade if successful
            if (debug.on())
                debug.log("new Http1Exchange, try to upgrade");
            return createHttp1Exchange(exchange, connection)
                    .thenApply((e) -> {
                        exchange.h2Upgrade();
                        return e;
                    });
        } else {
            if (debug.on()) debug.log("creating HTTP/2 streams");
            Stream<U> s = c.createStream(exchange);
            CompletableFuture<? extends ExchangeImpl<U>> ex = MinimalFuture.completedFuture(s);
            return ex;
        }
    }


    private CompletableFuture<Response>
    checkForUpgradeAsync(Response resp,
                         ExchangeImpl<T> ex) {

        int rcode = resp.statusCode();
        if (upgrading && (rcode == 101)) {
            Http1Exchange<T> e = (Http1Exchange<T>)ex;
            // check for 101 switching protocols
            // 101 responses are not supposed to contain a body.
            //    => should we fail if there is one?
            if (debug.on()) debug.log("Upgrading async %s", e.connection());
            return e.readBodyAsync(this::ignoreBody, false, parentExecutor)
                .thenCompose((T v) -> {// v is null
                    debug.log("Ignored body");
                    // we pass e::getBuffer to allow the ByteBuffers to accumulate
                    // while we build the Http2Connection
                    upgraded = true;
                    return Http2Connection.createAsync(e.connection(),
                                                 client.client2(),
                                                 this, e::drainLeftOverBytes)
                        .thenCompose((Http2Connection c) -> {
                            boolean cached = c.offerConnection();
                            if (cached) connectionAborter.disable();
                            Stream<T> s = c.getStream(1);
                            // ...
                        });
                });
        }
        return MinimalFuture.completedFuture(resp);
    }


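4. Connecting the reactive read/write flows

Looking at how the HTTP connection is established above, you may have noticed that no TCP connection is actually set up there. That is correct: when a connection is created for a first request, the JDK HttpClient defers establishing the socket connection to the methods that send the request headers.


4.1 Structure and function of the socket tube

So what is a FlowTube? Judging from its structure and Javadoc, it plays the roles of both publisher and subscriber in the Java Flow API (Reactive Streams). As a "connector", one end is attached to reads and writes on the socket channel, and the other end to reads and writes of HTTP messages.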

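/**
 * FlowTube is an I/O abstraction that allows reading from and writing to a
 * destination asynchronously. This is not a
 * Flow.Processor<List<ByteBuffer>, List<ByteBuffer>>, but rather models a
 * publisher source and a subscriber sink in a bidirectional flow.
 * The connectFlows method should be called to connect the bidirectional flow.
 * A FlowTube supports handing over the same read subscription to different
 * sequential read subscribers over time. When
 * connectFlows(writePublisher, readSubscriber) is called, the FlowTube will
 * call dropSubscription on its former readSubscriber, and onSubscribe on its
 * new readSubscriber.
 */
public interface FlowTube extends
       Flow.Publisher<List<ByteBuffer>>,
       Flow.Subscriber<List<ByteBuffer>> {

    /**
     * A subscriber for reading from the bidirectional flow. A TubeSubscriber is
     * a Flow.Subscriber that can be cancelled by calling dropSubscription().
     * Once dropSubscription() is called, the TubeSubscriber should stop calling
     * any method on its subscription.
     */
    static interface TubeSubscriber extends Flow.Subscriber<List<ByteBuffer>> {

        default void dropSubscription() { }

        default boolean supportsRecycling() { return false; }
    }

    /**
     * A publisher for writing to the bidirectional flow.
     */
    static interface TubePublisher extends Flow.Publisher<List<ByteBuffer>> {
    }

    /**
     * Connects the bidirectional flows to a write publisher and a read
     * subscriber. This method can be called sequentially several times to
     * switch existing publishers and subscribers to a new pair of write
     * subscriber and read publisher.
     * @param writePublisher A new publisher for writing to the bidirectional flow.
     * @param readSubscriber A new subscriber for reading from the bidirectional
     *                       flow.
     */
    default void connectFlows(TubePublisher writePublisher,
                              TubeSubscriber readSubscriber) {
        this.subscribe(readSubscriber);
        writePublisher.subscribe(this);
    }

    /**
     * Returns true if this flow was completed, either exceptionally
     * or normally (EOF reached).
     * @return true if the flow is finished
     */
    boolean isFinished();
}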

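As a brief reminder, this is how the participants in Reactive Streams interact:

  1. The publisher accepts a subscription from a subscriber: Publisher.subscribe(Subscriber)
  2. The publisher hands a Subscription to the subscriber: Subscriber.onSubscribe(Subscription)
  3. The subscriber requests n items: Subscription.request(n)
  4. The subscriber receives at most n items: Subscriber.onNext(T item)
  5. The subscriber may cancel the subscription: Subscription.cancel()
  6. The subscriber is notified of completion or of an error: Subscriber.onError(Throwable); Subscriber.onComplete()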
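
The interaction listed above can be tried out with the JDK's own java.util.concurrent.Flow types. The following minimal sketch uses SubmissionPublisher as the publisher; FlowHandshakeSketch, CollectingSubscriber and publishAll are hypothetical names chosen for this example, and the subscriber requests one item at a time purely to make the back-pressure step visible.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class FlowHandshakeSketch {

    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription subscription) { // step 2
            this.subscription = subscription;
            subscription.request(1);                                        // step 3
        }

        @Override public void onNext(String item) {                         // step 4
            received.add(item);
            subscription.request(1); // ask for the next item only after handling this one
        }

        @Override public void onError(Throwable error) { done.countDown(); } // step 6
        @Override public void onComplete() { done.countDown(); }             // step 6
    }

    static List<String> publishAll(List<String> items) throws InterruptedException {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);                                 // step 1
            items.forEach(publisher::submit);
        } // close() signals onComplete once the buffered items have been delivered
        if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("timed out waiting for completion");
        }
        return subscriber.received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(publishAll(List.of("headers", "body-1", "body-2")));
    }
}
```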


/**
 * A SocketTube is a terminal tube plugged directly into the socket.
 * The read subscriber should call {@code subscribe} on the SocketTube before
 * the SocketTube is subscribed to the write publisher.
 */
final class SocketTube implements FlowTube {

    final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
    static final AtomicLong IDS = new AtomicLong();

    private final HttpClientImpl client;
    // the NIO socket channel
    private final SocketChannel channel;
    private final SliceBufferSource sliceBuffersSource;
    private final Object lock = new Object();
    private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
    private final InternalReadPublisher readPublisher;
    private final InternalWriteSubscriber writeSubscriber;
    private final long id = IDS.incrementAndGet();

    public SocketTube(HttpClientImpl client, SocketChannel channel,
                      Supplier<ByteBuffer> buffersFactory) {
        this.client = client;
        this.channel = channel;
        this.sliceBuffersSource = new SliceBufferSource(buffersFactory);
        this.readPublisher = new InternalReadPublisher();
        this.writeSubscriber = new InternalWriteSubscriber();
    }


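  • The ReadPublisher reads data from the socket channel and "publishes" it into the tube, where it waits for a consumer to receive it and parse it into HTTP response headers and a response body
  • The WriteSubscriber "subscribes" to HTTP messages: it waits for the publisher of HTTP content to write a message into the SocketTube, then takes the message out and writes it to the socket channel


4.2 Establishing the socket connection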


    @Override
    public void connectFlows(TubePublisher writePublisher,
                             TubeSubscriber readSubscriber) {
        if (debug.on()) debug.log("connecting flows");
        this.subscribe(readSubscriber);
        writePublisher.subscribe(this);
    }


    CompletableFuture<Response> responseAsyncImpl0(HttpConnection connection) {
        Function<ExchangeImpl<T>, CompletableFuture<Response>> after407Check;
        bodyIgnored = null;
        if (request.expectContinue()) {
            request.addSystemHeader("Expect", "100-Continue");
            Log.logTrace("Sending Expect: 100-Continue");
            // wait for 100-Continue before sending body
            after407Check = this::expectContinue;
        } else {
            after407Check = this::sendRequestBody;
        }
        // The ProxyAuthorizationRequired can be triggered either by
        // establishExchange (case of HTTP/2 SSL tunneling through HTTP/1.1 proxy
        // or by sendHeaderAsync (case of HTTP/1.1 SSL tunneling through HTTP/1.1 proxy
        // Therefore we handle it with a call to this checkFor407(...) after these
        // two places.
        Function<ExchangeImpl<T>, CompletableFuture<Response>> afterExch407Check =
                (ex) -> ex.sendHeadersAsync()
                        .handle((r,t) -> this.checkFor407(r, t, after407Check))
                        .thenCompose(Function.identity());
        return establishExchange(connection)    // first, establish the connection
                .handle((r,t) -> this.checkFor407(r,t, afterExch407Check))
                .thenCompose(Function.identity());
    }


    @Override
    CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
        // create the response before sending the request headers, so that
        // the response can set the appropriate receivers.
        if (debug.on()) debug.log("Sending headers only");
        // If the first attempt to read something triggers EOF, or
        // IOException("channel reset by peer"), we're going to retry.
        // Instruct the asyncReceiver to throw ConnectionExpiredException
        // to force a retry.
        asyncReceiver.setRetryOnError(true);
        if (response == null) {
            response = new Http1Response<>(connection, this, asyncReceiver);
        }

        if (debug.on()) debug.log("response created in advance");

        CompletableFuture<Void> connectCF;
        if (!connection.connected()) {
            if (debug.on()) debug.log("initiating connect async");
            connectCF = connection.connectAsync(exchange)
                    .thenCompose(unused -> connection.finishConnect());
            Throwable cancelled;
            synchronized (lock) {
                if ((cancelled = failed) == null) {
                    operations.add(connectCF);
                }
            }
            if (cancelled != null) {
                if (client.isSelectorThread()) {
                    executor.execute(() ->
                        connectCF.completeExceptionally(cancelled));
                } else {
                    connectCF.completeExceptionally(cancelled);
                }
            }
        } else {
            connectCF = new MinimalFuture<>();
            connectCF.complete(null);
        }

        return connectCF
                .thenCompose(unused -> {
                    CompletableFuture<Void> cf = new MinimalFuture<>();
                    try {
                        asyncReceiver.whenFinished.whenComplete((r,t) -> {
                            if (t != null) {
                                if (debug.on())
                                    debug.log("asyncReceiver finished (failed=%s)", (Object)t);
                                if (!headersSentCF.isDone())
                                    headersSentCF.completeAsync(() -> this, executor);
                            }
                        });
                        connectFlows(connection);

                        if (debug.on()) debug.log("requestAction.headers");
                        List<ByteBuffer> data = requestAction.headers();
                        synchronized (lock) {
                            state = State.HEADERS;
                        }
                        if (debug.on()) debug.log("setting outgoing with headers");
                        assert outgoing.isEmpty() : "Unexpected outgoing:" + outgoing;
                        appendToOutgoing(data);
                        cf.complete(null);
                        return cf;
                    } catch (Throwable t) {
                        if (debug.on()) debug.log("Failed to send headers: %s", t);
                        headersSentCF.completeExceptionally(t);
                        bodySentCF.completeExceptionally(t);
                        connection.close();
                        cf.completeExceptionally(t);
                        return cf;
                    } })
                .thenCompose(unused -> headersSentCF);
    }


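  1. Establish the socket connection asynchronously
  2. Set up the bidirectional subscription with the tube (not with the socket channel)
  3. Take the request headers, put them on the queue, and notify the subscriber on the tube side to consume them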

    @Override
    public CompletableFuture<Void> connectAsync(Exchange<?> exchange) {
        CompletableFuture<ConnectState> cf = new MinimalFuture<>();
        try {
            assert !connected : "Already connected";
            assert !chan.isBlocking() : "Unexpected blocking channel";
            boolean finished;

            if (connectTimerEvent == null) {
                connectTimerEvent = newConnectTimer(exchange, cf);
                if (connectTimerEvent != null) {
                    if (debug.on())
                        debug.log("registering connect timer: " + connectTimerEvent);
                    client().registerTimer(connectTimerEvent);
                }
            }
            // On return, the connection may already be established (finished = true),
            // or it may still have to be completed later (finished = false)
            PrivilegedExceptionAction<Boolean> pa =
                    () -> chan.connect(Utils.resolveAddress(address));
            try {
                 finished = AccessController.doPrivileged(pa);
            } catch (PrivilegedActionException e) {
               throw e.getCause();
            }
            if (finished) {
                if (debug.on()) debug.log("connect finished without blocking");
                cf.complete(ConnectState.SUCCESS);
            } else {
                if (debug.on()) debug.log("registering connect event");
                client().registerEvent(new ConnectEvent(cf, exchange));
            }
            cf = exchange.checkCancelled(cf, this);
        } catch (Throwable throwable) {
            cf.completeExceptionally(Utils.toConnectException(throwable));
            try {
                close();
            } catch (Exception x) {
                if (debug.on())
                    debug.log("Failed to close channel after unsuccessful connect");
            }
        }
        return cf.handle((r,t) -> checkRetryConnect(r, t,exchange))
                .thenCompose(Function.identity());
    }




    final class ConnectEvent extends AsyncEvent {
        private final CompletableFuture<ConnectState> cf;
        private final Exchange<?> exchange;

        ConnectEvent(CompletableFuture<ConnectState> cf, Exchange<?> exchange) {
            this.cf = cf;
            this.exchange = exchange;
        }

        @Override
        public SelectableChannel channel() {
            return chan;
        }

        @Override
        public int interestOps() {
            return SelectionKey.OP_CONNECT;
        }

        @Override
        public void handle() {
            try {
                assert !connected : "Already connected";
                assert !chan.isBlocking() : "Unexpected blocking channel";
                if (debug.on())
                    debug.log("ConnectEvent: finishing connect");
                // call finishConnect() on the Java NIO channel to complete the
                // connection now that it is ready
                boolean finished = chan.finishConnect();
                if (debug.on())
                    debug.log("ConnectEvent: connect finished: %s, cancelled: %s, Local addr: %s",
                              finished, exchange.multi.requestCancelled(), chan.getLocalAddress());
                assert finished || exchange.multi.requestCancelled() : "Expected channel to be connected";
                // complete async since the event runs on the SelectorManager thread
                cf.completeAsync(() -> ConnectState.SUCCESS, client().theExecutor());
            } catch (Throwable e) {
                if (canRetryConnect(e)) {
                    unsuccessfulAttempts++;
                    cf.completeAsync(() -> ConnectState.RETRY, client().theExecutor());
                    return;
                }
                Throwable t = Utils.toConnectException(e);
                client().theExecutor().execute( () -> cf.completeExceptionally(t));
                close();
            }
        }

        @Override
        public void abort(IOException ioe) {
            client().theExecutor().execute( () -> cf.completeExceptionally(ioe));
            close();
        }
    }
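
The connect-then-finishConnect sequence above follows the standard pattern for non-blocking sockets in Java NIO. The sketch below reproduces that pattern in isolation, outside the HttpClient. It is an assumption-laden simplification: NonBlockingConnectSketch is a hypothetical class, it spins up a throwaway selector thread per connection instead of a shared SelectorManager, and it has no timeout, retry or cancellation handling.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class NonBlockingConnectSketch {

    static CompletableFuture<SocketChannel> connectAsync(InetSocketAddress address) {
        CompletableFuture<SocketChannel> cf = new CompletableFuture<>();
        try {
            SocketChannel chan = SocketChannel.open();
            chan.configureBlocking(false);
            if (chan.connect(address)) {
                // The connection was established immediately.
                cf.complete(chan);
                return cf;
            }
            // Otherwise wait for OP_CONNECT and then call finishConnect().
            Selector selector = Selector.open();
            chan.register(selector, SelectionKey.OP_CONNECT);
            Thread worker = new Thread(() -> {
                try (selector) {
                    while (!cf.isDone()) {
                        selector.select(1000);
                        for (SelectionKey key : selector.selectedKeys()) {
                            if (key.isConnectable() && chan.finishConnect()) {
                                key.cancel();
                                cf.complete(chan);
                            }
                        }
                        selector.selectedKeys().clear();
                    }
                } catch (IOException e) {
                    cf.completeExceptionally(e);
                }
            }, "connect-selector");
            worker.setDaemon(true);
            worker.start();
        } catch (IOException e) {
            cf.completeExceptionally(e);
        }
        return cf;
    }

    public static void main(String[] args) throws Exception {
        // A local listening socket on an ephemeral port serves as the connect target.
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            InetSocketAddress target = (InetSocketAddress) server.getLocalAddress();
            try (SocketChannel chan = connectAsync(target).get(5, TimeUnit.SECONDS)) {
                System.out.println("connected: " + chan.isConnected());
            }
        }
    }
}
```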


4.3 Establishing the bidirectional read/write relationship


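    private void connectFlows(HttpConnection connection) {
        FlowTube tube =  connection.getConnectionFlow();
        if (debug.on()) debug.log("%s connecting flows", tube);

        // Connect the flow to our Http1TubeSubscriber:
        //   asyncReceiver.subscriber().
        tube.connectFlows(writePublisher,
                          asyncReceiver.subscriber());
    }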


  1. The SocketTube's InternalReadPublisher is connected to the subscriber (Http1TubeSubscriber) of the asyncReceiver in Http1Exchange

  2. The writePublisher (Http1Publisher) in Http1Exchange is connected to the SocketTube's InternalWriteSubscriber
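
The two connections above can be modelled with a toy tube to see how a single connectFlows call wires both directions. Everything in this sketch is hypothetical and heavily simplified: MiniTube stands in for FlowTube, LoopbackTube replaces the socket with an echo, and items are strings rather than List<ByteBuffer>. Only the order of the two subscribe calls mirrors FlowTube.connectFlows.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class TubeWiringSketch {

    // Hypothetical, simplified counterpart of FlowTube: readable as a Publisher,
    // writable as a Subscriber.
    interface MiniTube extends Flow.Publisher<String>, Flow.Subscriber<String> {
        default void connectFlows(Flow.Publisher<String> writePublisher,
                                  Flow.Subscriber<String> readSubscriber) {
            this.subscribe(readSubscriber);  // connection 1: tube -> reader
            writePublisher.subscribe(this);  // connection 2: writer -> tube
        }
    }

    // Loopback "socket": whatever is written to the tube is published on its read side.
    static final class LoopbackTube implements MiniTube {
        private final SubmissionPublisher<String> readSide = new SubmissionPublisher<>();
        private Flow.Subscription writeSubscription;

        @Override public void subscribe(Flow.Subscriber<? super String> reader) {
            readSide.subscribe(reader);
        }
        @Override public void onSubscribe(Flow.Subscription subscription) {
            writeSubscription = subscription;
            subscription.request(1);
        }
        @Override public void onNext(String item) {
            readSide.submit("echo:" + item);
            writeSubscription.request(1);
        }
        @Override public void onError(Throwable error) { readSide.closeExceptionally(error); }
        @Override public void onComplete() { readSide.close(); }
    }

    // Plays the role of the response-side subscriber.
    static final class Reader implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }
        @Override public void onNext(String item) {
            received.add(item);
            subscription.request(1);
        }
        @Override public void onError(Throwable error) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    static List<String> roundTrip(List<String> messages) throws InterruptedException {
        LoopbackTube tube = new LoopbackTube();
        Reader reader = new Reader();
        // The SubmissionPublisher plays the role of the request-side write publisher.
        try (SubmissionPublisher<String> writer = new SubmissionPublisher<>()) {
            tube.connectFlows(writer, reader);
            messages.forEach(writer::submit);
        }
        if (!reader.done.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("timed out waiting for completion");
        }
        return reader.received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(roundTrip(List.of("headers", "body")));
    }
}
```

Subscribing the reader before the writer matches the note in the SocketTube Javadoc that the read subscriber should subscribe before the tube is subscribed to the write publisher.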


