1. MyServerHandler是否必须继承ChannelInboundHandlerAdapter?
不,不是固定的。 继承 ChannelInboundHandlerAdapter 是最常见的方式,因为它让你可以选择性地重写你关心的事件方法(如 channelRead, exceptionCaught)。
但还有另一种选择:实现 ChannelInboundHandler 接口。
ChannelInboundHandlerAdapter:是一个适配器类,它实现了 ChannelInboundHandler 接口,但所有方法都是空实现。你可以只重写你需要的方法,这是一种推荐的做法,更简洁。
ChannelInboundHandler:是一个接口,你必须实现里面的所有方法(包括你可能不关心的)。
所以,MyServerHandler 的核心是成为一个 ChannelInboundHandler,而继承适配器是实现这一目标最方便的方式。
2. 是否可以添加多个 .childHandler?流程是怎样的?
不可以直接添加多个 .childHandler 方法。 后一个 .childHandler 会覆盖前一个。
正确的做法是:在一个 ChannelInitializer 的 initChannel 方法中,向管道(Pipeline)添加多个处理器(Handler)。
Netty的处理器模型是基于 责任链模式 的。数据(比如接收到的消息)会在管道中的各个处理器之间传递。每个处理器负责完成一项特定的任务。
流程如下:
- 入站(Inbound)流程:当数据从网络通道(Channel)读取后,会从管道的头部开始,依次传递给每一个 入站处理器(InboundHandler)。例如:
channelRead 事件会从第一个 InboundHandler 传递到最后一个。
- 出站(Outbound)流程:当要向通道写入数据时,会从管道的尾部开始,逆序传递给每一个 出站处理器(OutboundHandler)。例如:
write 事件会从最后一个 OutboundHandler 传递到第一个。
示例:一个包含多个处理器的服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new MyServerHandler()); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
} });
|
假设客户端发送了数据 “Hello”:
StringDecoder 的 channelRead 方法被触发,它将 ByteBuf 类型的 msg 转换成了 String 类型的 “Hello”。
StringDecoder 调用 ctx.fireChannelRead("Hello"),将转换后的字符串传递给下一个处理器。
MyServerHandler 的 channelRead 方法被触发,此时参数 msg 已经是一个 String 对象了,可以直接处理。
MyServerHandler 处理完后,调用 ctx.writeAndFlush("服务端回复") 发送消息。
- 这个出站消息会先传递给
StringEncoder。
StringEncoder 的 write 方法将 “服务端回复” 这个字符串编码回 ByteBuf,然后继续传递,最终通过网络发送给客户端。
关键点:
- 顺序很重要! 解码器必须在业务处理器之前,编码器通常在之后。
- 处理器需要调用
ctx.fireChannelRead(msg) 或 ctx.write(msg) 将事件传递给链中的下一个处理器。如果忘记调用,责任链就会中断。
3. 如何实现文字与文件的传输?
实现文字和文件的混合传输,核心在于设计一个简单的协议来区分不同类型的消息。通常有两种主流方式:
方法一:使用定长消息头(推荐用于学习)
在消息开头用一个固定长度的字段来表示消息类型和长度。
例如,设计一个8字节的消息头:
- 第1个字节:消息类型(比如
1 代表文本,2 代表文件)
- 第2-8个字节:消息体的长度(7个字节可以表示很大的文件)
流程:
- 服务端/客户端首先有一个 解码器,它专门读取8个字节的头。
- 根据头部的类型,判断接下来要读取的是文本还是文件。
- 根据头部的长度,读取指定字节数的内容。
- 将读取到的内容(ByteBuf)交给不同的业务处理器处理。
Netty提供了 LengthFieldBasedFrameDecoder 来帮助你处理这种基于长度的分包,这是最专业和高效的方式。
方法二:使用分隔符(简单,适用于小文件或内网)
用特殊的字符序列来分隔不同的消息。例如,用 \n 分隔文本行,用一个特殊的 [FILE_END] 标记来表示文件传输结束。这种方式对于大文件不太可靠。
代码示例:方法一的简化版(自定义解码器)
这里提供一个非常简化的概念性代码,展示如何区分文本和文件。
1. 自定义协议消息类
1 2 3 4 5 6 7
| public class MyMessage { private byte type; private int length; private ByteBuf data;
}
|
2. 自定义解码器(继承 ByteToMessageDecoder)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class MyMessageDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() < 5) { return; }
in.markReaderIndex();
byte type = in.readByte(); int length = in.readInt();
if (in.readableBytes() < length) { in.resetReaderIndex(); return; }
ByteBuf dataBuf = in.readBytes(length); MyMessage message = new MyMessage(); message.setType(type); message.setLength(length); message.setData(dataBuf);
out.add(message); } }
|
3. 服务端管道配置
1 2 3 4 5 6 7 8 9 10 11 12 13
| .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyMessageDecoder()); pipeline.addLast(new MyServerHandler()); } });
|
4. 业务处理器 MyServerHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class MyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { MyMessage myMessage = (MyMessage) msg;
if (myMessage.getType() == 1) { String text = myMessage.getData().toString(CharsetUtil.UTF_8); System.out.println("收到文本: " + text); } else if (myMessage.getType() == 2) { ByteBuf fileData = myMessage.getData(); System.out.println("收到文件,大小: " + myMessage.getLength() + " 字节"); }
myMessage.getData().release(); } }
|
总结:
- 多个Handler:通过向Pipeline添加多个处理器实现,形成责任链。
- 文件文字传输:核心是设计协议(如长度前缀法),并编写相应的解码器(Decoder) 来区分消息类型。业务处理器则根据类型进行不同的处理。对于大型文件,还需要考虑分块传输、断点续传等更复杂的机制。
设计一个简单的文本与文件互发案例
1. 协议设计
我们设计一个简单的协议:
- 消息头:8字节
- 第1字节:消息类型(1=文本,2=文件块)
- 第2-5字节:数据长度(int,4字节)
- 第6-8字节:文件块序号(int,3字节,0=文本消息或文件结束标记)
2. 公共组件
消息类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public class FileMessage { private byte type; private int length; private int chunkIndex; private String fileName; private ByteBuf data; public FileMessage(byte type, int length, int chunkIndex, String fileName, ByteBuf data) { this.type = type; this.length = length; this.chunkIndex = chunkIndex; this.fileName = fileName; this.data = data; } public byte getType() { return type; } public int getLength() { return length; } public int getChunkIndex() { return chunkIndex; } public String getFileName() { return fileName; } public ByteBuf getData() { return data; } public void setFileName(String fileName) { this.fileName = fileName; } @Override public String toString() { return "FileMessage{" + "type=" + type + ", length=" + length + ", chunkIndex=" + chunkIndex + ", fileName='" + fileName + '\'' + '}'; } }
|
编码器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class FileMessageEncoder extends MessageToByteEncoder<FileMessage> { private static final Charset CHARSET = StandardCharsets.UTF_8; @Override protected void encode(ChannelHandlerContext ctx, FileMessage msg, ByteBuf out) throws Exception { out.writeByte(msg.getType()); out.writeInt(msg.getLength()); out.writeMedium(msg.getChunkIndex()); if (msg.getType() == 2 && msg.getFileName() != null) { byte[] fileNameBytes = msg.getFileName().getBytes(CHARSET); out.writeByte(fileNameBytes.length); out.writeBytes(fileNameBytes); } if (msg.getData() != null) { out.writeBytes(msg.getData()); } } }
|
解码器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public class FileMessageDecoder extends LengthFieldBasedFrameDecoder { private static final Charset CHARSET = StandardCharsets.UTF_8; private static final int MAX_FRAME_LENGTH = 1024 * 1024; private static final int LENGTH_FIELD_OFFSET = 1; private static final int LENGTH_FIELD_LENGTH = 4; private static final int LENGTH_ADJUSTMENT = 3; private static final int INITIAL_BYTES_TO_STRIP = 0; public FileMessageDecoder() { super(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP); } @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ByteBuf frame = (ByteBuf) super.decode(ctx, in); if (frame == null) { return null; } try { byte type = frame.readByte(); int length = frame.readInt(); int chunkIndex = frame.readUnsignedMedium(); String fileName = null; if (type == 2) { byte fileNameLength = frame.readByte(); byte[] fileNameBytes = new byte[fileNameLength]; frame.readBytes(fileNameBytes); fileName = new String(fileNameBytes, CHARSET); } ByteBuf data = frame.readRetainedSlice(length); return new FileMessage(type, length, chunkIndex, fileName, data); } finally { frame.release(); } } }
|
3. 服务端实现
服务端处理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
| public class FileServerHandler extends ChannelInboundHandlerAdapter { private static final String FILE_STORAGE_DIR = "server_files/"; private Map<String, FileOutputStream> fileStreams = new ConcurrentHashMap<>(); private Map<String, Integer> expectedChunks = new ConcurrentHashMap<>(); static { new File(FILE_STORAGE_DIR).mkdirs(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FileMessage) { FileMessage fileMsg = (FileMessage) msg; if (fileMsg.getType() == 1) { handleTextMessage(ctx, fileMsg); } else if (fileMsg.getType() == 2) { handleFileChunk(ctx, fileMsg); } } else { ctx.fireChannelRead(msg); } } private void handleTextMessage(ChannelHandlerContext ctx, FileMessage msg) { String text = msg.getData().toString(StandardCharsets.UTF_8); System.out.println("【服务端】收到文本: " + text); String response = "服务端已收到文本: " + text.substring(0, Math.min(text.length(), 10)) + "..."; sendTextMessage(ctx, response); msg.getData().release(); } private void handleFileChunk(ChannelHandlerContext ctx, FileMessage msg) { String fileName = msg.getFileName(); int chunkIndex = msg.getChunkIndex(); ByteBuf chunkData = msg.getData(); try { if (chunkIndex == 0) { FileOutputStream fos = new FileOutputStream(FILE_STORAGE_DIR + fileName); fileStreams.put(fileName, fos); expectedChunks.put(fileName, 0); System.out.println("【服务端】开始接收文件: " + fileName); } FileOutputStream fos = fileStreams.get(fileName); if (fos != null) { byte[] bytes = new byte[chunkData.readableBytes()]; chunkData.readBytes(bytes); fos.write(bytes); expectedChunks.put(fileName, expectedChunks.get(fileName) + 1); System.out.println("【服务端】收到文件块: " + fileName + " [块" + chunkIndex + "]"); if (chunkIndex % 10 == 0) { sendTextMessage(ctx, "已收到 " + fileName + " 的第 " + chunkIndex + " 块"); } } } catch (Exception e) { e.printStackTrace(); sendTextMessage(ctx, "文件传输错误: " + e.getMessage()); } finally { chunkData.release(); } } private void sendTextMessage(ChannelHandlerContext ctx, String text) { ByteBuf buffer = Unpooled.copiedBuffer(text, StandardCharsets.UTF_8); FileMessage response = new FileMessage((byte) 1, buffer.readableBytes(), 0, null, buffer); ctx.writeAndFlush(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { for (FileOutputStream fos : fileStreams.values()) { try { fos.close(); } catch (Exception e) { e.printStackTrace(); } } fileStreams.clear(); expectedChunks.clear(); super.channelInactive(ctx); } }
|
服务端启动类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public class FileServer { private static final int PORT = 8888; public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new FileMessageDecoder(), new FileMessageEncoder(), new FileServerHandler() ); } }); ChannelFuture future = bootstrap.bind(PORT).sync(); System.out.println("文件服务器启动成功,端口: " + PORT); future.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
|
4. 客户端实现
客户端处理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
| public class FileClientHandler extends ChannelInboundHandlerAdapter { private ChannelHandlerContext ctx; private final String fileToSend; public FileClientHandler(String fileToSend) { this.fileToSend = fileToSend; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; System.out.println("【客户端】连接服务器成功"); sendTextMessage("Hello Server! 准备发送文件: " + fileToSend); if (fileToSend != null && new File(fileToSend).exists()) { new Thread(() -> sendFileInChunks(fileToSend)).start(); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FileMessage) { FileMessage fileMsg = (FileMessage) msg; if (fileMsg.getType() == 1) { String text = fileMsg.getData().toString(StandardCharsets.UTF_8); System.out.println("【客户端】收到回复: " + text); } fileMsg.getData().release(); } } private void sendTextMessage(String text) { ByteBuf buffer = Unpooled.copiedBuffer(text, StandardCharsets.UTF_8); FileMessage message = new FileMessage((byte) 1, buffer.readableBytes(), 0, null, buffer); ctx.writeAndFlush(message); } private void sendFileInChunks(String filePath) { try { File file = new File(filePath); String fileName = file.getName(); FileInputStream fis = new FileInputStream(file); byte[] buffer = new byte[1024]; int bytesRead; int chunkIndex = 0; sendFileChunk(fileName, new byte[0], 0); Thread.sleep(100); System.out.println("【客户端】开始发送文件: " + fileName); while ((bytesRead = fis.read(buffer)) != -1) { byte[] chunkData = Arrays.copyOf(buffer, bytesRead); sendFileChunk(fileName, chunkData, ++chunkIndex); Thread.sleep(10); if (chunkIndex % 50 == 0) { System.out.println("【客户端】已发送 " + chunkIndex + " 个数据块"); } } fis.close(); System.out.println("【客户端】文件发送完成,总共 " + chunkIndex + " 个数据块"); sendTextMessage("文件 " + fileName + " 发送完成!"); } catch (Exception e) { e.printStackTrace(); sendTextMessage("文件发送错误: " + e.getMessage()); } } private void sendFileChunk(String fileName, byte[] data, int chunkIndex) { ByteBuf buffer = Unpooled.copiedBuffer(data); FileMessage message = new FileMessage((byte) 2, data.length, chunkIndex, fileName, buffer); ctx.writeAndFlush(message); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
|
客户端启动类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class FileClient { private static final String HOST = "localhost"; private static final int PORT = 8888; public static void main(String[] args) throws Exception { String fileToSend = args.length > 0 ? args[0] : "test.txt"; EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new FileMessageDecoder(), new FileMessageEncoder(), new FileClientHandler(fileToSend) ); } }); ChannelFuture future = bootstrap.connect(HOST, PORT).sync(); future.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }
|
5. 测试文件创建
创建一个测试文件 test.txt:
1 2 3
| echo "这是一个测试文件,用于验证Netty文件分块传输功能。" > test.txt
dd if=/dev/zero of=largefile.bin bs=1M count=5
|
6. 运行方式
- 启动服务端:
1 2
| javac *.java java FileServer
|
- 启动客户端:
1
| java FileClient test.txt
|
关键特性说明
- 分块传输:文件被分成1KB的块逐个发送
- 协议设计:使用固定头部分辨消息类型
- 内存管理:正确使用ByteBuf的retain/release机制
- 并发安全:使用ConcurrentHashMap处理并发文件写入
- 错误处理:完善的异常处理和资源释放
这个实现展示了Netty如何处理混合类型的数据传输,你可以根据需要调整块大小、协议格式等参数。