Nettyソース分析-ChannelによるBufferのスタックメモリと直接メモリの使用
28289 ワード
ChannelのUnsafe対Bufferの使用
@Override
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
// allocator,allocator ByteBuf
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// allocator ByteBuf
byteBuf = allocHandle.allocate(allocator);
// doReadBytes socket byteBuf
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
// byteBuf pipeline,
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
final ByteBufAllocator allocator = config.getAllocator();
Channelはスタックメモリを使用するか、直接メモリを使用するか
//
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
// SocketChannel allocator
serverBootstrap.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
serverBootstrap.childHandler(new NettyServerInitializer(webSocketService));
1. api , :
-Dio.netty.noPreferDirect
2. sun.misc.Unsafe, : ; : sun , sun.misc.Unsafe , , , core dump, Unsafe
-Dio.netty.noUnsafe
ソースコードの分析は以下の通りである:1.
public class DefaultChannelConfig implements ChannelConfig {
...
private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
...
}
2.
public interface ByteBufAllocator {
ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;
}
3. DEFAULT_ALLOCATOR
public final class ByteBufUtil {
...
static final ByteBufAllocator DEFAULT_ALLOCATOR;
static {
// android unpooled, pooled
String allocType = SystemPropertyUtil.get(
"io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
allocType = allocType.toLowerCase(Locale.US).trim();
ByteBufAllocator alloc;
if ("unpooled".equals(allocType)) {
alloc = UnpooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else if ("pooled".equals(allocType)) {
alloc = PooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else {
alloc = PooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
}
DEFAULT_ALLOCATOR = alloc;
...
}
...
}
4. UnpooledByteBufAllocator.DEFAULT: alloctor
/**
* Default instance which uses leak-detection for direct buffers.
*/
public static final UnpooledByteBufAllocator DEFAULT =
new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred());
5. PooledByteBufAllocator.DEFAULT: allocator
public static final PooledByteBufAllocator DEFAULT =
new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
6. 4,5 , PlatformDependent.directBufferPreferred(), true, , 。PlatformDependent.directBufferPreferred() :
private static final Throwable UNSAFE_UNAVAILABILITY_CAUSE = unsafeUnavailabilityCause0();
private static final boolean DIRECT_BUFFER_PREFERRED =
UNSAFE_UNAVAILABILITY_CAUSE == null && !SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false);
unsafeUnavailabilityCause0 : Java unsafe
private static Throwable unsafeUnavailabilityCause0() {
if (isAndroid()) {
logger.debug("sun.misc.Unsafe: unavailable (Android)");
return new UnsupportedOperationException("sun.misc.Unsafe: unavailable (Android)");
}
Throwable cause = PlatformDependent0.getUnsafeUnavailabilityCause();
if (cause != null) {
return cause;
}
try {
boolean hasUnsafe = PlatformDependent0.hasUnsafe();
logger.debug("sun.misc.Unsafe: {}", hasUnsafe ? "available" : "unavailable");
return hasUnsafe ? null : PlatformDependent0.getUnsafeUnavailabilityCause();
} catch (Throwable t) {
logger.trace("Could not determine if Unsafe is available", t);
// Probably failed to initialize PlatformDependent0.
return new UnsupportedOperationException("Could not determine if Unsafe is available", t);
}
}