当前位置:首页 > 单片机 > 架构师社区
[导读]作者:vivo互联网服务器团队-YeWenhao一、RocketMQ架构简介1.1逻辑部署图(图片来自网络)1.2核心组件说明通过上图可以看到,RocketMQ的核心组件主要包括4个,分别是NameServer、Broker、Producer和Consumer,下面我们先依次简单...

ckquote class="js_blockquote_wrap" data-type="2" data-url="" data-author-name="" data-content-utf8-length="25" data-source-title="">作者:vivo互联网服务器团队-Ye Wenhao

一、RocketMQ架构简介


1.1 逻辑部署图


深入剖析RocketMQ源码-NameServer

图片来自网络


1.2 核心组件说明


通过上图可以看到,RocketMQ的核心组件主要包括4个,分别是NameServer、Broker、Producer和Consumer,下面我们先依次简单说明下这四个核心组件:


NameServerNameServer充当路由信息的提供者。生产者或消费者能够通过NameServer查找各Topic相应的Broker IP列表。多个Namesrver实例组成集群,但相互独立,没有信息交换。


Broker:消息中转角色,负责存储消息、转发消息。Broker服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。Broker服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。


Producer:负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到Broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。


Consumer:负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。


除了上面说的三个核心组件外,还有Topic这个概念下面也会多次提到:


Topic:表示一类消息的集合,每个Topic包含若干条消息,每条消息只能属于一个Topic,是RocketMQ进行消息订阅的基本单位。一个Topic可以分片在多个Broker集群上,每一个Topic分片包含多个queue,具体结构可以参考下图:


深入剖析RocketMQ源码-NameServer


1.3 设计理念


RocketMQ是基于主题的发布与订阅模式,核心功能包括消息发送、消息存储、消息消费,整体设计追求简单与性能第一,归纳来说主要是下面三种:


  • NameServer取代ZK充当注册中心,NameServer集群间互不通信,容忍路由信息在集群内分钟级不一致,更加轻量级;

  • 使用内存映射机制实现高效的IO存储,达到高吞吐量;

  • 容忍设计缺陷,通过ACK确保消息至少消费一次,但是如果ACK丢失,可能消息重复消费,这种情况设计上允许,交给使用者自己保证。


本文重点介绍的就是NameServer,我们下面一起来看下NameServer是如何启动以及如何进行路由管理的。


二、NameServer架构设计


在第一章已经简单介绍了NameServer取代zk作为一种更轻量级的注册中心充当路由信息的提供者。那么具体是如何来实现路由信息管理的呢?我们先看下图:


深入剖析RocketMQ源码-NameServer


上面的图描述了NameServer进行路由注册、路由剔除和路由发现的核心原理。


路由注册:Broker服务器在启动的时候会想NameServer集群中所有的NameServer发送心跳信号进行注册,并会每隔30秒向nameserver发送心跳,告诉NameServer自己活着。NameServer接收到Broker发送的心跳包之后,会记录该broker信息,并保存最近一次收到心跳包的时间。


路由剔除NameServer和每个Broker保持长连接,每隔30秒接收Broker发送的心跳包,同时自身每个10秒扫描BrokerLiveTable,比较上次收到心跳时间和当前时间比较是否大于120秒,如果超过,那么认为Broker不可用,剔除路由表中该Broker相关信息。


路由发现:路由发现不是实时的,路由变化后,NameServer不主动推给客户端,等待producer定期拉取最新路由信息。这样的设计方式降低了NameServer实现的复杂性,当路由发生变化时通过在消息发送端的容错机制来保证消息发送的高可用(这块内容会在后续介绍producer消息发送时介绍,本文不展开讲解)。


高可用NameServer通过部署多台NameServer服务器来保证自身的高可用,同时多个NameServer服务器之间不进行通信,这样路由信息发生变化时,各个NameServer服务器之间数据可能不是完全相同的,但是通过发送端的容错机制保证消息发送的高可用。这个也正是NameServer追求简单高效的目的所在。


三、 启动流程


在整理了解了NameServer的架构设计之后,我们先来看下NameServer到底是如何启动的呢?


既然是源码解读,那么我们先来看下代码入口:org.apache.rocketmq.namesrv.NamesrvStartup#main(String[] args),实际调用的是main0()方法,代码如下:

public static NamesrvController main0(String[] args) {
try { //创建namesrvController NamesrvController controller = createNamesrvController(args); //初始化并启动NamesrvController start(controller); String tip = "The Name Server boot success. serializeType=" RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf("%s%n", tip); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); }
return null;}

通过main方法启动NameServer,主要分为两大步,先创建NamesrvController,然后再初始化并启动NamesrvController。我们分别展开来分析。


3.1 时序图


具体展开阅读代码之前,我们先通过一个序列图对整体流程有个了解,如下图:


深入剖析RocketMQ源码-NameServer


3.2 创建NamesrvController


先来看核心代码,如下:

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException { // 设置版本号为当前版本号 System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); //PackageConflictDetect.detectFastjson(); //构造org.apache.commons.cli.Options,并添加-h -n参数,-h参数是打印帮助信息,-n参数是指定namesrvAddr Options options = ServerUtil.buildCommandlineOptions(new Options()); //初始化commandLine,并在options中添加-c -p参数,-c指定nameserver的配置文件路径,-p标识打印配置信息 commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); return null; } //nameserver配置类,业务参数 final NamesrvConfig namesrvConfig = new NamesrvConfig(); //netty服务器配置类,网络参数 final NettyServerConfig nettyServerConfig = new NettyServerConfig(); //设置nameserver的端口号 nettyServerConfig.setListenPort(9876); //命令带有-c参数,说明指定配置文件,需要根据配置文件路径读取配置文件内容,并将文件中配置信息赋值给NamesrvConfig和NettyServerConfig if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); //反射的方式 MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); //设置配置文件路径 namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file); in.close(); } } //命令行带有-p,说明是打印参数的命令,那么就打印出NamesrvConfig和NettyServerConfig的属性。在启动NameServer时可以先使用./mqnameserver -c configFile -p打印当前加载的配置属性 if (commandLine.hasOption('p')) { InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME); MixAll.printObjectProperties(console, namesrvConfig); MixAll.printObjectProperties(console, nettyServerConfig); //打印参数命令不需要启动nameserver服务,只需要打印参数即可 System.exit(0); } //解析命令行参数,并加载到namesrvConfig中 MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig); //检查ROCKETMQ_HOME,不能为空 if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); } //初始化logback日志工厂,rocketmq默认使用logback作为日志输出 LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(namesrvConfig.getRocketmqHome() "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig); MixAll.printObjectProperties(log, nettyServerConfig); //创建NamesrvController final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
//将全局Properties的内容复制到NamesrvController.Configuration.allConfigs中 // remember all configs to prevent discard controller.getConfiguration().registerConfig(properties);
return controller;}

通过上面对每一行代码的注释,可以看出来,创建NamesrvController的过程主要分为两步:

Step1:通过命令行中获取配置。赋值给NamesrvConfig和NettyServerConfig类。

Step2:根据配置类NamesrvConfig和NettyServerConfig构造一个NamesrvController实例。


可见NamesrvConfig和NettyServerConfig是想当重要的,这两个类分别是NameServer的业务参数和网络参数,我们分别看下这两个类里面有哪些属性:


NamesrvConfig

深入剖析RocketMQ源码-NameServer


NettyServerConfig

深入剖析RocketMQ源码-NameServer

注:Apache Commons CLI是开源的命令行解析工具,它可以帮助开发者快速构建启动命令,并且帮助你组织命令的参数、以及输出列表等。

3.3 初始化并启动


创建了NamesrvController实例之后,开始初始化并启动NameServer。


首先进行初始化,代码入口是NamesrvController#initialize。

public boolean initialize() { //加载kvConfigPath下kvConfig.json配置文件里的KV配置,然后将这些配置放到KVConfigManager#configTable属性中 this.kvConfigManager.load(); //根据nettyServerConfig初始化一个netty服务器。 //brokerHousekeepingService是在NamesrvController实例化时构造函数里实例化的,该类负责Broker连接事件的处理,实现了ChannelEventListener,主要用来管理RouteInfoManager的brokerLiveTable this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); //初始化负责处理Netty网络交互数据的线程池,默认线程数是8个 this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); //注册Netty服务端业务处理逻辑,如果开启了clusterTest,那么注册的请求处理类是ClusterTestRequestProcessor,否则请求处理类是DefaultRequestProcessor this.registerProcessor(); //注册心跳机制线程池,延迟5秒启动,每隔10秒遍历RouteInfoManager#brokerLiveTable这个属性,用来扫描不存活的broker this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); //注册打印KV配置线程池,延迟1分钟启动、每10分钟打印出kvConfig配置 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); //rocketmq可以通过开启TLS来提高数据传输的安全性,如果开启了,那么需要注册一个监听器来重新加载SslContext if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { // Register a listener to reload SslContext try { fileWatchService = new FileWatchService( new String[] { TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath }, new FileWatchService.Listener() { boolean certChanged, keyChanged = false; @Override public void onChanged(String path) { if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) { log.info("The trust certificate changed, reload the ssl context"); reloadServerSslContext(); } if (path.equals(TlsSystemConfig.tlsServerCertPath)) { certChanged = true; } if (path.equals(TlsSystemConfig.tlsServerKeyPath)) { keyChanged = true; } if (certChanged
本站声明: 本文章由作者或相关机构授权发布,目的在于传递更多信息,并不代表本站赞同其观点,本站亦不保证或承诺内容真实性等。需要转载请联系该专栏作者,如若文章内容侵犯您的权益,请及时联系本站删除。
换一批
延伸阅读

LED驱动电源的输入包括高压工频交流(即市电)、低压直流、高压直流、低压高频交流(如电子变压器的输出)等。

关键字: 驱动电源

在工业自动化蓬勃发展的当下,工业电机作为核心动力设备,其驱动电源的性能直接关系到整个系统的稳定性和可靠性。其中,反电动势抑制与过流保护是驱动电源设计中至关重要的两个环节,集成化方案的设计成为提升电机驱动性能的关键。

关键字: 工业电机 驱动电源

LED 驱动电源作为 LED 照明系统的 “心脏”,其稳定性直接决定了整个照明设备的使用寿命。然而,在实际应用中,LED 驱动电源易损坏的问题却十分常见,不仅增加了维护成本,还影响了用户体验。要解决这一问题,需从设计、生...

关键字: 驱动电源 照明系统 散热

根据LED驱动电源的公式,电感内电流波动大小和电感值成反比,输出纹波和输出电容值成反比。所以加大电感值和输出电容值可以减小纹波。

关键字: LED 设计 驱动电源

电动汽车(EV)作为新能源汽车的重要代表,正逐渐成为全球汽车产业的重要发展方向。电动汽车的核心技术之一是电机驱动控制系统,而绝缘栅双极型晶体管(IGBT)作为电机驱动系统中的关键元件,其性能直接影响到电动汽车的动力性能和...

关键字: 电动汽车 新能源 驱动电源

在现代城市建设中,街道及停车场照明作为基础设施的重要组成部分,其质量和效率直接关系到城市的公共安全、居民生活质量和能源利用效率。随着科技的进步,高亮度白光发光二极管(LED)因其独特的优势逐渐取代传统光源,成为大功率区域...

关键字: 发光二极管 驱动电源 LED

LED通用照明设计工程师会遇到许多挑战,如功率密度、功率因数校正(PFC)、空间受限和可靠性等。

关键字: LED 驱动电源 功率因数校正

在LED照明技术日益普及的今天,LED驱动电源的电磁干扰(EMI)问题成为了一个不可忽视的挑战。电磁干扰不仅会影响LED灯具的正常工作,还可能对周围电子设备造成不利影响,甚至引发系统故障。因此,采取有效的硬件措施来解决L...

关键字: LED照明技术 电磁干扰 驱动电源

开关电源具有效率高的特性,而且开关电源的变压器体积比串联稳压型电源的要小得多,电源电路比较整洁,整机重量也有所下降,所以,现在的LED驱动电源

关键字: LED 驱动电源 开关电源

LED驱动电源是把电源供应转换为特定的电压电流以驱动LED发光的电压转换器,通常情况下:LED驱动电源的输入包括高压工频交流(即市电)、低压直流、高压直流、低压高频交流(如电子变压器的输出)等。

关键字: LED 隧道灯 驱动电源
关闭