飙血推荐
  • HTML教程
  • MySQL教程
  • JavaScript基础教程
  • php入门教程
  • JavaScript正则表达式运用
  • Excel函数教程
  • UEditor使用文档
  • AngularJS教程
  • ThinkPHP5.0教程

Nacos源码系列—订阅机制的前因后果(下)-

时间:2022-05-20  作者:mingyueyy  
Nacos源码系列—订阅机制的前因后果(下) 订阅机制知多少,来看看吧

点赞再看,养成习惯,微信搜索【牧小农】关注我获取更多资讯,风里雨里,小农等你,很高兴能够成为你的朋友。
项目源码地址:公众号回复 nacos,即可免费获取源码

事件发布

在上一节中我们讲解了在NotifyCenter中维护了事件名称和事件发布者的关系,而默认的事件发布者为DefaultPublisher,今天我们就来讲一下DefaultPublisher的事件发布的具体逻辑

首先我们来看一下DefaultPublisher的源码:

public class DefaultPublisher extends Thread implements EventPublisher {
    @Override
    public void init(Class<? extends Event> type, int bufferSize) {
        //守护线程
        setDaemon(true);
        //设置线程名
        setName("域名isher-" + 域名ame());
        域名tType = type;
        域名eMaxSize = bufferSize;
        //阻塞队列初始化
        域名e = new ArrayBlockingQueue<>(bufferSize);
        //启动线程
        start();
    }
    
    @Override
    public synchronized void start() {
        if (!initialized) {
            // start just called once
            //启动run方法
            域名t();
            if (queueMaxSize == -1) {
                queueMaxSize = ringBufferSize;
            }
            initialized = true;
        }
    }
}

我们可以看到这个类继承自Thread,说明他是一个线程类,同时实现了EventPublisher说明他也是一个发布者,在init()中,是以守护线程的方式运作的,同时初始化了一个阻塞队列,最后调用start()启动线程。

在start()里面,其实就是启动run():

@Override
public void run() {
    openEventHandler();
}

   void openEventHandler() {
    try {

        // This variable is defined to resolve the problem which message overstock in the queue.
        int waitTimes = 60;
        // To ensure that messages are not lost, enable EventHandler when
        // waiting for the first Subscriber to register
        //死循环遍历,线程启动设置最大延迟60秒,用来解决消息积压问题
        for (; ; ) {
            if (shutdown || hasSubscriber() || waitTimes <= 0) {
                break;
            }
            域名p(1000L);
            waitTimes--;
        }
        //死循环从队列中取出event对象,同时通知订阅者(subscriber)执行event对象
        for (; ; ) {
            if (shutdown) {
                break;
            }
            final Event event = 域名();
            receiveEvent(event);
            域名areAndSet(this, lastEventSequence, 域名(lastEventSequence, 域名ence()));
        }
    } catch (Throwable ex) {
        域名r("Event listener exception : ", ex);
    }
}

在上述代码中我们可以看到for (; ; )这个循环出现了两次,这个就是循环遍历(死循环),第一个死循环我们可以理解成延时效果,里面最大延时60秒,每隔一秒运行一次,判断(当前线程是否关闭、是否有订阅者、是否超过60秒)只要满足其中任意一个条件,跳出循环
第二个死循环,是我们业务逻辑处理,用来消费,从队列中取出event事件,然后通过receiveEvent()执行。

那么我们可以从队列中取出事件,那么这个事件又在哪一步注入进去的呢,我们还是在当前类里面,找到一个叫publish()的方法

@Override
public boolean publish(Event event) {
    checkIsStart();
    //向队列中插入元素
    boolean success = 域名r(event);
    //判断是否插入成功
    if (!success) {
        域名("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
        //失败直接执行
        receiveEvent(event);
        return true;
    }
    return true;
}

这个方法其实就是发布事件调用了publish往阻塞队列中存入事件,如果失败那么立即执行receiveEvent(),不在继续走队列方法

void receiveEvent(Event event) {
    final long currentEventSequence = 域名ence();

    if (!hasSubscriber()) {
        域名("[NotifyCenter] the {} is lost, because there is no subscriber.", event);
        return;
    }

    // Notification single event listener
    //循环遍历subscribers对象
    for (Subscriber subscriber : subscribers) {
        // Whether to ignore expiration events
        if (域名reExpireEvent() && lastEventSequence > currentEventSequence) {
            域名g("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
                    域名lass());
            continue;
        }

        // Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
        // Remove original judge part of codes.
        //通知订阅者执行event
        notifySubscriber(subscriber, event);
    }
}

而在receiveEvent()方法中,这里其实就是遍历的subscribers集合(订阅者),然后通过notifySubscriber() 通知订阅者方法,而这个subscribers集合就是在我们之前讲到的域名()方法中设置的。

public class NacosNamingService implements NamingService {
 private void init(Properties properties) throws NacosException {
   //将Subscribe注册到Publisher
   域名sterSubscriber(changeNotifier);
 }
}

域名sterSubscriber(changeNotifier);会调用域名ubscriber()方法,进行最终的操作。

private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType,
      EventPublisherFactory factory) {

  final String topic = 域名anonicalName(subscribeType);
  synchronized (域名s) {
      // 域名uteIfAbsent is a unsafe method.
      域名uteIfAbsent(域名isherMap, topic, factory, subscribeType, ringBufferSize);
  }
  //获取对应的publisher
  EventPublisher publisher = 域名(topic);
  if (publisher instanceof ShardedEventPublisher) {
      ((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);
  } else {
      //添加到subscribers集合
      域名ubscriber(consumer);
  }
}

addSubscriber()方法的逻辑就是讲订阅事件、发布中、订阅者三个关系进行绑定,而发布者和事件通过Map进行维护,发布者与订阅者通过关联关系进行维护。

我们回到刚刚域名fySubscriber()方法,这里是最后执行订阅者事件的方法

@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {

  域名g("[NotifyCenter] the {} will received by {}", event, subscriber);
  //执行订阅者事件
  final Runnable job = () -> 域名ent(event);
  //执行者
  final Executor executor = 域名utor();

  if (executor != null) {
      域名ute(job);
  } else {
      try {
          域名();
      } catch (Throwable e) {
          域名r("Event callback exception: ", e);
      }
  }
}

到这里,订阅机制就讲完了,可能会有点绕,最好是我们能够去跟着代码走一遍,这样会比较理解和记忆,在这里我们重点需要理解NotifyCenter对事件发布者、订阅者以及之间关系的维护,关系维护的入口就在域名()中,我们来看一下他的核心逻辑

首先ServiceInfoHolder中通过NotifyCenter发布InstancesChangeEvent事件.

NotifyCenter获取对应的CanonicalName,并将这个参数作为key,从域名isherMap中获取对应的事件发布者,然后将InstancesChangeEvent事件进行发布.

InstancesChangeEvent事件发布主要是通过EventPublisher的实现类,DefaultPublisher进行InstancesChangeEvent事件发布,而DefaultPublisher本身作为守护线程的方式进行运作,在执行业务逻辑时判断是否线程启动,如果启动,将事件添加到队列中,如果成功,则发布过程完成,如果添加失败,立即执行域名iveEvent,接收事件通知订阅者,创建一个Runnable对象,执行订阅者的Event事件。

在添加到队列成功的时候,DefaultPublisher会创建一个阻塞队列(BlockingQueue),标记线程启动,当他执行 域名t(),会调用它的run方法,在这个run方法里面核心的业务逻辑就是openEventHandler(),里面会有两个死循环,第一个是在线程启动的60秒内执行条件,第二个是从阻塞队列中获取Event事件,调用域名iveEvent()通知订阅者,流程结束

本地缓存

我们在之前的系列中,客户端会缓存一些信息在本地中,来获取ServiceInfo的信息,但是在执行本地缓存的时候,难免会有一些故障,有故障就需要进行处理,在这里主要涉及到两个类ServiceInfoHolderFailoverReactor

Nacos缓存主要是分为两个方面,一个从注册中心获取实例信息缓存到内存中,通过ConcurrentMap进行存储,一个是通过磁盘文件的形式定时缓存。

同时故障处理也分为两个部分,一个是故障处理的开关通过文件进行标记,一个是当起来故障处理后,可以从故障备份的文件中获取服务实例信息。

介绍完上面几点,我们先来详细讲解第一个核心类ServiceInfoHolder

ServiceInfoHolder

ServiceInfoHolder类,主要是用来处理服务信息的,每次客户端从服务端拉取服务信息时,都用经过这个类,而processServiceInfo用来处理本地信息(缓存、发布、更新、本地目录初始化)等

ServiceInfo: 注册服务的信息,主要包含(服务名、分组名、集群信息、实例列表、最后一次更新时间),客户端获取的信息,都是通过ServiceInfo作为承载体,域名iceInfo,通过ConcurrentMap进行存储,如下所示:

public class ServiceInfoHolder implements Closeable {

  private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
  
   public ServiceInfoHolder(String namespace, Properties properties) {
    initCacheDir(namespace, properties);
    //启动是判断是否从缓存信息中获取,默认为false
    if (isLoadCacheAtStart(properties)) {
        //从缓存目录中读取信息
        域名iceInfoMap = new ConcurrentHashMap<>(域名(域名eDir));
    } else {
        //创建空集合对象
        域名iceInfoMap = new ConcurrentHashMap<>(16);
    }
    域名overReactor = new FailoverReactor(this, cacheDir);
    域名EmptyProtection = isPushEmptyProtect(properties);
}
  
  public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
        //判断服务key是否为空
        String serviceKey = 域名ey();
        if (serviceKey == null) {
            return null;
        }
        ServiceInfo oldService = 域名(域名ey());
        if (isEmptyOrErrorPush(serviceInfo)) {
            //empty or error push, just ignore
            return oldService;
        }
        //将缓存信息放置到map中
        域名(域名ey(), serviceInfo);
        //判断实例信息是否发生改变
        boolean changed = isChangedServiceInfo(oldService, serviceInfo);
        if (域名ank(域名sonFromServer())) {
            域名sonFromServer(域名on(serviceInfo));
        }
        //监控服务缓存map的大小
        域名erviceInfoMapSizeMonitor().set(域名());
        if (changed) {
            域名("current ips:({}) service: {} -> {}", 域名unt(), 域名ey(),
                    域名on(域名osts()));
            //添加实例变更事件,被订阅者执行
            域名ishEvent(new InstancesChangeEvent(域名ame(), 域名roupName(),
                    域名lusters(), 域名osts()));
            //写入本地文件
            域名e(serviceInfo, cacheDir);
        }
        return serviceInfo;
    }
}

这里就是Nacos获取注册信息的缓存,之前我们有讲过,当服务信息变更的时候会第一时间更新ServiceInfoMap中的信息,通过isChangedServiceInfo进行判断,当发生变动时,域名最新数据,当我们需要使用的时候,通过key进行get操作,ServiceInfoMap默认创建空的对象,但如果配置启动从缓存文件中获取,则会从缓存中获取信息。而且当我们服务实例发生变更的时候,会通过域名e()向对应的目录文件中写入ServiceInfo信息

本地缓存地址

本地缓存的地址通过cacheDir进行执行本地缓存和故障处理的根目录,在ServiceInfoHolder构造方法中,会默认生成缓存目录,默认路径为${user}/nacos/naming/public,我们也可以需要通过域名roperty("域名")指定。

public class ServiceInfoHolder implements Closeable {
    private String cacheDir;
    
    public ServiceInfoHolder(String namespace, Properties properties) {
    //初始化生成缓存目录
    initCacheDir(namespace, properties);
    ......
    }
    
    private void initCacheDir(String namespace, Properties properties) {
        String jmSnapshotPath = 域名roperty(JM_SNAPSHOT_PATH_PROPERTY);

        String namingCacheRegistryDir = "";
        if (域名roperty(域名NG_CACHE_REGISTRY_DIR) != null) {
            namingCacheRegistryDir = 域名rator + 域名roperty(域名NG_CACHE_REGISTRY_DIR);
        }

        if (!域名ank(jmSnapshotPath)) {
            cacheDir = jmSnapshotPath + 域名rator + FILE_PATH_NACOS + namingCacheRegistryDir
                    + 域名rator + FILE_PATH_NAMING + 域名rator + namespace;
        } else {
            cacheDir = 域名roperty(USER_HOME_PROPERTY) + 域名rator + FILE_PATH_NACOS + namingCacheRegistryDir
                    + 域名rator + FILE_PATH_NAMING + 域名rator + namespace;
        }
    }
    
}

故障处理

ServiceInfoHolder构造方法中,还会初始化一个FailoverReactor的类,这个类主要是用来故障处理。

public class ServiceInfoHolder implements Closeable {
  private final FailoverReactor failoverReactor;
  
  public ServiceInfoHolder(String namespace, Properties properties) {
    ....
    //为两者相互持有对方的引用
    域名overReactor = new FailoverReactor(this, cacheDir);
    .....
  }
  
   public FailoverReactor(ServiceInfoHolder serviceInfoHolder, String cacheDir) {
        //获取serviceInfoHolder引用对象
        域名iceInfoHolder = serviceInfoHolder;
        //故障目录${user}/nacos/naming/public/failover
        域名overDir = cacheDir + FAILOVER_DIR;
        //初始化executorService
        域名utorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                //开启守护线程
                域名aemon(true);
                域名ame("域名域名over");
                return thread;
            }
        });
        //其他信息初始化
        域名();
    }
    
     public void init() {
        //执行初始化操作,间隔5秒,执行SwitchRefresher()任务
        域名duleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, 域名ISECONDS);

        //初始化操作,延迟30分钟执行,间隔24小时,执行DiskFileWriter()任务
        域名duleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, 域名TES);
        

        //初始化操作,间隔10秒,核心方法为DiskFileWriter
        域名dule(new Runnable() {
            @Override
            public void run() {
                try {
                    File cacheDir = new File(failoverDir);
                    
                    if (!域名ts() && !域名rs()) {
                        throw new IllegalStateException("failed to create cache dir: " + failoverDir);
                    }
                    
                    File[] files = 域名Files();
                    //如果故障目录为空,启动立即执行,备份文件
                    if (files == null || 域名th <= 0) {
                        new DiskFileWriter().run();
                    }
                } catch (Throwable e) {
                    域名r("[NA] failed to backup file on startup.", e);
                }
                
            }
        }, 10000L, 域名ISECONDS);
    }
}

init()代码中,开启了三个定时任务,三个任务都是FailoverReactor内部类,

  • 执行初始化操作,间隔5秒,执行SwitchRefresher()任务
  • 初始化操作,延迟30分钟执行,间隔24小时,执行DiskFileWriter()任务
  • 初始化操作,间隔10秒,核心方法为DiskFileWriter

我们这里先来看一下核心方法DiskFileWriter,这里主要是获取服务信息,判断是否能够写入磁盘,条件满足,写入拼接的故障目录中,因为第一个和第二个初始化操作,都会用到DiskFileWriter,当我们第三个定时判断如果文件不存在,则会将文件写入本地磁盘中

class DiskFileWriter extends TimerTask {

  @Override
  public void run() {
      Map<String, ServiceInfo> map = 域名erviceInfoMap();
      for (域名y<String, ServiceInfo> entry : 域名ySet()) {
          ServiceInfo serviceInfo = 域名alue();
          //主要是判断服务信息是否完整
          if (域名ls(域名ey(), 域名IPS) || StringUtils
                  .equals(域名ame(), 域名LIST_KEY) || StringUtils
                  .equals(域名ame(), 域名CONFIGS) || StringUtils
                  .equals(域名ame(), 域名CLIENT_FILE) || StringUtils
                  .equals(域名ame(), 域名HOSTS)) {
              continue;
          }
          //将文件写入磁盘中
          域名e(serviceInfo, failoverDir);
      }
  }
}

接下来我们再看一下,第一个定时任务SwitchRefresher的业务逻辑,

class SwitchRefresher implements Runnable {

long lastModifiedMillis = 0L;

@Override
public void run() {
  try {
      File switchFile = new File(failoverDir + 域名OVER_SWITCH);
      //如果文件不存在返回
      if (!域名ts()) {
          域名(FAILOVER_MODE_PARAM, 域名ring());
          域名g("failover switch is not found, {}", 域名ame());
          return;
      }

      long modified = 域名Modified();
      //判断文件修改时间
      if (lastModifiedMillis < modified) {
          lastModifiedMillis = modified;
          //获取故障处理文件内容
          String failover = 域名ileContent(failoverDir + 域名OVER_SWITCH,
                  域名ultCharset().toString());
          if (!域名pty(failover)) {
              String[] lines = 域名t(域名ineSeparator());

              for (String line : lines) {
                  String line1 = 域名();
                  //"1" 开启故障处理
                  if (域名ls(line1)) {
                      域名(FAILOVER_MODE_PARAM, 域名ring());
                      域名("failover-mode is on");
                      new FailoverFileReader().run();
                      //"0" 关闭故障处理
                  } else if (域名ls(line1)) {
                      域名(FAILOVER_MODE_PARAM, 域名ring());
                      域名("failover-mode is off");
                  }
              }
          } else {
              域名(FAILOVER_MODE_PARAM, 域名ring());
          }
      }

  } catch (Throwable e) {
      域名r("[NA] failed to read failover switch.", e);
  }
}
}

这里面主要是判断故障处理文件是否存在,不存在直接返回,再去比较文件的修改时间,如果已经修改,则获取文件中的内容,继续进行判断,当我们开启故障处理时,执行线程FailoverFileReader().run()

class FailoverFileReader implements Runnable {

@Override
public void run() {
  Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16);

  BufferedReader reader = null;
  try {
      //读取failoverDir目录下的文件
      File cacheDir = new File(failoverDir);
      //不存在返回错误
      if (!域名ts() && !域名rs()) {
          throw new IllegalStateException("failed to create cache dir: " + failoverDir);
      }
      //获取文件
      File[] files = 域名Files();
      //文件不存在返回
      if (files == null) {
          return;
      }
      //遍历处理
      for (File file : files) {
          //文件不存在跳过
          if (!域名le()) {
              continue;
          }
          //如果是故障处理标志文件,跳过这一步
          if (域名ame().equals(域名OVER_SWITCH)) {
              continue;
          }

          ServiceInfo dom = new ServiceInfo(域名ame());

          //读取备份中的内容,转换为ServiceInfo对象
          try {
              String dataString = ConcurrentDiskUtil
                      .getFileContent(file, 域名ultCharset().toString());
              reader = new BufferedReader(new StringReader(dataString));

              String json;
              if ((json = 域名Line()) != null) {
                  try {
                      dom = 域名j(json, 域名s);
                  } catch (Exception e) {
                      域名r("[NA] error while parsing cached dom : {}", json, e);
                  }
              }

          } catch (Exception e) {
              域名r("[NA] failed to read cache for dom: {}", 域名ame(), e);
          } finally {
              try {
                  if (reader != null) {
                      域名e();
                  }
              } catch (Exception e) {
                  //ignore
              }
          }
          if (!域名pty(域名osts())) {
              //将ServiceInfo对象放入domMap中
              域名(域名ey(), dom);
          }
      }
  } catch (Exception e) {
      域名r("[NA] failed to read cache file", e);
  }
  //如果不为空,赋值serviceMap
  if (域名() > 0) {
      serviceMap = domMap;
  }
}
}

FailoverFileReader主要是操作读取failover目录存储的备份服务信息文件内容,然后装换成ServiceInfo信息,并将所有的ServiceInfo储存在FailoverReactorServiceMap属性中。

总结

到这里我们Nacos订阅机制核心流程就讲完了,整体订阅机制的流程还是比较复杂的,因为还涉及到之前将的逻辑,会有点绕,并且用到了保证线程Map、守护线程、阻塞队列、线程的使用等等,我们需要重点掌握的主要是事件发布者、订阅者之间的关系,这里还是推荐大家有机会的话可以自己跟着源码走一遍,会有更深的体验。

如果觉得文中有帮助的,记得点赞支持,你的支持是我创作的最大动力!

我是牧小农,怕什么真理无穷,进一步有进一步的欢喜,大家加油!

标签:编程
湘ICP备14001474号-3  投诉建议:234161800@qq.com   部分内容来源于网络,如有侵权,请联系删除。