《Netty官方文档》引用计数对象

原文地址     翻译:Tyrian

从Netty 4起,对象的生命周期由它们的引用计数来管理,因此,一旦对象不再被引用后,Netty 会将它(或它共享的资源)归还到对象池(或对象分配器)。在垃圾回收和引用队列不能保证这么有效、实时的不可达性检测的情况下,引用计数以牺牲轻微的便利性为代价,提供了 另一种可选的解决方案。 最值得注意的类型是ByteBuf,它正是利用了引用计数来提升内存分配和释放的性能。这一节 将用ByteBuf来讲述引用计数在Netty中是如何工作的。

引用计数基本原理

一个新创建的引用计数对象的初始引用计数是1。

 ByteBuf buf = ctx.alloc().directbuffer();
 assert buf.refCnt() == 1;

当你释放掉引用计数对象,它的引用次数减1.如果一个对象的引用计数到达0,该对象就会被 释放或者归还到创建它的对象池。

 assert buf.refCnt() == 1;
 // release() returns true only if the reference count becomes 0.
 boolean destroyed = buf.release();
 assert destroyed;
 assert buf.refCnt() == 0;

悬挂引用

访问引用计数为0的引用计数对象会触发一次IllegalReferenceCountException:

 assert buf.refCnt() == 0;
 try {
 buf.writeLong(0xdeadbeef);
 throw new Error("should not reach here");
 } catch (IllegalReferenceCountExeception e) {
 // Expected
 }

增加引用计数

只要引用计数对象未被销毁,就可以通过调用retain()方法来增加引用次数:

 ByteBuf buf = ctx.alloc().directBuffer();
 assert buf.refCnt() == 1;

 buf.retain();
 assert buf.refCnt() == 2;

 boolean destroyed = buf.release();
 assert !destroyed;
 assert buf.refCnt() == 1;

谁来销毁

一般的原则是,最后访问引用计数对象的部分负责对象的销毁。更具体地来说:

  • 如果一个[发送]组件要传递一个引用计数对象到另一个[接收]组件,发送组件通常不需要 负责去销毁对象,而是将这个销毁的任务推延到接收组件
  • 如果一个组件消费了一个引用计数对象,并且不知道谁会再访问它(例如,不会再将引用 发送到另一个组件),那么,这个组件负责销毁工作 这里有一个简单的示例:
public ByteBuf a(ByteBuf input) {
 input.writeByte(42);
 return input;
}

public ByteBuf b(ByteBuf input) {
 try {
 output = input.alloc().directBuffer(input.readableBytes() + 1);
 output.writeBytes(input);
 output.writeByte(42);
 return output;
 } finally {
 input.release();
 }
}

public void c(ByteBuf input) {
 System.out.println(input);
 input.release();
}

public void main() {
 ...
 ByteBuf buf = ...;
 // This will print buf to System.out and destroy it.
 c(b(a(buf)));
 assert buf.refCnt() == 0;
}

 

动作 谁应该负责释放? 谁实际释放?
1. main()创建buf buf->main()
2. main()调用a(buf) buf->a()
3. a()直接返回buf buf->main()
4. main()调用b(buf) buf->b()
5. b()返回buf的copy buf->b(),copy->main() b()释放buf
6. main()调用c(copy) copy->c()
7. c()释放copy copy->c() c()释放copy

子缓冲区(Derived buffers)

调用ByteBuf.duplicate(),ByteBuf.slice()和ByteBuf.order(ByteOrder)三个方法, 会创建一个子缓冲区,子缓冲区共享父缓冲区的内存区域。子缓冲区没有自己的引用计数,而是 共享父缓冲区的引用计数。

 ByteBuf parent = ctx.alloc().directBuffer();
 ByteBuf derived = parent.duplicate();

 // Creating a derived buffer does not increase the reference count.
 assert parent.refCnt() == 1;
 assert derived.refCnt() == 1;

但是,调用ByteBuf.copy()和ByteBuf.readBytes(int)创建的并不是子缓冲区,返回的 ByteBuf缓冲区是需要被释放的。 需要注意,父缓冲区和它的子缓冲区共享引用计数,创建子缓冲区并不会增加引用计数。 因此,当你将子缓冲区传到应用中的其他组件,必须先调用retain()。

ByteBuf parent = ctx.alloc().directBuffer(512);
parent.writeBytes(...);

try {
 while (parent.isReadable(16)) {
 ByteBuf derived = parent.readSlice(16);
 derived.retain();
 process(derived);
 }
} finally {
 parent.release();
}
...

public void process(ByteBuf buf) {
 ...
 buf.release();
}

ByteBufHolder接口

有时候,ByteBuf被缓冲区容器(buffer holder)持有,像DatagramPacket、HttpComponent和WebSocketFrame。 这些类型都继承自一个通用接口,叫做ByteBufHolder。 缓冲区容器(buffer holder)共享它持有的缓冲区的引用计数,和子缓冲区一样。

Channel-handler中的引用计数

入口消息

当一个事件循环(event loop)读取数据并写入到ByteBuf,在触发一次channelRead()事件后,应该由对应pipeline的 ChannelHandler负责去释放缓冲区的内存。因此,消费接收数据的handler应该在它channelRead()方法中调用数据的 release()方法。

public void channelRead(ChannelHandlerContext ctx, Object msg) {
 ByteBuf buf = (ByteBuf) msg;
 try {
 ...
 } finally {
 buf.release();
 }
 }

在“谁负责销毁”一节中我们提到,如果你的handler将缓冲区(或者其他任何引用计数对象)传递到下一个handler, 那么你不需要负责去释放。

public void channelRead(ChannelHandlerContext ctx, Object msg) {
 ByteBuf buf = (ByteBuf) msg;
 ...
 ctx.fireChannelRead(buf);
}

需要注意的是,ByteBuf并不是Netty中唯一的引用计数类型。如果你在与解码程序(decoder)生成的消息打交道,这些消息一样可能 是引用计数的。

/ Assuming your handler is placed next to `HttpRequestDecoder`
public void channelRead(ChannelHandlerContext ctx, Object msg) {
 if (msg instanceof HttpRequest) {
 HttpRequest req = (HttpRequest) msg;
 ...
 }
 if (msg instanceof HttpContent) {
 HttpContent content = (HttpContent) msg;
 try {
 ...
 } finally {
 content.release();
 }
 }
}

如果你有疑虑,或者你想简化释放消息内存的过程,你可以使用ReferenceCountUtil.release():

public void channelRead(ChannelHandlerContext ctx, Object msg) {
 try {
 ...
 } finally {
 ReferenceCountUtil.release(msg);
 }
}

同样地,你可以考虑继承SimpleChannelHandler,它会帮你调用ReferenceCountUtil.release()释放所有 你接收到的消息内存。

出口消息

与入口消息不同的是,出口消息是在你的应用中创建的,由Netty负责在将消息发送出去后释放掉。但是,如果你 有拦截写请求的handler程序,则需要保证正确释放中间对象(例如,编码程序)。

public void write(ChannelHandlerContext ctx, Object message, ChannelPromise promise) {
 System.err.println("Writing: " + message);
 ctx.write(message, promise);
}

// Transformation
public void write(ChannelHandlerContext ctx, Object message, ChannelPromise promise) {
 if (message instanceof HttpContent) {
 // Transform HttpContent to ByteBuf.
 HttpContent content = (HttpContent) message;
 try {
 ByteBuf transformed = ctx.alloc().buffer();
 ....
 ctx.write(transformed, promise);
 } finally {
 content.release();
 }
 } else {
 // Pass non-HttpContent through.
 ctx.write(message, promise);
 }
}

内存泄漏

引用计数的缺点是,引用计数对象容易发生泄露。因为JVM并不知道Netty的引用计数实现,当引用计数对象不 可达时,JVM就会将它们GC掉,即时此时它们的引用计数并不为0。一旦对象被GC就不能再访问,也就不能 归还到缓冲池,所以会导致内存泄露。 庆幸的是,尽管发现内存泄露很难,但是Netty会对分配的缓冲区的1%进行采样,来检查你的应用中是否存在内存 泄露。一旦有内存泄露,你将会发现如下日志消息:

LEAK: ByteBuf.release() was not called before it’s garbage-collected. Enable advanced leak reporting to find out where the leak occurred. To enable advanced leak reporting, specify the JVM option ‘-Dio.netty.leakDetectionLevel=advanced’ or call ResourceLeakDetector.setLevel()

重启程序时加上上面提到的JVM选项,你就会找到离你程序中内存泄露最近的位置。以下是一段单元测试中的 内存泄露检查输出(XmlFrameDecoderTest.testDecodeWithXml())

Running io.netty.handler.codec.xml.XmlFrameDecoderTest 15:03:36.886 [main] ERROR io.netty.util.ResourceLeakDetector – LEAK: ByteBuf.release() was not called before it’s garbage-collected. Recent access records: 1 #1: io.netty.buffer.AdvancedLeakAwareByteBuf.toString(AdvancedLeakAwareByteBuf.java:697) io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(XmlFrameDecoderTest.java:157) io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(XmlFrameDecoderTest.java:133) …

Created at: io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:55) io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155) io.netty.buffer.UnpooledUnsafeDirectByteBuf.copy(UnpooledUnsafeDirectByteBuf.java:465) io.netty.buffer.WrappedByteBuf.copy(WrappedByteBuf.java:697) io.netty.buffer.AdvancedLeakAwareByteBuf.copy(AdvancedLeakAwareByteBuf.java:656) io.netty.handler.codec.xml.XmlFrameDecoder.extractFrame(XmlFrameDecoder.java:198) io.netty.handler.codec.xml.XmlFrameDecoder.decode(XmlFrameDecoder.java:174) io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:227) io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:140) io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:74) io.netty.channel.embedded.EmbeddedEventLoop.invokeChannelRead(EmbeddedEventLoop.java:142) io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:317) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846) io.netty.channel.embedded.EmbeddedChannel.writeInbound(EmbeddedChannel.java:176) io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(XmlFrameDecoderTest.java:147) io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(XmlFrameDecoderTest.java:133) … If you use Netty 5 or above, an additional information is provided to help you find which handler handled the leaked buffer lastly. The following example shows that the leaked buffer was handled by the handler whose name is EchoServerHandler#0 and then garbage-collected, which means it is likely that EchoServerHandler#0 forgot to release the buffer:

12:05:24.374 [nioEventLoop-1-1] ERROR io.netty.util.ResourceLeakDetector – LEAK: ByteBuf.release() was not called before it’s garbage-collected. Recent access records: 2 #2: Hint: ‘EchoServerHandler#0’ will handle the message from this point. io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:329) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846) io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:133) io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485) io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452) io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346) io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:794) java.lang.Thread.run(Thread.java:744) #1: io.netty.buffer.AdvancedLeakAwareByteBuf.writeBytes(AdvancedLeakAwareByteBuf.java:589) io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:208) io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:125) io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485) io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452) io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346) io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:794) java.lang.Thread.run(Thread.java:744) Created at: io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:55) io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155) io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146) io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:107) io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123) io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485) io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452) io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346) io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:794) java.lang.Thread.run(Thread.java:744)

内存泄露检查等级

总共有4个内存泄露检查等级:

  • DISABLED – 完全禁用检查。不推荐。
  • SIMPLE – 检查1%的缓冲区是否存在内存泄露。默认。
  • ADVANCED – 检查1%的缓冲区,并提示发生内存泄露的位置
  • PARANOID – 与ADVANCED等级一样,不同的是会检查所有的缓冲区。对于自动化测试很有用,你可以让构建测试失败 如果构建输出中包含’LEAK’ 用JVM选项 -Dio.netty.leakDetectionLevel 来指定内存泄露检查等级

java -Dio.netty.leakDetectionLevel=advanced …

避免泄露最佳实践

  • 指定SIMPLE和PARANOI等级,运行单元测试和集成测试
  • 在将你的应用部署到整个集群前,尽可能地用足够长的时间,使用SIMPLE级别去调试你的程序,来看是否存在内存泄露
  • 如果存在内存泄露,使用ADVANCED级别去调试程序,去获取内存泄漏的位置信息
  • 不要将存在内存泄漏的应用部署到整个集群

在单元测试中修复内存泄露

在单元测试中很容易忘记去释放缓冲区,这就会生成一个内存泄漏的警告。但是这并不意味着你的应用中也存在内存泄漏。你可以 在单元测试中使用ReferenceCountUtil.releaseLater()工具类方法,来代替try-finally块去释放所有的缓冲区:

import static io.netty.util.ReferenceCountUtil.*;

@Test
public void testSomething() throws Exception {
 // ReferenceCountUtil.releaseLater() will keep the reference of buf,
 // and then release it when the test thread is terminated.
 ByteBuf buf = releaseLater(Unpooled.directBuffer(512));
 ...
}

了解更多:

Why do we need to manually handle reference counting for Netty ByteBuf if JVM GC is still in place?

Buffer ownership in Netty 4: How is buffer life-cycle managed?

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 《Netty官方文档》引用计数对象

Tyrian

Software Engineer at TP-LINK

Latest posts by Tyrian (see all)

FavoriteLoading添加本文到我的收藏
  • Trackback are closed
  • Comments (0)
  1. No comments yet.

You must be logged in to post a comment.

return top