NettyでのServer Bootstrap初期化ソース解析
23452 ワード
前言
以前の記事ではNettyのスレッドプールNioEventLoopGroupの初期化プロセスを紹介していましたが、今回はNettyで同様に非常に重要なものであるServerBootstrapを分析します.
ServerBootstrap使用フラグメント
これはNettyサービス側の比較標準の初期化フラグメントであり、
ソース解析
以上のフラグメントから、初期化時に
ここで、入力された
ここから、この方法は、入力された
次に
ここで
ここでは、
ここで、
次に
操作が多すぎず、
最後に
ここでは、
ここで,オブジェクトを入れるkey値は
ここでは、tcpのパラメータ設定の大部分があり、一般的に使用されているいくつかのパラメータがソースコードに注釈されています.パフォーマンスに大きな影響を及ぼすのは
この方法は主に
ここを見て私はまたショックを受けました.Netty自身はjava 8以下のバージョンに対応するために
ここでは、ポート番号に基づいて
ここで、
まず
この方法は長いように見えますが、主に2つのことをしました.1つは、親と子のoptionsとattrsを割り当てることです.二、
最終的には、3つのパラメータの
ここでの
ここでは、バインド操作
続いて
通常、この方法はwhileサイクル中に常に存在し、
まとめ
以前の記事ではNettyのスレッドプールNioEventLoopGroupの初期化プロセスを紹介していましたが、今回はNettyで同様に非常に重要なものであるServerBootstrapを分析します.
ServerBootstrap使用フラグメント
ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setNameFormat(getServerName() + " Server Acceptor NIO Thread#%d").build();
ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat(getServerName() + " Server Reactor NIO Thread#%d").build();
this.bossGroup = new NioEventLoopGroup(numberOfThreads, bossThreadFactory);
this.workerGroup = new NioEventLoopGroup(numberOfThreads, workerThreadFactory);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("readTimeoutHandler", new ReadTimeoutHandler(DeviceServerListener.this.timeoutSeconds));
pipeline.addLast("lineBasedFrameDecoder-" + maxLength, new LineBasedFrameDecoder(Integer.parseInt(maxLength)));// ('
') ByteBuf
pipeline.addLast("stringPluginMessageDecoder", new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast("stringToByteEncoder", new StringToByteEncoder());// JSON ByteBuf
pipeline.addLast("deviceMessageDecoder", new DeviceMessageDecoder());// JSON deviceMessage
pipeline.addLast("deviceMessageEncoder", new DeviceMessageEncoder());// deviceMessage JSON
pipeline.addLast("deviceHeartBeatResponseHandler", new DeviceHeartBeatResponseHandler(heartTime));
pipeline.addLast("deviceAuthResponseHandler",
new DeviceAuthResponseHandler(DeviceServerListener.this.timeoutSeconds, DeviceServerListener.serverInstanceName));
pipeline.addLast("deviceMessageHandler", new DeviceMessageHandler());
// log.debug("Added Handler to Pipeline: {}", pipeline.names());
}
}).option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT);
// Start the server. Bind and start to accept incoming connections.
this.channelFuture = bootstrap.bind(serverPort).sync();
これはNettyサービス側の比較標準の初期化フラグメントであり、
ServerBootstrap
が非常に重要なシーンを持っていることがわかります.これはNettyのイニシエータのように、NioEventLoopGroup
が以前に分析されていたので、ServerBootstrap
の初期化を直接見てみましょう.ソース解析
以上のフラグメントから、初期化時に
ServerBootstrap
の非パラメトリック構造関数によってオブジェクトが作成され、この構造関数には何の操作もないので分析しないことがわかります.次に、このオブジェクトのチェーン呼び出しbootstrap.group().channel().childHandler().option()
の列を見てみましょう.まずはgroup()
メソッド public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
ここで、入力された
childGroup
オブジェクトは、ServerBootstrap
のchildGroup
属性に割り当てられ、親AbstractBootstrap
のgroup()
メソッドが呼び出される public B group(EventLoopGroup group) {
if (group == null) {
throw new NullPointerException("group");
}
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.group = group;
return (B) this;
}
ここから、この方法は、入力された
parentGroup
をgroup
属性に付与することが分かる.次に
channel()
の方法を見てみましょう public B channel(Class extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new BootstrapChannelFactory(channelClass));
}
ここで
channelFactory(new BootstrapChannelFactory(channelClass))
を呼び出して、ついてきます. public B channelFactory(ChannelFactory extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return (B) this;
}
ここでは、
AbstractBootstrap
クラスのchannelFactory
属性の付与が完了し、付与対象はnew BootstrapChannelFactory(channelClass)
である.public abstract class AbstractBootstrap, C extends Channel> implements Cloneable
ここで、
C
は、AbstractBootstrap
のクラスに従ってChannel
のサブクラスとして定義されるので、実際に入力されるchannelClass
の値はNioServerSocketChannel.class
である.次に
childHandler()
の方法を見てみましょう public ServerBootstrap childHandler(ChannelHandler childHandler) {
if (childHandler == null) {
throw new NullPointerException("childHandler");
}
this.childHandler = childHandler;
return this;
}
操作が多すぎず、
ServerBootstrap
のchildHandler
属性にのみ値を付与しているが、ここに伝わるchildHandler
は少し複雑である.受信されたオブジェクトはnew ChannelInitializer()
であり、受信および送信の双方向メッセージを処理するための処理チェーンを初期化するinitChannel(SocketChannel ch)
の方法が同時に上書きされている.この方法は比較的重要であり、その後、単独で分析され、ここでは詳細に説明しない.最後に
option()
の方法を見てみましょう public B option(ChannelOption option, T value) {
if (option == null) {
throw new NullPointerException("option");
}
if (value == null) {
synchronized (options) {
options.remove(option);
}
} else {
synchronized (options) {
options.put(option, value);
}
}
return (B) this;
}
ここでは、
options
配列を維持し、設定する項目を配列に配置します.定義は次のとおりです.private final Map, Object> options = new LinkedHashMap, Object>();
ここで,オブジェクトを入れるkey値は
ChannelOption>
オブジェクトに限定されていることが分かるが,おおよそ次のようなものがある. public static final ChannelOption ALLOCATOR = valueOf("ALLOCATOR");// PooledByteBufAllocator UnpooledByteBufAllocator (4.1 )
public static final ChannelOption RCVBUF_ALLOCATOR = valueOf("RCVBUF_ALLOCATOR");//
public static final ChannelOption MESSAGE_SIZE_ESTIMATOR = valueOf("MESSAGE_SIZE_ESTIMATOR");
public static final ChannelOption CONNECT_TIMEOUT_MILLIS = valueOf("CONNECT_TIMEOUT_MILLIS");
public static final ChannelOption MAX_MESSAGES_PER_READ = valueOf("MAX_MESSAGES_PER_READ");
public static final ChannelOption WRITE_SPIN_COUNT = valueOf("WRITE_SPIN_COUNT");
public static final ChannelOption WRITE_BUFFER_HIGH_WATER_MARK = valueOf("WRITE_BUFFER_HIGH_WATER_MARK");
public static final ChannelOption WRITE_BUFFER_LOW_WATER_MARK = valueOf("WRITE_BUFFER_LOW_WATER_MARK");
public static final ChannelOption ALLOW_HALF_CLOSURE = valueOf("ALLOW_HALF_CLOSURE");
public static final ChannelOption AUTO_READ = valueOf("AUTO_READ");
public static final ChannelOption SO_BROADCAST = valueOf("SO_BROADCAST");//
public static final ChannelOption SO_KEEPALIVE = valueOf("SO_KEEPALIVE");//
public static final ChannelOption SO_SNDBUF = valueOf("SO_SNDBUF");//
public static final ChannelOption SO_RCVBUF = valueOf("SO_RCVBUF");//
public static final ChannelOption SO_REUSEADDR = valueOf("SO_REUSEADDR");//
public static final ChannelOption SO_LINGER = valueOf("SO_LINGER");// close() ,
public static final ChannelOption SO_BACKLOG = valueOf("SO_BACKLOG");//
public static final ChannelOption SO_TIMEOUT = valueOf("SO_TIMEOUT");
public static final ChannelOption IP_TOS = valueOf("IP_TOS");
public static final ChannelOption IP_MULTICAST_ADDR = valueOf("IP_MULTICAST_ADDR");
public static final ChannelOption IP_MULTICAST_IF = valueOf("IP_MULTICAST_IF");
public static final ChannelOption IP_MULTICAST_TTL = valueOf("IP_MULTICAST_TTL");
public static final ChannelOption IP_MULTICAST_LOOP_DISABLED = valueOf("IP_MULTICAST_LOOP_DISABLED");
public static final ChannelOption TCP_NODELAY = valueOf("TCP_NODELAY");// Nagle ,
ここでは、tcpのパラメータ設定の大部分があり、一般的に使用されているいくつかのパラメータがソースコードに注釈されています.パフォーマンスに大きな影響を及ぼすのは
ALLOCATOR
で、一部の資料によると、オープンオブジェクトプールの性能はオープンしないよりはるかに高く、Nettyは4.1バージョンからデフォルトのオプションをオープンオブジェクトプールに設定しています.ここでは、valueOf
の方法の実装について説明します. public static ChannelOption valueOf(String name) {
checkNotNull(name, "name");
ChannelOption option = names.get(name);
if (option == null) {
option = new ChannelOption(name);
ChannelOption old = names.putIfAbsent(name, option);
if (old != null) {
option = old;
}
}
return option;
}
この方法は主に
names
という配列を維持しており,以下のように定義されていることがわかる.private static final ConcurrentMap names = PlatformDependent.newConcurrentHashMap();
PlatformDependent.newConcurrentHashMap()
の具体的な実装を見てみましょう public static ConcurrentMap newConcurrentHashMap() {
if (CAN_USE_CHM_V8) {
return new ConcurrentHashMapV8();
} else {
return new ConcurrentHashMap();
}
}
ここを見て私はまたショックを受けました.Netty自身はjava 8以下のバージョンに対応するために
ConcurrentHashMapV8
を実現しました.これは6000行以上のコードのクラスであることを知っています.のNettyの開発者は本当に狂魔の称号を最適化したと言わざるを得ない.bootstrap.group().channel().childHandler().option()
という一連のチェーン呼び出しを解析し、次に次の一連のbootstrap.bind(serverPort).sync()
を見てみましょう.ここではまずbind()
の方法を見てみましょう public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
ここでは、ポート番号に基づいて
InetSocketAddress
オブジェクトを作成し、bind(SocketAddress localAddress)
メソッドを呼び出します. public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
ここで、
validate()
メソッドは、AbstractBootstrap
クラスのgroup
およびchannelFactory
属性が空であるかどうかを検証し、doBind()
メソッドを実行する. private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();// Channel
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
//
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// , ,
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
//
promise.setFailure(cause);
} else {
//
promise.executor = channel.eventLoop();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
まず
regFuture
オブジェクトがinitAndRegister()
メソッドで生成された方法を見てみましょう. final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// Channel
channel = channelFactory().newChannel();
// Handler
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
}
// Channel , GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// channel Reactor
ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
channelFactory().newChannel()
ここで呼び出されるのはAbstractBootstrap
のchannelFactory
属性のnewChannel()
メソッドであり、ここで最終的に得られるのは、以前に付与されたchannelFactory
オブジェクトの場合に基づいてNioServerSocketChannel
のインスタンスオブジェクトである.channel
オブジェクトが得られた後、init(channel)
が呼び出されて初期化され、この方法はサブクラスServerBootstrap
によって実現される. @Override
void init(Channel channel) throws Exception {
final Map, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);// options DefaultChannelConfig
}
final Map, Object> attrs = attrs();
synchronized (attrs) {
for (Entry, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey
この方法は長いように見えますが、主に2つのことをしました.1つは、親と子のoptionsとattrsを割り当てることです.二、
channel
のpipeline
属性の処理チェーンを構築する.ここでは主にaddLast()
の方法を見てみましょう.DefaultChannelPipeline
によって具体的に実現されています.見てみましょう. @Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);// handler sharable
newCtx = newContext(group, filterName(name, handler), handler);//filterName(name, handler) name,newContext newCtx handler、inbound、outbound、name、pipeline
addLast0(newCtx);// newCtx tail
if (!registered) {// channel eventloop registered true,
newCtx.setAddPending();//CAS newCtx handlerState
callHandlerCallbackLater(newCtx, true);// newCtx pendingHandlerCallbackHead next
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
最終的には、3つのパラメータの
addLast()
メソッドが実行され、送信されたgroup
およびname
はいずれも空である.この方法では主にchannel
がeventloop
に登録されているかどうかに応じて異なる処理を行い、handlerチェーンを初期化する.initAndRegister()
メソッドに戻り、init(channel)
を解析した後、channel
をReactorスレッドプールに登録するもう一つの重要なregister()
メソッドを見てみましょう.この方法はNioEventLoopGroup
の親MultithreadEventLoopGroup
で実現される @Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
@Override
public EventExecutor next() {
return chooser.next();
}
ここでの
chooser
の前の記事では、スレッドプールに設定されたスレッド数が2のべき乗次数で異なる実装があるかどうかを分析したが、next()
メソッドは配列のNioEventLoop
オブジェクトを返し、このクラスのregister()
メソッドは、最終的にはchannel
が登録されたDefaultChannelPromise
オブジェクトを返し、具体的にはここでは深くない.これでinitAndRegister()
メソッドの解析が完了し、doBind()
メソッドに戻ってdoBind0()
メソッドの解析を継続します. private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
ここでは、バインド操作
regFuture.isSuccess()
が実行されるのは、channel
、すなわちchannel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
の登録が成功した場合のみであり、そうでなければpromise
に直接登録失敗を書き込むことはできない.ここでのpromise
はDefaultChannelPromise
のタイプのオブジェクトであり、このクラスはFuture
から継承され、特殊なFuture
のオブジェクトと考えられる.bind(localAddress, promise)
メソッドは最終的にtail
ノードにバインドされ、最終的にDefaultChannelPipeline
の内部クラスのunsafe
にバインドされ、呼び出しチェーンは非常に深く、ここでは展開されません.最後にジャンプしてbootstrap.bind(serverPort).sync()
のsync()
を見てみると、bootstrap.bind(serverPort)
がpromise
を返したと分析されていたので、sync()
法はDefaultChannelPromise
によって実現された. @Override
public ChannelPromise sync() throws InterruptedException {
super.sync();
return this;
}
続いて
DefaultPromise
クラスについて @Override
public Promise sync() throws InterruptedException {
await();
rethrowIfFailed();
return this;
}
@Override
public Promise await() throws InterruptedException {
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
checkDeadLock();
synchronized (this) {
while (!isDone()) {
incWaiters();
try {
wait();
} finally {
decWaiters();
}
}
}
return this;
}
通常、この方法はwhileサイクル中に常に存在し、
sync()
メソッドに実行されるスレッドがブロックされるため、bootstrap.bind(serverPort).sync()
以降のコードは達成できないことが分かる点に注意すべきである.まとめ
ServerBootstrap
の使用方法は非常に固定されており、ほとんどの通常の使用では初期化コードテンプレートが適用されます.主なことは、ServerBootstrap
の各属性を付与し、Channel
、バインドユーザ定義Handler
、およびChannel
を1つのeventloop
に登録することであり(ここで特に強調するのは、Channel
は1つのeventloop
にのみバインドされる)、最後にローカルポートをバインドしてIOイベントをリスニングすることである.