授课语音

Netty聊天工具实践

1. 介绍

Netty是一个高性能、异步事件驱动的网络应用程序框架,由JBOSS开发和维护。它旨在替代Java原生的NIO类库,提供更高层次的抽象和更友好的API。很多开源框架使用Netty作为底层网络通信架构,例如:

  • Dubbo(阿里巴巴开源的高性能RPC框架,应用于分布式架构)
  • gRpc-Java(谷歌开源的RPC框架)
  • Druid(阿里巴巴开源的数据库连接池和数据查询系统)
  • Elasticsearch(基于Lucene的开源搜索引擎,用于全文搜索和分析)
  • Redisson(Redis的客户端,用于与Redis通信)
  • ZooKeeper(开源的分布式协调服务,用于分布式系统的协调和任务管理)

核心特点

  1. 异步和事件驱动:

    • 采用非阻塞的I/O模型,通过事件驱动机制处理网络事件(如连接、读取、写入等),使Netty能够高效处理大量并发连接。
  2. 高性能:

    • Netty经过精心设计和优化,以提供高性能的网络传输能力。它使用了零拷贝、内存池化等多种优化技术。
  3. 简化编程模型:

    • 提供高层次的抽象,将底层网络细节封装,使网络编程更简单,让开发者专注于业务逻辑。
  4. 丰富的协议支持:

    • 支持多种网络协议,包括TCP、UDP、HTTP、WebSocket等,并允许开发者轻松实现自定义协议。
  5. 可扩展性:

    • 允许通过自定义的编解码器和处理器等组件扩展功能,适应各种复杂需求和应用场景。
  6. 跨平台支持:

    • 虽然Netty是用Java编写的,但它能够在任何支持Java的操作系统上运行。

核心组件

  1. Channel:

    • Channel是Netty的核心接口,代表一个网络连接,提供读写操作的能力,是I/O操作的主要入口。
  2. EventLoop:

    • EventLoop负责处理I/O操作。每个EventLoop处理一个或多个Channel的I/O操作,Netty通过EventLoopGroup管理EventLoop实例。
  3. Bootstrap:

    • Bootstrap是Netty的启动类,用于配置和启动客户端或服务器。ServerBootstrap和Bootstrap分别用于服务器端和客户端的启动。
  4. ChannelHandler:

    • ChannelHandler处理Channel的各种事件和操作(如数据读取、写入等),开发者可以通过实现ChannelHandler接口定义自定义业务逻辑。
  5. Pipeline:

    • Pipeline是一个ChannelHandler的链表,负责将处理器链中的各个ChannelHandler组合,数据在ChannelPipeline中流动,通过各个处理器处理。
  6. Codec:

    • Codec是编解码器,用于将数据从字节流转换为应用程序中的对象(解码),或将对象转换为字节流(编码)。Netty提供了多种内置编解码器,也允许自定义编解码器。

工作流程

  1. 初始化:

    • 创建和配置Bootstrap对象,设置Channel初始化参数、EventLoopGroup、ChannelHandler等。
  2. 绑定:

    • 使用Bootstrap绑定服务器端口(或客户端地址)并启动服务器(或客户端)。
  3. 连接:

    • 客户端和服务器建立连接后,ChannelHandler处理I/O事件。
  4. 处理:

    • 数据在ChannelPipeline中流动,通过ChannelHandler处理。处理完成后,数据会被发送到远程端或从远程端接收。
  5. 关闭:

    • 关闭连接并释放资源。

2. 代码案例

下面是一个使用Netty实现的简单聊天客户端和服务器的代码示例,展示了如何自定义协议、数据编解码和连接管理等。

在pom.xml中添加Netty的依赖

<properties>
    <netty.version>4.1.68.Final</netty.version>
</properties>
<dependencies>
    <!-- Netty的核心模块 -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-common</artifactId>
        <version>${netty.version}</version>
    </dependency>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-buffer</artifactId>
        <version>${netty.version}</version>
    </dependency>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-codec</artifactId>
        <version>${netty.version}</version>
    </dependency>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-transport</artifactId>
        <version>${netty.version}</version>
    </dependency>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-handler</artifactId>
        <version>${netty.version}</version>
    </dependency>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-codec-http</artifactId>
        <version>${netty.version}</version>
    </dependency>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-codec-http2</artifactId>
        <version>${netty.version}</version>
    </dependency>
</dependencies>

聊天服务器实现

package com.zhilitech.nettydemo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class ChatServer {

    private final int port;

    public ChatServer(int port) {
        this.port = port;
    }

    public void start() throws InterruptedException {
        // 创建两个线程池:一个用于接受连接,另一个用于处理I/O操作
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 服务端的启动类,配置和启动服务端的Channel
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup) // 设置用于接受连接的线程池和处理I/O操作的线程池
             .channel(NioServerSocketChannel.class) // 指定NIO传输通道,服务端是NioServerSocketChannel
             .childHandler(new ChannelInitializer<SocketChannel>() { // 设置处理新连接的初始化器
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     ChannelPipeline p = ch.pipeline(); // 获取ChannelPipeline
                     p.addLast(new MessageDecoder()); // 添加解码器
                     p.addLast(new MessageEncoder()); // 添加编码器
                     p.addLast(new ChatServerHandler()); // 添加自定义处理器
                 }
             });

            // 绑定端口并启动服务器
            ChannelFuture f = b.bind(port).sync();
            System.out.println("Chat server started on port " + port);
            f.channel().closeFuture().sync(); // 等待服务器的关闭
        } finally {
            // 优雅地关闭线程池
            bossGroup.shutdownGracefully().sync();
            workerGroup.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new ChatServer(8080).start(); // 启动服务器
    }
}

聊天客户端实现

package com.zhilitech.nettydemo;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.ChannelFutureListener;

import java.util.Scanner;

public class ChatClient {

    private final String host;
    private final int port;

    public ChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup(); // 创建线程池
        try {
            // 客户端的启动类,用于配置和启动客户端的Channel
            Bootstrap b = new Bootstrap();
            b.group(group) // 设置线程池
             .channel(NioSocketChannel.class) // 指定NIO传输通道,客户端是NioSocketChannel
             .handler(new ChannelInitializer<SocketChannel>() { // 设置处理新连接的初始化器
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     ChannelPipeline p = ch.pipeline(); // 获取ChannelPipeline
                     p.addLast(new MessageDecoder()); // 添加解码器
                     p.addLast(new MessageEncoder()); // 添加编码器
                     p.addLast(new ChatClientHandler()); // 添加自定义处理器
                 }
             });

            // 连接到服务器
            ChannelFuture f = b

.connect(host, port).sync();
            f.addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    System.out.println("Connected to server");
                } else {
                    System.err.println("Failed to connect to server");
                }
            });

            // 读取用户输入并发送到服务器
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();

                if ("quit".equalsIgnoreCase(msg.trim())) {
                    // 如果用户输入 "quit",则关闭连接并退出
                    System.out.println("Exiting...");
                    f.channel().writeAndFlush("Client disconnected\n");
                    f.channel().close().sync(); // 关闭连接
                    break;
                }
                f.channel().writeAndFlush(msg); // 发送消息
            }

            f.channel().closeFuture().sync(); // 等待连接关闭
        } finally {
            group.shutdownGracefully().sync(); // 优雅地关闭线程池
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new ChatClient("localhost", 8080).start(); // 启动客户端
    }
}

消息解码器实现

package com.zhilitech.nettydemo;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class MessageDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // 确保数据包的长度至少为4字节(用于读取长度)
        if (in.readableBytes() < 4) {
            return; // 数据不足,等待更多数据到达
        }

        in.markReaderIndex(); // 标记当前读索引
        int length = in.readInt(); // 读取消息长度
        if (in.readableBytes() < length) {
            in.resetReaderIndex(); // 数据不足,重置读索引,等待更多数据
            return;
        }

        ByteBuf messageBuf = in.readBytes(length); // 读取消息内容
        String message = messageBuf.toString(io.netty.util.CharsetUtil.UTF_8); // 转换为字符串
        out.add(message); // 将消息添加到输出列表
    }
}

消息编码器实现

package com.zhilitech.nettydemo;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class MessageEncoder extends MessageToByteEncoder<String> {

    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) {
        byte[] bytes = msg.getBytes(io.netty.util.CharsetUtil.UTF_8); // 将消息转换为字节数组
        out.writeInt(bytes.length); // 写入消息长度
        out.writeBytes(bytes); // 写入消息内容
    }
}

聊天服务器处理器实现

package com.zhilitech.nettydemo;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ChatServerHandler extends SimpleChannelInboundHandler<String> {

    // 用于存储所有连接的客户端
    private static final ConcurrentMap<ChannelHandlerContext, String> clients = new ConcurrentHashMap<>();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        System.out.println("Received message: " + msg); // 打印收到的消息

        // 处理客户端发来的消息并自动回复
        String reply = "Server received: " + msg;
        ctx.writeAndFlush(reply + "\n"); // 向客户端发送自动回复消息

        // 将消息广播给其他已连接的客户端
        for (ChannelHandlerContext clientCtx : clients.keySet()) {
            if (clientCtx != ctx) { // 不发送给发送消息的客户端
                clientCtx.writeAndFlush(msg + "\n");
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // 当有客户端连接时,将其添加到客户端列表
        clients.put(ctx, ctx.channel().remoteAddress().toString());
        System.out.println("Client connected: " + ctx.channel().remoteAddress());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        // 当客户端断开连接时,从客户端列表中移除
        clients.remove(ctx);
        System.out.println("Client disconnected: " + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace(); // 打印异常
        ctx.close(); // 关闭连接
    }
}

聊天客户端处理器实现

package com.zhilitech.nettydemo;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class ChatClientHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        // 打印从服务器接收到的消息
        System.out.println("Received from server: " + msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace(); // 打印异常
        ctx.close(); // 关闭连接
    }
}

总结

以上代码展示了如何使用Netty构建一个简单的聊天客户端和服务器。通过使用自定义的消息编码器和解码器,我们能够处理不同格式的数据,并通过ChannelHandler管理客户端连接及消息的处理。Netty提供了强大的工具集,使得网络编程变得更加简洁高效。

去1:1私密咨询

系列课程: