rocket源码阅读(二)
文章目录
NamesrvController的核心组件
NameServer主要作用是为消息生产者和消息消费者提供关于主题的Topic的路由消息,那么NameServer需要存储路由的基础信息,还要能够管理Broker节点,包括路由的注册、路由的删除。NamesrvController有几个核心组件,分别是,kvConfigMangager,routeInfoManager和remotingServer
1. routeInfoManger
1.1 主要属性
1 | la'susterName */, Set<String/* brokerName */>> clusterAddrTable; |
topicQueueTable
Topic路由信息,消息发送时根据路由负载均衡。
1
2
3
4
5
6
7
8//一个Topic拥有多个消息队列,一个broker为每一主题创建readQueueNums个读队列
//writeQueueNums个写队列。
public class QueueData implements Comparable<QueueData> {
private String brokerName;
private int readQueueNums;
private int writeQueueNums;
private int perm; //读写权限,详见org.apache.rocketmq.common.constant.PermName
private int topicSynFlag; //topic同步标记,详见org.apache.rocketmq.common.sysflag.TopicSysFlagbrokerAddrTable
broker及时信息,包括brokerName,所属集群名称,主备Broker地址
1
2
3
4
5
6public class BrokerData implements Comparable<BrokerData> {
private String cluster;
private String brokerName;
//BrokerName名字相同的多台机器组成Master-slave架构,通过brokerId做区分
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
}clusterAddrTable
broker集群信息,存储集群及集群锁包含的broker。多个Broker组成一个集群。
brokerLivetable
broker的状态信息,NameServer每次收到心跳包时会替换该信息
1
2
3
4
5
6
7class BrokerLiveInfo {
private long lastUpdateTimestamp; //NameServer上次收到心跳包时间
private DataVersion dataVersion;
private Channel channel;
private String haServerAddr; //master地址,初次请求是值为空,slave向NameServer注册后返回
}filterServerTable
broker上的FilterServer列表,用于类模式消息过滤
1.2 路由注册
RocketMQ的路由注册时通过Broker与NameServer的心跳实现的。Broker启动时向集群中所有的NameServer发送心跳语句,每隔30s向集群中所有NameServer发送心跳包,NameServer收到心跳包后更新lastUpdateTimestamp。
1.2.1 broker发送心跳包
1 | //BrokerController |
核心心跳代码维护在BrokerOuterAPI
- 使用了CountDownLatch来并发向所有NameServer发送心跳消息,同时等到此时心跳发送结果
- 心跳包包含topic的信息以及broker属性相关
1 | public List<RegisterBrokerResult> registerBrokerAll( |
1.2.2 NameServer处理心跳包
RouteInfoManager注册broker
1 | public RegisterBrokerResult registerBroker( |
1.3 删除路由
routInfoManager每隔10秒扫面一次brokerLiveTable,如果lastUpdateTimestamp滞后当前系统时间超过BROKER_CHANNEL_EXPIRED_TIME
,认为broker节点失效,关闭与broker的连接,移除broker。调用onChannelDestory
方法同时更新topicQueueTabel、brokerAddrTable、brokerLiveTable和filterServerTable
1 | public void scanNotActiveBroker() { |
NameServer与broker之间通过RemotingUtils保持长连接。
1.4 路由发现
RocketMQ路由发现是非实时的, 当Topic路由出现变化后,NameServer不主动推送给客户端,而是由客户端定时拉取Topic最新的路由。通过发送RequestCode.GET_ROUTEINTO_BY_TOPIC
到DefaultRequestProcesser
。
1 | public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, |
1.5 ReentrantReadWriteLock
在onChannelDestory
中用到了读写锁。
- routeInfoManager中维护的信息都是多线程竞争使用的,会被频繁的读取,而销毁则需要加锁,通过读写锁实现读写分离
- 先加读锁的目的,写锁是一个排他锁,直接加写锁会导致其他线程拿不到读锁,而此时channel不存在,固造成线程阻塞,资源浪费,先用读锁判断channel存在,此时就必须加写锁进行broker信息删除。
1 | this.lock.readLock().lockInterruptibly(); //获取锁,如果线程被中断,自动释放锁 |
1 | this.lock.writeLock().lockInterruptibly(); |
这里有一些优化,虽然scan方法是有NameServer定时执行,不会存并发调用这个方法,但是代码还是可以优化一下的,将if判定放到加锁之后更合适
1 | if (brokerAddrFound != null && brokerAddrFound.length() > 0) { |
2. 框图总结
3. 尚存疑问
- nameserver需要120s才能移除宕机Broker,此时producer根据路由信息路由到了宕机的broker如何处理
- nameserver如何借助topicQueueTable实现负载均衡的
- kvConfigManager和brokerHouseKeepingService的作用