SparkStreaming優雅關閉

簡介

在前面的文章中,總結了SparkStreaming入門級的文章,瞭解到SparkStreaming是一種微批處理的“實時”流技術,在實際場景中,當我們使用SparkStreaming開發好功能並透過測試之後部署到生產環境,那麼之後就會7*24不間斷執行的,除非出現異常退出。當然SparkStreaming提供了checkpoint和WAL機制能夠保證我們的程式再次啟動時候不會出現資料丟失的情況。但是需求並不是一成不變的,相信讀者們都經歷過需求不斷迭代的情況,當我們需要迭代邏輯的時候,那麼我們如何停止線上正在執行的程式呢?本文將為讀者們詳細介紹一些關於SparkStreaming優雅關閉的手段。接下來我們將針對以下幾個問題進行展開講解:

為什麼需要優雅關閉?

什麼時候觸發關閉?

採用什麼策略關閉?

1.為什麼需要優雅關閉

基於前面提到的,當我們的場景需要

保證資料準確,不允許資料丟失

,那麼這個時候我們就得考慮優雅關閉了。說到關閉,那麼非優雅關閉就是透過kill -9 processId的方式或者yarn -kill applicationId的方式進行暴力關閉,為什麼說這種方式是屬於暴力關閉呢?由於Spark Streaming是基於micro-batch機制工作的,按照間隔時間生成RDD,如果在間隔期間執行了暴力關閉,那麼就會導致這段時間的資料丟失,雖然提供了checkpoin機制,可以使程式啟動的時候進行恢復,但是當出現程式發生變更的場景,必須要刪除掉checkpoint,因此這裡就會有丟失的風險。

因此我們需要優雅關閉,將剩餘未處理的資料或者正在處理的資料能夠全部執行完成後,這樣才不會出現數據丟失的情況。

2.什麼時候觸發關閉

既然我們知道了需要優雅關閉,那麼就需要知道什麼會觸發關閉,這樣才能有針對性的策略實現優雅關閉。

首先我們先來了解一下整體流程:

首先StreamContext在做初始化的時候,會增加Shutdown hook方法 ,放入到一個鉤子佇列中,並設定優先順序為51

當程式jvm退出時,會啟動一個執行緒從鉤子佇列中按照優先順序取出執行,然後就會執行Shutdown鉤子方法

當執行Shutdown鉤子方法時,首先會將receiver進行關閉,即不再接收資料

然後停止生成BatchRDD

等待task全部完成,停止Executor

最後釋放所有資源,即整個關閉流程結束

接下來看原始碼的具體實現

StreamingContext.scala:底層呼叫ShutdownHookManager註冊stopOnShutdown函式

1def start(): Unit = synchronized { 2    state match { 3      case INITIALIZED => 4        startSite。set(DStream。getCreationSite()) 5        …… 6        /** 7         * StreamContext啟動時會增加Shutdown鉤子函式,優先順序為51 8         */ 9        shutdownHookRef = ShutdownHookManager。addShutdownHook(10          StreamingContext。SHUTDOWN_HOOK_PRIORITY)(() => stopOnShutdown())11       。。。。12      case ACTIVE =>13        logWarning(“StreamingContext has already been started”)14      case STOPPED =>15        throw new IllegalStateException(“StreamingContext has already been stopped”)16    }17  }

ShutdownHookManager.scala:在增加鉤子函式的時候底層呼叫了SparkShutdownHookManager內部類

1def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = { 2    shutdownHooks。add(priority, hook) 3}  4private lazy val shutdownHooks = { 5    val manager = new SparkShutdownHookManager() 6    manager。install() 7    manager 8  } 910private [util] class SparkShutdownHookManager {11  def install(): Unit = {12    val hookTask = new Runnable() {13      override def run(): Unit = runAll()14    }15    org。apache。hadoop。util。ShutdownHookManager。get()。addShutdownHook(16      hookTask, FileSystem。SHUTDOWN_HOOK_PRIORITY + 30)17  }1819  /**20   * jvm退出的時候會開啟一個執行緒按照優先順序逐個呼叫鉤子函式21   */22  def runAll(): Unit = {23    shuttingDown = true24    var nextHook: SparkShutdownHook = null25    while ({ nextHook = hooks。synchronized { hooks。poll() }; nextHook != null }) {26      Try(Utils。logUncaughtExceptions(nextHook。run()))27    }28  }2930  def add(priority: Int, hook: () => Unit): AnyRef = {31    hooks。synchronized {32      if (shuttingDown) {33        throw new IllegalStateException(“Shutdown hooks cannot be modified during shutdown。”)34      }35      val hookRef = new SparkShutdownHook(priority, hook)36      hooks。add(hookRef)37      hookRef38    }39  }40}4142private class SparkShutdownHook(private val priority: Int, hook: () => Unit)43  extends Comparable[SparkShutdownHook] {44  //這裡真正呼叫註冊的函式45  def run(): Unit = hook()46}

真正執行關閉的邏輯,即StreamingContext#stopOnShutdown方法

1private def stopOnShutdown(): Unit = { 2    val stopGracefully = conf。getBoolean(“spark。streaming。stopGracefullyOnShutdown”, false) 3    stop(stopSparkContext = false, stopGracefully = stopGracefully) 4  } 5 def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = { 6    synchronized { 7      state match { 8        case ACTIVE => 9          //排程相關的關閉10          Utils。tryLogNonFatalError {11            scheduler。stop(stopGracefully)12          }1314          //監控15          Utils。tryLogNonFatalError {16            env。metricsSystem。removeSource(streamingSource)17          }1819          //ui20          Utils。tryLogNonFatalError {21            uiTab。foreach(_。detach())22          }23          Utils。tryLogNonFatalError {24            unregisterProgressListener()25          }26          StreamingContext。setActiveContext(null)27          //設定狀態為停止28          state = STOPPED29      }30    }31    if (shutdownHookRefToRemove != null) {32      ShutdownHookManager。removeShutdownHook(shutdownHookRefToRemove)33    }34     // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true)。35    if (stopSparkContext) sc。stop()36  }

可以看到這裡有一個

spark.streaming.stopGracefullyOnShutdown

引數來傳給底層的stop方法,即呼叫Jobscheduler#stop方法

JobScheduler#stop

1def stop(processAllReceivedData: Boolean): Unit = synchronized { 2    //1。首先停止接收資料 3    if (receiverTracker != null) { 4      receiverTracker。stop(processAllReceivedData) 5    } 6 7    if (executorAllocationManager != null) { 8      executorAllocationManager。foreach(_。stop()) 9    }1011    //2。停止生成BatchRdd,處理剩餘的資料12    jobGenerator。stop(processAllReceivedData)1314    //3。停止Exectuor15    jobExecutor。shutdown()1617    val terminated = if (processAllReceivedData) {18      jobExecutor。awaitTermination(1, TimeUnit。HOURS)  // just a very large period of time19    } else {20      jobExecutor。awaitTermination(2, TimeUnit。SECONDS)21    }22    if (!terminated) {23      jobExecutor。shutdownNow()24    }2526    // Stop everything else27    listenerBus。stop()28    eventLoop。stop()29    eventLoop = null30    logInfo(“Stopped JobScheduler”)31  }32

3.採用什麼策略關閉?

3.1 配置策略

根據剛才梳理的觸發關閉流程中,其實可以透過配置

spark.streaming.stopGracefullyOnShutdown=true

來實現優雅關閉,但是需要傳送 SIGTERM 訊號給driver端,這裡有兩種方案

方案一,具體步驟如下:

透過Spark UI找到driver所在節點。

登入driver節點,執行 ps -ef |g

rep java |grep ApplicationMaster

命令找到對應的pid

執行kill -SIGTERM傳送SIGTERM訊號

當spark driver收到該訊號時,在日誌中會有以下資訊

1ERROR yarn。ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM2INFO streaming。StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook3INFO streaming。StreamingContext: StreamingContext stopped successfully4INFO spark。SparkContext: Invoking stop() from shutdown hook5INFO spark。SparkContext: Successfully stopped SparkContext6INFO util。ShutdownHookManager: Shutdown hook called7

注意:

這裡有一個坑,預設情況下在yarn模式下,

spark.yarn.maxAppAttempts

yarn.resourcemanager.am.max-attempts

是同一個值,即為2。當透過Kill命令殺掉AM時,Yarn會自動重新啟動一個AM,因此需要再發送一次Kill命令。當然也可以透過spark-submit命令提交的時候指定spark。yarn。maxAppAttempts=1這個配置引數;但這裡也會有容災風險,比如出現網路問題的時候,這裡就無法自動重啟了,程式就會以失敗而告終。

方案二:透過

yarn application -kill <

applicationid

>

命令來kill掉job(

不建議使用

該命令會發送SIGTERM訊號給container,同時也會立即傳送 SIGKILL 命令。雖然可以透過

yarn.nodemanager.sleep-delay-before-sigkill.ms

引數來調整SIGTERM和SIGKILL之間的間隔,但是好像沒什麼作用。具體日誌資訊如下:

1ERROR yarn。ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM2INFO streaming。StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook3

3.2 標記策略

該種策略透過藉助於三方系統來標記狀態, 一種方法是將標記HDFS檔案,如果標記檔案存在,則呼叫scc。stop(true,true);或者是藉助於redis的key是否存在等方式

1val checkIntervalMillis = 60000 2var isStopped = false 3 4while (! isStopped) { 5    isStopped = ssc。awaitTerminationOrTimeout(checkIntervalMillis) 6    checkShutdownMarker 7    if (!isStopped && stopFlag) { 8        ssc。stop(true, true) 9    }10}1112def checkShutdownMarker = {13    if (!stopFlag) {14        val fs = FileSystem。get(new Configuration())15        stopFlag = fs。exists(new Path(shutdownMarker))16    }

3.3 服務策略

即提供一個restful服務,暴露出一個介面提供關閉功能。

1def httpServer(port:Int,ssc:StreamingContext)={ 2    val server = new Server(port) 3    val context = new ContextHandler() 4    context。setContextPath(“/shutdown”) 5    context。setHandler( new CloseStreamHandler(ssc) ) 6    server。setHandler(context) 7    server。start() 8} 9class CloseStreamHandler(ssc:StreamingContext) extends AbstractHandler {10    override def handle(s: String, baseRequest: Request, req: HttpServletRequest, response: HttpServletResponse): Unit ={11      ssc。stop(true,true)12      response。setContentType(“text/html; charset=utf-8”);13      response。setStatus(HttpServletResponse。SC_OK);14      val out = response。getWriter();15      baseRequest。setHandled(true);16    }17  }