Java的四种I/O模型
Java传统IO模型(同步阻塞IO)
NIO(同步非阻塞IO)
通过NIO实现的Reactor模式(多路复用模型)
通过AIO实现的Proactor模式(异步IO模型)
BIO java的传统IO模型即是同步阻塞的,关于传统IO模型可以参考网络编程 )中4.4节。服务器有专门的Acceptor线程用来处理客户端连接,对于每一个客户端请求都会创建一个新的线程来处理对应的业务,这是典型的一对一服务模型。java bio是面向流的,每次从(InputStream/OutputStream)中读取一个或多个字节,直到读取完所有字节,他们没有被缓存在任何地方。不能前后移动流中的数据,如果需要前后移动处理,则需要先将其缓存到一个缓冲区。调用read()和write()方法时,线程被阻塞,知道有数据被读取或数据完全写入,阻塞期间线程无法处理其他任何事情。
NIO Java提供NIO实现同步非阻塞IO,面向缓冲处理。数据会被读取到一个缓冲区,需要时可以在缓冲区中前后移动处理。读写不会阻塞房钱线程,在数据可读/写前当前线程可以继续做其他事情,所以一个单独的线程可以管理多个输入和输出通道。
Java NIO内部的IO是同步的,基于Selector实现的事件驱动机制,而selector不是异步的,他对IO的读写还是同步阻塞的,只是通过线程复用,将IO的准备时间分离出来。Select函数可以同时监听多个句柄,从而提高系统并发性。
NIO中三个比较重要的对象
Buffer
Buffer实质上是一个缓冲容易,发送给Channel的所有数据都必须先放到Buffer中;同理,从Channel中读取任何数据都必须读到Buffer中
Channel
Channel是对原IO中流的模拟,任何来源和目的数据都必须通过一个Channel对象。Channel是双向的,可读可写
Selector是Java NIO中的一个组件,用与检查一个或多个NIO Channel的状态是否处于可读、可写。使用单线程管理多个channels,也就是可以管理多个网络链接。
FileCopy 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public static void copyFile (String src, String det) throws IOException { FileInputStream fis = new FileInputStream(src); FileOutputStream fos = new FileOutputStream(det); FileChannel cin = fis.getChannel(); FileChannel cout = fos.getChannel(); ByteBuffer buffer = ByteBuffer.allocate(1024 ); while (cin.read(buffer)!=-1 ){ buffer.flip(); cout.write(buffer); buffer.clear(); } cin.close(); cout.close(); fis.close(); fos.close(); }
基于NIO的网络编程 基于SocketChannel实现客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public class NonBlockingClient { public static void main (String[] args) { byte [] data = "message from client" .getBytes(); SocketChannel channel = null ; try { channel = SocketChannel.open(); channel.configureBlocking(false ); if (!channel.connect(new InetSocketAddress(InetAddress.getLocalHost(),14000 ))){ System.out.print("wait" ); while (!channel.finishConnect()){ System.out.print("." ); } } System.out.println(); System.out.println("Connected to server..." ); ByteBuffer buffer = ByteBuffer.wrap(data); ByteBuffer readBuffer = ByteBuffer.allocate(1024 ); while (buffer.hasRemaining()) { channel.write(buffer); } int totalBytesReceived = 0 ; while (totalBytesReceived == 0 ) { totalBytesReceived += channel.read(readBuffer); } System.out.println("Server said: " + new String(readBuffer.array())); } catch (IOException e) { e.printStackTrace(); }finally { try { if (channel != null ) { channel.close(); } } catch (IOException e) { e.printStackTrace(); } } } }
基于Selector和ServerSocketChannel实现服务器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 public class NoneBlockingServer { public static void main (String[] args) { Selector selector = null ; try { selector = Selector.open(); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false ); ssc.bind(new InetSocketAddress(14000 )); ssc.register(selector, SelectionKey.OP_ACCEPT); while (true ) { if (selector.selectNow() == 0 ) { System.out.println("waiting...." ); Thread.sleep(3000 ); } Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isAcceptable()) { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); SocketChannel clientChannel = serverSocketChannel.accept(); clientChannel.configureBlocking(false ); clientChannel.register(key.selector(), SelectionKey.OP_READ); }else if (key.isReadable()) { SocketChannel clientChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024 ); if (clientChannel.read(buffer) != -1 ) { String s = new String(buffer.array()); System.out.println("Client said:" + s); key.interestOps(SelectionKey.OP_WRITE); key.attach("Welcome!!!" ); } else { System.out.println("closed......." ); clientChannel.close(); key.cancel(); } }else if (key.isValid() && key.isWritable()) { SocketChannel clientChannel = (SocketChannel) key.channel(); String content = (String) key.attachment(); clientChannel.write(ByteBuffer.wrap(content.getBytes())); key.interestOps(SelectionKey.OP_READ); } iterator.remove(); } } }catch (IOException | InterruptedException e) { e.printStackTrace(); }finally { try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } }
同步非阻塞式的理解,作为client的read方法是非阻塞的,会立即返回,所以client中需要不断地read,保证数据的完全读取。同步的理解在于,客户端需要不断轮询,而不是在数据就绪后收到通知。其实上文就是一个最精简的Reactor模式实现,只不过所有的角色都被放到了Reactor中
多线程线程Reactor模型 上述代码使用一个线程同时监控多个请求(channel),但是所有读/写/新连接请求处理等都在同一个线程中处理,无法充分利用多CPU优势,同时读/写操作也会阻塞对新连接的处理。因此可以引入多线程,并行处理多个读/写操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 public class NoneBlockingMultiServer { public static void main (String[] args) { Selector selector = null ; try { selector = Selector.open(); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false ); ssc.bind(new InetSocketAddress(14000 )); ssc.register(selector, SelectionKey.OP_ACCEPT); while (true ) { if (selector.selectNow() == 0 ) { System.out.println("waiting...." ); Thread.sleep(3000 ); } Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isAcceptable()) { new AcceptProcessor().process(key); } else if (key.isReadable()) { key.interestOps(SelectionKey.OP_WRITE); new ReadProcessor().process(key); } else if (key.isValid() && key.isWritable()) { key.interestOps(SelectionKey.OP_READ); new WriteProcessor().process(key); } iterator.remove(); } } } catch (IOException | InterruptedException e) { e.printStackTrace(); } finally { try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } } class ReadProcessor { private static final ExecutorService service = Executors.newFixedThreadPool(16 ); public void process (SelectionKey selectionKey) { service.submit(() -> { try { SocketChannel clientChannel = (SocketChannel) selectionKey.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024 ); if (clientChannel.read(buffer) != -1 ) { String s = new String(buffer.array()); System.out.println("Client said:" + s); selectionKey.attach("Welcome!!!" ); } else { return ; } }catch (IOException e){ e.printStackTrace(); } }); } } class AcceptProcessor { private static final ExecutorService service = Executors.newFixedThreadPool(16 ); public void process (SelectionKey selectionKey) { service.submit(() -> { try { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel(); SocketChannel clientChannel = serverSocketChannel.accept(); clientChannel.configureBlocking(false ); clientChannel.register(selectionKey.selector(), SelectionKey.OP_READ); } catch (IOException e) { e.printStackTrace(); } }); } } class WriteProcessor { private static final ExecutorService service = new ThreadPoolExecutor( 4 , 16 ,30 , TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); public void process (SelectionKey selectionKey) { service.submit(() -> { try { SocketChannel clientChannel = (SocketChannel) selectionKey.channel(); String content = (String) selectionKey.attachment(); clientChannel.write(ByteBuffer.wrap(content.getBytes())); System.out.println("closed......." ); clientChannel.close(); }catch (IOException e){ e.printStackTrace(); } }); } }
多Reactor Netty中使用的Reactor模式,引入了多Reactor,即一个主Reactor负责监控所有的连接请求,多个子Reactor负责监控并处理读/写请求,减轻了主Reactor的压力,降低了主Reactor压力太大而造成的延迟。并且每个子Reator分别属于一个独立的线程,每个成功连接后的Channel的所有读写操作都由同一个线程处理。这样保证了同一请求的所有状态和上下文在同一线程中,避免了不必要的上下文切换,同时也方便了监控请求响应状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 public class MultiReactor { public static void main (String[] args) throws IOException { Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false ); serverSocketChannel.bind(new InetSocketAddress(14000 )); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); int coreNum = 2 * Runtime.getRuntime().availableProcessors(); Processor[] processors = new Processor[coreNum]; for (int i = 0 ; i < processors.length; i++) { processors[i] = new Processor(); } int index = 0 ; while (selector.select() > 0 ) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); keys.remove(key); if (key.isAcceptable()) { ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = acceptServerSocketChannel.accept(); socketChannel.configureBlocking(false ); Processor processor = processors[index++ % coreNum]; processor.addChannel(socketChannel); processor.wakeup(); } } } } } class Processor { private static final ExecutorService service = Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors()); private Selector selector; public Processor () throws IOException { this .selector = SelectorProvider.provider().openSelector(); start(); } public void addChannel (SocketChannel socketChannel) throws ClosedChannelException { socketChannel.register(this .selector, SelectionKey.OP_READ); } public void wakeup () { this .selector.wakeup(); } public void start () { service.submit(() -> { while (true ) { if (selector.select(500 ) <= 0 ) { continue ; } Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(1024 ); SocketChannel socketChannel = (SocketChannel) key.channel(); int count = socketChannel.read(buffer); if (count < 0 ) { System.out.println("closed......." ); socketChannel.close(); key.cancel(); continue ; } else if (count == 0 ) { System.out.println("empty!" ); continue ; } else { System.out.println(new String(buffer.array())); key.interestOps(SelectionKey.OP_WRITE); key.attach("Welcome!!!" ); } }else if (key.isValid() && key.isWritable()) { SocketChannel clientChannel = (SocketChannel) key.channel(); String content = (String) key.attachment(); clientChannel.write(ByteBuffer.wrap(content.getBytes())); key.interestOps(SelectionKey.OP_READ); } } } }); } }
参考文献 Java进阶(五)Java I/O模型从BIO到NIO和Reactor模式
关于JAVA NIO是同步非阻塞I/O的解释
java Socket之NIO
高性能IO之Reactor模式
Java Nio(二) - 用NIO实现Reactor模式
Java NIO Tutorial