经过前面的铺垫,在这一节我们进入NIO编程,NIO弥补了原来同步阻塞IO的不足,他提供了高速的、面向块的I/O,NIO中加入的Buffer缓冲区,体现了与原I/O的一个重要区别。在面向流的I/O中,可以将数据直接写入或者将数据直接读到Stream对象中。下面看一些概念
Buffer缓冲区
而在NIO中,所有数据都是用缓冲区处理的,在读取数据时,直接读到缓冲区,在写数据时,也是写到缓冲区,任何时候访问NIO中的数据,都是通过缓冲区进行操作。最常用的是个ByteBuffer缓冲区,他提供了一组功能用于操作字节数组。
Channel通道
Channel 是一个通道,网络数据通过它进行读写,通道和流的区别在于通道是双向的,流是单向的,一个流必须是读或者写,而通道则可以读写同时进行。Channel可以分为用于网路读写的SelectableChannel和用于文件操作的FileChannel。
Selector多路复用器
首先强调一点,多路复用器对于NIO编程非常重要,非常重要,多路复用器提供选择已经就绪的任务的能力。简单的讲,Selector会不断的轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续I/O操作。一个多路复用器可以轮询多个Channel,意味着只需要一个线程负责selector轮询,就可以接入成千上万个客户端。
现在改造上一节的代码使其成为NIO server端
1 package com.example.biodemo; 2 3 4 import java.io.*; 5 import java.net.ServerSocket; 6 import java.net.Socket; 7 8 public class TimeServer { 9 public static void main(String[] args) throws IOException {10 int port = 8090;11 if (args != null && args.length > 0) {12 try {13 port = Integer.valueOf(args[0]);14 } catch (NumberFormatException e) {15 port = 8090;16 }17 }18 // 创建多路复用线程类并初始化多路复用器,绑定端口等以及轮询注册功能19 MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);20 // 启动多路复用类线程负责轮询多路复用器Selector,IO数据处理等操作21 new Thread(timeServer, "NIO-MultiplexerTimeSever-001").start();22 23 24 25 26 27 28 // ===================以下内容注释掉=================================29 30 /* ServerSocket server = null;31 try {32 server = new ServerSocket(port);33 System.out.println("the timeServer is start in port :" + port);34 Socket socket = null;35 // 引入线程池start36 TimeServerHandlerExecutePool singleExecutor = new TimeServerHandlerExecutePool(50,10000);37 while (true) {38 socket = server.accept();39 // 替换BIO中new Thread(new TimeServerHandler(socket)).start();为下一行代码40 singleExecutor.execute(new TimeServerHandler(socket));41 // 引入线程池end42 43 }44 } finally {45 if (server != null) {46 System.out.println("the time server close");47 server.close();48 server = null;49 }50 }*/51 52 }53 }
多路复用类代码,其注册轮询功能及请求消息处理返回在这里进行
1 package com.example.biodemo; 2 3 import java.io.IOException; 4 import java.net.InetSocketAddress; 5 import java.nio.ByteBuffer; 6 import java.nio.channels.SelectionKey; 7 import java.nio.channels.Selector; 8 import java.nio.channels.ServerSocketChannel; 9 import java.nio.channels.SocketChannel; 10 import java.util.Date; 11 import java.util.Iterator; 12 import java.util.Set; 13 14 public class MultiplexerTimeServer implements Runnable { 15 // 定义多路复用器 16 private Selector selector; 17 // 定义ServerSocketChannel 18 private ServerSocketChannel servChannel; 19 // 定义停止标识 20 private boolean stop; 21 22 // 构造函数初始化多路复用器、并绑定监听端口 23 public MultiplexerTimeServer(int port) { 24 try { 25 // 打开一个多路复用器 26 selector = Selector.open(); 27 // 打开ServerSocketChannel通道 28 servChannel = ServerSocketChannel.open(); 29 // 绑定监听端口号,并设置backlog为1024 30 servChannel.socket().bind(new InetSocketAddress(port), 1024); 31 // 设置severSocketChannel为异步非阻塞模式 32 servChannel.configureBlocking(false); 33 // 将ServerSocketChannel 注册到Reactor 线程的多路复用器Selector上,监听ACCEPT事件,并返回一个SelectionKey类型的值 34 SelectionKey selectionKey = servChannel.register(selector, SelectionKey.OP_ACCEPT); 35 System.out.println("The time server is start in port:" + port); 36 } catch (IOException ioe) { 37 ioe.printStackTrace(); 38 // 资源初始化失败则退出 39 System.exit(1); 40 } 41 } 42 43 public void stop() { 44 this.stop = true; 45 } 46 47 @Override 48 public void run() { 49 // 在线程中遍历轮询多路复用器selector 50 while (!stop) { 51 try { 52 /*selector.select();选择一些I/O操作已经准备好的channel。每个channel对应着一个key。这个方法是一个阻塞的选择操作。 53 当至少有一个通道被选择时才返回。当这个方法被执行时,当前线程是允许被中断的。*/ 54 // 该方法是阻塞的,选择一组键,其相应的通道已为 I/O 操作准备就绪。最多等1s,如果还没有就绪的就返回0 55 /*如果 timeout为正,则select(long timeout)在等待有通道被选择时至多会阻塞timeout毫秒 56 如果timeout为零,则永远阻塞直到有至少一个通道准备就绪。 57 timeout不能为负数*/ 58 selector.select(1000); 59 // 当有处于就绪状态的channel时,返回该channel的selectionKey集合,此通道是已准备就绪的键集,已选择键集(I/O操作已就绪返回key)始终是键集的一个子集。 60 SetselectionKeys = selector.selectedKeys(); 61 Iterator it = selectionKeys.iterator(); 62 // Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。 63 SelectionKey key = null; 64 while (it.hasNext()) { 65 // 来一个事件 第一次触发一个accepter线程,SocketReadHandler 66 key = it.next(); 67 // 从iterator中移除该元素 68 it.remove(); 69 try { 70 // 去对该已经准备就绪的I/O操作进行处理 71 dispatch(key); 72 } catch (Exception e) { 73 // 删除处理完不为空的键,关闭相关通道 74 if (key != null) { 75 key.cancel(); 76 if (key.channel() != null) { 77 key.channel().close(); 78 } 79 } 80 } 81 } 82 } catch (IOException ioe1) { 83 ioe1.printStackTrace(); 84 } 85 } 86 // 最后关闭多路复用器 87 if (selector != null) { 88 try { 89 selector.close(); 90 } catch (IOException e) { 91 e.printStackTrace(); 92 } 93 } 94 } 95 //该方法用于处理轮询到的I/O已经就绪的SelectionKey键 96 private void dispatch(SelectionKey key) throws IOException { 97 if (key.isValid()) { 98 // 处理请求接入的信息 99 if (key.isAcceptable()) {100 // 接受新连接,通过SelectionKey获取其通道101 ServerSocketChannel ssc = (ServerSocketChannel) key.channel();102 // 接收客户端连接请求并创建SocketChannel实例103 SocketChannel sc = ssc.accept();104 sc.configureBlocking(false);105 // 添加新连接到多路复用器上106 sc.register(selector, SelectionKey.OP_READ);107 }108 if (key.isReadable()) {109 // 本方法体读取客户端发来的请求数据110 SocketChannel sc = (SocketChannel) key.channel();111 // 开辟一个缓冲区,这里开辟了1M112 ByteBuffer readBuffer = ByteBuffer.allocate(1024);113 // 读取请求码流,返回值>0,读到字节数,返回值=0,没有读到字节数,返回值<0,说明链路已经关闭,114 // 需要关闭SocketChannel115 int readBytes = sc.read(readBuffer);116 if (readBytes > 0) {117 // 读到字节数后的解码操作,对 readBuffer 进行 flip 操作, 作用是将缓冲区当前的 limit 设置为 position118 // position 设置为 0,用于后续对缓冲区的读取操作。119 readBuffer.flip();120 // 根据缓冲区的可读的字节个数创建字节数组121 byte[] bytes = new byte[readBuffer.remaining()];122 // 调用 ByteBuffer的 get 操作将缓冲区可读的字节数复制到新创建的字节数组中123 readBuffer.get(bytes);124 // 调用字符串中的构造函数创建请求消息体并打印。125 String body = new String(bytes, "utf-8");126 System.out.println("The time server receive order :" + body);127 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";128 // 将应答消息异步发送给客户端129 doWrite(sc, currentTime);130 } else if (readBytes < 0) {131 // 对端链路关闭132 key.cancel();133 sc.close();134 } else {135 ;//读到0字节,忽略。136 }137 }138 }139 }140 141 private void doWrite(SocketChannel channel, String response) throws IOException {142 if (response != null && response.trim().length() > 0) {143 // 将字符创编码为字节数组144 byte[] bytes = response.getBytes();145 // 根据字节数组大小创建缓冲区146 ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);147 // 复制字节数组内容进入缓冲区148 writeBuffer.put(bytes);149 // 进行flip操作,作用同上面150 writeBuffer.flip();151 // 将缓冲区中的字节数组发送出去152 channel.write(writeBuffer);153 }154 }155 }
下面是客户端的详细代码及注释,需要注意的是在客户端代码判断是否连接成功时需要进行两步判断(需要判断连接状态和连接结果是否成功),第二步判断成功需要注册状态,否则客户端发送不了消息。
1 package com.example.biodemo; 2 3 import java.io.*; 4 import java.net.Socket; 5 6 public class TimeClient { 7 public static void main(String[] args) { 8 int port = 8090; 9 if (args != null && args.length > 0) {10 try {11 port = Integer.valueOf(args[0]);12 } catch (NumberFormatException ne) {13 port = 8090;14 }15 }16 new Thread(new TimeClientHandles("127.0.0.1",port),"TimeClient-001").start();17 /* 代码改造注释掉以下代码18 Socket socket = null;19 BufferedReader in = null;20 PrintWriter out = null;21 try {22 socket = new Socket("127.0.0.1", port);23 System.out.println(socket.getInputStream());24 in = new BufferedReader(new InputStreamReader(socket.getInputStream()));25 out = new PrintWriter(socket.getOutputStream(), true);26 out.println("QUERY TIME ORDER");27 System.out.println("send order 2 server succeed.");28 String resp = in.readLine();29 System.out.println("now is :" + resp);30 } catch (IOException e1) {31 32 } finally {33 if (out != null) {34 out.close();35 out = null;36 }37 38 if (in != null) {39 try {40 in.close();41 } catch (IOException e2) {42 e2.printStackTrace();43 }44 in = null;45 if (socket != null) {46 try {47 socket.close();48 } catch (IOException e3) {49 e3.printStackTrace();50 }51 52 }53 socket = null;54 }55 }*/56 }57 }
客户端HandleInput代码
1 package com.example.biodemo; 2 3 import java.io.IOException; 4 import java.net.InetSocketAddress; 5 import java.nio.ByteBuffer; 6 import java.nio.channels.ClosedChannelException; 7 import java.nio.channels.SelectionKey; 8 import java.nio.channels.Selector; 9 import java.nio.channels.SocketChannel; 10 import java.util.Iterator; 11 import java.util.Set; 12 13 //该类用来处理异步连接和读写操作 14 public class TimeClientHandles implements Runnable { 15 private String host; 16 private int port; 17 private Selector selector; 18 private SocketChannel socketChannel; 19 private volatile boolean stop; 20 21 // 构造函数初始化并连接服务器 22 public TimeClientHandles(String host, int port) { 23 this.host = host == null ? "127.0.0.1" : host; 24 this.port = port; 25 try { 26 selector = Selector.open(); 27 socketChannel = SocketChannel.open(); 28 socketChannel.configureBlocking(false); 29 } catch (IOException e) { 30 e.printStackTrace(); 31 System.exit(1); 32 } 33 } 34 35 @Override 36 public void run() { 37 // 发送连接请求 38 try { 39 doConnection(); 40 }catch (Exception e){ 41 e.printStackTrace(); 42 System.exit(1); 43 } 44 // 在循环体内轮询多路复用器Selector,当有就绪的Channel时,执行handleInput(key); 45 while (!stop) { 46 try { 47 selector.select(1000); 48 Setkeys = selector.selectedKeys(); 49 Iterator it = keys.iterator(); 50 SelectionKey key = null; 51 while (it.hasNext()) { 52 key = it.next(); 53 it.remove(); 54 try { 55 handleInput(key); 56 } catch (Exception e) { 57 if (key != null) { 58 key.cancel(); 59 if (key.channel() != null) { 60 key.channel().close(); 61 } 62 } 63 } 64 65 } 66 } catch (IOException e) { 67 e.printStackTrace(); 68 } 69 70 } 71 } 72 73 private void handleInput(SelectionKey key) throws IOException { 74 if (key.isValid()) { 75 // 判断是否连接成功(需要判断连接状态和连接结果是否成功) 76 SocketChannel sc = (SocketChannel) key.channel(); 77 // 连接状态判断,是连接状态返回true,判断的是服务端是否已经返回ACK应答消息 78 if (key.isConnectable()) { 79 // 是连接状态则需要对连接结果进行判断,如果true,说明客户端连接成功,如果flase则抛出IO异常,连接失败 80 if(sc.finishConnect()){ 81 sc.register(selector, SelectionKey.OP_READ); 82 doWrite(sc); 83 }else { 84 // 连接失败则进程退出 85 System.exit(1); 86 } 87 } 88 if (key.isReadable()) { 89 ByteBuffer readBuffer = ByteBuffer.allocate(1024); 90 int readBytes = sc.read(readBuffer); 91 if (readBytes > 0) { 92 readBuffer.flip(); 93 byte[] bytes = new byte[readBuffer.remaining()]; 94 readBuffer.get(bytes); 95 String body = new String(bytes, "utf-8"); 96 System.out.println("Now is :" + body); 97 this.stop = true; 98 } else if (readBytes < 0) { 99 key.cancel();100 sc.close();101 } else {102 ;//没有读到字节什么都不做103 }104 }105 }106 }107 108 private void doConnection() {109 try {110 // 如果直接连接成功,则注册到多路复用器上,并注册SelectionKey.OP_READ,发送请求消息,读应答111 if (socketChannel.connect(new InetSocketAddress(host, port))) {112 socketChannel.register(selector, SelectionKey.OP_READ);113 doWrite(socketChannel);114 } else {115 // 连接不成功说明服务端没有返回TCP握手应答消息,但是不代表连接失败,注册socketChannel到多路复用器上,116 // 并注册SelectionKey.OP_CONNECT,当服务器返回TCP syn-ack 消息后,Selector 就能轮询到这个SocketChannel117 // 处于连接就绪状态118 socketChannel.register(selector, SelectionKey.OP_CONNECT);119 }120 } catch (IOException e) {121 e.printStackTrace();122 }123 }124 125 private void doWrite(SocketChannel socketChannel) throws IOException {126 byte[] req = "QUERY TIME ORDER".getBytes();127 ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);128 writeBuffer.put(req);129 writeBuffer.flip();130 socketChannel.write(writeBuffer);131 if (!writeBuffer.hasRemaining()) {132 System.out.println("send order 2 server succeed.");133 }134 135 }136 }
通过上面代码的学习,我们对NIO的思路有一定的了解,尽管现在还不熟悉,但是认真看过,就会发现其套路脉络还是很清楚的。