springboot(二)使用@MessageMapping接收各種型別資料

各種方式:

1、傳送資料攜帶使用者ID

2、傳送JSON資料體

3、將引數攜帶到傳送請求的URL路徑中

4、傳送header

5、傳送Httpsession中的資料

1、傳送資料攜帶使用者ID

攜帶的使用者ID可以直接拿到給MessageMapping註解的函式注入,後端可以使用這個ID雙向通訊,需要定義一個實體實現

Principal

,實現

getName()

方法。

@Getter@Setterpublic class User implements Principal { private String username; private String password; private String role; private List urls; @Override public String getName() { return username; }}

定義使用者攔截器做認證,並生成User,注入StompHeaderAccessor

/** *使用者攔截器 **/public class UserInterceptor implements ChannelInterceptor { @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor。getAccessor(message, StompHeaderAccessor。class); if (StompCommand。CONNECT。equals(accessor。getCommand())) { Object raw = message。getHeaders()。get(SimpMessageHeaderAccessor。NATIVE_HEADERS); if (raw instanceof Map) { //這裡就是token Object name = ((Map) raw)。get(Constants。TOKEN_KEY); if (name instanceof LinkedList) { // 設定當前訪問器的認證使用者 String token = ((LinkedList) name)。get(0)。toString(); String username = null; try { Map claimMap = JWTUtils。verifyToken(token); username = claimMap。get(“username”)。asString(); if(username == null){ throw new RuntimeException(“websocket認證失敗”); } } catch (UnsupportedEncodingException e) { e。printStackTrace(); throw new RuntimeException(“websocket認證失敗”, e); } catch (ValidTokenException e) { e。printStackTrace(); throw new RuntimeException(“websocket認證失敗”, e); } User user = new User(); user。setUsername(username); accessor。setUser(user);// User user = new User();// user。setUsername(“lalala”);// accessor。setUser(user); } } } return message; } @Override public void postSend(Message<?> message, MessageChannel channel, boolean sent) { } @Override public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) { } @Override public boolean preReceive(MessageChannel channel) { return false; } @Override public Message<?> postReceive(Message<?> message, MessageChannel channel) { return null; } @Override public void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) { }}

/*將客戶端渠道攔截器加入spring ioc容器*/ @Bean public UserInterceptor createUserInterceptor() { return new UserInterceptor(); }

服務端:

/** * 接收使用者資訊 * */ @MessageMapping(value = “/principal”) public void test(Principal principal) { log。info(“當前線上人數:” + userRegistry。getUserCount()); int i = 1; for (SimpUser user : userRegistry。getUsers()) { log。info(“使用者” + i++ + “——-” + user); } //傳送訊息給指定使用者 messagingTemplate。convertAndSendToUser(principal。getName(), “/queue/message”,“伺服器主動推的資料”); }

客戶端:

/** * 傳送使用者資訊 * */ function send0() { stompClient。send(“/app/principal”, {}, {}); }

2、傳送JSON資料體

服務端可以直接在函式中注入JavaBean或者Map,List或者String接收

服務端:

/*點對點通訊*/ @MessageMapping(value = “/P2P”) public void templateTest(Principal principal, Map data) { log。info(“當前線上人數:” + userRegistry。getUserCount()); int i = 1; for (SimpUser user : userRegistry。getUsers()) { log。info(“使用者” + i++ + “——-” + user); } //傳送訊息給指定使用者 messagingTemplate。convertAndSendToUser(principal。getName(), “/queue/message”,“伺服器主動推的資料”); }

客戶端:

/** * 傳送JSON資料體 * */ function send() { stompClient。send(“/app/P2P”, {}, JSON。stringify({ ‘name’: ‘test’ })); }

3、將引數攜帶到傳送請求的URL路徑中

使用

@DestinationVariable

註解,類似SpringMVC的

@PathVirable

服務端:

/** * 接收路徑引數 * */ @MessageMapping(value = “/path/{name}/{company}”) public void pathTest(Principal principal, @DestinationVariable String name, @DestinationVariable String company) { log。info(“當前線上人數:” + userRegistry。getUserCount()); int i = 1; for (SimpUser user : userRegistry。getUsers()) { log。info(“使用者” + i++ + “——-” + user); } //傳送訊息給指定使用者 messagingTemplate。convertAndSendToUser(principal。getName(), “/queue/message”,“伺服器主動推的資料”); }

客戶端:

/** * 傳送路徑引數 * */ function send2() { stompClient。send(“/app/path/zhangsan/XXX公司”, {}, {}); }

4、傳送header

使用@Header註解

服務端:

/** * 接收header引數 * */ @MessageMapping(value = “/header”) public void headerTest(Principal principal, @Header String one, @Header String two) { log。info(“當前線上人數:” + userRegistry。getUserCount()); int i = 1; for (SimpUser user : userRegistry。getUsers()) { log。info(“使用者” + i++ + “——-” + user); } //傳送訊息給指定使用者 messagingTemplate。convertAndSendToUser(principal。getName(), “/queue/message”,“伺服器主動推的資料”); }

客戶端:

/** * 傳送header引數 * */ function send3() { stompClient。send(“/app/header”, {“one”:“lalala”, “two”:“中國”}, {}); }

5、傳送Httpsession中的資料

這裡有一點兒小問題,我理解的是隻能傳送握手連線時的HttpSession中的資料

註冊HttpSessionHandshakeIntercepror

/** * 註冊stomp的端點 */ @Override public void registerStompEndpoints(StompEndpointRegistry registry) { // 允許使用socketJs方式訪問,訪問點為webSocketServer,允許跨域 // 在網頁上我們就可以透過這個連結 // http://localhost:8080/webSocketServer // 來和伺服器的WebSocket連線 registry。addEndpoint(“/webSocketServer”) 。addInterceptors(new HttpSessionHandshakeInterceptor()) 。setAllowedOrigins(“*”) 。withSockJS(); }

服務端:

/** * 接收HttpSession資料 * */ @MessageMapping(value = “/httpsession”) public void httpsession( StompHeaderAccessor accessor) { String name = (String) accessor。getSessionAttributes()。get(“name”); System。out。println(1111); }

客戶端:

/** * 傳送httpsession * */ function send4() { stompClient。send(“/app/httpsession”, {}, {}); }

6、所有程式碼

前端JS:

<!DOCTYPE html> Title

後端MessaeMapping處:

package com。iscas。biz。test。controller;import com。iscas。templet。common。ResponseEntity;import lombok。extern。slf4j。Slf4j;import org。springframework。beans。factory。annotation。Autowired;import org。springframework。messaging。handler。annotation。*;import org。springframework。messaging。simp。SimpMessagingTemplate;import org。springframework。messaging。simp。stomp。StompHeaderAccessor;import org。springframework。messaging。simp。user。SimpUser;import org。springframework。messaging。simp。user。SimpUserRegistry;import org。springframework。web。bind。annotation。RestController;import java。security。Principal;import java。util。Map;/** * 如有要看例子,請開啟註釋 * **/@RestController@Slf4jpublic class WebSoketDemoController { //spring提供的傳送訊息模板 @Autowired private SimpMessagingTemplate messagingTemplate; @Autowired private SimpUserRegistry userRegistry; /** * 接收使用者資訊 * */ @MessageMapping(value = “/principal”) public void test(Principal principal) { log。info(“當前線上人數:” + userRegistry。getUserCount()); int i = 1; for (SimpUser user : userRegistry。getUsers()) { log。info(“使用者” + i++ + “——-” + user); } //傳送訊息給指定使用者 messagingTemplate。convertAndSendToUser(principal。getName(), “/queue/message”,“伺服器主動推的資料”); } /** * 接收資料體 * */ @MessageMapping(value = “/P2P”) public void templateTest(Principal principal, Map data) { log。info(“當前線上人數:” + userRegistry。getUserCount()); int i = 1; for (SimpUser user : userRegistry。getUsers()) { log。info(“使用者” + i++ + “——-” + user); } //傳送訊息給指定使用者 messagingTemplate。convertAndSendToUser(principal。getName(), “/queue/message”,“伺服器主動推的資料”); } /** * 接收路徑引數 * */ @MessageMapping(value = “/path/{name}/{company}”) public void pathTest(Principal principal, @DestinationVariable String name, @DestinationVariable String company) { log。info(“當前線上人數:” + userRegistry。getUserCount()); int i = 1; for (SimpUser user : userRegistry。getUsers()) { log。info(“使用者” + i++ + “——-” + user); } //傳送訊息給指定使用者 messagingTemplate。convertAndSendToUser(principal。getName(), “/queue/message”,“伺服器主動推的資料”); } /** * 接收header引數 * */ @MessageMapping(value = “/header”) public void headerTest(Principal principal, @Header String one, @Header String two) { log。info(“當前線上人數:” + userRegistry。getUserCount()); int i = 1; for (SimpUser user : userRegistry。getUsers()) { log。info(“使用者” + i++ + “——-” + user); } //傳送訊息給指定使用者 messagingTemplate。convertAndSendToUser(principal。getName(), “/queue/message”,“伺服器主動推的資料”); } /** * 接收HttpSession資料 * */ @MessageMapping(value = “/httpsession”) public void httpsession( StompHeaderAccessor accessor) { String name = (String) accessor。getSessionAttributes()。get(“name”); System。out。println(1111); }// /**// * 接收param資料// * */// @MessageMapping(value = “/param”)// public void param(String name) {// System。out。println(1111);// } /*廣播*/ @MessageMapping(“/broadcast”) @SendTo(“/topic/getResponse”) public ResponseEntity topic() throws Exception { return new ResponseEntity(200,“success”); }}

Websocket配置類:

package com。iscas。base。biz。config。stomp;import org。springframework。context。annotation。Bean;import org。springframework。messaging。simp。config。ChannelRegistration;import org。springframework。messaging。simp。config。MessageBrokerRegistry;import org。springframework。web。socket。config。annotation。*;import org。springframework。web。socket。server。support。HttpSessionHandshakeInterceptor;/** * webscoket配置 ** @auth zhuquanwen * **///@Configuration@EnableWebSocketMessageBrokerpublic class WebSocketStompConfig /*extends AbstractWebSocketMessageBrokerConfigurer*/ implements WebSocketMessageBrokerConfigurer { /** * 註冊stomp的端點 */ @Override public void registerStompEndpoints(StompEndpointRegistry registry) { // 允許使用socketJs方式訪問,訪問點為webSocketServer,允許跨域 // 在網頁上我們就可以透過這個連結 // http://localhost:8080/webSocketServer // 來和伺服器的WebSocket連線 registry。addEndpoint(“/webSocketServer”) 。addInterceptors(new HttpSessionHandshakeInterceptor()) 。setAllowedOrigins(“*”) 。withSockJS(); } /** * 配置資訊代理 */ @Override public void configureMessageBroker(MessageBrokerRegistry registry) { // 訂閱Broker名稱 registry。enableSimpleBroker(“/queue”, “/topic”); // 全域性使用的訊息字首(客戶端訂閱路徑上會體現出來) registry。setApplicationDestinationPrefixes(“/app”); // 點對點使用的訂閱字首(客戶端訂閱路徑上會體現出來),不設定的話,預設也是/user/ registry。setUserDestinationPrefix(“/user/”); } /** * 配置客戶端入站通道攔截器 */ @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration。interceptors(createUserInterceptor()); } /*將客戶端渠道攔截器加入spring ioc容器*/ @Bean public UserInterceptor createUserInterceptor() { return new UserInterceptor(); } @Override public void configureWebSocketTransport(WebSocketTransportRegistration registration) { registration。setMessageSizeLimit(500 * 1024 * 1024); registration。setSendBufferSizeLimit(1024 * 1024 * 1024); registration。setSendTimeLimit(200000); }}

使用者攔截器

package com。iscas。base。biz。config。stomp;import com。auth0。jwt。interfaces。Claim;import com。iscas。base。biz。config。Constants;import com。iscas。base。biz。util。SpringUtils;import com。iscas。templet。exception。ValidTokenException;import com。iscas。base。biz。model。auth。User;import com。iscas。base。biz。util。JWTUtils;import org。springframework。messaging。Message;import org。springframework。messaging。MessageChannel;import org。springframework。messaging。simp。SimpMessageHeaderAccessor;import org。springframework。messaging。simp。stomp。StompCommand;import org。springframework。messaging。simp。stomp。StompHeaderAccessor;import org。springframework。messaging。support。ChannelInterceptor;import org。springframework。messaging。support。MessageHeaderAccessor;import javax。servlet。http。HttpSession;import java。io。UnsupportedEncodingException;import java。util。LinkedList;import java。util。Map;/** *使用者攔截器 **/public class UserInterceptor implements ChannelInterceptor { @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor。getAccessor(message, StompHeaderAccessor。class); if (StompCommand。CONNECT。equals(accessor。getCommand())) { Object raw = message。getHeaders()。get(SimpMessageHeaderAccessor。NATIVE_HEADERS); if (raw instanceof Map) { //這裡就是token Object name = ((Map) raw)。get(Constants。TOKEN_KEY); if (name instanceof LinkedList) { // 設定當前訪問器的認證使用者// String token = ((LinkedList) name)。get(0)。toString();// String username = null;// try {// Map claimMap = JWTUtils。verifyToken(token);// username = claimMap。get(“username”)。asString();// if(username == null){// throw new RuntimeException(“websocket認證失敗”);// }// } catch (UnsupportedEncodingException e) {// e。printStackTrace();// throw new RuntimeException(“websocket認證失敗”, e);// } catch (ValidTokenException e) {// e。printStackTrace();// throw new RuntimeException(“websocket認證失敗”, e);// }// User user = new User();// user。setUsername(username);// accessor。setUser(user); User user = new User(); user。setUsername(“lalala”); accessor。setUser(user); } } } else if (StompCommand。SEND。equals(accessor。getCommand())) { //傳送資料 } return message; } @Override public void postSend(Message<?> message, MessageChannel channel, boolean sent) { } @Override public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) { } @Override public boolean preReceive(MessageChannel channel) { return false; } @Override public Message<?> postReceive(Message<?> message, MessageChannel channel) { return null; } @Override public void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) { }}