博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
netty权威指南学习笔记一——NIO入门(3)NIO
阅读量:4963 次
发布时间:2019-06-12

本文共 15889 字,大约阅读时间需要 52 分钟。

  经过前面的铺垫,在这一节我们进入NIO编程,NIO弥补了原来同步阻塞IO的不足,他提供了高速的、面向块的I/O,NIO中加入的Buffer缓冲区,体现了与原I/O的一个重要区别。在面向流的I/O中,可以将数据直接写入或者将数据直接读到Stream对象中。下面看一些概念

  Buffer缓冲区

  而在NIO中,所有数据都是用缓冲区处理的,在读取数据时,直接读到缓冲区,在写数据时,也是写到缓冲区,任何时候访问NIO中的数据,都是通过缓冲区进行操作。最常用的是个ByteBuffer缓冲区,他提供了一组功能用于操作字节数组。

  Channel通道

  Channel 是一个通道,网络数据通过它进行读写,通道和流的区别在于通道是双向的,流是单向的,一个流必须是读或者写,而通道则可以读写同时进行。Channel可以分为用于网路读写的SelectableChannel和用于文件操作的FileChannel。

  Selector多路复用器

  首先强调一点,多路复用器对于NIO编程非常重要,非常重要,多路复用器提供选择已经就绪的任务的能力。简单的讲,Selector会不断的轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续I/O操作。一个多路复用器可以轮询多个Channel,意味着只需要一个线程负责selector轮询,就可以接入成千上万个客户端。

现在改造上一节的代码使其成为NIO server端

1 package com.example.biodemo; 2  3  4 import java.io.*; 5 import java.net.ServerSocket; 6 import java.net.Socket; 7  8 public class TimeServer { 9     public static void main(String[] args) throws IOException {10         int port = 8090;11         if (args != null && args.length > 0) {12             try {13                 port = Integer.valueOf(args[0]);14             } catch (NumberFormatException e) {15                 port = 8090;16             }17         }18 //        创建多路复用线程类并初始化多路复用器,绑定端口等以及轮询注册功能19         MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);20 //        启动多路复用类线程负责轮询多路复用器Selector,IO数据处理等操作21         new Thread(timeServer, "NIO-MultiplexerTimeSever-001").start();22 23 24 25 26 27 28 // ===================以下内容注释掉=================================29 30        /* ServerSocket server = null;31         try {32             server = new ServerSocket(port);33             System.out.println("the timeServer is start in port :" + port);34             Socket socket = null;35 //           引入线程池start36             TimeServerHandlerExecutePool singleExecutor = new TimeServerHandlerExecutePool(50,10000);37             while (true) {38                 socket = server.accept();39 //           替换BIO中new Thread(new TimeServerHandler(socket)).start();为下一行代码40                 singleExecutor.execute(new TimeServerHandler(socket));41 //           引入线程池end42 43             }44         } finally {45             if (server != null) {46                 System.out.println("the time server close");47                 server.close();48                 server = null;49             }50         }*/51 52     }53 }

多路复用类代码,其注册轮询功能及请求消息处理返回在这里进行

1 package com.example.biodemo;  2   3 import java.io.IOException;  4 import java.net.InetSocketAddress;  5 import java.nio.ByteBuffer;  6 import java.nio.channels.SelectionKey;  7 import java.nio.channels.Selector;  8 import java.nio.channels.ServerSocketChannel;  9 import java.nio.channels.SocketChannel; 10 import java.util.Date; 11 import java.util.Iterator; 12 import java.util.Set; 13  14 public class MultiplexerTimeServer implements Runnable { 15     //    定义多路复用器 16     private Selector selector; 17     //    定义ServerSocketChannel 18     private ServerSocketChannel servChannel; 19     //    定义停止标识 20     private boolean stop; 21  22     //    构造函数初始化多路复用器、并绑定监听端口 23     public MultiplexerTimeServer(int port) { 24         try { 25 //            打开一个多路复用器 26             selector = Selector.open(); 27 //            打开ServerSocketChannel通道 28             servChannel = ServerSocketChannel.open(); 29 //            绑定监听端口号,并设置backlog为1024 30             servChannel.socket().bind(new InetSocketAddress(port), 1024); 31 //            设置severSocketChannel为异步非阻塞模式 32             servChannel.configureBlocking(false); 33 //            将ServerSocketChannel 注册到Reactor 线程的多路复用器Selector上,监听ACCEPT事件,并返回一个SelectionKey类型的值 34             SelectionKey selectionKey = servChannel.register(selector, SelectionKey.OP_ACCEPT); 35             System.out.println("The time server is start in port:" + port); 36         } catch (IOException ioe) { 37             ioe.printStackTrace(); 38 //            资源初始化失败则退出 39             System.exit(1); 40         } 41     } 42  43     public void stop() { 44         this.stop = true; 45     } 46  47     @Override 48     public void run() { 49 //        在线程中遍历轮询多路复用器selector 50         while (!stop) { 51             try { 52              /*selector.select();选择一些I/O操作已经准备好的channel。每个channel对应着一个key。这个方法是一个阻塞的选择操作。 53               当至少有一个通道被选择时才返回。当这个方法被执行时,当前线程是允许被中断的。*/ 54 //            该方法是阻塞的,选择一组键,其相应的通道已为 I/O 操作准备就绪。最多等1s,如果还没有就绪的就返回0 55             /*如果 timeout为正,则select(long timeout)在等待有通道被选择时至多会阻塞timeout毫秒 56               如果timeout为零,则永远阻塞直到有至少一个通道准备就绪。 57               timeout不能为负数*/ 58                 selector.select(1000); 59 //            当有处于就绪状态的channel时,返回该channel的selectionKey集合,此通道是已准备就绪的键集,已选择键集(I/O操作已就绪返回key)始终是键集的一个子集。 60                 Set
selectionKeys = selector.selectedKeys(); 61 Iterator
it = selectionKeys.iterator(); 62 // Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。 63 SelectionKey key = null; 64 while (it.hasNext()) { 65 // 来一个事件 第一次触发一个accepter线程,SocketReadHandler 66 key = it.next(); 67 // 从iterator中移除该元素 68 it.remove(); 69 try { 70 // 去对该已经准备就绪的I/O操作进行处理 71 dispatch(key); 72 } catch (Exception e) { 73 // 删除处理完不为空的键,关闭相关通道 74 if (key != null) { 75 key.cancel(); 76 if (key.channel() != null) { 77 key.channel().close(); 78 } 79 } 80 } 81 } 82 } catch (IOException ioe1) { 83 ioe1.printStackTrace(); 84 } 85 } 86 // 最后关闭多路复用器 87 if (selector != null) { 88 try { 89 selector.close(); 90 } catch (IOException e) { 91 e.printStackTrace(); 92 } 93 } 94 } 95 //该方法用于处理轮询到的I/O已经就绪的SelectionKey键 96 private void dispatch(SelectionKey key) throws IOException { 97 if (key.isValid()) { 98 // 处理请求接入的信息 99 if (key.isAcceptable()) {100 // 接受新连接,通过SelectionKey获取其通道101 ServerSocketChannel ssc = (ServerSocketChannel) key.channel();102 // 接收客户端连接请求并创建SocketChannel实例103 SocketChannel sc = ssc.accept();104 sc.configureBlocking(false);105 // 添加新连接到多路复用器上106 sc.register(selector, SelectionKey.OP_READ);107 }108 if (key.isReadable()) {109 // 本方法体读取客户端发来的请求数据110 SocketChannel sc = (SocketChannel) key.channel();111 // 开辟一个缓冲区,这里开辟了1M112 ByteBuffer readBuffer = ByteBuffer.allocate(1024);113 // 读取请求码流,返回值>0,读到字节数,返回值=0,没有读到字节数,返回值<0,说明链路已经关闭,114 // 需要关闭SocketChannel115 int readBytes = sc.read(readBuffer);116 if (readBytes > 0) {117 // 读到字节数后的解码操作,对 readBuffer 进行 flip 操作, 作用是将缓冲区当前的 limit 设置为 position118 // position 设置为 0,用于后续对缓冲区的读取操作。119 readBuffer.flip();120 // 根据缓冲区的可读的字节个数创建字节数组121 byte[] bytes = new byte[readBuffer.remaining()];122 // 调用 ByteBuffer的 get 操作将缓冲区可读的字节数复制到新创建的字节数组中123 readBuffer.get(bytes);124 // 调用字符串中的构造函数创建请求消息体并打印。125 String body = new String(bytes, "utf-8");126 System.out.println("The time server receive order :" + body);127 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";128 // 将应答消息异步发送给客户端129 doWrite(sc, currentTime);130 } else if (readBytes < 0) {131 // 对端链路关闭132 key.cancel();133 sc.close();134 } else {135 ;//读到0字节,忽略。136 }137 }138 }139 }140 141 private void doWrite(SocketChannel channel, String response) throws IOException {142 if (response != null && response.trim().length() > 0) {143 // 将字符创编码为字节数组144 byte[] bytes = response.getBytes();145 // 根据字节数组大小创建缓冲区146 ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);147 // 复制字节数组内容进入缓冲区148 writeBuffer.put(bytes);149 // 进行flip操作,作用同上面150 writeBuffer.flip();151 // 将缓冲区中的字节数组发送出去152 channel.write(writeBuffer);153 }154 }155 }

 下面是客户端的详细代码及注释,需要注意的是在客户端代码判断是否连接成功时需要进行两步判断(需要判断连接状态和连接结果是否成功),第二步判断成功需要注册状态,否则客户端发送不了消息。

1 package com.example.biodemo; 2  3 import java.io.*; 4 import java.net.Socket; 5  6 public class TimeClient { 7     public static void main(String[] args) { 8         int port = 8090; 9         if (args != null && args.length > 0) {10             try {11                 port = Integer.valueOf(args[0]);12             } catch (NumberFormatException ne) {13                 port = 8090;14             }15         }16         new Thread(new TimeClientHandles("127.0.0.1",port),"TimeClient-001").start();17       /* 代码改造注释掉以下代码18        Socket socket = null;19         BufferedReader in = null;20         PrintWriter out = null;21         try {22             socket = new Socket("127.0.0.1", port);23             System.out.println(socket.getInputStream());24             in = new BufferedReader(new InputStreamReader(socket.getInputStream()));25             out = new PrintWriter(socket.getOutputStream(), true);26             out.println("QUERY TIME ORDER");27             System.out.println("send order 2 server succeed.");28             String resp = in.readLine();29             System.out.println("now is :" + resp);30         } catch (IOException e1) {31 32         } finally {33             if (out != null) {34                 out.close();35                 out = null;36             }37 38             if (in != null) {39                 try {40                     in.close();41                 } catch (IOException e2) {42                     e2.printStackTrace();43                 }44                 in = null;45                 if (socket != null) {46                     try {47                         socket.close();48                     } catch (IOException e3) {49                         e3.printStackTrace();50                     }51 52                 }53                 socket = null;54             }55         }*/56     }57 }

客户端HandleInput代码

1 package com.example.biodemo;  2   3 import java.io.IOException;  4 import java.net.InetSocketAddress;  5 import java.nio.ByteBuffer;  6 import java.nio.channels.ClosedChannelException;  7 import java.nio.channels.SelectionKey;  8 import java.nio.channels.Selector;  9 import java.nio.channels.SocketChannel; 10 import java.util.Iterator; 11 import java.util.Set; 12  13 //该类用来处理异步连接和读写操作 14 public class TimeClientHandles implements Runnable { 15     private String host; 16     private int port; 17     private Selector selector; 18     private SocketChannel socketChannel; 19     private volatile boolean stop; 20  21     //    构造函数初始化并连接服务器 22     public TimeClientHandles(String host, int port) { 23         this.host = host == null ? "127.0.0.1" : host; 24         this.port = port; 25         try { 26             selector = Selector.open(); 27             socketChannel = SocketChannel.open(); 28             socketChannel.configureBlocking(false); 29         } catch (IOException e) { 30             e.printStackTrace(); 31             System.exit(1); 32         } 33     } 34  35     @Override 36     public void run() { 37 //        发送连接请求 38         try { 39             doConnection(); 40         }catch (Exception e){ 41             e.printStackTrace(); 42             System.exit(1); 43         } 44 //      在循环体内轮询多路复用器Selector,当有就绪的Channel时,执行handleInput(key); 45         while (!stop) { 46             try { 47                 selector.select(1000); 48                 Set
keys = selector.selectedKeys(); 49 Iterator
it = keys.iterator(); 50 SelectionKey key = null; 51 while (it.hasNext()) { 52 key = it.next(); 53 it.remove(); 54 try { 55 handleInput(key); 56 } catch (Exception e) { 57 if (key != null) { 58 key.cancel(); 59 if (key.channel() != null) { 60 key.channel().close(); 61 } 62 } 63 } 64 65 } 66 } catch (IOException e) { 67 e.printStackTrace(); 68 } 69 70 } 71 } 72 73 private void handleInput(SelectionKey key) throws IOException { 74 if (key.isValid()) { 75 // 判断是否连接成功(需要判断连接状态和连接结果是否成功) 76 SocketChannel sc = (SocketChannel) key.channel(); 77 // 连接状态判断,是连接状态返回true,判断的是服务端是否已经返回ACK应答消息 78 if (key.isConnectable()) { 79 // 是连接状态则需要对连接结果进行判断,如果true,说明客户端连接成功,如果flase则抛出IO异常,连接失败 80 if(sc.finishConnect()){ 81 sc.register(selector, SelectionKey.OP_READ); 82 doWrite(sc); 83 }else { 84 // 连接失败则进程退出 85 System.exit(1); 86 } 87 } 88 if (key.isReadable()) { 89 ByteBuffer readBuffer = ByteBuffer.allocate(1024); 90 int readBytes = sc.read(readBuffer); 91 if (readBytes > 0) { 92 readBuffer.flip(); 93 byte[] bytes = new byte[readBuffer.remaining()]; 94 readBuffer.get(bytes); 95 String body = new String(bytes, "utf-8"); 96 System.out.println("Now is :" + body); 97 this.stop = true; 98 } else if (readBytes < 0) { 99 key.cancel();100 sc.close();101 } else {102 ;//没有读到字节什么都不做103 }104 }105 }106 }107 108 private void doConnection() {109 try {110 // 如果直接连接成功,则注册到多路复用器上,并注册SelectionKey.OP_READ,发送请求消息,读应答111 if (socketChannel.connect(new InetSocketAddress(host, port))) {112 socketChannel.register(selector, SelectionKey.OP_READ);113 doWrite(socketChannel);114 } else {115 // 连接不成功说明服务端没有返回TCP握手应答消息,但是不代表连接失败,注册socketChannel到多路复用器上,116 // 并注册SelectionKey.OP_CONNECT,当服务器返回TCP syn-ack 消息后,Selector 就能轮询到这个SocketChannel117 // 处于连接就绪状态118 socketChannel.register(selector, SelectionKey.OP_CONNECT);119 }120 } catch (IOException e) {121 e.printStackTrace();122 }123 }124 125 private void doWrite(SocketChannel socketChannel) throws IOException {126 byte[] req = "QUERY TIME ORDER".getBytes();127 ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);128 writeBuffer.put(req);129 writeBuffer.flip();130 socketChannel.write(writeBuffer);131 if (!writeBuffer.hasRemaining()) {132 System.out.println("send order 2 server succeed.");133 }134 135 }136 }

  通过上面代码的学习,我们对NIO的思路有一定的了解,尽管现在还不熟悉,但是认真看过,就会发现其套路脉络还是很清楚的。

 

转载于:https://www.cnblogs.com/xiaoyao-001/p/9278093.html

你可能感兴趣的文章
机器学习基石HOW BETTER部分(3)
查看>>
BZOJ 1010: [HNOI2008]玩具装箱toy | 单调队列优化DP
查看>>
3D打印机如何添加自动调平功能
查看>>
iOS开发之 第三方字体的应用
查看>>
Executor框架的简要分析
查看>>
C++实现获取本机机器名及外网IP代码
查看>>
Uncaught TypeError: Illegal invocation
查看>>
只有高中学历的我是怎样加入谷歌的?
查看>>
MySQL的安装和启动
查看>>
hdu 1020 Encoding
查看>>
hdu 4006 The kth great number(优先队列)
查看>>
JDBC连接MySQL数据库
查看>>
一般算数表达式转换为“后缀式”
查看>>
600-460 Exam Cram - Authentic 600-460 Exam Dumps
查看>>
树-堆结构练习——合并果子之哈夫曼树(其实是优先队列)
查看>>
poj3686The Windy's(费用流)
查看>>
持久化对象
查看>>
rest-framework之视图和源码解析
查看>>
Android 获取天气预报
查看>>
二:Angular 组件 (Components)
查看>>