SpringCloud2020替換Netflix套件實踐二

參閱前文《SpringCloud2020替換Netflix套件實踐一 》

本文應該早就出來的,可前幾天忙著面試,給耽擱了。老年程式設計師就業不易啊~~閒言少敘,書歸正傳。

OpenFeign+重試

A-Server作為服務提供者,提供兩個API:

@RestControllerpublic class HelloController { @GetMapping(“/getError”) @ResponseBody public UserEntity getError() throws InterruptedException { Thread。sleep(5000); return new UserEntity(666, “正常”); } @GetMapping(“/getById”) @ResponseBody public UserEntity getById(@RequestParam Long id) throws InterruptedException { if (id == 1L) { return new UserEntity(id, “A-SERVER-使用者” + id); } else { Thread。sleep(5000); return new UserEntity(id, “A-SERVER-使用者” + id); } }}

額外提一句:getById的“id==1L”寫法是不合適的,應該用equals,不過本文的重點不在這,所以當時隨手就寫上去了。

額外提一句:getById的“id==1L”寫法是不合適的,應該用equals,不過本文的重點不在這,所以當時隨手就寫上去了。

B-Server使用OpenFeign,搭配Eureka呼叫A-Server提供的API:

@FeignClient(value = “a-server”)public interface AServerClient { @GetMapping(“/getById”) public UserEntity getById(@RequestParam Long id); @GetMapping(“/getError”) public UserEntity getError();}

Service的實現:

@Servicepublic class HelloServiceImpl implements HelloService { @Autowired private AServerClient aServerClient; public UserEntity getUserError() { return aServerClient。getError(); } public UserEntity getUserById(@RequestParam Long id) { System。out。println(“B-SERVER:getUserById:” + id); return aServerClient。getById(id); }Controller}

Controller實現:

@RestControllerpublic class HelloController { @Autowired private HelloService helloService; @GetMapping(“/getUserError”) @ResponseBody public UserEntity getUserError() { UserEntity user = helloService。getUserError(); return user; } @GetMapping(“/getUserById”) @ResponseBody public UserEntity getUserById(@RequestParam Long id) { UserEntity user = helloService。getUserById(id); return user; }}

重試配置:

#關閉Ribbon,啟用Spring Cloud LoadBalancer作為預設的負載均衡器spring。cloud。loadbalancer。ribbon。enabled=false#default是指所有的服務,也可以改為服務名如a-server來給不同的服務指定不同的重試策略#讀超時時間feign。client。config。default。read-timeout=1200#連線超時時間feign。client。config。default。connect-timeout=1000#自定義的Retryerfeign。client。config。default。retryer=org。leo。serverb。controller。CustomerRetryer

為了測試,縮短了超時時間。

注意裡面的

default

,如果是針對全域性的服務提供者,用default即可,如果要針對不同的服務配置不同的策略,應改成服務名。

自定義重試​類:

public class CustomerRetryer implements Retryer { private final int maxAttempts; int attempt; long sleptForMillis; public CustomerRetryer() { this(3); } public CustomerRetryer(int maxAttempts) { super(); this。maxAttempts = maxAttempts; this。attempt = 0; } @Override public void continueOrPropagate(RetryableException e) { attempt = attempt + 1; if (attempt > maxAttempts) { System。out。println(“已經到” + maxAttempts + “次,不再重試”); throw e; } System。out。println(LocalTime。now() + “:重試” + attempt + “次”); long interval; if (e。retryAfter() != null) { interval = e。retryAfter()。getTime() - System。currentTimeMillis(); if (interval > 1) { interval = 1; } if (interval < 0) { return; } } else { interval = nextMaxInterval(); } try { Thread。sleep(interval); } catch (InterruptedException ignored) { Thread。currentThread()。interrupt(); throw e; } sleptForMillis += interval; } long nextMaxInterval() { long interval = (long) (100 * Math。pow(1。5, attempt - 1)); return interval > 1 ? 1 : interval; } @Override public Retryer clone() { return new CustomerRetryer(3); }}

其實就是照搬並簡寫了Retryer的Default類,加了​一些輸出。

如果不想在配置檔案裡指定自定義的Retryer,也可以在Application裡​增加:

@SpringBootApplication@EnableEurekaClient@EnableFeignClientspublic class BServerApplication { public static void main(String[] args) { SpringApplication。run(BServerApplication。class, args); } @Bean public Retryer feignRetryer() { return new CustomerRetryer(); }}

增加@EnableFeignClients註解,啟用​OpenFeign。

依次啟動Eureka、A-Server、B-Server,訪問B-server的/getUserError介面,後臺顯示:

13:57:44。838640:重試1次13:57:46。045124:重試2次13:57:47。251659:重試3次已經到3次,不再重試java。net。SocketTimeoutException: Read timed out

多說幾句,重試的方案有很多,比如下一篇我要寫的Resilience4j就有,與Feign的重試使用效果有些不同,​這個下文再說。

我選用Feign的重試,主要是路徑依賴,因為以前用的就是這個~~​

Gateway+過濾器

pom:

org。springframework。cloud spring-cloud-starter-gateway org。springframework。cloud spring-cloud-starter-netflix-eureka-client org。springframework。boot spring-boot-starter-actuator

Application:

@SpringBootApplication@EnableDiscoveryClientpublic class GatewayServerApplication { public static void main(String[] args) { SpringApplication。run(GatewayServerApplication。class, args); }}

配置檔案:

spring。application。name=gateway-servereureka。client。serviceUrl。defaultZone:http://192。168。31。247:8761/eureka/eureka。instance。prefer-ip-address=true#啟用DiscoveryClient閘道器整合spring。cloud。gateway。discovery。locator。enabled=true#啟用小寫spring。cloud。gateway。discovery。locator。lowerCaseServiceId=trueinfo。company。name=www。leo。orginfo。build。artifactId=@project。artifactId@info。build。version=@project。version@logging。level。org。springframework。cloud。gateway=debug

注意一下lowerCaseServiceId,如果不配置,走閘道器訪問服務的時候URL就必須是大寫,例如

http://閘道器IP:閘道器埠/

B-SERVER

/getUserError​。

配置為true則變成

http://閘道器IP:閘道器埠/

b-server

/getUserError。

過濾器,以校驗Authorization為例,假設採用JWT,在過濾器內進行解析校驗。驗證透過則將解析的使用者資訊放入Header往下傳:

@Componentpublic class AuthorizationFilter implements GlobalFilter, Ordered { @Override public int getOrder() { // 數字越低越優先 return 0; } @Override public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { HttpHeaders headers = exchange。getRequest()。getHeaders(); // 如果Authorization為空 if (CollectionUtils。isEmpty(headers。get(“Authorization”))) { exchange。getResponse()。setStatusCode(HttpStatus。UNAUTHORIZED); return exchange。getResponse()。setComplete(); } // 如果校驗成功 if (headers。get(“Authorization”)。get(0)。equals(“admin”)) { exchange。getAttributes()。put(“userName”, “adminYYY”); ServerHttpRequest newReq = exchange。getRequest()。mutate()。header(“userName”, “adminXXX”)。build(); ServerWebExchange newExchange = exchange。mutate()。request(newReq)。build(); return chain。filter(newExchange); } else { exchange。getResponse()。setStatusCode(HttpStatus。UNAUTHORIZED); return exchange。getResponse()。setComplete(); } }}

其中exchange。getAttributes()。put(“userName”, “adminYYY”);是傳值給下一個Filter使用,而後兩行則是給Headers增加內容,因為Headers是隻讀的,無法直接新增。

第二個Filter:

@Componentpublic class AuthorFilter implements GlobalFilter, Ordered { @Override public int getOrder() { return 1; } @Override public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { String userName = exchange。getAttribute(“userName”); System。out。println(“從AuthorizationFilter獲取的userName:” + userName); return chain。filter(exchange); }}

這個只是單純展示一下上一個Filter設定的引數​。​

在服務提供者端,要想獲取Header的內容可以做如下修改:

@GetMapping(“/getUserById”)@ResponseBodypublic UserEntity getUserById(@RequestParam Long id,@RequestHeader(“userName”) String userName) { System。out。println(“userName:”+userName); UserEntity user = helloService。getUserById(id); return user;}

不過一般來說,這一段的處理,應該是放在服務自己的Filter裡,把使用者資訊放在ThreadLocal內,供業務程式碼使用。

依次啟動Eureka、A-Server、B-Server、GateWay-Server,訪問http://閘道器IP:閘道器埠/b-server/getUserId?id=2。

注意:要在Header出增加Authorization:admin,如果不加,或者不是admin,返回狀態碼就是401。​

從GateWay監控日誌可得:

從AuthorizationFilter獲取的userName:adminYYY

從B-server監控日誌可得:

userName:adminXXX

多說幾句,閘道器的功能很強大,除了鑑權之外,限流、轉發、熔斷、跨域等都可以在這裡處理​。

不過以我個人觀點來看,從運維的角度講,跨域這事應該由Nginx來解決,我想,不管用什麼語言開發,很少有後端服務不用Nginx吧。

從開發的角度講,限流、熔斷最好交由各服務自己去處理,畢竟閘道器服務與業務服務都是獨立的,讓業務開發人員一邊寫業務程式碼,一邊去改閘道器程式碼,總是比較麻煩吧。

下一篇是基於Resilience4j的限流RateLimiter、熔斷CircuitBreaker、艙壁Bulkhead。

也不知道能不能在鼠年完成這最後一篇[捂臉]