授课语音

Java的IO模型

1. 介绍

Java的I/O(输入/输出)指的是数据交换的过程,比如文件读写、网络通信等。

基本概念

  1. 流(Streams) 是对数据的一种抽象,分为输入和输出流,用于数据的读取和写入,包括:
    • 字节流(用于处理非文本数据)
    • 字符流(用于处理文本数据)
  2. 通道(Channel) 是Java NIO/AIO中的组件,能够直接操作文件、网络连接等,包括:
    • 文件通道(如FileChannel)用于文件操作
    • 网络通道(如SocketChannelServerSocketChannel)用于网络通信
    • 通道是双向的,可以同时进行读写操作
  3. 选择器(Selector) 是Java NIO中的组件,用于监视通道的I/O事件。
  4. 缓冲区(Buffer) 提供了一种高效的数据读写方式,提升I/O性能,包括:
    • 字节缓冲区(ByteBuffer
    • 字符缓冲区(CharBuffer
    • 其他类型的缓冲区(如IntBufferLongBuffer等)

同步与异步

  • 同步 指操作按严格顺序执行,调用者会被阻塞,直到I/O操作完成,才能继续下一个请求。
  • 异步 指操作不按严格顺序执行,调用者不会被阻塞,I/O操作完成时会通过回调通知。

阻塞与非阻塞

  • 阻塞 I/O操作期间,线程被挂起,直到操作完成,无法执行其他任务。
  • 非阻塞 操作不挂起,线程可以继续执行其他任务,操作完成后通过回调进行通知。

三种I/O模型

  1. BIO(阻塞I/O):每个I/O连接都有一个独立线程,线程等待I/O操作完成,不能执行其他任务。
  2. NIO(非阻塞I/O):主要提高I/O操作效率,适用于高并发场景。
    • 使用缓冲区(Buffer)读写数据。
    • 应用多路复用机制,选择器(Selector)管理多个通道(Channel),实现单线程处理多个连接。
    • 通过读取通道数据到缓冲区,写入缓冲区数据到通道。
  3. AIO(异步I/O):也是非阻塞I/O。
    • I/O操作完成后,通过回调通知应用程序实现异步处理。
    • 核心组件包括异步通道(AsynchronousChannel)和完成处理器(CompletionHandler),比NIO进一步提升I/O效率,但编程复杂度较高,并可能在不同操作系统上表现不一。

图示

Image___1397757896017270248===cdc2972d253e07545d07a168c6dc0b8a===1_1.png___

I/O最佳实践

  1. 及时关闭资源,确保流和通道在使用完成后及时关闭。
  2. 使用缓冲区,减少I/O操作次数,提升性能。
  3. 注意编解码问题,处理字符流时应关注字符编码。
  4. 考虑线程安全问题,例如多线程访问同一通道资源时,需进行线程同步。

2. 代码案例

简单的(客户端-服务端)消息通信系统

BIO服务器

package com.zhilitech.bio;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BioServer {
    private static final int PORT = 1234; // 服务器监听的端口
    private static final int CORE_POOL_SIZE = 5; // 核心线程数
    private static final int MAX_POOL_SIZE = 10; // 最大线程数
    private static final int KEEP_ALIVE_TIME = 60; // 线程空闲时间(秒)
    private static final int QUEUE_CAPACITY = 100; // 任务队列容量

    public static void main(String[] args) {
        // 创建自定义线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(QUEUE_CAPACITY)
        );

        try (ServerSocket serverSocket = new ServerSocket(PORT)) {
            System.out.println("服务器启动,等待客户端连接...");

            while (true) {
                // 阻塞,等待客户端连接
                Socket clientSocket = serverSocket.accept();
                System.out.println("客户端连接成功");

                // 提交任务到线程池处理客户端请求
                threadPool.execute(() -> handleClient(clientSocket));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown(); // 关闭线程池
        }
    }

    private static void handleClient(Socket clientSocket) {
        try (InputStream inputStream = clientSocket.getInputStream();
             OutputStream outputStream = clientSocket.getOutputStream()) {

            // 读取客户端发送的数据
            byte[] buffer = new byte[1024];
            int bytesRead = inputStream.read(buffer);

            System.out.println("接收到客户端的数据: " + new String(buffer, 0, bytesRead));

            // 将读取到的数据返回给客户端
            outputStream.write(buffer, 0, bytesRead);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                clientSocket.close(); // 关闭客户端连接
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

BIO客户端

package com.zhilitech.bio;

import java.io.*;
import java.net.Socket;

public class BioClient {
    public static void main(String[] args) {
        try {
            // 连接到服务器
            Socket socket = new Socket("localhost", 1234);

            // 获取输出流,发送数据到服务器
            OutputStream outputStream = socket.getOutputStream();
            outputStream.write("Hello, BIO Server!".getBytes());

            // 获取输入流,读取服务器返回的数据
            InputStream inputStream = socket.getInputStream();
            byte[] buffer = new byte[1024];
            int bytesRead = inputStream.read(buffer);

            // 打印服务器返回的数据
            System.out.println("从服务器接收到的数据: " + new String(buffer, 0, bytesRead));

            // 关闭连接
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

NIO服务器

package com.zhilitech.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class NioServer {
    private static final int PORT = 1234; // 服务器监听的端口
    private static final int CORE_POOL_SIZE = 5; // 核心线程数
    private static final int MAX_POOL_SIZE = 10; // 最大线程数
    private static final int KEEP_ALIVE_TIME = 60; // 线程空闲时间(秒)
    private static final int QUEUE_CAPACITY = 100; // 任务队列容量
    private static final int BUFFER_SIZE = 256; // 缓冲区大小

    public static void main(String[] args) {
        // 创建自定义线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(QUEUE_CAPACITY)
        );

        try {
            // 打开ServerSocketChannel并绑定端口
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new java.net.InetSocketAddress(PORT));
            // 设置为非阻塞模式
            serverSocketChannel.configureBlocking(false);

            // 打开Selector并注册ServerSocketChannel
            Selector selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("服务器启动,等待客户端连接...");

            while (true) {
                // 等待多个通道上的事件,处理I/O操作
                selector.select();
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();

                    // 处理新连接
                    if (key.isAcceptable()) {
                        SocketChannel clientChannel = serverSocketChannel.accept();
                        clientChannel.configureBlocking(false);
                        clientChannel.register(selector, SelectionKey.OP_READ);
                        System.out.println("客户端连接成功");
                    }

                    // 处理读事件
                    if (key.isReadable()) {
                        // 使用线程池处理读取操作
                        threadPool.execute(() -> handleClient(key));
                    }
                    keyIterator.remove(); // 移除当前处理的选择键
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown(); // 关闭线程池
        }
    }

    private static void handleClient(SelectionKey key) {
        SocketChannel clientChannel = (SocketChannel) key.channel();
       

 if (!clientChannel.isConnected()) {
            System.out.println("客户端已关闭");
            return;
        }

        ByteBuffer buffer = ByteBuffer.allocate(256); // 创建缓冲区
        try {
            int bytesRead = clientChannel.read(buffer);

            if (bytesRead > 0) {
                buffer.flip(); // 切换到读取模式
                String message = new String(buffer.array(), 0, bytesRead);
                System.out.println("来自客户端的消息: " + message);

                // 回应客户端
                buffer.clear(); // 清空缓冲区
                buffer.put(("" + message).getBytes()); // 写入响应数据
                buffer.flip(); // 切换到写入模式
                clientChannel.write(buffer); // 发送给客户端
            } else if (bytesRead == -1) {
                // 客户端关闭连接
                closeChannel(key);
                System.out.println("客户端连接关闭");
            }
        } catch (ClosedChannelException e) {
            System.out.println("客户端连接关闭异常");
            closeChannel(key);
        } catch (IOException e) {
            System.out.println("IO异常");
            closeChannel(key);
            e.printStackTrace();
        }
    }

    private static void closeChannel(SelectionKey key) {
        try {
            if (key != null && key.channel() != null) {
                key.channel().close(); // 关闭通道
            }
        } catch (IOException e) {
            System.out.println("关闭通道异常: " + e.getMessage());
        }
    }
}

NIO客户端

package com.zhilitech.nio;

import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class NioClient {
    public static void main(String[] args) throws Exception {
        // 创建一个SocketChannel并设置为非阻塞模式
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        // 连接到服务器
        socketChannel.connect(new java.net.InetSocketAddress("localhost", 1234));

        // 发送消息
        String message = "你好,NIO服务器!";
        ByteBuffer buffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
        socketChannel.write(buffer); // 发送数据到服务器

        // 清空缓冲区并准备读取响应
        buffer.clear();
        int bytesRead;
        // 接收服务端响应
        buffer.clear();
        bytesRead = socketChannel.read(buffer);
        if (bytesRead > 0) {
            buffer.flip(); // 切换到读取模式
            String response = new String(buffer.array(), 0, bytesRead);
            System.out.println("服务器响应: " + response);
        }

        Thread.sleep(3000); // 等待3秒以观察输出
        // 关闭连接
        socketChannel.close();
    }
}

AIO服务器

package com.zhilitech.aio;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class AioServer {
    private static final int PORT = 1234; // 服务器监听的端口
    private static final int CORE_POOL_SIZE = 5; // 核心线程数
    private static final int MAX_POOL_SIZE = 10; // 最大线程数
    private static final int KEEP_ALIVE_TIME = 60; // 线程空闲时间(秒)
    private static final int QUEUE_CAPACITY = 100; // 任务队列容量

    public static void main(String[] args) throws Exception {
        // 创建自定义线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(QUEUE_CAPACITY)
        );

        try (AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open()) {
            serverSocketChannel.bind(new java.net.InetSocketAddress(PORT));
            System.out.println("AIO服务器启动,等待客户端连接...");

            // 异步接受客户端连接
            serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
                @Override
                public void completed(AsynchronousSocketChannel clientChannel, Void attachment) {
                    System.out.println("客户端连接成功");

                    // 继续接受其他客户端连接
                    serverSocketChannel.accept(null, this);

                    // 使用线程池处理客户端数据
                    threadPool.execute(() -> handleClient(clientChannel));
                }

                @Override
                public void failed(Throwable exc, Void attachment) {
                    exc.printStackTrace();
                }
            });

            // 保持主线程活跃一段时间
            Thread.sleep(10000);
        } finally {
            threadPool.shutdown(); // 关闭线程池
        }
    }

    private static void handleClient(AsynchronousSocketChannel clientChannel) {
        if (!clientChannel.isOpen()) {
            System.out.println("客户端已关闭");
            return;
        }

        ByteBuffer buffer = ByteBuffer.allocate(1024); // 创建缓冲区
        clientChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer buffer) {
                buffer.flip(); // 切换到读取模式
                clientChannel.write(buffer); // 将数据返回给客户端
                buffer.clear(); // 清空缓冲区
            }

            @Override
            public void failed(Throwable exc, ByteBuffer buffer) {
                exc.printStackTrace();
            }
        });
    }
}

AIO客户端

package com.zhilitech.aio;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.Future;

public class AioClient {
    private static final String SERVER_ADDRESS = "localhost"; // 服务器地址
    private static final int PORT = 1234; // 服务器端口

    public static void main(String[] args) throws Exception {
        try (AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open()) {
            // 连接到服务器
            Future<Void> future = socketChannel.connect(new java.net.InetSocketAddress(SERVER_ADDRESS, PORT));
            future.get(); // 等待连接完成

            // 发送数据到服务器
            ByteBuffer buffer = ByteBuffer.wrap("你好, AIO 服务器!".getBytes());
            Future<Integer> writeFuture = socketChannel.write(buffer);
            writeFuture.get(); // 等待写入完成

            // 读取服务器返回的数据
            buffer.clear();
            Future<Integer> readFuture = socketChannel.read(buffer);
            readFuture.get(); // 等待读取完成
            buffer.flip(); // 切换到读取模式
            System.out.println("从服务器接收到的数据: " + new String(buffer.array(), 0, buffer.limit()));
            Thread.sleep(1000); // 等待1秒
        }
    }
}

以上代码展示了BIO、NIO和AIO三种I/O模型的基本用法,通过这些示例可以了解不同I/O模型的工作原理和使用场景。

去1:1私密咨询

系列课程: