Netty-kryo高性能データ転送の統合

9357 ワード

前言本編はNettyに関する特集の第3編で、前の2編は以下の通りである:•高性能NIOフレームNetty入門編•高性能NIOフレームNetty-オブジェクト転送Nettyはオープンソースのjavaベースのネットワーク通信フレームワークであり、前編では高性能NIOフレームNetty-オブジェクト転送における対象の転送はカスタムコーデックであり、JDKのシーケンス化に基づいて実現された.実はNettyが持っているObjectコーデックはオブジェクトの伝送を実現することができて、しかもJDKのシーケンス化に基づいて、Kryoは性能のもっと良いjavaシーケンス化のフレームワークで、本文はKryoでJDKのシーケンス化を取って代わって高性能のデータ伝送を実現します.Kryoはあまり使われていないかもしれませんが、私が初めて見たKryoは、拡張されたdubboxの中で、KryoとFSTに基づくJavaの効率的なシーケンス化をサポートする主な機能があります.現在よく知られているKryoとFSTの高性能シーケンス化ライブラリに基づいて、DubboのデフォルトのRPCプロトコルに新しいシーケンス化実装を追加します.そのシーケンス化システムを最適化し調整し、Dubbo RPCの性能を著しく向上させた.詳細はドキュメントのベンチマークテストレポートを参照してください.RPCの性能を向上させるために、KryoとFSTの2つの高性能のシーケンス化方式を追加し、ベンチマークテストレポートアドレス:https://dangdangdotcom.github.io/dubbox/serialization.htmlKryo紹介Kryoは高速で効率的なJavaオブジェクトシーケンス化フレームワークです.このプロジェクトの目標は、速度、効率、使いやすいAPIです.このアイテムは、オブジェクトが永続化される必要がある場合に、ファイル、データベース、ネットワークを介して使用する場合に便利です.Kryoはまた、自動深層浅層の複製/クローンを実行することもできる.これはobject->bytes->objectではなく、オブジェクトからオブジェクトに直接コピーされます.先に紹介したdubboxがKryoを使用しているほか、多くのオープンソースフレームワークがKryoを使用しています.•KryoNet(NIO networking)•Twitter's Scalding(Scala API for Cascading)•Twitter's Chill(Kryo serializers for Scala)•Apache Fluo(Kryo is default serialization for Fluo Recipes)•Apache Hive(query plan serialization)•Apache Spark(shuffled/cached data serialization)•DataNucleus(JDO/JPA persistence framework)• CloudPelican• Yahoo's S4 (distributed stream computing)• Storm (distributed realtime computation system, in turn used by many others)• Cascalog (Clojure/Java data processing and querying details)• memcached-session-manager (Tomcat high-availability sessions)• Mobility-RPC (RPC enabling distributed applications)• akka-kryo-serialization (Kryo serializers for Akka)• Groupon• Jive• DestroyAllHumans (controls a robot!)• kryo-serializers(additional serializers)KryoはKryoを追加したMaven依存を簡単に使います.こちらは古いバージョンを使っています.dubboxのバージョンと一致しています.もちろん、最新の4.0バージョンも使えます.



    com.esotericsoftware.kryo
    kryo
    2.24.0



    de.javakaffee
    kryo-serializers
    0.26

次のシーケンス化と逆シーケンス化の機能を示すテストクラスを作成します.
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class KryoTest {

    public static void main(String[] args) throws FileNotFoundException {
        //    
        Kryo kryo = new Kryo();
        Output output = new Output(new FileOutputStream("file.bin"));
        Message someObject = new Message();
        someObject.setContent("     ");
        kryo.writeObject(output, someObject);
        output.close();
        //     
        Input input = new Input(new FileInputStream("file.bin"));
        Message message = kryo.readObject(input, Message.class);
        System.out.println(message.getContent());
        input.close();
    }

}

使用方法と詳細については、ドキュメントを参照してください.https://github.com/EsotericSoftware/kryoNettyKryoを統合してシーケンス化
  • 工場クラスKryoFactoryを作成し、Kryoオブジェクト
    
    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.serializers.DefaultSerializers;
    import com.netty.im.core.message.Message;
    import de.javakaffee.kryoserializers.*;
    import java.lang.reflect.InvocationHandler;
    import java.math.BigDecimal;
    import java.math.BigInteger;
    import java.net.URI;
    import java.text.SimpleDateFormat;
    import java.util.*;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.regex.Pattern;
  • を作成します.
    public abstract class KryoFactory {
    private final static KryoFactory threadFactory = new ThreadLocalKryoFactory();
    
    protected KryoFactory() {
    }
    
    public static KryoFactory getDefaultFactory() {
        return threadFactory;
    }
    
    protected Kryo createKryo() {
        Kryo kryo = new Kryo();
        kryo.setRegistrationRequired(false);
        kryo.register(Message.class);
        kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
        kryo.register(GregorianCalendar.class, new GregorianCalendarSerializer());
        kryo.register(InvocationHandler.class, new JdkProxySerializer());
        kryo.register(BigDecimal.class, new DefaultSerializers.BigDecimalSerializer());
        kryo.register(BigInteger.class, new DefaultSerializers.BigIntegerSerializer());
        kryo.register(Pattern.class, new RegexSerializer());
        kryo.register(BitSet.class, new BitSetSerializer());
        kryo.register(URI.class, new URISerializer());
        kryo.register(UUID.class, new UUIDSerializer());
        UnmodifiableCollectionsSerializer.registerSerializers(kryo);
        SynchronizedCollectionsSerializer.registerSerializers(kryo);
        kryo.register(HashMap.class);
        kryo.register(ArrayList.class);
        kryo.register(LinkedList.class);
        kryo.register(HashSet.class);
        kryo.register(TreeSet.class);
        kryo.register(Hashtable.class);
        kryo.register(Date.class);
        kryo.register(Calendar.class);
        kryo.register(ConcurrentHashMap.class);
        kryo.register(SimpleDateFormat.class);
        kryo.register(GregorianCalendar.class);
        kryo.register(Vector.class);
        kryo.register(BitSet.class);
        kryo.register(StringBuffer.class);
        kryo.register(StringBuilder.class);
        kryo.register(Object.class);
        kryo.register(Object[].class);
        kryo.register(String[].class);
        kryo.register(byte[].class);
        kryo.register(char[].class);
        kryo.register(int[].class);
        kryo.register(float[].class);
        kryo.register(double[].class);
        return kryo;
    }

    }
    kryo       ,             ,                        ,                    。    kryo                ,            ,        。kryo.register()                  。
    2.    ThreadLocalKryoFactory  KryoFactory,           Kryo  ,     Kryo        。            Kryo,Input   Output   。  , bytes[] Input      ,               ,                  bytes[]。
    Kryo      /         ,          ,        Kryo   。           ThreadLocal   Kryo      Threads。

    import com.esotericsoftware.kryo.Kryo;
    public class ThreadLocalKryoFactory extends KryoFactory {
    private final ThreadLocal holder  = new ThreadLocal() {
        @Override
        protected Kryo initialValue() {
            return createKryo();
        }
    };
    
    public Kryo getKryo() {
        return holder.get();
    }

    }
    3.           KryoSerializer
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.ByteBufInputStream;
    /**
     * Kryo   
     * @author yinjihuan
     *
     */
    public class KryoSerializer {
    
        private static final ThreadLocalKryoFactory factory = new ThreadLocalKryoFactory();
    
        public static void serialize(Object object, ByteBuf out) {
            Kryo kryo = factory.getKryo();
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            Output output = new Output(baos);
            kryo.writeClassAndObject(output, object);
            output.flush();
            output.close();
            byte[] b = baos.toByteArray();
            try {
                baos.flush();
                baos.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            out.writeBytes(b);
        }
    
        public static Object deserialize(ByteBuf out) {
            if (out == null) {
                return null;
            }
            Input input = new Input(new ByteBufInputStream(out));
            Kryo kryo = factory.getKryo();
            return kryo.readClassAndObject(input);
        }
    
    }
    4.  Netty   KryoEncoder     Kryo   
    import com.netty.im.core.serialize.kryo.KryoSerializer;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    
    public class KryoEncoder extends MessageToByteEncoder {
    
        @Override
        protected void encode(ChannelHandlerContext ctx, Message message, ByteBuf out) throws Exception {
            KryoSerializer.serialize(message, out);
            ctx.flush();
        }
    
    }
    5.  Netty   KryoDecoder     Kryo    

    import java.util.List;import com.netty.im.core.serialize.kryo.KryoSerializer;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.ByteToMessageDecoder;
    public class KryoDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
        Object obj = KryoSerializer.deserialize(in);
        out.add(obj);
    }

    }
    6. Netty               Kryo       

    ch.pipeline().addLast("decoder", new KryoDecoder());ch.pipeline().addLast("encoder", new KryoEncoder());
    
               Netty   Kryo         ,          JDK     ,          。
        :https://github.com/yinjihuan/netty-im
                    :
    
     ![](https://s4.51cto.com/images/blog/202008/04/1cfc1930047d24121613c2b0053314e0.png?x-oss-process=image/watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=)