SLK的个人博客

  • 首页

  • 搜索
LRU Stack ThreadLocal Nacos RejectedExecutionHandler Executor RocketMQ ConcurrentHashMap CyclicBarrier Semaphore CountDownLatch canal Unsafe Atomic BlockingQueue AQS ReentrantLock Synchronized MESI Volatile JMM BufferPool Explain MySQL 常量池 Arthas JVM调优 三色标记 CMS ParNew 垃圾收集器 G1 Java Redis Android HTTPDNS DNS ioc 爬虫 seleniumhq 推荐引擎 Mahout IM Netty Vert.x HashMap 梯子 翻墙 V2ray Docker 搜索引擎 SpringBoot elasticsearch

SpringBoot 集成 Netty 实战

发表于 2019-06-28 | 分类于 Java | 2 | 阅读次数 521

前言

本文结合了网上一些资料 结合自身经验和实战整理出来的

SpringBoot 集成

添加依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.36.Final</version>
</dependency>

创建一个NettyService
以下代码直接复制就行 这里不是重点
重点在 NettyServerHandlerInitializer 这个类里面

@Service
@Slf4j
public class NettyService  {
    @Value("${netty.port}")
    private Integer port;

    /**
     * boss 线程组用于处理连接工作
     */
    private EventLoopGroup boss = new NioEventLoopGroup();
    /**
     * work 线程组用于数据处理
     */
    private EventLoopGroup work = new NioEventLoopGroup();

    @Autowired
    private NettyServerHandlerInitializer nettyServerHandlerInitializer;

    /**
     * SpringBoot 启动的时候 调用
     * @throws InterruptedException
     */
    @PostConstruct
    public void init() throws InterruptedException {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss, work)
                // 指定Channel
                .channel(NioServerSocketChannel.class)
                //使用指定的端口设置套接字地址
                .localAddress(new InetSocketAddress(port))

                //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
                .option(ChannelOption.SO_BACKLOG, 1024)

                //设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
                .childOption(ChannelOption.SO_KEEPALIVE, true)

                //将小的数据包包装成更大的帧进行传送,提高网络的负载,即TCP延迟传输
                .childOption(ChannelOption.TCP_NODELAY, true)

                .childHandler(nettyServerHandlerInitializer);
        ChannelFuture future = bootstrap.bind().sync();
        if (future.isSuccess()) {
            log.info("启动 Netty Server");
        }
    }

    /**
     * SpringBoot 销毁的时候 调用
     * @throws InterruptedException
     */
    @PreDestroy
    public void destory() throws InterruptedException {
        boss.shutdownGracefully().sync();
        work.shutdownGracefully().sync();
        log.info("关闭Netty");
    }
}

配置 Netty

Netty的服务 初始化类

@Component
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {
    /**
     * 换行解码器  最大解码长度
     */
    private final int messageMaxLength = 1024;

    /**
     * 初始化通道
     * @param channel
     * @throws Exception
     */
    @Override
    protected void initChannel(Channel channel) throws Exception {
        channel.pipeline()
                //添加编码器
                .addLast(new NettyMessageEncode())
                //添加Netty 自带的 换行解码器(用来解决 沾包,拆包) 详细见 https://juejin.im/post/5b67902f6fb9a04fc67c1a24
                .addLast(new LineBasedFrameDecoder(messageMaxLength))
                //添加自定义的 解码器
                .addLast( new NettyMessageDecode())
                //添加 接收消息的 处理器
                .addLast(new ServiceMessageReceiveHandler());

    }
}

Netty 编码器

这个环节很重要

  1. TCP 通讯常见问题存在沾包,拆包 这个Netty 自带解决方案(例:LineBasedFrameDecoder)
  2. 利用编码器,我们可以直接用channel write(object),然后由编码转换为字节
  3. 利用解码器,我们可以由解码器,解码字节,并转换为JavaBean,然后直接在Handler处理
  4. 如果添加了LineBasedFrameDecoder,那么在给服务端发送消息的时候,结尾要发送换行符

自定义编码器

拦截相应ImMessageEntity 类型的数据,然后使用ByteBufUtil 转换为ByteBuf

@Slf4j
public class NettyMessageEncode extends MessageToMessageEncoder<ImMessageEntity> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, ImMessageEntity imMessageEntity, List<Object> list) throws Exception {
	// 转换为JSON
        String json = GsonPlugin.toJson(imMessageEntity) + "\n";
        log.info(json);
        list.add(ByteBufUtil.encodeString(channelHandlerContext.alloc(), CharBuffer.wrap(json), Charset.defaultCharset()));
    }
}

自定义解码器

拦截ByteBuf 数据,编码为String,并且解析JSON 为JavaBean

@Slf4j
public class NettyMessageDecode extends MessageToMessageDecoder<ByteBuf> {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> list) throws Exception {
        String message = msg.toString(Charset.defaultCharset());
        log.info(message);
        ImMessageEntity imMessageEntity = GsonPlugin.fromJson(message, ImMessageEntity.class);
        list.add(imMessageEntity);
    }
}

消息处理器

剩下的看这个类的注释就行。

public class ServiceMessageReceiveHandler extends SimpleChannelInboundHandler<ImMessageEntity> {
    private static final AttributeKey<Long> userId = AttributeKey.newInstance("userId");

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ImMessageEntity msg) throws Exception {

        Channel channel = ctx.channel();
        Attribute<Long> attr = channel.attr(userId);
        System.out.println("消息来自:" + attr.get());

    }

    /**
     * 通道进入
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
        //假设 这个是用户的ID
        Long uMid = 1238192381284123L;
        System.out.println("用户进入:" + uMid);

        //我们在该通道 放入用户id数据  当然也可以放入其他数据  可以用来绑定用户之类的操作
        Channel channel = ctx.channel();
        channel.attr(userId).set(uMid);

    }

    /**
     * 通道退出
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);
        Channel channel = ctx.channel();
        Long uMid = channel.attr(userId).get();

        System.out.println("用户退出:" + uMid);
    }

    /**
     * 接收到的事件
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            System.out.println("event:" + event.state());
        }
    }

    /**
     * 通道异常 应该在此关闭通道
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);

    }
}

调试工具

Mac OS 的话 可以直接在App Store 搜索 网络调试工具 那个就可以调试。

# LRU # Stack # ThreadLocal # Nacos # RejectedExecutionHandler # Executor # RocketMQ # ConcurrentHashMap # CyclicBarrier # Semaphore # CountDownLatch # canal # Unsafe # Atomic # BlockingQueue # AQS # ReentrantLock # Synchronized # MESI # Volatile # JMM # BufferPool # Explain # MySQL # 常量池 # Arthas # JVM调优 # 三色标记 # CMS # ParNew # 垃圾收集器 # G1 # Java # Redis # Android # HTTPDNS # DNS # ioc # 爬虫 # seleniumhq # 推荐引擎 # Mahout # IM # Netty # Vert.x # HashMap # 梯子 # 翻墙 # V2ray # Docker # 搜索引擎 # SpringBoot # elasticsearch
部署Yapi接口文档管理平台
SpringBoot Vert.x 一(整合开发环境)
  • 文章目录
  • 站点概览
宋龙宽

宋龙宽

87 日志
13 分类
53 标签
RSS
Github E-mail
Creative Commons
Links
  • 黑客派
  • Relyn
  • 张小妞的博客
  • ElasticSearch教程
© 2021 宋龙宽
由 Halo 强力驱动
|
主题 - NexT.Gemini v5.1.4