t-io作为目前国内最流行的开源网络编程框架软件,以简单易懂,上手容易而著称,相同的功能比起netty实现起来,要简单的多,代码量也大大减少,目前作者基于t-io开发了性能强大的即时通讯软件-谭聊,完全继承了t-io的功能,单机版并发数达到100万,集群版并发数过亿,大家如果想好好使用t-io,还是要先学习t-io的一些基本知识,这篇文章主要从8个方面介绍了t-io的基础知识。

t-io收发消息过程

t-io收发消息及处理过程,可以用一张图清晰地表达出来:

应用层包:Packet

Packet是用于表述业务数据结构的,我们通过继承Packet来实现自己的业务数据结构,对于各位而言,把Packet看作是一个普通的VO对象即可。

注意:不建议直接使用Packet对象,而是要继承Packet

package org.tio.study.helloworld.common;import org.tio.core.intf.Packet;/** * @author tanyaowu */public class HelloPacket extends Packet { private static final long serialVersionUID = -172060606924066412L; public static final int HEADER_LENGTH = 4;//消息头的长度 public static final String CHARSET = "utf-8"; private byte[] body; /** * @return the body */ public byte[] getBody() { return body; } /** * @param body the body to set */ public void setBody(byte[] body) { this.body = body; }}


可以结合AioHandler.java理解Packet

import java.nio.ByteBuffer;import org.tio.core.ChannelContext;import org.tio.core.TioConfig;import org.tio.core.exception.AioDecodeException;/** * * @author tanyaowu * 2017年10月19日 上午9:40:15 */public interface AioHandler { /** * 根据ByteBuffer解码成业务需要的Packet对象. * 如果收到的数据不全,导致解码失败,请返回null,在下次消息来时框架层会自动续上前面的收到的数据 * @param buffer 参与本次希望解码的ByteBuffer * @param limit ByteBuffer的limit * @param position ByteBuffer的position,不一定是0哦 * @param readableLength ByteBuffer参与本次解码的有效数据(= limit - position) * @param channelContext * @return * @throws AioDecodeException */ Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException; /** * 编码 * @param packet * @param tioConfig * @param channelContext * @return * @author: tanyaowu */ ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext); /** * 处理消息包 * @param packet * @param channelContext * @throws Exception * @author: tanyaowu */ void handler(Packet packet, ChannelContext channelContext) throws Exception;}单条TCP连接上下文:ChannelContext

每一个tcp连接的建立都会产生一个ChannelContext对象,这是个抽象类,如果你是用t-io作tcp客户端,那么就是ClientChannelContext,如果你是用tio作tcp服务器,那么就是ServerChannelContext

用户可以把业务数据通过ChannelContext对象和TCP连接关联起来,像下面这样设置属性

ChannelContext.set(String key, Object value)

然后用下面的方式获取属性

ChannelContext.get(String key)

当然最最常用的还是用t-io提供的强到没对手的bind功能,譬如用下面的代码绑定userid

Tio.bindUser(ChannelContext channelContext, String userid)

然后可以通过userid进行操作,示范代码如下

//获取某用户的ChannelContext集合SetWithLock<ChannelContext> set = Tio.getChannelContextsByUserid(tioConfig, userid);//给某用户发消息Tio.sendToUser(TioConfig, userid, Packet)

除了可以绑定userid,t-io还内置了如下绑定API

无序列表绑定业务id

Tio.bindBsId(ChannelContext channelContext, String bsId)绑定token

Tio.bindToken(ChannelContext channelContext, String token)绑定群组

Tio.bindGroup(ChannelContext channelContext, String group)

ChannelContext对象包含的信息非常多,主要对象见下图

说明
ChannelContext是t-io中非常重要的类,他是业务和连接的沟通桥梁!

服务配置与维护:TioConfig

场景:我们在写TCP Server时,都会先选好一个端口以监听客户端连接,再创建N组线程池来执行相关的任务,譬如发送消息、解码数据包、处理数据包等任务,还要维护客户端连接的各种数据,为了和业务互动,还要把这些客户端连接和各种业务数据绑定起来,譬如把某个客户端绑定到一个群组,绑定到一个userid,绑定到一个token等。
TioConfig就是解决以上场景的:配置线程池、监听端口,维护客户端各种数据等的。

TioConfig是个抽象类

如果你是用tio作tcp客户端,那么你需要创建ClientTioConfig对象 服务器端对应一个ClientTioConfig对象如果你是用tio作tcp服务器,那么你需要创建ServerTioConfig 一个监听端口对应一个ServerTioConfig ,一个jvm可以监听多个端口,所以一个jvm可以有多个ServerTioConfig对象TioConfig对象包含的信息非常多,主要对象见下图

如何获取TioConfig对象
见:「链接」

编码、解码、处理:AioHandler

AioHandler是处理消息的核心接口,它有两个子接口,ClientAioHandler和ServerAioHandler,当用tio作tcp客户端时需要实现ClientAioHandler,当用tio作tcp服务器时需要实现ServerAioHandler,它主要定义了3个方法,见下

import java.nio.ByteBuffer;import org.tio.core.ChannelContext;import org.tio.core.TioConfig;import org.tio.core.exception.AioDecodeException;/** * * @author tanyaowu * 2017年10月19日 上午9:40:15 */public interface AioHandler { /** * 根据ByteBuffer解码成业务需要的Packet对象. * 如果收到的数据不全,导致解码失败,请返回null,在下次消息来时框架层会自动续上前面的收到的数据 * @param buffer 参与本次希望解码的ByteBuffer * @param limit ByteBuffer的limit * @param position ByteBuffer的position,不一定是0哦 * @param readableLength ByteBuffer参与本次解码的有效数据(= limit - position) * @param channelContext * @return * @throws AioDecodeException */ Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException; /** * 编码 * @param packet * @param tioConfig * @param channelContext * @return * @author: tanyaowu */ ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext); /** * 处理消息包 * @param packet * @param channelContext * @throws Exception * @author: tanyaowu */ void handler(Packet packet, ChannelContext channelContext) throws Exception;}消息来往监听:AioListener

AioListener是处理消息的核心接口,它有两个子接口:ClientAioListener和ServerAioListener

当用tio作tcp客户端时需要实现ClientAioListener
当用tio作tcp服务器时需要实现ServerAioListener
它主要定义了如下方法

package org.tio.core.intf;import org.tio.core.ChannelContext;/** * * @author tanyaowu * 2017年4月1日 上午9:34:08 */public interface AioListener { /** * 建链后触发本方法,注:建链不一定成功,需要关注参数isConnected * @param channelContext * @param isConnected 是否连接成功,true:表示连接成功,false:表示连接失败 * @param isReconnect 是否是重连, true: 表示这是重新连接,false: 表示这是第一次连接 * @throws Exception * @author: tanyaowu */ public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) throws Exception; /** * 原方法名:onAfterDecoded * 解码成功后触发本方法 * @param channelContext * @param packet * @param packetSize * @throws Exception * @author: tanyaowu */ public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) throws Exception; /** * 接收到TCP层传过来的数据后 * @param channelContext * @param receivedBytes 本次接收了多少字节 * @throws Exception */ public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) throws Exception; /** * 消息包发送之后触发本方法 * @param channelContext * @param packet * @param isSentSuccess true:发送成功,false:发送失败 * @throws Exception * @author tanyaowu */ public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) throws Exception; /** * 处理一个消息包后 * @param channelContext * @param packet * @param cost 本次处理消息耗时,单位:毫秒 * @throws Exception */ public void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) throws Exception; /** * 连接关闭前触发本方法 * @param channelContext the channelcontext * @param throwable the throwable 有可能为空 * @param remark the remark 有可能为空 * @param isRemove * @author tanyaowu * @throws Exception */ public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception; /** * 连接关闭前后触发本方法 * 警告:走到这个里面时,很多绑定的业务都已经解绑了,所以这个方法一般是空着不实现的 * @param channelContext the channelcontext * @param throwable the throwable 有可能为空 * @param remark the remark 有可能为空 * @param isRemove 是否是删除 * @throws Exception * @author: tanyaowu */// public void onAfterClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception;}服务器端入口:TioServer

这个对象大家稍微了解一下即可,服务器启动时会用到这个对象,简单贴一下它的源代码吧,大家只需要关注它有一个start()方法是用来启动网络服务的即可

import java.io.IOException;import java.lang.management.ManagementFactory;import java.lang.management.RuntimeMXBean;import java.net.InetSocketAddress;import java.net.StandardSocketOptions;import java.nio.channels.AsynchronousChannelGroup;import java.nio.channels.AsynchronousServerSocketChannel;import java.util.ArrayList;import java.util.Date;import java.util.List;import java.util.concurrent.TimeUnit;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.tio.core.Node;import org.tio.utils.SysConst;import org.tio.utils.date.DateUtils;import org.tio.utils.hutool.StrUtil;/** * @author tanyaowu * */public class TioServer { private static Logger log = LoggerFactory.getLogger(TioServer.class); private ServerTioConfig serverTioConfig; private AsynchronousServerSocketChannel serverSocketChannel; private AsynchronousChannelGroup channelGroup = null; private Node serverNode; private boolean isWaitingStop = false; /** * * @param serverTioConfig * * @author tanyaowu * 2017年1月2日 下午5:53:06 * */ public TioServer(ServerTioConfig serverTioConfig) { super(); this.serverTioConfig = serverTioConfig; } /** * @return the serverTioConfig */ public ServerTioConfig getServerTioConfig() { return serverTioConfig; } /** * @return the serverNode */ public Node getServerNode() { return serverNode; } /** * @return the serverSocketChannel */ public AsynchronousServerSocketChannel getServerSocketChannel() { return serverSocketChannel; } /** * @return the isWaitingStop */ public boolean isWaitingStop() { return isWaitingStop; } /** * @param serverTioConfig the serverTioConfig to set */ public void setServerTioConfig(ServerTioConfig serverTioConfig) { this.serverTioConfig = serverTioConfig; } /** * @param isWaitingStop the isWaitingStop to set */ public void setWaitingStop(boolean isWaitingStop) { this.isWaitingStop = isWaitingStop; } public void start(String serverIp, int serverPort) throws IOException { long start = System.currentTimeMillis(); this.serverNode = new Node(serverIp, serverPort); channelGroup = AsynchronousChannelGroup.withThreadPool(serverTioConfig.groupExecutor); serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup); serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 64 * 1024); InetSocketAddress listenAddress = null; if (StrUtil.isBlank(serverIp)) { listenAddress = new InetSocketAddress(serverPort); } else { listenAddress = new InetSocketAddress(serverIp, serverPort); } serverSocketChannel.bind(listenAddress, 0); AcceptCompletionHandler acceptCompletionHandler = serverTioConfig.getAcceptCompletionHandler(); serverSocketChannel.accept(this, acceptCompletionHandler); serverTioConfig.startTime = System.currentTimeMillis(); //下面这段代码有点无聊,写得随意,纯粹是为了打印好看些 String baseStr = "|----------------------------------------------------------------------------------------|"; int baseLen = baseStr.length(); StackTraceElement[] ses = Thread.currentThread().getStackTrace(); StackTraceElement se = ses[ses.length - 1]; int xxLen = 18; int aaLen = baseLen - 3; List<String> infoList = new ArrayList<>(); infoList.add(StrUtil.fillAfter("Tio gitee address", ' ', xxLen) + "| " + SysConst.TIO_URL_GITEE); infoList.add(StrUtil.fillAfter("Tio site address", ' ', xxLen) + "| " + SysConst.TIO_URL_SITE); infoList.add(StrUtil.fillAfter("Tio version", ' ', xxLen) + "| " + SysConst.TIO_CORE_VERSION); infoList.add(StrUtil.fillAfter("-", '-', aaLen)); infoList.add(StrUtil.fillAfter("TioConfig name", ' ', xxLen) + "| " + serverTioConfig.getName()); infoList.add(StrUtil.fillAfter("Started at", ' ', xxLen) + "| " + DateUtils.formatDateTime(new Date())); infoList.add(StrUtil.fillAfter("Listen on", ' ', xxLen) + "| " + this.serverNode); infoList.add(StrUtil.fillAfter("Main Class", ' ', xxLen) + "| " + se.getClassName()); try { RuntimeMXBean runtimeMxBean = ManagementFactory.getRuntimeMXBean(); String runtimeName = runtimeMxBean.getName(); String pid = runtimeName.split("@")[0]; long startTime = runtimeMxBean.getStartTime(); long startCost = System.currentTimeMillis() - startTime; infoList.add(StrUtil.fillAfter("Jvm start time", ' ', xxLen) + "| " + startCost + " ms"); infoList.add(StrUtil.fillAfter("Tio start time", ' ', xxLen) + "| " + (System.currentTimeMillis() - start) + " ms"); infoList.add(StrUtil.fillAfter("Pid", ' ', xxLen) + "| " + pid); } catch (Exception e) { } //100 String printStr = "\r\n"+baseStr+"\r\n"; // printStr += "|--" + leftStr + " " + info + " " + rightStr + "--|\r\n"; for (String string : infoList) { printStr += "| " + StrUtil.fillAfter(string, ' ', aaLen) + "|\r\n"; } printStr += baseStr + "\r\n"; if (log.isInfoEnabled()) { log.info(printStr); } else { System.out.println(printStr); } } /** * * @return * @author tanyaowu */ public boolean stop() { isWaitingStop = true; boolean ret = true; try { channelGroup.shutdownNow(); } catch (Exception e) { log.error("channelGroup.shutdownNow()时报错", e); } try { serverSocketChannel.close(); } catch (Exception e1) { log.error("serverSocketChannel.close()时报错", e1); } try { serverTioConfig.groupExecutor.shutdown(); } catch (Exception e1) { log.error(e1.toString(), e1); } try { serverTioConfig.tioExecutor.shutdown(); } catch (Exception e1) { log.error(e1.toString(), e1); } serverTioConfig.setStopped(true); try { ret = ret && serverTioConfig.groupExecutor.awaitTermination(6000, TimeUnit.SECONDS); ret = ret && serverTioConfig.tioExecutor.awaitTermination(6000, TimeUnit.SECONDS); } catch (InterruptedException e) { log.error(e.getLocalizedMessage(), e); } log.info(this.serverNode + " stopped"); return ret; }}客户端入口:TioClient

只有当你在用t-io作为TCP客户端时,才用得到TioClient,此处简单贴一下它的源代码,它的用法,见后面的showcase示范工程

package org.tio.client;import java.io.IOException;import java.net.InetSocketAddress;import java.net.StandardSocketOptions;import java.nio.channels.AsynchronousChannelGroup;import java.nio.channels.AsynchronousSocketChannel;import java.util.Set;import java.util.concurrent.CountDownLatch;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.ReentrantReadWriteLock;import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.tio.client.intf.ClientAioHandler;import org.tio.core.ChannelContext;import org.tio.core.Node;import org.tio.core.Tio;import org.tio.core.intf.Packet;import org.tio.core.ssl.SslFacadeContext;import org.tio.core.stat.ChannelStat;import org.tio.utils.SystemTimer;import org.tio.utils.hutool.StrUtil;import org.tio.utils.lock.SetWithLock;/** * * @author tanyaowu * 2017年4月1日 上午9:29:58 */public class TioClient { /** * 自动重连任务 * @author tanyaowu * */ private static class ReconnRunnable implements Runnable { ClientChannelContext channelContext = null; TioClient tioClient = null; // private static Map<Node, Long> cacheMap = new HashMap<>(); public ReconnRunnable(ClientChannelContext channelContext, TioClient tioClient) { this.channelContext = channelContext; this.tioClient = tioClient; } /** * @see java.lang.Runnable#run() * * @author tanyaowu * 2017年2月2日 下午8:24:40 * */ @Override public void run() { ReentrantReadWriteLock closeLock = channelContext.closeLock; WriteLock writeLock = closeLock.writeLock(); writeLock.lock(); try { if (!channelContext.isClosed) //已经连上了,不需要再重连了 { return; } long start = SystemTimer.currTime; tioClient.reconnect(channelContext, 2); long end = SystemTimer.currTime; long iv = end - start; if (iv >= 100) { log.error("{},重连耗时:{} ms", channelContext, iv); } else { log.info("{},重连耗时:{} ms", channelContext, iv); } if (channelContext.isClosed) { channelContext.setReconnCount(channelContext.getReconnCount() + 1); // cacheMap.put(channelContext.getServerNode(), SystemTimer.currTime); return; } } catch (java.lang.Throwable e) { log.error(e.toString(), e); } finally { writeLock.unlock(); } } } private static Logger log = LoggerFactory.getLogger(TioClient.class); private AsynchronousChannelGroup channelGroup; private ClientTioConfig clientTioConfig; /** * @param serverIp 可以为空 * @param serverPort * @param aioDecoder * @param aioEncoder * @param aioHandler * * @author tanyaowu * @throws IOException * */ public TioClient(final ClientTioConfig clientTioConfig) throws IOException { super(); this.clientTioConfig = clientTioConfig; this.channelGroup = AsynchronousChannelGroup.withThreadPool(clientTioConfig.groupExecutor); startHeartbeatTask(); startReconnTask(); } /** * * @param serverNode * @throws Exception * * @author tanyaowu * */ public void asynConnect(Node serverNode) throws Exception { asynConnect(serverNode, null); } /** * * @param serverNode * @param timeout * @throws Exception * * @author tanyaowu * */ public void asynConnect(Node serverNode, Integer timeout) throws Exception { asynConnect(serverNode, null, null, timeout); } /** * * @param serverNode * @param bindIp * @param bindPort * @param timeout * @throws Exception * * @author tanyaowu * */ public void asynConnect(Node serverNode, String bindIp, Integer bindPort, Integer timeout) throws Exception { connect(serverNode, bindIp, bindPort, null, timeout, false); } /** * * @param serverNode * @return * @throws Exception * * @author tanyaowu * */ public ClientChannelContext connect(Node serverNode) throws Exception { return connect(serverNode, null); } /** * * @param serverNode * @param timeout * @return * @throws Exception * @author tanyaowu */ public ClientChannelContext connect(Node serverNode, Integer timeout) throws Exception { return connect(serverNode, null, 0, timeout); } /** * * @param serverNode * @param bindIp * @param bindPort * @param initClientChannelContext * @param timeout 超时时间,单位秒 * @return * @throws Exception * @author tanyaowu */ public ClientChannelContext connect(Node serverNode, String bindIp, Integer bindPort, ClientChannelContext initClientChannelContext, Integer timeout) throws Exception { return connect(serverNode, bindIp, bindPort, initClientChannelContext, timeout, true); } /** * * @param serverNode * @param bindIp * @param bindPort * @param initClientChannelContext * @param timeout 超时时间,单位秒 * @param isSyn true: 同步, false: 异步 * @return * @throws Exception * @author tanyaowu */ private ClientChannelContext connect(Node serverNode, String bindIp, Integer bindPort, ClientChannelContext initClientChannelContext, Integer timeout, boolean isSyn) throws Exception { AsynchronousSocketChannel asynchronousSocketChannel = null; ClientChannelContext channelContext = null; boolean isReconnect = initClientChannelContext != null; // ClientAioListener clientAioListener = clientTioConfig.getClientAioListener(); long start = SystemTimer.currTime; asynchronousSocketChannel = AsynchronousSocketChannel.open(channelGroup); long end = SystemTimer.currTime; long iv = end - start; if (iv >= 100) { log.error("{}, open 耗时:{} ms", channelContext, iv); } asynchronousSocketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); asynchronousSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); asynchronousSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); InetSocketAddress bind = null; if (bindPort != null && bindPort > 0) { if (false == StrUtil.isBlank(bindIp)) { bind = new InetSocketAddress(bindIp, bindPort); } else { bind = new InetSocketAddress(bindPort); } } if (bind != null) { asynchronousSocketChannel.bind(bind); } channelContext = initClientChannelContext; start = SystemTimer.currTime; InetSocketAddress inetSocketAddress = new InetSocketAddress(serverNode.getIp(), serverNode.getPort()); ConnectionCompletionVo attachment = new ConnectionCompletionVo(channelContext, this, isReconnect, asynchronousSocketChannel, serverNode, bindIp, bindPort); if (isSyn) { Integer realTimeout = timeout; if (realTimeout == null) { realTimeout = 5; } CountDownLatch countDownLatch = new CountDownLatch(1); attachment.setCountDownLatch(countDownLatch); asynchronousSocketChannel.connect(inetSocketAddress, attachment, clientTioConfig.getConnectionCompletionHandler()); boolean f = countDownLatch.await(realTimeout, TimeUnit.SECONDS); if (f) { return attachment.getChannelContext(); } else { log.error("countDownLatch.await(realTimeout, TimeUnit.SECONDS) 返回false "); return attachment.getChannelContext(); } } else { asynchronousSocketChannel.connect(inetSocketAddress, attachment, clientTioConfig.getConnectionCompletionHandler()); return null; } } /** * * @param serverNode * @param bindIp * @param bindPort * @param timeout 超时时间,单位秒 * @return * @throws Exception * * @author tanyaowu * */ public ClientChannelContext connect(Node serverNode, String bindIp, Integer bindPort, Integer timeout) throws Exception { return connect(serverNode, bindIp, bindPort, null, timeout); } /** * @return the channelGroup */ public AsynchronousChannelGroup getChannelGroup() { return channelGroup; } /** * @return the clientTioConfig */ public ClientTioConfig getClientTioConfig() { return clientTioConfig; } /** * * @param channelContext * @param timeout * @return * @throws Exception * * @author tanyaowu * */ public void reconnect(ClientChannelContext channelContext, Integer timeout) throws Exception { connect(channelContext.getServerNode(), channelContext.getBindIp(), channelContext.getBindPort(), channelContext, timeout); } /** * @param clientTioConfig the clientTioConfig to set */ public void setClientTioConfig(ClientTioConfig clientTioConfig) { this.clientTioConfig = clientTioConfig; } /** * 定时任务:发心跳 * @author tanyaowu * */ private void startHeartbeatTask() { final ClientGroupStat clientGroupStat = (ClientGroupStat)clientTioConfig.groupStat; final ClientAioHandler aioHandler = clientTioConfig.getClientAioHandler(); final String id = clientTioConfig.getId(); new Thread(new Runnable() { @Override public void run() { while (!clientTioConfig.isStopped()) {// final long heartbeatTimeout = clientTioConfig.heartbeatTimeout; if (clientTioConfig.heartbeatTimeout <= 0) { log.warn("用户取消了框架层面的心跳定时发送功能,请用户自己去完成心跳机制"); break; } SetWithLock<ChannelContext> setWithLock = clientTioConfig.connecteds; ReadLock readLock = setWithLock.readLock(); readLock.lock(); try { Set<ChannelContext> set = setWithLock.getObj(); long currtime = SystemTimer.currTime; for (ChannelContext entry : set) { ClientChannelContext channelContext = (ClientChannelContext) entry; if (channelContext.isClosed || channelContext.isRemoved) { continue; } ChannelStat stat = channelContext.stat; long compareTime = Math.max(stat.latestTimeOfReceivedByte, stat.latestTimeOfSentPacket); long interval = currtime - compareTime; if (interval >= clientTioConfig.heartbeatTimeout / 2) { Packet packet = aioHandler.heartbeatPacket(channelContext); if (packet != null) { if (log.isInfoEnabled()) { log.info("{}发送心跳包", channelContext.toString()); } Tio.send(channelContext, packet); } } } if (log.isInfoEnabled()) { log.info("[{}]: curr:{}, closed:{}, received:({}p)({}b), handled:{}, sent:({}p)({}b)", id, set.size(), clientGroupStat.closed.get(), clientGroupStat.receivedPackets.get(), clientGroupStat.receivedBytes.get(), clientGroupStat.handledPackets.get(), clientGroupStat.sentPackets.get(), clientGroupStat.sentBytes.get()); } } catch (Throwable e) { log.error("", e); } finally { try { readLock.unlock(); Thread.sleep(clientTioConfig.heartbeatTimeout / 4); } catch (Throwable e) { log.error(e.toString(), e); } finally { } } } } }, "tio-timer-heartbeat" + id).start(); } /** * 启动重连任务 * * * @author tanyaowu * */ private void startReconnTask() { final ReconnConf reconnConf = clientTioConfig.getReconnConf(); if (reconnConf == null || reconnConf.getInterval() <= 0) { return; } final String id = clientTioConfig.getId(); Thread thread = new Thread(new Runnable() { @Override public void run() { while (!clientTioConfig.isStopped()) { //log.info("准备重连"); LinkedBlockingQueue<ChannelContext> queue = reconnConf.getQueue(); ClientChannelContext channelContext = null; try { channelContext = (ClientChannelContext) queue.take(); } catch (InterruptedException e1) { log.error(e1.toString(), e1); } if (channelContext == null) { continue; // return; } if (channelContext.isRemoved) //已经删除的,不需要重新再连 { continue; } SslFacadeContext sslFacadeContext = channelContext.sslFacadeContext; if (sslFacadeContext != null) { sslFacadeContext.setHandshakeCompleted(false); } long sleeptime = reconnConf.getInterval() - (SystemTimer.currTime - channelContext.stat.timeInReconnQueue); //log.info("sleeptime:{}, closetime:{}", sleeptime, timeInReconnQueue); if (sleeptime > 0) { try { Thread.sleep(sleeptime); } catch (InterruptedException e) { log.error(e.toString(), e); } } if (channelContext.isRemoved || !channelContext.isClosed) //已经删除的和已经连上的,不需要重新再连 { continue; } ReconnRunnable runnable = new ReconnRunnable(channelContext, TioClient.this); reconnConf.getThreadPoolExecutor().execute(runnable); } } }); thread.setName("tio-timer-reconnect-" + id); thread.setDaemon(true); thread.start(); } /** * * @return * @author tanyaowu */ public boolean stop() { boolean ret = true; try { clientTioConfig.groupExecutor.shutdown(); } catch (Exception e1) { log.error(e1.toString(), e1); } try { clientTioConfig.tioExecutor.shutdown(); } catch (Exception e1) { log.error(e1.toString(), e1); } clientTioConfig.setStopped(true); try { ret = ret && clientTioConfig.groupExecutor.awaitTermination(6000, TimeUnit.SECONDS); ret = ret && clientTioConfig.tioExecutor.awaitTermination(6000, TimeUnit.SECONDS); } catch (InterruptedException e) { log.error(e.getLocalizedMessage(), e); } log.info("client resource has released"); return ret; }}