第3课java_netty_聊天工具实践
热度🔥:24 免费课程
授课语音
Netty聊天工具实践
1. 介绍
Netty是一个高性能、异步事件驱动的网络应用程序框架,由JBOSS开发和维护。它旨在替代Java原生的NIO类库,提供更高层次的抽象和更友好的API。很多开源框架使用Netty作为底层网络通信架构,例如:
- Dubbo(阿里巴巴开源的高性能RPC框架,应用于分布式架构)
- gRpc-Java(谷歌开源的RPC框架)
- Druid(阿里巴巴开源的数据库连接池和数据查询系统)
- Elasticsearch(基于Lucene的开源搜索引擎,用于全文搜索和分析)
- Redisson(Redis的客户端,用于与Redis通信)
- ZooKeeper(开源的分布式协调服务,用于分布式系统的协调和任务管理)
核心特点
异步和事件驱动:
- 采用非阻塞的I/O模型,通过事件驱动机制处理网络事件(如连接、读取、写入等),使Netty能够高效处理大量并发连接。
高性能:
- Netty经过精心设计和优化,以提供高性能的网络传输能力。它使用了零拷贝、内存池化等多种优化技术。
简化编程模型:
- 提供高层次的抽象,将底层网络细节封装,使网络编程更简单,让开发者专注于业务逻辑。
丰富的协议支持:
- 支持多种网络协议,包括TCP、UDP、HTTP、WebSocket等,并允许开发者轻松实现自定义协议。
可扩展性:
- 允许通过自定义的编解码器和处理器等组件扩展功能,适应各种复杂需求和应用场景。
跨平台支持:
- 虽然Netty是用Java编写的,但它能够在任何支持Java的操作系统上运行。
核心组件
Channel:
- Channel是Netty的核心接口,代表一个网络连接,提供读写操作的能力,是I/O操作的主要入口。
EventLoop:
- EventLoop负责处理I/O操作。每个EventLoop处理一个或多个Channel的I/O操作,Netty通过EventLoopGroup管理EventLoop实例。
Bootstrap:
- Bootstrap是Netty的启动类,用于配置和启动客户端或服务器。ServerBootstrap和Bootstrap分别用于服务器端和客户端的启动。
ChannelHandler:
- ChannelHandler处理Channel的各种事件和操作(如数据读取、写入等),开发者可以通过实现ChannelHandler接口定义自定义业务逻辑。
Pipeline:
- Pipeline是一个ChannelHandler的链表,负责将处理器链中的各个ChannelHandler组合,数据在ChannelPipeline中流动,通过各个处理器处理。
Codec:
- Codec是编解码器,用于将数据从字节流转换为应用程序中的对象(解码),或将对象转换为字节流(编码)。Netty提供了多种内置编解码器,也允许自定义编解码器。
工作流程
初始化:
- 创建和配置Bootstrap对象,设置Channel初始化参数、EventLoopGroup、ChannelHandler等。
绑定:
- 使用Bootstrap绑定服务器端口(或客户端地址)并启动服务器(或客户端)。
连接:
- 客户端和服务器建立连接后,ChannelHandler处理I/O事件。
处理:
- 数据在ChannelPipeline中流动,通过ChannelHandler处理。处理完成后,数据会被发送到远程端或从远程端接收。
关闭:
- 关闭连接并释放资源。
2. 代码案例
下面是一个使用Netty实现的简单聊天客户端和服务器的代码示例,展示了如何自定义协议、数据编解码和连接管理等。
在pom.xml中添加Netty的依赖
<properties>
<netty.version>4.1.68.Final</netty.version>
</properties>
<dependencies>
<!-- Netty的核心模块 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
<version>${netty.version}</version>
</dependency>
</dependencies>
聊天服务器实现
package com.zhilitech.nettydemo;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class ChatServer {
private final int port;
public ChatServer(int port) {
this.port = port;
}
public void start() throws InterruptedException {
// 创建两个线程池:一个用于接受连接,另一个用于处理I/O操作
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 服务端的启动类,配置和启动服务端的Channel
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup) // 设置用于接受连接的线程池和处理I/O操作的线程池
.channel(NioServerSocketChannel.class) // 指定NIO传输通道,服务端是NioServerSocketChannel
.childHandler(new ChannelInitializer<SocketChannel>() { // 设置处理新连接的初始化器
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline(); // 获取ChannelPipeline
p.addLast(new MessageDecoder()); // 添加解码器
p.addLast(new MessageEncoder()); // 添加编码器
p.addLast(new ChatServerHandler()); // 添加自定义处理器
}
});
// 绑定端口并启动服务器
ChannelFuture f = b.bind(port).sync();
System.out.println("Chat server started on port " + port);
f.channel().closeFuture().sync(); // 等待服务器的关闭
} finally {
// 优雅地关闭线程池
bossGroup.shutdownGracefully().sync();
workerGroup.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws InterruptedException {
new ChatServer(8080).start(); // 启动服务器
}
}
聊天客户端实现
package com.zhilitech.nettydemo;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.ChannelFutureListener;
import java.util.Scanner;
public class ChatClient {
private final String host;
private final int port;
public ChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup(); // 创建线程池
try {
// 客户端的启动类,用于配置和启动客户端的Channel
Bootstrap b = new Bootstrap();
b.group(group) // 设置线程池
.channel(NioSocketChannel.class) // 指定NIO传输通道,客户端是NioSocketChannel
.handler(new ChannelInitializer<SocketChannel>() { // 设置处理新连接的初始化器
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline(); // 获取ChannelPipeline
p.addLast(new MessageDecoder()); // 添加解码器
p.addLast(new MessageEncoder()); // 添加编码器
p.addLast(new ChatClientHandler()); // 添加自定义处理器
}
});
// 连接到服务器
ChannelFuture f = b
.connect(host, port).sync();
f.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
System.out.println("Connected to server");
} else {
System.err.println("Failed to connect to server");
}
});
// 读取用户输入并发送到服务器
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
if ("quit".equalsIgnoreCase(msg.trim())) {
// 如果用户输入 "quit",则关闭连接并退出
System.out.println("Exiting...");
f.channel().writeAndFlush("Client disconnected\n");
f.channel().close().sync(); // 关闭连接
break;
}
f.channel().writeAndFlush(msg); // 发送消息
}
f.channel().closeFuture().sync(); // 等待连接关闭
} finally {
group.shutdownGracefully().sync(); // 优雅地关闭线程池
}
}
public static void main(String[] args) throws InterruptedException {
new ChatClient("localhost", 8080).start(); // 启动客户端
}
}
消息解码器实现
package com.zhilitech.nettydemo;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class MessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 确保数据包的长度至少为4字节(用于读取长度)
if (in.readableBytes() < 4) {
return; // 数据不足,等待更多数据到达
}
in.markReaderIndex(); // 标记当前读索引
int length = in.readInt(); // 读取消息长度
if (in.readableBytes() < length) {
in.resetReaderIndex(); // 数据不足,重置读索引,等待更多数据
return;
}
ByteBuf messageBuf = in.readBytes(length); // 读取消息内容
String message = messageBuf.toString(io.netty.util.CharsetUtil.UTF_8); // 转换为字符串
out.add(message); // 将消息添加到输出列表
}
}
消息编码器实现
package com.zhilitech.nettydemo;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class MessageEncoder extends MessageToByteEncoder<String> {
@Override
protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) {
byte[] bytes = msg.getBytes(io.netty.util.CharsetUtil.UTF_8); // 将消息转换为字节数组
out.writeInt(bytes.length); // 写入消息长度
out.writeBytes(bytes); // 写入消息内容
}
}
聊天服务器处理器实现
package com.zhilitech.nettydemo;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class ChatServerHandler extends SimpleChannelInboundHandler<String> {
// 用于存储所有连接的客户端
private static final ConcurrentMap<ChannelHandlerContext, String> clients = new ConcurrentHashMap<>();
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println("Received message: " + msg); // 打印收到的消息
// 处理客户端发来的消息并自动回复
String reply = "Server received: " + msg;
ctx.writeAndFlush(reply + "\n"); // 向客户端发送自动回复消息
// 将消息广播给其他已连接的客户端
for (ChannelHandlerContext clientCtx : clients.keySet()) {
if (clientCtx != ctx) { // 不发送给发送消息的客户端
clientCtx.writeAndFlush(msg + "\n");
}
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 当有客户端连接时,将其添加到客户端列表
clients.put(ctx, ctx.channel().remoteAddress().toString());
System.out.println("Client connected: " + ctx.channel().remoteAddress());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
// 当客户端断开连接时,从客户端列表中移除
clients.remove(ctx);
System.out.println("Client disconnected: " + ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace(); // 打印异常
ctx.close(); // 关闭连接
}
}
聊天客户端处理器实现
package com.zhilitech.nettydemo;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
// 打印从服务器接收到的消息
System.out.println("Received from server: " + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace(); // 打印异常
ctx.close(); // 关闭连接
}
}
总结
以上代码展示了如何使用Netty构建一个简单的聊天客户端和服务器。通过使用自定义的消息编码器和解码器,我们能够处理不同格式的数据,并通过ChannelHandler管理客户端连接及消息的处理。Netty提供了强大的工具集,使得网络编程变得更加简洁高效。