NameServer架构设计 NameServer提供服务发现的功能,维护了broker的地址列表和Topic及Topic对应队列的地址列表,与每一个broker保持心跳连接,检查broker是否存活,broker消息服务器在启动时向所有NameServer注册。在producer和consumer需要发布或者消费消息时,向nameserver发出请求来获取连接。NameServer本身的高可用可以通过部署多台NameServer服务器来实现,但彼此之间互不通信,也就是NameServer服务器之间在某个时刻的数据并不会完全相同,但这对消息发送不会造成任何影响,RoketMQ NameServer设计追求简单高效。
NameServer启动 NameServer通过org.apache.rocketmq.namesrv.NamesrvStartup启动。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static void main (String[] args) { main0(args); } public static NamesrvController main0 (String[] args) { try { NamesrvController controller = createNamesrvController(args); start(controller); String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf("%s%n" , tip); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1 ); } return null ; }
1. NameServer创建 主要是创建NameServerConfig和NettyServerConfig,生成NameServerController
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 public static NamesrvController createNamesrvController (String[] args) throws IOException, JoranException { System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = ServerUtil.parseCmdLine("mqnamesrv" , args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1 ); return null ; } final NamesrvConfig namesrvConfig = new NamesrvConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); nettyServerConfig.setListenPort(9876 ); if (commandLine.hasOption('c' )) { String file = commandLine.getOptionValue('c' ); if (file != null ) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); namesrvConfig.setConfigStorePath(file); System.out.printf("load config properties file OK, %s%n" , file); in.close(); } } if (commandLine.hasOption('p' )) { InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME); MixAll.printObjectProperties(console, namesrvConfig); MixAll.printObjectProperties(console, nettyServerConfig); System.exit(0 ); } MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig); if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n" , MixAll.ROCKETMQ_HOME_ENV); System.exit(-2 ); } LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml" ); log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); MixAll.printObjectProperties(log, namesrvConfig); MixAll.printObjectProperties(log, nettyServerConfig); final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); controller.getConfiguration().registerConfig(properties); return controller; }
1.1 NameServerConfig属性 1 2 3 4 5 6 7 8 9 10 private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));private String kvConfigPath = System.getProperty("user.home" ) + File.separator + "namesrv" + File.separator + "kvConfig.json" ;private String configStorePath = System.getProperty("user.home" ) + File.separator + "namesrv" + File.separator + "namesrv.properties" ;private String productEnvName = "center" ;private boolean clusterTest = false ;private boolean orderMessageEnable = false ;
1.2 NettyServerConfig属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private int listenPort = 8888 ;private int serverWorkerThreads = 8 ;private int serverCallbackExecutorThreads = 0 ;private int serverSelectorThreads = 3 ;private int serverOnewaySemaphoreValue = 256 ;private int serverAsyncSemaphoreValue = 64 ;private int serverChannelMaxIdleTimeSeconds = 120 ;private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;private boolean serverPooledByteBufAllocatorEnable = true ; private boolean useEpollNativeSelector = false ;
1.3 创建NamesrvController实例 通过启动属性创建NamesrvController实例,NamesrvController是NameServer的核心控制器
1 2 3 4 5 6 7 8 9 10 11 12 public NamesrvController (NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) { this .namesrvConfig = namesrvConfig; this .nettyServerConfig = nettyServerConfig; this .kvConfigManager = new KVConfigManager(this ); this .routeInfoManager = new RouteInfoManager(); this .brokerHousekeepingService = new BrokerHousekeepingService(this ); this .configuration = new Configuration( log, this .namesrvConfig, this .nettyServerConfig ); this .configuration.setStorePathFromConfig(this .namesrvConfig, "configStorePath" ); }
2. 启动NamesrvController 初始化NamesrvController并创建JVM Hook
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public static NamesrvController start (final NamesrvController controller) throws Exception { if (null == controller) { throw new IllegalArgumentException("NamesrvController is null" ); } boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3 ); } Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { @Override public Void call () throws Exception { controller.shutdown(); return null ; } })); controller.start(); return controller; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 public boolean initialize () { this .kvConfigManager.load(); this .remotingServer = new NettyRemotingServer(this .nettyServerConfig, this .brokerHousekeepingService); this .remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_" )); this .registerProcessor(); this .scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run () { NamesrvController.this .routeInfoManager.scanNotActiveBroker(); } }, 5 , 10 , TimeUnit.SECONDS); this .scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run () { NamesrvController.this .kvConfigManager.printAllPeriodically(); } }, 1 , 10 , TimeUnit.MINUTES); if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { try { fileWatchService = new FileWatchService( new String[] { TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath }, new FileWatchService.Listener() { boolean certChanged, keyChanged = false ; @Override public void onChanged (String path) { if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) { log.info("The trust certificate changed, reload the ssl context" ); reloadServerSslContext(); } if (path.equals(TlsSystemConfig.tlsServerCertPath)) { certChanged = true ; } if (path.equals(TlsSystemConfig.tlsServerKeyPath)) { keyChanged = true ; } if (certChanged && keyChanged) { log.info("The certificate and private key changed, reload the ssl context" ); certChanged = keyChanged = false ; reloadServerSslContext(); } } private void reloadServerSslContext () { ((NettyRemotingServer) remotingServer).loadSslContext(); } }); } catch (Exception e) { log.warn("FileWatchService created error, can't load the certificate dynamically" ); } } return true ; }
2.1 监听broker NameServer中的routeInfoManager的会定时检测broker的心跳,如果broker与服务器的心跳连接延迟超过两分钟,broker就会被移除
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public void scanNotActiveBroker () { Iterator<Entry<String, BrokerLiveInfo>> it = this .brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, BrokerLiveInfo> next = it.next(); long last = next.getValue().getLastUpdateTimestamp(); if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { RemotingUtil.closeChannel(next.getValue().getChannel()); it.remove(); log.warn("The broker channel expired, {} {}ms" , next.getKey(), BROKER_CHANNEL_EXPIRED_TIME); this .onChannelDestroy(next.getKey(), next.getValue().getChannel()); } } }