
Netty Source Code Analysis: The Reactor Thread Model Explained

Date: 2021-12-04  Author: mic112

In the previous article we analyzed how a Netty server is initialized during startup. Today we look at Netty's Reactor thread model.

Before analyzing the source code, let's first see where EventLoop is used:

  • Registering NioServerSocketChannel to listen for connections
  • Registering NioSocketChannel for I/O events

Listening for Connections on NioServerSocketChannel

In the initAndRegister() method of AbstractBootstrap, once the NioServerSocketChannel has been initialized, the line marked with a comment below registers it.

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
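        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        // ... (error handling omitted)
    }
    // register the channel with the boss thread's selector
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();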
        }
    }
    return regFuture;
}

The register() Call Chain

Following the call chain, execution eventually reaches the doRegister() method of AbstractNioChannel.

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // Call ServerSocketChannel.register() to register this server channel with the boss
            // thread's selector; interest ops are 0 for now and "this" is stored as the attachment
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Selector.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}
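To see what doRegister() does at the plain java.nio level, here is a minimal sketch that is not Netty code: it registers a non-blocking ServerSocketChannel with a Selector using interest ops 0 plus an attachment, as doRegister() does, and only later adds OP_ACCEPT to the interest set (roughly what Netty does once the channel becomes active). The names RegisterSketch, registerWithNoInterest and beginAccept are hypothetical.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical helper, not part of Netty.
class RegisterSketch {
    // Register with interest ops 0: the key exists, but no event is watched yet.
    static SelectionKey registerWithNoInterest(ServerSocketChannel ch, Selector selector, Object attachment)
            throws IOException {
        ch.configureBlocking(false);              // registration requires non-blocking mode
        return ch.register(selector, 0, attachment);
    }

    // Later, once the channel is ready to accept, add OP_ACCEPT to the interest set.
    static void beginAccept(SelectionKey key) {
        int ops = key.interestOps();
        if ((ops & SelectionKey.OP_ACCEPT) == 0) {
            key.interestOps(ops | SelectionKey.OP_ACCEPT);
        }
    }
}
```

Registering with 0 first lets the channel be bound to its event loop before it is ready to receive events.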

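How NioEventLoop Starts

A NioEventLoop is bound to a single thread, which is started as follows.

In the doBind0 method of AbstractBootstrap, the NioEventLoop of the NioServerSocketChannel is obtained and used to execute the port-binding task.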

private static void doBind0(
    final ChannelFuture regFuture, final Channel channel,
    final SocketAddress localAddress, final ChannelPromise promise) {

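    // this execute() call is what starts the event loop thread
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());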
            }
        }
    });
}

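SingleThreadEventExecutor.execute

Execution then proceeds to SingleThreadEventExecutor.execute(), which calls startThread() to start the thread.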

private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    addTask(task);
    if (!inEventLoop) {
        startThread(); // start the thread
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

startThread

private void startThread() {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                doStartThread(); // perform the actual start
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}

startThread() then calls doStartThread(), which submits a task through executor.execute(); that task is where the NioEventLoop thread actually runs.

private void doStartThread() {
    assert thread == null;
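    executor.execute(new Runnable() { // submit a task to the executor
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run(); // run the boss NioEventLoop's run() method to start polling
            }
            // ... (omitted)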
        }
    });
}
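The lazy start in execute(), startThread() and doStartThread() can be reduced to a small sketch. Assumptions: the hypothetical MiniEventLoop below uses an AtomicInteger where Netty uses an AtomicIntegerFieldUpdater, starts a plain daemon Thread instead of submitting to an Executor, and has no shutdown, wakeup or rejection logic. What it shows is that only the caller that wins the CAS from NOT_STARTED to STARTED creates the loop thread; every other submission just enqueues.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simplified executor, not Netty's SingleThreadEventExecutor.
class MiniEventLoop {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicInteger threadsStarted = new AtomicInteger();
    private volatile Thread thread;

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        taskQueue.add(task);          // always enqueue first, like addTask(task)
        if (!inEventLoop()) {
            startThread();            // lazily start the loop thread on an outside submission
        }
    }

    private void startThread() {
        // Only the caller that wins the CAS creates the thread.
        if (state.get() == ST_NOT_STARTED && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            Thread t = new Thread(this::run, "mini-event-loop");
            t.setDaemon(true);        // do not keep the JVM alive
            thread = t;               // published before start() so tasks see inEventLoop() == true
            threadsStarted.incrementAndGet();
            t.start();
        }
    }

    private void run() {
        for (;;) {
            try {
                taskQueue.take().run(); // block until a task arrives, then run it on this thread
            } catch (InterruptedException e) {
                return;
            } catch (RuntimeException e) {
                // keep the loop alive if a task fails
            }
        }
    }

    int threadsStarted() {
        return threadsStarted.get();
    }
}
```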

The NioEventLoop Polling Loop

Once the NioEventLoop thread has started, it goes straight into the run() method of NioEventLoop.

protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }

            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                        processSelectedKeys();
                    }
                } finally {
                    // Ensure we always run tasks.
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }

            if (ranTasks || strategy > 0) {
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                 selectCnt - 1, selector);
                }
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
            // Harmless exception - log anyway
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                             selector, e);
            }
        } catch (Error e) {
            throw (Error) e;
        } catch (Throwable t) {
            handleLoopException(t);
        } finally {
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Error e) {
                throw (Error) e;
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
}

NioEventLoop Execution Flow

The run() method of NioEventLoop is an infinite loop that mainly does three things, as shown in Figure 9-1.

[Figure 9-1: the three main steps of the NioEventLoop run loop]
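  • Poll for I/O events (select): poll the Selector for I/O readiness events on all Channels registered with it.
  • Process I/O events: if any Channel has ready I/O events, call processSelectedKeys to handle them.
  • Run asynchronous tasks (runAllTasks): a very important duty of the Reactor thread is to run the non-I/O tasks in its task queue. Netty provides the ioRatio parameter to adjust the ratio between time spent on I/O and time spent on tasks.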
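The three steps above can be illustrated with a single iteration of a plain java.nio reactor loop. This is a simplified sketch under its own assumptions, not NioEventLoop: the hypothetical ReactorLoopSketch treats each key's attachment as a Consumer<SelectionKey> handler and drains a plain Queue<Runnable> with no ioRatio time budget.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical single iteration of a reactor loop: select, process ready keys, run queued tasks.
class ReactorLoopSketch {
    @SuppressWarnings("unchecked")
    static int runOnce(Selector selector, Queue<Runnable> tasks, long timeoutMillis) throws IOException {
        // 1. select: do not block if tasks are waiting (mirrors the hasTasks() check)
        int ready = tasks.isEmpty() ? selector.select(timeoutMillis) : selector.selectNow();

        // 2. process ready I/O events; the attachment is assumed to be a Consumer<SelectionKey>
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove();                          // the JDK does not clear selectedKeys for us
            if (key.isValid()) {
                ((Consumer<SelectionKey>) key.attachment()).accept(key);
            }
        }

        // 3. run the non-I/O tasks queued for this loop
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
        }
        return ready;
    }
}
```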

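Polling for Ready I/O Events

Let's first look at the code related to I/O events:

  1. Obtain the current strategy via selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()).
  2. Branch on that strategy to control what each iteration of the loop does.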
protected void run() {
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                }
                // ... (omitted)
            }
        }
}

How selectStrategy Decides

@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

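If hasTasks is true, meaning the NioEventLoop thread has pending asynchronous tasks, selectSupplier.get() is called; otherwise SELECT is returned directly.

The selectSupplier passed in is selectNowSupplier, defined as follows: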

private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();
    }
};

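It calls selectNow(), a non-blocking Selector method that returns immediately:

  • If some Channels are already ready, it returns the number of ready Channels (strictly, the number of keys whose ready sets were updated).
  • Otherwise, it returns 0.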
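The decision itself is small enough to sketch directly. In the hypothetical StrategySketch below, java.util.function.IntSupplier stands in for Netty's own IntSupplier (whose get() may throw), and SELECT is a negative constant so it can never be confused with a ready-key count returned by selectNow().

```java
import java.util.function.IntSupplier;

// Hypothetical sketch of the calculateStrategy() decision, not Netty's DefaultSelectStrategy.
class StrategySketch {
    static final int SELECT = -1; // negative, so it cannot collide with a ready count (>= 0)

    static int calculateStrategy(IntSupplier selectNow, boolean hasTasks) {
        // Tasks pending: poll without blocking so the tasks are not delayed.
        // No tasks: tell the loop to block in select().
        return hasTasks ? selectNow.getAsInt() : SELECT;
    }
}
```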

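Strategy Branches

After the strategy is obtained in the previous step, the loop branches on its value:

  • CONTINUE: retry the loop.
  • BUSY_WAIT: NIO does not support busy-waiting, so BUSY_WAIT falls through and behaves exactly like SELECT.
  • SELECT: block in select() to obtain the ready Channels. This strategy is returned when the NioEventLoop has no asynchronous tasks, i.e. its task queue is empty.
  • Any non-negative value (the result of selectNow() when tasks are pending) matches default and leaves the switch, so the loop goes straight on to I/O and task processing.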
switch (strategy) {
    case SelectStrategy.CONTINUE:
        continue;

    case SelectStrategy.BUSY_WAIT:
        // fall-through to SELECT since the busy-wait is not supported with NIO

    case SelectStrategy.SELECT:
        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
        if (curDeadlineNanos == -1L) {
            curDeadlineNanos = NONE; // nothing on the calendar
        }
        nextWakeupNanos.set(curDeadlineNanos);
        try {
            if (!hasTasks()) {
                strategy = select(curDeadlineNanos);
            }
        } finally {
            // This update is just to help block unnecessary selector wakeups
            // so use of lazySet is ok (no race condition)
            nextWakeupNanos.lazySet(AWAKE);
        }
        // fall through
    default:
}

SelectStrategy.SELECT

When the NioEventLoop thread has no asynchronous tasks, it executes the SELECT strategy:

// Deadline of the next scheduled task; -1L if there is no scheduled task
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
    curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
    if (!hasTasks()) {
        // taskQueue is empty, so block in select()
        strategy = select(curDeadlineNanos);
    }
} finally {
    // This update is just to help block unnecessary selector wakeups
    // so use of lazySet is ok (no race condition)
    nextWakeupNanos.lazySet(AWAKE);
}

The select method is defined as follows. When there is no scheduled task, deadlineNanos is NONE, so it calls the blocking select() method.

private int select(long deadlineNanos) throws IOException {
    if (deadlineNanos == NONE) {
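        return selector.select();
    }
    // Compute the select() timeout in milliseconds; adding 995000ns effectively rounds the delay up
    long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
    return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);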
}

It finally returns the number of ready channels, and the rest of the loop decides what to do based on that number.
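A worked example of the timeout arithmetic: if the next scheduled task is due in 1.5 ms, (1,500,000 + 995,000) / 1,000,000 = 2, so select(2) is used; if it is due in 4 µs, (4,000 + 995,000) / 1,000,000 = 0, so selectNow() is used instead. The helper below is a hypothetical reimplementation of just that arithmetic, taking the current time as a parameter in place of Netty's deadlineToDelayNanos().

```java
// Hypothetical helper reproducing the timeout arithmetic in select(deadlineNanos).
class SelectTimeoutSketch {
    // Returns the select() timeout in milliseconds; 0 means "use selectNow()".
    static long timeoutMillis(long deadlineNanos, long nowNanos) {
        // like deadlineToDelayNanos(...), clamp the remaining delay at 0
        long delayNanos = Math.max(0L, (deadlineNanos + 995_000L) - nowNanos);
        // adding 995,000 ns rounds up to whole milliseconds, except that a remainder under 5 µs is dropped
        return delayNanos / 1_000_000L;
    }
}
```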

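Processing Work in NioEventLoop.run()

The processing logic is relatively easy to follow:

  • If there are ready channels, process their I/O events.
  • After that, run the tasks in the asynchronous task queue on the same thread.
  • In addition, to work around the Java NIO spin problem, selectCnt is incremented on every iteration and reset whenever the iteration did some work, so it effectively counts consecutive idle iterations (no I/O to process and no task run). When it reaches a threshold, Netty assumes the NIO bug has been triggered and handles it in unexpectedSelectorWakeup.

Java NIO has a well-known bug on Linux: the epoll empty-polling problem. select(), which should block, returns even though 0 channels are ready, so the loop keeps spinning and CPU usage reaches 100%.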

@Override
protected void run() {
    int selectCnt = 0;
    for (;;) {
        // ... (omitted)
        selectCnt++; // counts select() calls that returned without work, i.e. idle spins of the event loop (for the NIO bug)
        cancelledKeys = 0;
        needsToSelectAgain = false;
        final int ioRatio = this.ioRatio;
        boolean ranTasks;
        if (ioRatio == 100) { // ioRatio is the percentage of time spent on I/O; the default is 50
            try {
                if (strategy > 0) { // strategy > 0 means some SocketChannels are ready
                    processSelectedKeys(); // handle the ready SocketChannels
                }
            } finally {
                // Note: ioRatio = 100 does not skip tasks; instead the whole task queue is drained every time
                ranTasks = runAllTasks(); // always run the queued tasks
            }
        } else if (strategy > 0) { // strategy > 0 means some SocketChannels are ready
            final long ioStartTime = System.nanoTime(); // start time of I/O processing
            try {
                processSelectedKeys(); // process the ready I/O events
            } finally {
                // time spent on I/O in this iteration
                final long ioTime = System.nanoTime() - ioStartTime;
                // derive the task time budget from this iteration's I/O time and ioRatio,
                // i.e. how long async tasks are allowed to run
                ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }
        } else {
            // This branch means strategy <= 0 and ioRatio < 100, so the task time budget is 0:
            // run as few async tasks as possible (the formula above would also give 0 with ioTime = 0)
            ranTasks = runAllTasks(0); // This will run the minimum number of tasks
        }

        if (ranTasks || strategy > 0) { // tasks ran or I/O was ready: the loop did work and did not spin, so reset selectCnt
            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                             selectCnt - 1, selector);
            }
            selectCnt = 0;
        }
        // unexpectedSelectorWakeup handles the NIO bug
        else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
            selectCnt = 0;
        }
    }
}
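For example, with the default ioRatio of 50, 10 ms of I/O earns 10 ms of task time; with ioRatio 80 it earns 10 × 20 / 80 = 2.5 ms; with ioRatio 20 it earns 10 × 80 / 20 = 40 ms. The hypothetical IoRatioSketch below reproduces only that formula; the range check on ioRatio is part of the sketch.

```java
// Hypothetical helper: the task-time budget that run() derives from ioRatio.
class IoRatioSketch {
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio > 100) {
            throw new IllegalArgumentException("ioRatio must be in (0, 100]");
        }
        if (ioRatio == 100) {
            return Long.MAX_VALUE; // run() special-cases 100: runAllTasks() with no time limit
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
}
```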

processSelectedKeys

The select step tells us how many I/O events are ready; if there are any, processSelectedKeys is executed.

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

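There are two branches for processing I/O events:

  • one processes Netty's optimized selectedKeys;
  • the other is the plain processing logic.

processSelectedKeys decides which one to use based on whether selectedKeys is set. By default Netty uses its optimized selectedKeys, whose type is SelectedSelectionKeySet.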

processSelectedKeysOptimized

private void processSelectedKeysOptimized() {
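    for (int i = 0; i < selectedKeys.size; ++i) {
        // 1. take out the ready key and its channel
        final SelectionKey k = selectedKeys.keys[i];
        selectedKeys.keys[i] = null; // null out the slot so it can be GC'd; this also marks the key as handled

        final Object a = k.attachment(); // the attachment saved at registration; for the boss loop this is the NioServerSocketChannel
        // process the current channel
        if (a instanceof AbstractNioChannel) {
            // For the boss NioEventLoop, ready events are mostly accepts; its pipeline hands each new connection to a worker NioEventLoop
            // For a worker NioEventLoop, ready events are mostly reads/writes; its pipeline passes the bytes read to each ChannelHandler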
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
		
        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}

processSelectedKey

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
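    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // no event loop: we cannot tell whether ch belongs to this loop, so leave it alone
            return;
        }
        if (eventLoop == this) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
        }
        return;
    }

    try {
        int readyOps = k.readyOps(); // the ready operation set of this key

        if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // connect event
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) { // write event
            ch.unsafe().forceFlush();
        }
        // read or accept event: call unsafe.read(); for a NioServerSocketChannel, unsafe is a NioMessageUnsafe
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());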
    }
}
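To make the order of the readyOps checks easier to follow, here is a hypothetical ReadyOpsSketch that returns the names of the actions processSelectedKey() would take instead of calling a real Unsafe. It uses the real java.nio SelectionKey constants. Note that readyOps == 0 also leads to read(); a comment in Netty's source describes this as a workaround for a possible JDK bug that could otherwise cause a spin loop.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the dispatch order in processSelectedKey(); returns action names only.
class ReadyOpsSketch {
    static List<String> dispatch(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");   // OP_CONNECT is also removed from the interest set first
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");            // accept for server channels, read for child channels
        }
        return actions;
    }

    // Clearing OP_CONNECT from an interest set, as done before finishConnect().
    static int clearConnect(int interestOps) {
        return interestOps & ~SelectionKey.OP_CONNECT;
    }
}
```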

NioMessageUnsafe.read()

Suppose the event is a read or a new client connection; the code then runs as follows:

@Override
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline(); // for a new connection, this is the server channel's pipeline, which contains ServerBootstrapAcceptor
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }

                allocHandle.incMessagesRead(localRead);
            } while (continueReading(allocHandle));
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            pipeline.fireChannelRead(readBuf.get(i));  // fire channelRead through the pipeline
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (exception != null) {
            closed = closeOnReadError(exception);

            pipeline.fireExceptionCaught(exception); // fire exceptionCaught through the pipeline
        }

        if (closed) {
            inputShutdown = true;
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

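How SelectedSelectionKeySet Optimizes Selected Keys

Netty implements its own SelectedSelectionKeySet to optimize the structure the JDK uses for selected keys. How does the optimization work? Let's first look at its definition: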

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    SelectionKey[] keys;
    int size;

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }

    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }

        keys[size++] = o;
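        if (size == keys.length) {
            increaseCapacity(); // grows the array (by doubling) when it is full
        }

        return true;
    }

    // increaseCapacity(), size(), iterator() and reset() omitted
}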

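SelectedSelectionKeySet is backed by a plain SelectionKey array, so processSelectedKeysOptimized can get the ready I/O events simply by iterating over the array.

The original Set<SelectionKey> is a HashSet. By comparison, a SelectionKey[] needs no hashing and has no hash collisions to deal with: add() is a single array store (amortized O(1), with an occasional capacity doubling), and iteration is a plain index loop instead of a HashMap iterator.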
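A stripped-down, generic version of the same idea is sketched below as a hypothetical ArrayKeySet rather than Netty's class: add() is a bare array store that doubles the array when it fills up and skips the duplicate check a HashSet would do, and reset() nulls out the used slots so they can be garbage collected.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical array-backed "set" in the spirit of SelectedSelectionKeySet.
class ArrayKeySet<T> extends AbstractSet<T> {
    Object[] keys = new Object[1024];
    int size;

    @Override
    public boolean add(T o) {
        if (o == null) {
            return false;
        }
        keys[size++] = o;                                  // no hashing, no duplicate check
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, keys.length << 1);  // double the capacity when full
        }
        return true;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            private int idx;

            @Override
            public boolean hasNext() {
                return idx < size;
            }

            @SuppressWarnings("unchecked")
            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return (T) keys[idx++];
            }
        };
    }

    // Null out the used slots (for GC) and start over.
    void reset(int start) {
        Arrays.fill(keys, start, size, null);
        size = 0;
    }
}
```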

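Initializing SelectedSelectionKeySet

Netty uses reflection to replace the selectedKeys and publicSelectedKeys fields inside the Selector object with a SelectedSelectionKeySet.

Both fields are originally HashSet-based; after the replacement they refer to the SelectedSelectionKeySet. When keys become ready, the Selector adds them straight into its array, and later they only need to be iterated.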

private SelectorTuple openSelector() {
    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    // (creation of unwrappedSelector omitted)
    // use reflection
    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                // the selectedKeys field inside the Selector
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                // the publicSelectedKeys field inside the Selector
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                    // offset of the selectedKeys field
                    long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                    // offset of the publicSelectedKeys field
                    long publicSelectedKeysFieldOffset =
                        PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                    if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                        // replace both fields with selectedKeySet
                        PlatformDependent.putObject(
                            unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                        PlatformDependent.putObject(
                            unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                        return null;
                    }
                    // We could not retrieve the offset, lets try reflection as last-resort.
                }
                Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                if (cause != null) {
                    return cause;
                }
                cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                if (cause != null) {
                    return cause;
                }
                selectedKeysField.set(unwrappedSelector, selectedKeySet);
                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                return null;
            } catch (NoSuchFieldException e) {
                return e;
            } catch (IllegalAccessException e) {
                return e;
            }
        }
    });
    if (maybeException instanceof Exception) {
        selectedKeys = null;
        Exception e = (Exception) maybeException;
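        logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
        return new SelectorTuple(unwrappedSelector);
    }
    selectedKeys = selectedKeySet;
    // ... (the method then returns a SelectorTuple for the optimized selector; omitted)
}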
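The underlying technique, replacing a private field through reflection, can be tried safely on a toy class. ToySelector and FieldSwapSketch are hypothetical stand-ins; the real code targets the JDK's internal selector implementation class, where recent JDKs restrict deep reflection. That is why openSelector() tries the Unsafe path on Java 9+ first and falls back to the unmodified selector when reflection fails.

```java
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the JDK selector implementation: a private Set field to swap out.
class ToySelector {
    private Set<Object> selectedKeys = new HashSet<>();

    Set<Object> selectedKeys() {
        return selectedKeys;
    }
}

// Hypothetical helper showing reflective field replacement.
class FieldSwapSketch {
    static void replaceField(Object target, String fieldName, Object newValue)
            throws NoSuchFieldException, IllegalAccessException {
        Field f = target.getClass().getDeclaredField(fieldName);
        f.setAccessible(true);   // on JDK-internal classes this may be refused on recent JDKs
        f.set(target, newValue);
    }
}
```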

Executing Asynchronous Tasks

Having covered the flow above, let's continue with how the run() method of NioEventLoop handles asynchronous tasks.

@Override
protected void run() {
    int selectCnt = 0;
    for (;;) {
        // ... (select and I/O processing omitted)
        boolean ranTasks = runAllTasks();
    }
}

runAllTasks

Note that NioEventLoop also supports scheduled tasks, which are submitted via schedule().

protected boolean runAllTasks() {
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false;

    do {
        fetchedAll = fetchFromScheduledTaskQueue(); // move due scheduled tasks into the regular task queue
        if (runAllTasksFrom(taskQueue)) { // run the tasks in taskQueue
            ranAtLeastOne = true;
        }
    } while (!fetchedAll);

    if (ranAtLeastOne) { // if at least one task ran, record the completion time
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    afterRunningAllTasks(); // run the tail tasks
    return ranAtLeastOne;
}

fetchFromScheduledTaskQueue

Take the scheduled tasks in scheduledTaskQueue whose deadlines have passed and add them to taskQueue.

private boolean fetchFromScheduledTaskQueue() {
    if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
        return true;
    }
    long nanoTime = ScheduledFutureTask.nanoTime();
    for (;;) {
        Runnable scheduledTask = pollScheduledTask(nanoTime);
        if (scheduledTask == null) {
            return true;
        }
        if (!taskQueue.offer(scheduledTask)) {
            // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
            scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
    }
}
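The merge-then-run pattern of runAllTasks() and fetchFromScheduledTaskQueue() is sketched below under simplifying assumptions: the hypothetical TaskMergeSketch takes the current time as a parameter, orders scheduled tasks by deadline in a PriorityQueue, and uses an unbounded ArrayDeque as the task queue, so the "no space left, put it back" path of the real code is not needed.

```java
import java.util.ArrayDeque;
import java.util.PriorityQueue;
import java.util.Queue;

// Hypothetical sketch: due scheduled tasks are moved into the plain task queue, then everything runs.
class TaskMergeSketch {
    static final class Scheduled implements Comparable<Scheduled> {
        final long deadlineNanos;
        final Runnable task;

        Scheduled(long deadlineNanos, Runnable task) {
            this.deadlineNanos = deadlineNanos;
            this.task = task;
        }

        @Override
        public int compareTo(Scheduled o) {
            return Long.compare(deadlineNanos, o.deadlineNanos);
        }
    }

    final Queue<Runnable> taskQueue = new ArrayDeque<>();
    final PriorityQueue<Scheduled> scheduledTaskQueue = new PriorityQueue<>();

    // Move every scheduled task whose deadline has passed into taskQueue, earliest first.
    void fetchFromScheduledTaskQueue(long nowNanos) {
        Scheduled head;
        while ((head = scheduledTaskQueue.peek()) != null && head.deadlineNanos <= nowNanos) {
            taskQueue.add(scheduledTaskQueue.poll().task);
        }
    }

    // Run the merged queue; returns true if at least one task ran.
    boolean runAllTasks(long nowNanos) {
        fetchFromScheduledTaskQueue(nowNanos);
        boolean ranAtLeastOne = false;
        Runnable t;
        while ((t = taskQueue.poll()) != null) {
            t.run();
            ranAtLeastOne = true;
        }
        return ranAtLeastOne;
    }
}
```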

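Adding Tasks with execute

NioEventLoop has two important asynchronous task queues, the regular task queue and the scheduled task queue, and provides one method for adding tasks to each: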

  • execute()
  • schedule()

The execute method is defined as follows.

private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    addTask(task); // add the task to the task queue
    if (!inEventLoop) { // the caller is not the NioEventLoop's own thread
        startThread(); // start the thread
        if (isShutdown()) { // this NioEventLoop has already been shut down
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

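The NIO Empty-Polling Problem

Normally, when we call selector.select() and no SocketChannel is ready, the current thread blocks. Empty polling means that the thread is woken up even though no SocketChannel is ready.

Such a wakeup carries no read or write request, so the thread keeps polling for nothing and CPU usage climbs.

The root cause is as follows.

In some Linux 2.6 kernels, when a connected socket is abruptly interrupted, poll and epoll set the returned eventSet to POLLHUP (or possibly POLLERR). Because the eventSet changed, the Selector may be woken up. This behavior comes from the operating system. The JDK is meant to be a layer that works across operating systems, but unfortunately the problem was not fixed in JDK 5 or the early JDK 6 releases (strictly speaking, it affected a range of JDK versions), and responsibility was pushed onto the operating system. This is why the bug was not finally fixed until 2013, and why its impact was so wide.

How does Netty solve this problem? Let's go back to the run() method of NioEventLoop: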

@Override
protected void run() {
    int selectCnt = 0;
    for (;;) {
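        // ... (select omitted)
        // selectCnt counts select() calls that returned without work, i.e. idle spins of the event loop (for the NIO bug)
        selectCnt++;
        // ... (I/O processing and task execution omitted)
        // ranTasks == true or strategy > 0 means the loop did some work and did not spin, so reset selectCnt
        if (ranTasks || strategy > 0) {
            // if select() returned prematurely more than MIN_PREMATURE_SELECTOR_RETURNS times in a row, log it
            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                             selectCnt - 1, selector);
            }
            selectCnt = 0;
        }
        // unexpectedSelectorWakeup handles the NIO bug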
        else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
            selectCnt = 0;
        }
    }
}

unexpectedSelectorWakeup

private boolean unexpectedSelectorWakeup(int selectCnt) {
    if (Thread.interrupted()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Selector.select() returned prematurely because " +
                         "Thread.currentThread().interrupt() was called. Use " +
                         "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
        }
        return true;
    }
    // If the rebuild threshold is greater than 0 (default 512) and the number of consecutive
    // premature returns has reached it, rebuild the selector
    if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
        // The selector returned prematurely many times in a row.
        // Rebuild the selector to work around the problem.
        logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                    selectCnt, selector);
        rebuildSelector();
        return true;
    }
    return false;
}
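The selectCnt bookkeeping in run() together with the threshold check in unexpectedSelectorWakeup() boils down to a counter that resets on useful work. The hypothetical SpinDetector below captures only that logic; the threshold is a constructor argument, and the usage check passes 512 because that is the default mentioned above.

```java
// Hypothetical sketch of selectCnt plus the rebuild-threshold check; not Netty code.
class SpinDetector {
    private final int rebuildThreshold;
    private int selectCnt;

    SpinDetector(int rebuildThreshold) {
        this.rebuildThreshold = rebuildThreshold;
    }

    // Call once per loop iteration; returns true when the selector should be rebuilt.
    boolean afterIteration(boolean didWork) {
        selectCnt++;
        if (didWork) {                 // I/O processed or tasks ran: not a spin
            selectCnt = 0;
            return false;
        }
        if (rebuildThreshold > 0 && selectCnt >= rebuildThreshold) {
            selectCnt = 0;             // the caller rebuilds the selector, so counting starts over
            return true;
        }
        return false;
    }
}
```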

rebuildSelector()

public void rebuildSelector() {
    if (!inEventLoop()) { // not on the event loop thread: submit the rebuild as a task to the event loop
        execute(new Runnable() {
            @Override
            public void run() {
                rebuildSelector0();
            }
        });
        return;
    }
    rebuildSelector0();
}

rebuildSelector0

This method creates a new Selector to replace the one used by the current event loop.

private void rebuildSelector0() {
    final Selector oldSelector = selector; // the old selector
    final SelectorTuple newSelectorTuple; // the new selector

    if (oldSelector == null) { // no old selector: nothing to do
        return;
    }

    try {
        newSelectorTuple = openSelector(); // create a new selector
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);
        return;
    }

    // Register all channels to the new Selector.
    int nChannels = 0;
    for (SelectionKey key: oldSelector.keys()) { // iterate over all keys registered with the old selector
        Object a = key.attachment();
        try {
            // skip the key if it is invalid or its channel is already registered with the new selector
            if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                continue;
            }
            // the key's interest set
            int interestOps = key.interestOps();
            key.cancel(); // cancel the old key
            // register the channel with the new selector
            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
            if (a instanceof AbstractNioChannel) { // for a Netty NIO channel, update its selection key
                // Update SelectionKey
                ((AbstractNioChannel) a).selectionKey = newKey;
            }
            nChannels ++;
        } catch (Exception e) {
            logger.warn("Failed to re-register a Channel to the new Selector.", e);
            if (a instanceof AbstractNioChannel) {
                AbstractNioChannel ch = (AbstractNioChannel) a;
                ch.unsafe().close(ch.unsafe().voidPromise());
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, key, e);
            }
        }
    }
    // switch this event loop over to the new selector
    selector = newSelectorTuple.selector;
    unwrappedSelector = newSelectorTuple.unwrappedSelector;

    try {
        // time to close the old selector as everything else is registered to the new one
        oldSelector.close(); // close the old selector
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to close the old Selector.", t);
        }
    }

    if (logger.isInfoEnabled()) {
        logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
    }
}

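From the process above we can see that Netty works around the NIO empty-polling problem by rebuilding the Selector. The key step is re-registering the channels of all SelectionKeys from the old Selector onto the new one, which neatly sidesteps the JDK epoll empty-polling bug.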
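The re-registration step at the heart of rebuildSelector0() can be tried with plain java.nio. The hypothetical SelectorRebuildSketch below omits the per-channel error handling and the AbstractNioChannel.selectionKey update; it copies each valid key's interest set and attachment onto a fresh Selector, cancels the old key, and closes the old Selector.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical, stripped-down version of the re-registration loop; not Netty code.
class SelectorRebuildSketch {
    static Selector rebuild(Selector oldSelector) throws IOException {
        Selector newSelector = Selector.open();
        for (SelectionKey key : oldSelector.keys()) {
            // skip invalid keys and channels that are already on the new selector
            if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                continue;
            }
            int interestOps = key.interestOps();
            Object attachment = key.attachment();
            key.cancel();                                   // drop the old registration
            key.channel().register(newSelector, interestOps, attachment);
        }
        oldSelector.close();                                // every channel now lives on newSelector
        return newSelector;
    }
}
```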

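Establishing and Handling Connections

As mentioned in Section 9.2.4.3, when a client connection or read event reaches the server, the read() method of NioMessageUnsafe is called.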

public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                // localRead is 1 if a client connection was accepted, otherwise 0
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }
				
                allocHandle.incMessagesRead(localRead); // accumulate the number of messages read
            } while (continueReading(allocHandle));
        } catch (Throwable t) {
            exception = t;
        }

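        int size = readBuf.size(); // iterate over the accepted client connections
        for (int i = 0; i < size; i ++) {
            readPending = false;
            pipeline.fireChannelRead(readBuf.get(i)); // invoke channelRead on the pipeline's handlers
        }
        readBuf.clear(); // clear the list
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete(); // invoke channelReadComplete on the pipeline's handlers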

        if (exception != null) {
            closed = closeOnReadError(exception);

            pipeline.fireExceptionCaught(exception);
        }

        if (closed) {
            inputShutdown = true;
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

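pipeline.fireChannelRead(readBuf.get(i))

Next, let's look at how the pipeline fires the event. If the current event is a new connection, this is the pipeline of the NioServerSocketChannel, and the handler that receives it is ServerBootstrap$ServerBootstrapAcceptor.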

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m); // call channelRead on the next handler in the pipeline
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

ServerBootstrapAcceptor

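ServerBootstrapAcceptor is a special handler in the pipeline of NioServerSocketChannel, dedicated to handling client connection events. Its core job is to add the handler chain configured for SocketChannels to the pipeline of the new NioSocketChannel.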

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    域名line().addLast(childHandler);  //把服务端配置的childHandler,添加到当前NioSocketChannel中的pipeline中

    setChannelOptions(child, childOptions, logger); //设置NioSocketChannel的属性
    setAttributes(child, childAttrs); 

    try {
        // register this NioSocketChannel with a Selector and listen for the asynchronous result
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
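If you strip away futures and error handling, ServerBootstrapAcceptor.channelRead does three things for every accepted child: it adds childHandler, applies childOptions/childAttrs, and registers the child with the worker group. The sketch below models those steps with plain collections. AcceptorSketch and ChildChannel are hypothetical, handlers are represented as strings, and the round-robin choice of worker is a simplifying assumption about how childGroup.register picks an event loop (the group delegates to its next() executor).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Netty-free model of ServerBootstrapAcceptor.channelRead.
class AcceptorSketch {

    static final class ChildChannel {
        final String id;
        final List<String> pipeline = new ArrayList<>();
        final Map<String, Object> options = new HashMap<>();
        int workerIndex = -1; // which worker "event loop" the child was registered with

        ChildChannel(String id) {
            this.id = id;
        }
    }

    private final String childHandler;
    private final Map<String, Object> childOptions;
    private final int workerCount;
    private int next; // round-robin cursor (assumed selection strategy)

    AcceptorSketch(String childHandler, Map<String, Object> childOptions, int workerCount) {
        this.childHandler = childHandler;
        this.childOptions = childOptions;
        this.workerCount = workerCount;
    }

    void channelRead(ChildChannel child) {
        child.pipeline.add(childHandler);   // child.pipeline().addLast(childHandler)
        child.options.putAll(childOptions); // setChannelOptions(child, childOptions, logger)
        child.workerIndex = next;           // childGroup.register(child)
        next = (next + 1) % workerCount;
    }

    public static void main(String[] args) {
        AcceptorSketch acceptor = new AcceptorSketch("ChannelInitializer", Map.of("SO_KEEPALIVE", true), 2);
        for (String id : List.of("c1", "c2", "c3")) {
            ChildChannel c = new ChildChannel(id);
            acceptor.channelRead(c);
            System.out.println(c.id + " -> worker " + c.workerIndex + " " + c.pipeline);
        }
        // c1 -> worker 0 [ChannelInitializer]
        // c2 -> worker 1 [ChannelInitializer]
        // c3 -> worker 0 [ChannelInitializer]
    }
}
```

Each child ends up bound to exactly one worker loop, so all later I/O events of that connection are handled on the same thread.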

Building the pipeline

In section 9.6.2, child is actually a NioSocketChannel. It is created inside NioServerSocketChannel when a new connection is accepted.

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch)); // here: wrap the accepted JDK SocketChannel in a NioSocketChannel
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}
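doReadMessages relies on a property of the JDK: on a non-blocking ServerSocketChannel, accept() returns null when no connection is pending, so the method returns 0. The sketch below shows that contract using only java.nio. readMessages and demo are hypothetical helpers, the loopback address and ephemeral port are choices made for the example, and the short retry loop is there only because a freshly connected client may take a moment to appear in the accept queue.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

// JDK-only illustration of the doReadMessages() contract; no Netty classes involved.
class AcceptSketch {

    // Hypothetical helper shaped like doReadMessages(): returns 1 and stores the
    // accepted channel, or returns 0 when accept() finds nothing pending (null).
    static int readMessages(ServerSocketChannel server, List<SocketChannel> buf) throws IOException {
        SocketChannel ch = server.accept();
        if (ch != null) {
            buf.add(ch); // Netty adds new NioSocketChannel(this, ch) instead
            return 1;
        }
        return 0;
    }

    // Returns {messages read with no client, messages read after a client connects}.
    static int[] demo() {
        List<SocketChannel> accepted = new ArrayList<>();
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // ephemeral loopback port
            server.configureBlocking(false);                    // accept() must not block
            int before = readMessages(server, accepted);
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                int after = 0;
                // Retry briefly in case the connection is not yet in the accept queue.
                for (int i = 0; i < 100 && after == 0; i++) {
                    after = readMessages(server, accepted);
                    if (after == 0) {
                        Thread.sleep(10);
                    }
                }
                return new int[] {before, after};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            for (SocketChannel ch : accepted) {
                try {
                    ch.close();
                } catch (IOException ignored) {
                    // best-effort cleanup
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " then " + r[1]); // 0 then 1
    }
}
```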

When a NioSocketChannel is constructed, it calls the constructor of its superclass AbstractChannel, which initializes a pipeline.

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

DefaultChannelPipeline

The default pipeline implementation is DefaultChannelPipeline. Its constructor is shown below.

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise = new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

This initializes a head node and a tail node, which together form a doubly linked list, as shown in Figure 9-2.

Figure 9-2: the initial pipeline, head <-> tail
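The head/tail structure in Figure 9-2 is a doubly linked list with two fixed sentinel nodes, and new handlers are always inserted just before tail. The sketch below models only that structure. PipelineSketch and Ctx are hypothetical names, handlers are plain strings, and the four pointer updates in addLast are modeled on what addLast0 does (link the new node between tail.prev and tail). Walking head to tail matches the direction inbound events such as channelRead travel; walking tail to head matches outbound operations such as write.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal, hypothetical model of DefaultChannelPipeline's linked-list structure.
class PipelineSketch {

    static final class Ctx {
        final String name;
        Ctx prev;
        Ctx next;

        Ctx(String name) {
            this.name = name;
        }
    }

    final Ctx head = new Ctx("head");
    final Ctx tail = new Ctx("tail");

    PipelineSketch() {
        head.next = tail; // same links as the DefaultChannelPipeline constructor
        tail.prev = head;
    }

    // Insert a new node immediately before tail, as addLast0 does.
    PipelineSketch addLast(String name) {
        Ctx newCtx = new Ctx(name);
        Ctx prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
        return this;
    }

    // head -> tail: the order inbound events visit handlers.
    List<String> forward() {
        List<String> names = new ArrayList<>();
        for (Ctx c = head; c != null; c = c.next) {
            names.add(c.name);
        }
        return names;
    }

    // tail -> head: the order outbound operations visit handlers.
    List<String> backward() {
        List<String> names = new ArrayList<>();
        for (Ctx c = tail; c != null; c = c.prev) {
            names.add(c.name);
        }
        return names;
    }

    public static void main(String[] args) {
        PipelineSketch p = new PipelineSketch();
        System.out.println(p.forward());  // [head, tail]
        p.addLast("decoder").addLast("handler");
        System.out.println(p.forward());  // [head, decoder, handler, tail]
        System.out.println(p.backward()); // [tail, handler, decoder, head]
    }
}
```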

Building the handler chain of NioSocketChannel

Back in ServerBootstrapAcceptor's channelRead method: when a client connection is received, it adds handlers to the NioSocketChannel's pipeline.

The following code is the addLast method of DefaultChannelPipeline.

@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
   ObjectUtil.checkNotNull(handlers, "handlers");

   for (ChannelHandler h: handlers) { // iterate over handlers; here the handler is the ChannelInitializer callback
       if (h == null) {
           break;
       }
       addLast(executor, null, h);
   }

   return this;
}

addLast

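This adds the ChannelHandler configured on the server to the pipeline. Note that what the pipeline holds at this point is the ChannelInitializer callback, not the final handlers.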

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler); // reject adding the same handler twice unless it is @Sharable
        // create a new DefaultChannelHandlerContext node
        newCtx = newContext(group, filterName(name, handler), handler);

        addLast0(newCtx);  // insert the new DefaultChannelHandlerContext into the ChannelPipeline

      
        if (!registered) { // channel not registered yet: defer the handlerAdded callback
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}
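The end of addLast() is a three-way decision about when handlerAdded runs. The small sketch below restates it as a pure function. AddLastDispatchSketch and Action are hypothetical names, and the two booleans stand for the pipeline's registered flag and executor.inEventLoop(). In the ServerBootstrapAcceptor case, childHandler is added before childGroup.register(child) is called, so the first branch (defer) is taken.

```java
// Hypothetical restatement of the branch structure at the end of DefaultChannelPipeline.addLast().
class AddLastDispatchSketch {

    enum Action { DEFER_UNTIL_REGISTERED, SUBMIT_TO_EVENT_LOOP, CALL_NOW }

    // !registered            -> callHandlerCallbackLater(newCtx, true)
    // !executor.inEventLoop() -> callHandlerAddedInEventLoop(newCtx, executor)
    // otherwise              -> callHandlerAdded0(newCtx)
    static Action decide(boolean registered, boolean inEventLoop) {
        if (!registered) {
            return Action.DEFER_UNTIL_REGISTERED;
        }
        if (!inEventLoop) {
            return Action.SUBMIT_TO_EVENT_LOOP;
        }
        return Action.CALL_NOW;
    }

    public static void main(String[] args) {
        // The child channel is not registered yet when ServerBootstrapAcceptor adds childHandler.
        System.out.println(decide(false, true)); // DEFER_UNTIL_REGISTERED
        System.out.println(decide(true, false)); // SUBMIT_TO_EVENT_LOOP
        System.out.println(decide(true, true));  // CALL_NOW
    }
}
```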

When is this callback actually invoked? It happens in ServerBootstrapAcceptor's channelRead method, when the current NioSocketChannel is registered:

childGroup.register(child).addListener(new ChannelFutureListener() {...});

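Following the same line of source analysis as in the previous article, this eventually leads to the register0 method in AbstractChannel (inside its AbstractUnsafe inner class).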

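private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;
        // run the handlerAdded callbacks that addLast() deferred while the channel was unregistered
        pipeline.invokeHandlerAddedIfNeeded();
        // ... (remaining steps omitted)
    } catch (Throwable t) {
        // ... (error handling omitted)
    }
}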

callHandlerAddedForAllHandlers

Following pipeline.invokeHandlerAddedIfNeeded() further down, execution reaches the callHandlerAddedForAllHandlers method of DefaultChannelPipeline.

private void callHandlerAddedForAllHandlers() {
    final PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized (this) {
        assert !registered;

        // This Channel itself was registered.
        registered = true;

        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
        // Null out so it can be GC'ed.
        this.pendingHandlerCallbackHead = null;
    }
    // take tasks from the list of pending handler callbacks and execute them
    PendingHandlerCallback task = pendingHandlerCallbackHead;
    while (task != null) {
        task.execute();
        task = task.next;
    }
}

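Note that the singly linked list headed by pendingHandlerCallbackHead is populated in callHandlerCallbackLater, and callHandlerCallbackLater is called from addLast. Together they close the asynchronous loop: addLast queues the handlerAdded callback while the channel is unregistered, and register0 runs the queued callbacks once registration completes.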
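The queue-then-drain loop described above can be sketched in a few lines. PendingCallbacksSketch and PendingTask are hypothetical stand-ins for DefaultChannelPipeline and PendingHandlerCallback, the callbacks only append to a log, and locking is left out; the two points the sketch tries to show are that new tasks are appended at the end of the singly linked list (so callbacks keep insertion order) and that registration detaches the list before running it.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of deferred handlerAdded callbacks.
class PendingCallbacksSketch {

    static final class PendingTask {
        final Runnable action;
        PendingTask next;

        PendingTask(Runnable action) {
            this.action = action;
        }
    }

    private PendingTask pendingHead;
    private boolean registered;
    final List<String> log = new ArrayList<>();

    // Like addLast(): before registration the callback is queued; afterwards it runs directly.
    void addLast(String handler) {
        Runnable callback = () -> log.add("handlerAdded:" + handler);
        if (!registered) {
            callHandlerCallbackLater(new PendingTask(callback));
        } else {
            callback.run();
        }
    }

    // Append at the end of the singly linked list.
    private void callHandlerCallbackLater(PendingTask task) {
        if (pendingHead == null) {
            pendingHead = task;
        } else {
            PendingTask p = pendingHead;
            while (p.next != null) {
                p = p.next;
            }
            p.next = task;
        }
    }

    // Like register0() -> callHandlerAddedForAllHandlers(): mark registered, detach the list, drain it.
    void register() {
        registered = true;
        PendingTask task = pendingHead;
        pendingHead = null;
        while (task != null) {
            task.action.run();
            task = task.next;
        }
    }

    public static void main(String[] args) {
        PendingCallbacksSketch s = new PendingCallbacksSketch();
        s.addLast("ChannelInitializer"); // not registered yet: queued
        s.addLast("SecondHandler");      // queued behind the first callback
        System.out.println(s.log);       // []
        s.register();
        System.out.println(s.log);       // [handlerAdded:ChannelInitializer, handlerAdded:SecondHandler]
        s.addLast("LateHandler");        // already registered: runs immediately
        System.out.println(s.log.size()); // 3
    }
}
```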

ChannelInitializer.handlerAdded

The execution path of task.execute() is:

callHandlerAdded0 -> ctx.callHandlerAdded() -> handler().handlerAdded(this) -> ChannelInitializer.handlerAdded

handlerAdded then calls the initChannel method to initialize the NioSocketChannel.

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        // This should always be true with our current DefaultChannelPipeline implementation.
        // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
        // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
        // will be added in the expected order.
        if (initChannel(ctx)) {

            // We are done with init the Channel, removing the initializer now.
            removeState(ctx);
        }
    }
}

Next, the abstract initChannel method is called; its implementation is provided by the concrete subclass.

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) { // Guard against re-entrance.
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
            // We do so to prevent multiple calls to initChannel(...).
            exceptionCaught(ctx, cause);
        } finally {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}
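The ChannelInitializer pattern above boils down to this: a placeholder entry sits in the pipeline, and when handlerAdded fires it runs user code that adds the real handlers and then removes itself, with a set guarding against running twice. The sketch below models that with a List acting as the pipeline. InitializerSketch and Initializer are hypothetical names, and the identity-based set is an assumption chosen because a mutable List's hashCode changes as handlers are added (Netty keys its initMap by ChannelHandlerContext instead).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical model of ChannelInitializer: add real handlers once, then remove the placeholder.
class InitializerSketch {

    static final class Initializer {
        // Plays the role of initMap: remembers which pipelines were already initialized.
        private final Set<List<Object>> initialized = Collections.newSetFromMap(new IdentityHashMap<>());
        private final Consumer<List<Object>> initChannel;

        Initializer(Consumer<List<Object>> initChannel) {
            this.initChannel = initChannel;
        }

        // Returns true if this call performed the initialization, false on re-entry.
        boolean handlerAdded(List<Object> pipeline) {
            if (!initialized.add(pipeline)) {
                return false;                 // guard against re-entrance
            }
            try {
                initChannel.accept(pipeline); // user code, e.g. pipeline.addLast(...)
            } finally {
                pipeline.remove(this);        // the initializer is no longer needed
            }
            return true;
        }
    }

    public static void main(String[] args) {
        Initializer init = new Initializer(p -> {
            p.add("LengthFieldBasedFrameDecoder");
            p.add("ServerHandler");
        });
        List<Object> pipeline = new ArrayList<>();
        pipeline.add(init);                              // what ServerBootstrapAcceptor adds
        System.out.println(init.handlerAdded(pipeline)); // true
        System.out.println(pipeline);                    // [LengthFieldBasedFrameDecoder, ServerHandler]
        System.out.println(init.handlerAdded(pipeline)); // false
    }
}
```

Removing the initializer in a finally block means the placeholder disappears even if the user's initChannel throws, which mirrors the try/finally structure of the Netty excerpt above.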

The ChannelInitializer implementation is the anonymous inner class ChannelInitializer in our own server code. Through this callback, the pipeline of the current NioSocketChannel gets built.

public static void main(String[] args){
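    // 1. Boss group: accepts client connections
    EventLoopGroup boss = new NioEventLoopGroup();
    // 2. Worker group: handles read/write operations on accepted client connections
    EventLoopGroup work = new NioEventLoopGroup();
    ServerBootstrap b = new ServerBootstrap();
    b.group(boss, work)    // bind the two thread groups
        .channel(NioServerSocketChannel.class)    // use NIO mode
        // initializer applied to each accepted client channel
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline()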
                    .addLast(
                    new LengthFieldBasedFrameDecoder(1024,
                                                     9,4,0,0))
                    .addLast(new MessageRecordEncoder())
                    .addLast(new MessageRecordDecode())
                    .addLast(new ServerHandler());
            }
        });
}

Copyright notice: unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 4.0. When reposting, please credit Mic带你学架构.