Introduction
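Kafka is one of the most common data sources in our day-to-day stream processing jobs. As data types and volumes grow, we inevitably have to add new Kafka topics or add partitions to existing ones. So how do the real-time engines consuming from Kafka notice these topic and partition changes? This article takes a brief look, using Spark Streaming and Flink as examples.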
Spark Streaming
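According to the official documentation (see the figure above), only spark-streaming-kafka-0-10 supports dynamic discovery of Kafka topics and partitions (Dynamic Topic Subscription). Digging into the source, open the class o.a.s.streaming.kafka010.DirectKafkaInputDStream and look at the first line of its compute() method, which runs for every micro-batch: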
val untilOffsets = clamp(latestOffsets())
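As its name suggests, clamp() limits the data rate per batch, which is not covered here. latestOffsets() returns the current latest offset of each partition. Its implementation, together with the paranoidPoll() method it calls, is as follows.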
/**
 * Returns the latest (highest) available offsets, taking new partitions into account.
 */
protected def latestOffsets(): Map[TopicPartition, Long] = {
  val c = consumer
  paranoidPoll(c)
  val parts = c.assignment().asScala
  // make sure new partitions are reflected in currentOffsets
  val newPartitions = parts.diff(currentOffsets.keySet)
  // Check if there's any partition been revoked because of consumer rebalance.
  val revokedPartitions = currentOffsets.keySet.diff(parts)
  if (revokedPartitions.nonEmpty) {
    throw new IllegalStateException(s"Previously tracked partitions " +
      s"${revokedPartitions.mkString("[", ",", "]")} been revoked by Kafka because of consumer " +
      s"rebalance. This is mostly due to another stream with same group id joined, " +
      s"please check if there're different streaming application misconfigure to use same " +
      s"group id. Fundamentally different stream should use different group id")
  }
  // position for new partitions determined by auto.offset.reset if no commit
  currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
  // find latest available offsets
  c.seekToEnd(currentOffsets.keySet.asJava)
  parts.map(tp => tp -> c.position(tp)).toMap
}
/**
 * The concern here is that poll might consume messages despite being paused,
 * which would throw off consumer position. Fix position if this happens.
 */
private def paranoidPoll(c: Consumer[K, V]): Unit = {
  // don't actually want to consume any messages, so pause all partitions
  c.pause(c.assignment())
  val msgs = c.poll(0)
  if (!msgs.isEmpty) {
    // position should be minimum offset per topicpartition
    msgs.asScala.foldLeft(Map[TopicPartition, Long]()) { (acc, m) =>
      val tp = new TopicPartition(m.topic, m.partition)
      val off = acc.get(tp).map(o => Math.min(o, m.offset)).getOrElse(m.offset)
      acc + (tp -> off)
    }.foreach { case (tp, off) =>
      logInfo(s"poll(0) returned messages, seeking $tp to $off to compensate")
      c.seek(tp, off)
    }
  }
}
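The compensation step in paranoidPoll() is a fold over the records that poll(0) returned despite the pause. For each partition it keeps the smallest offset. Seeking there moves the consumer back to where it stood before the unwanted poll, so those records are read again in the proper batch. Below is a minimal Java sketch of just that reduction. It assumes a hypothetical `Rec` class standing in for Kafka's ConsumerRecord and plain `topic-partition` strings in place of TopicPartition.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Kafka's ConsumerRecord: only the fields the fold needs.
final class Rec {
    final String topic;
    final int partition;
    final long offset;

    Rec(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

final class SeekBack {
    // For each "topic-partition" key, keep the minimum offset among the records
    // that poll(0) returned although all partitions were paused.
    static Map<String, Long> minOffsets(List<Rec> polled) {
        Map<String, Long> acc = new HashMap<>();
        for (Rec r : polled) {
            String tp = r.topic + "-" + r.partition;
            acc.merge(tp, r.offset, Long::min);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Rec> polled = List.of(
                new Rec("events", 0, 42), new Rec("events", 0, 40),
                new Rec("events", 1, 7));
        // In Spark, each entry drives consumer.seek(tp, off).
        minOffsets(polled).forEach((tp, off) ->
                System.out.println("seeking " + tp + " to " + off));
    }
}
```

Taking the minimum rather than the first offset seen makes the result independent of the order in which records come back.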
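So on every compute() call, paranoidPoll() pauses all assigned partitions and calls poll(0). This gives the consumer a chance to update its assignment without consuming anything; if records slip through anyway, it seeks each TopicPartition back to the right offset. latestOffsets() then finds the newly added partitions and starts tracking their offsets. That is how dynamic discovery is achieved.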
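Stripped of the Kafka consumer, the bookkeeping in latestOffsets() comes down to two set differences:

- Partitions that are assigned but not yet tracked are new, and they are added to currentOffsets.
- Partitions that are tracked but no longer assigned signal a rebalance, and they abort the batch.

Below is a minimal Java sketch of that logic. It assumes partitions are plain strings and that a hypothetical `positionOf` function stands in for consumer.position().

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.ToLongFunction;

final class OffsetTracker {
    // Offsets of the partitions this stream already tracks (currentOffsets in Spark).
    final Map<String, Long> currentOffsets = new HashMap<>();

    // Mirrors the set logic of latestOffsets(); returns the partitions picked up in this call.
    Set<String> refresh(Set<String> assigned, ToLongFunction<String> positionOf) {
        // Tracked but no longer assigned: a rebalance took them away.
        Set<String> revoked = new HashSet<>(currentOffsets.keySet());
        revoked.removeAll(assigned);
        if (!revoked.isEmpty()) {
            throw new IllegalStateException("Previously tracked partitions " + revoked
                    + " were revoked; another stream probably uses the same group.id");
        }
        // Assigned but not yet tracked: a new topic or partition appeared.
        Set<String> added = new HashSet<>(assigned);
        added.removeAll(currentOffsets.keySet());
        for (String tp : added) {
            currentOffsets.put(tp, positionOf.applyAsLong(tp));
        }
        return added;
    }

    public static void main(String[] args) {
        OffsetTracker t = new OffsetTracker();
        t.refresh(Set.of("events-0", "events-1"), tp -> 0L);
        // A partition was added to the topic: only events-2 is reported as new.
        System.out.println(t.refresh(Set.of("events-0", "events-1", "events-2"), tp -> 0L));
    }
}
```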
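It also follows that Spark Streaming cannot cope with a Kafka consumer rebalance: if any tracked partition is revoked, latestOffsets() throws an IllegalStateException. Always give each streaming application its own group.id.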
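Why is a shared group.id fatal? When a second consumer joins the same group, a rebalance redistributes the topic's partitions across all members. The first stream then suddenly owns fewer partitions than it tracks, which is exactly the revokedPartitions case above.

The Java sketch below assumes a simple round-robin split and uses the hypothetical application names `appA` and `appB`. Kafka's real strategy is pluggable through the consumer setting partition.assignment.strategy, so the exact split may differ, but any strategy must take some partitions away from appA.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class GroupRebalance {
    // Simplified round-robin assignment (assumption for illustration):
    // partition i goes to member i % members.size().
    static Map<String, Set<String>> assign(List<String> members, List<String> partitions) {
        Map<String, Set<String>> result = new HashMap<>();
        for (String m : members) {
            result.put(m, new HashSet<>());
        }
        for (int i = 0; i < partitions.size(); i++) {
            result.get(members.get(i % members.size())).add(partitions.get(i));
        }
        return result;
    }

    // Partitions a member tracked before the rebalance but no longer owns afterwards.
    static Set<String> revoked(Set<String> before, Set<String> after) {
        Set<String> lost = new HashSet<>(before);
        lost.removeAll(after);
        return lost;
    }

    public static void main(String[] args) {
        List<String> parts = List.of("events-0", "events-1", "events-2", "events-3");
        Set<String> alone = assign(List.of("appA"), parts).get("appA");
        // appB is misconfigured with the same group.id and joins the group.
        Set<String> shared = assign(List.of("appA", "appB"), parts).get("appA");
        System.out.println("appA lost " + revoked(alone, shared));
    }
}
```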
Flink
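According to the official documentation (see the figure above), Flink supports topic and partition discovery, but it is disabled by default. To enable it, set the flink.partition-discovery.interval-millis property, which is the interval in milliseconds at which new topics and partitions are discovered.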
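Enabling discovery only takes a consumer property. The sketch below builds the Properties that could be passed to the legacy FlinkKafkaConsumer. The broker address, group id, 30-second interval and topic regex are placeholder assumptions.

New *topics* can only be picked up when the consumer subscribes with a regular-expression Pattern. New partitions of fixed topics are found either way. The constructor call is left as a comment so the snippet compiles without Flink on the classpath.

```java
import java.util.Properties;
import java.util.regex.Pattern;

final class DiscoveryConfig {
    // Property read by FlinkKafkaConsumerBase; discovery stays disabled when it is absent.
    static final String DISCOVERY_KEY = "flink.partition-discovery.interval-millis";

    // Topic discovery requires a pattern subscription; this regex is a placeholder.
    static final Pattern TOPICS = Pattern.compile("events-.*");

    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "my-flink-job");            // placeholder
        // Look for new topics/partitions every 30 s (the value is in milliseconds).
        props.setProperty(DISCOVERY_KEY, "30000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps();
        // With the Flink Kafka connector on the classpath, roughly:
        // new FlinkKafkaConsumer<>(TOPICS, new SimpleStringSchema(), props);
        System.out.println(DISCOVERY_KEY + " = " + props.getProperty(DISCOVERY_KEY));
    }
}
```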
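The base class of Flink's Kafka source is the abstract class o.a.f.streaming.connectors.kafka.FlinkKafkaConsumerBase. Its run() method first creates the KafkaFetcher that pulls the data, then checks whether dynamic discovery is enabled.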
this.kafkaFetcher = createFetcher(
        sourceContext,
        subscribedPartitionsToStartOffsets,
        watermarkStrategy,
        (StreamingRuntimeContext) getRuntimeContext(),
        offsetCommitMode,
        getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
        useMetrics);
if (!running) {
    return;
}
// depending on whether we were restored with the current state version (1.3),
// remaining logic branches off into 2 paths:
//  1) New state - partition discovery loop executed as separate thread, with this
//     thread running the main fetcher loop
//  2) Old state - partition discovery is disabled and only the main fetcher loop is executed
if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
    kafkaFetcher.runFetchLoop();
} else {
    runWithPartitionDiscovery();
}
If discovery is enabled, run() eventually calls createAndStartDiscoveryLoop(). This starts a separate thread that looks for new topics and partitions every discoveryIntervalMillis and hands them to the KafkaFetcher.
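The discovery loop is a background thread that repeats three steps until the source is cancelled:

1. Ask the discoverer for partitions it has not seen before.
2. Hand any it finds to the fetcher.
3. Sleep for the interval.

The Java sketch below reproduces only that shape. The class name, the `Supplier` standing in for discoverPartitions() and the `Consumer` standing in for the fetcher hand-off are hypothetical. The `AtomicReference<Exception>` mirrors the parameter type that opens the method signature, used to pass a failure back to the main thread. Flink's real loop also handles wakeups and cancellation details not modeled here.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class DiscoveryLoop {
    private volatile boolean running = true;
    private final Thread thread;

    // discover: returns partitions not seen before (stand-in for discoverPartitions()).
    // handOff:  receives newly found partitions (stand-in for the fetcher).
    // errorRef: where a failure is parked for the main thread to inspect.
    DiscoveryLoop(long intervalMillis,
                  Supplier<List<String>> discover,
                  Consumer<List<String>> handOff,
                  AtomicReference<Exception> errorRef) {
        thread = new Thread(() -> {
            try {
                while (running) {
                    List<String> found = discover.get();
                    // Skip the hand-off if we were stopped while discovering.
                    if (running && !found.isEmpty()) {
                        handOff.accept(found);
                    }
                    if (running) {
                        Thread.sleep(intervalMillis);
                    }
                }
            } catch (InterruptedException e) {
                // Interrupted by stop(): simply leave the loop.
            } catch (Exception e) {
                errorRef.set(e);
            }
        }, "partition-discovery");
        // Daemon thread, so a loop that is never stopped does not keep the JVM alive.
        thread.setDaemon(true);
    }

    void start() {
        thread.start();
    }

    void stop() throws InterruptedException {
        running = false;
        thread.interrupt();
        thread.join();
    }
}
```

Clearing a volatile flag and interrupting the thread lets the loop exit promptly, even in the middle of a long sleep.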
private void createAndStartDiscoveryLoop(AtomicReference
So Flink implements dynamic discovery through a component called PartitionDiscoverer. The code above calls its discoverPartitions() method, whose source is shown below.
public List
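The method first handles two cases separately: fixed topics, and multiple topics specified by a regular expression. For a regex, it first fetches all topic names and keeps the ones that match. Both cases end up calling getAllPartitionsForTopics() to get every partition of those topics; this method is implemented, quite simply, by each subclass of the abstract class AbstractPartitionDiscoverer. It then iterates over the partitions and calls setAndCheckDiscoveredPartition() on each one. A partition is removed from the result if it has been seen before or if it belongs to a different parallel subtask. The method therefore returns only the newly added partitions that this subtask should consume.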
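The filtering in discoverPartitions() can be sketched in plain Java. The code matches topics against a regex and expands them into partitions. It keeps only partitions that are both unseen and owned by this subtask, while still remembering every partition it has seen.

Several parts are assumptions. A topic-to-partition-count map stands in for getAllPartitionsForTopics(), and the class and method names are hypothetical. The owner function is a simplified one, loosely modeled on Flink's KafkaTopicPartitionAssigner, and is not guaranteed to match it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

final class PartitionDiscoverySketch {
    private final Pattern topicPattern; // regex subscription (placeholder for the topics descriptor)
    private final int subtaskIndex;     // index of this parallel source instance
    private final int parallelism;      // number of parallel source instances
    // Every partition seen so far, including ones owned by other subtasks.
    private final Set<String> discovered = new HashSet<>();

    PartitionDiscoverySketch(Pattern topicPattern, int subtaskIndex, int parallelism) {
        this.topicPattern = topicPattern;
        this.subtaskIndex = subtaskIndex;
        this.parallelism = parallelism;
    }

    // Simplified owner function (assumption): a topic-dependent start index, then
    // the topic's partitions are spread round-robin across the subtasks.
    static int owner(String topic, int partition, int parallelism) {
        int start = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
        return (start + partition) % parallelism;
    }

    // metadata maps topic -> partition count, standing in for a broker metadata query.
    List<String> discoverPartitions(Map<String, Integer> metadata) {
        List<String> fresh = new ArrayList<>();
        for (Map.Entry<String, Integer> e : metadata.entrySet()) {
            String topic = e.getKey();
            if (!topicPattern.matcher(topic).matches()) {
                continue; // topic not covered by the subscription
            }
            for (int p = 0; p < e.getValue(); p++) {
                String tp = topic + "-" + p;
                // Record every unseen partition, but return it only if this subtask owns it.
                if (discovered.add(tp) && owner(topic, p, parallelism) == subtaskIndex) {
                    fresh.add(tp);
                }
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        Map<String, Integer> metadata = new HashMap<>();
        metadata.put("events-a", 2);
        metadata.put("orders", 3);
        PartitionDiscoverySketch d = new PartitionDiscoverySketch(Pattern.compile("events-.*"), 0, 1);
        System.out.println(d.discoverPartitions(metadata)); // both events-a partitions
        metadata.put("events-a", 3); // a partition is added to an existing topic
        System.out.println(d.discoverPartitions(metadata)); // only events-a-2
    }
}
```

Because every subtask runs the same deterministic owner function, each new partition is picked up by exactly one subtask without any coordination between them.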