JDK HttpClient 单次请求的生命周期
HttpClient 单次请求的生命周期
目录-
HttpClient 单次请求的生命周期
- 1. 简述
- 2. uml图
-
3. Http连接的建立、复用和降级
- 3.1 调用流程及连接的建立和复用
- 3.2 连接的降级和升级
-
4. 响应式读写流的连接
- 4.1 socket管道的结构和功能
- 4.2 socket 连接的建立
- 4.3 双向读写关系的建立
- 4.4 顺序调度器简析
-
5. 请求头和请求体的发送
- 5.1 发布和订阅者介绍
- 5.2 请求头发送的启动过程
- 5.3 写入数据到NIO-Socket通道
- 5.4 请求体的发送
- 5.5 发送小结
-
6. 响应的创建和响应头的解析
- 6.1 发布和订阅者介绍
- 6.2 响应头解析
- 7. 响应体的解析
- 8. 小结
1. 简述
上篇我们通过流程图和时序图,简要了解了HttpClient的请求处理流程,并重点认识了其对用户请求的修饰和对一次用户请求可能引发的多重请求——响应交换的处理。本篇,我们以最基础的Http1.1为例,深入单次请求的处理过程,见证其完整的生命历程。
本篇是HttpClient源码分析的核心。我们将看到连接的管理和复用、channel的读写、响应式流的运用。
本文所述的HttpClient都指代JDK11开始内置的HttpClient及相关类,源码分析基于JAVA 17。阅读本文需要理解Reactive Streams规范及对应的JAVA Flow api的原理和使用。
2. uml图
为了方便,我们再次回顾HttpClient发送请求的流程图和时序图:
以下是本篇分析的重点类:Http1Exchange的uml类图:
3. Http连接的建立、复用和降级
在单次请求的过程中,首先进行的操作就是Http连接的建立。我们主要关注Http1连接。连接的过程可以简要概括如下:
- 根据请求类型实例化不同的ExchangeImpl,负责具体的请求——响应过程
- 根据交换类型决定要实例化的HTTP连接的版本;根据请求类型从连接池中尝试获取对应路由的已有连接
- 连接池中获取不到连接,实例化对应的Http连接(在最基本的Http1.1连接中,会开启NIOSocket通道并包裹到管道中)
- 如果初始化的连接实例是Http2,而协商发现不支持,则降级为建立Http1连接
我们将看到,HttpClient在Http1、Http2两个不同版本协议间切换自如。根据是否进行SSL加密,HttpClient会实例化HttpConnection的不同子类,而如果是Http2连接,那么一个组合了该子类实例的Http2Connection实例将会负责对Http2连接的管理。否则,则是HttpConnection的子类自身管理连接。
3.1 调用流程及连接的建立和复用
接下来是具体的分析。我们回顾上篇分析的MultiExchange的responseAsyncImpl方法,该方法负责把用户请求过滤后,委托给Exchange类处理,自己接受响应并处理多重请求。
//处理一次用户请求带来的一个或多个请求的过程,返回一个最终响应
private CompletableFuture<Response> responseAsyncImpl() {
CompletableFuture<Response> cf;
//省略………………
Exchange<T> exch = getExchange();
// 2. get response
// 由单个交换对象(Exhange)负责处理当前的单个请求,异步返回响应
//这是我们即将分析的方法
cf = 域名onseAsync()
.thenCompose((Response response) -> {
//省略……
}
return cf;
}
现在,我们将关注点转向Exchange类对一次请求——响应的处理。我们关注Exchange::responseAsync方法:
/**
* One request/response exchange (handles 100/101 intermediate response also).
* depth field used to track number of times a new request is being sent
* for a given API request. If limit exceeded exception is thrown.
*
* Security check is performed here:
* - uses AccessControlContext captured at API level
* - checks for appropriate URLPermission for request
* - if permission allowed, grants equivalent SocketPermission to call
* - in case of direct HTTP proxy, checks additionally for access to proxy
* (CONNECT proxying uses its own Exchange, so check done there)
*
*/
final class Exchange<T> {
//此处是Exchange类的成员变量的展示
final HttpRequestImpl request;
final HttpClientImpl client;
//ExchangeImpl抽象成员,具体类型根据连接类型确定
volatile ExchangeImpl<T> exchImpl;
volatile CompletableFuture<? extends ExchangeImpl<T>> exchangeCF;
volatile CompletableFuture<Void> bodyIgnored;
// used to record possible cancellation raised before the exchImpl
// has been established.
private volatile IOException failed;
@SuppressWarnings("removal")
final AccessControlContext acc;
final MultiExchange<T> multi;
final Executor parentExecutor;
volatile boolean upgrading; // to HTTP/2
volatile boolean upgraded; // to HTTP/2
final PushGroup<T> pushGroup;
final String dbgTag;
//…………省略大量代码
//上文中,MultiExchange调用的方法
// Completed HttpResponse will be null if response succeeded
// will be a non null responseAsync if expect continue returns an error
public CompletableFuture<Response> responseAsync() {
return responseAsyncImpl(null);
}
//上面方法调用的重载方法
CompletableFuture<Response> responseAsyncImpl(HttpConnection connection) {
SecurityException e = checkPermissions();
if (e != null) {
return 域名edFuture(e);
} else {
return responseAsyncImpl0(connection);
}
}
//实际处理的方法,我们需要重点关注。
CompletableFuture<Response> responseAsyncImpl0(HttpConnection connection) {
//此处声明一个通过407错误校验(代理服务器认证失败)后要执行的操作
Function<ExchangeImpl<T>, CompletableFuture<Response>> after407Check;
bodyIgnored = null;
if (域名ctContinue()) {
域名ystemHeader("Expect", "100-Continue");
域名race("Sending Expect: 100-Continue");
// wait for 100-Continue before sending body
// 若我们构建请求设置了expectContinue(),那么通过407校验后,就会先发送一个等待100响应状态码的确认请求
after407Check = this::expectContinue;
} else {
// send request body and proceed. 绝大多数情况下,通过407校验后,直接发送请求体
after407Check = this::sendRequestBody;
}
// The ProxyAuthorizationRequired can be triggered either by
// establishExchange (case of HTTP/2 SSL tunneling through HTTP/1.1 proxy
// or by sendHeaderAsync (case of HTTP/1.1 SSL tunneling through HTTP/1.1 proxy
// Therefore we handle it with a call to this checkFor407(...) after these
// two places.
Function<ExchangeImpl<T>, CompletableFuture<Response>> afterExch407Check =
(ex) -> 域名HeadersAsync()
.handle((r,t) -> 域名kFor407(r, t, after407Check))
.thenCompose(域名tity());
return establishExchange(connection) //首先建立连接
//校验是否发生407错误,否则执行上面的afterExch407Check,即发送请求头,然后再次校验407错误,之后执行after407check操作
.handle((r,t) -> 域名kFor407(r,t, afterExch407Check))
.thenCompose(域名tity());
}
}
可以看到,实际处理请求的是Exchange::responseAsyncImpl0方法。此处发生的流程正如流程图里看到的那样:
- 尝试建立连接
- 校验是否发生407错误
- 发送请求头
- 再次校验是否发生了407错误
- 发送100确认请求/发送请求体
我们首先关注连接的建立过程:域名bleExchange(connection)
// get/set the exchange impl, solving race condition issues with
// potential concurrent calls to cancel() or cancel(IOException)
private CompletableFuture<? extends ExchangeImpl<T>>
establishExchange(HttpConnection connection) {
if (域名()) {
域名("establishing exchange for %s,%n\t proxy=%s",
request, 域名y());
}
//检查请求是否已取消
// check if we have been cancelled first.
Throwable t = getCancelCause();
checkCancelled();
if (t != null) {
if (域名()) {
域名("exchange was cancelled: returned failed cf (%s)", 域名eOf(t));
}
return exchangeCF = 域名edFuture(t);
}
CompletableFuture<? extends ExchangeImpl<T>> cf, res;
//注意,此处是关键,异步返回了exhangeImpl抽象类,它有三个子类,根据请求类型来判断
//我们将分析此方法,其中实现类连接的创建和复用
cf = 域名(this, connection);
// We should probably use a VarHandle to get/set exchangeCF
// instead - as we need CAS semantics.
synchronized (this) { exchangeCF = cf; };
res = 域名Complete((r,x) -> {
synchronized(域名) {
if (exchangeCF == cf) exchangeCF = null;
}
});
checkCancelled();
return 域名Compose((eimpl) -> {
// recheck for cancelled, in case of race conditions
exchImpl = eimpl;
IOException tt = getCancelCause();
checkCancelled();
if (tt != null) {
return 域名edFuture(tt);
} else {
// Now we\'re good to go. Because exchImpl is no longer
// null cancel() will be able to propagate directly to
// the impl after this point ( if needed ).
return 域名letedFuture(eimpl);
} });
}
我们看到,ExchangeImpl的静态方法get(Exchange, Connection)方法异步返回了它的具体实现类(对象)。
我们跟随进入get静态方法,可以看到根据当前交换版本(HTTP版本)的不同,实例化不同的Http子类。如果我们在调用时,指定了Http客户端或请求的版本号:
HttpClient client = 域名uilder()
.version(域名_1_1) //指定客户端为Http1.1版本
.build();
HttpRequest request = 域名uilder(域名te(url))
.version(域名_1_1) //或者指定请求的版本号为Http1.1
.GET().build();
那么,下面的get方法中,将会实例化Http1交换:Http1Exchange,否则,默认尝试建立的是Http2的交换:Stream。
/**
* Initiates a new exchange and assigns it to a connection if one exists
* already. connection usually null.
*/
static <U> CompletableFuture<? extends ExchangeImpl<U>>
get(Exchange<U> exchange, HttpConnection connection)
{
if (域名ion() == HTTP_1_1) {
if (域名())
域名("get: HTTP/1.1: new Http1Exchange");
//创建Http1交换
return createHttp1Exchange(exchange, connection);
} else {
Http2ClientImpl c2 = 域名nt().client2(); // #### improve
HttpRequestImpl request = 域名est();
//获取Http2连接
CompletableFuture<Http2Connection> c2f = 域名onnectionFor(request, exchange);
if (域名())
域名("get: Trying to get HTTP/2 connection");
// local variable required here; see JDK-8223553
//创建Http2交换
CompletableFuture<CompletableFuture<? extends ExchangeImpl<U>>> fxi =
域名le((h2c, t) -> createExchangeImpl(h2c, t, exchange, connection));
return 域名Compose(x->x);
}
}
我们假定调用时指定了Http1.1版本号,继续关注Exchange的创建和连接建立过程。createHttp1Exchange方法调用了Http1Exchange的构造函数,我们跟随进入:
Http1Exchange(Exchange<T> exchange, HttpConnection connection)
throws IOException
{
super(exchange);
域名est = 域名est();
域名nt = 域名nt();
域名utor = 域名utor();
域名ations = new LinkedList<>();
域名(headersSentCF);
域名(bodySentCF);
if (connection != null) {
域名ection = connection;
} else {
InetSocketAddress addr = 域名ddress();
//获取连接
域名ection = 域名onnection(addr, client, request, HTTP_1_1);
}
域名estAction = new Http1Request(request, this);
域名cReceiver = new Http1AsyncReceiver(executor, this);
}
我们看到,Http1Exchange中维持了抽象连接(connection)的引用,并在构造方法中获取了具体的连接。根据连接类型的不同,HttpConnection总共有6个实现类,它们的区别是否使用了SSL或代理。值得注意的是,Http2Connection并不在此体系内,它内部组合了一个HttpConnection的抽象成员。这说明了,Http2Connection实际上修饰了HttpConnection。
我们回到Http1。关注在Http1Exchange构造方法中出现的获取连接的方法HttpConnection::getConnection。
/**
* Factory for retrieving HttpConnections. A connection can be retrieved
* from the connection pool, or a new one created if none available.
*
* The given {@code addr} is the ultimate destination. Any proxies,
* etc, are determined from the request. Returns a concrete instance which
* is one of the following:
* {@link PlainHttpConnection}
* {@link PlainTunnelingConnection}
*
* The returned connection, if not from the connection pool, must have its,
* connect() or connectAsync() method invoked, which ( when it completes
* successfully ) renders the connection usable for requests.
*/
public static HttpConnection getConnection(InetSocketAddress addr,
HttpClientImpl client,
HttpRequestImpl request,
Version version) {
// The default proxy selector may select a proxy whose address is
// unresolved. We must resolve the address before connecting to it.
InetSocketAddress proxy = 域名lveAddress(域名y());
HttpConnection c = null;
//根据请求是否加密来决定连接类型
boolean secure = 域名re();
ConnectionPool pool = 域名ectionPool();
if (!secure) {
//非加密连接
//尝试从连接池中获取
c = 域名onnection(false, addr, proxy);
if (c != null && 域名kOpen() /* may have been eof/closed when in the pool */) {
final HttpConnection conn = c;
if (域名())
域名(域名onnectionFlow()
+ ": plain connection retrieved from HTTP/1.1 pool");
return c;
} else {
//连接池中取不到连接,创建新连接
return getPlainConnection(addr, proxy, request, client);
}
} else { // secure
if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool
//有代理的Http1.1链接
c = 域名onnection(true, addr, proxy);
}
if (c != null && 域名en()) {
final HttpConnection conn = c;
if (域名())
域名(域名onnectionFlow()
+ ": SSL connection retrieved from HTTP/1.1 pool");
return c;
} else {
String[] alpn = null;
if (version == HTTP_2 && hasRequiredHTTP2TLSVersion(client)) {
alpn = new String[] { "h2", "http/1.1" };
}
//创建SSL连接
return getSSLConnection(addr, proxy, alpn, request, client);
}
}
}
可以看到,连接到获取过程运用了池化技术,首先尝试从连接池中获取连接,获取不到再新建连接。使用连接池的好处,不在于减少对象创建的时间,而在于大大减少TCP连接“三次握手”的时间开销。
那么,HTTP1.1连接是怎样缓存和复用的呢?我们可以关注连接池类(ConnectionPool)。连接池在客户端初始化时被初始化,它内部使用了散列表来维护路由和之前建立的HTTP连接列表的关系。其中,加密连接存在名为sslPool的HashMap中,而普通连接存在plainPool中。取连接时,将请求地址和代理地址信息组合成缓存键,根据键去散列表中取出对应的第一个连接,返回给调用者。
/**
* Http 1.1 connection pool.
*/
final class ConnectionPool {
//20分钟的默认keepalive时间
static final long KEEP_ALIVE = 域名ntegerNetProperty(
"域名域名out", 1200); // seconds
//连接池大小不做限制
static final long MAX_POOL_SIZE = 域名ntegerNetProperty(
"域名ectionPoolSize", 0); // unbounded
final Logger debug = 域名ebugLogger(this::dbgString, 域名G);
// Pools of idle connections
//用散列表来维护路由和Http连接的映射关系
private final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
private final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
private final ExpiryList expiryList;
private final String dbgTag; // used for debug
boolean stopped;
/**
连接池中路由——连接映射表的缓存键。使用了目的地址和代理地址组合作为缓存键。
* Entries in connection pool are keyed by destination address and/or
* proxy address:
* case 1: plain TCP not via proxy (destination only)
* case 2: plain TCP via proxy (proxy only)
* case 3: SSL not via proxy (destination only)
* case 4: SSL over tunnel (destination and proxy)
*/
static class CacheKey {
final InetSocketAddress proxy;
final InetSocketAddress destination;
CacheKey(InetSocketAddress destination, InetSocketAddress proxy) {
域名y = proxy;
域名ination = destination;
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != 域名lass()) {
return false;
}
final CacheKey other = (CacheKey) obj;
if (!域名ls(域名y, 域名y)) {
return false;
}
if (!域名ls(域名ination, 域名ination)) {
return false;
}
return true;
}
@Override
public int hashCode() {
return 域名(proxy, destination);
}
}
ConnectionPool(long clientId) {
this("ConnectionPool("+clientId+")");
}
/**
* There should be one of these per HttpClient.
*/
private ConnectionPool(String tag) {
dbgTag = tag;
plainPool = new HashMap<>();
sslPool = new HashMap<>();
expiryList = new ExpiryList();
}
//省略部分代码
//从连接池获取连接的方法
synchronized HttpConnection getConnection(boolean secure,
InetSocketAddress addr,
InetSocketAddress proxy) {
if (stopped) return null;
// for plain (unsecure) proxy connection the destination address is irrelevant.
addr = secure || proxy == null ? addr : null;
CacheKey key = new CacheKey(addr, proxy);
HttpConnection c = secure ? findConnection(key, sslPool)
: findConnection(key, plainPool);
//域名tln ("getConnection returning: " + c);
assert c == null || 域名cure() == secure;
return c;
}
private HttpConnection findConnection(CacheKey key,
HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
//从连接池中取出对应的连接列表
LinkedList<HttpConnection> l = 域名(key);
if (l == null || 域名pty()) {
return null;
} else {
//对应请求地址的第一个连接,即是最老的一个连接
HttpConnection c = 域名veFirst();
//从过期时间列表中移除这个连接
域名ve(c);
return c;
}
}
//暂时省略
}
上面就是Http1.1的池化连接获取过程。而Http2连接的获取有所不同,它是将scheme::host::port组成一个字符串,从自身维护的池里取的。这里就不展开了。
在这之后,我们的分析都以Http1.1,PlainHttpConnection为基准。
我们回到HttpConnection的getPlainConnection方法,此方法在当从连接池取不到连接,或取出的连接已关闭时被调用。该方法的目的是获取新的连接。可以看到,这里还是会根据请求类型和是否有代理来实例化不同的连接:
private static HttpConnection getPlainConnection(InetSocketAddress addr,
InetSocketAddress proxy,
HttpRequestImpl request,
HttpClientImpl client) {
if (域名bSocket() && proxy != null)
return new PlainTunnelingConnection(addr, proxy, client,
proxyTunnelHeaders(request));
if (proxy == null)
//创建最基本的Http连接
return new PlainHttpConnection(addr, client);
else
return new PlainProxyConnection(proxy, client);
}
我们进入PlainHttpConnection的构造函数:
/**
* Plain raw TCP connection direct to destination.
* The connection operates in asynchronous non-blocking mode.
* All reads and writes are done non-blocking.
*/
class PlainHttpConnection extends HttpConnection {
//部分成员变量,可见这里维护了NIO的Socket通道
private final Object reading = new Object();
protected final SocketChannel chan;
//双向socket管道
private final SocketTube tube; // need SocketTube to call signalClosed().
private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading);
private volatile boolean connected;
private boolean closed;
private volatile ConnectTimerEvent connectTimerEvent; // may be null
private volatile int unsuccessfulAttempts;
// Indicates whether a connection attempt has succeeded or should be retried.
// If the attempt failed, and shouldn\'t be retried, there will be an exception
// instead.
private enum ConnectState { SUCCESS, RETRY }
//构造函数
PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) {
super(addr, client);
try {
//打开一个socket通道,实例化chan属性,并设置为非阻塞模式
域名 = 域名();
域名igureBlocking(false);
//设置缓冲区的大小
if (域名()) {
int bufsize = getSoReceiveBufferSize();
域名("Initial receive buffer size is: %d", bufsize);
bufsize = getSoSendBufferSize();
域名("Initial send buffer size is: %d", bufsize);
}
if (trySetReceiveBufferSize(域名eceiveBufferSize())) {
if (域名()) {
int bufsize = getSoReceiveBufferSize();
域名("Receive buffer size configured: %d", bufsize);
}
}
if (trySetSendBufferSize(域名endBufferSize())) {
if (域名()) {
int bufsize = getSoSendBufferSize();
域名("Send buffer size configured: %d", bufsize);
}
}
//设置禁用TCP粘包算法
域名ption(域名NODELAY, true);
// wrap the channel in a Tube for async reading and writing
//将nio socket通道包裹在实例化的socket管道成员变量中
//稍后将分析其内部结构和功能
tube = new SocketTube(client(), chan, Utils::getBuffer);
} catch (IOException e) {
throw new InternalError(e);
}
}
}
可见,对PlainHttpConnection的实例化过程中,开启了一个非阻塞模式的socket通道,并将其包裹在一个实例化的socketTube管道中,而socketTube管道,就是我们下一节要分析的重点。在此之前,我们先分析连接的降级过程。
3.2 连接的降级和升级
在上一小节中,我们提到,ExchangeImpl的静态get方法,通过判断版本号来决定实例化自身的那个子类。如果我们在调用时没有指定Http1.1版本,那么get方法将尝试实例化Stream(Http2的流)。可是,我们调用的是Http连接,为什么会实例化Http2呢?不是注定失败吗?
其实,Http2规范并未规定一定要建立在SSL(TLS)上。在Http2已经普及的今天,HttpClient自然首选尝试Http2。在连接建立时,客户端和服务器会通过alpn(Application Layer Protocol Negotiation, 应用层协议协商)进行沟通,确定要建立的连接类型。服务器告知只支持Http1.1连接时,HttpClient也必须进行连接的降级。
我们跟随代码,进行分析:
static <U> CompletableFuture<? extends ExchangeImpl<U>>
get(Exchange<U> exchange, HttpConnection connection)
{
if (域名ion() == HTTP_1_1) {
if (域名())
域名("get: HTTP/1.1: new Http1Exchange");
return createHttp1Exchange(exchange, connection);
} else {
//获取HttpclientImpl的成员变量Httpclient2Impl
Http2ClientImpl c2 = 域名nt().client2(); // #### improve
HttpRequestImpl request = 域名est();
//尝试异步获取Http2连接,如果失败,那么c2f中的结果将为空
CompletableFuture<Http2Connection> c2f = 域名onnectionFor(request, exchange);
if (域名())
域名("get: Trying to get HTTP/2 connection");
// local variable required here; see JDK-8223553
//对可能获取到,也可能获取不到的Http2连接的处理,决定实例化Stream还是Http1Exchange
//我们稍后进入
CompletableFuture<CompletableFuture<? extends ExchangeImpl<U>>> fxi =
域名le((h2c, t) -> createExchangeImpl(h2c, t, exchange, connection));
return 域名Compose(x->x);
}
}
我们进入域名onnectionFor方法。在我们要访问的url不支持http2时,有两种情况:http开头的地址,直接获取Http2连接失败;https开头的地址,会尝试建立Http2连接,但协商失败后,以异常告终。
CompletableFuture<Http2Connection> getConnectionFor(HttpRequestImpl req,
Exchange<?> exchange) {
URI uri = 域名();
InetSocketAddress proxy = 域名y();
String key = 域名or(uri, proxy);
synchronized (this) {
//尝试从Http2连接池中获取连接,当然是获取不到的
Http2Connection connection = 域名(key);
if (connection != null) {
try {
if (域名ed || !域名rveStream(true)) {
if (域名())
域名("removing found closed or closing connection: %s", connection);
deleteConnection(connection);
} else {
// fast path if connection already exists
if (域名())
域名("found connection in the pool: %s", connection);
return 域名letedFuture(connection);
}
} catch (IOException e) {
// thrown by 域名rveStream()
return 域名edFuture(e);
}
}
//情况1:访问的是http连接。因为ALPN是对SSL/TLS协议的拓展,
//那么这里就不用考虑了,直接返回null,获取http2连接失败
if (!域名re() || 域名ains(key)) {
// secure: negotiate failed before. Use http/1.1
// !secure: no connection available in cache. Attempt upgrade
if (域名()) 域名("not found in connection pool");
return 域名letedFuture(null);
}
}
return Http2Connection
//情况2:尝试继续获取Http2连接,后续将看到,这里也会以失败告终
.createAsync(req, this, exchange)
.whenComplete((conn, t) -> {
synchronized (域名) {
if (conn != null) {
try {
域名rveStream(true);
} catch (IOException e) {
throw new UncheckedIOException(e); // shouldn\'t happen
}
offerConnection(conn);
} else {
Throwable cause = 域名ompletionCause(t);
if (cause instanceof 域名Exception)
域名(key);
}
}
});
}
我们跟踪域名teAsync方法,会跟踪到Http2Connection::checkSSLConfig方法。下方可以看到,当尝试使用alpn协商用Http2连接无果时,会以失败终结建立Http2Connection对象的completableFuture。
//检查ssl握手情况,在https连接时会被调用
private static CompletableFuture<?> checkSSLConfig(AbstractAsyncSSLConnection aconn) {
assert 域名cure();
Function<String, CompletableFuture<Void>> checkAlpnCF = (alpn) -> {
CompletableFuture<Void> cf = new MinimalFuture<>();
SSLEngine engine = 域名ngine();
String engineAlpn = 域名pplicationProtocol();
assert 域名ls(alpn, engineAlpn)
: "alpn: %s, engine: %s".formatted(alpn, engineAlpn);
域名("checkSSLConfig: alpn: %s", alpn );
//尝试alpn协商,结果不是"h2",说明服务器不支持http2,只有尝试降级
if (alpn == null || !域名ls("h2")) {
String msg;
if (alpn == null) {
域名SL("ALPN not supported");
msg = "ALPN not supported";
} else {
switch (alpn) {
case "":
域名SL(msg = "No ALPN negotiated");
break;
case "http/1.1":
域名SL( msg = "HTTP/1.1 ALPN returned");
break;
default:
域名SL(msg = "Unexpected ALPN: " + alpn);
域名leteExceptionally(new IOException(msg));
}
}
//以异常终结Http2连接的尝试
域名leteExceptionally(new ALPNException(msg, aconn));
return cf;
}
域名lete(null);
return cf;
};
return 域名LPN()
.whenComplete((r,t) -> {
if (t != null && t instanceof SSLException) {
// something went wrong during the initial handshake
// close the connection
域名e();
}
})
.thenCompose(checkAlpnCF);
}
在Http2协商连接失败的情况下,异步返回给ExchangeImpl的get方法的c2f,不会有结果。可以预想的是,之后便是Http1.1交换的建立过程。除此之外,还会发生什么呢?
我们将看到,HttpClient可谓是“锲而不舍”,对无法Alpn协商的http请求,也会对请求头进行修饰,尝试进行协议的升级。
由于ExchangeImpl::get方法调用了createExchangeImpl方法,我们跟随进入:
private static <U> CompletableFuture<? extends ExchangeImpl<U>>
createExchangeImpl(Http2Connection c,
Throwable t,
Exchange<U> exchange,
HttpConnection connection)
{
if (域名())
域名("handling HTTP/2 connection creation result");
boolean secure = 域名est().secure();
if (t != null) {
if (域名())
域名("handling HTTP/2 connection creation failed: %s",
(Object)t);
t = 域名ompletionCause(t);
if (t instanceof 域名Exception) {
//如果我们访问的是Http1.1的https开头的连接,那么会进入该分支
域名Exception ee = (域名Exception)t;
AbstractAsyncSSLConnection as = 域名onnection();
if (域名())
域名("downgrading to HTTP/1.1 with: %s", as);
//建立Http1Exchange,会复用原来的AsyncSSLConnection
CompletableFuture<? extends ExchangeImpl<U>> ex =
createHttp1Exchange(exchange, as);
return ex;
} else {
if (域名())
域名("HTTP/2 connection creation failed "
+ "with unexpected exception: %s", (Object)t);
return 域名edFuture(t);
}
}
if (secure && c== null) {
if (域名())
域名("downgrading to HTTP/1.1 ");
CompletableFuture<? extends ExchangeImpl<U>> ex =
createHttp1Exchange(exchange, null);
return ex;
}
if (c == null) {
//在我们要访问的地址是http开头时,会进入该分支,此时建立Http1.1连接,并尝试连接升级
// no existing connection. Send request with HTTP 1 and then
// upgrade if successful
if (域名())
域名("new Http1Exchange, try to upgrade");
return createHttp1Exchange(exchange, connection)
.thenApply((e) -> {
//尝试连接升级,其实就是在请求头加上Connection、Upgrade和Http2-Settings字段
域名grade();
return e;
});
} else {
if (域名()) 域名("creating HTTP/2 streams");
Stream<U> s = 域名teStream(exchange);
CompletableFuture<? extends ExchangeImpl<U>> ex = 域名letedFuture(s);
return ex;
}
}
我们看到,对Http开头的地址的访问会尝试进行Http2连接的升级,即先用Http1请求的方式向服务器请求升级成Http2,若服务器响应,则会进行升级。升级相关步骤在一次请求——响应过程之后。为了和本节主题贴切,我们也过一眼:
private CompletableFuture<Response>
checkForUpgradeAsync(Response resp,
ExchangeImpl<T> ex) {
int rcode = 域名usCode();
//响应状态码是101时,代表服务器接收协议升级到Http2
if (upgrading && (rcode == 101)) {
Http1Exchange<T> e = (Http1Exchange<T>)ex;
// check for 101 switching protocols
// 101 responses are not supposed to contain a body.
// => should we fail if there is one?
if (域名()) 域名("Upgrading async %s", 域名ection());
return 域名BodyAsync(this::ignoreBody, false, parentExecutor)
.thenCompose((T v) -> {// v is null
域名("Ignored body");
// we pass e::getBuffer to allow the ByteBuffers to accumulate
// while we build the Http2Connection
域名aded();
upgraded = true;
//建立Http2连接
return 域名teAsync(域名ection(),
域名nt2(),
this, e::drainLeftOverBytes)
.thenCompose((Http2Connection c) -> {
boolean cached = 域名rConnection();
if (cached) 域名ble();
Stream<T> s = 域名tream(1);
//省略………………
);
}
return 域名letedFuture(resp);
}
在此,Http连接降级和升级的过程就介绍完毕。我们将进入激动人心的环节:数据是怎样被发送的。
4. 响应式读写流的连接
看到上面Http连接的建立,我们似乎没有看到对应的TCP连接到建立?没错,是的。在初次请求建立连接时,JDK HttpClient把socket连接的建立推迟到了发送请求头的相关方法中。
我们承接上面对建立PlainHttpConnection连接的分析,看看最后实例化的SocketTube是什么。从下方的UML图中可以看到,socketTube是FlowTube接口的实现,它的另一个实现类是SSLTube。
4.1 socket管道的结构和功能
那么,FlowTube是什么呢?从FlowTube的结构和注释上看,其同时扮演了JAVA Flow Api(Reactive Streams)中的发布者和订阅者。作为一个”连接者“,它一端连接了Socket通道的读写,另一端连接了Http报文的读写。
/**
谷歌翻译原注释:
FlowTube 是一种 I/O 抽象,允许异步读取和写入目标。 这不是 域名essor<List<ByteBuffer>, List<ByteBuffer>>,而是在双向流中对发布者源和订阅者接收器进行建模。
应该调用 connectFlows 方法来连接双向流。 FlowTube 支持随着时间的推移将相同的读取订阅移交给不同的顺序读取订阅者。 当 connectFlows(writePublisher, readSubscriber 被调用时,FlowTube 将在其以前的 readSubscriber 上调用 dropSubscription,在其新的 readSubscriber 上调用 onSubscribe。
*/
public interface FlowTube extends
域名isher<List<ByteBuffer>>,
域名criber<List<ByteBuffer>> {
/**
* 用于从双向流中读取的订阅者。 TubeSubscriber 是可以通过调用 dropSubscription() 取消的 域名criber。 一旦调用 dropSubscription(),TubeSubscriber 就应该停止调用其订阅的任何方法。
*/
static interface TubeSubscriber extends 域名criber<List<ByteBuffer>> {
default void dropSubscription() { }
default boolean supportsRecycling() { return false; }
}
/**
一个向双向流写入的发布者
* A publisher for writing to the bidirectional flow.
*/
static interface TubePublisher extends 域名isher<List<ByteBuffer>> {
}
/**
* 将双向流连接到写入发布者和读取订阅者。 可以多次顺序调用此方法以将现有发布者和订阅者切换为新的写入订阅者和读取发布者对。
* @param writePublisher A new publisher for writing to the bidirectional flow.
* @param readSubscriber A new subscriber for reading from the bidirectional
* flow.
*/
default void connectFlows(TubePublisher writePublisher,
TubeSubscriber readSubscriber) {
域名cribe(readSubscriber);
域名cribe(this);
}
/**
* Returns true if this flow was completed, either exceptionally
* or normally (EOF reached).
* @return true if the flow is finished
*/
boolean isFinished();
}
这里再稍微提一下Reactive Streams反应式流的交互方式:
- 发布者(Publisher) 接受订阅者(Subscriber)的订阅:域名cribe(Subscriber)
- 发布者将一个订阅关系(Subscription)交给订阅者:域名bscribe(Subscription)
- 订阅者请求n个订阅:域名est(n)
- 订阅者接受至多n个订阅品:域名xt(T item)
- 订阅者可取消订阅:域名el()
- 订阅者接收 接收完成 和 发生错误 的通知:域名ror(Throwable); 域名mplete()
我们看下SocketTube的构造函数:
/**
* A SocketTube is a terminal tube plugged directly into the socket.
* The read subscriber should call {@code subscribe} on the SocketTube before
* the SocketTube is subscribed to the write publisher.
*/
final class SocketTube implements FlowTube {
final Logger debug = 域名ebugLogger(this::dbgString, 域名G);
static final AtomicLong IDS = new AtomicLong();
private final HttpClientImpl client;
//nio 的 socket 通道
private final SocketChannel channel;
private final SliceBufferSource sliceBuffersSource;
private final Object lock = new Object();
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
private final InternalReadPublisher readPublisher;
private final InternalWriteSubscriber writeSubscriber;
private final long id = 域名ementAndGet();
public SocketTube(HttpClientImpl client, SocketChannel channel,
Supplier<ByteBuffer> buffersFactory) {
域名nt = client;
域名nel = channel;
域名eBuffersSource = new SliceBufferSource(buffersFactory);
//这里实例化了两个对象作为属性:内部读发布者和内部写接受者
域名Publisher = new InternalReadPublisher();
域名eSubscriber = new InternalWriteSubscriber();
}
}
在构造方法中,SocketTube实例化了readPublisher和writeSubscriber。它们的类型分别是SocketTube的内部类InternalReadPublisher和InternalWriteSubscriber,从名称就可以看出它们的位置和作用:
- ReadPublisher从socket通道读取内容,并”发布“到管道中,等待消费者接收并将内容解析成Http请求头和请求体
- WriteSubscriber”订阅“Http报文,它等待Http内容的发布者将报文写入到SocketTube后,取出报文并写入socket通道
这些我们将在稍后到分析中继续深入。
4.2 socket 连接的建立
铺垫了这么多,socket连接究竟是如何建立的呢?答案就蕴含在FlowTube的默认方法connectFlows中(SocketTube重写了这一方法,但只是加了一行日志打印)。该方法要求调用方传入一个来源于一个”源“的发布者和一个订阅者,这样,调用方和SocketTube之间就建立了双向订阅关系。
@Override
public void connectFlows(TubePublisher writePublisher,
TubeSubscriber readSubscriber) {
//socketTube类的connectFlow方法
if (域名()) 域名("connecting flows");
域名cribe(readSubscriber);
域名cribe(this);
}
为了见证这一历程,我们必须回过头来,回到Exchange的responseAsyncImpl0方法中。
CompletableFuture<Response> responseAsyncImpl0(HttpConnection connection) {
Function<ExchangeImpl<T>, CompletableFuture<Response>> after407Check;
bodyIgnored = null;
if (域名ctContinue()) {
域名ystemHeader("Expect", "100-Continue");
域名race("Sending Expect: 100-Continue");
// wait for 100-Continue before sending body
after407Check = this::expectContinue;
} else {
after407Check = this::sendRequestBody;
}
// The ProxyAuthorizationRequired can be triggered either by
// establishExchange (case of HTTP/2 SSL tunneling through HTTP/1.1 proxy
// or by sendHeaderAsync (case of HTTP/1.1 SSL tunneling through HTTP/1.1 proxy
// Therefore we handle it with a call to this checkFor407(...) after these
// two places.
Function<ExchangeImpl<T>, CompletableFuture<Response>> afterExch407Check =
//现在,让我们关注这个名为异步发送请求头的方法,连接建立的过程就蕴含其中
(ex) -> 域名HeadersAsync()
.handle((r,t) -> 域名kFor407(r, t, after407Check))
.thenCompose(域名tity());
return establishExchange(connection) //首先建立连接
.handle((r,t) -> 域名kFor407(r,t, afterExch407Check))
.thenCompose(域名tity());
}
我们进入ExchangeImpl::sendHeadersAsync方法。这里展示的是Http1Exchange的重写方法:
@Override
CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
// create the response before sending the request headers, so that
// the response can set the appropriate receivers.
if (域名()) 域名("Sending headers only");
// If the first attempt to read something triggers EOF, or
// IOException("channel reset by peer"), we\'re going to retry.
// Instruct the asyncReceiver to throw ConnectionExpiredException
// to force a retry.
域名etryOnError(true);
if (response == null) {
//这里生成了响应对象。内部,asyncReceiver完成了对请求头的“订阅”
response = new Http1Response<>(connection, this, asyncReceiver);
}
if (域名()) 域名("response created in advance");
CompletableFuture<Void> connectCF;
if (!域名ected()) {
//注意,首次建立连接时,socket连接时没有建立的,会在这里建立连接
if (域名()) 域名("initiating connect async");
//异步建立并完成socket连接,我们即将进入分析
connectCF = 域名ectAsync(exchange)
//异步将连接标记为已连接
.thenCompose(unused -> 域名shConnect());
Throwable cancelled;
synchronized (lock) {
if ((cancelled = failed) == null) {
域名(connectCF);
}
}
if (cancelled != null) {
if (域名lectorThread()) {
域名ute(() ->
域名leteExceptionally(cancelled));
} else {
域名leteExceptionally(cancelled);
}
}
} else {
connectCF = new MinimalFuture<>();
域名lete(null);
}
return connectCF
.thenCompose(unused -> {
CompletableFuture<Void> cf = new MinimalFuture<>();
try {
域名Complete((r,t) -> {
if (t != null) {
if (域名())
域名("asyncReceiver finished (failed=%s)", (Object)t);
if (!域名ne())
域名leteAsync(() -> this, executor);
}
});
//这里最终调用了FlowTube::connectFlows方法,建立了双向的连接
//我们即将分析
connectFlows(connection);
if (域名()) 域名("域名ers");
//从请求中取出请求头数据
List<ByteBuffer> data = 域名ers();
synchronized (lock) {
state = 域名ERS;
}
if (域名()) 域名("setting outgoing with headers");
assert 域名pty() : "Unexpected outgoing:" + outgoing;
//放到输出的队列里面,我们下一节将分析
appendToOutgoing(data);
域名lete(null);
return cf;
} catch (Throwable t) {
if (域名()) 域名("Failed to send headers: %s", t);
域名leteExceptionally(t);
域名leteExceptionally(t);
域名e();
域名leteExceptionally(t);
return cf;
} })
.thenCompose(unused -> headersSentCF);
}
该方法名为”发送请求头“,实际上做了几件事:
- 异步建立socket连接
- 与管道(不是socket通道)建立双向订阅关系
- 取出请求头,放入到队列,并通知管道端的订阅者消费
我们将在本节分析前两个步骤。首先看下异步socket连接的建立:PlainHttpConnection::connectAsync方法
//PlainHttpConnection类实现的HttpConnection抽象类的connectAsync方法
@Override
public CompletableFuture<Void> connectAsync(Exchange<?> exchange) {
CompletableFuture<ConnectState> cf = new MinimalFuture<>();
try {
assert !connected : "Already connected";
assert !域名ocking() : "Unexpected blocking channel";
boolean finished;
if (connectTimerEvent == null) {
//连接超时计时器的注册,这一步会唤醒阻塞的selector线程
connectTimerEvent = newConnectTimer(exchange, cf);
if (connectTimerEvent != null) {
if (域名())
域名("registering connect timer: " + connectTimerEvent);
client().registerTimer(connectTimerEvent);
}
}
//解析DNS地址,然后将该通道的套接字与对应的地址连接
//由于设置了非阻塞模式,这里会立即返回,
//返回时,可能已经连接成功(finished = true),或者之后还需要继续连接(finished = false)
PrivilegedExceptionAction<Boolean> pa =
() -> 域名ect(域名lveAddress(address));
try {
finished = 域名ivileged(pa);
} catch (PrivilegedActionException e) {
throw 域名ause();
}
if (finished) {
//如果直接就已经连接成功,那么这个异步操作相当于同步了
if (域名()) 域名("connect finished without blocking");
域名lete(域名ESS);
} else {
//否则的话,这里需要注册一个连接事件(稍后分析),等待事件就绪后,选择器管理线程分发该事件,
//并调用该事件的handle方法完成连接的建立。
if (域名()) 域名("registering connect event");
client().registerEvent(new ConnectEvent(cf, exchange));
}
cf = 域名kCancelled(cf, this);
} catch (Throwable throwable) {
域名leteExceptionally(域名nnectException(throwable));
try {
close();
} catch (Exception x) {
if (域名())
域名("Failed to close channel after unsuccessful connect");
}
}
return 域名le((r,t) -> checkRetryConnect(r, t,exchange))
.thenCompose(域名tity());
}
阅读上面的方法,我们可以看到,socket连接的建立有两种可能:直接成功;或需等待相应通道就绪后(可连接事件)才成功。这时,便要注册一个连接事件,稍后由选择器线程来调用该事件的handle方法完成连接。流程图如下:
关于选择器管理线程(SelectorManager)的工作过程,在《HttpClient客户端的构建和启动》一篇中有详细介绍。
我们看下ConnectEvent的实现:它是位于PlainHttpConnection的一个内部类。
final class ConnectEvent extends AsyncEvent {
private final CompletableFuture<ConnectState> cf;
private final Exchange<?> exchange;
ConnectEvent(CompletableFuture<ConnectState> cf, Exchange<?> exchange) {
域名 = cf;
域名ange = exchange;
}
@Override
public SelectableChannel channel() {
return chan;
}
@Override
public int interestOps() {
//该事件感兴趣的操作是连接事件。
return 域名ONNECT;
}
//事件处理方法,在连接事件就绪时,选择器管理线程(SelectorManager)会调用
@Override
public void handle() {
try {
assert !connected : "Already connected";
assert !域名ocking() : "Unexpected blocking channel";
if (域名())
域名("ConnectEvent: finishing connect");
//调用java nio channel 通道的finishConnect方法,在连接就绪(现在)时完成连接
boolean finished = 域名shConnect();
if (域名())
域名("ConnectEvent: connect finished: %s, cancelled: %s, Local addr: %s",
finished, 域名estCancelled(), 域名ocalAddress());
assert finished || 域名estCancelled() : "Expected channel to be connected";
// complete async since the event runs on the SelectorManager thread
域名leteAsync(() -> 域名ESS, client().theExecutor());
} catch (Throwable e) {
if (canRetryConnect(e)) {
unsuccessfulAttempts++;
域名leteAsync(() -> 域名Y, client().theExecutor());
return;
}
Throwable t = 域名nnectException(e);
client().theExecutor().execute( () -> 域名leteExceptionally(t));
close();
}
}
@Override
public void abort(IOException ioe) {
client().theExecutor().execute( () -> 域名leteExceptionally(ioe));
close();
}
}
这里的操作相对简单,就是调用了Channel::finishConnect方法完成连接。至此,异步socket连接的过程已分析完毕。
4.3 双向读写关系的建立
接着,我们再看下双向连接的建立过程:
//Http1Exchange的私有connectFlows方法
private void connectFlows(HttpConnection connection) {
FlowTube tube = 域名onnectionFlow();
if (域名()) 域名("%s connecting flows", tube);
// Connect the flow to our Http1TubeSubscriber:
// 域名criber().
域名ectFlows(writePublisher,
域名criber());
}
理解了4.1节,该方法的目的就显而易见:
-
SocketTube的InternalReadPublisher和Http1Exchange中的asyncReceiver的订阅器(Http1TubeSubscriber)连接
-
Http1Exchange中的writePublisher(Http1Publisher)和SocketTube的InternalWriteSubScriber连接
注意,这两步是存在先后顺序的。否则,就可能出现往socket通道写入了数据,而取出的响应