netty快速使用

Netty是什么?

JBoss做的一个Jar包, 为了快速开发高性能、高可靠的网络服务器和客户端, 提供异步的事件驱动的网络应用框架和工具。

Netty可以让你定制编解码协议,实现自己特定协议的服务器。

Netty是基于Java NIO技术封装的一套框架(更高层次的抽象)。

  • NIO全称NoneBlocking IO, 非阻塞IO。使用事件机制。
  • Accept连接和读写可使用一个线程处理也可以分开线程池处理(根据实际情况进行组装)
  • 内置了常用的编解码器;
    • 行编解码器[一行一个请求]
    • 前缀长度编解码器[前N个字节定义请求的字节长度]
    • 可重放解码器[记录半包消息的状态]
    • HTTP编解码器
    • WebSocket消息解码器
  • 一系列的生命周期回调接口
  • 同时管理多个端口
  • 可以处理TCP和UDP的socket

Netty组成概念

Netty 处理流程简析(附流程图)

Bootstrap

Bootstrapping(引导) 是出现在Netty 配置程序的过程中,Bootstrapping在给服务器绑定指定窗口或者要连接客户端的时候会使用到。

Bootstrapping 有以下两种类型:

  • 一种是用于客户端的Bootstrap

    1个EventLoopGroup,

  • 一种是用于服务端的ServerBootstrap

    2个EventLoopGroup,

    ​ bossGroup线程(用于创建、连接、绑定socket,然后把这些socket传 给worker线程池)

    ​ workerGroup线程(执行所有的异步I/O)

EventLoop

处理 Channel 的 I/O 操作

一个单一的 EventLoop通常会处理多个 Channel 事件

EventLoopGroup

继承于ScheduledExecutorService线程池接口的接口, 用于管理线程池。

如果在构造函数中不指定创建的线程数量,会默认创建当前cpu个数的2倍个。

一个 EventLoopGroup 具有一个或多个 EventLoop。

当创建一个 Channel,Netty 通过 一个单独的 EventLoop 实例来注册该 Channel(并同样是一个单独的 Thread)的通道的使用寿命。这就是为什么你的应用程序不需要同步 Netty 的 I/O操作;所有 Channel 的 I/O 始终用相同的线程来执行。

EventLoop

Channel

用于连接到实体如硬件设备、文件、网络套接字或程序组件

线程安全可以被多个不同的线程安全的操作

当从Java NIO获得一个Selector事件,将激活通道Channel。

包括了网络的读,写,客户端发起连接,主动关闭连接,获取对方网络地址等,封装了Socket的操作,当有Socket操作发生时,会触发事件相应操作如:channelRead、channelReadComplete、exceptionCaught等方法

Netty提供了大量的Channel实现来专门使用

  • AbstractChannel
  • AbstractNioByteChannel
  • AbstractNioChannel
  • EmbeddedChannel
  • LocalServerChannel
  • NioSocketChannel

每个Channel都会分配一个ChannelPipeline(容纳ChannelHandler实例|处理出站和入站)和ChannelConfig(存储配置|允许运行期间更新)

channel的生命周期
状态 描述
channelUnregistered channel已创建但未注册到一个 EventLoop.
channelRegistered channel 注册到一个 EventLoop.
channelActive channel 变为活跃状态(连接到了远程主机),现在可以接收和发送数据了
channelInactive channel 处于非活跃状态,没有连接到远程主机

ChannelHandler

提供用于数据处理的容器, 由特定事件触发

常用于:

  • 传输数据时,将数据从一种格式转换到另一种格式
  • 异常通知
  • Channel 变为 active(活动) 或 inactive(非活动) 时获得通知* Channel 被注册或注销时从 EventLoop 中获得通知
  • 通知用户特定事件

Netty提供接口

  • ChannelInboundHandler 进站数据用于状态的更改事件

    实现覆盖了 channelRead() 方法处理进来的数据用来响应释放资源

    当执行释放资源时可以减少内存的消耗ReferenceCountUtil.release(msg);.

    如果使用SimpleChannel….内部释放内存(重名所以使用channelRead0)

  • ChannelOutboundHandler 出站数据用于拦截

ChannelHandler的生命周期
类型 描述
handlerAdded 当 ChannelHandler 添加到 ChannelPipeline 调用
handlerRemoved 当 ChannelHandler 从 ChannelPipeline 移除时调用
exceptionCaught 当 ChannelPipeline 执行抛出异常时调用
资源管理 (防止泄露)

channelRead 或者 write 处理数据,处理资源时要确保资源不要泄漏

Netty 包含了一个 ResourceLeakDetector

级别描述 DISABLED
Disables 禁用检测
SIMPLE 使用泄漏,使用1 %的采样率
ADVANCED 使用1 %的采样率,告知是否发现泄漏以及消息的访问位置
PARANOID 每次访问都采样,告知是否发现泄漏以及消息的访问位置

修改检测等级,只需修改 io.netty.leakDetectionLevel 系统属性

1
java -Dio.netty.leakDetectionLevel=paranoid
编解码Codec

decoder(解码器)

encoder(编码器)

ChannelPipeline

提供了一个容器给 ChannelHandler 链并提供了一个API 用于管理沿着链入站和出站事件的流动.(是一系列的ChannelHandler 实例)

每个 Channel 都有自己的ChannelPipeline,当 Channel 创建时自动创建的。(Netty内部负责完成)

执行顺序

In为队列(先进先出)。Out为栈(先进后出)

如果add的时候需要堵塞这个线程,可以使用含有EventExecuorGroup方法的add

ChannelFuture 未来执行操作结果的占位符

Netty 所有的 I/O 操作都是异步。一个操作可能无法立即返回

添加事件监听,当操作完成时,可以被通知。

ChannelPromise 是 特殊的 ChannelFuture

Buffer 缓冲

引用计数器

ByteBuf 和 ByteBufHolder(两者都实现了 ReferenceCounted 接口)引入了引用计数器(池化的精髓)

引用计数器大于0,就会被保证不被释放。当数量引用减少到0,将释放该实例。

1
2
buffer.refCnt(); // 引用计数器数量
buffer.release(); // 释放计数器为0
ByteBuf

ByteBuf的原理类似ByteBuffer

管理模式:

  • ByteBufAllocator(池化)

    减少分配和释放内存的开销

    理论上,每 Channel 可具有不同的 ByteBufAllocator

    1
    ByteBufAllocator allocator = channel.alloc();

    提供了两种 ByteBufAllocator 的实现:

    • PooledByteBufAllocator(使用jemalloc内存分配)
    • 每次返回新实例
  • Unpooled(非池化)

    提供了静态辅助方法来创建非池化的 ByteBuf 实例

使用模式:

  • 堆缓冲区 heapBuffer()

    数据存储在 JVM 的堆空间;存储在数组的实现.

    可以快速分配和释放, 直接访问数组

  • 直接缓冲区 directBuffer()

    免去中间交换的内存拷贝, 提升IO处理速度,直接缓冲区的内容可以驻留在垃圾回收扫描的堆区以外

    分配和释放上比堆缓冲区更复杂

  • 复合缓冲区 CompositeByteBuf

    创建多个不同的 ByteBuf,可能既包含堆缓冲区,也包含直接缓冲区;

    CompositeByteBuf.hasArray() 总是返回 false

ByteBufHolder

优势:

  • 可以自定义缓冲类型
  • 通过一个内置的复合缓冲类型实现零拷贝
  • 扩展性好,比如 StringBuilder
  • 不需要调用 flip() 来切换读/写模式
  • 读取和写入索引分开
  • 方法链
  • 引用计数
  • Pooling(池)

Transport 传输协议

方法名称 描述
NIO io.netty.channel.socket.nio 基于java.nio.channels的工具包,使用选择器作为基础的方法。
OIO io.netty.channel.socket.oio 基于java.net的工具包,使用阻塞流。
Local io.netty.channel.local 用来在虚拟机之间本地通信。
Embedded io.netty.channel.embedded 嵌入传输,它允许在没有真正网络的传输中使用 ChannelHandler,可以非常有用的来测试ChannelHandler的实现。

用例:

  • OIO-在低连接数、需要低延迟时、阻塞时使用
  • NIO-在高连接数时使用
  • Local-在同一个JVM内通信时使用
  • Embedded-测试ChannelHandler时使用

流程

channel-pipeline-context-handler

  • 服务端

    ServerBootstrap监听的一个端口对应一个boss线程,它们一 一对应。比如你需要netty监听80和443端口,那么就会有两个boss线程分别负责处理来自两个端口的socket请求。

    在boss线程接收了socket连接请求后,会产生一个channel(一个打开的socket对应一个打开的channel),并把这个channel交给ServerBootstrap初始化时指定的ChannelInitializer来处理,boss线程则继续处理socket的请求。从worker线程池中找出一个worker线程来继续处理这个请求。

    服务器端handle主要处理bossGroup的线程,childHandle主要处理workerGroup的线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    // 定义事件循环组
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();

    try{
    // 便捷的启动类
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    // 管理handler
    .childHandler(new ChannelInitializer<SocketChannel>(){
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();

    pipeline.addLast("httpServerCodec", new HttpServerCodec());
    pipeline.addLast("XXXHttpServerHandler", new XXXHttpServerHandler());
    }
    });

    ChannelFuture channelFuture = serverBootstrap.bind(9999).();

    channelFuture.channel().closeFuture().sync();
    } finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    }

Netty 使用

倒入Netty包

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
 上一篇

linux