原理剖析(第011編)Nettyのサービス側起動動作原理分析(下)
原理剖析(第011編)Nettyのサービス側起動動作原理分析(下)
-
一、大まかに紹介する
二、三、四章は前の章を見てください.
詳しくは原理剖析(第010編)Nettyのサービス側起動動作原理分析(上)を参照
四、ソース分析Nettyサービス側起動
前の章では、スレッド管理グループのオブジェクトがどのようにインスタンス化されているかを主に分析し、各スレッド管理グループにサブスレッド配列がタスクを処理していることも理解しました.では、4.6から直接分析します.
4.6、serverBootstrapの構成パラメータを追加する
4.7、serverBootstrap呼び出しbindバインド登録
4.8、bind()操作
4.9、initAndRegister()初期化と登録
4.10、initサービス端Channel
4.11、config().group().register(channel)
4.12、register0(promise)
4.13、doBind0(regFuture, channel, localAddress, promise)
五、まとめ
六、ダウンロードアドレス
https://gitee.com/ylimhhmily/SpringCloudTutorial.git
SpringCloudTutorial交流QQ群:235322432
SpringCloudTutorial交流微信群:微信コミュニケーション群QRコードピクチャーリンク
注目を歓迎して、あなたのはきっと私に対する最大の支持です!!!
-
一、大まかに紹介する
1、 , , 【 ( 010 )Netty ( )】;
2、 Netty , Netty :netty-netty-4.1.22.Final;
二、三、四章は前の章を見てください.
詳しくは原理剖析(第010編)Nettyのサービス側起動動作原理分析(上)を参照
四、ソース分析Nettyサービス側起動
前の章では、スレッド管理グループのオブジェクトがどのようにインスタンス化されているかを主に分析し、各スレッド管理グループにサブスレッド配列がタスクを処理していることも理解しました.では、4.6から直接分析します.
4.6、serverBootstrapの構成パラメータを追加する
1、 :
// NettyServer.java
// Boss、Worker ServerBootstrap
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// NioServerSocketChannel, ,OIO OioServerSocketChannel
.localAddress("localhost", port)// InetSocketAddress 。
.childHandler(new ChannelInitializer() {// childHandler
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new PacketHeadDecoder());
ch.pipeline().addLast(new PacketBodyDecoder());
ch.pipeline().addLast(new PacketHeadEncoder());
ch.pipeline().addLast(new PacketBodyEncoder());
ch.pipeline().addLast(new PacketHandler());
}
});
2、 , Channel NioServerSocketChannel, Netty;
Channel , ;
3、 childHandler, childHandler , PacketHeadDecoder、PacketBodyDecoder、PacketHandler;
PacketHandler、PacketBodyEncoder、PacketHeadEncoder;
, ;
4.7、serverBootstrap呼び出しbindバインド登録
1、 :
// NettyServer.java
// , sync() , , sync(), 。
ChannelFuture channelFuture = serverBootstrap.bind().sync();
2、 , bind() , , ;
4.8、bind()操作
1、 :
// AbstractBootstrap.java
/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind() {
validate();
SocketAddress localAddress = this.localAddress;
if (localAddress == null) {
throw new IllegalStateException("localAddress not set");
}
return doBind(localAddress); // Channel,
}
// AbstractBootstrap.java
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister(); //
// , :
// 1、 NioServerSocketChannel, Channel pipeline、config、unsafe ;
// 2、 handler pipeline , Channel handler ;
// 3、 NioServerSocketChannel NioEventLoop ;
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// NioServerSocketChannel Channel , ,
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
2、 , doBind ,initAndRegister doBind0;
3、initAndRegister Channel, Channel bossGroup ;
4、doBind0 , , ;
4.9、initAndRegister()初期化と登録
1、 :
// AbstractBootstrap.java
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// clazz.getConstructor().newInstance()
// Channel, NioServerSocketChannel
// NioServerSocketChannel , Channel DefaultChannelPipeline=pipeline
// NioServerSocketChannel , Channel NioServerSocketChannelConfig=config
// NioServerSocketChannel , Channel unsafe NioMessageUnsafe=unsafe
channel = channelFactory.newChannel(); // ReflectiveChannelFactory newChannel
// channel
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// config().group()=bossGroup parentGroup, parentGroup NioServerSocketChannel=channel
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
2、 , channel , channel ;
3、 bossGroup , channel ;
4、 , , Channel, bossGroup ;
4.10、initサービス端Channel
1、 :
// ServerBootstrap.java
@Override
void init(Channel channel) throws Exception {
final Map, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey
4.11、config().group().register(channel)
1、 :
// MultithreadEventLoopGroup.java
@Override
public ChannelFuture register(Channel channel) {
// next() NioEventLoopGroup children[] , NioEventLoop
// NioEventLoop channel
return next().register(channel);
// , Channel, NioEventLoopGroup register(Channel) Channel,
// next() NioEventLoop Channel
// NioEventLoopGroup bossGroup , NioServerSocketChannel
// NioEventLoopGroup workerGroup , ServerSocketChannel
}
// SingleThreadEventLoop.java
@Override
public ChannelFuture register(Channel channel) {
// this children[]
// Channel this DefaultChannelPromise
// register(ChannelPromise)
return register(new DefaultChannelPromise(channel, this));
}
// SingleThreadEventLoop.java
@Override
public ChannelFuture register(final ChannelPromise promise) {
// , , new
ObjectUtil.checkNotNull(promise, "promise");
// promise.channel() new DefaultChannelPromise(channel, this) channel
// promise.channel().unsafe() Channel unsafe , NioServerSocketChannel NioMessageUnsafe=unsafe
// this children[]
promise.channel().unsafe().register(this, promise);
return promise;
}
// AbstractUnsafe.java
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// eventLoop children[]
// Channel AbstractChannel, NioServerSocketChannel
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// this.eventLoop Children[i] ,
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) { // eventLoop
register0(promise); // register0
} else {
try {
// , eventLoop ,
// eventLoop execute register0(promise) ,
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
// Channel , else , ,eventLoop.inEventLoop() false
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
// SingleThreadEventExecutor.java
/**
* task。
*
* @param task
*/
@Override
public void execute(Runnable task) {
if (task == null) { // task , ,
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop(); // , nioEventLoop ?
if (inEventLoop) { // , task , addTask
addTask(task); // task
} else {
startThread(); // , nioEventLoop state , ,
addTask(task); // task
// , , , RejectedExecutionException
if (isShutdown() && removeTask(task)) {
reject(); // RejectedExecutionException
}
}
// addTaskWakesUp: , false, false
// wakesUpForTask(task): NonWakeupRunnable task true, NonWakeupRunnable task,
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
2、 config().group().register(channel) , , register0(promise) ;
3、 , config().group().register(channel) , :
channel , register , channel ;
4、 ,execute(Runnable task) , , ;
5、 register0(promise) channel ;
4.12、register0(promise)
1、 :
// AbstractUnsafe.java
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister(); // Channel , Channel AbstractNioChannel
// , Channel , ,
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded(); // initAndRegister init p.addLast initChannel
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) { // Channel , socket
if (firstRegistration) {
pipeline.fireChannelActive(); //
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead(); // Channel
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
// AbstractNioChannel.java
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) { // , , Channel selector
try {
// eventLoop() children[] ,children NioEventLoop
// , children , children selector unwrappedSelector
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
// Channel ,
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
// DefaultChannelPipeline.java
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) { // pipeline , true
firstRegistration = false; // false,
// We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
// that were added before the registration was done.
// , Channel NioEventLoop , Handler
callHandlerAddedForAllHandlers();
}
}
// DefaultChannelPipeline.java
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered; // registered false, ,
// This Channel itself was registered.
registered = true; // registered true ,
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
// Null out so it can be GC'ed.
this.pendingHandlerCallbackHead = null;
}
// This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
// holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
// the EventLoop.
PendingHandlerCallback task = pendingHandlerCallbackHead;
// while , task execute, ,
while (task != null) {
task.execute();
task = task.next;
}
}
2、 register0(promise) , channel , javaChannel().register(...) , ;
3、 , pendingHandlerCallbackHead, task.execute ;
4、 fireChannelRegistered , channel , ;
5、 beginRead , channel ;
6、 initAndRegister , ip , ;
4.13、doBind0(regFuture, channel, localAddress, promise)
1、 :
// AbstractBootstrap.java
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
// ,
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
// channel bind
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
// AbstractChannel.java
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
// DefaultChannelPipeline.java
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
// AbstractChannelHandlerContext.java
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
// AbstractChannelHandlerContext.java
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}
// HeadContext.java
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
// AbstractUnsafe.java
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
// NioServerSocketChannel.java
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
2、 , , javaChannel().bind(...) ;
javaChannel().register(...) ;
: channel , channel bind register ;
3、 , 。。
五、まとめ
, Netty , :
• , children[];
• , channel、localAddress、childHandler ;
• NioServerSocketChannel, ChannelId、unsafe、pipeline ;
• NioServerSocketChannel, attr、option, handler pipeline ;
• JDK ServerSocketChannel , pipeline task ;
• JDK NioServerSocketChannel , active ;
六、ダウンロードアドレス
https://gitee.com/ylimhhmily/SpringCloudTutorial.git
SpringCloudTutorial交流QQ群:235322432
SpringCloudTutorial交流微信群:微信コミュニケーション群QRコードピクチャーリンク
注目を歓迎して、あなたのはきっと私に対する最大の支持です!!!