BIO/NIO/AIO
约 4706 字大约 16 分钟
(一) 基础概念
1. IO说明
- IO是操作系统与其他网络进行数据交互,JDK底层并没有实现IO,而是对操作系统内核函数做的一个封装,IO代码进入底层其实都是native形式的。
- Java共支持3种网络编程IO模式:BIO,NIO,AIO。
2. 同步/异步/阻塞/非阻塞
- 什么是同步? 操作在执行过程中,必须等待一个操作完成后才能执行后续操作。
- 什么是异步? 操作在执行过程中,不需要等待,可以立刻执行后续操作。
- 什么是阻塞? 操作在执行过程中,如果需要等待,那么当前线程会停止执行,直到等待的操作完成。
- 什么是非阻塞? 操作在执行过程中,如果需要等待,当前线程不会停止执行,可以继续执行其他任务。
- 什么是同步阻塞? 操作需要等待,且当前线程会停止执行。
- 什么是同步非阻塞? 操作不需要等待,但当前线程不会停止执行。
- 什么是异步阻塞? 理论上不存在,因为异步操作不会让当前线程停止。
- 什么是异步非阻塞? 操作不需要等待,且当前线程不会停止执行。
以老张烧水为例
- 出场人物: 老张,水壶两把(普通水壶,响水壶)。
- 同步阻塞: 老张把普通水壶放到火上,等水开。
- 同步非阻塞: 老张把普通水壶放到火上,去客厅看电视,时不时去厨房看看水开没有。
- 异步阻塞: 老张把响水壶放到火上,等水开。没有意义。
- 异步非阻塞: 老张把响水壶放到火上,去客厅看电视,响了再去拿壶。
- 所谓同步异步,只是对于水壶而言。 普通水壶同步,响水壶异步。
- 所谓阻塞非阻塞,仅仅对于老张而言。 等的老张阻塞,看电视的老张非阻塞。
以银行取款为例
同步: 自己持银行卡到银行取钱(使用同步IO时,Java自己处理IO读写)
异步: 委托别人拿银行卡到银行取钱,然后给你(使用异步IO时,Java将IO读写委托给OS处理,需要将数据缓冲区地址和大小传给OS,OS需要支持异步IO操作API)
阻塞: ATM排队取款,只能等待(使用阻塞IO时,Java调用会一直阻塞到读写完成才返回)
非阻塞: 柜台取款,取个号,等号广播会通知你(使用非阻塞IO时,如果不能读写Java调用会马上返回,当IO事件分发器会通知可读写时再继续进行读写)
同步和异步是针对应用程序和内核的交互而言的。
阻塞和非阻塞是针对于进程在访问数据的时候, 阻塞方式下读取或者写入函数将一直等待,而非阻塞方式下,读取或者写入函数会立即返回一个状态值。
3. BIO/NIO/AIO
- xxxxxxxxxx package demo6;// (3)有序性(volatile)public class ThreadDemo3 { private volatile static int x = 0, y = 0; private volatile static int a = 0, b = 0; public static void main(String[] args) throws InterruptedException { int i=0; while (true) { i++; x = 0; y = 0; a = 0; b = 0; Thread thread1 = new Thread(new Runnable() { @Override public void run() { //用于调整两个线程的执行顺序 shortWait(20000); a = 1; x = b; } }); Thread thread2 = new Thread(new Runnable() { @Override public void run() { b = 1; y = a; } }); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("第" + i + "次(" + x + "," + y + ")"); if (x==0&&y==0){ break; } } } public static void shortWait(long interval){ long start = System.nanoTime(); long end; do{ end = System.nanoTime(); }while(start + interval >= end); }}java
- 同步并阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然可以通过线程池机制改善。
- 【同步非阻塞IO(Java NIO)】 NIO是一个请求一个线程。
- 同步非阻塞,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理。用户进程也需要时不时的询问IO操作是否就绪,这就要求用户进程不停的去询问。
- 【(Java AIO (NIO.2))异步非阻塞IO 】 AIO是一个有效请求一个线程。
- 用户进程只需要发起一个IO操作然后立即返回,等IO操作真正的完成以后,应用程序会得到IO操作完成的通知,此时用户进程只需要对数据进行处理就好了,不需要进行实际的IO读写操作,因为真正的IO读取或者写入操作已经由内核完成了。
(二) BIO
1. 概念原理
- BIO(Blocking IO) 又称同步阻塞IO
- BIO API:Socket、ServerSocket
- 弊端
- 如果BIO使用单线程接受连接,则会阻塞其他连接,效率较低。
- 如果使用多线程虽然减弱了单线程带来的影响,但当有大并发进来时,会导致服务器线程太多,压力太大而崩溃。
- 就算使用线程池,也只能同时允许有限个数的线程进行连接,如果并发量远大于线程池设置的数量,还是与单线程无异。另外多线程也会有线程切换带来的消耗。
- IO代码里read操作是阻塞操作,如果连接不做数据读写操作会导致线程阻塞,就是说只占用连接,不发送数据,则会浪费资源。比如线程池中500个连接,只有100个是频繁读写的连接,其他占着浪费资源!
2. BIOServer
public class BIOServer {
public static void main(String[] args) throws IOException {
//创建服务端套接字 & 绑定host:port & 监听client
ServerSocket serverSocket = new ServerSocket(9999);
//等待客户端连接到来
Socket socket = serverSocket.accept();
//拿到输入流 -- client write to server
InputStream in = socket.getInputStream();
//拿到输出流 -- server write to client
OutputStream out = socket.getOutputStream();
while (true) {
//将数据读到buf中
byte[] buf = new byte[32];
//server read from client
int len = in.read(buf);
//如果len == 1,说明client已经断开连接
if(len == -1){
throw new RuntimeException("连接已断开");
}
System.out.println("Server:" + new String(buf, 0, len));
//将读出来的数据写回给client
//如果不使用偏移量,可能会将buf中的无效数据也写回给client
byte[] send = "hi".getBytes();
out.write(send, 0, send.length);
}
}
}
3. BIOClient
public class BIOClient {
public static void main(String[] args) throws IOException, InterruptedException {
//创建客户端套接字 & 连接服务器
Socket socket = new Socket("127.0.0.1", 9999);
//拿到输入流 -- server write to client, client read from server
InputStream in = socket.getInputStream();
//拿到输出流 -- client write to server
OutputStream out = socket.getOutputStream();
while (true) {
//client write to server
byte[] send = "hello".getBytes();
out.write(send);
//read from server
byte[] buf = new byte[32];
int len = in.read(buf, 0 ,send.length);
//如果len == 1,说明server已经断开连接
if(len == -1){
throw new RuntimeException("连接已断开");
}
System.out.println("Client:" + new String(buf, 0, len));
Thread.sleep(1000);
}
}
}
(三) NIO
1. 概念原理
- NIO(Non Blocking IO)又称同步非阻塞IO。
- 服务器实现模式为把多个连接(请求)放入集合中,只用一个线程可以处理多个请求(连接),也就是多路复用。
- 一个线程处理大量的客户端的请求,通过一个线程轮询大量channel,每次获取一批有事件的channel,然后对每个请求启动一个线程处理即可。
- NIO API:网络SelectableChannel、SocketChannel、ServerSocketChannel、文件FileChannel
- NIO 三大核心组件:
- Buffer缓冲区
- NIO的库中所有对象的读写都是用缓冲区处理的:读数据是从缓冲区读,写数据是写入缓冲区。buffer 底层就是个数组。
- Channel通道
- channel 类似于流,每个 channel 对应一个 buffer缓冲区。可以进行读写,与Stream的区别是:channel是全双工双向的,一个流只能是单向移动,channel既可以读也可以写。
- Selector多路复用器
- channel 会注册到 selector 上,由 selector 根据 channel 读写事件的发生将其交由某个空闲的线程处理、如果某个channel上有新的tcp连接接入读写事件,这个channel就处于就绪的状态,会被seclector轮询出来,然后通过SelectionKey可以获得就绪Channel的集合,进行后续的I/O操作。
- Buffer缓冲区
- Redis就是典型的基于epoll的NIO线程模型(nginx也是),epoll实例收集所有事件(连接与读写事件),把有数据交互的连接放入就绪事件列表中,由一个服务端线程连续处理所有就绪事件列表中的命令。
2. NIOServer
public class NIOServer {
public static void main(String[] args) throws IOException {
//创建服务端socket通道 & 绑定host:port
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open().bind(new InetSocketAddress(9999));
//设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
//新创建一个selector(其实可以为每一个channel单独创建一个selector)
Selector selector = Selector.open();
//将该通道注册到该selector上,并且注明感兴趣的事件,因为是服务端通道,所以只对accept事件感兴趣
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true){
//selector会帮我们去轮询,当前是否有我们感兴趣的事件发生,一直阻塞到有为止
selector.select();
//若返回的key个数不为0,那么就可以一一处理这些事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
iterator.remove();
if(selectionKey.isAcceptable()){
SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} else if(selectionKey.isReadable()){
SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(32);
int len = socketChannel.read(buffer);
if(len == -1){
throw new RuntimeException("连接已断开");
}
buffer.flip();
byte[] buf = new byte[len];
buffer.get(buf);
System.out.println("Server:" + new String(buf, 0, len));
selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE);
} else if(selectionKey.isWritable()){
SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
int len = socketChannel.write(ByteBuffer.wrap("hi".getBytes()));
if(len == -1){
throw new RuntimeException("连接已断开");
}
selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE);
}
}
}
}
}
3. NIOClient
public class NIOClient {
public static void main(String[] args) throws IOException, InterruptedException {
//创建客户端socket通道 & 连接host:port
SocketChannel socketChannel = SocketChannel.open();
//设置为非阻塞模式
socketChannel.configureBlocking(false);
//非阻塞的形式连接服务器
socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
//新创建一个selector
Selector selector = Selector.open();
//将该通道注册到该selector上,并且注明感兴趣的事件
socketChannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
while (true){
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
iterator.remove();
if(selectionKey.isConnectable()){
if(socketChannel.finishConnect()){
selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE);
}
} else if(selectionKey.isReadable()){
ByteBuffer buffer = ByteBuffer.allocate(32);
int len = socketChannel.read(buffer);
if(len == -1){
throw new RuntimeException("连接已断开");
}
byte[] buf = new byte[len];
buffer.flip();
buffer.get(buf);
System.out.println("Client:" + new String(buf, 0, len));
selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE);
} else if(selectionKey.isWritable()){
int len = socketChannel.write(ByteBuffer.wrap("hello".getBytes()));
if(len == -1){
throw new RuntimeException("连接已断开");
}
selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE);
Thread.sleep(1000);
}
}
}
}
}
4. select/poll/epoll模型比较
NIO的多路复用底层主要用的是Linux 内核函数(select,poll,epoll)来实现的。windows不支持epoll实现,windows底层是基于winsock2的select函数实现的(不开源)。
select、poll模型
- NIO底层的早期JDK1.4版本(linux的内核函数select()或poll())
- 多路复用采用的是遍历Selector中所有的连接,然后对有事件的连接做出响应。假如连接数太多,有10000个连接,其中只有1000个连接有写数据,但是由于其他9000个连接并没有断开,我们还是要每次轮询遍历一万次,其中有十分之九的遍历都是无效的,为了处理无效遍历的问题,在jdk1.5及以上版本引入了epoll模型
epoll模型
- JDK1.5开始引入了epoll基于事件响应机制来优化NIO。epoll模型解决了select和poll模型的无效遍历问题,是NIO的核心。epool是基于事件响应的!
- 在使用epoll模型之后,对简单版本的NIO做了优化处理,可以理解为在第一个版本的NIO上,又增加了一个就绪事件列表集合,这个集合中存放着有事件响应的连接,然后开启一个线程去监听这个集合,有元素的话就进行处理。
最主要三行代码(可查看底层源码深入理解)
Selector selector = Selector.open(); serverSocket.register(selector, SelectionKey.OP_ACCEPT); selector.select();
(四) AIO
1. 概念原理
- AIO 自JDK1.7以后才开始支持,是异步非阻塞的,与NIO不同,当进行读写操作时,只需直接调用API的read或write方法即可。这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。即可以理解为, read/write方法都是异步的,完成后会主动调用回调函数。
- AIO基于Proactor模型实现,分为发送请求和读取数据两个步骤:
- 发送请求:将数据写入的缓冲区后,剩下的交给操作系统去完成
- 读取数据:操作系统写回数据也是写到Buffer里面,完成后再通知客户端来进行读取数据。
- AIO主要在java.nio.channels包下增加了下面四个异步通道:
- AsynchronousSocketChannel
- AsynchronousServerSocketChannel
- AsynchronousFileChannel
- AsynchronousDatagramChannel
- 其中的read/write方法,会返回一个带回调函数的对象,当执行完读取/写入操作后,直接调用回调函数。
- 为什么大多数公司并没有使用AIO,而是使用了netty?
- AIO的底层实现仍使用Epoll,并没有很好的实现异步,在性能上对比NIO没有太大优势
- AIO的代码逻辑比较复杂,且Linux上AIO还不够成熟
- Netty在NIO上做了很多异步的封装,是异步非阻塞框架
2. AIOServer
public class AIOServer {
public static void main(String[] args) throws IOException, InterruptedException {
AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
ssc.bind(new InetSocketAddress(9001));
ssc.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
try {
ssc.accept(attachment, this);
System.out.println(socketChannel.getRemoteAddress());
ByteBuffer buffer = ByteBuffer.allocate(1024);
socketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer bf) {
bf.flip();
System.out.println(new String(bf.array(), 0, result));
ByteBuffer send = ByteBuffer.wrap("Hello Client".getBytes());
socketChannel.write(send);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
} catch (Exception ex) {
ex.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
});
Thread.sleep(Integer.MAX_VALUE);
}
}
3. AIOClient
public class AIOClient {
public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
AsynchronousSocketChannel sc = AsynchronousSocketChannel.open();
sc.connect(new InetSocketAddress("127.0.0.1", 9001)).get();
sc.write(ByteBuffer.wrap("Hello Server".getBytes()));
ByteBuffer bf = ByteBuffer.allocate(1024);
Integer len = sc.read(bf).get();
if (len != -1) {
System.out.println("收到服务端数据: "+new String(bf.array(), 0, len));
}
}
}
4. BIO/NIO/AIO适用场景
- BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序直观简单易理解。
- NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。
- AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。
(五) Netty
1. 概念原理
- Netty概述
- Netty是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。Netty 对 JDK 自带的 NIO 的 API 进行了良好的封装,解决了上述问题。且Netty拥有高性能、 吞吐量更高,延迟更低,减少资源消耗,最小化不必要的内存复制等优点。
- Netty 现在都在用的是4.x,5.x版本已经废弃,Netty 4.x 需要JDK 6以上版本支持。
- Netty地位
- Netty 在 Java 网络应用框架中的地位就好比:Spring 框架在 JavaEE 开发中的地位
- 下面的框架都使用了 Netty,因为它们有网络通信需求!
- Cassandra - nosql 数据库
- Spark - 大数据分布式计算框架
- Hadoop - 大数据分布式存储框架
- RocketMQ - ali 开源的消息队列
- ElasticSearch - 搜索引擎
- gRPC - rpc 框架
- Dubbo - rpc 框架
- Spring 5.x - flux api 完全抛弃了 tomcat ,使用 netty 作为服务器端
- Zookeeper - 分布式协调框架
- Netty场景
- 互联网行业:在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现。各进程节点之间的内部通信。Rocketmq底层也是用的Netty作为基础通信组件。
- 游戏行业:无论是手游服务端还是大型的网络游戏,Java 语言得到了越来越广泛的应用。Netty 作为高性能的基础通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈。
- 大数据领域:经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨界点通信,它的 Netty Service 基于 Netty 框架二次封装实现。
- NIO缺点
- NIO的类库和API繁杂,学习成本高,你需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
- 需要熟悉Java多线程编程。这是因为NIO编程涉及到Reactor模式,你必须对多线程和网络编程非常熟悉,才能写出高质量的NIO程序。
- 臭名昭著的 epoll bug。它会导致Selector空轮询,最终导致CPU 100%。直到JDK1.7版本依然没得到根本性的解决。
- Netty优点
- API使用简单,学习成本低。
- 功能强大,内置了多种解码编码器,支持多种协议。
- 性能高,对比其他主流的NIO框架,Netty的性能最优。
- 社区活跃,发现BUG会及时修复,迭代版本周期短,不断加入新的功能。
- 线程模型
- 传统阻塞IO的服务模型
- Reactor模式
- 根据Reactor的数量和1处理资源的线程数不同,又分3种:
- Reactor单线程;
- Reactor多线程;
- 主从Reactor多线程
- Netty的线程模型是基于主从Reactor多线程做了改进。
2. NettyServer
public class NettyServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup()) // 1
.channel(NioServerSocketChannel.class) // 2
.childHandler(new ChannelInitializer<NioSocketChannel>() { // 3
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringDecoder()); // 5
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { // 6
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println(msg);
}
});
}
})
.bind(8080); // 4
}
}
3. NettyClient
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
new Bootstrap()
.group(new NioEventLoopGroup()) // 1
.channel(NioSocketChannel.class) // 2
.handler(new ChannelInitializer<Channel>() { // 3
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder()); // 8
}
})
.connect("127.0.0.1", 8080) // 4
.sync() // 5
.channel() // 6
.writeAndFlush(new Date() + ": hello world!"); // 7
}
}