簡介
在前面的文章中,總結了SparkStreaming入門級的文章,瞭解到SparkStreaming是一種微批處理的“實時”流技術,在實際場景中,當我們使用SparkStreaming開發好功能並透過測試之後部署到生產環境,那麼之後就會7*24不間斷執行的,除非出現異常退出。當然SparkStreaming提供了checkpoint和WAL機制能夠保證我們的程式再次啟動時候不會出現資料丟失的情況。但是需求並不是一成不變的,相信讀者們都經歷過需求不斷迭代的情況,當我們需要迭代邏輯的時候,那麼我們如何停止線上正在執行的程式呢?本文將為讀者們詳細介紹一些關於SparkStreaming優雅關閉的手段。接下來我們將針對以下幾個問題進行展開講解:
為什麼需要優雅關閉?
什麼時候觸發關閉?
採用什麼策略關閉?
1.為什麼需要優雅關閉
基於前面提到的,當我們的場景需要
保證資料準確,不允許資料丟失
,那麼這個時候我們就得考慮優雅關閉了。說到關閉,那麼非優雅關閉就是透過kill -9 processId的方式或者yarn -kill applicationId的方式進行暴力關閉,為什麼說這種方式是屬於暴力關閉呢?由於Spark Streaming是基於micro-batch機制工作的,按照間隔時間生成RDD,如果在間隔期間執行了暴力關閉,那麼就會導致這段時間的資料丟失,雖然提供了checkpoin機制,可以使程式啟動的時候進行恢復,但是當出現程式發生變更的場景,必須要刪除掉checkpoint,因此這裡就會有丟失的風險。
因此我們需要優雅關閉,將剩餘未處理的資料或者正在處理的資料能夠全部執行完成後,這樣才不會出現數據丟失的情況。
2.什麼時候觸發關閉
既然我們知道了需要優雅關閉,那麼就需要知道什麼會觸發關閉,這樣才能有針對性的策略實現優雅關閉。
首先我們先來了解一下整體流程:
首先StreamContext在做初始化的時候,會增加Shutdown hook方法 ,放入到一個鉤子佇列中,並設定優先順序為51
當程式jvm退出時,會啟動一個執行緒從鉤子佇列中按照優先順序取出執行,然後就會執行Shutdown鉤子方法
當執行Shutdown鉤子方法時,首先會將receiver進行關閉,即不再接收資料
然後停止生成BatchRDD
等待task全部完成,停止Executor
最後釋放所有資源,即整個關閉流程結束
接下來看原始碼的具體實現
StreamingContext.scala:底層呼叫ShutdownHookManager註冊stopOnShutdown函式
1def start(): Unit = synchronized { 2 state match { 3 case INITIALIZED => 4 startSite。set(DStream。getCreationSite()) 5 …… 6 /** 7 * StreamContext啟動時會增加Shutdown鉤子函式,優先順序為51 8 */ 9 shutdownHookRef = ShutdownHookManager。addShutdownHook(10 StreamingContext。SHUTDOWN_HOOK_PRIORITY)(() => stopOnShutdown())11 。。。。12 case ACTIVE =>13 logWarning(“StreamingContext has already been started”)14 case STOPPED =>15 throw new IllegalStateException(“StreamingContext has already been stopped”)16 }17 }
ShutdownHookManager.scala:在增加鉤子函式的時候底層呼叫了SparkShutdownHookManager內部類
1def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = { 2 shutdownHooks。add(priority, hook) 3} 4private lazy val shutdownHooks = { 5 val manager = new SparkShutdownHookManager() 6 manager。install() 7 manager 8 } 910private [util] class SparkShutdownHookManager {11 def install(): Unit = {12 val hookTask = new Runnable() {13 override def run(): Unit = runAll()14 }15 org。apache。hadoop。util。ShutdownHookManager。get()。addShutdownHook(16 hookTask, FileSystem。SHUTDOWN_HOOK_PRIORITY + 30)17 }1819 /**20 * jvm退出的時候會開啟一個執行緒按照優先順序逐個呼叫鉤子函式21 */22 def runAll(): Unit = {23 shuttingDown = true24 var nextHook: SparkShutdownHook = null25 while ({ nextHook = hooks。synchronized { hooks。poll() }; nextHook != null }) {26 Try(Utils。logUncaughtExceptions(nextHook。run()))27 }28 }2930 def add(priority: Int, hook: () => Unit): AnyRef = {31 hooks。synchronized {32 if (shuttingDown) {33 throw new IllegalStateException(“Shutdown hooks cannot be modified during shutdown。”)34 }35 val hookRef = new SparkShutdownHook(priority, hook)36 hooks。add(hookRef)37 hookRef38 }39 }40}4142private class SparkShutdownHook(private val priority: Int, hook: () => Unit)43 extends Comparable[SparkShutdownHook] {44 //這裡真正呼叫註冊的函式45 def run(): Unit = hook()46}
真正執行關閉的邏輯,即StreamingContext#stopOnShutdown方法
1private def stopOnShutdown(): Unit = { 2 val stopGracefully = conf。getBoolean(“spark。streaming。stopGracefullyOnShutdown”, false) 3 stop(stopSparkContext = false, stopGracefully = stopGracefully) 4 } 5 def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = { 6 synchronized { 7 state match { 8 case ACTIVE => 9 //排程相關的關閉10 Utils。tryLogNonFatalError {11 scheduler。stop(stopGracefully)12 }1314 //監控15 Utils。tryLogNonFatalError {16 env。metricsSystem。removeSource(streamingSource)17 }1819 //ui20 Utils。tryLogNonFatalError {21 uiTab。foreach(_。detach())22 }23 Utils。tryLogNonFatalError {24 unregisterProgressListener()25 }26 StreamingContext。setActiveContext(null)27 //設定狀態為停止28 state = STOPPED29 }30 }31 if (shutdownHookRefToRemove != null) {32 ShutdownHookManager。removeShutdownHook(shutdownHookRefToRemove)33 }34 // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true)。35 if (stopSparkContext) sc。stop()36 }
可以看到這裡有一個
spark.streaming.stopGracefullyOnShutdown
引數來傳給底層的stop方法,即呼叫Jobscheduler#stop方法
JobScheduler#stop
1def stop(processAllReceivedData: Boolean): Unit = synchronized { 2 //1。首先停止接收資料 3 if (receiverTracker != null) { 4 receiverTracker。stop(processAllReceivedData) 5 } 6 7 if (executorAllocationManager != null) { 8 executorAllocationManager。foreach(_。stop()) 9 }1011 //2。停止生成BatchRdd,處理剩餘的資料12 jobGenerator。stop(processAllReceivedData)1314 //3。停止Exectuor15 jobExecutor。shutdown()1617 val terminated = if (processAllReceivedData) {18 jobExecutor。awaitTermination(1, TimeUnit。HOURS) // just a very large period of time19 } else {20 jobExecutor。awaitTermination(2, TimeUnit。SECONDS)21 }22 if (!terminated) {23 jobExecutor。shutdownNow()24 }2526 // Stop everything else27 listenerBus。stop()28 eventLoop。stop()29 eventLoop = null30 logInfo(“Stopped JobScheduler”)31 }32
3.採用什麼策略關閉?
3.1 配置策略
根據剛才梳理的觸發關閉流程中,其實可以透過配置
spark.streaming.stopGracefullyOnShutdown=true
來實現優雅關閉,但是需要傳送 SIGTERM 訊號給driver端,這裡有兩種方案
方案一,具體步驟如下:
透過Spark UI找到driver所在節點。
登入driver節點,執行 ps -ef |g
rep java |grep ApplicationMaster
命令找到對應的pid
執行kill -SIGTERM傳送SIGTERM訊號
當spark driver收到該訊號時,在日誌中會有以下資訊
1ERROR yarn。ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM2INFO streaming。StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook3INFO streaming。StreamingContext: StreamingContext stopped successfully4INFO spark。SparkContext: Invoking stop() from shutdown hook5INFO spark。SparkContext: Successfully stopped SparkContext6INFO util。ShutdownHookManager: Shutdown hook called7
注意:
這裡有一個坑,預設情況下在yarn模式下,
spark.yarn.maxAppAttempts
和
yarn.resourcemanager.am.max-attempts
是同一個值,即為2。當透過Kill命令殺掉AM時,Yarn會自動重新啟動一個AM,因此需要再發送一次Kill命令。當然也可以透過spark-submit命令提交的時候指定spark。yarn。maxAppAttempts=1這個配置引數;但這裡也會有容災風險,比如出現網路問題的時候,這裡就無法自動重啟了,程式就會以失敗而告終。
方案二:透過
yarn application -kill <
applicationid
>
命令來kill掉job(
不建議使用
)
該命令會發送SIGTERM訊號給container,同時也會立即傳送 SIGKILL 命令。雖然可以透過
yarn.nodemanager.sleep-delay-before-sigkill.ms
引數來調整SIGTERM和SIGKILL之間的間隔,但是好像沒什麼作用。具體日誌資訊如下:
1ERROR yarn。ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM2INFO streaming。StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook3
3.2 標記策略
該種策略透過藉助於三方系統來標記狀態, 一種方法是將標記HDFS檔案,如果標記檔案存在,則呼叫scc。stop(true,true);或者是藉助於redis的key是否存在等方式
1val checkIntervalMillis = 60000 2var isStopped = false 3 4while (! isStopped) { 5 isStopped = ssc。awaitTerminationOrTimeout(checkIntervalMillis) 6 checkShutdownMarker 7 if (!isStopped && stopFlag) { 8 ssc。stop(true, true) 9 }10}1112def checkShutdownMarker = {13 if (!stopFlag) {14 val fs = FileSystem。get(new Configuration())15 stopFlag = fs。exists(new Path(shutdownMarker))16 }
3.3 服務策略
即提供一個restful服務,暴露出一個介面提供關閉功能。
1def httpServer(port:Int,ssc:StreamingContext)={ 2 val server = new Server(port) 3 val context = new ContextHandler() 4 context。setContextPath(“/shutdown”) 5 context。setHandler( new CloseStreamHandler(ssc) ) 6 server。setHandler(context) 7 server。start() 8} 9class CloseStreamHandler(ssc:StreamingContext) extends AbstractHandler {10 override def handle(s: String, baseRequest: Request, req: HttpServletRequest, response: HttpServletResponse): Unit ={11 ssc。stop(true,true)12 response。setContentType(“text/html; charset=utf-8”);13 response。setStatus(HttpServletResponse。SC_OK);14 val out = response。getWriter();15 baseRequest。setHandled(true);16 }17 }