SpringBoot 集成 Netty 实战

  |   0 评论   |   368 浏览

前言

本文结合了网上一些资料 结合自身经验和实战整理出来的

SpringBoot 集成

添加依赖

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.36.Final</version>
        </dependency>

创建一个NettyService
以下代码直接复制就行 这里不是重点
重点在 NettyServerHandlerInitializer 这个类里面

@Service
@Slf4j
public class NettyService  {
    @Value("${netty.port}")
    private Integer port;

    /**
     * boss 线程组用于处理连接工作
     */
    private EventLoopGroup boss = new NioEventLoopGroup();
    /**
     * work 线程组用于数据处理
     */
    private EventLoopGroup work = new NioEventLoopGroup();

    @Autowired
    private NettyServerHandlerInitializer nettyServerHandlerInitializer;

    /**
     * SpringBoot 启动的时候 调用
     * @throws InterruptedException
     */
    @PostConstruct
    public void init() throws InterruptedException {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss, work)
                // 指定Channel
                .channel(NioServerSocketChannel.class)
                //使用指定的端口设置套接字地址
                .localAddress(new InetSocketAddress(port))

                //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
                .option(ChannelOption.SO_BACKLOG, 1024)

                //设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
                .childOption(ChannelOption.SO_KEEPALIVE, true)

                //将小的数据包包装成更大的帧进行传送,提高网络的负载,即TCP延迟传输
                .childOption(ChannelOption.TCP_NODELAY, true)

                .childHandler(nettyServerHandlerInitializer);
        ChannelFuture future = bootstrap.bind().sync();
        if (future.isSuccess()) {
            log.info("启动 Netty Server");
        }
    }

    /**
     * SpringBoot 销毁的时候 调用
     * @throws InterruptedException
     */
    @PreDestroy
    public void destory() throws InterruptedException {
        boss.shutdownGracefully().sync();
        work.shutdownGracefully().sync();
        log.info("关闭Netty");
    }
}

配置 Netty

Netty的服务 初始化类

@Component
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {
    /**
     * 换行解码器  最大解码长度
     */
    private final int messageMaxLength = 1024;

    /**
     * 初始化通道
     * @param channel
     * @throws Exception
     */
    @Override
    protected void initChannel(Channel channel) throws Exception {
        channel.pipeline()
                //添加编码器
                .addLast(new NettyMessageEncode())
                //添加Netty 自带的 换行解码器(用来解决 沾包,拆包) 详细见 https://juejin.im/post/5b67902f6fb9a04fc67c1a24
                .addLast(new LineBasedFrameDecoder(messageMaxLength))
                //添加自定义的 解码器
                .addLast( new NettyMessageDecode())
                //添加 接收消息的 处理器
                .addLast(new ServiceMessageReceiveHandler());

    }
}

Netty 编码器

这个环节很重要
第一点,TCP 通讯常见问题存在沾包,拆包 这个Netty 自带解决方案(例:LineBasedFrameDecoder)
第二点, 利用编码器,我们可以直接用channel write(object),然后由编码转换为字节
第三点,利用解码器,我们可以由解码器,解码字节,并转换为JavaBean,然后直接在Handler处理
第四点,如果添加了LineBasedFrameDecoder,那么在给服务端发送消息的时候,结尾要发送换行符

自定义编码器

拦截相应ImMessageEntity 类型的数据,然后使用ByteBufUtil 转换为ByteBuf

@Slf4j
public class NettyMessageEncode extends MessageToMessageEncoder<ImMessageEntity> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, ImMessageEntity imMessageEntity, List<Object> list) throws Exception {
	// 转换为JSON
        String json = GsonPlugin.toJson(imMessageEntity) + "\n";
        log.info(json);
        list.add(ByteBufUtil.encodeString(channelHandlerContext.alloc(), CharBuffer.wrap(json), Charset.defaultCharset()));
    }
}

自定义解码器

拦截ByteBuf 数据,编码为String,并且解析JSON 为JavaBean

@Slf4j
public class NettyMessageDecode extends MessageToMessageDecoder<ByteBuf> {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> list) throws Exception {
        String message = msg.toString(Charset.defaultCharset());
        log.info(message);
        ImMessageEntity imMessageEntity = GsonPlugin.fromJson(message, ImMessageEntity.class);
        list.add(imMessageEntity);
    }
}

消息处理器

剩下的看这个类的注释就行。

public class ServiceMessageReceiveHandler extends SimpleChannelInboundHandler<ImMessageEntity> {
    private static final AttributeKey<Long> userId = AttributeKey.newInstance("userId");

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ImMessageEntity msg) throws Exception {

        Channel channel = ctx.channel();
        Attribute<Long> attr = channel.attr(userId);
        System.out.println("消息来自:" + attr.get());

    }

    /**
     * 通道进入
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
        //假设 这个是用户的ID
        Long uMid = 1238192381284123L;
        System.out.println("用户进入:" + uMid);

        //我们在该通道 放入用户id数据  当然也可以放入其他数据  可以用来绑定用户之类的操作
        Channel channel = ctx.channel();
        channel.attr(userId).set(uMid);

    }

    /**
     * 通道退出
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);
        Channel channel = ctx.channel();
        Long uMid = channel.attr(userId).get();

        System.out.println("用户退出:" + uMid);
    }

    /**
     * 接收到的事件
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            System.out.println("event:" + event.state());
        }
    }

    /**
     * 通道异常 应该在此关闭通道
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);

    }
}

调试工具

Mac OS 的话 可以直接在App Store 搜索 网络调试工具 那个就可以调试。

评论

发表评论