A Brief Look at Kafka Dynamic Topic/Partition Discovery in Spark Streaming and Flink

Preface

Kafka is one of the most commonly used data sources in day-to-day stream processing jobs. As data types and data volumes grow, it is inevitable that new Kafka topics get added, or that existing topics get more partitions. How, then, do the real-time processing engines consuming from Kafka become aware of topic and partition changes? This article takes Spark Streaming and Flink as examples for a brief investigation.

The Spark Streaming Case

(Figure: screenshot of the official spark-streaming-kafka-0-10 integration documentation)

According to the official documentation (see the figure above), only the spark-streaming-kafka-0-10 connector supports dynamic discovery on the Kafka side (i.e. Dynamic Topic Subscription). Digging into the source brings us to the o.a.s.streaming.kafka010.DirectKafkaInputDStream class, whose compute() method, invoked for every micro-batch, starts with the following line.

val untilOffsets = clamp(latestOffsets())

As its name suggests, clamp() caps the amount of data pulled per batch (rate control), which we will not cover here. latestOffsets() returns the latest offset of each partition; its concrete implementation, together with the paranoidPoll() method it calls, is shown below.

/**
 * Returns the latest (highest) available offsets, taking new partitions into account.
 */
protected def latestOffsets(): Map[TopicPartition, Long] = {
  val c = consumer
  paranoidPoll(c)
  val parts = c.assignment().asScala

  // make sure new partitions are reflected in currentOffsets
  val newPartitions = parts.diff(currentOffsets.keySet)

  // Check if there's any partition been revoked because of consumer rebalance.
  val revokedPartitions = currentOffsets.keySet.diff(parts)
  if (revokedPartitions.nonEmpty) {
    throw new IllegalStateException(s"Previously tracked partitions " +
      s"${revokedPartitions.mkString("[", ",", "]")} been revoked by Kafka because of consumer " +
      s"rebalance. This is mostly due to another stream with same group id joined, " +
      s"please check if there're different streaming application misconfigure to use same " +
      s"group id. Fundamentally different stream should use different group id")
  }

  // position for new partitions determined by auto.offset.reset if no commit
  currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap

  // find latest available offsets
  c.seekToEnd(currentOffsets.keySet.asJava)
  parts.map(tp => tp -> c.position(tp)).toMap
}

/**
 * The concern here is that poll might consume messages despite being paused,
 * which would throw off consumer position. Fix position if this happens.
 */
private def paranoidPoll(c: Consumer[K, V]): Unit = {
  // don't actually want to consume any messages, so pause all partitions
  c.pause(c.assignment())
  val msgs = c.poll(0)
  if (!msgs.isEmpty) {
    // position should be minimum offset per topicpartition
    msgs.asScala.foldLeft(Map[TopicPartition, Long]()) { (acc, m) =>
      val tp = new TopicPartition(m.topic, m.partition)
      val off = acc.get(tp).map(o => Math.min(o, m.offset)).getOrElse(m.offset)
      acc + (tp -> off)
    }.foreach { case (tp, off) =>
      logInfo(s"poll(0) returned messages, seeking $tp to $off to compensate")
      c.seek(tp, off)
    }
  }
}

As we can see, every time compute() runs, paranoidPoll() seeks each TopicPartition back to its proper offset, and latestOffsets() picks out newly added partitions and starts tracking their offsets. This is what makes dynamic discovery work.
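To see this in action, you can grow a topic while the job is running; the next micro-batch's call to latestOffsets() will then pick up the new partitions automatically. Below is a minimal sketch using Kafka's AdminClient, where the broker address localhost:9092 and the topic name "events" are illustrative assumptions:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewPartitions}

val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val admin = AdminClient.create(props)

// grow the (hypothetical) topic "events" to 8 partitions; the new partitions have
// no committed offsets yet, so consumption starts according to auto.offset.reset
admin.createPartitions(Collections.singletonMap("events", NewPartitions.increaseTo(8))).all().get()
admin.close()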

The code above also shows that Spark Streaming cannot cope with a Kafka consumer rebalance, so be sure to give each streaming application its own group.id.
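On the subscription side, dynamic topic discovery in Spark requires the SubscribePattern consumer strategy. Here is a minimal sketch of a direct stream built with it; the broker address, topic regex, group id, and the pre-existing StreamingContext ssc are all illustrative assumptions:

import java.util.regex.Pattern
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-app-exclusive-group",  // never shared with another streaming app
  "auto.offset.reset" -> "latest"
)

// SubscribePattern subscribes to every topic matching the regex, including
// topics that are created after the job has started
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,  // an already-created StreamingContext
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.SubscribePattern[String, String](Pattern.compile("events-.*"), kafkaParams)
)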

The Flink Case

(Figure: screenshot of the official Flink Kafka connector documentation on topic/partition discovery)

According to the official documentation (see the figure above), Flink does support topic/partition discovery, but it is disabled by default: you have to set the flink.partition-discovery.interval-millis parameter manually, i.e. the interval, in milliseconds, at which new topics/partitions are discovered.
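A minimal sketch of turning discovery on, written against the universal FlinkKafkaConsumer; the broker address, group id, topic regex, and the 30-second interval are illustrative assumptions (subscribing with a Pattern instead of a fixed topic list additionally allows newly created matching topics to be discovered):

import java.util.Properties
import java.util.regex.Pattern
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
props.setProperty("group.id", "flink-app-group")
// check for new topics/partitions every 30 seconds; discovery stays off without this
props.setProperty("flink.partition-discovery.interval-millis", "30000")

val consumer = new FlinkKafkaConsumer[String](
  Pattern.compile("events-.*"), new SimpleStringSchema(), props)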

The base class of Flink's Kafka sources is the abstract class o.a.f.streaming.connectors.kafka.FlinkKafkaConsumerBase. Its run() method first creates the KafkaFetcher that pulls the actual data, and then checks whether dynamic discovery is enabled.

this.kafkaFetcher = createFetcher(
        sourceContext,
        subscribedPartitionsToStartOffsets,
        watermarkStrategy,
        (StreamingRuntimeContext) getRuntimeContext(),
        offsetCommitMode,
        getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
        useMetrics);

if (!running) {
    return;
}

// depending on whether we were restored with the current state version (1.3),
// remaining logic branches off into 2 paths:
// 1) New state - partition discovery loop executed as separate thread, with this
//    thread running the main fetcher loop
// 2) Old state - partition discovery is disabled and only the main fetcher loop is executed
if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
    kafkaFetcher.runFetchLoop();
} else {
    runWithPartitionDiscovery();
}

If it is enabled, createAndStartDiscoveryLoop() is eventually called. It starts a dedicated thread that discovers new topics/partitions every discoveryIntervalMillis and hands them to the KafkaFetcher.

private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {
    discoveryLoopThread = new Thread(() -> {
        try {
            // --------------------- partition discovery loop ---------------------

            // throughout the loop, we always eagerly check if we are still running before
            // performing the next operation, so that we can escape the loop as soon as possible

            while (running) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Consumer subtask {} is trying to discover new partitions ...",
                            getRuntimeContext().getIndexOfThisSubtask());
                }

                final List<KafkaTopicPartition> discoveredPartitions;
                try {
                    discoveredPartitions = partitionDiscoverer.discoverPartitions();
                } catch (AbstractPartitionDiscoverer.WakeupException
                        | AbstractPartitionDiscoverer.ClosedException e) {
                    // the partition discoverer may have been closed or woken up before or
                    // during the discovery; this would only happen if the consumer was
                    // canceled; simply escape the loop
                    break;
                }

                // no need to add the discovered partitions if we were closed during the meantime
                if (running && !discoveredPartitions.isEmpty()) {
                    kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
                }

                // do not waste any time sleeping if we're not running anymore
                if (running && discoveryIntervalMillis != 0) {
                    try {
                        Thread.sleep(discoveryIntervalMillis);
                    } catch (InterruptedException iex) {
                        // may be interrupted if the consumer was canceled midway;
                        // simply escape the loop
                        break;
                    }
                }
            }
        } catch (Exception e) {
            discoveryLoopErrorRef.set(e);
        } finally {
            // calling cancel will also let the fetcher loop escape
            // (if not running, cancel() was already called)
            if (running) {
                cancel();
            }
        }
    }, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());

    discoveryLoopThread.start();
}

As we can see, Flink implements dynamic discovery through a component named PartitionDiscoverer. The code above calls its discoverPartitions() method, whose source is as follows.

public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
    if (!closed && !wakeup) {
        try {
            List<KafkaTopicPartition> newDiscoveredPartitions;

            // (1) get all possible partitions, based on whether we are subscribed
            //     to fixed topics or a topic pattern
            if (topicsDescriptor.isFixedTopics()) {
                newDiscoveredPartitions = getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
            } else {
                List<String> matchedTopics = getAllTopics();

                // retain topics that match the pattern
                Iterator<String> iter = matchedTopics.iterator();
                while (iter.hasNext()) {
                    if (!topicsDescriptor.isMatchingTopic(iter.next())) {
                        iter.remove();
                    }
                }

                if (matchedTopics.size() != 0) {
                    // get partitions only for matched topics
                    newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);
                } else {
                    newDiscoveredPartitions = null;
                }
            }

            // (2) eliminate partition that are old partitions or should not be
            //     subscribed by this subtask
            if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
                throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: "
                        + topicsDescriptor);
            } else {
                Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
                KafkaTopicPartition nextPartition;
                while (iter.hasNext()) {
                    nextPartition = iter.next();
                    if (!setAndCheckDiscoveredPartition(nextPartition)) {
                        iter.remove();
                    }
                }
            }

            return newDiscoveredPartitions;
        } catch (WakeupException e) {
            // the actual topic / partition metadata fetching methods
            // may be woken up midway; reset the wakeup flag and rethrow
            wakeup = false;
            throw e;
        }
    } else if (!closed && wakeup) {
        // may have been woken up before the method call
        wakeup = false;
        throw new WakeupException();
    } else {
        throw new ClosedException();
    }
}

First, the method branches on whether a fixed list of topics or a regex-specified topic pattern was supplied; both paths end up calling getAllPartitionsForTopics() to fetch all partitions of those topics (this method is implemented by the concrete subclasses of AbstractPartitionDiscoverer and is quite simple). It then iterates over those partitions and calls setAndCheckDiscoveredPartition() to check whether each one is new and should be subscribed by the current subtask; partitions that fail the check are removed, so the method returns only the newly added partitions this subtask should consume.
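To give an idea of how simple those subclass implementations are, here is a hypothetical Scala sketch of what the Kafka-backed getAllPartitionsForTopics() essentially does: ask the broker for each topic's current partition metadata. The real implementation is the Java class KafkaPartitionDiscoverer, which adds wakeup and error handling on top of this:

import scala.collection.JavaConverters._
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
import org.apache.kafka.clients.consumer.KafkaConsumer

// illustrative simplification, not the actual Flink source
def getAllPartitionsForTopics(consumer: KafkaConsumer[_, _],
                              topics: Seq[String]): Seq[KafkaTopicPartition] =
  topics.flatMap { topic =>
    // partitionsFor() fetches the topic's partition metadata from the broker
    consumer.partitionsFor(topic).asScala
      .map(info => new KafkaTopicPartition(info.topic, info.partition))
  }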