【學】阿里終面:如何設計一個高效能閘道器?

一、前言

最近在github上看了soul閘道器的設計,突然就來了興趣準備自己從零開始寫一個高效能的閘道器。經過兩週時間的開發,我的閘道器ship-gate核心功能基本都已完成,最大的缺陷就是前端功底太差沒有管理後臺。

二、設計

2.1技術選型

閘道器是所有請求的入口,所以要求有很高的吞吐量,為了實現這點可以使用請求非同步化來解決。目前一般有以下兩種方案:

Tomcat/Jetty+NIO+Servlet3

Servlet3已經支援非同步,這種方案使用比較多,京東,有贊和Zuul,都用的是這種方案。

Netty+NIO

Netty為高併發而生,目前唯品會的閘道器使用這個策略,在唯品會的技術文章中在相同的情況下Netty是每秒30w+的吞吐量,Tomcat是13w+,可以看出是有一定的差距的,但是Netty需要自己處理HTTP協議,這一塊比較麻煩。

後面發現Soul閘道器是基於Spring WebFlux(底層Netty)的,不用太關心HTTP協議的處理,於是決定也用Spring WebFlux。

閘道器的第二個特點是具備可擴充套件性,比如Netflix Zuul有preFilters,postFilters等在不同的階段方便處理不同的業務,基於責任鏈模式將請求進行鏈式處理即可實現。

在微服務架構下,服務都會進行多例項部署來保證高可用,請求到達閘道器時,閘道器需要根據URL找到所有可用的例項,這時就需要服務註冊和發現功能,即註冊中心。

現在流行的註冊中心有Apache的Zookeeper和阿里的Nacos兩種(consul有點小眾),因為之前寫RPC框架時已經用過了Zookeeper,所以這次就選擇了Nacos。

2.2需求清單

首先要明確目標,即開發一個具備哪些特性的閘道器,總結下後如下:

自定義路由規則可基於version的路由規則設定,路由物件包括DEFAUL,HEADER和QUERY三種,匹配方式包括=、regex、like三種。

跨語言HTTP協議天生跨語言

高效能Netty本身就是一款高效能的通訊框架,同時server將一些路由規則等資料快取到JVM記憶體避免請求admin服務。

高可用支援叢集模式防止單節點故障,無狀態。

灰度釋出灰度釋出(又名金絲雀釋出)是指在黑與白之間,能夠平滑過渡的一種釋出方式。在其上可以進行A/B testing,即讓一部分使用者繼續用產品特性A,一部分使用者開始用產品特性B,如果使用者對B沒有什麼反對意見,那麼逐步擴大範圍,把所有使用者都遷移到B上面來。透過特性一可以實現。

介面鑑權基於責任鏈模式,使用者開發自己的鑑權外掛即可。

負載均衡支援多種負載均衡演算法,如隨機,輪詢,加權輪詢等。利用SPI機制可以根據配置進行動態載入。

2.3架構設計

在參考了一些優秀的閘道器Zuul,Spring Cloud Gateway,Soul後,將專案劃分為以下幾個模組。

名稱描述

ship-admin後臺管理介面,配置路由規則等

ship-server閘道器服務端,核心功能模組

ship-client-spring-boot-starter閘道器客戶端,自動註冊服務資訊到註冊中心

ship-common一些公共的程式碼,如pojo,常量等。

它們之間的關係如圖:

【學】阿里終面:如何設計一個高效能閘道器?

閘道器設計

注意:

這張圖與實際實現有點出入,Nacos push到本地快取的那個環節沒有實現,目前只有ship-sever定時輪詢pull的過程。ship-admin從Nacos獲取註冊服務資訊的過程,也改成了ServiceA啟動時主動發生HTTP請求通知ship-admin。

2.4表結構設計

【學】阿里終面:如何設計一個高效能閘道器?

圖片

三、編碼

3.1 ship-client-spring-boot-starter

首先建立一個spring-boot-starter命名為ship-client-spring-boot-starter,不知道如何自定義starter的可以看我以前寫的《開發自己的starter》。

其核心類

AutoRegisterListener

就是在專案啟動時做了兩件事:

1。將服務資訊註冊到Nacos註冊中心

2。通知ship-admin服務上線了並註冊下線hook。

程式碼如下:

/** * Created by 2YSP on 2020/12/21 */public class AutoRegisterListener implements ApplicationListener {    private final static Logger LOGGER = LoggerFactory。getLogger(AutoRegisterListener。class);    private volatile AtomicBoolean registered = new AtomicBoolean(false);    private final ClientConfigProperties properties;    @NacosInjected    private NamingService namingService;    @Autowired    private RequestMappingHandlerMapping handlerMapping;    private final ExecutorService pool;    /**     * url list to ignore     */    private static List ignoreUrlList = new LinkedList<>();    static {        ignoreUrlList。add(“/error”);    }    public AutoRegisterListener(ClientConfigProperties properties) {        if (!check(properties)) {            LOGGER。error(“client config port,contextPath,appName adminUrl and version can‘t be empty!”);            throw new ShipException(“client config port,contextPath,appName adminUrl and version can’t be empty!”);        }        this。properties = properties;        pool = new ThreadPoolExecutor(1, 4, 0, TimeUnit。SECONDS, new LinkedBlockingQueue<>());    }    /**     * check the ClientConfigProperties     *     * @param properties     * @return     */    private boolean check(ClientConfigProperties properties) {        if (properties。getPort() == null || properties。getContextPath() == null                || properties。getVersion() == null || properties。getAppName() == null                || properties。getAdminUrl() == null) {            return false;        }        return true;    }    @Override    public void onApplicationEvent(ContextRefreshedEvent event) {        if (!registered。compareAndSet(false, true)) {            return;        }        doRegister();        registerShutDownHook();    }    /**     * send unregister request to admin when jvm shutdown     */    private void registerShutDownHook() {        final String url = “http://” + properties。getAdminUrl() + AdminConstants。UNREGISTER_PATH;        final UnregisterAppDTO unregisterAppDTO = new UnregisterAppDTO();        unregisterAppDTO。setAppName(properties。getAppName());        unregisterAppDTO。setVersion(properties。getVersion());        unregisterAppDTO。setIp(IpUtil。getLocalIpAddress());        unregisterAppDTO。setPort(properties。getPort());        Runtime。getRuntime()。addShutdownHook(new Thread(() -> {            OkhttpTool。doPost(url, unregisterAppDTO);            LOGGER。info(“[{}:{}] unregister from ship-admin success!”, unregisterAppDTO。getAppName(), unregisterAppDTO。getVersion());        }));    }    /**     * register all interface info to register center     */    private void doRegister() {        Instance instance = new Instance();        instance。setIp(IpUtil。getLocalIpAddress());        instance。setPort(properties。getPort());        instance。setEphemeral(true);        Map metadataMap = new HashMap<>();        metadataMap。put(“version”, properties。getVersion());        metadataMap。put(“appName”, properties。getAppName());        instance。setMetadata(metadataMap);        try {            namingService。registerInstance(properties。getAppName(), NacosConstants。APP_GROUP_NAME, instance);        } catch (NacosException e) {            LOGGER。error(“register to nacos fail”, e);            throw new ShipException(e。getErrCode(), e。getErrMsg());        }        LOGGER。info(“register interface info to nacos success!”);        // send register request to ship-admin        String url = “http://” + properties。getAdminUrl() + AdminConstants。REGISTER_PATH;        RegisterAppDTO registerAppDTO = buildRegisterAppDTO(instance);        OkhttpTool。doPost(url, registerAppDTO);        LOGGER。info(“register to ship-admin success!”);    }    private RegisterAppDTO buildRegisterAppDTO(Instance instance) {        RegisterAppDTO registerAppDTO = new RegisterAppDTO();        registerAppDTO。setAppName(properties。getAppName());        registerAppDTO。setContextPath(properties。getContextPath());        registerAppDTO。setIp(instance。getIp());        registerAppDTO。setPort(instance。getPort());        registerAppDTO。setVersion(properties。getVersion());        return registerAppDTO;    }}

3.2 ship-server

ship-sever專案主要包括了兩個部分內容, 1。請求動態路由的主流程 2。本地快取資料和ship-admin及nacos同步,這部分在後面3。3再講。

ship-server實現動態路由的原理是利用WebFilter攔截請求,然後將請求教給plugin chain去鏈式處理。

PluginFilter根據URL解析出appName,然後將啟用的plugin組裝成plugin chain。

public class PluginFilter implements WebFilter {    private ServerConfigProperties properties;    public PluginFilter(ServerConfigProperties properties) {        this。properties = properties;    }    @Override    public Mono filter(ServerWebExchange exchange, WebFilterChain chain) {        String appName = parseAppName(exchange);        if (CollectionUtils。isEmpty(ServiceCache。getAllInstances(appName))) {            throw new ShipException(ShipExceptionEnum。SERVICE_NOT_FIND);        }        PluginChain pluginChain = new PluginChain(properties, appName);        pluginChain。addPlugin(new DynamicRoutePlugin(properties));        pluginChain。addPlugin(new AuthPlugin(properties));        return pluginChain。execute(exchange, pluginChain);    }    private String parseAppName(ServerWebExchange exchange) {        RequestPath path = exchange。getRequest()。getPath();        String appName = path。value()。split(“/”)[1];        return appName;    }}

PluginChain繼承了AbstractShipPlugin並持有所有要執行的外掛。

/** * @Author: Ship * @Description: * @Date: Created in 2020/12/25 */public class PluginChain extends AbstractShipPlugin {    /**     * the pos point to current plugin     */    private int pos;    /**     * the plugins of chain     */    private List plugins;    private final String appName;    public PluginChain(ServerConfigProperties properties, String appName) {        super(properties);        this。appName = appName;    }    /**     * add enabled plugin to chain     *     * @param shipPlugin     */    public void addPlugin(ShipPlugin shipPlugin) {        if (plugins == null) {            plugins = new ArrayList<>();        }        if (!PluginCache。isEnabled(appName, shipPlugin。name())) {            return;        }        plugins。add(shipPlugin);        // order by the plugin‘s order        plugins。sort(Comparator。comparing(ShipPlugin::order));    }    @Override    public Integer order() {        return null;    }    @Override    public String name() {        return null;    }    @Override    public Mono execute(ServerWebExchange exchange, PluginChain pluginChain) {        if (pos == plugins。size()) {            return exchange。getResponse()。setComplete();        }        return pluginChain。plugins。get(pos++)。execute(exchange, pluginChain);    }    public String getAppName() {        return appName;    }}

AbstractShipPlugin實現了ShipPlugin介面,並持有ServerConfigProperties配置物件。

public abstract class AbstractShipPlugin implements ShipPlugin {    protected ServerConfigProperties properties;    public AbstractShipPlugin(ServerConfigProperties properties) {        this。properties = properties;    }}

ShipPlugin介面定義了所有外掛必須實現的三個方法order(),name()和execute()。

public interface ShipPlugin {    /**     * lower values have higher priority     *     * @return     */    Integer order();    /**     * return current plugin name     *     * @return     */    String name();    Mono execute(ServerWebExchange exchange,PluginChain pluginChain);}

DynamicRoutePlugin繼承了抽象類AbstractShipPlugin,包含了動態路由的主要業務邏輯。

/** * @Author: Ship * @Description: * @Date: Created in 2020/12/25 */public class DynamicRoutePlugin extends AbstractShipPlugin {    private final static Logger LOGGER = LoggerFactory。getLogger(DynamicRoutePlugin。class);    private static WebClient webClient;    private static final Gson gson = new GsonBuilder()。create();    static {        HttpClient httpClient = HttpClient。create()                。tcpConfiguration(client ->                        client。doOnConnected(conn ->                                conn。addHandlerLast(new ReadTimeoutHandler(3))                                        。addHandlerLast(new WriteTimeoutHandler(3)))                                。option(ChannelOption。TCP_NODELAY, true)                );        webClient = WebClient。builder()。clientConnector(new ReactorClientHttpConnector(httpClient))                。build();    }    public DynamicRoutePlugin(ServerConfigProperties properties) {        super(properties);    }    @Override    public Integer order() {        return ShipPluginEnum。DYNAMIC_ROUTE。getOrder();    }    @Override    public String name() {        return ShipPluginEnum。DYNAMIC_ROUTE。getName();    }    @Override    public Mono execute(ServerWebExchange exchange, PluginChain pluginChain) {        String appName = pluginChain。getAppName();        ServiceInstance serviceInstance = chooseInstance(appName, exchange。getRequest());//        LOGGER。info(“selected instance is [{}]”, gson。toJson(serviceInstance));        // request service        String url = buildUrl(exchange, serviceInstance);        return forward(exchange, url);    }    /**     * forward request to backend service     *     * @param exchange     * @param url     * @return     */    private Mono forward(ServerWebExchange exchange, String url) {        ServerHttpRequest request = exchange。getRequest();        ServerHttpResponse response = exchange。getResponse();        HttpMethod method = request。getMethod();        WebClient。RequestBodySpec requestBodySpec = webClient。method(method)。uri(url)。headers((headers) -> {            headers。addAll(request。getHeaders());        });        WebClient。RequestHeadersSpec<?> reqHeadersSpec;        if (requireHttpBody(method)) {            reqHeadersSpec = requestBodySpec。body(BodyInserters。fromDataBuffers(request。getBody()));        } else {            reqHeadersSpec = requestBodySpec;        }        // nio->callback->nio        return reqHeadersSpec。exchange()。timeout(Duration。ofMillis(properties。getTimeOutMillis()))                。onErrorResume(ex -> {                    return Mono。defer(() -> {                        String errorResultJson = “”;                        if (ex instanceof TimeoutException) {                            errorResultJson = “{\”code\“:5001,\”message\“:\”network timeout\“}”;                        } else {                            errorResultJson = “{\”code\“:5000,\”message\“:\”system error\“}”;                        }                        return ShipResponseUtil。doResponse(exchange, errorResultJson);                    })。then(Mono。empty());                })。flatMap(backendResponse -> {                    response。setStatusCode(backendResponse。statusCode());                    response。getHeaders()。putAll(backendResponse。headers()。asHttpHeaders());                    return response。writeWith(backendResponse。bodyToFlux(DataBuffer。class));                });    }    /**     * weather the http method need http body     *     * @param method     * @return     */    private boolean requireHttpBody(HttpMethod method) {        if (method。equals(HttpMethod。POST) || method。equals(HttpMethod。PUT) || method。equals(HttpMethod。PATCH)) {            return true;        }        return false;    }    private String buildUrl(ServerWebExchange exchange, ServiceInstance serviceInstance) {        ServerHttpRequest request = exchange。getRequest();        String query = request。getURI()。getQuery();        String path = request。getPath()。value()。replaceFirst(“/” + serviceInstance。getAppName(), “”);        String url = “http://” + serviceInstance。getIp() + “:” + serviceInstance。getPort() + path;        if (!StringUtils。isEmpty(query)) {            url = url + “?” + query;        }        return url;    }    /**     * choose an ServiceInstance according to route rule config and load balancing algorithm     *     * @param appName     * @param request     * @return     */    private ServiceInstance chooseInstance(String appName, ServerHttpRequest request) {        List serviceInstances = ServiceCache。getAllInstances(appName);        if (CollectionUtils。isEmpty(serviceInstances)) {            LOGGER。error(“service instance of {} not find”, appName);            throw new ShipException(ShipExceptionEnum。SERVICE_NOT_FIND);        }        String version = matchAppVersion(appName, request);        if (StringUtils。isEmpty(version)) {            throw new ShipException(“match app version error”);        }        // filter serviceInstances by version        List instances = serviceInstances。stream()。filter(i -> i。getVersion()。equals(version))。collect(Collectors。toList());        //Select an instance based on the load balancing algorithm        LoadBalance loadBalance = LoadBalanceFactory。getInstance(properties。getLoadBalance(), appName, version);        ServiceInstance serviceInstance = loadBalance。chooseOne(instances);        return serviceInstance;    }    private String matchAppVersion(String appName, ServerHttpRequest request) {        List rules = RouteRuleCache。getRules(appName);        rules。sort(Comparator。comparing(AppRuleDTO::getPriority)。reversed());        for (AppRuleDTO rule : rules) {            if (match(rule, request)) {                return rule。getVersion();            }        }        return null;    }    private boolean match(AppRuleDTO rule, ServerHttpRequest request) {        String matchObject = rule。getMatchObject();        String matchKey = rule。getMatchKey();        String matchRule = rule。getMatchRule();        Byte matchMethod = rule。getMatchMethod();        if (MatchObjectEnum。DEFAULT。getCode()。equals(matchObject)) {            return true;        } else if (MatchObjectEnum。QUERY。getCode()。equals(matchObject)) {            String param = request。getQueryParams()。getFirst(matchKey);            if (!StringUtils。isEmpty(param)) {                return StringTools。match(param, matchMethod, matchRule);            }        } else if (MatchObjectEnum。HEADER。getCode()。equals(matchObject)) {            HttpHeaders headers = request。getHeaders();            String headerValue = headers。getFirst(matchKey);            if (!StringUtils。isEmpty(headerValue)) {                return StringTools。match(headerValue, matchMethod, matchRule);            }        }        return false;    }}

3.3 資料同步

app資料同步

後臺服務(如訂單服務)啟動時,只將服務名,版本,ip地址和埠號註冊到了Nacos,並沒有例項的權重和啟用的外掛資訊怎麼辦?

一般線上的例項權重和外掛列表都是在管理介面配置,然後動態生效的,所以需要ship-admin定時更新例項的權重和外掛資訊到註冊中心。

對應程式碼ship-admin的NacosSyncListener

/** * @Author: Ship * @Description: * @Date: Created in 2020/12/30 */@Configurationpublic class NacosSyncListener implements ApplicationListener {    private static final Logger LOGGER = LoggerFactory。getLogger(NacosSyncListener。class);    private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,            new ShipThreadFactory(“nacos-sync”, true)。create());    @NacosInjected    private NamingService namingService;    @Value(“${nacos。discovery。server-addr}”)    private String baseUrl;    @Resource    private AppService appService;    @Override    public void onApplicationEvent(ContextRefreshedEvent event) {        if (event。getApplicationContext()。getParent() != null) {            return;        }        String url = “http://” + baseUrl + NacosConstants。INSTANCE_UPDATE_PATH;        scheduledPool。scheduleWithFixedDelay(new NacosSyncTask(namingService, url, appService), 0, 30L, TimeUnit。SECONDS);    }    class NacosSyncTask implements Runnable {        private NamingService namingService;        private String url;        private AppService appService;        private Gson gson = new GsonBuilder()。create();        public NacosSyncTask(NamingService namingService, String url, AppService appService) {            this。namingService = namingService;            this。url = url;            this。appService = appService;        }        /**         * Regular update weight,enabled plugins to nacos instance         */        @Override        public void run() {            try {                // get all app names                ListView services = namingService。getServicesOfServer(1, Integer。MAX_VALUE, NacosConstants。APP_GROUP_NAME);                if (CollectionUtils。isEmpty(services。getData())) {                    return;                }                List appNames = services。getData();                List appInfos = appService。getAppInfos(appNames);                for (AppInfoDTO appInfo : appInfos) {                    if (CollectionUtils。isEmpty(appInfo。getInstances())) {                        continue;                    }                    for (ServiceInstance instance : appInfo。getInstances()) {                        Map queryMap = buildQueryMap(appInfo, instance);                        String resp = OkhttpTool。doPut(url, queryMap, “”);                        LOGGER。debug(“response :{}”, resp);                    }                }            } catch (Exception e) {                LOGGER。error(“nacos sync task error”, e);            }        }        private Map buildQueryMap(AppInfoDTO appInfo, ServiceInstance instance) {            Map map = new HashMap<>();            map。put(“serviceName”, appInfo。getAppName());            map。put(“groupName”, NacosConstants。APP_GROUP_NAME);            map。put(“ip”, instance。getIp());            map。put(“port”, instance。getPort());            map。put(“weight”, instance。getWeight()。doubleValue());            NacosMetadata metadata = new NacosMetadata();            metadata。setAppName(appInfo。getAppName());            metadata。setVersion(instance。getVersion());            metadata。setPlugins(String。join(“,”, appInfo。getEnabledPlugins()));            map。put(“metadata”, StringTools。urlEncode(gson。toJson(metadata)));            map。put(“ephemeral”, true);            return map;        }    }}

ship-server再定時從Nacos拉取app資料更新到本地Map快取。

/** * @Author: Ship * @Description: sync data to local cache * @Date: Created in 2020/12/25 */@Configurationpublic class DataSyncTaskListener implements ApplicationListener {    private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,            new ShipThreadFactory(“service-sync”, true)。create());    @NacosInjected    private NamingService namingService;    @Autowired    private ServerConfigProperties properties;    @Override    public void onApplicationEvent(ContextRefreshedEvent event) {        if (event。getApplicationContext()。getParent() != null) {            return;        }        scheduledPool。scheduleWithFixedDelay(new DataSyncTask(namingService)                , 0L, properties。getCacheRefreshInterval(), TimeUnit。SECONDS);        WebsocketSyncCacheServer websocketSyncCacheServer = new WebsocketSyncCacheServer(properties。getWebSocketPort());        websocketSyncCacheServer。start();    }    class DataSyncTask implements Runnable {        private NamingService namingService;        public DataSyncTask(NamingService namingService) {            this。namingService = namingService;        }        @Override        public void run() {            try {                // get all app names                ListView services = namingService。getServicesOfServer(1, Integer。MAX_VALUE, NacosConstants。APP_GROUP_NAME);                if (CollectionUtils。isEmpty(services。getData())) {                    return;                }                List appNames = services。getData();                // get all instances                for (String appName : appNames) {                    List instanceList = namingService。getAllInstances(appName, NacosConstants。APP_GROUP_NAME);                    if (CollectionUtils。isEmpty(instanceList)) {                        continue;                    }                    ServiceCache。add(appName, buildServiceInstances(instanceList));                    List pluginNames = getEnabledPlugins(instanceList);                    PluginCache。add(appName, pluginNames);                }                ServiceCache。removeExpired(appNames);                PluginCache。removeExpired(appNames);            } catch (NacosException e) {                e。printStackTrace();            }        }        private List getEnabledPlugins(List instanceList) {            Instance instance = instanceList。get(0);            Map metadata = instance。getMetadata();            // plugins: DynamicRoute,Auth            String plugins = metadata。getOrDefault(“plugins”, ShipPluginEnum。DYNAMIC_ROUTE。getName());            return Arrays。stream(plugins。split(“,”))。collect(Collectors。toList());        }        private List buildServiceInstances(List instanceList) {            List list = new LinkedList<>();            instanceList。forEach(instance -> {                Map metadata = instance。getMetadata();                ServiceInstance serviceInstance = new ServiceInstance();                serviceInstance。setAppName(metadata。get(“appName”));                serviceInstance。setIp(instance。getIp());                serviceInstance。setPort(instance。getPort());                serviceInstance。setVersion(metadata。get(“version”));                serviceInstance。setWeight((int) instance。getWeight());                list。add(serviceInstance);            });            return list;        }    }}

路由規則資料同步

同時,如果使用者在管理後臺更新了路由規則,ship-admin需要推送規則資料到ship-server,這裡參考了soul閘道器的做法利用websocket在第一次建立連線後進行全量同步,此後路由規則發生變更就只作增量同步。

服務端WebsocketSyncCacheServer:

/** * @Author: Ship * @Description: * @Date: Created in 2020/12/28 */public class WebsocketSyncCacheServer extends WebSocketServer {    private final static Logger LOGGER = LoggerFactory。getLogger(WebsocketSyncCacheServer。class);    private Gson gson = new GsonBuilder()。create();    private MessageHandler messageHandler;    public WebsocketSyncCacheServer(Integer port) {        super(new InetSocketAddress(port));        this。messageHandler = new MessageHandler();    }    @Override    public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {        LOGGER。info(“server is open”);    }    @Override    public void onClose(WebSocket webSocket, int i, String s, boolean b) {        LOGGER。info(“websocket server close。。。”);    }    @Override    public void onMessage(WebSocket webSocket, String message) {        LOGGER。info(“websocket server receive message:\n[{}]”, message);        this。messageHandler。handler(message);    }    @Override    public void onError(WebSocket webSocket, Exception e) {    }    @Override    public void onStart() {        LOGGER。info(“websocket server start。。。”);    }    class MessageHandler {        public void handler(String message) {            RouteRuleOperationDTO operationDTO = gson。fromJson(message, RouteRuleOperationDTO。class);            if (CollectionUtils。isEmpty(operationDTO。getRuleList())) {                return;            }            Map> map = operationDTO。getRuleList()                    。stream()。collect(Collectors。groupingBy(AppRuleDTO::getAppName));            if (OperationTypeEnum。INSERT。getCode()。equals(operationDTO。getOperationType())                    || OperationTypeEnum。UPDATE。getCode()。equals(operationDTO。getOperationType())) {                RouteRuleCache。add(map);            } else if (OperationTypeEnum。DELETE。getCode()。equals(operationDTO。getOperationType())) {                RouteRuleCache。remove(map);            }        }    }}

客戶端WebsocketSyncCacheClient:

/** * @Author: Ship * @Description: * @Date: Created in 2020/12/28 */@Componentpublic class WebsocketSyncCacheClient {    private final static Logger LOGGER = LoggerFactory。getLogger(WebsocketSyncCacheClient。class);    private WebSocketClient client;    private RuleService ruleService;    private Gson gson = new GsonBuilder()。create();    public WebsocketSyncCacheClient(@Value(“${ship。server-web-socket-url}”) String serverWebSocketUrl,                                    RuleService ruleService) {        if (StringUtils。isEmpty(serverWebSocketUrl)) {            throw new ShipException(ShipExceptionEnum。CONFIG_ERROR);        }        this。ruleService = ruleService;        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,                new ShipThreadFactory(“websocket-connect”, true)。create());        try {            client = new WebSocketClient(new URI(serverWebSocketUrl)) {                @Override                public void onOpen(ServerHandshake serverHandshake) {                    LOGGER。info(“client is open”);                    List list = ruleService。getEnabledRule();                    String msg = gson。toJson(new RouteRuleOperationDTO(OperationTypeEnum。INSERT, list));                    send(msg);                }                @Override                public void onMessage(String s) {                }                @Override                public void onClose(int i, String s, boolean b) {                }                @Override                public void onError(Exception e) {                    LOGGER。error(“websocket client error”, e);                }            };            client。connectBlocking();            //使用排程執行緒池進行斷線重連,30秒進行一次            executor。scheduleAtFixedRate(() -> {                if (client != null && client。isClosed()) {                    try {                        client。reconnectBlocking();                    } catch (InterruptedException e) {                        LOGGER。error(“reconnect server fail”, e);                    }                }            }, 10, 30, TimeUnit。SECONDS);        } catch (Exception e) {            LOGGER。error(“websocket sync cache exception”, e);            throw new ShipException(e。getMessage());        }    }    public  void send(T t) {        while (!client。getReadyState()。equals(ReadyState。OPEN)) {            LOGGER。debug(“connecting 。。。please wait”);        }        client。send(gson。toJson(t));    }}

四、測試

4.1動態路由測試

本地啟動nacos ,

sh startup.sh -m standalone

啟動ship-admin

本地啟動兩個ship-example例項。例項1配置:ship:

http:

app-name: order

version: gray_1。0

context-path: /order

port: 8081

admin-url: 127。0。0。1:9001

server:

port: 8081

nacos:

discovery:

server-addr: 127。0。0。1:8848例項2配置:ship:

http:

app-name: order

version: prod_1。0

context-path: /order

port: 8082

admin-url: 127。0。0。1:9001

server:

port: 8082

nacos:

discovery:

server-addr: 127。0。0。1:8848

在資料庫新增路由規則配置,該規則表示當http header 中的name=ship時請求路由到gray_1。0版本的節點。

【學】阿里終面:如何設計一個高效能閘道器?

圖片

啟動ship-server,看到以下日誌時則可以進行測試了。2021-01-02 19:57:09。159 INFO 30413 ——- [SocketWorker-29] cn。sp。sync。WebsocketSyncCacheServer : websocket server receive message:

[{

“operationType”

“INSERT”

“ruleList”

:[{

“id”

:1,

“appId”

:5,

“appName”

“order”

“version”

“gray_1。0”

“matchObject”

“HEADER”

“matchKey”

“name”

“matchMethod”

:1,

“matchRule”

“ship”

“priority”

:50}]}]

用Postman請求http://localhost:9000/order/user/add,POST方式,header設定name=ship,可以看到只有例項1有日誌顯示。==========add user,version:gray_1。0

4.2效能壓測

壓測環境:

MacBook Pro 13英寸

處理器 2。3 GHz 四核Intel Core i7

記憶體 16 GB 3733 MHz LPDDR4X

後端節點個數一個

壓測工具:wrk

壓測結果:20個執行緒,500個連線數,吞吐量大概每秒9400個請求。

【學】阿里終面:如何設計一個高效能閘道器?

壓測結果

五、總結

千里之行始於足下,開始以為寫一個閘道器會很難,但當你實際開始行動時就會發現其實沒那麼難,所以邁出第一步很重要。過程中也遇到了很多問題,還在github上給soul和nacos這兩個開源專案提了兩個issue,後來發現是自己的問題,尷尬。