Netty系列文章 - 粘包与拆包

Itachi 2019年12月26日 202次浏览

粘包和拆包

产生粘包和拆包问题的主要原因是,操作系统在发送TCP数据的时候,底层会有一个缓冲区,例如1024个字节大小,如果一次请求发送的数据量比较小,没达到缓冲区大小,TCP则会将多个请求合并为同一个请求进行发送,这就形成了粘包问题;如果一次请求发送的数据量比较大,超过了缓冲区大小,TCP就会将其拆分为多次发送,这就是拆包,也就是将一个大的包拆分为多个小包进行发送。

问题重现

服务端

NIOServer.java

/**
 * @author Itachi is.xianglei@gmail.com
 * @Date 2019-12-26 20:42
 */
public class NIOServer {

    @SneakyThrows
    public static void main(String[] args) {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup work = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss,work);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>(){
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline pipeline = socketChannel.pipeline();
                pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                pipeline.addLast(new MyServerHandler());
            }
        });
        ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", 9090).sync();
        channelFuture.channel().closeFuture().sync();
    }

}

主要看自己写的处理器类MyServerHandler.java

/**
 * @author Itachi is.xianglei@gmail.com
 * @Date 2019-12-26 20:50
 */
public class MyServerHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        SocketAddress socketAddress = channelHandlerContext.channel().remoteAddress();
        System.out.println(socketAddress + "----" + s.trim());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        SocketAddress socketAddress = ctx.channel().remoteAddress();
        System.out.println(socketAddress+"---已连接!");
    }

}

客户端

NIOClient.java

/**
 * @author Itachi is.xianglei@gmail.com
 * @Date 2019-12-26 20:52
 */
public class NIOClient {

    @SneakyThrows
    public static void main(String[] args) {
        EventLoopGroup client = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(client);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<SocketChannel>(){
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline pipeline = socketChannel.pipeline();
                pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
                pipeline.addLast(new MyClientHandler());
            }
        });
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync();
        Channel channel = channelFuture.channel();

        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, Charset.defaultCharset()));
        while (true){
            String message = reader.readLine().trim();
            for (int i = 0; i < 10; i++) {
                channel.writeAndFlush(message);
            }
        }
    }

}

MyClientHandler.java

/**
 * @author Itachi is.xianglei@gmail.com
 * @Date 2019-12-26 21:02
 */
public class MyClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        SocketAddress socketAddress = channelHandlerContext.channel().remoteAddress();
        System.out.println(socketAddress + "发来消息:  " + s);
    }
}

理想的情况.我们输入一段文字服务端就应该打印这一段文字出来..

结果图

image.png

image.png 可以发现数据并不是一段一段的打印出来. netty自己封装了一次java.nio的ByteBuffer使用的是ByteBuf.可以自动扩容. 缓冲区,例如1024个字节大小,如果一次请求发送的数据量比较小,没达到缓冲区大小,TCP则会将多个请求合并为同一个请求进行发送,这就形成了粘包问题.

解决方案

对于粘包和拆包问题,常见的解决方案有四种

(1)通过FixedLengthFrameDecoder 定长解码器来解决定长消息的黏包问题;

(2)通过LineBasedFrameDecoder和StringDecoder来解决以回车换行符作为消息结束符的TCP黏包的问题;

(3)通过DelimiterBasedFrameDecoder 特殊分隔符解码器来解决以特殊符号作为消息结束符的TCP黏包问题;

(4)最后一种,也是本文的重点,通过LengthFieldBasedFrameDecoder 自定义长度解码器解决TCP黏包问题。

NIOClient.java

public class NIOClient {

    @SneakyThrows
    public static void main(String[] args) {
        EventLoopGroup client = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(client);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<SocketChannel>(){
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline pipeline = socketChannel.pipeline();
                /**
                 * 1) lengthFieldOffset  //长度字段的偏差
                 * 2) lengthFieldLength  //长度字段占的字节数
                 * 3) lengthAdjustment  //添加到长度字段的补偿值
                 * 4) initialBytesToStrip  //从解码帧中第一次去除的字节数
                 */
                pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));

                /**
                 * 计算当前待发送消息的二进制字节长度,将该长度添加到ByteBuf的缓冲区头中
                 */
                pipeline.addLast(new LengthFieldPrepender(4));
                pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
                pipeline.addLast(new MyClientHandler());
            }
        });
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync();
        Channel channel = channelFuture.channel();

        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, Charset.defaultCharset()));
        while (true){
            String message = reader.readLine().trim();
            for (int i = 0; i < 10; i++) {
                channel.writeAndFlush(message);
            }
        }
    }

}

NIOServer.java

public class NIOServer {

    @SneakyThrows
    public static void main(String[] args) {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup work = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss,work);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>(){
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline pipeline = socketChannel.pipeline();
                /**
                 * 1) lengthFieldOffset  //长度字段的偏差
                 * 2) lengthFieldLength  //长度字段占的字节数
                 * 3) lengthAdjustment  //添加到长度字段的补偿值
                 * 4) initialBytesToStrip  //从解码帧中第一次去除的字节数
                 */
                pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));

                /**
                 * 计算当前待发送消息的二进制字节长度,将该长度添加到ByteBuf的缓冲区头中
                 */
                pipeline.addLast(new LengthFieldPrepender(4));
                pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                pipeline.addLast(new MyServerHandler());
            }
        });
        ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", 9090).sync();
        channelFuture.channel().closeFuture().sync();
    }

}

image.png image.png 问题解决![QQ20200101000518.gif]