【Netty】新しい接続へのアクセス方法
24901 ワード
公衆番号へようこそ:【
プログラミングが好きです
バックグラウンドの返信が必要な場合
2019贈呈
1 Tの学习资料よ!!
前文に続き、本は前回【NioEventLoop】に続きます.NioEventLoop実行プロセスを検討する際、IOイベント(新しい接続を含む)を検出し、IOイベントを処理し、すべてのタスクの3つのプロセスを実行します.ここで、IOイベントのうち、保有するselectorによってイベントがポーリングされ、新しい接続が検出される.ここでは同じコードを多重化します.
チャンネルのデザイン
分析を始める前に、まずChannelのデザインを知っておきましょう.
最上位チャネルインタフェースは、読み取り、書き込み、接続、バインドなどのsocketイベントを定義し、AbstractChannelをスケルトンとして使用してこれらの方法を実装します.ビューアメンバー変数、ほとんどの共通のコンポーネントが定義されていることがわかります.
第2層AbstractNioChannelは、NIO、すなわちSelectorによる読み書きイベントの傍受を定義する.メンバー変数にはselector関連のプロパティが保存されます.
第3層の内容は比較的多く,サービス側channel(左にAbstractNioMessageChannelを継承したNioServerSocketChannel)およびクライアントchannel(右にAbstractNioByteChannelを継承したNioSocketChannel)が定義されている.
新しい接続にアクセスする方法
Nettyが新しい接続にどのようにアクセスするかを探り始めました.主に四つの部分に分かれている
1.新しい接続の検出
2.NioSocketChannelの作成
3.スレッドの割り当てとSelectorの登録
4.Selectorへの読み取りイベントの登録
1.新しい接続の検出
Nettyサービス側は、起動時にbossGroup、すなわちNioEventLoopをバインドし、
重要な新しい接続アクセスおよび読み取りイベント処理エントリ
サービス側:unsafeは
クライアント:unsafeが
b).unsafe.read()
1.jdkの最下位のコードをループしてchannelを作成し、nettyのNioSocketChannelでパッケージし、新しい接続がチャネルに正常にアクセスしたことを示します.
2.取得したすべてのチャネルを1つのコンテナに格納し、アクセスの接続数を検出し、デフォルトでは1回に16の接続を接続する
3.コンテナ内のチャネルを巡回し、メソッドfireChannelReadを順次呼び出し、4.FireChannelReadComplete,fireExceptionCaughtは、対応する伝播イベントをトリガする.
このキーコード論理の
2.NioSocketChannelの作成
IOイベントを検出して新しい接続をポーリングすると、接続アクセスイベントが正常に検出された後、
NioSocketChannelの主な仕事について説明します.元のコードを表示するには、親構造メソッドを呼び出し、NioSocketChannelConfigをインスタンス化します.
1)、NioSocketChannelの親の構築方法を表示します.主に、クライアントが登録した読み取りイベントを保存し、channelをメンバー変数とし、ブロックモードを非ブロックに設定します.
最後に親クラスを呼び出す構造方法は、クライアントchannelに対応するサービス側channelと、channelのidと2つのコンポーネントunsafeとpipelineを設定することである.
2)、NioSocketChannelConfigのインスタンス化を参照してください.主にjavaSocketが保存する、
3.スレッドの割り当てとSelectorの登録
サービス側が初期化を開始すると
サービス側pipeline全体の構造を下図に示す.
新しい接続アクセス時に
サーバBootstrapの内部クラスであり、
1.クライアントチャネルのpipelineにchildHandlerを追加する
2.クライアントTCP関連属性childOptionsとカスタム属性childAttrsの設定
3.workGroup NioEventLoopを選択してSelectorを登録する
1)、クライアントchannelのpipelineにchildHandlerを追加する
クライアント
2)、クライアントTCP関連属性childOptionsおよびカスタム属性childAttrsを設定することは、
3)、workGroupはNioEventLoopを選択し、Selectorを登録する
最後に、
4.Selectorへの読み取りイベントの登録
a)、入口:
b)、実際に
c)、
d)、読み出しイベントは、末尾のTailContent#read()からトリガーされ、ctxが順次実行する.read()は,末尾から各outboundHandlerのread()イベントがトリガーされる.頭まで.
e)、ヘッダHeadContext#read()に入り、最終的にselectionKeyを変更し、selectorに読み取りイベントを登録する
HeadContext#read()
AbstractChannel#beginRead()
AbstractNioMessageChannel#doBeginRead
AbstractNioChannel#doBeginRead()
参考記事:Jorgezhong
まとめ
Nettyが新しい接続の基本プロセスにどのようにアクセスするかは上述したように、間違いがあれば、ご指摘ください.まず前の2編から見たほうが理解しやすいことをお勧めします.
【Netty】サービス側とクライアント学習NioEventLoop
最後に
Java、ビッグデータに興味があればQRコードを長く押して注目してください.私はあなたたちに価値をもたらすように努力します.あなたに少しでも役に立つと思う人は、「いいね」や「転送」を手伝ってください.公式アカウント「愛コード」に注目し、2019に返信するには関連資料がありますよ.
プログラミングが好きです
バックグラウンドの返信が必要な場合
2019贈呈
1 Tの学习资料よ!!
前文に続き、本は前回【NioEventLoop】に続きます.NioEventLoop実行プロセスを検討する際、IOイベント(新しい接続を含む)を検出し、IOイベントを処理し、すべてのタスクの3つのプロセスを実行します.ここで、IOイベントのうち、保有するselectorによってイベントがポーリングされ、新しい接続が検出される.ここでは同じコードを多重化します.
チャンネルのデザイン
分析を始める前に、まずChannelのデザインを知っておきましょう.
最上位チャネルインタフェースは、読み取り、書き込み、接続、バインドなどのsocketイベントを定義し、AbstractChannelをスケルトンとして使用してこれらの方法を実装します.ビューアメンバー変数、ほとんどの共通のコンポーネントが定義されていることがわかります.
第2層AbstractNioChannelは、NIO、すなわちSelectorによる読み書きイベントの傍受を定義する.メンバー変数にはselector関連のプロパティが保存されます.
第3層の内容は比較的多く,サービス側channel(左にAbstractNioMessageChannelを継承したNioServerSocketChannel)およびクライアントchannel(右にAbstractNioByteChannelを継承したNioSocketChannel)が定義されている.
新しい接続にアクセスする方法
Nettyが新しい接続にどのようにアクセスするかを探り始めました.主に四つの部分に分かれている
1.新しい接続の検出
2.NioSocketChannelの作成
3.スレッドの割り当てとSelectorの登録
4.Selectorへの読み取りイベントの登録
1.新しい接続の検出
Nettyサービス側は、起動時にbossGroup、すなわちNioEventLoopをバインドし、
bind()
でポートをバインドするときにaccept(新しい接続アクセス)イベントを登録します.イベントをスキャンして処理します.そのため、入り口はNioEventLoop#processSelectedKeys()
から始まります. private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
//
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// NioEventLoop workGroup OP_READ,bossGroup OP_ACCEPT
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//
unsafe.read();
}
}
重要な新しい接続アクセスおよび読み取りイベント処理エントリ
unsafe.read();
a).ここでのunsafe
は、Channel作成プロセスの際に親AbstractChannel#AbstractChannel()
の構造方法を呼び出し、pipeline
とともに初期化される. protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
サービス側:unsafeは
NioServerSockeChanne
lの親AbstractNioMessageChannel#newUnsafe()で作成され、対応するAbstractNioMessageChannelの内部クラスNioMessageUnsafe
が表示されます.クライアント:unsafeが
NioSocketChannel
の親AbstractNioUnsafe#newUnsafe()を作成すると、AbstractNioByteChannelの内部クラスNioByteUnsafe
に対応しますb).unsafe.read()
NioMessageUnsafe.read()
の主な操作は以下の通りです.1.jdkの最下位のコードをループしてchannelを作成し、nettyのNioSocketChannelでパッケージし、新しい接続がチャネルに正常にアクセスしたことを示します.
2.取得したすべてのチャネルを1つのコンテナに格納し、アクセスの接続数を検出し、デフォルトでは1回に16の接続を接続する
3.コンテナ内のチャネルを巡回し、メソッドfireChannelReadを順次呼び出し、4.FireChannelReadComplete,fireExceptionCaughtは、対応する伝播イベントをトリガする.
private final class NioMessageUnsafe extends AbstractNioUnsafe {
//
private final List
このキーコード論理の
int localRead = doReadMessages(readBuf);
は、jdk最下位のchannelを作成し、NioSocketChannelでパッケージし、受信したコンテナに追加して保存し、カウントを返します.protected int doReadMessages(List buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// jdk channel netty channel,
//this channel
buf.add(new NioSocketChannel(this, ch));
// ,
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
2.NioSocketChannelの作成
IOイベントを検出して新しい接続をポーリングすると、接続アクセスイベントが正常に検出された後、
NioServerSocketChannel#doReadMessages()
メソッドが呼び出され、NioSocketChannel
、すなわちクライアントchannelを作成するプロセスが行われる.NioSocketChannelの主な仕事について説明します.元のコードを表示するには、親構造メソッドを呼び出し、NioSocketChannelConfigをインスタンス化します.
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
// NioSocketChannelConfig
config = new NioSocketChannelConfig(this, socket.socket());
}
1)、NioSocketChannelの親の構築方法を表示します.主に、クライアントが登録した読み取りイベントを保存し、channelをメンバー変数とし、ブロックモードを非ブロックに設定します.
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
// NioSocketChannelConfig
config = new NioSocketChannelConfig(this, socket.socket());
}
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
// : channel
super(parent, ch, SelectionKey.OP_READ);
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
// channel
this.ch = ch;
//
this.readInterestOp = readInterestOp;
try {
//
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
最後に親クラスを呼び出す構造方法は、クライアントchannelに対応するサービス側channelと、channelのidと2つのコンポーネントunsafeとpipelineを設定することである.
protected AbstractChannel(Channel parent) {
//parent channel channel( )
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
2)、NioSocketChannelConfigのインスタンス化を参照してください.主にjavaSocketが保存する、
setTcpNoDelay(true);
によってtcpのNagleアルゴリズムが禁止されているが、できるだけ小さなパケットを大きな送信に統合し、遅延を低減することを目的としている. private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
super(channel, javaSocket);
calculateMaxBytesPerGatheringWrite();
}
public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {
super(channel);
if (javaSocket == null) {
throw new NullPointerException("javaSocket");
}
// socket
this.javaSocket = javaSocket;
// Enable TCP_NODELAY by default if possible.
if (PlatformDependent.canEnableTcpNoDelayByDefault()) {
try {
// Nagle ,
setTcpNoDelay(true);
} catch (Exception e) {
// Ignore.
}
}
}
3.スレッドの割り当てとSelectorの登録
サービス側が初期化を開始すると
ServerBootstrap#init()
で、主にいくつかのパラメータの構成が行われています.ここで、childGroup,childOptions,childAttrs,childHandler
等のパラメータについては個別に構成されている.パラメータとしてServerBootstrapAcceptor
とともに特殊なhandleとしてpipelineにカプセル化される.ServerBootstrapAcceptor
のうちeventLoop
はworkGroup
である.public class ServerBootstrap extends AbstractBootstrap {
// .............
@Override
void init(Channel channel) throws Exception {
// AbstractBootstrap.option
final Map, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
// AbstractBootstrap.attr
final Map, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey key = (AttributeKey) e.getKey();
channel.attr(key).set(e.getValue());
}
}
// pipeline
ChannelPipeline p = channel.pipeline();
// ServerBootstrapAcceptor
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry, Object>[] currentChildOptions;
final Entry, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// AbstractBootstrap.handler
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// ServerBootstrapAcceptor, Handle HeadContext
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
// .............
}
サービス側pipeline全体の構造を下図に示す.
bossGroup
はIOイベントの検出と処理を制御し、bossGroup
全体に対応するpipelineはヘッダ(HeadContext
)テール(TailContext
)と中部のServerBootstrap.ServerBootstrapAcceptor
のみを含む.新しい接続アクセス時に
AbstractNioMessageChannel.NioMessageUnsafe#read()
メソッドが呼び出され、最終的にfireChannelRead()
が呼び出され、次のHandlerのchannelRead
メソッドがトリガーされる.このHandlerはServerBootstrapAcceptor
ですサーバBootstrapの内部クラスであり、
ChannelInboundHandlerAdapter
から継承されています.ChannelInboundHandler
ですその中でchannelReadは主に以下のことをしました.1.クライアントチャネルのpipelineにchildHandlerを追加する
2.クライアントTCP関連属性childOptionsとカスタム属性childAttrsの設定
3.workGroup NioEventLoopを選択してSelectorを登録する
1)、クライアントchannelのpipelineにchildHandlerを追加する
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry, Object>[] childOptions;
private final Entry, Object>[] childAttrs;
private final Runnable enableAutoReadTask;
ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry, Object>[] childOptions, Entry, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
// 。。。。。
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// channel channel
final Channel child = (Channel) msg;
// childHandler
child.pipeline().addLast(childHandler);
// TCP :childOptions
setChannelOptions(child, childOptions, logger);
// :childAttrs
for (Entry, Object> e: childAttrs) {
child.attr((AttributeKey) e.getKey()).set(e.getValue());
}
try {
// NioEventLoop Selector
childGroup.register(child)
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
// 。。。。。
}
クライアント
channel
のpipelineはchildHandler
を追加し、サービス側EchoServer作成プロセスにおいてchildHandlerの場合、ChannelInitializer
のカスタムインスタンスを使用した.initChannel
の方法をカバーし、方法を変更してpipelineを取得し、具体的なHandlerを追加した.ChannelInitializer
の追加ロジック、handlerAdded
メソッドを参照してください.実はinitChannel
ロジックでは、まずユーザコード実行initChannel
にコールバックし、ユーザコードはHandlerを追加する追加操作を実行し、その後ChannelInitializer自身をpipelineから削除する.public abstract class ChannelInitializer extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
// This should always be true with our current DefaultChannelPipeline implementation.
// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
// will be added in the expected order.
// Channel
if (initChannel(ctx)) {
// We are done with init the Channel, removing the initializer now.
removeState(ctx);
}
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
//
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
//
pipeline.remove(this);
}
}
return true;
}
return false;
}
}
2)、クライアントTCP関連属性childOptionsおよびカスタム属性childAttrsを設定することは、
ServerBootstrapAcceptor#init()
の方法ですでに実現されている3)、workGroupはNioEventLoopを選択し、Selectorを登録する
AbstractBootstrap#initAndRegister()
メソッドから開始し、ソースコードを追跡するとAbstractUnsafe#register()
メソッドに到達する protected abstract class AbstractUnsafe implements Unsafe {
// 。。。。。
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
// 。。。。。
}
最後に、
AbstractNioUnsafe#doRegister()
メソッドが呼び出され、jdkのjavaChannel().register
によって登録機能が完了する.
protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
// 。。。。。
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
// 。。。。。
}
4.Selectorへの読み取りイベントの登録
a)、入口:
ServerBootstrap.ServerBootstrapAcceptor#channelRead()#childGroup.register()
; public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry, Object> e: childAttrs) {
child.attr((AttributeKey) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
b)、実際に
AbstractChannel.AbstractUnsafe#register0()
が呼び出され、チャネルアクティブ化イベントがトリガーされる. // , HeadContent
pipeline.fireChannelActive();
c)、
pipeline
の頭部から始まり、すなわちDefaultChannelPipeline.HeadContext#channelActive()
によってreadIfIsAutoRead()
がトリガーされる. @Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
readIfIsAutoRead();
}
d)、読み出しイベントは、末尾のTailContent#read()からトリガーされ、ctxが順次実行する.read()は,末尾から各outboundHandlerのread()イベントがトリガーされる.頭まで.
@Override
public final ChannelPipeline read() {
tail.read();
return this;
}
@Override
public ChannelHandlerContext read() {
// outboundhandler
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
// read
if (executor.inEventLoop()) {
next.invokeRead();
} else {
Tasks tasks = next.invokeTasks;
if (tasks == null) {
next.invokeTasks = tasks = new Tasks(next);
}
executor.execute(tasks.invokeReadTask);
}
return this;
}
e)、ヘッダHeadContext#read()に入り、最終的にselectionKeyを変更し、selectorに読み取りイベントを登録する
HeadContext#read()
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
AbstractChannel#beginRead()
@Override
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
AbstractNioMessageChannel#doBeginRead
@Override
protected void doBeginRead() throws Exception {
if (inputShutdown) {
return;
}
super.doBeginRead();
}
AbstractNioChannel#doBeginRead()
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
参考記事:Jorgezhong
まとめ
Nettyが新しい接続の基本プロセスにどのようにアクセスするかは上述したように、間違いがあれば、ご指摘ください.まず前の2編から見たほうが理解しやすいことをお勧めします.
【Netty】サービス側とクライアント学習NioEventLoop
最後に
Java、ビッグデータに興味があればQRコードを長く押して注目してください.私はあなたたちに価値をもたらすように努力します.あなたに少しでも役に立つと思う人は、「いいね」や「転送」を手伝ってください.公式アカウント「愛コード」に注目し、2019に返信するには関連資料がありますよ.