43.為何 SpringCloudGateway 中會有鏈路資訊丟失

43.為何 SpringCloudGateway 中會有鏈路資訊丟失

本系列程式碼地址:https://github。com/JoJoTec/spring-cloud-parent

在開始編寫我們自己的日誌 Filter 之前,還有一個問題我想在這裡和大家分享,即在 Spring Cloud Gateway 中可能發生鏈路資訊丟失的問題。

主要衝突 - Project Reactor 與 Java Logger MDC 之間的設計衝突

Poject Reactor 是基於非同步響應式設計的程式設計模式的實現,它的主要實現思路是先編寫執行鏈路,最後 sub 執行整個鏈路。但是鏈路的每一部分,

究竟是哪個執行緒執行的,是不確定的

Java 的日誌框架設計,其上下文 MDC(Mapped Diagnostic Context)資訊,是基於執行緒設計的,其實可以簡單理解為一個

ThreadLocal 的 Map

。日誌的鏈路資訊,是儲存在這個 MDC 中的。

這樣其實可以看出 Project Reactor 與日誌框架的 MDC 預設是不相容的,只要發生非同步執行緒切換,這個 MDC 就變了。Spring Cloud Sleuth 為此加了很多粘合程式碼,但是智者千慮必有一失,Project Reactor 應用場景和庫也在不斷髮展和壯大,Spring Cloud Sleuth 也可能會漏掉一些場景導致鏈路資訊丟失。

一種 Spring Cloud Gateway 常見的鏈路資訊丟失的場景

我們編寫一個簡單的測試專案(專案地址):

引入依賴:

org。springframework。boot spring-boot-starter-parent 2。4。6 org。springframework。cloud spring-cloud-starter-gateway org。springframework。cloud spring-cloud-starter-sleuth org。springframework。boot spring-boot-starter-log4j2 <!——log4j2非同步日誌需要的依賴,所有專案都必須用log4j2和非同步日誌配置——> com。lmax disruptor ${disruptor。version} org。springframework。cloud spring-cloud-dependencies 2020。0。3 pom import

對所有路徑開啟 AdaptCachedBodyGlobalFilter:

@Configuration(proxyBeanMethods = false)public class ApiGatewayConfiguration { @Autowired private AdaptCachedBodyGlobalFilter adaptCachedBodyGlobalFilter; @Autowired private GatewayProperties gatewayProperties; @PostConstruct public void init() { gatewayProperties。getRoutes()。forEach(routeDefinition -> { //對 spring cloud gateway 路由配置中的每個路由都啟用 AdaptCachedBodyGlobalFilter EnableBodyCachingEvent enableBodyCachingEvent = new EnableBodyCachingEvent(new Object(), routeDefinition。getId()); adaptCachedBodyGlobalFilter。onApplicationEvent(enableBodyCachingEvent); }); }}

配置(我們只有一個路由,將請求轉發到 httpbin。org 這個 http 請求測試網站):

server: port: 8181spring: application: name: apiGateway cloud: gateway: httpclient: connect-timeout: 500 response-timeout: 60000 routes: - id: first_route uri: http://httpbin。org predicates: - Path=/httpbin/** filters: - StripPrefix=1

新增兩個全域性 Filter,一個在 AdaptCachedBodyGlobalFilter 之前,一個在 AdaptCachedBodyGlobalFilter 之後。這兩個 Filter 非常簡單,只是打一行日誌。

@Log4j2@Componentpublic class PreLogFilter implements GlobalFilter, Ordered { public static final int ORDER = new AdaptCachedBodyGlobalFilter()。getOrder() - 1; @Override public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { log。info(“before AdaptCachedBodyGlobalFilter”); return chain。filter(exchange); } @Override public int getOrder() { return ORDER; }}@Log4j2@Componentpublic class PostLogFilter implements GlobalFilter, Ordered { public static final int ORDER = new AdaptCachedBodyGlobalFilter()。getOrder() + 1; @Override public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { log。info(“after AdaptCachedBodyGlobalFilter”); return chain。filter(exchange); } @Override public int getOrder() { return ORDER; }}

最後指定 Log4j2 的輸出格式中包含鏈路資訊,就像系列文章開頭中指定的那樣。

啟動這個應用,之後訪問 http://127。0。0。1:8181/httpbin/anything,檢視日誌,發現 PostLogFilter 中的日誌,沒有鏈路資訊了:

2021-09-08 06:32:35。457 INFO [service-apiGateway,51063d6f1fe264d0,51063d6f1fe264d0] [30600] [reactor-http-nio-2][?:]: before AdaptCachedBodyGlobalFilter2021-09-08 06:32:35。474 INFO [service-apiGateway,,] [30600] [reactor-http-nio-2][?:]: after AdaptCachedBodyGlobalFilter

Spring Cloud Sleuth 是如何增加鏈路資訊

透過系列之前的原始碼分析,我們知道,在最開始的 TraceWebFilter,我們將 Mono 封裝成了一個 MonoWebFilterTrace,它的核心原始碼是:

@Overridepublic void subscribe(CoreSubscriber<? super Void> subscriber) { Context context = contextWithoutInitialSpan(subscriber。currentContext()); Span span = findOrCreateSpan(context); //將 Span 放入執行上下文中,對於日誌其實就是將鏈路資訊放入 org。slf4j。MDC //日誌的 MDC 一般都是 ThreadLocal 的 Map,對於 Log4j2 的實現類就是 org。apache。logging。log4j。ThreadContext,其核心 contextMap 就是一個基於 ThreadLocal 實現的 Map //簡單理解就是將鏈路資訊放入一個 ThreadLocal 的 Map 中,每個執行緒訪問自己的 Map 獲取鏈路資訊 try (CurrentTraceContext。Scope scope = this。currentTraceContext。maybeScope(span。context())) { //將實際的 subscribe 用 Span 所在的 Context 包裹住,結束時關閉 Span this。source。subscribe(new WebFilterTraceSubscriber(subscriber, context, span, this)); } //在 scope。close() 之後,會將鏈路資訊從 ThreadLocal 的 Map 中剔除}@Overridepublic Object scanUnsafe(Attr key) { if (key == Attr。RUN_STYLE) { //執行的方式必須是不能切換執行緒,也就是同步的 //因為,日誌的鏈路資訊是放在 ThreadLocal 物件中,切換執行緒,鏈路資訊就沒了 return Attr。RunStyle。SYNC; } return super。scanUnsafe(key);}

WebFilterTraceSubscriber 幹了些什麼呢?出現異常,以及 http 請求結束的時候,我們可能想將響應資訊,異常資訊記錄進入 Span 中,就是透過這個類封裝實現的。

經過 MonoWebFilterTrace 的封裝,由於 Spring-WebFlux 處理請求,其實就是封裝成我們上面得出的 Mono 之後進行 subscribe 處理的請求,所以這樣,整個內部 Mono 的 publish 鏈路以及 subscribe 鏈路,就被 WebFilterTraceSubscriber 中的 scope 包裹起來了。只要我們自己不在 GatewayFilter 中轉換成某些強制非同步的 Mono 或者 Flux 導致切換執行緒,鏈路資訊是不會丟失的。

為何上面的測試專案中鏈路資訊會丟失

我們來看經過 AdaptCachedBodyGlobalFilter 之後,我們前面拼的 Mono 鏈路會變成什麼樣:

return Mono。defer(() -> new MonoWebFilterTrace(source, RoutePredicateHandlerMapping。this。lookupRoute(exchange) //根據請求尋找路由 。flatMap((Function>) r -> { exchange。getAttributes()。put(GATEWAY_ROUTE_ATTR, r); //將路由放入 Attributes 中,後面我們還會用到 return Mono。just(RoutePredicateHandlerMapping。this。webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler })。switchIfEmpty( //如果為 Mono。empty(),也就是沒找到路由 Mono。empty() 。then(Mono。fromRunnable(() -> { //返回 Mono。empty() 之後,記錄日誌 if (logger。isTraceEnabled()) { logger。trace(“No RouteDefinition found for [” + getExchangeDesc(exchange) + “]”); } }))) 。switchIfEmpty(DispatcherHandler。this。createNotFoundError()) //如果沒有返回不為 Mono。empty() 的 handlerMapping,則直接返回 404 。then( Mono。defer(() -> { //省略在 AdaptCachedBodyGlobalFilter 前面的鏈路巢狀 //讀取 Body,由於 TCP 拆包,所以需要他們拼接到一起 DataBufferUtils。join(exchange。getRequest()。getBody()) //如果沒有 Body,則直接返回空 DataBuffer 。defaultIfEmpty(factory。wrap(new EmptyByteBuf(factory。getByteBufAllocator()))) //decorate方法中將 dataBuffer 放入 exchange 的 Attributes 列表,只是為了防止重複進入這個 `AdaptCachedBodyGlobalFilter` 的情況導致重複快取請求 Body //之後,使用新的 body 以及原始請求封裝成新的請求,繼續 GatewayFilters 鏈路 。map(dataBuffer -> decorate(exchange, dataBuffer, cacheDecoratedRequest)) 。switchIfEmpty(Mono。just(exchange。getRequest()))。flatMap(function); }) 。then(Mono。empty())) ), //呼叫對應的 Handler TraceWebFilter。this。isTracePresent(), TraceWebFilter。this, TraceWebFilter。this。spanFromContextRetriever())。transformDeferred((call) -> { //MetricsWebFilter 相關的處理,在前面的程式碼中給出了,這裡省略 }););

其中 DataBufferUtils。join(exchange。getRequest()。getBody()) 其實是一個 FluxReceive,這裡我們可以理解為:

提交一個嘗試讀取請求 Body 的任務,將之後的 GatewayFilter 的鏈路處理加到在讀取完 Body 之後的回調當中,提交這個任務後,立刻返回

。這麼看可能比較複雜,我們用一個類似的例子類比下:

//首先我們建立一個新的 SpanSpan span = tracer。newTrace();//宣告一個類似於 TraceWebFilter 中封裝的 MonoWebFilterTrace 的 MonoOperatorclass MonoWebFilterTrace extends MonoOperator { protected MonoWebFilterTrace(Mono<? extends T> source) { super(source); } @Override public void subscribe(CoreSubscriber<? super T> actual) { //將 subscribe 用 span 包裹 try (Tracer。SpanInScope spanInScope = tracer。withSpanInScope(span)) { source。subscribe(actual); //在將要關閉 spanInScope 的時候(即從 ThreadLocal 的 Map 中移除鏈路資訊),列印日誌 log。info(“stopped”); } }}Mono。defer(() -> new MonoWebFilterTrace( Mono。fromRunnable(() -> { log。info(“first”); }) //模擬 FluxReceive 。then(Mono。delay(Duration。ofSeconds(1)) 。doOnSuccess(longSignal -> log。info(longSignal)))))。subscribe(aLong -> log。info(aLong));

Mono。delay 和 FluxReceive 表現類似,都是非同步切換執行緒池執行。執行上面的程式碼,我們可以從日誌上面就能看出來:

2021-09-08 07:12:45。236 INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: first2021-09-08 07:12:45。240 INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: stopped2021-09-08 07:12:46。241 INFO [service-apiGateway,,] [31868] [parallel-1][?:]: doOnEach_onNext(0)2021-09-08 07:12:46。242 INFO [service-apiGateway,,] [31868] [parallel-1][?:]: onComplete()2021-09-08 07:12:46。242 INFO [service-apiGateway,,] [31868] [parallel-1][?:]: 0

在 Spring Cloud Gateway 中,Request Body 的 FluxReceive 使用的執行緒池和呼叫 GatewayFilter 的是同一個執行緒池,所以可能執行緒還是同一個,但是由於 Span 已經結束,從 ThreadLocal 的 Map 中已經移除了鏈路資訊,所以日誌中還是沒有鏈路資訊。