NIO体系

Soul Lv2

理论上来说下一步应该是实现一些SpringWeb中的相关功能了,比如基本的HTTP通信实现,但是要实现Spring中的效果需要用Java的异步IO体系,这一部分比较复杂,且网上我也没找到太多比较好的资料,所以就有了这一篇,我们专门来讨论一下NIO体系的相关知识

缓冲区

相信大家都使用过BufferedReader等带缓冲的输入输出流,这些自带缓冲区的输入输出流主要被用于在数据量比较大的时候的数据输入输出,这些流都在java.io包下,但这些不是我们今天真正要介绍的内容下。在java.nio包下还有一套单独实现的缓冲机制,所有缓冲类从一个叫做Buffer的类开始,下面是这个类的一些基础内容

1
2
3
4
5
6
7
8
9
10
11
12
13
public abstract sealed class Buffer permits ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer {
//sealed是jdk17的新特性,称为密封类,及只允许permits后的类继承
static final Unsafe UNSAFE = Unsafe.getUnsafe();//unsafe类提供了直接的内存操作能力
static final ScopedMemoryAccess SCOPED_MEMORY_ACCESS = ScopedMemoryAccess.getScopedMemoryAccess();
//内存范围
static final int SPLITERATOR_CHARACTERISTICS = 16464;
private int mark = -1;
private int position = 0;
private int limit;
private final int capacity;
long address;
final MemorySegment segment;
}

我们最好还是实际操作一下创建一个缓冲区来看看到底是怎么操作的,这个类是个抽象类,我们选择这个类的一个子类 intbuffer 来创建一个子类吧。大概的流程如下

1
2
3
4
5
6
7
public class Main {
public static void main(String[] args) {
IntBuffer intBuffer = IntBuffer.allocate(10);//直接申请一个大小为10的缓存
int[] ints = new int[10];
IntBuffer intBuffer2 = IntBuffer.wrap(ints);//将这个数组放到缓存中
}
}

我们接下来可以看看发生了什么

1
2
3
4
5
6
7
public static IntBuffer allocate(int capacity) {
if (capacity < 0) {
throw createCapacityException(capacity);
} else {
return new HeapIntBuffer(capacity, capacity, (MemorySegment)null);//如果容量大于等0,创建一个堆int缓存
}
}

然后追踪几层找到实际的构造方法
这个构造方法长这样

1
2
3
4
HeapIntBuffer(int cap, int lim, MemorySegment segment) {  
super(-1, 0, lim, cap, new int[cap], 0, segment);
this.address = ARRAY_BASE_OFFSET;
}

然后继续往下找 super 方法就可以发现下面这个

1
2
3
4
5
IntBuffer(int mark, int pos, int lim, int cap, int[] hb, int offset, MemorySegment segment) {  
super(mark, pos, lim, cap, segment);
this.hb = hb;
this.offset = offset;
}//这里其实就体现出来了,实际上这个类也是在使用数组在存储int,就存在hb中

接着看 super 就是下面这个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Buffer(int mark, int pos, int lim, int cap, MemorySegment segment) {
if (cap < 0) {
throw createCapacityException(cap);
} else {
this.capacity = cap;
this.segment = segment;
this.limit(lim);
this.position(pos);
if (mark >= 0) {
if (mark > pos) {
throw new IllegalArgumentException("mark > position: (" + mark + " > " + pos + ")");
}

this.mark = mark;
}

}
}

这里存在四个量:mark, position, limit, capacity。这四个量分别代表标记位置,实际位置,最大位置,数组容量。标记位置用于标记特定的位置实现跳转读取,所以初始状态为-1 即表示没有标记,实际位置就是数组的 index,最大位置为允许实际位置的最大值,数组容量是数组的实际容量。此外还有一个 offset 是操作的偏移量

常见操作

接下来我们看几个比较常见的操作,首先是写操作,主要使用 put 方法,实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//ix用来确定偏移量
protected int ix(int i) {
return i + this.offset;
}
//获取下一个位置
final int nextPutIndex() {
int p = this.position;
if (p >= this.limit) {
throw new BufferOverflowException();
} else {
this.position = p + 1;
return p;
}
}
public IntBuffer put(int x) {
this.hb[this.ix(this.nextPutIndex())] = x;
return this;
}

public IntBuffer put(int i, int x) {
this.hb[this.ix(this.checkIndex(i))] = x;
return this;
}

然后还包括两个针对数组和缓冲区的 put 方法我就不展示了,接下来看一眼获取的 get 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//nextGetIndex和put中的nextPutIndex实现是相同的
public int get() {
return this.hb[this.ix(this.nextGetIndex())];
}

public int get(int i) {
return this.hb[this.ix(this.checkIndex(i))];
}

public IntBuffer get(int[] dst, int offset, int length) {
this.checkSession();//无需在意的合法性检查
Objects.checkFromIndexSize(offset, length, dst.length);
int pos = this.position();
if (length > this.limit() - pos) {
throw new BufferUnderflowException();
} else {
System.arraycopy(this.hb, this.ix(pos), dst, offset, length);
//从当前的position开始复制
this.position(pos + length);
return this;
}
}

然后是 mark 和 reset 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public Buffer mark() {  
this.mark = this.position;
return this;
}//将当前位置标记

public Buffer reset() {
int m = this.mark;
if (m < 0) {
throw new InvalidMarkException();
} else {
this.position = m;
return this;
}
}//位置指针跳转到mark处

缓冲区的基本操作就了解到这里,内部的其余方法有兴趣的可以自己看看,但个人觉得也没什么必要,用得着的时候查就对了,这不是我们今天的重点,我们来看看下个部分

直接缓冲区

我们之前的缓冲占用的是 jvm 的堆内存,本质上就是几个数组,那么我们是否可以申请一些对外内存来使用呢,当然可以,我们可以选择使用下面的这个方法来实现

1
2
3
4
5
6
public static void main(String[] args) { //这里我们申请一个直接缓冲区 
ByteBuffer buffer = ByteBuffer.allocateDirect(10); //使用方式基本和之前是一样的
buffer.put((byte) 66);
buffer.flip();//这个方法将缓冲区翻转,简单理解就是将指针移动的方向反向
System.out.println(buffer.get());
}

这个缓冲区在使用上与堆缓冲的使用是一致的,但差别在于这个缓冲区直接通过系统的 IO 实现,理论上会比堆缓冲快一点

通道类与选择器

那么缓冲类有什么用,有这功夫我不如直接使用一个 List,还更方便一些。事实上这些类是专门用于适配通道类的。通道类包括文件通道和网络通道,是 NIO 机制的实现。所有的通道类都继承自 Channel 类。这些通道类有什么作用呢?我们以网络通信为例来做一个比较,在没有 NIO 机制的情况下如果我们要实现一个网络通信,我们可以这么写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static void main(String[] args) {  
try {
Executor executor = Executors.newFixedThreadPool(12);
ServerSocket serverSocket = new ServerSocket(8080);
BlockingDeque<Socket> sockets = new LinkedBlockingDeque<>();
executor.execute(() -> {
while(true){
try {
Socket socket =serverSocket.accept();
sockets.add(socket);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
while(true){
Socket socket = sockets.poll();
executor.execute(() -> {
try {
while(true){
BufferedReader bufferedReader =new BufferedReader(new InputStreamReader(socket.getInputStream()));
//在这里如果读不到信息回阻塞到有信息为止
String s = bufferedReader.readLine();
BufferedWriter bufferedWriter =new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
bufferedWriter.write(s);
bufferedWriter.flush();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
})

}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

这么搞在人数比较少的前提下当然没什么问题,但如果同时连接的客户端足够多,那么每个连接都需要消耗一个线程,而如你所见,我们只给这个程序分配了 12 个线程,也就是说最大处理 12 个连接,这怎么够呢?有没有什么办法可以解决这个问题呢?当然有,我们可以通过异步 IO 来解决这个问题,像下面这样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class Main {  
public static void main(String[] args) {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//选用通道版本ServerSocket
serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 8888));
//绑定端口
Selector selector = Selector.open();//创建一个选择器
serverSocketChannel.configureBlocking(false);
//默认状态下通道版本的Socket同样会在调用acept方法时阻塞直到有连接加入,我们这里手动配置为非阻塞
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//将这个通道注册到刚刚的选择器中,并要求选择器监听接受事件
ExecutorService executor = Executors.newFixedThreadPool(12);
while (true) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
//从选择器中获得所有的事件并创造一个迭代器
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) { //对于一个接受事件,放到线程池中要求线程池创建对应的Socket通道
executor.execute(() -> {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
//既然这也是一个通道那么自然也可以注册到选择器中,这次我们监听可读事件
socketChannel.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
throw new RuntimeException(e);
}
});

} else if (key.isReadable()) {
executor.execute(() -> {
//如果出现了可读事件,那么就执行对应的信心处理操作
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = (SocketChannel) key.channel();
try {
socketChannel.read(byteBuffer);
byteBuffer.flip();
String message = new String(byteBuffer.array());
System.out.println(message);
String message2 = "收到信息"+message;
socketChannel.write(ByteBuffer.wrap(message2.getBytes()));
} catch (IOException e) {
throw new RuntimeException(e);
}
});

}
//最后记得将处理过的事件移除
iterator.remove();
}

}

} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

不知道你能否理解这种写法的好处,在这种写法中,没有一个线程会陷入阻塞去等待客户端的消息,每个线程都是去处理一个已经存在的确定可行的事件,单个线程可以处理多个来自不同连接的消息。这就是我们所说的异步 IO 机制。
首先解释一下上面的一些内容,通道的事件可以分为四种

  • connect 连接事件,说明与对应的资源建立起了连接,这种连接可以是网络连接也可以是与文件资源的连接
  • accept 接受事件,监听到了对应的连接需求,可以与对应的资源建立连接
  • read 读就绪事件,有内容可以从通道中读取
  • write 写就绪事件,可以向通道中写入内容,这个事件在没有锁时一般是一直允许的
    当然,并不是所有的通道都实现了这四种事件,有些通道只支持部分事件。当我们将通道与对应的事件注册到选择器中时,选择器就会监听并记录对应通道的对应事件。通过这种方法,我们可以很好的避免单个线程陷入无意义的阻塞。大大提高单个线程的利用率。
    那么你可以猜猜选择器到底是怎样工作的?最简单的方法其实是选择器在每次被询问有哪些事件时通过对所有通道进行轮询,如果存在对应的事件则进行记录,但是显然这种机制在通道数足够多的状态下每次轮询都要消耗大量的时间,这完全不可接受,那该怎么办?聪明人们想到了一种办法:事件驱动机制

什么是事件驱动机制?简单来说事件驱动就是通过对对应的事件增添一个回调处理器,这个回调处理器中的操作会在事件发生时执行,而只要在回调处理器的操作中添加通知选择器的操作即可。回调处理器的操作由事件发生所驱动,这样选择器就无需主动询问不同的通道而是被动的接收事件发生的信息,那么无论存在多少个通道,时间复杂度永远是 O (1);

Java 中选择器的监听

结语

NIO 机制我们暂时先介绍到这里,你可以试着去改进一下上面的代码,重点有三个:

  1. 如果连接数特别大一个线程作为监听线程显然不够,尝试着支持多个监听线程
  2. 如果一个连接在客户端被断开,那么上面的代码是没法发现的,还会保留原有的通道,想办法改进(提示:通道的 read 方法会返回读取到的字节数,如果连接断开返回-1)
  3. 我们不可能真的只对客户端的信息做如此简单的处理,想办法创建将上面的流程拆分成几个类,降低耦合度,然后要支持在 excute 方法中放入任意的 Runnable 类

下一篇我们正式的去研究一下 SpringWeb 中的网络机制

  • 标题: NIO体系
  • 作者: Soul
  • 创建于 : 2025-05-07 15:06:43
  • 更新于 : 2025-05-08 21:10:02
  • 链接: https://soulmate.org.cn/posts/81499a4c/
  • 版权声明: 本文章采用 CC BY-NC 4.0 进行许可。