Java NIO
1. Java NIO概述
Java NIO(New IO Non Blocking IO)是从java1.4
版本开始引入的一个新的IO API,可以替代标准的Java IO API。NIO
与原来的IO
有同样的作用和目的,但是使用的方式完全不同,NIO
支持面向缓冲区的、基于通道的IO操作。NIO
将以更加高效的方式进行文件的读写操作。
IO VS NIO
IO |
NIO |
面向流(Stream Oriented) |
面向缓冲区(Buffer Oriented) |
阻塞IO(Blocking IO) |
非阻塞IO(Non Bloking IO) |
无 |
选择器(Selectors) |
2. 通道(Channel)和缓冲区(Buffer)
通道(Channel
)表示打开到IO设备(例如:文件、套接字)的连接。若需要使用NIO系统,需要获取用于连接IO设备的通道以及用于容纳数据的缓冲区。然后操作缓冲区,对数据进行处理。
简而言之,Channel
负责传输,Buffer
负责存储。
2.1 缓冲区(Buffer)
在Java NIO中负责数据的存储,缓冲区就是数组,用于存储不同数据类型的数据。
2.1.1 缓冲区基本操作
根据数据类型的而不同(boolean
除外),提供了相应类型的缓冲区
ByteBuffer
CharBuffer
ShortBuffer
IntBuffer
LongBuffer
FloatBuffer
DoubleBuffer
上述缓冲区的管理方式几乎一致,通过allocate()
获取缓冲区。
缓冲区存取数据的两个核心方法
put()
存入数据到缓冲区
get()
获取缓冲区中的数据
缓冲区中的四个核心属性
Invariants: mark <= position <= limit <= capacity
private int mark = -1;
标记,表示记录当前position的位置,可以通过rset()恢复到mark的位置
private int position = 0;
位置, 表示缓冲区中正在操作数据的位置。
private int limit;
界限,缓冲区中可以操作数据的大小。(limit后面的数据是不能进行操作的)
private int capacity;
容量,缓冲器中最大存储数据的容量。一旦声明,无法改变。
代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| public class BufferTest {
@Test public void test2(){ String s = "abcde"; ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.put(s.getBytes()); buffer.flip(); byte[] dst = new byte[buffer.limit()]; buffer.get(dst,0,2); System.out.println(new String(dst,0,2)); System.out.println("position = " + buffer.position());
buffer.mark();
buffer.get(dst,2,2); System.out.println(new String(dst,2,2)); System.out.println("position = " + buffer.position()); if(buffer.hasRemaining()){ System.out.println(buffer.remaining()); } buffer.reset(); System.out.println("position = " + buffer.position());
if(buffer.hasRemaining()){ System.out.println(buffer.remaining()); } }
@Test public void test1(){ ByteBuffer byteBuffer = ByteBuffer.allocate(1024); System.out.println("---------- allocate() ----------"); System.out.println("mark = " + byteBuffer.mark()); System.out.println("position = " + byteBuffer.position()); System.out.println("limit = " + byteBuffer.limit()); System.out.println("capacity = " + byteBuffer.capacity()); String str= "ABCDE"; byteBuffer.put(str.getBytes()); System.out.println("---------- put() ----------"); System.out.println("mark = " + byteBuffer.mark()); System.out.println("position = " + byteBuffer.position()); System.out.println("limit = " + byteBuffer.limit()); System.out.println("capacity = " + byteBuffer.capacity()); byteBuffer.flip(); System.out.println("---------- flip() ----------"); System.out.println("mark = " + byteBuffer.mark()); System.out.println("position = " + byteBuffer.position()); System.out.println("limit = " + byteBuffer.limit()); System.out.println("capacity = " + byteBuffer.capacity()); byte[] dst = new byte[byteBuffer.limit()]; byteBuffer.get(dst); System.out.println(new String(dst,0,dst.length)); System.out.println("---------- get() ----------"); System.out.println("mark = " + byteBuffer.mark()); System.out.println("position = " + byteBuffer.position()); System.out.println("limit = " + byteBuffer.limit()); System.out.println("capacity = " + byteBuffer.capacity());
byteBuffer.rewind(); System.out.println("---------- rewind() ----------"); System.out.println("mark = " + byteBuffer.mark()); System.out.println("position = " + byteBuffer.position()); System.out.println("limit = " + byteBuffer.limit()); System.out.println("capacity = " + byteBuffer.capacity());
byteBuffer.clear(); System.out.println("---------- clear() ----------"); System.out.println("mark = " + byteBuffer.mark()); System.out.println("position = " + byteBuffer.position()); System.out.println("limit = " + byteBuffer.limit()); System.out.println("capacity = " + byteBuffer.capacity());
byteBuffer.get(dst); System.out.println(new String(dst,0,dst.length)); } }
|
2.1.2 直接缓冲区 VS 非直接缓冲区
非直接缓冲区,通过allocate()
方法非直接缓冲区,将缓冲区建立在JVM
的内存中。
直接缓冲区,通过allocateDirect()
方法分配直接缓冲区,将缓冲区建立在物理内存中。可以提高效率。
1 2 3 4 5 6 7 8 9
| public class BufferTest {
@Test public void test3(){ ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024); System.out.println(byteBuffer.isDirect()); }
}
|
2.2 通道(Channel)
通道(Channel
)由java.nio.channels
包定义的。Channel
表示IO源与目标打开的连接。Channel
类似于传统的“流”。只不过Channel
本身不能直接访问数据,Channel
只能与Buffer
进行交互。
2.2.1 Channel的原理与获取
应用程序与磁盘之间的数据写入或者读出,都需要由用户地址空间和内存地址空间之间来回复制数据,内存地址空间中的数据通过操作系统层面的IO接口,完成与磁盘的数据存取。在应用程序调用这些系统IO接口时,由CPU完成一系列调度、任务分配,早先这些IO接口都是由CPU独立负责。所以当发生大规模读写请求时,CPU的占用率很高。
之后,操作系统为了避免CPU完全被各种IO接口调用占用,引入了DMA(直接存储器存储)。当应用程序对操作系统发出一个读写请求时,会由DMA先向CPU申请权限,申请到权限之后,内存地址空间与磁盘之间的IO操作就全由DMA来负责操作。这样,在读写请求的过程中,CPU不需要再参与,CPU去做其他事情。当然,DMA来独立完成数据在磁盘与内存空间中的来去,需要借助于DMA总线。但是当DMA总线过多时,大量的IO操作也会造成总线冲突,即也会影响最终的读写性能。
为了避免DMA总线冲突对性能的影响,后来便有了通道的方式。通道,它是一个完全独立的处理器。CPU是中央处理器,通道本身也是一个处理器,专门负责IO操作。既然是处理器,通道有自己的IO命令,与CPU无关。它更适用于大型的IO操作,性能更高。
总结
- 直接存储器DMA有独立总线。
- 但在大量数据面前,可能会存在总线冲突,还是需要CPU来处理。
- 通道是一个独立的处理器
- DMA方式还是需要向CPU申请DMA总线的。
- 通道有自己的处理器,适合与大量IO请求的场景,数据传输直接通过通道进行传输,不再需要请求CPU
2.2.2 Channel的基本操作
通道的主要实现类
1 2 3 4 5
| java.nio.channels.Channel接口 |-- FileChannel 用于本地文件数据传输 |-- SocketChannel 用于网络,TCP |-- ServerSocketChannel 用于网络,TCP |-- DatagramChannel 用于网络,UDP
|
获取通道
- Java针对支持通道的类提供了
getChannel()
方法
本地IO
- FileInputStream/FileOutputStream
- RandomAccessFile
网络IO
- Socket
- ServerSocket
- DatagramSocket
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
|
@Test public void test1(){ FileInputStream fileInputStream = null; FileOutputStream fileOutputStream = null; FileChannel inChannel = null; FileChannel outChannel = null; try { fileInputStream = new FileInputStream("classpath://../resource/channel/1.png"); fileOutputStream = new FileOutputStream("classpath://../resource/channel/2.png"); inChannel = fileInputStream.getChannel(); outChannel = fileOutputStream.getChannel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); while(inChannel.read(byteBuffer) != -1){ byteBuffer.flip(); outChannel.write(byteBuffer); byteBuffer.clear(); } } catch (IOException e) { e.printStackTrace(); } finally { if (outChannel != null){ try { outChannel.close(); } catch (IOException e) { e.printStackTrace(); } }
if (inChannel !=null){ try { inChannel.close(); } catch (IOException e) { e.printStackTrace(); } }
if (fileOutputStream != null){ try { fileOutputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if(fileInputStream != null){ try { fileInputStream.close(); } catch (IOException e) { e.printStackTrace(); } } }
|
- 在JDK1.7中的NIO.2针对各个通道童工了静态方法
open()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
@Test public void test2() throws IOException { FileChannel inChannel = FileChannel.open(Paths.get("D:\\idea_projects\\java-example\\java-chapter-NIO\\resource\\channel\\1.png"), StandardOpenOption.READ); FileChannel outChannel = FileChannel.open(Paths.get("D:\\idea_projects\\java-example\\java-chapter-NIO\\resource\\channel\\3.png"), StandardOpenOption.WRITE, StandardOpenOption.READ,StandardOpenOption.CREATE_NEW);
MappedByteBuffer inMappedByteBuffer = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size()); MappedByteBuffer outMappedByteBuffer = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());
byte[] dst = new byte[inMappedByteBuffer.limit()]; inMappedByteBuffer.get(dst); outMappedByteBuffer.put(dst);
inChannel.close(); outChannel.close(); }
|
- 在JDK1.7中的NIO.2的Files工具类的
newByteChannel()
通道之间数据传输
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
@Test public void test3() throws IOException {
FileChannel inChannel = FileChannel.open(Paths.get("D:\\idea_projects\\java-example\\java-chapter-NIO\\resource\\channel\\1.png"), StandardOpenOption.READ); FileChannel outChannel = FileChannel.open(Paths.get("D:\\idea_projects\\java-example\\java-chapter-NIO\\resource\\channel\\4.png"), StandardOpenOption.WRITE, StandardOpenOption.READ,StandardOpenOption.CREATE);
outChannel.transferFrom(inChannel,0,inChannel.size()); inChannel.close(); outChannel.close(); }
|
分散(Scatter)与聚集(Gather)
分散读取(Scatter Reads
),将通道中数据分散到多个缓冲区中
聚集写入(Gather Writes
),将多个缓冲区中的数据聚集到通道中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
|
@Test public void test4() throws IOException { RandomAccessFile randomAccessFile = new RandomAccessFile("classpath://../resource/channel/1.txt","rw"); FileChannel channel = randomAccessFile.getChannel(); ByteBuffer byteBuffer1 = ByteBuffer.allocate(100); ByteBuffer byteBuffer2 = ByteBuffer.allocate(1024); ByteBuffer[] byteBuffers = {byteBuffer1, byteBuffer2}; channel.read(byteBuffers);
for (ByteBuffer byteBuffer:byteBuffers) { byteBuffer.flip(); } System.out.println(new String(byteBuffers[0].array(),0,byteBuffers[0].limit())); System.out.println("--------------------"); System.out.println(new String(byteBuffers[1].array(),0,byteBuffers[1].limit())); RandomAccessFile randomAccessFile1 = new RandomAccessFile("classpath://../resource/channel/2.txt", "rw"); FileChannel channel1 = randomAccessFile1.getChannel(); channel1.write(byteBuffers); }
|
字符集(Charset)
- 编码,字符串->字节数组
- 解码,字节数组->字符串
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
|
@Test public void test6() throws CharacterCodingException { Charset gbkCharset = Charset.forName("GBK"); CharsetEncoder gbkEncoder = gbkCharset.newEncoder(); CharsetDecoder gbkDecoder = gbkCharset.newDecoder();
CharBuffer charBuffer = CharBuffer.allocate(1024); charBuffer.put("你好,世界!"); charBuffer.flip(); ByteBuffer byteBuffer = gbkEncoder.encode(charBuffer); for (int i = 0; i < byteBuffer.limit(); i++) { System.out.println(byteBuffer.get()); } byteBuffer.flip(); CharBuffer charBuffer1 = gbkDecoder.decode(byteBuffer); System.out.println(charBuffer1.toString()); System.out.println("------------------------");
Charset utf8Charset = Charset.forName("UTF-8"); byteBuffer.flip(); CharBuffer decode = utf8Charset.decode(byteBuffer); System.out.println(decode.toString()); } @Test public void test5(){ Map<String, Charset> charsetMap = Charset.availableCharsets(); Set<Map.Entry<String, Charset>> entrySet = charsetMap.entrySet(); for (Map.Entry<String,Charset> entry: entrySet) { System.out.println(entry.getKey() + "=" + entry.getValue()); } }
|
3. NIO-阻塞与非阻塞
传统的 IO 流都是阻塞式的。也就是说,当一个线程调用 read()
或 write()
时,该线程被阻塞,直到有一些数据被读取或写入,该线程在此期间不能执行其他任务。
因此,在完成网络通信进行 IO 操作时,由于线程会阻塞,所以服务器端必须为每个客户端都提供一个独立的线程进行处理,当服务器端需要处理大量客户端时,性能急剧下降。
Java NIO 是非阻塞模式的。当线程从某通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。因此, NIO 可以让服务器端使用一个或有限几个线程来同时处理连接到服务器端的所有客户端。
选择器和通道的关系:通道注册到选择器上,选择器监控通道。
当某一个通道上,某一个事件准备就绪时,那么选择器才会将这个通道分配到服务器端一个或多个线程上,再继续运行。比如说当客户端发送一些数据给服务器端,只有当客户端的所有数据都准备就绪时,选择器才会将这个注册的通道分配到服务器端的一个或者多个线程上。那就意味这,如果客户端的线程没有将数据准备就绪时,服务器端的线程可以执行其他任务,而不必阻塞在那里。
3.1 选择器(Selector)与通道(Channel)的关系
选择器(Selector
) 是 SelectableChannle
对象的多路复用器, Selector
可以同时监控多个 SelectableChannel
的 IO 状况,也就是说,利用 Selector可使一个单独的线程管理多个 Channel
。 Selector
是非阻塞 IO 的核心。
注意: FileChannel切换为非阻塞模式!!!非阻塞模式是相对于网络IO而言的。选择器主要监控网络Channel。 (FileChannel不是可作为选择器复用的通道!FileChannel不能注册到选择器Selector!FileChannel不能切换到非阻塞模式!FileChannel不是SelectableChannel的子类!)
3.2 网络NIO示例(阻塞式 TCP协议)
阻塞IO模式:客户端向服务端发送文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| package com.java.demo.selector;
import org.junit.Test;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.file.Paths; import java.nio.file.StandardOpenOption;
public class BlockingNIOTest {
@Test public void client() throws IOException { SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8888)); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); FileChannel inChannel = FileChannel.open(Paths.get("D:\\idea_projects\\java-example\\java-chapter-NIO\\resource\\channel\\1.png"), StandardOpenOption.READ); while (inChannel.read(byteBuffer)!=-1){ byteBuffer.flip(); socketChannel.write(byteBuffer); byteBuffer.clear(); } inChannel.close(); socketChannel.close(); }
@Test public void server() throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(8888)); SocketChannel socketChannel = serverSocketChannel.accept(); FileChannel outChannel = FileChannel.open(Paths.get("D:\\idea_projects\\java-example\\java-chapter-NIO\\resource\\channel\\5.png"), StandardOpenOption.WRITE,StandardOpenOption.CREATE); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); while(socketChannel.read(byteBuffer)!=-1){ byteBuffer.flip(); outChannel.write(byteBuffer); byteBuffer.clear(); } socketChannel.close(); outChannel.close(); serverSocketChannel.close(); } }
|
阻塞IO模式:服务端向客户端发送反馈信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| package com.java.demo.selector;
import org.junit.Test;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.file.Paths; import java.nio.file.StandardOpenOption;
public class BlockingNIOTest1 {
@Test public void client() throws IOException { SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8888)); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); FileChannel inFileChannel = FileChannel.open(Paths.get("D:\\idea_projects\\java-example\\java-chapter-NIO\\resource\\channel\\1.png"), StandardOpenOption.READ); while(inFileChannel.read(byteBuffer) != -1){ byteBuffer.flip(); socketChannel.write(byteBuffer); byteBuffer.clear(); }
socketChannel.shutdownOutput();
int len = 0; while((len = socketChannel.read(byteBuffer)) !=-1){ byteBuffer.flip(); System.out.println(new String(byteBuffer.array(),0,len)); }
inFileChannel.close(); socketChannel.close(); }
@Test public void server() throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(8888)); SocketChannel socketChannel = serverSocketChannel.accept(); FileChannel outFileChannel = FileChannel.open(Paths.get("D:\\idea_projects\\java-example\\java-chapter-NIO\\resource\\channel\\6.png"), StandardOpenOption.WRITE, StandardOpenOption.CREATE); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); while(socketChannel.read(byteBuffer) != -1){ byteBuffer.flip(); outFileChannel.write(byteBuffer); byteBuffer.clear(); }
byteBuffer.put("服务端接收数据成功!".getBytes()); byteBuffer.flip(); socketChannel.write(byteBuffer);
outFileChannel.close(); socketChannel.close(); serverSocketChannel.close(); }
}
|
3.3 网络NIO示例(非阻塞式 TCP协议)
非阻塞IO模式:客户端向服务端发送数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| package com.java.demo.selector;
import org.junit.Test;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Date; import java.util.Iterator; import java.util.Scanner;
public class NonBlockingNIOTest {
@Test public void client() throws IOException { SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8888)); socketChannel.configureBlocking(false); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); Scanner scanner = new Scanner(System.in); while(scanner.hasNext()){ String inputStr=scanner.next(); byteBuffer.put((new Date().toString() + "\n" + inputStr).getBytes()); byteBuffer.flip(); socketChannel.write(byteBuffer); byteBuffer.clear(); } scanner.close();
socketChannel.close(); }
@Test public void server() throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(8888)); Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (selector.select() > 0){ Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while(iterator.hasNext()){ SelectionKey selectionKey = iterator.next(); if (selectionKey.isAcceptable()){ SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector,SelectionKey.OP_READ); }else if(selectionKey.isReadable()){ SocketChannel socketChannel = (SocketChannel)selectionKey.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int length = 0; while((length = socketChannel.read(byteBuffer))>0){ byteBuffer.flip(); System.out.println(new String(byteBuffer.array(),0,length)); byteBuffer.clear(); } } iterator.remove(); } } } }
|
3.4 网络NIO示例(非阻塞式 UDP协议)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| package com.java.demo.selector;
import org.junit.Test;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.ArrayList; import java.util.Date; import java.util.Iterator; import java.util.Scanner;
public class NonBlockingNIOTest1 {
@Test public void send() throws IOException { DatagramChannel datagramChannel = DatagramChannel.open(); datagramChannel.configureBlocking(false); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String inputStr = scanner.next(); byteBuffer.put((new Date().toString() + "\n" +inputStr).getBytes()); byteBuffer.flip(); datagramChannel.send(byteBuffer,new InetSocketAddress("127.0.0.1",8888)); byteBuffer.clear(); } scanner.close(); datagramChannel.close(); }
@Test public void receive() throws IOException { DatagramChannel datagramChannel = DatagramChannel.open(); datagramChannel.configureBlocking(false); datagramChannel.bind(new InetSocketAddress(8888)); Selector selector = Selector.open(); datagramChannel.register(selector, SelectionKey.OP_READ); while (selector.select()>0){ Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()){ SelectionKey selectionKey = iterator.next(); if(selectionKey.isReadable()){ ByteBuffer byteBuffer = ByteBuffer.allocate(1024); datagramChannel.receive(byteBuffer); System.out.println(new String(byteBuffer.array(),0,byteBuffer.limit())); byteBuffer.clear(); } iterator.remove(); } } datagramChannel.close(); } }
|
4. NIO-管道(Pipe)
Java NIO 管道是2个线程之间的单向数据连接。Pipe有一个source通道和一个sink通道。数据会被写到sink通道,从source通道读取。
代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package com.java.demo.pipe;
import org.junit.Test;
import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.Pipe;
public class PipeTest {
@Test public void test() throws IOException { Pipe pipe = Pipe.open(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); byteBuffer.put("通过单向管道发送数据".getBytes()); Pipe.SinkChannel sinkChannel = pipe.sink(); byteBuffer.flip(); sinkChannel.write(byteBuffer); byteBuffer.clear();
Pipe.SourceChannel sourceChannel = pipe.source(); int len = sourceChannel.read(byteBuffer); byteBuffer.flip(); System.out.println(new String(byteBuffer.array(),0,len));
sourceChannel.close(); sinkChannel.close(); } }
|