飙血推荐
  • HTML教程
  • MySQL教程
  • JavaScript基础教程
  • php入门教程
  • JavaScript正则表达式运用
  • Excel函数教程
  • UEditor使用文档
  • AngularJS教程
  • ThinkPHP5.0教程

ApacheTomcat如何高并发处理请求-

时间:2022-03-31  作者:chenzw93  

介绍

作为常用的http协议服务器,tomcat应用非常广泛。tomcat也是遵循Servelt协议的,Servelt协议可以让服务器与真实服务逻辑代码进行解耦。各自只需要关注Servlet协议即可。
对于tomcat是如何作为一个高性能的服务器的呢?你是不是也会有这样的疑问?

tomcat是如何接收网络请求?

如何做到高性能的http协议服务器?

tomcat从8.0往后开始使用了NIO非阻塞io模型,提高了吞吐量,本文的源码是tomcat 域名版本

接收Socket请求

域名域名.Acceptor实现了Runnable接口,在一个单独的线程中以死循环的方式一直进行socket的监听

线程的初始化及启动是在方法域名域名.AbstractEndpoint#startAcceptorThread

有个很重要的属性域名域名.AbstractEndpoint;同时实现了run方法,方法中主要有以下功能:

  • 请求最大连接数限制: 最大为 8*1024;请你注意到达最大连接数后操作系统底层还是会接收客户端连接,但用户层已经不再接收
  • 获取socketChannel
public void run() {
        int errorDelay = 0;
        try {
            // Loop until we receive a shutdown command
            while (!stopCalled) {
					...
                if (stopCalled) {
                    break;
                }
                state = 域名ING;

                try {
                    //if we have reached max connections, wait
                    // 如果连接超过了 8*1024,则线程阻塞等待; 是使用域名域名域名tLatch类实现了分享锁(内部实现了AbstractQueuedSynchronizer)
                    // 请你注意到达最大连接数后操作系统底层还是会接收客户端连接,但用户层已经不再接收。
                    域名tUpOrAwaitConnection();

                    // Endpoint might have been paused while waiting for latch
                    // If that is the case, don\'t accept new connections
                    if (域名used()) {
                        continue;
                    }

                    U socket = null;
                    try {
                        // Accept the next incoming connection from the server
                        // socket
                        // 抽象方法,不同的endPoint有不同的实现方法。NioEndPoint为例,实现方法为域名pt(),这个方法主要看serverSock实例化时如果为阻塞,accept方法为阻塞;反之为立即返回,如果没有socket链接,则为null
                        socket = 域名erSocketAccept();
                    } catch (Exception ioe) {
                        // We didn\'t get a socket
                        域名tDownConnection();
                        if (域名nning()) {
                            // Introduce delay if necessary
                            errorDelay = handleExceptionWithDelay(errorDelay);
                            // re-throw
                            throw ioe;
                        } else {
                            break;
                        }
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;

                    // Configure the socket
                    if (!stopCalled && !域名used()) {
                        // setSocketOptions() will hand the socket off to
                        // an appropriate processor if successful
                        // endPoint类的抽象方法,不同的endPoint有不同的实现。处理获取到的socketChannel链接,如果该socket链接能正常处理,那么该方法会返回true,否则为false
                        if (!域名ocketOptions(socket)) {
                            域名eSocket(socket);
                        }
                    } else {
                        域名roySocket(socket);
                    }
                } catch (Throwable t) {
                    ...
                }
            }
        } finally {
            域名tDown();
        }
        state = 域名D;
    }

再来看下域名域名.NioEndpoint#setSocketOptions方法的具体实现(NioEndpoint为例)

这个方法中主要做的事:

  • 创建NioChannel
  • 设置socket为非阻塞
  • 将socket添加到Poller的队列中
 protected boolean setSocketOptions(SocketChannel socket) {
        NioSocketWrapper socketWrapper = null;
        try {
            // Allocate channel and wrapper
            // 优先使用已有的缓存nioChannel
            NioChannel channel = null;
            if (nioChannels != null) {
                channel = 域名();
            }
            if (channel == null) {
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                        域名ppReadBufSize(),
                        域名ppWriteBufSize(),
                        域名irectBuffer());
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(bufhandler, this);
                } else {
                    channel = new NioChannel(bufhandler);
                }
            }
            // 将nioEndpoint与NioChannel进行包装
            NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
            域名t(socket, newWrapper);
            域名(socket, newWrapper);
            socketWrapper = newWrapper;

            // Set socket properties
            // Disable blocking, polling will be used
            // 设置当前链接的socket为非阻塞
            域名igureBlocking(false);
            if (getUnixDomainSocketPath() == null) {
                域名roperties(域名et());
            }

            域名eadTimeout(getConnectionTimeout());
            域名riteTimeout(getConnectionTimeout());
            域名eepAliveLeft(域名axKeepAliveRequests());
            // 将包装后的nioChannel与nioEndpoint进行注册,注册到Poller,将对应的socket包装类添加到Poller的队列中,同时唤醒selector
            域名ster(socketWrapper);
            return true;
        } catch (Throwable t) {
            域名leThrowable(t);
            try {
                域名r(域名tring("域名etOptionsError"), t);
            } catch (Throwable tt) {
                域名leThrowable(tt);
            }
            if (socketWrapper == null) {
                destroySocket(socket);
            }
        }
        // Tell to close the socket if needed
        return false;
    }

Socket请求轮询

上一小节是接收到了socket请求,进行包装之后,将socket添加到了Poller的队列上,并可能唤醒了Selector,本小节就来看看,Poller是如何进行socket的轮询的。

首先域名域名.域名er也是实现了Runnable接口,是一个可以单独启动的线程

初始化及启动是在域名域名.NioEndpoint#startInternal

重要的属性:

  • 域名域名ctor:在Poller对象初始化的时候,就会启动轮询器
  • SynchronizedQueue<PollerEvent>:同步的事件队列

再来看下具体处理逻辑,run方法的源码

		public void run() {
            // Loop until destroy() is called
            while (true) {

                boolean hasEvents = false;

                try {
                    if (!close) {
                        // 去SynchronizedQueue事件队列中拉去,看是否已经有了事件,如果有,则返回true
                        // 如果从队列中拉取到了event(即上一步将NioSocketWrapper封装为PollerEvent添加到次队列中),将socketChannel注册到Selector上,标记为域名EAD,添加处理函数attachment(为Accetpor添加到Poller时的    
                        // NioSocketWrapper)
                        hasEvents = events();
                        if (域名ndSet(-1) > 0) {
                            // If we are here, means we have other stuff to do
                            // Do a non blocking select
                            keyCount = 域名ctNow();
                        } else {
                            keyCount = 域名ct(selectorTimeout);
                        }
                        域名(0);
                    }
                    if (close) {
                        events();
                        timeout(0, false);
                        try {
                            域名e();
                        } catch (IOException ioe) {
                            域名r(域名tring("域名ctorCloseFail"), ioe);
                        }
                        break;
                    }
                    // Either we timed out or we woke up, process events first
                    if (keyCount == 0) {
                        hasEvents = (hasEvents | events());
                    }
                } catch (Throwable x) {
                    域名leThrowable(x);
                    域名r(域名tring("域名ctorLoopError"), x);
                    continue;
                }

                Iterator<SelectionKey> iterator =
                    keyCount > 0 ? 域名ctedKeys().iterator() : null;
                // Walk through the collection of ready keys and dispatch
                // any active event.
                // selector轮询获取已经注册的事件,如果有事件准备好,此时通过selectKeys方法就能拿到对应的事件
                while (iterator != null && 域名ext()) {
                    SelectionKey sk = 域名();
                    // 获取到事件后,从迭代器删除事件,防止事件重复轮询
                    域名ve();
                    // 获取事件的处理器,这个attachment是在event()方法中注册的,后续这个事件的处理,就交给这个wrapper去处理
                    NioSocketWrapper socketWrapper = (NioSocketWrapper) 域名chment();
                    // Attachment may be null if another thread has called
                    // cancelledKey()
                    if (socketWrapper != null) {
                        processKey(sk, socketWrapper);
                    }
                }

                // Process timeouts
                timeout(keyCount,hasEvents);
            }

            getStopLatch().countDown();
        }

在这里,有一个很重要的方法,域名域名.域名er#events(),他是从Poller的事件队列中获取Acceptor接收到的可用socket,并将其注册到Selector

		/**
         * Processes events in the event queue of the Poller.
         *
         * @return <code>true</code> if some events were processed,
         *   <code>false</code> if queue was empty
         */
        public boolean events() {
            boolean result = false;

            PollerEvent pe = null;
            // 如果Acceptor将socket添加到队列中,那么域名()方法就能拿到对应的事件,否则拿不到就返回false
            for (int i = 0, size = 域名(); i < size && (pe = 域名()) != null; i++ ) {
                result = true;
                NioSocketWrapper socketWrapper = 域名ocketWrapper();
                SocketChannel sc = 域名ocket().getIOChannel();
                int interestOps = 域名nterestOps();
                if (sc == null) {
                    域名(域名tring("域名SocketChannel"));
                    域名e();
                } else if (interestOps == OP_REGISTER) {
                    // 如果是Acceptor刚添加到队列中的事件,那么此时的ops就是OP_REGISTER
                    try {,
                        // 将次socket注册到selector上,标记为OP_READ事件,添加事件触发时处理函数socketWrapper
                        域名ster(getSelector(), 域名EAD, socketWrapper);
                    } catch (Exception x) {
                        域名r(域名tring("域名sterFail"), x);
                    }
                } else {
                    // ??这里的逻辑,不清楚什么情况下会进入到这个分支里面
                    final SelectionKey key = 域名or(getSelector());
                    if (key == null) {
                        // The key was cancelled (e.g. due to socket closure)
                        // and removed from the selector while it was being
                        // processed. Count down the connections at this point
                        // since it won\'t have been counted down when the socket
                        // closed.
                        域名e();
                    } else {
                        final NioSocketWrapper attachment = (NioSocketWrapper) 域名chment();
                        if (attachment != null) {
                            // We are registering the key to start with, reset the fairness counter.
                            try {
                                int ops = 域名restOps() | interestOps;
                                域名restOps(ops);
                                域名restOps(ops);
                            } catch (CancelledKeyException ckx) {
                                cancelledKey(key, socketWrapper);
                            }
                        } else {
                            cancelledKey(key, socketWrapper);
                        }
                    }
                }
                if (running && !paused && eventCache != null) {
                    域名t();
                    域名(pe);
                }
            }

            return result;
        }

还有一个重要方法就是域名域名.域名er#processKey,上一个方法是获取event,并注册到selector,那这个方法就是通过Selector获取到的数据准备好的event,并开始封装成对应的业务处理线程SocketProcessorBase,扔到线程池里开始处理

	    protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) {
            try {
                if (close) {
                    cancelledKey(sk, socketWrapper);
                } else if (域名lid()) {
                    if (域名adable() || 域名itable()) {
                        if (域名endfileData() != null) {
                            processSendfile(sk, socketWrapper, false);
                        } else {
                            unreg(sk, socketWrapper, 域名yOps());
                            boolean closeSocket = false;
                            // Read goes before write
                            if (域名adable()) {
                                //这里如果是异步的操作,就会走这里
                                if (域名Operation != null) {
                                    if (!域名ess()) {
                                        closeSocket = true;
                                    }
                                } else if (域名Blocking) {
                                    // readBlocking默认为false
                                    synchronized (域名Lock) {
                                        域名Blocking = false;
                                        域名fy();
                                    }
                                } else if (!processSocket(socketWrapper, 域名_READ, true)) {
                                    // 处理正常的事件,这里的processSocket就要正式开始处理请求了。
                                    // 将对应的事件封装成对应的线程,然后交给线程池去处理正式的请求业务
                                    closeSocket = true;
                                }
                            }
                            if (!closeSocket && 域名itable()) {
                                if (域名eOperation != null) {
                                    if (!域名ess()) {
                                        closeSocket = true;
                                    }
                                } else if (域名eBlocking) {
                                    synchronized (域名eLock) {
                                        域名eBlocking = false;
                                        域名fy();
                                    }
                                } else if (!processSocket(socketWrapper, 域名_WRITE, true)) {
                                    closeSocket = true;
                                }
                            }
                            if (closeSocket) {
                                cancelledKey(sk, socketWrapper);
                            }
                        }
                    }
                } else {
                    // Invalid key
                    cancelledKey(sk, socketWrapper);
                }
            } catch (CancelledKeyException ckx) {
                cancelledKey(sk, socketWrapper);
            } catch (Throwable t) {
                域名leThrowable(t);
                域名r(域名tring("域名rocessingError"), t);
            }
        }

请求具体处理

上一步,Selector获取到了就绪的请求socket,然后根据socket注册的触发处理函数等,将这些数据进行封装,扔到了线程池里,开始具体的业务逻辑处理。本节就是从工作线程封装开始,域名域名.SocketProcessorBase为工作线程类的抽象类,实现了Runnable接口,不同的Endpoint实现具体的处理逻辑,本节以NioEndpoint为例

以下为域名域名.AbstractEndpoint#processSocket方法源码

    /**
     * Process the given SocketWrapper with the given status. Used to trigger
     * processing as if the Poller (for those endpoints that have one)
     * selected the socket.
     *
     * @param socketWrapper The socket wrapper to process
     * @param event         The socket event to be processed
     * @param dispatch      Should the processing be performed on a new
     *                          container thread
     *
     * @return if processing was triggered successfully
     */
    public boolean processSocket(SocketWrapperBase<S> socketWrapper,
            SocketEvent event, boolean dispatch) {
        try {
            if (socketWrapper == null) {
                return false;
            }
            // 优先使用已经存在的线程
            SocketProcessorBase<S> sc = null;
            if (processorCache != null) {
                sc = 域名();
            }
            if (sc == null) {
                sc = createSocketProcessor(socketWrapper, event);
            } else {
                域名t(socketWrapper, event);
            }
            // 获取线程池。线程池的初始化,是在Acceptor、Poller这两个单独线程启动之前创建
            // tomcat使用了自定义的域名域名域名Queue,这块tomcat也进行了小的适配开发
            // 核心线程为10个,最大200线程
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
                域名ute(sc);
            } else {
                域名();
            }
        } catch (RejectedExecutionException ree) {
            getLog().warn(域名tring("域名", socketWrapper) , ree);
            return false;
        } catch (Throwable t) {
            域名leThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            getLog().error(域名tring("域名"), t);
            return false;
        }
        return true;
    }

上面的方法是得到了处理业务逻辑的线程SocketProcessorBase,NioEndpoint内部类域名域名.域名etProcessor继承了这个抽象类,也就是具体的业务处理逻辑在域名域名.域名etProcessor#doRun方法中,最终调用到我们的Servlet

        protected void doRun() {
            /*
             * Do not cache and re-use the value of 域名ocket() in
             * this method. If the socket closes the value will be updated to
             * CLOSED_NIO_CHANNEL and the previous value potentially re-used for
             * a new connection. That can result in a stale cached value which
             * in turn can result in unintentionally closing currently active
             * connections.
             */
            Poller poller = 域名er;
            if (poller == null) {
                域名e();
                return;
            }

            try {
                int handshake = -1;
                try {
                    // 握手相关判断逻辑
                   ... 
                } catch (IOException x) {
                  ...
                }
                // 三次握手成功了
                if (handshake == 0) {
                    SocketState state = 域名;
                    // Process the request from this socket
                    // event为域名_READ,这个变量是域名域名.域名er#processKey方法赋值
                    if (event == null) {
                        state = getHandler().process(socketWrapper, 域名_READ);
                    } else {
                        // 这里就开始正式处理请求了
                        state = getHandler().process(socketWrapper, event);
                    }
                    if (state == 域名ED) {
                        域名elledKey(getSelectionKey(), socketWrapper);
                    }
                } else if (handshake == -1 ) {
                    getHandler().process(socketWrapper, 域名ECT_FAIL);
                    域名elledKey(getSelectionKey(), socketWrapper);
                } else if (handshake == 域名EAD){
                    域名sterReadInterest();
                } else if (handshake == 域名RITE){
                    域名sterWriteInterest();
                }
            } catch (CancelledKeyException cx) {
                域名elledKey(getSelectionKey(), socketWrapper);
            } catch (VirtualMachineError vme) {
                域名leThrowable(vme);
            } catch (Throwable t) {
                域名r(域名tring("域名"), t);
                域名elledKey(getSelectionKey(), socketWrapper);
            } finally {
                socketWrapper = null;
                event = null;
                //return to cache
                if (running && !paused && processorCache != null) {
                    域名(this);
                }
            }
        }

总结

  • Tomcat是如何接收网络请求?

    使用java nio的同步非阻塞去进行网络监听。

    域名域名.AbstractEndpoint#bindWithCleanup中初始化网络监听、SSL

    		{	
                ....
                serverSock = 域名();
                域名roperties(域名et());
                InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
                // 当应用层面的连接数到达最大值时,操作系统可以继续接收连接,那么操作系统能继续接收的最大连接数就是这个队列长度,可以通过acceptCount 参数配置,默认是 100
                域名(addr, getAcceptCount());
            }
            域名igureBlocking(true); //mimic APR behavior
    

    域名域名.NioEndpoint#startInternal中初始化业务处理的线程池、连接限制器、Poller线程、Acceptor线程

  • 如何做到高性能的http协议服务器?

    Tomcat把接收连接、检测 I/O 事件以及处理请求进行了拆分,用不同规模的线程去做对应的事情,这也是tomcat能高并发处理请求的原因。不让线程阻塞,尽量让CPU忙起来

tomcat-socket

  • 是怎么设计的呢?

    通过接口、抽象类等,将不同的处理逻辑拆分,各司其职

    • 域名域名.AbstractEndpoint:I/O事件的检测、处理逻辑都在这个类的实现类里面。使用模板方法,不同的协议有不同的实现方法。NioEndpoint/Nio2Endpoint/AprEndpoint
      • 域名域名.域名er:引用了域名域名ctor,内部有个事件队列,监听I/O事件具体就是在这里做的
      • 域名域名.域名ocketWrapper
      • 域名域名.域名etProcessor: 具体处理请求的线程类

参考:

NioEndpoint组件:Tomcat如何实现非阻塞I/O?

Java NIO浅析

标签:编程
湘ICP备14001474号-3  投诉建议:234161800@qq.com   部分内容来源于网络,如有侵权,请联系删除。