简易RPC框架-心跳与重连机制">简易RPC框架-心跳与重连机制

心跳

就是告诉其它人自己还活着。在简易RPC框架中,采用的是TCP长连接,为了确保长连接有效,就需要客户端服务端之间有一种通知机制告知对方的存活状态。

如何实现

客户端发送心跳消息

在状态空闲的时候定时给服务端发送消息类型为PING消息。

服务端接收心跳消息

捕获通道空闲状态事件,如果接收客户端PING消息,则发送PONG消息给服务端。如果在一定时间内没有收到客户端的PING消息,则说明客户端已经不在线,此时关闭通道。

客户端管理可用连接

由于服务端会因为长时间接收不到服务端的PING消息而关闭通道,这就导致缓存在客户端的连接的可用性发生变化。需要将不可用的从可用列表中转移出去,并对不可用连接进行处理,比如直接丢弃或者是重新连接。

预备知识

ChannelPipeline与handle的关系。netty中的这些handle和spring mvc中的filter作用是类似的,ChannelPipeline可以理解成handle的容器,里面可以被注册众多处理不同业务功能的事件处理器,比如:

  • 编码
  • 解码
  • 心跳
  • 权限
  • 加密
  • 解密
  • 业务代码执行
  • ......

具体实现

空闲状态处理器

可以利用netty提供的IdleStateHandler来发送PING-PONG消息。这个处理器主要是捕获通道超时事件,主要有三类

  • 读超时,一定时间内没有从通道内读取到任何数据
  • 写超时,一定时间内没有从通道内写入任何数据
  • 读写超时,一定时间内没有从通道内读取或者是写入任何数据

客户端加入空闲状态处理器

客户端捕获读写超时,如果事件触发就给服务端发送PING消息。

服务端加入空闲状态处理器

服务端只需要捕获读超时即可,当读超时触发后就关闭通道。

为什么在空闲状态才发送心跳消息

在正常客户端与服务端有交互的情况下,说明双方都在正常工作不需要额外的心跳来告知对方的存活。只有双方在一定时间内没有接收到对方的消息时才开始采用心跳消息来探测对方的存活,这也是一种提升效率的做法。

抽象心跳处理器

创建AbstractHeartbeatHandler,并继承ChannelInboundHandlerAdapter,服务于客户端与服务端的心跳处理器。在读取方法中判断消息类型:

  • 如果是PING消息就发送PONG消息给客户端
  • 如果收到的是PONG消息,则直接打印消息说明客户端已经成功接收到服务端返回的PONG消息
  • 如果是其它类型的消息,则通知下一个处理器处理消息
public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {

        if(!(msg instanceof Rpcmessage)){
            channelHandlerContext.fireChannelRead(msg);
            return;
        }
        RpcMessage message=(RpcMessage)msg;

        if(null==message||null==message.getMessageHeader()){
            channelHandlerContext.fireChannelRead(msg);
            return;
        }
        if(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PONG){
            logger.info("ClientHeartbeatHandler.channelRead0 ,pong data is:{}",message.getMessageBody());
        }
        else if(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PING){
            this.sendPong(channelHandlerContext);
        }
        else {
            channelHandlerContext.fireChannelRead(msg);
        }

    }

空闲状态事件,可以根据不同的状态做不同的行为处理,定义三个可重写事件供客户端与服务端处理器具体确认处理事件。

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
                case READER_IDLE:
                    this.handleReaderIdle(ctx);
                    break;
                case WRITER_IDLE:
                    this.handleWriterIdle(ctx);
                    break;
                case ALL_IDLE:
                    this.handleAllIdle(ctx);
                    break;
                default:
                    break;
            }
        }
    }

客户端心跳处理器

继承抽象心跳处理器,并重写事件发送PING消息。

public class ClientHeartbeatHandler extends AbstractHeartbeatHandler {

    @Override
    protected void handleAllIdle(ChannelHandlerContext ctx) {
        this.sendPing(ctx);
    }
}

服务端心跳处理器

继承抽象心跳处理器,并重写事件关闭通道。

public class ServerHeartbeatHandler extends AbstractHeartbeatHandler {

    @Override
    protected void handleReaderIdle(ChannelHandlerContext ctx) {
        logger.info("ServerHeartbeatHandler.handleReaderIdle reader timeout ,close channel");
        ctx.close();
    }

}

客户端ChannelPipeline中加入心跳处理器

比如5秒内未写入或者读取通道数据就触发超时事件。

.addLast(new IdleStateHandler(0, 0, Constants.ALLIDLE_TIME_SECONDS));

服务端ChannelPipeline中加入心跳处理器

比如10秒未接收到通道消息就触发读超时事件。

.addLast(new IdleStateHandler(Constants.READER_TIME_SECONDS, 0, 0))

客户端消息示例

正常情况下心跳消息显示如下图所示,消息的内容可以根据自己的情况自行定义。

客户端下线消息示例

停止客户端程序,然后服务端读超时事件触发,并关闭通道。

客户端可用连接管理

由于上述的服务端心跳处理器,在触发读超时后会关闭通信管道,这导致客户端缓存的连接状态会出现不可用的情况,为了让客户端一直只能取到可用连接就需要对从缓存中获取到的连接做状态判断,如果可用直接返回,如果不可用则将连接从可用列表中删除然后取下一个可用连接。

修改获取连接方法

通过channel的isActive属性可以判断连接是否可用,如果不可以做删除并重新获取的操作。

public RpcClientInvoker getInvoker() {
        // ...
        int index = loadbalanceService.index(size);
        RpcClientInvoker invoker= RpcClientInvokerCache.get(index);
        if(invoker.getChannel().isActive()) {
            return invoker;
        }
        else {
            RpcClientInvokerCache.removeHandler(invoker);
            logger.info("invoker is not active,so remove it and get next one");
            return this.getInvoker();
        }
    }

后台启动任务处理不可用连接

启动一个每隔5秒执行一次任务的线程,定时取出不可用连接,然后重连,并将不可用连接删除。

这里我处理的重连是直接丢弃原有不可用连接,然后重新创建新连接。

private static final Logger logger = LoggerFactory.getLogger(RpcClientInvokerManager.class);

    static {
        executorService.schedule(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    List<RpcClientInvoker> notConnectedHandlers = RpcClientInvokerCache.getNotConnectedHandlers();
                    if (!CollectionUtils.isEmpty(notConnectedHandlers)) {
                        for (RpcClientInvoker invoker : notConnectedHandlers) {
                            RpcClientInvokerManager.getInstance(referenceConfig).connect();
                        }
                        RpcClientInvokerCache.clearNotConnectedHandler();
                    }
                }
            }
        }, Constants.RECONNECT_TIME_SECONDS,TimeUnit.SECONDS);

    }

本文源码

https://github.com/jiangmin168168/jim-framework

文中代码是依赖上述项目的,如果有不明白的可下载源码

引用

本文中的图取自于网格

赞 (0) 评论 分享 ()