Dubbo's serialization mechanism: an analysis of how Hessian2 serialization works
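Remote communication almost always involves turning data into a form that can be transmitted. Broadly speaking, the question is how a message sent from A can arrive at B with the same content. Narrowly speaking, it is a matter of encoding and decoding.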
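In Dubbo, and in Java remote communication generally, the codec usually goes hand in hand with serialization and deserialization.
Serializing a Java object normally takes the following steps:
1. Implement the Serializable interface;
2. Declare a version number, serialVersionUID (not required, but recommended);
3. Define private writeObject()/readObject() methods if custom serialization is needed;
4. Call java.io.ObjectOutputStream.writeObject() to serialize and java.io.ObjectInputStream.readObject() to deserialize.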
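The steps above can be sketched as a small round trip. This is only an illustration of plain JDK serialization: the SampleUser class and the JdkSerializationSketch helper are made-up names, not part of Dubbo.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical value class used only for this illustration.
final class SampleUser implements Serializable {          // step 1
    private static final long serialVersionUID = 1L;      // step 2
    final String name;
    final int age;

    SampleUser(String name, int age) {
        this.name = name;
        this.age = age;
    }
}

final class JdkSerializationSketch {
    // Step 4, write side: ObjectOutputStream.writeObject() turns the object into bytes.
    static byte[] serialize(Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Step 4, read side: ObjectInputStream.readObject() rebuilds the object.
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling JdkSerializationSketch.serialize(new SampleUser("alice", 30)) and passing the result to deserialize() yields an equal copy; the byte array also carries class metadata, which is one reason the JDK format tends to be larger than compact formats such as Hessian2.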
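Simple enough, yet there are many serialization frameworks out there. Why? Because serialization needs to be fast and its output needs to be small.
Today we take a detailed look at how Hessian2, Dubbo's default serializer, does it.
Starting from server initialization, we can see that Dubbo by default creates its server on top of Netty.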
// com.alibaba.dubbo.remoting.transport.netty.NettyServer
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
// decoder (netty): NettyCodecAdapter.InternalDecoder extends SimpleChannelUpstreamHandler
pipeline.addLast("decoder", adapter.getDecoder());
// encoder (netty): NettyCodecAdapter.InternalEncoder extends OneToOneEncoder
pipeline.addLast("encoder", adapter.getEncoder());
// handler
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
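The server communicates through a pipeline, which mainly holds three ChannelHandlers:
1. decoder: decodes incoming messages and relies on Netty's infrastructure;
2. encoder: encodes outgoing messages and relies on Netty's infrastructure (the main subject of this article);
3. handler: the business handler, NettyHandler.
As in Netty itself, events travel through the pipeline in either the inbound or the outbound direction. This article follows an outbound request, which travels handler -> encoder. The decoder is registered as an upstream-only handler (SimpleChannelUpstreamHandler), so downstream events skip it.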
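To make the outbound order concrete, here is a toy model of the traversal. It is a sketch, not Netty code: the Entry class and its handlesDownstream flag are invented for illustration, and it assumes (as the getActualDownstreamContext() calls in the Netty 3 code quoted in this article suggest) that downstream events walk from the tail towards the head and skip handlers that only handle upstream events.

```java
import java.util.ArrayList;
import java.util.List;

final class DownstreamOrderSketch {
    // Hypothetical stand-in for a pipeline entry: a name plus a direction flag.
    static final class Entry {
        final String name;
        final boolean handlesDownstream;

        Entry(String name, boolean handlesDownstream) {
            this.name = name;
            this.handlesDownstream = handlesDownstream;
        }
    }

    // Same registration order as NettyServer.doOpen(): decoder, encoder, handler.
    static List<Entry> dubboPipeline() {
        List<Entry> pipeline = new ArrayList<>();
        pipeline.add(new Entry("decoder", false)); // upstream-only handler
        pipeline.add(new Entry("encoder", true));  // downstream handler
        pipeline.add(new Entry("handler", true));  // NettyHandler handles both directions
        return pipeline;
    }

    // Walks from the tail to the head and records every downstream-capable handler.
    static List<String> downstreamOrder(List<Entry> pipeline) {
        List<String> visited = new ArrayList<>();
        for (int i = pipeline.size() - 1; i >= 0; i--) {
            Entry entry = pipeline.get(i);
            if (entry.handlesDownstream) {
                visited.add(entry.name);
            }
        }
        return visited;
    }
}
```

With the pipeline built by dubboPipeline(), downstreamOrder() visits the handler first and the encoder second, after which the event would reach the sink that performs the socket write.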
// com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
// Request : private static final AtomicLong INVOKE_ID = new AtomicLong(0);
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
// com.alibaba.dubbo.remoting.transport.netty.NettyClient send()
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
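After HeaderExchangeChannel has wrapped the call, we have a Request object, and the next step is to send it to the remote side. A few points:
1. Every request carries an ID that increases monotonically (see the AtomicLong INVOKE_ID in Request);
2. The request is marked two-way (twoWay=true), meaning a response is expected;
3. The return value is wrapped in a DefaultFuture, through which the asynchronous result is received.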
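The three points can be illustrated with a minimal correlation table. This is a sketch under assumptions: it uses the JDK's CompletableFuture in place of Dubbo's DefaultFuture, and the class and method names (RequestCorrelationSketch, register, received) are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class RequestCorrelationSketch {
    // Monotonically increasing request id, like Request.INVOKE_ID.
    private static final AtomicLong INVOKE_ID = new AtomicLong(0);
    // Futures waiting for a response, keyed by request id.
    private static final Map<Long, CompletableFuture<Object>> PENDING = new ConcurrentHashMap<>();

    static long newRequestId() {
        return INVOKE_ID.getAndIncrement();
    }

    // Called before sending a two-way request: remember who is waiting for the reply.
    static CompletableFuture<Object> register(long requestId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        PENDING.put(requestId, future);
        return future;
    }

    // Called when a response arrives: complete the matching future, if any.
    static boolean received(long requestId, Object result) {
        CompletableFuture<Object> future = PENDING.remove(requestId);
        if (future == null) {
            return false; // unknown or already completed request
        }
        future.complete(result);
        return true;
    }
}
```

Because the response carries the same ID as the request, many requests can share one connection and still be matched to the right caller.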
// NettyClient, com.alibaba.dubbo.remoting.transport.AbstractPeer
@Override
public void send(Object message) throws RemotingException {
send(message, url.getParameter(Constants.SENT_KEY, false));
}
// NettyClient, com.alibaba.dubbo.remoting.transport.AbstractClient
@Override
public void send(Object message, boolean sent) throws RemotingException {
if (send_reconnect && !isConnected()) {
connect();
}
Channel channel = getChannel();
//TODO Can the value returned by getChannel() be null? need improvement.
if (channel == null || !channel.isConnected()) {
throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
}
// channel
channel.send(message, sent);
}
// com.alibaba.dubbo.remoting.transport.netty.NettyChannel
@Override
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
// write the message to the Netty channel; this returns a ChannelFuture immediately
ChannelFuture future = channel.write(message);
if (sent) {
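// when "sent" is true, wait for the write to complete, up to the configured timeout;
// otherwise return at once and leave the result handling to the caller (DubboInvoker)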
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
success = future.await(timeout);
}
// rethrow any failure already recorded on the future
Throwable cause = future.getCause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
As for how the future's result is handled, let us go back to DubboInvoker and see.
// com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
// one-way call: send and return an empty result without waiting for a response
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
// async call: store the future in RpcContext so that the caller can fetch the result later
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter
Next we enter the part that sends data over the socket. Let us see how it is done. (Note: at this point the data is still the original object and has not been serialized.)
// org.jboss.netty.channel.socket.nio.NioClientSocketChannel, org.jboss.netty.channel.AbstractChannel
public ChannelFuture write(Object message) {
return Channels.write(this, message);
}
// org.jboss.netty.channel.Channels
/**
* Sends a {@code "write"} request to the last
* {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of
* the specified {@link Channel}.
*
* @param channel the channel to write a message
* @param message the message to write to the channel
*
* @return the {@link ChannelFuture} which will be notified when the
* write operation is done
*/
public static ChannelFuture write(Channel channel, Object message) {
return write(channel, message, null);
}
// create a future for the write, then pass a downstream message event into the pipeline
/**
* Sends a {@code "write"} request to the last
* {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of
* the specified {@link Channel}.
*
* @param channel the channel to write a message
* @param message the message to write to the channel
* @param remoteAddress the destination of the message.
* {@code null} to use the default remote address
*
* @return the {@link ChannelFuture} which will be notified when the
* write operation is done
*/
public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {
ChannelFuture future = future(channel);
channel.getPipeline().sendDownstream(
new DownstreamMessageEvent(channel, future, message, remoteAddress));
return future;
}
// org.jboss.netty.channel.DefaultChannelPipeline
public void sendDownstream(ChannelEvent e) {
DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
if (tail == null) {
try {
getSink().eventSunk(this, e);
return;
} catch (Throwable t) {
notifyHandlerException(e, t);
return;
}
}
// start from the tail of the pipeline
sendDownstream(tail, e);
}
void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
if (e instanceof UpstreamMessageEvent) {
throw new IllegalArgumentException("cannot send an upstream event to downstream");
}
try {
// call handleDownstream() on the handler at this pipeline position; the first one is NettyHandler
((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
} catch (Throwable t) {
// Unlike an upstream event, a downstream event usually has an
// incomplete future which is supposed to be updated by ChannelSink.
// However, if an exception is raised before the event reaches at
// ChannelSink, the future is not going to be updated, so we update
// here.
e.getFuture().setFailure(t);
notifyHandlerException(e, t);
}
}
// NettyHandler, org.jboss.netty.channel.SimpleChannelHandler
/**
* {@inheritDoc} Down-casts the received downstream event into more
* meaningful sub-type event and calls an appropriate handler method with
* the down-casted event.
*/
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (e instanceof MessageEvent) {
// a message write; at this point ctx is the tail context
writeRequested(ctx, (MessageEvent) e);
} else if (e instanceof ChannelStateEvent) {
// channel state changes (open, bind, connect, interestOps)
ChannelStateEvent evt = (ChannelStateEvent) e;
switch (evt.getState()) {
case OPEN:
if (!Boolean.TRUE.equals(evt.getValue())) {
closeRequested(ctx, evt);
}
break;
case BOUND:
if (evt.getValue() != null) {
bindRequested(ctx, evt);
} else {
unbindRequested(ctx, evt);
}
break;
case CONNECTED:
if (evt.getValue() != null) {
connectRequested(ctx, evt);
} else {
disconnectRequested(ctx, evt);
}
break;
case INTEREST_OPS:
setInterestOpsRequested(ctx, evt);
break;
default:
ctx.sendDownstream(e);
}
} else {
ctx.sendDownstream(e);
}
}
Next, the handler's writeRequested() is called, and the event is passed along the pipeline.
// NettyHandler, com.alibaba.dubbo.remoting.transport.netty.NettyHandler
@Override
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
// delegate to SimpleChannelHandler first, which passes the event further down the pipeline
super.writeRequested(ctx, e);
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
handler.sent(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
// org.jboss.netty.channel.SimpleChannelHandler
/**
* Invoked when {@link Channel#write(Object)} is called.
*/
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
ctx.sendDownstream(e);
}
// org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext
public void sendDownstream(ChannelEvent e) {
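// walk backwards: find the previous handler that can handle downstream events;
// when none is left, the event reaches the sink at the end of the pipeline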
DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
if (prev == null) {
try {
getSink().eventSunk(DefaultChannelPipeline.this, e);
} catch (Throwable t) {
notifyHandlerException(e, t);
}
} else {
DefaultChannelPipeline.this.sendDownstream(prev, e);
}
}
// DefaultChannelPipeline.this.sendDownstream(): invokes the handler of the given context, here the encoder
void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
if (e instanceof UpstreamMessageEvent) {
throw new IllegalArgumentException("cannot send an upstream event to downstream");
}
try {
((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
} catch (Throwable t) {
// Unlike an upstream event, a downstream event usually has an
// incomplete future which is supposed to be updated by ChannelSink.
// However, if an exception is raised before the event reaches at
// ChannelSink, the future is not going to be updated, so we update
// here.
e.getFuture().setFailure(t);
notifyHandlerException(e, t);
}
}
Then comes the encoding step.
// encoder, InternalEncoder : org.jboss.netty.handler.codec.oneone.OneToOneEncoder
public void handleDownstream(
ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
if (!(evt instanceof MessageEvent)) {
ctx.sendDownstream(evt);
return;
}
MessageEvent e = (MessageEvent) evt;
// try to encode the message; if nothing was encoded, pass the original event on
if (!doEncode(ctx, e)) {
ctx.sendDownstream(e);
}
}
protected boolean doEncode(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Object originalMessage = e.getMessage();
// encode() is implemented by the subclass, here InternalEncoder
Object encodedMessage = encode(ctx, e.getChannel(), originalMessage);
if (originalMessage == encodedMessage) {
return false;
}
if (encodedMessage != null) {
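// write the encoded message downstream; from here on the payload is a byte buffer
// ready for TCP (for example a TruncatedChannelBuffer), produced by encode()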
write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress());
}
return true;
}
// com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter$InternalEncoder
@Sharable
private class InternalEncoder extends OneToOneEncoder {
@Override
protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
try {
// call the codec's encode(); the codec here is DubboCountCodec, selected through the codec setting in the url
codec.encode(channel, buffer, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ch);
}
// wrap the contents of the Dubbo buffer as a Netty ChannelBuffer
return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
}
}
// com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
codec.encode(channel, buffer, msg);
}
// DubboCountCodec, com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
if (msg instanceof Request) {
// encode a request
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) {
encodeResponse(channel, buffer, (Response) msg);
} else {
super.encode(channel, buffer, msg);
}
}
The concrete write format is: magic number -> flag byte (request flag, serialization ID, two-way and event bits) -> request ID -> body length -> body.
// com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec, com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
Serialization serialization = getSerialization(channel);
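// layout: magic -> flags (request, serialization id, two-way, event) -> request id -> body length -> body
// the header is assembled here and copied into the buffer once the body length is known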
// header.
byte[] header = new byte[HEADER_LENGTH];
// set magic number.
Bytes.short2bytes(MAGIC, header);
// set request and serialization flag.
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
if (req.isEvent()) header[2] |= FLAG_EVENT;
// set request id.
Bytes.long2bytes(req.getId(), header, 4);
// encode request data.
int savedWriteIndex = buffer.writerIndex();
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
encodeEventData(channel, out, req.getData());
} else {
// serialize the request body; ordinary objects end up in JavaSerializer
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();
int len = bos.writtenBytes();
checkPayload(channel, len);
Bytes.int2bytes(len, header, 12);
// write
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header); // write header.
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
// com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec: writes the invocation fields one by one
@Override
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
RpcInvocation inv = (RpcInvocation) data;
out.writeUTF(version);
out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
out.writeUTF(inv.getMethodName());
out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
Object[] args = inv.getArguments();
if (args != null)
for (int i = 0; i < args.length; i++) {
// serialize each argument
out.writeObject(encodeInvocationArgument(channel, inv, i));
}
out.writeObject(inv.getAttachments());
}
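The "reserve the header, write the body, then go back and fill in the header" trick in encodeRequest() can be sketched with a plain java.nio.ByteBuffer. The constant values below (magic 0xdabb, flag bits 0x80/0x40/0x20, a 16-byte header) do not appear in the text above; they are what I recall from Dubbo's ExchangeCodec and should be treated as assumptions, as should the hypothetical DubboHeaderSketch name.

```java
import java.nio.ByteBuffer;

final class DubboHeaderSketch {
    // Assumed values, not shown in the article's code excerpts.
    static final int HEADER_LENGTH = 16;
    static final short MAGIC = (short) 0xdabb;
    static final byte FLAG_REQUEST = (byte) 0x80;
    static final byte FLAG_TWOWAY = (byte) 0x40;
    static final byte FLAG_EVENT = (byte) 0x20;

    static byte[] encodeFrame(long requestId, int serializationId,
                              boolean twoWay, boolean event, byte[] body) {
        // ByteBuffer is big-endian by default, matching Bytes.short2bytes/long2bytes.
        ByteBuffer buffer = ByteBuffer.allocate(HEADER_LENGTH + body.length);

        // 1. Skip the header area and write the (already serialized) body first.
        buffer.position(HEADER_LENGTH);
        buffer.put(body);
        int len = buffer.position() - HEADER_LENGTH;

        // 2. Build the flag byte: request bit, serialization id, optional bits.
        byte flag = (byte) (FLAG_REQUEST | serializationId);
        if (twoWay) flag |= FLAG_TWOWAY;
        if (event) flag |= FLAG_EVENT;

        // 3. Go back and fill in the header now that the body length is known.
        buffer.position(0);
        buffer.putShort(MAGIC);     // bytes 0-1: magic number
        buffer.put(flag);           // byte 2: flags
        buffer.put((byte) 0);       // byte 3: status, unused for requests
        buffer.putLong(requestId);  // bytes 4-11: request id
        buffer.putInt(len);         // bytes 12-15: body length
        return buffer.array();
    }
}
```

Writing the length last is what lets the encoder stream the body straight into the buffer without serializing it twice or buffering it separately.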
Next, let us see how Hessian serializes the data. It all comes down to how Hessian's writeObject() method works.
// com.alibaba.dubbo.common.serialize.hessian2.Hessian2ObjectOutput
@Override
public void writeObject(Object obj) throws IOException {
mH2o.writeObject(obj);
}
// com.alibaba.com.caucho.hessian.io.Hessian2Output
/**
* Writes any object to the output stream.
*/
@Override
public void writeObject(Object object)
throws IOException {
if (object == null) {
writeNull();
return;
}
Serializer serializer;
// key step: look up the Serializer for the class (JavaSerializer for ordinary objects), then call its writeObject()
serializer = findSerializerFactory().getSerializer(object.getClass());
serializer.writeObject(object, this);
}
// com.alibaba.com.caucho.hessian.io.SerializerFactory
/**
* Returns the serializer for a class.
*
* @param cl the class of the object that needs to be serialized.
* @return a serializer object for the serialization.
*/
@Override
public Serializer getSerializer(Class cl)
throws HessianProtocolException {
Serializer serializer;
// 1. built-in serializers for well-known types
serializer = (Serializer) _staticSerializerMap.get(cl);
if (serializer != null) {
return serializer;
}
// 2. serializers cached by an earlier lookup
if (_cachedSerializerMap != null) {
serializer = (Serializer) _cachedSerializerMap.get(cl);
if (serializer != null) {
return serializer;
}
}
// 3. custom serializer factories
for (int i = 0;
serializer == null && _factories != null && i < _factories.size();
i++) {
AbstractSerializerFactory factory;
factory = (AbstractSerializerFactory) _factories.get(i);
serializer = factory.getSerializer(cl);
}
// 4. choose a serializer according to the kind of class
if (serializer != null) {
} else if (isZoneId(cl)) //must before "else if (JavaSerializer.getWriteReplace(cl) != null)"
serializer = ZoneIdSerializer.getInstance();
else if (isEnumSet(cl))
serializer = EnumSetSerializer.getInstance();
else if (JavaSerializer.getWriteReplace(cl) != null)
serializer = new JavaSerializer(cl, _loader);
else if (HessianRemoteObject.class.isAssignableFrom(cl))
serializer = new RemoteSerializer();
// else if (BurlapRemoteObject.class.isAssignableFrom(cl))
// serializer = new RemoteSerializer();
else if (Map.class.isAssignableFrom(cl)) {
if (_mapSerializer == null)
_mapSerializer = new MapSerializer();
serializer = _mapSerializer;
} else if (Collection.class.isAssignableFrom(cl)) {
if (_collectionSerializer == null) {
_collectionSerializer = new CollectionSerializer();
}
serializer = _collectionSerializer;
} else if (cl.isArray()) {
// arrays
serializer = new ArraySerializer();
} else if (Throwable.class.isAssignableFrom(cl)) {
serializer = new ThrowableSerializer(cl, getClassLoader());
} else if (InputStream.class.isAssignableFrom(cl)) {
serializer = new InputStreamSerializer();
} else if (Iterator.class.isAssignableFrom(cl)) {
serializer = IteratorSerializer.create();
} else if (Enumeration.class.isAssignableFrom(cl)) {
serializer = EnumerationSerializer.create();
} else if (Calendar.class.isAssignableFrom(cl)) {
serializer = CalendarSerializer.create();
} else if (Locale.class.isAssignableFrom(cl)) {
serializer = LocaleSerializer.create();
} else if (Enum.class.isAssignableFrom(cl)) {
serializer = new EnumSerializer(cl);
}
// 5. fall back to the default serializer, which requires the class to be Serializable
if (serializer == null) {
serializer = getDefaultSerializer(cl);
}
if (_cachedSerializerMap == null) {
_cachedSerializerMap = new ConcurrentHashMap(8);
}
// 6. cache the result for the next lookup
_cachedSerializerMap.put(cl, serializer);
return serializer;
}
/**
* Returns the default serializer for a class that isn't matched
* directly. Application can override this method to produce
* bean-style serialization instead of field serialization.
*
* @param cl the class of the object that needs to be serialized.
* @return a serializer object for the serialization.
*/
protected Serializer getDefaultSerializer(Class cl) {
if (_defaultSerializer != null)
return _defaultSerializer;
if (!Serializable.class.isAssignableFrom(cl)
&& !_isAllowNonSerializable) {
throw new IllegalStateException("Serialized class " + cl.getName() + " must implement java.io.Serializable");
}
return new JavaSerializer(cl, _loader);
}
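The lookup order in getSerializer() (static table, cache, type-based rules, default with a Serializable check, then caching the result) can be condensed into a small sketch. Strings stand in for real Serializer instances, and the class and method names are hypothetical; the custom-factory step is left out for brevity.

```java
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class SerializerLookupSketch {
    // Serializer "names" stand in for real Serializer objects in this sketch.
    private static final Map<Class<?>, String> STATIC = new ConcurrentHashMap<>();
    private static final Map<Class<?>, String> CACHE = new ConcurrentHashMap<>();

    static {
        STATIC.put(String.class, "string");
        STATIC.put(Integer.class, "int");
    }

    static String find(Class<?> cl) {
        String serializer = STATIC.get(cl);             // 1. well-known types
        if (serializer != null) return serializer;
        serializer = CACHE.get(cl);                     // 2. earlier lookups
        if (serializer != null) return serializer;

        if (Map.class.isAssignableFrom(cl)) {           // 4. rules by kind of class
            serializer = "map";
        } else if (Collection.class.isAssignableFrom(cl)) {
            serializer = "collection";
        } else if (cl.isArray()) {
            serializer = "array";
        } else if (Enum.class.isAssignableFrom(cl)) {
            serializer = "enum";
        } else if (Serializable.class.isAssignableFrom(cl)) {
            serializer = "java";                        // 5. default, field-based
        } else {
            throw new IllegalStateException(
                    "Serialized class " + cl.getName() + " must implement java.io.Serializable");
        }
        CACHE.put(cl, serializer);                      // 6. remember the decision
        return serializer;
    }

    static boolean isCached(Class<?> cl) {
        return CACHE.containsKey(cl);
    }
}
```

The order of the rules matters: HashMap is also Serializable, but it must be matched as a Map before the field-based default gets a chance.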
// com.alibaba.com.caucho.hessian.io.JavaSerializer
@Override
public void writeObject(Object obj, AbstractHessianOutput out)
throws IOException {
if (out.addRef(obj)) {
return;
}
Class cl = obj.getClass();
try {
// if the class defines writeReplace(), serialize the replacement object instead
if (_writeReplace != null) {
Object repl;
if (_writeReplaceFactory != null)
repl = _writeReplace.invoke(_writeReplaceFactory, obj);
else
repl = _writeReplace.invoke(obj);
//Some class would return itself for wrapReplace, which would cause infinite recursion
//In this case, we could write the object just like normal cases
if (repl != obj) {
out.removeRef(obj);
out.writeObject(repl);
out.replaceRef(repl, obj);
return;
}
}
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
// log.log(Level.FINE, e.toString(), e);
throw new RuntimeException(e);
}
// begin the object; ref is -1 when the class definition has not been written yet
int ref = out.writeObjectBegin(cl.getName());
if (ref < -1) {
writeObject10(obj, out);
} else {
if (ref == -1) {
// first occurrence of this class: write its definition (the field names)
writeDefinition20(out);
out.writeObjectBegin(cl.getName());
}
// write the field values
writeInstance(obj, out);
}
}
// write every field in order, each through the serializer chosen for its type
public void writeInstance(Object obj, AbstractHessianOutput out)
throws IOException {
for (int i = 0; i < _fields.length; i++) {
Field field = _fields[i];
_fieldSerializers[i].serialize(out, obj, field);
}
}
// FieldSerializer is a nested class of JavaSerializer; this generic version handles fields of any object type
static class FieldSerializer {
static final FieldSerializer SER = new FieldSerializer();
void serialize(AbstractHessianOutput out, Object obj, Field field)
throws IOException {
Object value = null;
try {
value = field.get(obj);
} catch (IllegalAccessException e) {
log.log(Level.FINE, e.toString(), e);
}
try {
// recursive step: the field value goes through writeObject(), which looks up a serializer for its own type
out.writeObject(value);
} catch (RuntimeException e) {
throw new RuntimeException(e.getMessage() + "\n Java field: " + field,
e);
} catch (IOException e) {
throw new IOExceptionWrapper(e.getMessage() + "\n Java field: " + field,
e);
}
}
}
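The "definition once, instances many times" idea behind writeObjectBegin(), writeDefinition20() and writeInstance() can be shown with a toy writer. This sketch emits readable text tokens rather than real Hessian bytes, sorts fields by name purely to keep the output deterministic, and uses invented names (ClassDefSketch, Point).

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ClassDefSketch {
    // Hypothetical sample type.
    static final class Point {
        final int x;
        final int y;

        Point(int x, int y) {
            this.x = x;
            this.y = y;
        }
    }

    private final Map<String, Integer> classRefs = new HashMap<>();
    private final List<String> tokens = new ArrayList<>();

    void writeObject(Object obj) {
        Class<?> cl = obj.getClass();
        List<Field> fields = new ArrayList<>();
        for (Field f : cl.getDeclaredFields()) {
            int mod = f.getModifiers();
            if (Modifier.isStatic(mod) || Modifier.isTransient(mod) || f.isSynthetic()) {
                continue; // only instance state is serialized
            }
            f.setAccessible(true);
            fields.add(f);
        }
        // Sorted only so that this sketch produces a stable output.
        fields.sort(Comparator.comparing(Field::getName));

        Integer ref = classRefs.get(cl.getName());
        if (ref == null) {
            // First time this class is seen: emit the definition (field names) once.
            ref = classRefs.size();
            classRefs.put(cl.getName(), ref);
            StringBuilder def = new StringBuilder("C " + cl.getName());
            for (Field f : fields) def.append(' ').append(f.getName());
            tokens.add(def.toString());
        }
        // Every instance then only carries the definition reference and the values.
        StringBuilder inst = new StringBuilder("O#" + ref);
        for (Field f : fields) {
            try {
                inst.append(' ').append(f.get(obj));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
        tokens.add(inst.toString());
    }

    List<String> tokens() {
        return tokens;
    }
}
```

Writing two Point objects yields one definition token followed by two instance tokens, which hints at why the format stays compact when many objects of the same class are sent.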
Let us look at how a string is written.
// String fields call out.writeString() directly
static class StringFieldSerializer extends FieldSerializer {
static final FieldSerializer SER = new StringFieldSerializer();
@Override
void serialize(AbstractHessianOutput out, Object obj, Field field)
throws IOException {
String value = null;
try {
value = (String) field.get(obj);
} catch (IllegalAccessException e) {
log.log(Level.FINE, e.toString(), e);
}
// write the string into the output buffer
out.writeString(value);
}
}
// com.alibaba.com.caucho.hessian.io.Hessian2Output: converts the string to bytes and writes them into the buffer
/**
* Writes a string value to the stream using UTF-8 encoding.
* The string will be written with the following syntax:
*
*
* S b16 b8 string-value
*
*
* If the value is null, it will be written as
*
*
* N
*
*
* @param value the string value to write.
*/
@Override
public void writeString(String value)
throws IOException {
int offset = _offset;
byte[] buffer = _buffer;
if (SIZE <= offset + 16) {
flush();
offset = _offset;
}
if (value == null) {
buffer[offset++] = (byte) 'N';
_offset = offset;
} else {
int length = value.length();
int strOffset = 0;
// long strings are written in chunks of at most 0x8000 chars
while (length > 0x8000) {
int sublen = 0x8000;
offset = _offset;
if (SIZE <= offset + 16) {
flush();
offset = _offset;
}
//chunk can't end in high surrogate
char tail = value.charAt(strOffset + sublen - 1);
if (0xd800 <= tail && tail <= 0xdbff)
sublen--;
buffer[offset + 0] = (byte) BC_STRING_CHUNK;
buffer[offset + 1] = (byte) (sublen >> 8);
buffer[offset + 2] = (byte) (sublen);
_offset = offset + 3;
printString(value, strOffset, sublen);
length -= sublen;
strOffset += sublen;
}
offset = _offset;
if (SIZE <= offset + 16) {
flush();
offset = _offset;
}
// write the length first, then the string contents
if (length <= STRING_DIRECT_MAX) {
//STRING_DIRECT_MAX = 0x1f
// lengths 0..31 are folded into the single tag byte
buffer[offset++] = (byte) (BC_STRING_DIRECT + length);
} else if (length <= STRING_SHORT_MAX) {
//STRING_SHORT_MAX = 0x3ff
//b8, 0x30 + x >> 8
buffer[offset++] = (byte) (BC_STRING_SHORT + (length >> 8));
buffer[offset++] = (byte) (length);
} else {
// length <= 0x8000: tag 'S' followed by a 2-byte length
buffer[offset++] = (byte) ('S');
buffer[offset++] = (byte) (length >> 8);
buffer[offset++] = (byte) (length);
}
_offset = offset;
printString(value, strOffset, length);
}
}
// writes the string contents encoded as UTF-8
/**
* Prints a string to the stream, encoded as UTF-8
*
* @param v the string to print.
*/
public void printString(String v, int strOffset, int length)
throws IOException {
int offset = _offset;
byte[] buffer = _buffer;
for (int i = 0; i < length; i++) {
if (SIZE <= offset + 16) {
_offset = offset;
flush();
offset = _offset;
}
char ch = v.charAt(i + strOffset);
if (ch < 0x80)
buffer[offset++] = (byte) (ch);
else if (ch < 0x800) {
buffer[offset++] = (byte) (0xc0 + ((ch >> 6) & 0x1f));
buffer[offset++] = (byte) (0x80 + (ch & 0x3f));
} else {
buffer[offset++] = (byte) (0xe0 + ((ch >> 12) & 0xf));
buffer[offset++] = (byte) (0x80 + ((ch >> 6) & 0x3f));
buffer[offset++] = (byte) (0x80 + (ch & 0x3f));
}
}
_offset = offset;
}
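To see the resulting bytes, the length prefix and the per-char UTF-8 encoding from writeString() and printString() can be reproduced in a few lines. This sketch covers only the single-chunk case (a non-null string of at most 0x8000 chars) and writes into a ByteArrayOutputStream instead of Hessian2Output's internal buffer. The tag values BC_STRING_DIRECT = 0x00 and BC_STRING_SHORT = 0x30 are taken from the Hessian 2.0 specification as I remember it, since the excerpt does not show them; HessianStringSketch is a made-up name.

```java
import java.io.ByteArrayOutputStream;

final class HessianStringSketch {
    static final int STRING_DIRECT_MAX = 0x1f;
    static final int STRING_SHORT_MAX = 0x3ff;
    static final int BC_STRING_DIRECT = 0x00; // assumed from the Hessian 2.0 spec
    static final int BC_STRING_SHORT = 0x30;  // assumed from the Hessian 2.0 spec

    // Encodes a non-null string that fits into one final chunk.
    static byte[] encode(String value) {
        int length = value.length();
        if (length > 0x8000) {
            throw new IllegalArgumentException("chunking is not covered by this sketch");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        // Length prefix: 1, 2 or 3 bytes depending on the number of chars.
        if (length <= STRING_DIRECT_MAX) {
            out.write(BC_STRING_DIRECT + length);
        } else if (length <= STRING_SHORT_MAX) {
            out.write(BC_STRING_SHORT + (length >> 8));
            out.write(length);
        } else {
            out.write('S');
            out.write(length >> 8);
            out.write(length);
        }

        // Contents: each UTF-16 char becomes 1 to 3 bytes, as in printString().
        for (int i = 0; i < length; i++) {
            char ch = value.charAt(i);
            if (ch < 0x80) {
                out.write(ch);
            } else if (ch < 0x800) {
                out.write(0xc0 + ((ch >> 6) & 0x1f));
                out.write(0x80 + (ch & 0x3f));
            } else {
                out.write(0xe0 + ((ch >> 12) & 0xf));
                out.write(0x80 + ((ch >> 6) & 0x3f));
                out.write(0x80 + (ch & 0x3f));
            }
        }
        return out.toByteArray();
    }
}
```

Note that the prefix counts chars, not bytes, so "hello" costs six bytes in total: one tag byte carrying the length 5, plus five ASCII bytes.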
Deserialization is handled by JavaDeserializer. The idea is simply to follow the serialized format and restore the data accordingly. For example:
public JavaDeserializer(Class cl) {
_type = cl;
_fieldMap = getFieldMap(cl);
_readResolve = getReadResolve(cl);
if (_readResolve != null) {
_readResolve.setAccessible(true);
}
Constructor[] constructors = cl.getDeclaredConstructors();
long bestCost = Long.MAX_VALUE;
for (int i = 0; i < constructors.length; i++) {
Class[] param = constructors[i].getParameterTypes();
long cost = 0;
for (int j = 0; j < param.length; j++) {
cost = 4 * cost;
if (Object.class.equals(param[j]))
cost += 1;
else if (String.class.equals(param[j]))
cost += 2;
else if (int.class.equals(param[j]))
cost += 3;
else if (long.class.equals(param[j]))
cost += 4;
else if (param[j].isPrimitive())
cost += 5;
else
cost += 6;
}
if (cost < 0 || cost > (1 << 48))
cost = 1 << 48;
cost += (long) param.length << 48;
if (cost < bestCost) {
_constructor = constructors[i];
bestCost = cost;
}
}
if (_constructor != null) {
_constructor.setAccessible(true);
Class[] params = _constructor.getParameterTypes();
_constructorArgs = new Object[params.length];
for (int i = 0; i < params.length; i++) {
_constructorArgs[i] = getParamArg(params[i]);
}
}
}
static class ObjectFieldDeserializer extends FieldDeserializer {
private final Field _field;
ObjectFieldDeserializer(Field field) {
_field = field;
}
@Override
void deserialize(AbstractHessianInput in, Object obj)
throws IOException {
Object value = null;
try {
value = in.readObject(_field.getType());
_field.set(obj, value);
} catch (Exception e) {
logDeserializeError(_field, obj, value, e);
}
}
}
Once the final data has been written, the worker is triggered to perform the actual send.
// org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink: hands the event over to the worker
private static void handleAcceptedSocket(ChannelEvent e) {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent event = (ChannelStateEvent) e;
NioSocketChannel channel = (NioSocketChannel) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
channel.worker.close(channel, future);
}
break;
case BOUND:
case CONNECTED:
if (value == null) {
channel.worker.close(channel, future);
}
break;
case INTEREST_OPS:
channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
break;
}
} else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
NioSocketChannel channel = (NioSocketChannel) event.getChannel();
boolean offered = channel.writeBufferQueue.offer(event);
assert offered;
channel.worker.writeFromUserCode(channel);
}
}
// org.jboss.netty.channel.socket.nio.AbstractNioWorker
void writeFromUserCode(final AbstractNioChannel<?> channel) {
if (!channel.isConnected()) {
// not connected: fail and discard everything queued in the write buffer
cleanUpWriteBuffer(channel);
return;
}
// if called from outside the worker thread, schedule the write on the worker and return
if (scheduleWriteIfNecessary(channel)) {
return;
}
// From here, we are sure Thread.currentThread() == workerThread.
if (channel.writeSuspended) {
return;
}
if (channel.inWriteNowLoop) {
return;
}
// actually write the queued data to the socket
write0(channel);
}
protected static void cleanUpWriteBuffer(AbstractNioChannel<?> channel) {
Exception cause = null;
boolean fireExceptionCaught = false;
// Clean up the stale messages in the write buffer.
synchronized (channel.writeLock) {
MessageEvent evt = channel.currentWriteEvent;
if (evt != null) {
// Create the exception only once to avoid the excessive overhead
// caused by fillStackTrace.
if (channel.isOpen()) {
cause = new NotYetConnectedException();
} else {
cause = new ClosedChannelException();
}
ChannelFuture future = evt.getFuture();
if (channel.currentWriteBuffer != null) {
channel.currentWriteBuffer.release();
channel.currentWriteBuffer = null;
}
channel.currentWriteEvent = null;
// Mark the event object for garbage collection.
//noinspection UnusedAssignment
evt = null;
future.setFailure(cause);
fireExceptionCaught = true;
}
Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
for (;;) {
evt = writeBuffer.poll();
if (evt == null) {
break;
}
// Create the exception only once to avoid the excessive overhead
// caused by fillStackTrace.
if (cause == null) {
if (channel.isOpen()) {
cause = new NotYetConnectedException();
} else {
cause = new ClosedChannelException();
}
fireExceptionCaught = true;
}
evt.getFuture().setFailure(cause);
}
}
if (fireExceptionCaught) {
if (isIoThread(channel)) {
fireExceptionCaught(channel, cause);
} else {
fireExceptionCaughtLater(channel, cause);
}
}
}
protected void write0(AbstractNioChannel<?> channel) {
boolean open = true;
boolean addOpWrite = false;
boolean removeOpWrite = false;
boolean iothread = isIoThread(channel);
long writtenBytes = 0;
final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
final WritableByteChannel ch = channel.channel;
final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
final int writeSpinCount = channel.getConfig().getWriteSpinCount();
List<Throwable> causes = null;
synchronized (channel.writeLock) {
channel.inWriteNowLoop = true;
for (;;) {
MessageEvent evt = channel.currentWriteEvent;
SendBuffer buf = null;
ChannelFuture future = null;
try {
if (evt == null) {
if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
removeOpWrite = true;
channel.writeSuspended = false;
break;
}
future = evt.getFuture();
channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
} else {
future = evt.getFuture();
buf = channel.currentWriteBuffer;
}
long localWrittenBytes = 0;
for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = buf.transferTo(ch);
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
break;
}
if (buf.finished()) {
break;
}
}
if (buf.finished()) {
// Successful write - proceed to the next message.
buf.release();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
// Mark the event object for garbage collection.
//noinspection UnusedAssignment
evt = null;
buf = null;
future.setSuccess();
} else {
// Not written fully - perhaps the kernel buffer is full.
addOpWrite = true;
channel.writeSuspended = true;
if (writtenBytes > 0) {
// Notify progress listeners if necessary.
future.setProgress(
localWrittenBytes,
buf.writtenBytes(), buf.totalBytes());
}
break;
}
} catch (AsynchronousCloseException e) {
// Doesn't need a user attention - ignore.
} catch (Throwable t) {
if (buf != null) {
buf.release();
}
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
// Mark the event object for garbage collection.
//noinspection UnusedAssignment
buf = null;
//noinspection UnusedAssignment
evt = null;
if (future != null) {
future.setFailure(t);
}
if (iothread) {
// An exception was thrown from within a write in the iothread. We store a reference to it
// in a list for now and notify the handlers in the chain after the writeLock was released
// to prevent possible deadlock.
// See #1310
if (causes == null) {
causes = new ArrayList<Throwable>(1);
}
causes.add(t);
} else {
fireExceptionCaughtLater(channel, t);
}
if (t instanceof IOException) {
// close must be handled from outside the write lock to fix a possible deadlock
// which can happen when MemoryAwareThreadPoolExecutor is used and the limit is exceed
// and a close is triggered while the lock is hold. This is because the close(..)
// may try to submit a task to handle it via the ExecutorHandler which then deadlocks.
// See #1310
open = false;
}
}
}
channel.inWriteNowLoop = false;
// Initially, the following block was executed after releasing
// the writeLock, but there was a race condition, and it has to be
// executed before releasing the writeLock:
//
// https://issues.jboss.org/browse/NETTY-410
//
if (open) {
if (addOpWrite) {
setOpWrite(channel);
} else if (removeOpWrite) {
clearOpWrite(channel);
}
}
}
if (causes != null) {
for (Throwable cause: causes) {
// notify about cause now as it was triggered in the write loop
fireExceptionCaught(channel, cause);
}
}
if (!open) {
// close the channel now
close(channel, succeededFuture(channel));
}
// notify the handlers that the write has completed
if (iothread) {
fireWriteComplete(channel, writtenBytes);
} else {
fireWriteCompleteLater(channel, writtenBytes);
}
}
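The inner loop of write0() boils down to: try the socket a limited number of times, stop at the first write that makes progress, and if the buffer is still not empty, leave the rest for later (in Netty, by registering OP_WRITE). Here is a minimal sketch of that pattern with a plain ByteBuffer; ThrottledChannel is an invented fake channel that accepts only a few bytes per call, imitating a nearly full kernel buffer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

final class WriteSpinSketch {
    // Hypothetical channel that accepts at most `limit` bytes per write() call.
    static final class ThrottledChannel implements WritableByteChannel {
        private final int limit;
        private int total;

        ThrottledChannel(int limit) {
            this.limit = limit;
        }

        @Override
        public int write(ByteBuffer src) {
            int n = Math.min(limit, src.remaining());
            src.position(src.position() + n); // "consume" n bytes
            total += n;
            return n;
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() {
        }

        int total() {
            return total;
        }
    }

    // Returns true when the buffer was written completely. A false result means
    // the caller should wait until the channel is writable again and then retry.
    static boolean writeOnce(WritableByteChannel ch, ByteBuffer buf, int writeSpinCount) {
        try {
            for (int i = writeSpinCount; i > 0; i--) {
                int written = ch.write(buf);
                if (written != 0 || !buf.hasRemaining()) {
                    break; // made progress or nothing left: stop spinning
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return !buf.hasRemaining();
    }
}
```

With a 10-byte buffer and a channel that takes 4 bytes per call, writeOnce() needs three invocations to finish, which corresponds to the "not written fully" branch in write0() setting addOpWrite and suspending the write until the socket is ready again.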