RocketMQ原始碼分析之BrokerController核心元件TopicConfigManager

一、前言

看完了前面的幾篇,我們都知道Broker中核心元件是由BrokerController控制的,上一篇我們分析完ConsumerOffsetManager消費偏移量管理元件,這一篇我們就來分析一下TopicConfigManager;

這個類繼承ConfigManager,用於管理

broker負責的topic元資料管理;

RocketMQ原始碼分析之BrokerController核心元件TopicConfigManager

二、原始碼導讀

topicConfigTable:用於儲存每個topic對應的配置

dataVersion:用於記錄當前配置的版本

TopicConfigManager的方法都是圍繞

topicConfigTable資料結構來進行操作。如果不出意外,

TopicConfigManager繼承至

ConfigManager,肯定需要實現

configFilePath抽象方法,這個方法上一篇也分析過,是獲取持久化檔案路徑的,我們可以看一下;

public String configFilePath() { return BrokerPathConfigHelper。getTopicConfigPath(this。brokerController。getMessageStoreConfig() 。getStorePathRootDir());}

public static String getTopicConfigPath(final String rootDir) { return rootDir + File。separator + “config” + File。separator + “topics。json”;}

@ImportantFieldprivate String storePathRootDir = System。getProperty(“user。home”) + File。separator + “store”;

我們還是在data目錄下看一下這個檔案的資料結構

RocketMQ原始碼分析之BrokerController核心元件TopicConfigManager

資料結構如下,有刪簡:

{ “dataVersion”:{ “counter”:3, “timestamp”:1650693834293 }, “topicConfigTable”:{ “SCHEDULE_TOPIC_XXXX”:{ “order”:false, “perm”:6, “readQueueNums”:18, “topicFilterType”:“SINGLE_TAG”, “topicName”:“SCHEDULE_TOPIC_XXXX”, “topicSysFlag”:0, “writeQueueNums”:18 }, “TopicTest”:{ “order”:false, “perm”:6, “readQueueNums”:4, “topicFilterType”:“SINGLE_TAG”, “topicName”:“TopicTest”, “topicSysFlag”:0, “writeQueueNums”:4 }, “SELF_TEST_TOPIC”:{ “order”:false, “perm”:6, “readQueueNums”:1, “topicFilterType”:“SINGLE_TAG”, “topicName”:“SELF_TEST_TOPIC”, “topicSysFlag”:0, “writeQueueNums”:1 }, “DefaultCluster”:{ “order”:false, “perm”:7, “readQueueNums”:16, “topicFilterType”:“SINGLE_TAG”, “topicName”:“DefaultCluster”, “topicSysFlag”:0, “writeQueueNums”:16 } }}

三、原始碼分析

構造方法;

根據topic名稱查詢他的元資料;

在傳送訊息的方法中建立topic;

在傳送訊息的備用方法中,建立topic;

更新topic元資料;

更新topicConfigTable,以orderKVTableFromNs為準;

看一個topic是否是order的;

刪除一個topic;

編碼;

1、構造方法

主要完成系統topic配置的初始化工作

// 繼承了ConfigManager,他會把topic元資料是可以支援進行持久化public class TopicConfigManager extends ConfigManager { private static final InternalLogger log = InternalLoggerFactory。getLogger( LoggerName。BROKER_LOGGER_NAME); // 鎖超時時間 private static final long LOCK_TIMEOUT_MILLIS = 3000; // 排程topic佇列數量,是18個 private static final int SCHEDULE_TOPIC_QUEUE_NUM = 18; // topicConfigTable鎖 private transient final Lock topicConfigTableLock = new ReentrantLock(); // topic元資料核心資料結構,key是topicName,value是主題對應的配置 private final ConcurrentMap topicConfigTable = new ConcurrentHashMap(1024); // 資料版本號 private final DataVersion dataVersion = new DataVersion(); // broker核心控制組件 private transient BrokerController brokerController; public TopicConfigManager() { } public TopicConfigManager(BrokerController brokerController) { this。brokerController = brokerController; // 搞了一個一個的程式碼塊,每個程式碼塊裡初始化好一些資料,是內建的一些topic // 第一個內部topic是SELF_TEST_TOPIC,這個topic一看是用於進行自我測試的一個topic,快速測試mq能否跑通 { String topic = TopicValidator。RMQ_SYS_SELF_TEST_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); TopicValidator。addSystemTopic(topic); topicConfig。setReadQueueNums(1); topicConfig。setWriteQueueNums(1); this。topicConfigTable。put(topicConfig。getTopicName(), topicConfig); } // 是否啟用自動建立topic機制,如果啟用了以後,則搞一個system topic,TBW102 // 這個topic一看就是跟自動建立topic機制是有關係的 { if (this。brokerController。getBrokerConfig()。isAutoCreateTopicEnable()) { String topic = TopicValidator。AUTO_CREATE_TOPIC_KEY_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); TopicValidator。addSystemTopic(topic); topicConfig。setReadQueueNums(this。brokerController。getBrokerConfig() 。getDefaultTopicQueueNums()); topicConfig。setWriteQueueNums(this。brokerController。getBrokerConfig() 。getDefaultTopicQueueNums()); int perm = PermName。PERM_INHERIT | PermName。PERM_READ | PermName。PERM_WRITE; topicConfig。setPerm(perm); this。topicConfigTable。put(topicConfig。getTopicName(), topicConfig); } } // BenchmarkTest,benchmark測試的topic,設定1024個讀寫佇列 // 是用於進行壓力測試的topic { String topic = TopicValidator。RMQ_SYS_BENCHMARK_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); TopicValidator。addSystemTopic(topic); topicConfig。setReadQueueNums(1024); topicConfig。setWriteQueueNums(1024); this。topicConfigTable。put(topicConfig。getTopicName(), topicConfig); } // 建立一個當前的這個broker機器所屬的broker cluster叢集名字,針對這個cluster叢集也是一個topic // 是用於寫入和消費當前的broker叢集自身的一些元資料的topic { String topic = this。brokerController。getBrokerConfig()。getBrokerClusterName(); TopicConfig topicConfig = new TopicConfig(topic); TopicValidator。addSystemTopic(topic); int perm = PermName。PERM_INHERIT; if (this。brokerController。getBrokerConfig()。isClusterTopicEnable()) { perm |= PermName。PERM_READ | PermName。PERM_WRITE; } topicConfig。setPerm(perm); this。topicConfigTable。put(topicConfig。getTopicName(), topicConfig); } // 對於當前的broker分組,他也是一個topic,等於可能讀寫一些broker分組的元資料 { String topic = this。brokerController。getBrokerConfig()。getBrokerName(); TopicConfig topicConfig = new TopicConfig(topic); TopicValidator。addSystemTopic(topic); int perm = PermName。PERM_INHERIT; if (this。brokerController。getBrokerConfig()。isBrokerTopicEnable()) { perm |= PermName。PERM_READ | PermName。PERM_WRITE; } topicConfig。setReadQueueNums(1); topicConfig。setWriteQueueNums(1); topicConfig。setPerm(perm); this。topicConfigTable。put(topicConfig。getTopicName(), topicConfig); } // offset遷移事件topic { String topic = TopicValidator。RMQ_SYS_OFFSET_MOVED_EVENT; TopicConfig topicConfig = new TopicConfig(topic); TopicValidator。addSystemTopic(topic); topicConfig。setReadQueueNums(1); topicConfig。setWriteQueueNums(1); this。topicConfigTable。put(topicConfig。getTopicName(), topicConfig); } // 類似於定時排程訊息topic { String topic = TopicValidator。RMQ_SYS_SCHEDULE_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); TopicValidator。addSystemTopic(topic); topicConfig。setReadQueueNums(SCHEDULE_TOPIC_QUEUE_NUM); topicConfig。setWriteQueueNums(SCHEDULE_TOPIC_QUEUE_NUM); this。topicConfigTable。put(topicConfig。getTopicName(), topicConfig); } // 如果說要是啟用了追蹤topic之後,拿到msg trace topic名字,一般來說預設應該是不啟用的 { if (this。brokerController。getBrokerConfig()。isTraceTopicEnable()) { String topic = this。brokerController。getBrokerConfig()。getMsgTraceTopicName(); TopicConfig topicConfig = new TopicConfig(topic); TopicValidator。addSystemTopic(topic); topicConfig。setReadQueueNums(1); topicConfig。setWriteQueueNums(1); this。topicConfigTable。put(topicConfig。getTopicName(), topicConfig); } } // broker叢集reply訊息topic { String topic = this。brokerController。getBrokerConfig()。getBrokerClusterName() + “_” + MixAll。REPLY_TOPIC_POSTFIX; TopicConfig topicConfig = new TopicConfig(topic); TopicValidator。addSystemTopic(topic); topicConfig。setReadQueueNums(1); topicConfig。setWriteQueueNums(1); this。topicConfigTable。put(topicConfig。getTopicName(), topicConfig); } }}

2、根據topic名稱查詢他的元資料

// 根據topic名稱查詢他的元資料public TopicConfig selectTopicConfig(final String topic) { return this。topicConfigTable。get(topic);}

3、傳送訊息的方法中建立topic

在傳送訊息的方法中,建立topic,返回對應的TopicConfig,步驟如下:

topicConfigTable有記錄直接返回;

defaultTopicConfig有記錄,且有繼承許可權的話,就建立;

// 在傳送訊息的方法中建立topic// 如果說你開啟了自動建立topic功能,傳送訊息的時候直接針對一個不存在的topic去傳送訊息// 這個訊息到了broker了以後,此時會嘗試呼叫這個方法去建立一個新的topic// 但是此時這個topic是不存在的,會嘗試把預設的topic,TBW102這個topic給拿出來// 如果說沒有開啟自動建立topic,此時預設topic的perm只有read write,沒有inherit,如果開啟了呢// 開啟了以後預設是有inherit,此時你要建立的這個新的topic可以繼承預設的TBW102的元資料配置public TopicConfig createTopicInSendMessageMethod( final String topic, // topic名稱,傳送到哪個topic裡去 final String defaultTopic, // 預設topic final String remoteAddress, // 遠端機器的地址傳送訊息過來 final int clientDefaultTopicQueueNums, // 客戶端預設的topic queue數量 final int topicSysFlag) { // 是否是system topic TopicConfig topicConfig = null; boolean createNew = false; try { // 嘗試獲取一把鎖,超時時間是3s if (this。topicConfigTableLock。tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit。MILLISECONDS)) { try { // 當前如果說這個topic的元資料不為null,就說明這個topic已經建立過了,此時不能重複建立,直接返回就可以了 topicConfig = this。topicConfigTable。get(topic); if (topicConfig != null) return topicConfig; // 如果說topic元資料為null,此時就嘗試獲取預設topic的元資料 TopicConfig defaultTopicConfig = this。topicConfigTable。get(defaultTopic); if (defaultTopicConfig != null) { // 如果說預設topic是TBW102 if (defaultTopic。equals(TopicValidator。AUTO_CREATE_TOPIC_KEY_TOPIC)) { // 當前broker是否啟用了topic自動建立的機制,如果說沒有啟用 // 此時就設定一下對應的許可權 if (!this。brokerController。getBrokerConfig()。isAutoCreateTopicEnable()) { defaultTopicConfig。setPerm(PermName。PERM_READ | PermName。PERM_WRITE); } } // 預設topic元資料的perm如果是有繼承性,擁有inherit這樣的一個屬性,此時就可以往下走 // 如果說要是開啟了自動建立topic,此時TBW102是有inherit繼承特性的 // 如果說可以繼承,此時就可以往下走 if (PermName。isInherited(defaultTopicConfig。getPerm())) { // 要建立的topic元資料去構建出來 topicConfig = new TopicConfig(topic); // 客戶端預設topic裡面的queue數量 // 如果說你要是想要用自動建立topic,但是自動創建出來的topic在broker裡有幾個queue呢? // 所以說客戶端那邊是可以有一個預設設定的topic queue數量,跟預設topic寫queue數量 int queueNums = Math。min( clientDefaultTopicQueueNums, defaultTopicConfig。getWriteQueueNums() ); if (queueNums < 0) { queueNums = 0; } // 把自動建立的topic的讀寫佇列都設定為這個數量 topicConfig。setReadQueueNums(queueNums); topicConfig。setWriteQueueNums(queueNums); // 獲取預設topic獲取到一個許可權,透過這個許可權取消一下里麵包含的inherit這個許可權 int perm = defaultTopicConfig。getPerm(); perm &= ~PermName。PERM_INHERIT; // 把這個許可權設定到新的topic裡去 topicConfig。setPerm(perm); topicConfig。setTopicSysFlag(topicSysFlag); // 對於他的sys flag標識也繼承自預設topic // 設定topic過濾型別 topicConfig。setTopicFilterType(defaultTopicConfig。getTopicFilterType()); } else { // default topic 沒有繼承許可權 log。warn(“Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]”, defaultTopic, defaultTopicConfig。getPerm(), remoteAddress); } } else { // defaultTopic就不存在 log。warn(“Create new topic failed, because the default topic[{}] not exist。 producer:[{}]”, defaultTopic, remoteAddress); } // 如果說topic元資料不為空,就說明自動創建出來了一個topic if (topicConfig != null) { log。info(“Create new topic by default topic:[{}] config:[{}] producer:[{}]”, defaultTopic, topicConfig, remoteAddress); // 把新創建出來的topic元資料給放入到table裡去 this。topicConfigTable。put(topic, topicConfig); // 把資料版本號做一個迭代累加,每次topic元資料有變動,都得累加一下資料版本號 this。dataVersion。nextVersion(); createNew = true; // 更新了topic元資料之後,此時做一個persist this。persist(); } } finally { this。topicConfigTableLock。unlock(); } } } catch (InterruptedException e) { log。error(“createTopicInSendMessageMethod exception”, e); } if (createNew) { // 因為此時在broker裡管理的topic有變動,我新增加要管理一個topic一部分的queues // 此時可以什麼去對nameserver去做一個註冊,topic->queues->broker->機器 this。brokerController。registerBrokerAll( false, // 是否檢查order config,是false true, // 是否oneway單向來通訊,不要他的響應,這個是true true // 強制註冊,是true ); } return topicConfig;}

4、在傳送訊息的備用方法中,建立topic

topicConfigTable有記錄就用已有記錄;

若沒有記錄;

2。1、生成TopicConfig,記錄在table中,版本更新,持久化到檔案;

2。2、註冊broker;

public TopicConfig createTopicInSendMessageBackMethod( final String topic, final int clientDefaultTopicQueueNums, final int perm, final int topicSysFlag) { TopicConfig topicConfig = this。topicConfigTable。get(topic); // 有記錄就直接返回 if (topicConfig != null) return topicConfig; boolean createNew = false; try { if (this。topicConfigTableLock。tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit。MILLISECONDS)) { try { topicConfig = this。topicConfigTable。get(topic); if (topicConfig != null) return topicConfig; topicConfig = new TopicConfig(topic); topicConfig。setReadQueueNums(clientDefaultTopicQueueNums); topicConfig。setWriteQueueNums(clientDefaultTopicQueueNums); topicConfig。setPerm(perm); topicConfig。setTopicSysFlag(topicSysFlag); log。info(“create new topic {}”, topicConfig); // 新增記錄,版本更新,持久化 this。topicConfigTable。put(topic, topicConfig); createNew = true; this。dataVersion。nextVersion(); this。persist(); } finally { this。topicConfigTableLock。unlock(); } } } catch (InterruptedException e) { log。error(“createTopicInSendMessageBackMethod exception”, e); } if (createNew) { // 重新註冊broker this。brokerController。registerBrokerAll(false, true, true); } return topicConfig;}

5、更新topic元資料

從topicConfigTable取出對應name記錄,put方法有資料就說明之前有資料,則是update沒有資料則為create;

dataVersion更新;

持久化配置到檔案;

// 更新topic元資料public void updateTopicConfig(final TopicConfig topicConfig) { // 從topicConfigTable取出對應name記錄 TopicConfig old = this。topicConfigTable。put(topicConfig。getTopicName(), topicConfig); if (old != null) { log。info(“update topic config, old:[{}] new:[{}]”, old, topicConfig); } else { log。info(“create new topic [{}]”, topicConfig); } // dataVersion更新 this。dataVersion。nextVersion(); // 持久化配置到檔案 this。persist();}

6、更新topicConfigTable,以orderKVTableFromNs為準

// 更新topicConfigTable,以orderKVTableFromNs為準public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) { if (orderKVTableFromNs != null && orderKVTableFromNs。getTable() != null) { boolean isChange = false; Set orderTopics = orderKVTableFromNs。getTable()。keySet(); // orderKVTableFromNs有記錄的,topicConfigTable全部設定為有順序 for (String topic : orderTopics) { TopicConfig topicConfig = this。topicConfigTable。get(topic); if (topicConfig != null && !topicConfig。isOrder()) { topicConfig。setOrder(true); isChange = true; log。info(“update order topic config, topic={}, order={}”, topic, true); } } // topicConfigTable中記錄有順序的,但是在orderKVTableFromNs沒有記錄的,全部更新為無順序 for (Map。Entry entry : this。topicConfigTable。entrySet()) { String topic = entry。getKey(); if (!orderTopics。contains(topic)) { TopicConfig topicConfig = entry。getValue(); if (topicConfig。isOrder()) { topicConfig。setOrder(false); isChange = true; log。info(“update order topic config, topic={}, order={}”, topic, false); } } } if (isChange) { this。dataVersion。nextVersion(); this。persist(); } }}

7、看一個topic是否是order的

// 看一個topic是否是order的public boolean isOrderTopic(final String topic) { TopicConfig topicConfig = this。topicConfigTable。get(topic); if (topicConfig == null) { return false; } else { return topicConfig。isOrder(); }}

8、刪除一個topic

// 刪除一個topic// 如果刪除成功,版本更新,持久化配置到檔案public void deleteTopicConfig(final String topic) { TopicConfig old = this。topicConfigTable。remove(topic); if (old != null) { log。info(“delete topic config OK, topic: {}”, old); this。dataVersion。nextVersion(); this。persist(); } else { log。warn(“delete topic config failed, topic: {} not exists”, topic); }}

9、編碼

根據topicConfigTable和dataVersion 構建 TopicConfigSerializeWrapper;

呼叫wrapper的toJson方法;

public String encode(final boolean prettyFormat) { TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); topicConfigSerializeWrapper。setTopicConfigTable(this。topicConfigTable); topicConfigSerializeWrapper。setDataVersion(this。dataVersion); return topicConfigSerializeWrapper。toJson(prettyFormat);}

一些沒有被呼叫的方法

歷史原因吧,這裡不展開原始碼了

updateTopicUnitFlag:更新topic的unit標識updateTopicUnitSubFlag:更新topic裡面unit sub flag

四、總結

RocketMQ原始碼分析之BrokerController核心元件TopicConfigManager