netty的基本使用,基本原理分析,相关问题解决(更新中…)

[TOC]

netty的基本介绍

什么是netty

引用netty官网的一句话:Netty是一个NIO客户端服务器框架,可以快速轻松地开发网络应用程序,如协议服务器和客户端。它极大地简化了TCP和UDP套接字服务器等网络编程。 “快速和简单”并不意味着最终的应用程序将遭受可维护性或性能问题。Netty是根据实现许多协议(如FTP、SMTP、HTTP和各种基于二进制和文本的遗留协议)所获得的经验精心设计的。因此,Netty成功地找到了一种方法,在不妥协的情况下实现易于开发、性能、稳定性和灵活性。

简单来说,netty就是Java领域的一个网络通信标杆级框架,大量框架底层网络通信依赖netty,像zookeeper,dubbo,elasticsearch,redisson等。

netty底层是对Java原生NIO的高度封装,提供更简单的API,使NIO程序的开发不用那么复杂,且自定义ByteBuf数据类型,对标Java原生的ByteBuffer,实现了数据的零拷贝,数据传输效率也更高。

netty与tomcat的区别是什么

netty是一个网络通信框架,可以自定义多种协议,不仅仅是http协议,也可以是ftp协议,websocke协议等,也可以自定义协议,自定义程度非常高,性能也很高。

tomcat是一个web容器,符合servlet容器标准,是对http协议的高度封装,针对http协议有着特定的优化,因此tomcat支持特定协议领域的网络通信框架。

**举个简单的例子:**有两个厨师,A厨师什么菜都会做,但是需要你告诉他做什么,帮他买好菜;B厨师只会做红烧肉,但他什么都不需要你干,不用指挥,他就能把菜做好。因此A厨师就像netty,B厨师tomcat。

相关概念/问题

  • 什么是BIO,NIO,AIO

BIO:BIO是阻塞IO,就是一个网络请求进来,就要给这个连接分配一个线程,这个线程只为这个连接服务,如果这个连接没有数据传输,这个线程就一只阻塞在这里。**举个例子:**你去订蛋糕,在蛋糕没好之前,你一直在蛋糕店里面守着,期间你无法去干啥,这段事件就浪费了,你也就被阻塞了。

NIO:NIO是同步非阻塞IO,就是一个网络请求进来,我不需要给他分配一个线程,而是将这个连接加入到一个容器中,这个线程不断去轮询,检测某个请求是否有消息发送过来了,如果有,我就将它放入另一个线程就行处理,同时往下轮询检查其它的连接。**举个例子:**你去订蛋糕,下订单后,你去附近的商场逛街,隔一段时间你就打一个电话过去,蛋糕好了没有,等有个电话打过去刚好蛋糕做好了,你就可以回去拿了。

AIO:AIO就是异步非阻塞IO,就是一个网络请求进来,我不需要给他分配一个线程,而是将这个连接加入一个容器中,也不用线程去轮询,而是等如果其中一个连接有消息进来,它主动通知我,我再将这个连接的消息交给一个线程来处理。**举个例子:**你去订蛋糕,下订单后,你去附近的商城逛街,也不用打电话过去,等蛋糕做好了,店家主动打电话告诉你,你再去拿蛋糕。

所以说,店家为什么不能主动送过来呢😂

以上例子只是一些简单的抽象,比如线程可以池化设计,一个线程可以管理一部分连接等。

  • 什么是网络通信

什么是网络通信呢,就是不同计算机间通过网络数据流进行数据交换,目前都是基于传输层TCP/UPD两大协议。

  • 什么是零拷贝

零拷贝不是说不需要进行数据拷贝了,只是减少数据拷贝次数,同时减少CPU上下文切换次数,这样无疑能提高数据传输的效率。

传统的数据从硬盘拷贝到网卡

  • 数据从硬盘读取到系统内核read buffer缓冲区(一次拷贝)
  • 数据从内核的read buffer缓冲区拷贝到用户缓冲区(两次拷贝)
  • 数据从用户缓存区拷贝到内核的socket buffer缓冲区(三次拷贝)
  • 数据从socket buffer缓冲区拷贝到网卡接口(四次拷贝)
  • 期间CPU发生了两次上下文切换,一次切换是第二次拷贝时:内核态到用户态,二次切换是第三次拷贝时:用户态到内核态

零拷贝将数据从硬盘拷贝到网卡

  • 硬盘文件由DMA从硬盘拷贝到内核read buffer缓冲区(一次拷贝)
  • DMA将内核read buffer缓冲区数据拷贝到网卡接口(两次拷贝)

零拷贝相比于传统的拷贝,少了两次数据拷贝和两次CPU上下文切换,效率提升非常多。

DMA:DMA(Direct Memory Access,直接内存访问)是一种计算机系统中的技术,用于实现数据在主存和外设之间的直接传输,而无需通过中央处理器(CPU)的干预。DMA 可以提高数据传输的效率,减少 CPU 的负载,同时也有助于实现零拷贝技术。同时DMA技术是需要有硬件支持的,一般现代主板都集成有DMA处理芯片。

netty的基本使用(Socket连接)

以下netty的使用都是基于netty4

netty服务端

  • 创建maven项目
  • 引入依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.94.Final</version>
</dependency>
  • NettyServer(服务端)
public class NettyServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new InServerChannelHandler());
}
});
// 绑定9909端口
Channel channel = serverBootstrap.bind(9909).sync().channel();
System.out.println("---------------服务端启动,端口:" + 9909 + "---------------");
channel.closeFuture().sync();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 关闭线程组
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
  • InServerChannelHandler(自定义服务端入站处理器)
public class InServerChannelHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("---------服务端连接成功----------");
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 时间
LocalDateTime now = LocalDateTime.now();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
String formattedDateTime = now.format(formatter);
ByteBuf buff = (ByteBuf) msg;
String strMsg = buff.toString(CharsetUtil.UTF_8);
System.out.println("---------服务端收到数据[" + formattedDateTime + "]:" + strMsg + "-----------");
}
}
  • 运行main方法启动netty服务端,监听9909端口

netty客户端

  • 创建maven项目
  • 引入依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.94.Final</version>
</dependency>
  • ClientServer(客户端),通过ip与port监听服务端
public class NettyClient {

public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new ClientChannelHandel());
}
});

Channel channel = bootstrap.connect("localhost", 9909).sync().channel();
// 可以在这里进行其他操作,如发送数据

channel.closeFuture().sync();

} catch (InterruptedException e) {
e.printStackTrace();
} finally {
worker.shutdownGracefully();
}
}
}
  • ClientChannelHandel(自定义客户端入站处理器)
class ClientChannelHandel extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("客户端收到数据:" + msg.toString(CharsetUtil.UTF_8)));
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("---------客户端进行连接----------");
// 向服务端发送消息
ctx.channel().writeAndFlush(Unpooled.copiedBuffer("你好,我是netty客户端", CharsetUtil.UTF_8));
}
}
  • 启动客户端,连接到客户端,便可以发送第一条消息:你好,我是netty客户端

基本代码分析

服务端

  • NioEventLoopGroup类

NioEventLoopGroup底层是一个线程池,线程池中线程数量为机器逻辑处理器数量*2,其中使用了两个线程组,一个boss线程组和一个woker工作线程组。

**逻辑处理器:**对于没有超线程的处理器,逻辑处理器就是物理核心的数量,对于有超线程的处理器,逻辑处理器就是超线程的数量

**为什么要使用两个线程组:**boss线程组用于请求的连接和服务端端口的监听,worker线程组用于入站消息的接收,解码,出站消息的编码,发送。因此可以得出,boss线程的压力要小很多,所以boss线程池的线程数量可以设置小一些。NioEventLoopGroup构造器的一个int入参就是设置线程池中线程的数量的。

  • ServerBootstrap与Bootstrap类

**ServerBootstrap:**ServerBootstrap用于netty服务端,请求的连接,数据的入站出站处理。

**Bootstrap:**Bootstrap用于netty客户端,因为不需要处理连接请求,因此只需要一个worker线程组设置就行。

  • channel()方法

用于设置Netty底层数据IO类型,NioServerSocketChannel指的是服务端使用NIO类型,也可以指定OioServerSocketChannel,指的是BIO,不过有NIO了,基本上也不用BIO了;也可以设置EpollServerSocketChannel,指的是Linux系统下的epoll IO模型,不过epoll存在空轮询问题,会导致CPU资源空消耗,所以此处推荐NioServerSocketChannel用于服务端,NioSocketChannel用于客户端。

  • handler()方法

一般是设置整个服务的一些处理器,上面代码handler(new LoggingHandler(LogLevel.INFO)),用于设置netty日志处理的级别,这个handler()方法也基本上很少用

  • option()

一般用于设置整个服务的一些参数,向option(ChannelOption.SO_BACKLOG, 1024),用于设置连接等待队列的大小,也可以设置一些其它参数,例如是否启动:TCP_NODELAY,启用或禁用 Nagle’s algorithm。这是一种改善网络效率的算法,通过减少小包的数量来优化网络。

  • childOption()方法

childOption的用法与option基本上类似,只不过是针对连接来设定的,不过基本上功能也和option类似。

  • childHandler()方法,重要
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new InServerChannelHandler());
}
});

入参为:实现ChannelInitializer匿名内部类,泛型参数为SocketChannel,接收每一个连接发送的消息,向每一个连接发送消息,都要经过ch.pipline下的处理器,使用了责任链模式,以addLast为例,就是在最后添加InServerChannelHandler自定义处理器。

责任链模式: 处理逻辑像链条一样连接在一起,上游处理完毕将结果传递给下游处理器,下游处理器开始执行。

pipeline()中处理器链添加的位置:(如果未指定处理器,处理器链上默认是没有处理器的)

  • addFirst(): 将指定的处理器添加到处理器链的开头位置。
  • addLast():将指定的处理器添加到处理器链的结束位置。
  • addBefore(): 在指定的处理器之前添加一个新的处理器。
  • addAfter(): 在指定的处理器之后添加一个新的处理器。
  • replace(): 替换指定名称的处理器为一个新的处理器。

不管按什么方式添加处理器,入站信息都是从处理器链头部往尾部执行的,而出站信息则是从处理器链尾部往头部执行的。

  • 入站信息/入站处理器

以服务端为例,入站信息就是客户端发给服务端的数据,也叫服务端接收的消息;而入站处理器就是用来处理这些消息的,比如对这些消息进行解密,编码,转格式等。

常见入站处理器:StringDecoder(字符串解码,将ByteBuf等格式转化成String格式)。

入站处理器默认都是都是继承ChannelInboundHandlerAdapter,在此基础上自定义自己的需求

image-20230907175206141

  • 出站信息/出站处理器

以服务端为例,出站信息就是服务端发给客户端的数据,也叫服务端发送数据;而出站处理器就是用来对要发出去的数据进行一系列处理,像转格式,加密等。

常见出站处理器:StringEncoder(字符串编码,将String格式转化成ByteBuf格式)

出站处理器默认都是继承ChannelOutboundHandlerAdapter,在此基础上自定义自己的需求

image-20230907175224432

  • 双工处理器

基本上这种是既可以处理入站数据,也可以处理出站数据,双工处理器一般是继承一个入站处理器类和实现一个出站处理器接口,或者继承一个出站处理器类和实现入站处理器接口。

总的来说,netty的数据处理器链上只有入站处理器和出站处理器两种类型,其它都是在此基础上封装而来的。

入站处理器

以最开始的服务端代码为例,创建InServerChannelHandler类,继承ChannelInboundHandlerAdapter,重写父类的方法,其中重写的一系列方法就是消息入站时一系列生命周期的回调,包括连接时回调,异常时回调,接收消息时回调等,以下列举各个回调的作用。

在描述以下方法前,先说下每个方法都会有的一个入参,ChannelHandlerContext,表示当前连接管道的上下文环境,一般是使用ctx.channel获取当前连接管道,一个连接就时一个channel,可以通过channel向连接的这个客户端发送信息,获取当前客户端的连接id,地址等信息。

  • channelRegistered(ChannelHandlerContext ctx)

连接注册到EventLoop,就是连接已经到达了连接处理线程

  • channelUnregistered(ChannelHandlerContext ctx)

连接从EventLoop上取消注册,连接离开了连接处理线程

  • channelActive(ChannelHandlerContext ctx)

连接成功时调用该方法,表示一个请求已经连接成功,在这里一般做一些初始化的工作,比如将连接加入到容器中,下次要用的时候直接根据id从容器中取出该channel,向该channel发送消息,可用于客户端间的互相通信

  • channelInactive(ChannelHandlerContext ctx)

连接已断开,这里是指连接主动断开了,如果因关闭程序断开,则不会运行这个回调,而是抛异常回调

  • channelRead(ChannelHandlerContext ctx, Object msg)

**最主要的方法:**接收客户端发来的数据,进行相关操作。

不加入其它内置的入站处理器的话,netty默认接收到的数据类型就是ByteBuf,这是netty自定义对标Java ByteBuffer的自定义字节缓冲区,ByteBuf只是一个抽象类,PooledByteBuf便是其上层实现,还有一种就是接收的不是文本,字节类数据,而是文件类数据,此时数据类型便是FileRegion类型,在使用Object msg,要对数据进行转换。

其实自带的入站处理器StringDecorder其实是将ByteBuf转成String类型。

使用ctx.channel().writeAndFlush()方法向channel所对应的客户端发送数据,在没有自定义出站处理器或者没有出站处理器时,writeAndFlush能就收的数据类型便只有ByteBuf或则FileRegion了,可以使用Unpooled类的方法将不同数据类型转成ByteBuf,以下是Unpooled类中方法的一些使用方式:

**Unpooled.buffer(int initialCapacity, int maxCapacity):创建一个堆内存(Heap Buffer)ByteBuf,它将数据存储在JVM的堆空间中。这种类型的缓冲区,数据的读写都会通过一个中间数组进行,这可能会引入一些额外的内存复制操作。不过,由于可以利用JVM 的自动垃圾回收,堆内存的分配和释放通常比堆外内存更高效。Unpooled.directBuffer(int initialCapacity, int maxCapacity)**:创建一个直接内存(Direct Buffer)ByteBuf,它将数据存储在JVM堆空间之外的直接内存中。这种类型的缓冲区可以减少在与本地 I/O 操作交互时的内存复制操作,从而提高性能。但是,直接内存的分配和释放通常比堆内存更昂贵,而且不受JVM垃圾回收的管理。
**Unpooled.wrappedBuffer(byte[] array)**:创建一个包装缓冲区(Wrapped Buffer)ByteBuf,它会将已有的字节数组或ByteBuffer 包装为 ByteBuf,而不会复制数据。如果修改ByteBuf中的数据,也会影响到原来的字节数组或ByteBuffer。
**Unpooled.copiedBuffer(byte[] array)**:创建一个复制缓冲区(Copied Buffer)ByteBuf,它会将已有的字节数组、字符串或ByteBuffer 等数据复制一份到ByteBuf中。如果修改ByteBuf 中的数据,不会影响到原来的数据。

  • channelReadComplete(ChannelHandlerContext ctx)

一般接收一次数据,此方法只调用一次,因为接收的数据可能过大,分几次读取完毕,在最后一次读取后调用。什么意思呢?就是客户端一次性向服务端发送大量数据,导致服入站缓冲区满了,无法一次性将数据读完,可能得分几次读完,那么这个方法就是在最后一次读完得时候调用,也就是一次接收只读一次。

  • userEventTriggered(ChannelHandlerContext ctx, Object evt)

触发用户自定义事件:什么叫用户自定义事件,就是该处理器的上游处理器在调用了fireUserEventTriggered方法,将调用链传递到这个处理器,(因为调用了fireUserEventTriggered方法,上游处理器变结束运行了),到了该处理器,就会触发userEventTriggered方法,通过判断传入的参数Object evt,判断是哪个处理器调用的,进行相关判断,执行相应的方法。

一般这个用户自定义事件搭配IdleStateHandler处理器可以实现心跳检测,下面将会介绍到如何实现心跳检测。

  • channelWritabilityChanged(ChannelHandlerContext ctx)

判断channel通道是不是可写状态:为什么会变的不可写呢,因为一次性向发送缓冲区写入太多的数据,或者写入速度超出了网络传输能力,导致缓存区过满就会导致变成不可写状态,不可写状态不代表就发送失败了,只是发送被阻塞了,等缓冲区不满了,又可以继续写入发送了这里的发送缓存区,是指服务端向客户端发送数据。

  • exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

网络出错导致短连,数据解析错误等一系列错误,抛出的异常在此捕获

出站处理器

出站处理器与入站处理器相对应,出站处理器主要是对发往客户端的数据进行相关处理,创建OutServerChannelHandler类,继承ChannelOutboundHandlerAdapter,重写父类的方法,将出站处理器加入到pipline处理器链上,重写的父类方法便是数据发往客户端时的一些回调,以下对这些回调方法进行一些简要的介绍。由于自定义出站处理器用的远没有入站处理器那么频繁,所以以下只是对回调方法的一些简要的介绍。

  • bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)

出站缓冲区绑定到本地网卡,因为数据发送的过程是,数据先发往出站缓冲区,出站缓冲区再发往本地网卡,绑定本地网卡,说明出站缓存区的数据可以发往本地网卡了。

  • connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise)

绑定到远程网卡,也就是客户端网卡,两者间联通了,说明数据可以从本地网卡发往远程客户端网卡了

  • disconnect(ChannelHandlerContext ctx, ChannelPromise promise)

与客户端断开连接

  • close(ChannelHandlerContext ctx, ChannelPromise promise)

关闭连接通道

  • deregister(ChannelHandlerContext ctx, ChannelPromise promise)

将连接通道从EventLoop中移除

  • read(ChannelHandlerContext ctx)

这个read其实是从入站缓冲区读取数据出来,注意是入站缓冲区,个人也不晓得为什么要放在这里,与出站处理器有啥关系

  • write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)

出站处理器的核心回调方法:obj就是要发往客户端的数据,默认不处理是ByteBuf或FileRegion类型,可以手动在此处转类型,或者对数据进行加密等。

  • flush(ChannelHandlerContext ctx)

将出站缓冲区的数据写入到本地网卡中

心跳检测

心跳检测其实就是客户端定时向服务端发送一个特定的数据,服务端对每次对其回应一个特定的数据,用来检测客户端与服务端之间是否还存活着;如果客户端发现向服务端发送数据不成功,则主动断开连接,同理,服务端也是类似。

netty心跳检测用到了IdleStateHandler这个处理器类,这个处理器就上面说的既是入站处理器又是出站处理器,以下是它的继承结构。首先继承ChannelDuplexHandler类;ChannelDuplexHandler类又继承ChannelInboundHandlerAdapter入站处理器类和ChannelOutboundHandler出站处理器接口。

  • IdleStateHandler继承结构

image-20230908090942034

  • ChannelDuplexHandler继承结构

image-20230908091043232

  • 以下对IdleStateHandler处理器的基本入参做一个介绍

**readerIdleTime**:代表几秒内没有收到数据就触发下游处理器用户事件回调userEventTriggered,比如你需要5秒内没有数据读取就触发下游处理器userEventTriggered事件,0代表禁用按读取数据判断。一般用在服务端,因为服务端主要接收客户端心跳包数据。

**writerIdleTime**:代表写入数据流的判断,几秒没有写入数据流就向触发下游用户自定义事件。0代表禁用。一般用在客户端,客户端主要是向服务端发送心跳数据。

**allIdleTime**:代表的是有数据,不管是写入还是读取,只要有数据就行。0代表禁用。

默认时间单位为秒。

服务端处理

  • 在处理器链的自定义处理器链前面加上IdleStateHandler处理器
// 0表示禁用,10表示10s内没有接收到该channel的数据,就触发下游用户自定义事件
// 事件类型为IdleStateEvent
addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS))
  • 编写自定义处理器链的channelRead回调部分
// 添加在channelRead回调的代码最前方,反正是普通数据处理的前方
// 将数据转成ByteBuf
ByteBuf buff = (ByteBuf) msg;
// 转成string
String strMsg = buff.toString(CharsetUtil.UTF_8);
// 判断传过来的数据是否是ping,是的话就是客户端的心跳包
if (strMsg.equals("ping")) {
// 我们就给客户端回应一个pong,表示收到了心跳
ctx.channel().writeAndFlush(Unpooled.copiedBuffer("pong", CharsetUtil.UTF_8))
// 这个是事件监听,表示发送失败直接关闭连接,发送失败基本上是连接不到客户端了
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
  • 编写自定义处理器链的userEventTriggered用户自定义事件部分
// 心跳检测,判断是否是IdleStateEvent事件
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
// 判断是否是未读取事件
if (e.state() == IdleState.READER_IDLE) {
System.out.println("--------超时未读取--------");
// 未发送数据,代表对面可能特殊原因关闭连接了,这里我们也关闭连接
ctx.channel().close();
}
}
// 不是的话跳过
super.userEventTriggered(ctx, evt);

客户端处理

  • 在处理器链的自定义处理器链前面加上IdleStateHandler处理器
// 0表示禁用,6表示6s内没有向该channel写入数据,就触发下游用户自定义事件
// 事件类型为IdleStateEvent
addLast(new IdleStateHandler(0, 6, 0, TimeUnit.SECONDS))
  • 编写自定义处理器链的userEventTriggered用户自定义事件部分
// 判断是否是IdleStateEvent事件
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
// 是否是未写入事件
if (event.state() == IdleState.WRITER_IDLE) {
// 向服务端发送ping数据
ctx.writeAndFlush(Unpooled.copiedBuffer("ping", CharsetUtil.UTF_8))
// 如果发送失败,直接关闭连接
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
}
// 不是的话跳过
super.userEventTriggered(ctx, evt);
  • 以上便是简易的心跳检测程序

进阶

现在有这么一个场景,如果客户端与服务端之间发送的数据不止是String,可能还有文件,JSON等,或则哪天突然ping这个字符被当作普通字符使用了,那么服务端在接收到时就会将其按照心跳处理了。

所以这里涉及到了自定义数据解析类型,自定义数据传输格式的内容了,咱们以后有机会再说。

粘包/拆包处理

什么是粘包/拆包

以下以TCP连接为例,因为UDP没有这个粘包/拆包问题,还有就是UDP协议的使用比较少,常见的就是DNS协议了。

为什么会发生粘包与拆包问题呢?因为数据传输的传输层协议是TCP,这种协议是面向连接,可靠性高,底层使用字节流传输,这样就会引发一些问题?

如果客户端发送的数据太小,无法填满TCP连接缓冲区,TCP缓冲区就会等下一部分数据将其填满后,再一起发送给服务端,由于是一次性发送过来的,服务端就无法识别数据是客户端一次发过来的还是多次发过来的,这样的或就会照成粘包问题,就是多个数据包粘在一起了。

如果客户端一次发送的数据过大,超出了TCP缓冲区承载的能力,TCP就会先将满了的一部分数据发送给服务端,再发送接下来的,服务端没有一次性收到全部的数据,因此数据也是不完整的,就发生了拆包

  • 正常图示

image-20230908100825414

  • 粘包图示

image-20230908100901998

  • 拆包图示

image-20230908100931760

  • 粘包/拆包问题展示
// 客户端循环1000次向服务端发送数据
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 1000; i++) {
ctx.channel().writeAndFlush(Unpooled.copiedBuffer("我是netty客户端", CharsetUtil.UTF_8));
}
}
  • 运行结果

image-20230908102545637

我们发现,并没有我们想象中的服务端一个一个接收数据,而是多个数据内容粘连在一起了,这就是粘包,也与我发送的数据大小有关

如何解决粘包拆包问题

  • 指定数据包的长度,使用定长模式

意思是:如果你的数据包的长度是固定的,那么可以使用FixedLengthFrameDecoder定长解码器,设置数据包字节的长度,那么netty每次只会从入站缓冲区中取固定长度的数据,其它则等下一次获取。

// 在处理器链的最前端添加此处理器链
// 设置每次接收20字节长度的数据
addLast(new FixedLengthFrameDecoder(20))
  • 指定数据末尾分隔符

就是在数据包的结尾加上特定的分割符号示意,当入站处理器读取到这个分隔符的时候,就停止读取,下一次再进行读取。

分隔符解决拆包和粘包问题的,netty有两个解码器,分别是:LineBasedFrameDecoder和DelimiterBasedFrameDecoder。

**LineBasedFrameDecoder:**主要是按行符\n和回车符\t\n进行数据包拆分。

**DelimiterBasedFrameDecoder:**主要是按自定义符号进行数据拆分,比如按$拆分,则在每条数据末尾加上$

// 按换行进行拆分,8192指每次读取最大数据长度为8192字节
addLast(new LineBasedFrameDecoder(8192))

// 按自定义符号拆分,比如按$符号
addLast(new DelimiterBasedFrameDecoder(8192, "$"))
  • 自定义编解码器

TODO 可以通过自定义编解码器,就是自定义数据结构来实现数据的拆分,解决粘包/拆包问题。

自定义编解码器

TODO:自定义编解码器,未完待续...