java中的IO操作(BIO、NIO、AIO)

传统的BIO编程

java的核心库java.io提供了全面的IO接口。包括:文件读写、标准设备输出等。Java中IO是以流为基础进行输入输出的,所有数据被串行化写入输出流,或者从输入流读入。
BIO的类图思维导图:


原图连接

几个IO流常见的用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
public class Demo {
private static String BASE_PATH = Demo.class.getClass().getResource("/").getPath() + "/";

public static void main(String[] args) throws Exception {
//new Demo().io1();
//new Demo().io2();
new Demo().io3();
}

//IO流简单例子 字节流操作
public void io1() throws IOException {
FileInputStream fis = null;
FileOutputStream fos = null;
try {
fis = new FileInputStream(new File(BASE_PATH + "io_a.txt"));
fos = new FileOutputStream(new File(BASE_PATH + "io_b.txt"));
int ch;
while ((ch = fis.read()) != -1) {
System.out.println((char) ch);
fos.write(ch);
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (null != fos) {
fos.close();
}
if (null != fis) {
fis.close();
}
}
}

//字节流转换成字符流
public void io2() throws IOException {
BufferedReader br = null;
BufferedWriter bw = null;
try {
br = new BufferedReader(new InputStreamReader(new FileInputStream(BASE_PATH + "io_a.txt")));
bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(BASE_PATH + "io_b.txt")));
String s;
StringBuilder sb = new StringBuilder();
while ((s = br.readLine()) != null) {
System.out.println(s);
bw.write(s);
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (null != bw) {
bw.close();
}
if (null != br) {
br.close();
}
}
}

//用转换流从控制台上读入数据
public void io3() throws IOException {
BufferedReader br = null;
try {
br = new BufferedReader(new InputStreamReader(System.in));
String s = br.readLine();
System.out.println(s);
} catch (IOException e) {
e.printStackTrace();
} finally {
if (null != br) {
br.close();
}
}
}
}

上面的代码都是对文件的操作,IO编程主要还用于网络编程

网络编程的基本模型是C/S模型,即两个进程间的通信。服务端提供IP和监听端口,客户端通过连接操作想服务端监听的地址发起连接请求,通过三次握手连接,如果连接成功建立,双方就可以通过套接字进行通信。

传统的同步阻塞模型开发中,ServerSocket负责绑定IP地址,启动监听端口;Socket负责发起连接操作。连接成功后,双方通过输入和输出流进行同步阻塞式通信。

简单的描述一下BIO的服务端通信模型:采用BIO通信模型的服务端,通常由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理处理完成后,通过输出流返回应答给客户端,线程销毁。即典型的请求一应答通信模型。
BIO的服务端通信模型

BIO网络编程源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
public class NetworkIOServer {

private static final int DEFAULT_PORT = 12345;
private static ServerSocket serverSocket;

public static void main(String[] args) throws IOException {
start(DEFAULT_PORT);
}

public synchronized static void start(int port) throws IOException {
if (serverSocket != null) return;
try {
//通过构造函数创建ServerSocket,如果端口合法且空闲,服务端就h会监听成功
serverSocket = new ServerSocket(port);
System.out.println("服务器已启动,端口号:" + port);
//通过无限循环监听客户端连接,如果没有客户端接入,将阻塞在accept操作上
while (true) {
Socket socket = serverSocket.accept();
//当有新的客户端接入时,会执行下面的代码,然后创建一个新的线程处理这条Socket链路
new Thread(new ServerHandler(socket)).start();
}
} finally {
if (serverSocket != null) {
System.out.println("服务器已关闭。");
serverSocket.close();
serverSocket = null;
}
}
}
}

class ServerHandler implements Runnable {
private Socket socket;

public ServerHandler(Socket socket) {
this.socket = socket;
}

@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
String expression;
String result;
while (true) {
//通过BufferedReader读取一行
//如果已经读到输入流尾部,返回null,退出循环
//如果得到非空值,就尝试计算结果并返回
if ((expression = in.readLine()) == null) break;
System.out.println("服务器收到消息:" + expression);
try {
result = Calculator.cal(expression).toString();
} catch (Exception e) {
result = "计算错误:" + e.getMessage();
}
out.println(result);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (in != null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
in = null;
}
if (out != null) {
out.close();
out = null;
}
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
socket = null;
}
}
}
}

public class Calculator {
private final static ScriptEngine JSE = new ScriptEngineManager().getEngineByName("JavaScript");

public static Object cal(String expression) throws ScriptException {
return JSE.eval(expression);
}
}

public class NetworkIOClient {

private static final int DEFAULT_SERVER_PORT = 12345;
private static final String DEFAULT_SERVER_IP = "127.0.0.1";

public static void main(String[] args) {
send("1+2*5");
}

public static void send(String expression) {
send(DEFAULT_SERVER_PORT, expression);
}

public static void send(int port, String expression) {
System.out.println("算术表达式为:" + expression);
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket(DEFAULT_SERVER_IP, port);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
out.println(expression);
System.out.println("结果为:" + in.readLine());
} catch (Exception e) {
e.printStackTrace();
} finally {
//必要的清理工作
if (in != null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
in = null;
}
if (out != null) {
out.close();
out = null;
}
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
socket = null;
}
}
}
}

BIO主要的问题在于每当有一个新的客户端请求接入时,服务端必须创建一个新的线程来处理这条链路,在需要满足高性能、高并发的场景是没法应用的(大量创建新的线程会严重影响服务器性能)。这时程序猿们的惯用做法–搞一个线程池。

1
2
private static ExecutorService executorService = Executors.newFixedThreadPool(100); //成员变量
executorService.execute(new ServerHandler(socket)); //替换 new Thread

限制了线程数量,如果发生大量并发请求,超过最大数量的线程就只能等待,直到线程池中的有空闲的线程可以被复用。而对Socket的输入流进行读取时,会一直阻塞,直到发生:

  • 有数据可读
  • 可用数据以及读取完毕
  • 发生空指针或I/O异常
    所以在读取数据较慢时(比如数据量大、网络传输慢等),大量并发的情况下,其他接入的消息,只能一直等待,这就是最大的弊端。

NIO编程

JDK 1.4中的java.nio.*包中引入新的Java I/O库,其目的是提高速度。实际上,”旧”的I/O包已经使用NIO重新实现过,即使我们不显式的使用NIO编程,也能从中受益。速度的提高在文件I/O和网络I/O中都可能会发生。
NIO我们一般认为是New I/O(也是官方的叫法),因为它是相对于老的I/O类库新增的。但更多人称之为Non-blocking I/O,即非阻塞I/O,因为这样叫,更能体现它的特点。
NIO(非阻塞IO)提供了与传统BIO模型中的Socket和ServerSocket相对应的SocketChannel和ServerSocketChannel两种不同的套接字通道实现。
新增的着两种通道都支持阻塞和非阻塞两种模式。
阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反。
对于低负载、低并发的应用程序,可以使用同步阻塞I/O来提升开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用NIO的非阻塞模式来开发。

NIO包含下面几个核心的组件:

  • Channels
  • Buffers
  • Selectors

整个NIO体系包含的类远远不止这几个,但是Channel,Buffer和Selector组成了这个核心的API。其他的一些组件,比如Pipe和FileLock仅仅只作为上述三个的负责类。因此重点关注这三个概念。

通道和缓冲区(Channels and Buffers)

通常来说NIO中的所有IO都是从Channel开始的。Channel和流有点类似。通过Channel,我们既可以从Channel把数据写到Buffer中,也可以把数据冲Buffer写入到Channel,下图是一个示意图:
通道和缓冲区
注意:通道必须结合Buffer使用,不能直接向通道中读/写数据

有很多的Channel,Buffer类型。下面列举了主要的几种:

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

FileChannel用于文件的数据读写。 DatagramChannel用于UDP的数据读写。 SocketChannel用于TCP的数据读写。 ServerSocketChannel允许我们监听TCP链接请求,每个请求会创建会一个SocketChannel.

Channel的基础示例(Basic Channel Example)

这有一个利用FileChannel读取数据到Buffer的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();

ByteBuffer buf = ByteBuffer.allocate(48);

int bytesRead = inChannel.read(buf);
while (bytesRead != -1) {

System.out.println("Read " + bytesRead);
buf.flip();

while(buf.hasRemaining()){
System.out.print((char) buf.get());
}

buf.clear();
bytesRead = inChannel.read(buf);
}
aFile.close();

注意buf.flip()的调用。首先把数据读取到Buffer中,然后调用flip()方法。接着再把数据读取出来。

Buffer基本用法(Basic Buffer Usage)

buffer本质上就是一块内存区,可以用来写入数据,并在稍后读取出来。这块内存被NIO Buffer包裹起来,对外提供一系列的读写方便开发的接口。
利用Buffer读写数据,通常遵循四个步骤:

  • 把数据写入buffer
  • 调用flip
  • 从Buffer中读取数据
  • 调用buffer.clear()或者buffer.compact()

当写入数据到buffer中时,buffer会记录已经写入的数据大小。当需要读数据时,通过flip()方法把buffer从写模式调整为读模式;在读模式下,可以读取所有已经写入的数据。

当读取完数据后,需要清空buffer,以满足后续写入操作。清空buffer有两种方式:调用clear()或compact()方法。clear会清空整个buffer,compact则只清空已读取的数据,未被读取的数据会被移动到buffer的开始位置,写入位置则近跟着未读数据之后。相关的代码实例可以查看前面channel的例子。

Buffer的容量,位置,上限(Buffer Capacity, Position and Limit)

buffer缓冲区实质上就是一块内存,用于写入数据,也供后续再次读取数据。这块内存被NIO Buffer管理,并提供一系列的方法用于更简单的操作这块内存。

一个Buffer有三个属性是必须掌握的,分别是:

  • capacity容量
  • position位置
  • limit限制

position和limit的具体含义取决于当前buffer的模式。capacity在两种模式下都表示容量。
下面有张示例图,描诉了不同模式下position和limit的含义:
position和limit

容量(Capacity)

作为一块内存,buffer有一个固定的大小,叫做capacity容量。也就是最多只能写入容量值得字节,整形等数据。一旦buffer写满了就需要清空已读数据以便下次继续写入新的数据。

位置(Position)

当写入数据到Buffer的时候需要中一个确定的位置开始,默认初始化时这个位置position为0,一旦写入了数据比如一个字节,整形数据,那么position的值就会指向数据之后的一个单元,position最大可以到capacity-1.
当从Buffer读取数据时,也需要从一个确定的位置开始。buffer从写入模式变为读取模式时,position会归零,每次读取后,position向后移动。

上限(Limit)

在写模式,limit的含义是我们所能写入的最大数据量。它等同于buffer的容量。
一旦切换到读模式,limit则代表我们所能读取的最大数据量,他的值等同于写模式下position的位置。
数据读取的上限时buffer中已有的数据,也就是limit的位置(原position所指的位置)。

Buffer Types

核心的Buffer实现类的列表:

  • ByteBuffer
  • MappedByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

这些Buffer涵盖了可以通过IO操作的基础类型:byte,short,int,long,float,double以及characters.
MappedByteBuffer稍有不同,一般用于和内存映射的文件。

分配一个Buffer(Allocating a Buffer)

为了获取一个Buffer对象,你必须先分配。每个Buffer实现类都有一个allocate()方法用于分配内存。下面看一个实例,开辟一个48字节大小的buffer:

1
ByteBuffer buf = ByteBuffer.allocate(48);

开辟一个1024个字符的CharBuffer:

1
CharBuffer buf = CharBuffer.allocate(1024);

写入数据到Buffer(Writing Data to a Buffer)

写数据到Buffer有两种方法:

  • 从Channel中写数据到Buffer
  • 手动写数据到Buffer,调用put方法

下面是一个实例,演示从Channel写数据到Buffer:

1
int bytesRead = inChannel.read(buf); //read into buffer.

通过put写数据:

1
buf.put(127);

put方法有很多不同版本,对应不同的写数据方法。例如把数据写到特定的位置,或者把一个字节数据写入buffer。看考JavaDoc文档可以查阅的更多数据。

翻转flip()

flip()方法可以吧Buffer从写模式切换到读模式。调用flip方法会把position归零,并设置limit为之前的position的值。 也就是说,现在position代表的是读取位置,limit标示的是已写入的数据位置。

从Buffer读取数据

冲Buffer读数据也有两种方式:

  • 从buffer读数据到channel
  • 从buffer直接读取数据,调用get方法

读取数据到channel的例子:

1
2
//read from buffer into channel.
int bytesWritten = inChannel.write(buf);

调用get读取数据的例子:

1
byte aByte = buf.get();

get也有诸多版本,对应了不同的读取方式。

rewind()

Buffer.rewind()方法将position置为0,这样我们可以重复读取buffer中的数据。limit保持不变。

clear() and compact()

一旦我们从buffer中读取完数据,需要复用buffer为下次写数据做准备。只需要调用clear或compact方法。
clear方法会重置position为0,limit为capacity,也就是整个Buffer清空。实际上Buffer中数据并没有清空,我们只是把标记为修改了。
如果Buffer还有一些数据没有读取完,调用clear就会导致这部分数据被“遗忘”,因为我们没有标记这部分数据未读。
针对这种情况,如果需要保留未读数据,那么可以使用compact。 因此compact和clear的区别就在于对未读数据的处理,是保留这部分数据还是一起清空。

mark() and reset()

通过mark方法可以标记当前的position,通过reset来恢复mark的位置,这个非常像canva的save和restore:

1
2
3
buffer.mark();
//call buffer.get() a couple of times, e.g. during parsing.
buffer.reset(); //set position back to mark.

equals() and compareTo()

可以用eqauls和compareTo比较两个buffer

equals()

判断两个buffer相等,需满足:

  • 类型相同
  • buffer中剩余字节数相同
  • 所有剩余字节相等

从上面的三个条件可以看出,equals只比较buffer中的部分内容,并不会去比较每一个元素。

compareTo()

compareTo也是比较buffer中的剩余元素,只不过这个方法适用于比较排序的。一个buffer比另外一个小的前提:

  • 第一个元素等于另一个缓冲区中的相应元素,它比另一个缓冲区中的元素要小。
  • 所有的元素都是相等的,但是第一个缓冲区在第二个缓冲区之前就耗尽了(它的元素更少)。

Scatter/Gather & Channel Transfers

JavaNIO发布时内置了对scatter/gather的支持。scatter/gather是通过通道读写数据的两个概念。
在JavaNIO中如果一个channel是FileChannel类型的,那么他可以直接把数据传输到另一个channel。逐个特性得益于FileChannel包含的transferTo和transferFrom两个方法。
这两部分详细内容可以参考原文:Scatter/Gather,Channel Transfers

选择器(Selectors)

Selector是Java NIO中的一个组件,用于检查一个或多个NIO Channel的状态是否处于可读、可写。如此可以实现单线程管理多个channels,也就是可以管理多个网络链接。

为什么使用Selector

用单线程处理多个channels的好处是我需要更少的线程来处理channel。实际上,你甚至可以用一个线程来处理所有的channels。从操作系统的角度来看,切换线程开销是比较昂贵的,并且每个线程都需要占用系统资源,因此暂用线程越少越好。

需要留意的是,现代操作系统和CPU在多任务处理上已经变得越来越好,所以多线程带来的影响也越来越小。如果一个CPU是多核的,如果不执行多任务反而是浪费了机器的性能。不过这些设计讨论是另外的话题了。简而言之,通过Selector我们可以实现单线程操作多个channel。
下面是一个单线程中Slector维护3个Channel的示意图:
选择器

创建Selector

创建一个Selector可以通过Selector.open()方法:

1
Selector selector = Selector.open();

注册Channel到Selector上
为了使用带有选择器的通道,您必须使用选择器注册通道。这是使用SelectableChannel.register()方法完成的,如下所述:

1
2
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

通道必须处于非阻塞模式,以便与选择器一起使用。这意味着您不能使用FileChannel的选择器,因为FileChannel不能切换到非阻塞模式。不过,Socket Channels将会很好地工作。

注意register的第二个参数。这是一个“兴趣集”,意思是您希望通过选择器在通道中监听的事件。你可以监听四种不同的事件:

  • Connect
  • Accept
  • Read
  • Write

一个channel触发了一个事件也可视作该事件处于就绪状态。因此当channel与server连接成功后,那么就是“连接就绪”状态。server channel接收请求连接时处于“可连接就绪”状态。channel有数据可读时处于“读就绪”状态。channel可以进行数据写入时处于“写就绪”状态。

这四个事件由四个SelectionKey常量表示:

  • SelectionKey.OP_CONNECT
  • SelectionKey.OP_ACCEPT
  • SelectionKey.OP_READ
  • SelectionKey.OP_WRITE

如果对多个事件感兴趣可利用位或运算结合多个常量,比如:

1
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

SelectionKey’s

在上一小节中,我们利用register方法把Channel注册到了Selectors上,这个方法的返回值是SelectionKeys,这个返回的对象包含了一些比较有价值的属性:

  • The interest set
  • The ready set
  • The Channel
  • The Selector
  • An attached object (optional)

这5个属性都代表什么含义呢?下面会一一介绍。

Interest Set

这个“兴趣集”实际上就是我们希望处理的事件的集合,它的值就是注册时传入的参数,我们可以用按为与运算把每个事件取出来:

1
2
3
4
5
6
int interestSet = selectionKey.interestOps();

boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;

Ready Set

准备就绪的集合是通道已经准备好了的一组操作。在选择之后,您将主要访问就绪集。选择将在后面的小节中解释。你可以像这样访问准备好的集合:

1
int readySet = selectionKey.readyOps();

从“就绪集合”中取值的操作类似”兴趣集”的操作,当然还有更简单的方法,SelectionKey提供了一系列返回值为boolean的的方法:

1
2
3
4
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

Channel + Selector

从SelectionKey操作Channel和Selector非常简单:

1
2
Channel  channel = selectionKey.channel();
Selector selector = selectionKey.selector();

Attaching Objects

我们可以给一个SelectionKey附加一个Object,这样做一方面可以方便我们识别某个特定的channel,同时也增加了channel相关的附加信息。例如,可以把用于channel的buffer附加到SelectionKey上:

1
2
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();

附加对象的操作也可以在register的时候就执行:

1
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

从Selector中选择channel

一旦我们向Selector注册了一个或多个channel后,就可以调用select来获取channel。select方法会返回所有处于就绪状态的channel。 select方法具体如下:

  • int select()
  • int select(long timeout)
  • int selectNow()

select()阻塞,直到至少有一个通道为您注册的事件做好了准备。
select(long timeout)select()做的事一样,不过它的阻塞有一个超时限制。
selectNow()不会阻塞,根据当前状态立刻返回合适的channel。

select()方法返回的int告诉我们有多少通道已经准备好了。也就是说,自从上次调用select()以来,已经准备好了多少个频道。如果你调用select(),它返回1,因为一个通道已经准备好了,然后你再次调用select(),并且一个通道已经准备好了,它将再次返回1。如果您在第一个已经准备好的通道上没有做任何事情,那么您现在有两个现成的通道,但是在每个select()调用之间只有一个通道已经准备好了。

selectedKeys()

一旦你调用了select()方法,它的返回值表明一个或多个通道已经准备好了,你可以通过调用选择器的selectedKeys()方法来访问已准备好的通道。如下:

1
Set<SelectionKey> selectedKeys = selector.selectedKeys();

当你用一个选择器注册一个通道时,Channel.register()方法返回一个SelectionKey对象。key表示通道注册于那个选择器。您可以通过selectedKeySet()方法从SelectionKey中访问所有key。

遍历这些SelectionKey可以通过如下方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Set<SelectionKey> selectedKeys = selector.selectedKeys();

Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

while(keyIterator.hasNext()) {

SelectionKey key = keyIterator.next();

if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.

} else if (key.isConnectable()) {
// a connection was established with a remote server.

} else if (key.isReadable()) {
// a channel is ready for reading

} else if (key.isWritable()) {
// a channel is ready for writing
}

keyIterator.remove();
}

上述循环会迭代key集合,针对每个key我们单独判断他是处于何种就绪状态。

注意keyIterater.remove()方法的调用,Selector本身并不会移除SelectionKey对象。当你处理channel时,你必须这样做。下一次通道“就绪”时,选择器将再次把它添加到选定的keySet中。

SelectionKey.channel返回的channel实例需要强转为我们实际使用的具体的channel类型,例如ServerSocketChannel或SocketChannel。

wakeUp()

由于调用select而被阻塞的线程,可以通过调用Selector.wakeup()来唤醒即便此时已然没有channel处于就绪状态。具体操作是,在另外一个线程调用wakeup,被阻塞与select方法的线程就会立刻返回。

close()

当操作Selector完毕后,需要调用close方法。close的调用会关闭Selector并使所有在该选择器中注册的SelectionKey实例失效。channel本身不会被关闭。

完整的Selector案例

这有一个完整的案例,首先打开一个Selector,然后注册channel,最后监听Selector的状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
Selector selector = Selector.open();

channel.configureBlocking(false);

SelectionKey key = channel.register(selector, SelectionKey.OP_READ);


while(true) {

int readyChannels = selector.select();

if(readyChannels == 0) continue;


Set<SelectionKey> selectedKeys = selector.selectedKeys();

Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

while(keyIterator.hasNext()) {

SelectionKey key = keyIterator.next();

if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.

} else if (key.isConnectable()) {
// a connection was established with a remote server.

} else if (key.isReadable()) {
// a channel is ready for reading

} else if (key.isWritable()) {
// a channel is ready for writing
}

keyIterator.remove();
}
}

FileChannel文件通道

Java NIO中的FileChannel是用于连接文件的通道。通过文件通道可以读、写文件的数据。Java NIO的FileChannel是相对标准Java IO API的可选接口。
FileChannel不可以设置为非阻塞模式,他只能在阻塞模式下运行。

打开文件通道

在使用FileChannel前必须打开通道,打开一个文件通道需要通过输入/输出流或者RandomAccessFile,下面是通过RandomAccessFile打开文件通道的案例:

1
2
RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();

从文件通道内读取数据

读取文件通道的数据可以通过read方法:

1
2
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf);

首先开辟一个Buffer,从通道中读取的数据会写入Buffer内。接着就可以调用read方法,read的返回值代表有多少字节被写入了Buffer,返回-1则表示已经读取到文件结尾了。

向文件通道写入数据

写数据用write方法,入参是Buffer:

1
2
3
4
5
6
7
8
9
10
11
String newData = "New String to write to file..." + System.currentTimeMillis();

ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());

buf.flip();

while(buf.hasRemaining()) {
channel.write(buf);
}

注意这里的write调用写在了while循环里面,这是因为write不能保证有多少数据真实被写入,因此需要循环写入直到没有更多数据。

关闭通道

操作完毕后,需要把通道关闭:

1
channel.close();

FileChannel Position

当操作FileChannel的时候读和写都是基于特定起始位置的(position),获取当前的位置可以用FileChannel的position()方法,设置当前位置可以用带参数的position(long pos)方法。

1
2
3
long pos channel.position();

channel.position(pos +123);

假设我们把当前位置设置为文件结尾之后,那么当我们从通道中读取数据时就会发现返回值是-1,表示已经到达文件结尾了。
如果把当前位置设置为文件结尾之后,在想通道中写入数据,文件会自动扩展以便写入数据,但是这样会导致文件中出现类似空洞,即文件的一些位置是没有数据的。

FileChannel Size

size()方法可以返回FileChannel对应的文件的文件大小:

1
long fileSize = channel.size();

FileChannel Truncate

利用truncate方法可以截取指定长度的文件:

1
channel.truncate(1024);

FileChannel Force

force方法会把所有未写磁盘的数据都强制写入磁盘。这是因为在操作系统中出于性能考虑会把数据放入缓冲区,所以不能保证数据在调用write写入文件通道后就及时写到磁盘上了,除非手动调用force方法。
force方法需要一个布尔参数,代表是否把meta data也一并强制写入。

1
channel.force(true);

ServerSocketChannel & SocketChannel

ServerSocketChannel

在Java NIO中,ServerSocketChannel是用于监听TCP链接请求的通道,正如Java网络编程中的ServerSocket一样。

ServerSocketChannel实现类位于java.nio.channels包下面。下面是一个示例程序:

1
2
3
4
5
6
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bin(new InetSocketAddress(9999));
while(true) {
SocketChannel socketChannel = serverSocketChannel.accept();
//do something with socketChannel...
}

打开ServerSocketChannel

打开一个ServerSocketChannel我们需要调用他的open()方法,例如:

1
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

关闭ServerSocketChannel

关闭一个ServerSocketChannel我们需要调用他的close()方法,例如:

1
serverSocketChannel.close();

监听链接

通过调用accept()方法,我们就开始监听端口上的请求连接。当accept()返回时,他会返回一个SocketChannel连接实例,实际上accept()是阻塞操作,他会阻塞带去线程知道返回一个连接; 很多时候我们是不满足于监听一个连接的,因此我们会把accept()的调用放到循环中,就像这样:

1
2
3
4
while(true){
SocketChannel socketChannel = serverSocketChannel.accept();
//do something with socketChannel...
}

当然我们可以在循环体内加上合适的中断逻辑,而不是单纯的在while循环中写true,以此来结束循环监听;

非阻塞模式

实际上ServerSocketChannel是可以设置为非阻塞模式的。在非阻塞模式下,调用accept()函数会立刻返回,如果当前没有请求的链接,那么返回值为空null。因此我们需要手动检查返回的SocketChannel是否为空,例如:

1
2
3
4
5
6
7
8
9
10
11
12
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.socket().bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);

while(true){
SocketChannel socketChannel = serverSocketChannel.accept();

if(socketChannel != null){
//do something with socketChannel...
}
}

SocketChannel

在Java NIO体系中,SocketChannel是用于TCP网络连接的套接字接口,相当于Java网络编程中的Socket套接字接口。创建SocketChannel主要有两种方式,如下:

  1. 打开一个SocketChannel并连接网络上的一台服务器。
  2. 当ServerSocketChannel接收到一个连接请求时,会创建一个SocketChannel。
建立一个SocketChannel连接

打开一个SocketChannel可以这样操作:

1
2
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));

关闭一个SocketChannel连接

关闭一个SocketChannel只需要调用他的close方法,如下:

1
socketChannel.close();

从SocketChannel中读数据

从一个SocketChannel连接中读取数据,可以通过read()方法,如下:

1
2
3
ByteBuffer buf = ByteBuffer.allocate(48);

int bytesRead = socketChannel.read(buf);

首先需要开辟一个Buffer。从SocketChannel中读取的数据将放到Buffer中。

接下来就是调用SocketChannel的read()方法。这个read()会把通道中的数据读到Buffer中。read()方法的返回值是一个int数据,代表此次有多少字节的数据被写入了Buffer中。如果返回的是-1,那么意味着通道内的数据已经读取完毕,到底了(链接关闭)。

向SocketChannel写数据

向SocketChannel中写入数据是通过write()方法,write也需要一个Buffer作为参数。下面看一下具体的示例:

1
2
3
4
5
6
7
8
9
10
11
String newData = "New String to write to file..." + System.currentTimeMillis();

ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());

buf.flip();

while(buf.hasRemaining()) {
channel.write(buf);
}

仔细观察代码,这里我们把write()的调用放在了while循环中。这是因为我们无法保证在write的时候实际写入了多少字节的数据,因此我们通过一个循环操作,不断把Buffer中数据写入到SocketChannel中知道Buffer中的数据全部写入为止。

非阻塞模式

我们可以把SocketChannel设置为non-blocking(非阻塞)模式。这样的话在调用connect(), read(), write()时都是异步的。

connect()

如果我们设置了一个SocketChannel是非阻塞的,那么调用connect()后,方法会在链接建立前就直接返回。为了检查当前链接是否建立成功,我们可以调用finishConnect(),如下:

1
2
3
4
5
6
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));

while(! socketChannel.finishConnect() ){
//wait, or do something else...
}

write()

在非阻塞模式下,调用write()方法不能确保方法返回后写入操作一定得到了执行。因此我们需要把write()调用放到循环内。这和前面在讲write()时是一样的,此处就不在代码演示。

read()

在非阻塞模式下,调用read()方法也不能确保方法返回后,确实读到了数据。因此我们需要自己检查的整型返回值,这个返回值会告诉我们实际读取了多少字节的数据。

Selector结合非阻塞模式

SocketChannel的非阻塞模式可以和Selector很好的协同工作。把一个活多个SocketChannel注册到一个Selector后,我们可以通过Selector知道哪些channels通道是处于可读,可写等等状态的。后续我们会再详细阐述如果联合使用Selector与SocketChannel。

NIO服务器

代码比传统的Socket编程复杂不少

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
public class NIOServer {
private static int DEFAULT_PORT = 12345;
private static ServerHandler serverHandler;

public static void start() {
start(DEFAULT_PORT);
}

public static synchronized void start(int port) {
if (serverHandler != null) {
serverHandler.stop();
}
serverHandler = new ServerHandler(port);
new Thread(serverHandler, "Server").start();
}

public static void main(String[] args) {
start();
}
}
public class ServerHandler implements Runnable {
private Selector selector;
private ServerSocketChannel serverChannel;
private volatile boolean started;

public ServerHandler(int port) {
try {
//创建选择器
selector = Selector.open();
//打开监听通道
serverChannel = ServerSocketChannel.open();
//如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
serverChannel.configureBlocking(false); //开启非阻塞模式
//绑定端口 backlog设为1024
serverChannel.socket().bind(new InetSocketAddress(port), 1024);
//监听客户端连接请求
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
//标记服务器已开启
started = true;
System.out.println("服务器已启动,端口号:" + port);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}

public void stop() {
started = false;
}

@Override
public void run() {
//循环遍历selector
while (started) {
try {
//无论是否有读写事件发生,selector每隔1s被唤醒一次
selector.select(1000);
//阻塞,只有当至少一个注册的事件发生的时候才会继续.
//selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}
//selector关闭后会自动释放里面管理的资源
if (selector != null) {
try {
selector.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}

private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
//处理新接入的请求消息
if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//通过ServerSocketChannel的accept创建SocketChannel实例
//完成该操作意味着完成TCP三次握手,TCP物理链路正式建立
SocketChannel sc = ssc.accept();
//设置为非阻塞的
sc.configureBlocking(false);
//注册为读
sc.register(selector, SelectionKey.OP_READ);
}
//读消息
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
//创建ByteBuffer,并开辟一个1M的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取请求码流,返回读取到的字节数
int readBytes = sc.read(buffer);
//读取到字节,对字节进行编解码
if (readBytes > 0) {
//将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
buffer.flip();
//根据缓冲区可读字节数创建字节数组
byte[] bytes = new byte[buffer.remaining()];
//将缓冲区可读字节数组复制到新建的数组中
buffer.get(bytes);
String expression = new String(bytes, "UTF-8");
System.out.println("服务器收到消息:" + expression);
//处理数据
String result = null;
try {
result = Calculator.cal(expression).toString();
} catch (Exception e) {
result = "计算错误:" + e.getMessage();
}
//发送应答消息
doWrite(sc, result);
}
//没有读取到字节 忽略
//else if(readBytes==0);
//链路已经关闭,释放资源
else if (readBytes < 0) {
key.cancel();
sc.close();
}
}
}
}

//异步发送应答消息
private void doWrite(SocketChannel channel, String response) throws IOException {
//将消息编码为字节数组
byte[] bytes = response.getBytes();
//根据数组容量创建ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
//将字节数组复制到缓冲区
writeBuffer.put(bytes);
//flip操作
writeBuffer.flip();
//发送缓冲区的字节数组
channel.write(writeBuffer);
//****此处不含处理“写半包”的代码 //https://blog.csdn.net/nongfuyumin/article/details/78343999
}
}
public class NIOClient {
private static final String DEFAULT_HOST = "127.0.0.1";
private static final int DEFAULT_PORT = 12345;
private static ClientHandler clientHandler;

public static void start() {
start(DEFAULT_HOST, DEFAULT_PORT);
}

public static synchronized void start(String ip, int port) {
if (clientHandler != null) {
clientHandler.stop();
}
clientHandler = new ClientHandler(ip, port);
new Thread(clientHandler, "Server").start();
}

//向服务器发送消息
public static boolean sendMsg(String msg) throws Exception {
clientHandler.sendMsg(msg);
return true;
}

public static void main(String[] args) {
start();
}
}
public class ClientHandler implements Runnable {
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean started;

public ClientHandler(String ip, int port) {
this.host = ip;
this.port = port;
try {
//创建选择器
selector = Selector.open();
//打开监听通道
socketChannel = SocketChannel.open();
//如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
socketChannel.configureBlocking(false);//开启非阻塞模式
started = true;
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}

public void stop() {
started = false;
}

@Override
public void run() {
try {
doConnect();
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
while (started) { //循环遍历selector
try {
//无论是否有读写事件发生,selector每隔1s被唤醒一次
selector.select(1000);
//阻塞,只有当至少一个注册的事件发生的时候才会继续.
//selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
//selector关闭后会自动释放里面管理的资源
if (selector != null) {
try {
selector.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}

private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
SocketChannel sc = (SocketChannel) key.channel();
if (key.isConnectable()) {
if (!sc.finishConnect()) {
System.exit(1);
}
}
//读消息
if (key.isReadable()) {
//创建ByteBuffer,并开辟一个1M的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取请求码流,返回读取到的字节数
int readBytes = sc.read(buffer);
//读取到字节,对字节进行编解码
if (readBytes > 0) {
//将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
buffer.flip();
//根据缓冲区可读字节数创建字节数组
byte[] bytes = new byte[buffer.remaining()];
//将缓冲区可读字节数组复制到新建的数组中
buffer.get(bytes);
String result = new String(bytes, "UTF-8");
System.out.println("客户端收到消息:" + result);
}
//没有读取到字节 忽略
//else if(readBytes==0);
//链路已经关闭,释放资源
else if (readBytes < 0) {
key.cancel();
sc.close();
}
}
}
}

//异步发送消息
private void doWrite(SocketChannel channel, String request) throws IOException {
//将消息编码为字节数组
byte[] bytes = request.getBytes();
//根据数组容量创建ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
//将字节数组复制到缓冲区
writeBuffer.put(bytes);
//flip操作
writeBuffer.flip();
//发送缓冲区的字节数组
channel.write(writeBuffer);
//****此处不含处理“写半包”的代码
}

private void doConnect() throws IOException {
if (!socketChannel.connect(new InetSocketAddress(host, port))) {
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
}

public void sendMsg(String msg) throws Exception {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel, msg);
}
}
public class NIOTest { //测试启动类
public static void main(String[] args) throws Exception {
NIOServer.start();
Thread.sleep(100);
NIOClient.start();
while(NIOClient.sendMsg(new Scanner(System.in).nextLine())); // 3+2-5
}
}

非阻塞服务器(Non-blocking Server)

现在我们已经知道了Java NIO里面那些非阻塞特性是怎么工作的,但是要设计一个非阻塞的服务仍旧比较困难。非阻塞IO相对传统的阻塞IO给开发者带来了更多的挑战。在本节非阻塞服务的讲解中,我们一起来讨论这些会面临的主要挑战,同时也会给出一些潜在的解决方案。

非阻塞服务器-GitHub源码

非阻塞IO通道(Non-blocking IO Pipelines)

非阻塞的IO管道(Non-blocking IO Pipelines)可以看做是整个非阻塞IO处理过程的链条。包括在以非阻塞形式进行的读与写操作。下面有一张插图,简单的描述了一个基础的非阻塞的IO管道(Non-blocking IO Pipelines):

我们的组件(Component)通过Selector检查当前Channel是否有数据需要写入。此时component读入数据,并且根据输入的数据input对外提供数据输出output。这个对外的数据输出output被写到了另一个Channel中。

一个非阻塞的IO管道不必同时需要读和写数据,通常来说有些管道只需要读数据,而另一些管道则只需写数据。

上面的这幅流程图仅仅展示了一个组件。实际上一个管道可能存在多个component在处理输入数据。管道的长度取决于管道具体要做的事情。

当然一个非阻塞的IO管道他也可以同时从多个Channel中读取数据,例如同时冲多个SocketChannel中读取数据;

上面的流程图实际上被简化了,图中的Component实际上负责初始化Selector,从Channel中读取数据,而不是由Channel往Selector压如数据(push),这是简化的上图容易给人带来的误解。

非阻塞和阻塞通道比较(Non-blocking vs. Blocking IO Pipelines)

非阻塞IO管道和阻塞IO管道之间最大的区别是他们各自如何从Channel(套接字socket或文件file)读写数据。

IO管道通常直接从流中(来自于socket活file的流)读取数据,然后把数据分割为连续的消息。这个处理与我们读取流信息,用tokenizer进行解析非常相似。不同的是我们在这里会把数据流分割为更大一些的消息块。我把这个过程叫做Message Reader.下面是一张说明的插图:

一个阻塞IO管道的使用可以和输入流一样调用,每次从Channel中读取一个字节的数据,阻塞自身直到有数据可读。这个流程就是一个阻塞的Messsage Reader实现。

使用阻塞IO大大简化了Message Reader的实现成本。阻塞的Message Reader无需关注没有数据返回的情形,无需关注返回部分数据或者数据解析需要被复用的问题。

相似的,一个阻塞的Message Writer也不需要关注写入部分数据,和数据复用的问题。

阻塞IO通道的缺点(Blocking IO Pipeline Drawbacks)

上面提到了阻塞的Message Reader易于实现,但是阻塞也给他带了不可避免的缺点,必须为每个数据数量都分配一个单独线程。原因就在于IO接口在读取数据时在有数据返回前会一直被阻塞住。这直接导致我们无法用单线程来处理一个流没有数据返回时去读取其他的流。每当一个线程尝试去读取一个流的数据,这个线程就会被阻塞直到有数据真正返回。

如果这样的IO管道运用到服务器去处理高并发的链接请求,服务器将不得不为每一个到来的链接分配一个单独的线程。如果并发数不高比如每一时刻只有几百并发,也行不会有太大问题。一旦服务器的并发数上升到百万级别,这种设计就缺乏伸缩性。每个线程需要为堆栈分配320KB(32位JVM)到1024KB(64位JVM)的内存空间。这就是说如果有1,000,000个线程,需要1TB的内存。而这些在还没开始真正处理接收到的消息前就需要(消息处理中还需要为对象开拍内存)。

为了减少线程数,很多服务器都设计了线程池,把所有接收到的请求放到队列内,每次读取一条连接进行处理。这种设计可以用下图表示:

但是这种设计要求缓冲的连接进程发送有意义的数据。如果这些连接长时间处于非活跃的状态,那么大量非活跃的连接会阻塞线程池中的所有线程。这会导致服务器的响应速度特别慢甚至无响应。

有些服务器为了减轻这个问题,采取的操作是适当增加线程池的弹性。例如,当线程池所有线程都处于饱和时,线程池可能会自动扩容,启动更多的线程来处理事务。这个解决方案会使得服务器维护大量不活跃的链接。但是需要谨记服务器所能开辟的线程数是有限制的。所有当有1,000,000个低速的链接时,服务器还是不具备伸缩性。

基础的非阻塞通道设计(Basic Non-blocking IO Pipeline Design)

一个非阻塞的IO通道可以用单线程读取多个数据流。这个前提是相关的流可以切换为非阻塞模式(并不是所有流都可以以非阻塞形式操作)。在非阻塞模式下,读取一个流可能返回0个或多个字节。如果流还没有可供读取的数据那么就会返回0,其他大于1的返回都表明这是实际读取到的数据;

为了避开没有数据可读的流,我们结合Java NIO中的Selector。一个Selector可以注册多个SelectableChannel实例。当我们调用select()或selectorNow()方法时Selector会返回一个有数据可读的SelectableChannel实例。这个设计可以如下插图:

读取部分信息(Reading Partial Messages)

当我们冲SelectableChannel中读取一段数据后,我们并不知道这段数据是否是完整的一个message。因为一个数据段可能包含部分message,也就是说即可能少于一个message,也可能多一个message,正如下面这张插图所示意的那样:

要处理这种截断的message,我们会遇到两个问题:

  • 检测数据段中是否包含一个完整的message
  • 在message剩余部分获取到之前,我们如何处理不完整的message

检测完整message要求Message Reader查看数据段中的数据是否至少包含一个完整的message。如果包含一个或多个完整message,这些message可以被下发到通道中处理。查找完整message的过程是个大量重复的操作,所以这个操作必须是越快越好的。

当数据段中有一个不完整的message时,无论不完整消息是整个数据段还是说在完整message前后,这个不完整的message数据都需要在剩余部分获得前存储起来。

检查message完整性和存储不完整message都是Message Reader的职责。为了避免混淆来自不同Channel的数据,我们为每一个Channel分配一个Message Reader。整个设计大概是这样的:

当我们通过Selector获取到一个有数据可以读取的Channel之后,改Channel关联的Message Reader会读取数据,并且把数据打断为Message块。得到完整的message后就可以通过通道下发到其他组件进行处理。

一个Message Reader自然是协议相关的。他需要知道message的格式以便读取。如果我们的服务器是跨协议复用的,那他必须实现Message Reader的协议-大致类似于接收一个Message Reader工厂作为配置参数。

存储不完整的Message(Storing Partial Messages)

现在我们已经明确了由Message Reader负责不完整消息的存储直到接收到完整的消息。闲杂我们还需要知道这个存储过程需要如何来实现。

在设计的时候我们需要考虑两个关键因素:

  • 我们希望在拷贝消息数据的时候数据量能尽可能的小,拷贝量越大则性能相对越低;
  • 我们希望完整的消息是以顺序的字节存储,这样方便进行数据的解析;

为每个Message Reade分配Buffer(A Buffer Per Message Reader)

显然不完整的消息数据需要存储在某种buffer中。比较直接的办法是我们为每个Message Reader都分配一个内部的buffer成员。但是,多大的buffer才合适呢?这个buffer必须能存储下一个message最大的大小。如果一个message最大是1MB,那每个Message Reader内部的buffer就至少有1MB大小。

在百万级别的并发链接数下,1MB的buffer基本没法正常工作。举例来说,1,000,000 x 1MB就是1TB的内存大小!如果消息的最大数据量是16MB又需要多少内存呢?128MB呢?

可伸缩Buffer(Resizable Buffers)

另一个方案是在每个Message Reader内部维护一个容量可变的buffer。一个可变的buffer在初始化时占用较少控件,在消息变得很大超出容量时自动扩容。这样每个链接就不需要都占用比如1MB的空间。每个链接只使用承载下一个消息所必须的内存大小。

要实现一个可伸缩的buffer有几种不同的办法。每一种都有它的优缺点,下面几个小结我会逐一讨论它们。

拷贝扩容(Resize by Copy)

第一种实现可伸缩buffer的办法是初始化buffer的时候只申请较少的空间,比如4KB。如果消息超出了4KB的大小那么开赔一个更大的空间,比如8KB,然后把4KB中的数据拷贝纸8KB的内存块中。

以拷贝方式扩容的优点是一个消息的全部数据都被保存在了一个连续的字节数组中。这使得数据解析变得更加容易。

同时它的缺点是会增加大量的数据拷贝操作。

为了减少数据的拷贝操作,你可以分析整个消息流中的消息大小,一次来找到最适合当前机器的可以减少拷贝操作的buffer大小。例如,你可能会注意到觉大多数的消息都是小于4KB的,因为他们仅仅包含了一个非常请求和响应。这意味着消息的处所荣校应该设置为4KB。

同时,你可能会发现如果一个消息大于4KB,很可能是因为他包含了一个文件。你会可能注意到 大多数通过系统的数据都是小于128KB的。所以我们可以在第一次扩容设置为128KB。

最后你可能会发现当一个消息大于128KB后,没有什么规律可循来确定下次分配的空间大小,这意味着最后的buffer容量应该设置为消息最大的可能数据量。

结合这三次扩容时的大小设置,可以一定程度上减少数据拷贝。4KB以下的数据无需拷贝。在1百万的连接下需要的空间例如1,000,000x4KB=4GB,目前(2015)大多数服务器都扛得住。4KB到128KB会仅需拷贝一次,即拷贝4KB数据到128KB的里面。消息大小介于128KB和最大容量的时需要拷贝两次。首先4KB数据被拷贝第二次是拷贝128KB的数据,所以总共需要拷贝132KB数据。假设没有很多的消息会超过128KB,那么这个方案还是可以接受的。

当一个消息被完整的处理完毕后,它占用的内容应当即刻被释放。这样下一个来自东一个链接通道的消息可以从最小的buffer大小重新开始。这个操作是必须的如果我们需要尽可能高效地复用不同链接之间的内存。大多数情况下并不是所有的链接都会在同一时刻需要大容量的buffer。

笔者写了一个完整的教程阐述了如何实现一个内存buffer使其支持扩容:Resizable Arrays 。这个教程也附带了一个指向GitHub上的源码仓地址,里面有实现方案的具体代码。

追加扩容(Resize by Append)

另一种实现buffer扩容的方案是让buffer包含几个数组。当需要扩容的时候只需要在开辟一个新的字节数组,然后把内容写到里面去。

这种扩容也有两个具体的办法。一中是开辟单独的字节数组,然后用一个列表把这些独立数组关联起来。另一种是开辟一些更大的,相互共享的字节数组切片,然后用列表把这些切片和buffer关联起来。个人而言,笔者认为第二种切片方案更好一点点,但是它们之前的差异比较小。(译者话:关于这两个办法,我个人觉得概念介绍有点难懂,建议读者也参考一下原文😏)

这种追加扩容的方案不管是用独立数组还是切片都有一个优点,那就是写数据的时候不需要二外的拷贝操作。所有的数据可以直接从socket(Channel)中拷贝至数组活切片当中。

这种方案的缺点也很明显,就是数据不是存储在一个连续的数组中。这会使得数据的解析变得更加复杂,因为解析器不得不同时查找每一个独立数组的结尾和所有数组的结尾。正因为我们需要在写数据时查找消息的结尾,这个模型在设计实现时会相对不那么容易。

TLV编码消息(TLV Encoded Messages)

有些协议的消息消失采用的是一种TLV格式(Type, Length, Value)。这意味着当消息到达时,消息的完整大小存储在了消息的开始部分。我们可以立刻判断为消息开辟多少内存空间。

TLV编码是的内存管理变得更加简单。我们可以立刻知道为消息分配多少内存。即便是不完整的消息,buffer结尾后面也不会有浪费的内存。

TLV编码的一个缺点是我们需要在消息的全部数据接收到之前就开辟好需要用的所有内存。因此少量链接慢,但发送了大块数据的链接会占用较多内存,导致服务器无响应。

解决上诉问题的一个变通办法是使用一种内部包含多个TLV的消息格式。这样我们为每个TLV段分配内存而不是为整个的消息分配,并且只在消息的片段到达时才分配内存。但是消息片段很大时,任然会出现一样的问题。

另一个办法是为消息设置超时,如果长时间未接收到的消息(比如10-15秒)。这可以让服务器从偶发的并发处理大块消息恢复过来,不过还是会让服务器有一段时间无响应。另外恶意的DoS攻击会导致服务器开辟大量内存。

TLV编码有不同的变种。有多少字节使用这样确切的类型和字段长度取决于每个独立的TLV编码。有的TLV编码吧字段长度放在前面,接着放类型,最后放值。尽管字段的顺序不同,但他任然是一个TLV的类型。

TLV编码使得内存管理更加简单,这也是HTTP1.1协议让人觉得是一个不太优良的的协议的原因。正因如此,HTTP2.0协议在设计中也利用TLV编码来传输数据帧。也是因为这个原因我们设计了自己的利用TLV编码的网络协议VStack.co。

写不完整的消息(Writing Partial Messages)

在非阻塞IO管道中,写数据也是一个不小的挑战。当你调用一个非阻塞模式Channel的write()方法时,无法保证有多少机字节被写入了ByteBuffer中。write方法返回了实际写入的字节数,所以跟踪记录已被写入的字节数也是可行的。这就是我们遇到的问题:持续记录被写入的不完整的小树知道一个消息中所有的数据都发送完毕。

为了管理不完整消息的写操作,我们需要创建一个Message Writer。正如前面的Message Reader,我们也需要每个Channel配备一个Message Writer来写数据。在每个Message Writer中我们记录准确的已经写入的字节数。

为了避免多个消息传递到Message Writer超出他所能处理到Channel的量,我们需要让到达的消息进入队列。Message Writer则尽可能快的将数据写到Channel里。

下面是一个流程图,展示的是不完整消息被写入的过程:

为了使Message Writer能够持续发送刚才已经发送了一部分的消息,Message Writer需要被移植调用,这样他就可以发送更多数据。

如果你有大量的链接,你会持有大量的Message Writer实例。检查比如1百万的Message Writer实例是来确定他们是否处于可写状态是很慢的操作。首先,许多Message Writer可能根本就没有数据需要发送。我们不想检查这些实例。其次,不是所有的Channel都处于可写状态。我们不想浪费时间在这些非写入状态的Channel。

为了检查一个Channel是否可写,可以把它注册到Selector上。但是我们不希望把所有的Channel实例都注册到Selector。试想一下,如果你有1百万的链接,这里面大部分是空闲的,把1百万链接都祖册到Selector上。然后调用select方法的时候就会有很多的Channel处于可写状态。你需要检查所有这些链接中的Message Writer以确认是否有数据可写。

为了避免检查所有的这些Message Writer,以及那些根本没有消息需要发送给他们的Channel实例,我么可以采用如下两步策略:

  • 当有消息写入到Message Writer忠厚,把它关联的Channel注册到Selector上(如果还未注册的话)。

  • 当服务器有空的时候,可以检查Selector看看注册在上面的Channel实例是否处于可写状态。每个可写的channel,使其Message Writer向Channel中写入数据。如果Message Writer已经把所有的消息都写入Channel,把Channel从Selector上解绑。

这两个小步骤确保只有有数据要写的Channel才会被注册到Selector。

集成(Putting it All Together)

正如你所知到的,一个被阻塞的服务器需要时刻检查当前是否有显得完整消息抵达。在一个消息被完整的收到前,服务器可能需要检查多次。检查一次是不够的。

类似的,服务器也需要时刻检查当前是否有任何可写的数据。如果有的话,服务器需要检查相应的链接看他们是否处于可写状态。仅仅在消息第一次进入队列时检查是不够的,因为一个消息可能被部分写入。

总而言之,一个非阻塞的服务器要三个管道,并且经常执行:

  • 读数据管道,用来检查打开的链接是否有新的数据到达;
  • 处理数据管道,负责处理接收到的完整消息;
  • 写数据管道,用于检查是否有数据可以写入打开的连接中;

这三个管道在循环中重复执行。你可以尝试优化它的执行。比如,如果没有消息在队列中等候,那么可以跳过写数据管道。或者,如果没有收到新的完整消息,你甚至可以跳过处理数据管道。

下面这张流程图阐述了这整个服务器循环过程:

假如你还是柑橘这比较复杂难懂,可以去clone我们的源码仓: https://github.com/jjenkov/java-nio-server 也许亲眼看到了代码会帮助你理解这一块是如何实现的。

服务器线程模型(Server Thread Model)

我们在GitHub上的源码中实现的非阻塞IO服务使用了一个包含两条线程的线程模型。第一个线程负责从ServerSocketChannel接收到达的链接。另一个线程负责处理这些链接,包括读消息,处理消息,把响应写回到链接。这个双线程模型如下:

服务器的循环处理在处理线程中执行

NIO Pipe & Path & Files

Pipe

一个Java NIO的管道是两个线程间单向传输数据的连接。一个管道(Pipe)有一个source channel和一个sink channel(没想到合适的中文名)。我们把数据写到sink channel中,这些数据可以同过source channel再读取出来。

下面是一个管道的示意图:

创建管道(Creating a Pipe)

打开一个管道通过调用Pipe.open()工厂方法,如下:

1
Pipe pipe = Pipe.open();

向管道写入数据(Writing to a Pipe)

向管道写入数据需要访问他的sink channel:

1
Pipe.SinkChannel sinkChannel = pipe.sink();

接下来就是调用write()方法写入数据了:

1
2
3
4
5
6
7
8
9
10
11
String newData = "New String to write to file..." + System.currentTimeMillis();

ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());

buf.flip();

while(buf.hasRemaining()) {
sinkChannel.write(buf);
}

从管道读取数据(Reading from a Pipe)

类似的从管道中读取数据需要访问他的source channel:

1
Pipe.SourceChannel sourceChannel = pipe.source();

接下来调用read()方法读取数据:

1
2
3
ByteBuffer buf = ByteBuffer.allocate(48);

int bytesRead = inChannel.read(buf);

注意这里read()的整形返回值代表实际读取到的字节数。

Path & Files

参考: https://www.cnblogs.com/fysola/p/6143939.html

AIO编程

JDK7对NIO进行了重大改进,推出了基于异步Channel的IO,并提供了异步文件通道和异步套接字通道的实现。

AsynchronousFileChannel

创建AsynchronousFileChannel(Creating an AsynchronousFileChannel)

AsynchronousFileChannel的创建可以通过open()静态方法:

1
2
Path path = Paths.get("data/test.xml");
AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);

open()的第一个参数是一个Path实体,指向我们需要操作的文件。 第二个参数是操作类型。上述示例中我们用的是StandardOpenOption.READ,表示以读的形式操作文件。

读取数据(Reading Data)

读取AsynchronousFileChannel的数据有两种方式。每种方法都会调用AsynchronousFileChannel的一个read()接口。下面分别看一下这两种写法。

通过Future读取数据(Reading Data Via a Future)

第一种方式是调用返回值为Future的read()方法:

1
Future<Integer> operation = fileChannel.read(buffer, 0);

这种方式中,read()接受一个ByteBuffer座位第一个参数,数据会被读取到ByteBuffer中。第二个参数是开始读取数据的位置。

read()方法会立刻返回,即使读操作没有完成。我们可以通过isDone()方法检查操作是否完成。

下面是一个略长的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);

ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;

Future<Integer> operation = fileChannel.read(buffer, position);

while(!operation.isDone());

buffer.flip();
byte[] data = new byte[buffer.limit()];
buffer.get(data);
System.out.println(new String(data));
buffer.clear();

在这个例子中我们创建了一个AsynchronousFileChannel,然后创建一个ByteBuffer作为参数传给read。接着我们创建了一个循环来检查是否读取完毕isDone()。这里的循环操作比较低效,它的意思是我们需要等待读取动作完成。

一旦读取完成后,我们就可以把数据写入ByteBuffer,然后输出。

通过CompletionHandler读取数据(Reading Data Via a CompletionHandler)

另一种方式是调用接收CompletionHandler作为参数的read()方法。下面是具体的使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
fileChannel.read(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("result = " + result);

attachment.flip();
byte[] data = new byte[attachment.limit()];
attachment.get(data);
System.out.println(new String(data));
attachment.clear();
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {

}
});

这里,一旦读取完成,将会触发CompletionHandler的completed()方法,并传入一个Integer和ByteBuffer。前面的整形表示的是读取到的字节数大小。第二个ByteBuffer也可以换成其他合适的对象方便数据写入。 如果读取操作失败了,那么会触发failed()方法。

写数据(Writing Data)

和读数据类似某些数据也有两种方式,调动不同的的write()方法,下面分别看介绍这两种方法。

通过Future写数据(Writing Data Via a Future)

通过AsynchronousFileChannel我们可以异步写数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Path path = Paths.get("data/test-write.txt");
AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);

ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;

buffer.put("test data".getBytes());
buffer.flip();

Future<Integer> operation = fileChannel.write(buffer, position);
buffer.clear();

while(!operation.isDone());

System.out.println("Write done");

首先把文件已写方式打开,接着创建一个ByteBuffer座位写入数据的目的地。再把数据进入ByteBuffer。最后检查一下是否写入完成。 需要注意的是,这里的文件必须是已经存在的,否者在尝试write数据是会抛出一个java.nio.file.NoSuchFileException.

检查一个文件是否存在可以通过下面的方法:

1
2
3
if(!Files.exists(path)){
Files.createFile(path);
}

通过CompletionHandler写数据(Writing Data Via a CompletionHandler)

我们也可以通过CompletionHandler来写数据:

Path path = Paths.get(“data/test-write.txt”);
if(!Files.exists(path)){
Files.createFile(path);
}
AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);

ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;

buffer.put(“test data”.getBytes());
buffer.flip();

fileChannel.write(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() {

@Override
public void completed(Integer result, ByteBuffer attachment) {
    System.out.println("bytes written: " + result);
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
    System.out.println("Write failed");
    exc.printStackTrace();
}

});
同样当数据吸入完成后completed()会被调用,如果失败了那么failed()会被调用。

AsynchronousServerSocketChannel & AsynchronousSocketChannel

虽然NIO在网络操作中,提供了非阻塞的方法,但是NIO的IO行为还是同步的。异步的套接字通道时真正的异步非阻塞I/O,对应于UNIX网络编程中的事件驱动I/O(AIO)。他不需要过多的Selector对注册的通道进行轮询即可实现异步读写,从而简化了NIO的编程模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
public class AIOServer {
private final static String IP = "127.0.0.1";
private final static int PORT = 12345;
public static void main(String[] args) {
new Thread(new AsyncServerHandler(IP, PORT)).start();
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class AsyncServerHandler implements Runnable {

public AsynchronousServerSocketChannel channel;

public AsyncServerHandler(String ip, int port) {
try {
//创建服务端通道
channel = AsynchronousServerSocketChannel.open();
//绑定端口
channel.bind(new InetSocketAddress(ip, port));
System.out.println("服务器已启动,端口号:" + port);
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void run() {
//用于接收客户端的连接
channel.accept(this, new AcceptHandler());
}
}

class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> {
@Override
public void completed(AsynchronousSocketChannel channel, AsyncServerHandler serverHandler) {
//继续接受其他客户端的请求
serverHandler.channel.accept(serverHandler, this);
//创建新的Buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
//异步读 第三个参数为接收消息回调的业务Handler
channel.read(buffer, buffer, new ServerReadHandler(channel));
}

@Override
public void failed(Throwable exc, AsyncServerHandler serverHandler) {
exc.printStackTrace();
}
}

class ServerReadHandler implements CompletionHandler<Integer, ByteBuffer> {

private AsynchronousSocketChannel channel;

ServerReadHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}

@Override //读取到消息后的处理
public void completed(Integer result, ByteBuffer attachment) {
//flip操作
attachment.flip();
//根据
byte[] message = new byte[attachment.remaining()];
attachment.get(message);
try {
String expression = new String(message, "UTF-8");
System.out.println("服务器收到消息: " + expression);
String calrResult = null;
try {
calrResult = Calculator.cal(expression).toString();
} catch (Exception e) {
calrResult = "计算错误:" + e.getMessage();
}
//向客户端发送消息
doWrite(calrResult);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}

//发送消息
private void doWrite(String result) {
byte[] bytes = result.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
//异步写数据 参数与前面的read一样
channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
//如果没有发送完,就继续发送直到完成
if (buffer.hasRemaining()) {
channel.write(buffer, buffer, this);
} else {
//创建新的Buffer
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
//异步读 第三个参数为接收消息回调的业务Handler
channel.read(readBuffer, readBuffer, new ServerReadHandler(channel));
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

public class AIOClient {
private final static String IP = "127.0.0.1";
private final static int PORT = 12345;
private static AsyncClientHandler asyncClientHandler;
public static void start() {
start(IP, PORT);
}
public static synchronized void start(String ip, int port) {
asyncClientHandler = new AsyncClientHandler(ip, port);
new Thread(asyncClientHandler).start();
}
//向服务器发送消息
public static boolean sendMsg(String msg) {
asyncClientHandler.sendMsg(msg);
return true;
}
public static void main(String[] args) {
AIOClient.start();
System.out.println("请输入请求消息:");
Scanner scanner = new Scanner(System.in);
while (AIOClient.sendMsg(scanner.nextLine())) ;
}
}
public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable {
private AsynchronousSocketChannel clientChannel;
private String host;
private int port;
private CountDownLatch latch;

public AsyncClientHandler(String host, int port) {
this.host = host;
this.port = port;
try {
//创建异步的客户端通道
clientChannel = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void run() {
//创建CountDownLatch等待
latch = new CountDownLatch(1);
//发起异步连接操作,回调参数就是这个类本身,如果连接成功会回调completed方法
clientChannel.connect(new InetSocketAddress(host, port), this, this);
try {
latch.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
clientChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}

@Override //连接服务器成功,意味着TCP三次握手完成
public void completed(Void result, AsyncClientHandler attachment) {
System.out.println("客户端成功连接到服务器...");
}

//连接服务器失败
@Override
public void failed(Throwable exc, AsyncClientHandler attachment) {
System.err.println("连接服务器失败...");
exc.printStackTrace();
try {
clientChannel.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}

//向服务器发送消息
public void sendMsg(String msg) {
byte[] req = msg.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
//异步写
clientChannel.write(writeBuffer, writeBuffer, new ClientWriteHandler(clientChannel, latch));
}
}

class ClientWriteHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel clientChannel;
private CountDownLatch latch;

ClientWriteHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) {
this.clientChannel = clientChannel;
this.latch = latch;
}

@Override
public void completed(Integer result, ByteBuffer buffer) {
//完成全部数据的写入
if (buffer.hasRemaining()) {
clientChannel.write(buffer, buffer, this);
} else {
//读取数据
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
clientChannel.read(readBuffer, readBuffer, new ClientReadHandler(clientChannel, latch));
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.err.println("数据发送失败...");
try {
clientChannel.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
}

class ClientReadHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel clientChannel;
private CountDownLatch latch;

ClientReadHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) {
this.clientChannel = clientChannel;
this.latch = latch;
}

@Override
public void completed(Integer result, ByteBuffer buffer) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String body;
try {
body = new String(bytes, "UTF-8");
System.out.println("客户端收到结果:" + body);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.err.println("数据读取失败...");
try {
clientChannel.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
}

0%