最近项目需要使用到websocket,考虑到连接的稳定性,所以加上了重连;
Java-WebSocket在1.3.8版本及之后加上了重连,故本次使用的是其自带的api;
依赖版本为
<!-- websocket作为客户端--> <dependency> <groupId>org.java-websocket</groupId> <artifactId>Java-WebSocket</artifactId> <version>1.5.1</version> </dependency>
下面为代码展示 :
WebSocket客户端(DemoWebSocketClient)代码:
import com.demo.common.enums.DemoReconnectThreadEnum; import com.demo.utils.DateUtils; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import org.java_websocket.drafts.Draft_6455; import org.java_websocket.handshake.ServerHandshake; import java.net.URI; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * websocket的客户端 * */ @Slf4j public class DemoWebSocketClient extends WebSocketClient { public static final String HEARTBEAT_CMD = "此处为商定的保活命令"; public DemoWebSocketClient(URI serverUri) { super(serverUri, new Draft_6455()); } @Override public void onOpen(ServerHandshake serverHandshake) { //开启心跳保活 heartbeat(this); log.info("===建立连接,心跳保活开启==="); } @Override public void onMessage(String s) { log.info("{}时来自服务端的消息:{}", DateUtils.getTime(),s); } @Override public void onClose(int a, String s, boolean b) { //重连 log.info("由于:{},连接被关闭,开始尝试重新连接",s); DemoReconnectThreadEnum.getInstance().reconnectWs(this); } @Override public void onError(Exception e) { log.error("====websocket出现错误====" + e.getMessage()); } /** * 心跳保活 * @param var1 */ private void heartbeat(DemoWebSocketClient var1){ ScheduledExecutorService service = Executors.newScheduledThreadPool(1); Runnable runnable = () -> { if(var1 != null) { var1.send(HEARTBEAT_CMD); } }; service.scheduleAtFixedRate(runnable, 0, 3, TimeUnit.SECONDS); } }
执行重连代码:
import com.demo.pojo.DemoWebSocketClient; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 执行重连 */ public enum DemoReconnectThreadEnum { WebSocketInstance(){ @Override public void reconnectWs(DemoWebSocketClient demoWebSocketClient) { cachedThreadPool.execute(new Runnable() { @Override public void run() { try { //重连间隔一秒 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } demoWebSocketClient.reconnect(); } }); } }; private static final ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); public abstract void reconnectWs(DemoWebSocketClient demoWebSocketClient); public static DemoReconnectThreadEnum getInstance(){ return WebSocketInstance; } }
至此,完毕!