java nio need to known

NIO 开发需知

Selector 在不同线程中对象锁竞争问题

多路复用器 selector 的register(...)select(...)操作会有锁冲突,在 reactor pattern skeleton example 文章中已经简单说明了多路复用器上有很多同步操作,锁竞争很严重,并可能阻塞线程,为了避免在 selector 及其属性对象上有同步操作,可以引入一层用以解耦,如示例 reactor pattern time server example 里将 MainReactor 新接入的 socketChannel 加入到对应 SubReactor 线程的不同队列里,可以看源码注释说明。

Netty 中因为在selector.select(1000)操作之后,后面还有处理定时任务相关的逻辑,线程不会一直持有selector.publicKeys对象锁,所以其他线程可以注册新的通道到EventLoop#selector上。

Selector 维护的各种集合

Selector 可以在多线程环境中使用,但是其各种键集合并非是线程安全的

Selector 维护三种选择键:

  1. keys: 保存了所有已注册且没有 cancel 的选择键,Set 类型,可以通过selector.keys()获取。
  2. selectedKeys: 已选择键集,即前一次操作期间,已经准备就绪的通道所对应的选择键,此集合为 keys 的子集,通过selector.selectedKeys()获取。
  3. canceledKeys: 已取消键,已经被取消但尚未取消注册的选择键,此集合不可被访问,为 keys 的子集。

Selector 本身对各种 key 集合的操作都是同步的,当然为了避免死锁问题,其同步的顺序也是一致的,比如在执行select(...)操作时,其他线程的register(...)操作将会阻塞,可以在任意时刻关闭通道或者取消键,因为select(...)操作并未对 cancelledKeys 同步,因此有可能再 selectedKey 中出现的 key 是已经被取消的,这一点需要注意,需要校验key.isValid() && key.isReadable()

Selector#wakeup() 方法

某个线程调用select(...)方法后阻塞了,即使没有通道已经就绪,也有办法让其从select(...)方法返回。只要在其他线程上调用Selector#wakeup()方法即可,阻塞在select(...)方法上的线程会立即返回。

如果有其它线程调用了wakeup()方法,但当前没有线程阻塞在select(...)方法上,下个调用select(...)方法的线程会立即wakeup

如果多个线程阻塞,事实上 wakeup 只能让正在阻塞的一个线程返回。

有二种情况会导致线程阻塞在 select 操作上:

  1. 因为 selector keys 集合同步阻塞
  2. 因为 selector IO 阻塞

wakeup()方法是让基于 pipe IO 阻塞的操作返回。但是因为 keys 是同步锁的阻塞,wakeup 也无能为力,wakeup 是一种对底层操作消耗较为严重的操作,需要对此操作的调用频度有所留意。

为什么要唤醒 Selector#wakeup()

  1. 注册了新的 channel 或者事件
  2. channel 关闭,取消注册
  3. 优先级更高的事件触发(如定时器事件),希望及时处理

wakeup 的原理

  1. Linux 上利用 pipe 调用创建一个管道
  2. Windows 上则是一个 loopback 的 TCP 连接,这是因为 win32 的管道无法加入 select fd set
  3. 将管道或者 TCP 连接加入 select fd set
  4. wakeup 往管道或者连接写入一个字节
  5. 阻塞的 select 因为有 IO 事件就绪,立即返回
  6. wakeup 的调用开销不可忽视
JNIEXPORT void JNICALL Java_sun_nio_ch_EPollArrayWrapper_interrupt(JNI Env *env, jobject this, jint fd){    int fakebuf[1];    fakebuf[0] = 1;    if (write(fd, fakebuf, 1) < 0) {        JNU_ThrowIOExceptionWithLastError(env," write to interrupt fd failed");    }}

减少 wakeup 调用

  1. 仅在有需要的时候才调用
  2. 如往连接发送数据,通常是缓存在一个消息队列,当且仅当队列为空的注册 OP_WRITE 并 wakeup 。
boolean needsWakeup=false;synchronized(queue)  {    if(queue.isEmpty())        needsWakeup=true;    queue.add(session);}if(needsWakeup) {    registerOPWrite();    selector.wakeup();}

Netty 的优化

AtomicBoolean wakenUp = new AtomicBoolean();wakenUp.set(false); //select 之前设置为 falseselector.select(500);if (wakenUp.compareAndSet(false, true)) {    selector.wakeup();}

Selector#close() 方法

用完 Selector 后调用其close()方法会关闭该 Selector,即使注册到该 Selector 上的所有 SelectionKey 实例无效,但是通道本身并不会关闭。

为什么延迟取消 SelectionKey

使用内部的已取消的键的集合来延迟注销,是一种防止线程在取消键时阻塞,并防止与正在进行的选择操作冲突的优化。

注销通道是一个潜在的代价很高的操作,这可能需要重新分配资源,键是与通道相关的,并且可能与它们相关的通道对象之间有复杂的交互。

关于 OP_WRITE 事件

OP_WRITE 事件的就绪条件并不是发生在调用SocketChannel#write(...)方法之后,而是在当底层缓冲区有空闲空间的情况下。

因为写缓冲区在绝大部分时候都是有空闲空间的,所以如果你注册了写事件,这会使得写事件一直处于就就绪,选择处理操作就会一直占用着 CPU 资源。所以只有当你确实有数据要写时再注册写操作,并在写完以后马上取消注册。

其实,在大部分情况下,我们直接调用SocketChannel#write(...)方法写数据就好了,没必要都用 OP_WRITE 事件。

那么 OP_WRITE 事件主要是在什么情况下使用的了?

其实 OP_WRITE 事件主要是在发送缓冲区空间的情况下使用的,如下代码所示:

while (buffer.hasRemaining()) {     int len = socketChannel.write(buffer);     if (len == 0) {          selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE);          selector.wakeup();          break;     }}

当 buffer 还有数据,但缓冲区已经满的情况下,socketChannel.write(buffer)会返回已经写出去的字节数,此时为0。那么这个时候我们就需要注册 OP_WRITE 事件,这样当缓冲区又有空闲空间的时候就会触发 OP_WRITE 事件,这是我们就可以继续将没写完的数据继续写出了。

而且在写完后,一定要记得将 OP_WRITE 事件注销:

selectionKey.interestOps(sk.interestOps() & ~SelectionKey.OP_WRITE);

注意,这里在修改了 interest 之后调用了Selector#wakeup();方法是为了唤醒被堵塞的Selector#select(...)方法,这样当 while 中判断Selector#select(...)返回的是 0 时,会再次调用Selector#select(...)。而 selectionKey 的 interest 是在每次selector#select(...)操作的时候注册到系统进行监听的,所以在Selector#select(...)调用之后修改的 interest 需要在下一次Selector#select(...)调用才会生效。

读到或者写入 0 个字节

  1. 不代表连接关闭
  2. 高负载或者慢速网络下很常见的情况
  3. 通常的处理办法是返回并继续注册 OP_READ/OP_WRITE 等待下次处理。
    1. 缺点: 系统调用开销,线程切换开销
  4. 其他解决办法
    1. 循环忙等待或者 yield 一定次数
      1. Netty: writeSpinCount = 16
      2. Mina: WRITE_SPIN_COUNT = 256
    2. 启用 Temporary Selector 在当前线程注册并 poll

关于远端关闭事件

SelectionKey 并没有提供关闭事件,其实通过 OP_READ 是可以监听到远端的关闭操作的。

当 OP_READ 事件触发使,int readByteNum = channel.read(buffer)会返回从 channel 读取到的字节数:

  1. readByteNum > 0时,表示从 channel 读取到了 readByteNum 个字节到 buffer 中。
  2. readByteNum == 0时,表示 channel 中已经没有数据可以读取了,这个时候 buffer 的position == limit
  3. readByteNum == -1时,表示远端 channel 正常关闭了,这个时候我们就需要进行该通道的关闭和注销操作了。

如何正确的关闭一个已经注册的 SelectableChannel

需要调用Channel#close(),最终调用的方法是AbstractInterruptibleChannel#close()方法,处理过程如下:

  1. 调用SelectionKey#cancel()方法使该 SelectionKey 加到 Selector 的 cancelledKeys 集合中,这样在下一次Selector#select(...)的时候,就会将其从 selector 中相关的 SelectionKey 集合中移除,并且不会监听该 SelectionKey 所感兴趣的事件了。
  2. 会关闭底层的套接字连接。

这里注意:如果只是通过调用SelectionKey.cancel()来注销一个远端已经关闭了的 channel,是一个不对的用法。

因为Selector#select(...)在处理 cancelledKeys 集合的时候,会判断若该 SelectionKey 对应的 channel 已经没有注册到其他的 selector,并且该 channel open 表示为false的情况下,才会去调用底层套接字的关闭操作,所以如果直接调用SelectionKey.cancel()来注销一个远端已经关闭了的 channel,会导致本段的 TCP 连接处于 CLOSE_WAIT 状态,一直在等待程序调用套接字的关闭。

Channel 的 open 标志

只有在下面两种情况下才会将AbstractInterruptibleChannel#open属性设置为false

  1. 调用了Channel#close()方法。
  2. 操作 channel 读/写的当前线程发生中断时。

SO_RCVBUF 和 SO_SNDBUF

使用Socket.setReceiveBufferSize/setSendBufferSize设置 Socket 缓冲区大小。

此参数设置仅仅是对底层平台的一个提示,是否有效取决于底层平台,因此 get 返回的可能不是你设置的值,也可能不是底层平台实际使用的值。

设置原则

  1. 在以太网上,4k通常是不够的,增加到16K,吞估量增加了40%
  2. Socket 缓冲区大小至少应该是连接的 MSS 的三倍,MSS = MTU + 40,一般以太网卡的MTU = 1500字节。
    1. MSS: 最大分段大小
    2. MTU: 最大传输单元
  3. send buffer 最好与对端的 receive buffer 一致。
  4. 对于一次性发送大量数据的应用,增加发送缓冲区到48K64K可能是唯一的最有效地提高性能的方式。
  5. 同样,对于大量接收数据的应用,提高接收缓冲区,能减少发送端的阻塞。
  6. 如果应用既发送大量数据,也接收大量数据,recv buffer 和 send buffer 应该同时增加。
  7. 如果要设置 ServerSocket 的 recv 缓冲区超过 RFC1323 中定义的64k,那么必须在绑定端口前设置,以后 accept 产生的 socket 将继承这一设置。
  8. 无论缓冲区大小多少,你都应该尽可能地帮助 TCP 至少以那样的大小的块写入。

SO_TCPNODELAY

Nagle 算法

  1. NAGLE 算法通过将缓冲区内的小封包自动相连,组成较大的封包,阻止大量小封包的发送阻塞网络,从而提高网络应用效率
  2. 默认打开
  3. 对于实时性要求较高的应用(telnet、网游),可能需要关闭此算法
    1. 注意,true为关闭此算法,false为开启
    2. Socket.setTcpNoDelay(true)

Nagle 算法描述

if there is new data to send    if the window size >= MSS and available data is >= MSS        send complete MSS segment now    else        if there is unconfirmed data still in the pipe            enqueue data in the buffer until an acknowledge is received        else            send data immediately        end if    end ifend if

SO_LINGER

Socket.setSoLinger(boolean linger, int timeout)

SO_LINGER 这个参数是控制 socket 关闭后的行为,二个参数分别有以下几种组合:

linger=false, timeout=-1

这是默认行为,当 socket 主动 close,调用的线程会马上返回,不会阻塞,然后进入 CLOSING 状态,残留在缓冲区中的数据将继续发送给对端,并且与对端进行 FIN-ACK 协议交换,最后进入 TIME_WAIT 状态

linger=true, timeout>0

调用 close 的线程将阻塞,发生两种可能的情况:一是剩余的数据继续发送,进行关闭协议交换,二就是超时过期,剩余的数据将被删除,进行 FIN-ACK 交换。

linger=true, timeout=0

进行所谓 hard-close,任何剩余的数据都将被丢弃,并且 FIN-ACK 交换也不会发生,替代产生 RST ,让对端抛出 connection reset 的 SocketException 。

慎重使用该选项

  1. TIME_WAIT 状态的价值
  2. 可靠实现 TCP 连接终止
  3. 允许老的分节在网络中流失,防止发给新的连接
  4. 持续时间= 2 * MSL
    1. MSL (Maximum Segment Lifetime)是最大分节生命期,一般为60秒(linux),120秒(Windows)
    2. Linux 上可以查看cat /proc/sys/net/ipv4/tcp_fin_timeout
    3. Windows 注册表项HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\Tcpip\Parameters,默认值120秒,推荐值为30秒

时间缓存

  1. 网络服务器通常需要频繁地获取系统时间: 定时器、协议时间戳、缓存过期等等。
  2. System.currentTimeMillis
    1. Linux 调用 gettimeofday,需要切换到内核态
    2. 大部分应用并不需要特别高的精度
  3. SystemTimer.currentTimeMillis
    1. 独立线程定期更新时间缓存
    2. currentTimeMillis 直接返回缓存值
    3. 精度取决于定期间隔

SystemTimer

public class SystemTimer {    private final static ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();    private static final long tickUnit = Long.parseLong(System.getProperty("notify.systimer.tick", "50"));    private static volatile long time = System.currentTimeMillis();    private static class TimerTicker implements Runnable {        public void run() {            time = System.currentTimeMillis();        }    }    public static long currentTimeMillis() {        return time;    }    static {        executor.scheduleAtFixedRate(new TimerTicker(), tickUnit, tickUnit, TimeUnit.MILLISECONDS);        Runtime.getRuntime().addShutdownHook(new Thread() {            @Override            public void run() {                executor.shutdown();            }        });    }}

内存管理

  1. Java 能做的事情有限
    1. GC 带来的自动内存管理
  2. 缓冲区的管理
    1. 池化
      1. ThreadLocal cache
      2. 环形缓冲区
    2. Netty 扩展
      1. putString, getString 等高级 API
      2. 缓冲区自动扩展和收缩,处理不定长度字节
    3. 字节序
      1. 跨语言通讯需要注意
      2. 网络字节序: BigEndian
      3. 默认缓冲区: BigEndian
      4. Java 的 IO 库和 class 文件: BigEndian

定时器的实现

  1. 定时器在网络程序中频繁使用
    1. 周期性事件的触发
    2. 异步超时通知和移除
    3. 延迟事件的触发
  2. 三个时间复杂度
    1. 插入定时器
    2. 删除定时器
    3. PerTickBookkeeping,一次 tick 内,系统需要执行的操作
  3. Tick 的方式
    1. Selector.select(timeout);
    2. Thread.sleep(timeout);

Netty 定时器实现: HashedWheelTimer

  1. TimingWheels
  2. 将定时器组织成时间轮
  3. 指针按照一定周期旋转,一个 tick 跳动一个槽位
  4. 定时器根据延时时间和当前指针位置插入到特定槽位
  5. 槽位和 tick 决定了精度和延时

连接 IDLE 的判断

  1. 连接处于 Idle 状态:一段时间内没有 IO 读写事件发生
    1. 读 Idle,一段时间内没有 IO 读
    2. 写 Idle,一段时间内没有 IO 写
    3. Both,一段时间内没有 IO 读写
  2. 实现方式:
    1. 每次 IO 读写都记录 IO 读和写的时间戳
    2. 定时扫描所有连接,判断当前时间与上一次读或者写的时间差是否超过设定阀值,超过即认为连接处于 Idle 状态,通知业务处理器
    3. 定时的方式: 基于select(timeout)或者定时器

NIO 陷阱

陷阱 1: 处理事件忘记移除 key

  1. 在 select 返回值大于0的情况下,循环处理Selector.selectedKeys集合,每处理一个必须移除
    Iterator<SelectionKey> it = set.iterator();while (it.hasNext()) {    SelectionKey key = it.next();    it.remove(); // 切记移除    // ......处理事件}
  2. 不移除的后果是本次的就绪的 key 集合下次会再次返,会导致无限循环,CPU 消耗 100%

陷阱 2: Selector 返回的 key 集合非线程安全

  1. Selector.selectedKeys/keys返回的集合都是非线程安全的
  2. Selector.selectedKeys返回的可移除
  3. Selector.keys不可变
  4. 对 selectedkeys 的处理必须单线程处理或者适当同步

陷阱 3: 正确注册 Channel 和更新 interest

  1. 直接注册不可吗?
  2. channel.register(selector, ops, attachment)
  3. 不是不可以,效率问题
  4. 至少加两次锁,锁竞争激烈
  5. Channel 本身的 regLock,竞争几乎没有
  6. Selector 内部的 key 集合,竞争激烈
  7. 更好的方式: 加入缓冲队列,等待注册,reactor 单线程处理

    if (isReactorThread()){    channel.register(selector, ops, attachment);} else {    register.offer(new Event(channel,ops,attachment));    selector.wakeup();}
  8. 同样,SelectionKey.interest(ops)

  9. 在 Linux 上会阻塞,需要获取 selector 内部锁做同步
  10. 在 Win32 上不会阻塞
  11. 屏蔽平台差异,避免锁的激烈竞争,采用类似注册 channel 的方式:
    if (this.isReactorThread()) {    key.interestOps(key.interestOps() | SelectionKey.OP_READ);} else {    this.register.offer(new Event(key,SelectionKey.OP_READ));    selector.wakeup();}

陷阱 4: 正确处理 OP_WRITE

  1. OP_WRITE 处理不当很容易导致 CPU 100%
  2. OP_WRITE 触发条件
    1. 前提: interest 了 OP_WRITE
    2. 触发条件:
      1. socket 发送缓冲区可写
      2. 远端关闭
      3. 有错误发生
  3. 正确的处理方式
    1. 仅在已经连接的 channel 上注册
    2. 仅在有数据可写的时候才注册
    3. 触发之后立即取消注册,否则会继续触发导致循环
    4. 处理完成后视情况决定是否继续注册
      1. 没有完全写入,继续注册
      2. 全部写入,无需注册

陷阱 5: 正确取消注册 channel

  1. SelectableChannel 一旦注册将一直有效直到明确取消
  2. 怎么取消注册?
    1. channel.close(),内部会调用key.cancel()
    2. key.cancel();
    3. 中断 channel 的读写所在线程引起的 channel 关闭
  3. 但是这样还不够!
    1. key.cancel()仅仅是将 key 加入 cancelledKeys
    2. 直到下一次 select 才真正处理
    3. 并且 channel 的 socketfd 只有在真正取消注册后才会close(fd)

正确取消注册 channel

  1. 后果是什么?
    1. 服务端,问题不大,select 调用频繁
    2. 客户端,通常只有一个连接,关闭 channel 之后,没有调用 select 就关闭了 selector
      1. sockfd 没有关闭,停留在 CLOSE_WAIT 状态
  2. 正确的处理方式,取消注册也应当作为事件交给 reactor 处理,及时 wakeup 做 select
  3. 适当的时候调用selector.selectNow()
  4. Netty 在超过 256 个连接未关闭的时候主动调用一次 selectNow
    static final int CLEANUP_INTERVAL = 256;private boolean cleanUpCancelledKeys() throws IOException {    if (cancelledKeys >= CLEANUP_INTERVAL) {        cancelledKeys = 0;        selector.selectNow();        return true;    }    return false;}//channel 关闭的时候channel.socket.close();cancelledKeys++;

陷阱 6: 同时注册 OP_ACCPET 和 OP_READ,同时注册 OP_CONNECT 和 OP_WRITE

  1. 在底层来说,只有两种事件: read 和 write
  2. Java NIO 还引入了 OP_ACCEPT 和 OP_CONNECT
    1. OP_ACCEPT、OP_READ == Read
    2. OP_CONNECT、OP_WRITE == Write
  3. 同时注册 OP_ACCEPT 和 OP_READ,或者同时注册 OP_CONNECT 和 OP_WRITE 在不同平台上产生错误的行为,避免这样做!

陷阱 7: 正确处理 connect

  1. SocketChannel.connect 方法在非阻塞模式下可能返回 false,切记判断返回值
    1. 如果是 loopback 连接,可能直接返回 true,表示连接成功
    2. 返回 false,后续处理
      1. 注册 channel 到 selector,监听 OP_CONNECT 事件
      2. 在 OP_CONNECT 触发后,调用SocketChannel.finishConnect()成功后,连接才真正建立
  2. 陷阱
    1. 没有判断 connect 返回值
    2. 没有调用 finishConnect
    3. 在 OP_CONNECT 触发后,没有移除 OP_CONNECT,导致 SelectionKey 一直处于就绪状态,空耗 CPU
      1. OP_CONNECT 只能在还没有连接的 channel 上注册

References

  1. Nio Trick and Trap.pdf
  2. NIO Selector 类详解
  3. 关于 NIO 你不得不知道的一些地雷