Netty高併發網路程式設計

5。1 Netty介紹

Netty 是一個廣泛使用的 Java 網路程式設計框架,它提供了一個易於使用的 API 客戶端和伺服器,它活躍和成長於使用者社群,像大型公司 Facebook 以及流行 開源專案如 Infinispan, HornetQ, Vert。x, Apache Cassandra 和 Elasticsearch 等,都利用其強大的對於網路抽象的核心程式碼。

​ Netty受到大公司青睞的原因:

1。併發高2。傳輸快3。封裝好

併發高:

​ Netty是一款基於NIO(Nonblocking I/O,非阻塞IO)開發的網路通訊框架,對比於BIO(Blocking I/O,阻塞IO),他的併發效能得到了很大提高,兩張圖讓你瞭解BIO和NIO的區別:

Netty高併發網路程式設計

Netty高併發網路程式設計

​ 從這兩圖可以看出,NIO的單執行緒能處理連線的數量比BIO要高出很多,而為什麼單執行緒能處理更多的連線呢?原因就是圖二中出現的Selector。

​ 當一個連線建立之後,他有兩個步驟要做,第一步是接收完客戶端發過來的全部資料,第二步是服務端處理完請求業務之後返回response給客戶端。NIO和BIO的區別主要是在第一步。

​ 在BIO中,等待客戶端發資料這個過程是阻塞的,這樣就造成了一個執行緒只能處理一個請求的情況,而機器能支援的最大執行緒數是有限的,這就是為什麼BIO不能支援高併發的原因。

​ 而NIO中,當一個Socket建立好之後,Thread並不會阻塞去接受這個Socket,而是將這個請求交給Selector,Selector會不斷的去遍歷所有的Socket,一旦有一個Socket建立完成,他會通知Thread,然後Thread處理完資料再返回給客戶端——這個過程是不阻塞的,這樣就能讓一個Thread處理更多的請求了。

傳輸快:

​ Netty的傳輸快其實也是依賴了NIO的一個特性——*零複製*。我們知道,Java的記憶體有堆記憶體、棧記憶體和字串常量池等等,其中堆記憶體是佔用記憶體空間最大的一塊,也是Java物件存放的地方,一般我們的資料如果需要從IO讀取到堆記憶體,中間需要經過Socket緩衝區,也就是說一個數據會被複製兩次才能到達他的的終點,如果資料量大,就會造成不必要的資源浪費。

​ Netty針對這種情況,使用了NIO中的另一大特性——零複製,當他需要接收資料的時候,他會在堆記憶體之外開闢一塊記憶體,資料就直接從IO讀到了那塊記憶體中去,在netty裡面透過ByteBuf可以直接對這些資料進行直接操作,從而加快了傳輸速度。

關於零複製理解可以參考:https://www。ibm。com/developerworks/cn/linux/l-cn-zerocopy1/index。html

封裝好:

​ Netty對NIO進行了封裝,程式碼簡潔,遠遠優於傳統Socket程式設計,我們來理解一下Netty的一些重要概念:

Netty高併發網路程式設計

Channel:

資料傳輸流,與channel相關的概念有以下四個,上一張圖讓你瞭解netty裡面的Channel。

1。Channel,表示一個連線,可以理解為每一個請求,就是一個Channel。2。ChannelHandler,核心處理業務就在這裡,用於處理業務請求。3。ChannelHandlerContext,用於傳輸業務資料。4。ChannelPipeline,用於儲存處理過程需要用到的ChannelHandler和ChannelHandlerContext。

Netty高併發網路程式設計

ByteBuf:

ByteBuf是一個儲存位元組的容器,最大特點就是使用方便,它既有自己的讀索引和寫索引,方便你對整段位元組快取進行讀寫,也支援get/set,方便你對其中每一個位元組進行讀寫,他的資料結構如上圖所示。

5。2 Netty+WebSocket

​ 我們來實現一個Netty+WebSocket整合案例,由於Netty+WebSocket整合程式碼比較麻煩,我們可以利用目前開源的專案netty-websocket-spring-boot-starter輕鬆實現Netty和WebSocket的整合。

我們搭建一個專案,專案叫seckill-message,用於處理通知使用者搶單狀態。

1)pom。xml

<!——db依賴——> com。seckill seckill-db 0。0。1-SNAPSHOT <!——Netty Websocket——> org。yeauty netty-websocket-spring-boot-starter 0。9。5

2)bootstrap。yml

server: port: 18088spring: application: name: seckill-message cloud: nacos: config: file-extension: yaml server-addr: nacos-server:8848 discovery: #Nacos的註冊地址 server-addr: nacos-server:8848 main: allow-bean-definition-overriding: true redis: cluster: nodes: - redis-server:7001 - redis-server:7002 - redis-server:7003 - redis-server:7004 - redis-server:7005 - redis-server:7006#websocket配置ws: port: 28082 host: 0。0。0。0

3)Redis配置

在專案中新增redis配置,這裡主要用於儲存websocket會話部分資訊,程式碼如下:

@Configurationpublic class RedisConfig { /*** * 模板操作物件序列化設定 * @param redissonConnectionFactory * @return */ @Bean(“redisTemplate”) public RedisTemplate getRedisTemplate(RedisConnectionFactory redissonConnectionFactory) { RedisTemplate redisTemplate = new RedisTemplate(); redisTemplate。setConnectionFactory(redissonConnectionFactory); redisTemplate。setValueSerializer(valueSerializer()); redisTemplate。setKeySerializer(keySerializer()); redisTemplate。setHashKeySerializer(keySerializer()); redisTemplate。setHashValueSerializer(valueSerializer()); redisTemplate。setEnableTransactionSupport(true); return redisTemplate; } /**** * 序列化設定 * @return */ @Bean public StringRedisSerializer keySerializer() { return new StringRedisSerializer(); } /**** * 序列化設定 * @return */ @Bean public RedisSerializer valueSerializer() { Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object。class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper。setVisibility(PropertyAccessor。ALL, JsonAutoDetect。Visibility。ANY); objectMapper。enableDefaultTyping(ObjectMapper。DefaultTyping。NON_FINAL); jackson2JsonRedisSerializer。setObjectMapper(objectMapper); return jackson2JsonRedisSerializer; }}

4)WebSocket會話處理

WebSocket會話處理我們使用了netty-websocket-spring-boot-starter相關的註解,netty-websocket-spring-boot-starter相關的註解可以參考,會話處理程式碼如下:

@Slf4j@Component@ServerEndpoint(path = “/ws/{userid}”,port = “${ws。port}”,host = “${ws。host}”)public class NettyWebSocketServer { /*** * 儲存使用者WebSocket會話 */ private static Map sessionMaps = new HashMap(); @Autowired private RedisTemplate redisTemplate; /*** * 當有新的WebSocket連線完成時,對該方法進行回撥 注入引數的型別:Session、HttpHeaders。。。 */ @OnOpen public void onOpen(Session session, HttpHeaders headers, @RequestParam String req, @RequestParam MultiValueMap reqMap, @PathVariable String arg, @PathVariable Map pathMap){ //獲取使用者ID String userId = pathMap。get(“userid”)。toString(); //將userId存入到Redis中,方便查詢Session String id = session。channel()。id()。toString(); redisTemplate。boundHashOps(“WebSocketLogin”)。put(id,userId); //將會話儲存起來 sessionMaps。put(userId,session); } /*** * 當有WebSocket連線關閉時,對該方法進行回撥 注入引數的型別:Session */ @OnClose public void onClose(Session session){ log。info(“會話關閉”); String key = session。channel()。id()。toString(); //獲取使用者ID String userId = redisTemplate。boundHashOps(“WebSocketLogin”)。get(key)。toString(); //刪除會話ID redisTemplate。boundHashOps(“WebSocketLogin”)。delete(key); //移除Session sessionMaps。remove(userId); } /*** * 當有WebSocket丟擲異常時,對該方法進行回撥 注入引數的型別:Session、Throwable */ @OnError public void onError(Session session, Throwable throwable){ throwable。printStackTrace(); } /*** * 當接收到字串訊息時,對該方法進行回撥 注入引數的型別:Session、String */ @OnMessage public void onMessage(Session session, String message){ session。sendText(“您的會話ID:”+session。channel()。id()+“,收到的訊息:”+message); } /*** * 當接收到二進位制訊息時,對該方法進行回撥 注入引數的型別:Session、byte[] */ @OnBinary public void onBinary(Session session, byte[] bytes){ session。sendBinary(bytes); } /*** * 當接收到Netty的事件時,對該方法進行回撥 注入引數的型別:Session、Object */ @OnEvent public void onEvent(Session session, Object evt){ if (evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; switch (idleStateEvent。state()) { case READER_IDLE: System。out。println(“read idle”); break; case WRITER_IDLE: System。out。println(“write idle”); break; case ALL_IDLE: System。out。println(“all idle”); break; default: break; } } } /*** * 給指定使用者傳送訊息 * @param userId * @param msg */ public void sendMessage(String userId, String msg) { Session session = sessionMaps。get(userId); if(session!=null){ session。sendText(msg); } }}

並且我們需要注入ServerEndpointExporter物件,程式碼如下:

@Configurationpublic class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter(){ return new ServerEndpointExporter(); }}

5)測試

建立啟動類:

@SpringBootApplication(exclude = DataSourceAutoConfiguration。class)public class NettyWebSocketServerApplication { public static void main(String[] args) { SpringApplication。run(NettyWebSocketServerApplication。class,args); }}

主動向客戶端發訊息:

@RestController@CrossOrigin@RequestMapping(value = “/msg”)public class SendMessageController { @Autowired private NettyWebSocketServer nettyWebSocketServer; /*** * 訊息傳送 * @param userId * @param msg * @return */ @GetMapping(value = “/send/{userId}”) public String send(@PathVariable(value = “userId”)String userId,String msg){ nettyWebSocketServer。sendMessage(userId,msg); return “傳送成功”; }}

我們這裡編寫了2個WebSocket頁面:

<!DOCTYPE html> websocket

測試效果如下:

Netty高併發網路程式設計

5。3 訂單狀態更新通知

我們為剛才編寫的WebSocket編寫一個Feign,並在熱點搶單成功的地方呼叫通知使用者搶單成功即可。

1)Feign編寫

我們先把接收訊息的方法改一下,接收一個Map訊息,程式碼如下:

@RestController@CrossOrigin@RequestMapping(value = “/msg”)public class SendMessageController { @Autowired private NettyWebSocketServer nettyWebSocketServer; /*** * 訊息傳送 * @return */ @PostMapping(value = “/send/{userId}”) public String send(@PathVariable(value = “userId”)String userId,@RequestParam Map dataMap){ nettyWebSocketServer。sendMessage(userId, JSON。toJSONString(dataMap)); return “傳送成功”; }}

建立feign,程式碼如下:

@FeignClient(value = “seckill-message”)public interface MessageFeign { /**** * 訊息傳送 * @return */ @PostMapping(value = “/msg/send/{userId}”) String send(@PathVariable(value = “userId”)String userId, @RequestParam Map dataMap);}

2)搶單訊息通知

修改熱點商品下單,在這裡根據使用者名稱進行通知,程式碼如下:

/*** * 秒殺下單 * @param orderMap */@Overridepublic void addHotOrder(Map orderMap) { String id = orderMap。get(“id”); String username = orderMap。get(“username”); //key String key = “SKU_” + id; //分散式鎖的key String lockkey = “LOCKSKU_” + id; //使用者購買的key String userKey = “USER” + username + “ID” + id; //嘗試獲取鎖,等待10秒,自己獲得鎖後一直不解鎖則10秒後自動解鎖 boolean bo = distributedLocker。tryLock(lockkey, TimeUnit。SECONDS, 10L, 10L); if(bo){ if (redisTemplate。hasKey(key)) { //。。。略 } //解鎖 distributedLocker。unlock(lockkey); //通知使用者搶單成功 Map dataMap = new HashMap(); dataMap。put(“code”,200); dataMap。put(“message”,“搶單成功!”); messageFeign。send(username,dataMap); }else{ Map dataMap = new HashMap(); dataMap。put(“code”,20001); dataMap。put(“message”,“搶單失敗!”); messageFeign。send(username,dataMap); }}

測試熱點商品搶單的時候,返回資料如下:

{“code”:“200”,“message”:“搶單成功!”}