libp 2 p-rs上で新しいプロトコルを開発する


この文書では、floodsubを例に、libp2p-rsで新しいプロトコルを開発する方法について説明します.詳細なコードは、ソースコードを参照してください.
2つのtraitを実現
libp 2 p-rsでは、swarmは2つのtraitを提供します.
  • Notifieeは、swarmの通知を受信するために使用され、新しい接続が作成されたり、接続が閉じたりすると、swarmはconnected()またはdisconnected()を呼び出します.
  • ProtocolHandlerは、プロトコルのデータを読み書きするために使用されます.プロトコルの交渉が成功すると、swarmはhandle()を呼び出します.
  • /// Notifiee is an trait for an object wishing to receive notifications from swarm
    pub trait Notifiee {
        /// called when a connection opened
        fn connected(&mut self, _conn: &mut Connection) {}
        /// called when a connection closed
        fn disconnected(&mut self, _conn: &mut Connection) {}
    }
    
    /// Common trait for upgrades that can be applied on inbound substreams, outbound substreams,
    /// or both.
    /// Possible upgrade on a connection or substream.
    #[async_trait]
    pub trait ProtocolHandler: UpgradeInfo + Notifiee {
        /// After we have determined that the remote supports one of the protocols we support, this
        /// method is called to start handling the inbound. Swarm will start invoking this method
        /// in a newly spawned task.
        ///
        /// The `info` is the identifier of the protocol, as produced by `protocol_info`.
        async fn handle(&mut self, stream: Substream, info: ::Info) -> Result>;
        /// This is to provide a clone method for the trait object.
        fn box_clone(&self) -> IProtocolHandler;
    }

    floodsub handler実装NotifieとProtocolHandler
    #[derive(Clone)]
    pub struct Handler {
        incoming_tx: mpsc::UnboundedSender,
        new_peer: mpsc::UnboundedSender,
    }
    
    impl Handler {
        pub(crate) fn new(incoming_tx: mpsc::UnboundedSender, new_peer: mpsc::UnboundedSender) -> Self {
            Handler { incoming_tx, new_peer }
        }
    }
    
    impl UpgradeInfo for Handler {
        type Info = &'static [u8];
    
        fn protocol_info(&self) -> Vec<:info> {
            vec![FLOOD_SUB_ID]
        }
    }
    
    impl Notifiee for Handler {
        fn connected(&mut self, conn: &mut Connection) {
            let peer_id = conn.remote_peer();
            let mut new_peers = self.new_peer.clone();
            task::spawn(async move {
                let _ = new_peers.send(PeerEvent::NewPeer(peer_id)).await;
            });
        }
    }
    
    #[async_trait]
    impl ProtocolHandler for Handler {
        async fn handle(&mut self, mut stream: Substream, _info: ::Info) -> Result> {
            loop {
                /* recv, decode and send to msg process mainloop */
                self.incoming_tx.send(rpc).await.map_err(|_| FloodsubDecodeError::ProtocolExit)?;
            }
        }
    
        fn box_clone(&self) -> IProtocolHandler {
            Box::new(self.clone())
        }
    }

    swarmに登録
    let floodsub = FloodSub::new(FloodsubConfig::new(local_peer_id));
    let handler = floodsub.handler();
    
    let mut swarm = Swarm::new(local_key.public()).with_protocol(Box::new(handler))

    何が必要なの?
    簡単なプロトコル、例えばechoでは、すべてのことがProtocolHandlerにあります.handle()で処理すればいいので、ここで終わります.
    floodsubのような少し複雑なプロトコルは、swarmの通知と受信したデータを、メッセージ処理のメインサイクルに送信して処理し、リアルタイムで状態を更新することが望ましい.
    impl floodsub {
        pub fn start(mut self, control: Swarm_Control) {
            self.control = Some(control);
        
            // well, self 'move' explicitly,
            let mut floodsub = self;
            task::spawn(async move {
                let _ = floodsub.process_loop().await;
            });
        }
    
        /// Message Process Loop.
        pub async fn process_loop(&mut self) -> Result {
            loop {
                select! {
                    cmd = self.peer_rx.next() => {
                        self.handle_peer_event(cmd).await;
                    }
                    rpc = self.incoming_rx.next() => {
                        self.handle_incoming_rpc(rpc).await?;
                    }
                    cmd = self.control_rx.next() => {
                        self.on_control_command(cmd).await?;
                    }
                    sub = self.cancel_rx.next() => {
                        self.un_subscribe(sub).await?;
                    }
                }
            }
        }
    }

    上から分かるように、floodsubメッセージ処理のメインループは1つのtaskで実行され、start()時にselfを渡す必要があるため、後続のパブリケーション購読などの操作はchannelを通じてメッセージを送るしかない.これがcontrolとhandlerがchannelを包む理由である.
    #[derive(Clone)]
    pub struct Control {
        config: FloodsubConfig,
        control_sender: mpsc::UnboundedSender,
    }
    
    impl Control {
        /// Subscribe to messages on a given topic.
        pub async fn subscribe(&mut self, topic: Topic) -> Option {
            let (tx, rx) = oneshot::channel();
            self.control_sender
                .send(ControlCommand::Subscribe(topic, tx))
                .await
                .expect("control send subscribe");
            rx.await.expect("Subscribe")
        }
    }

    新しい接続が作成されるとfloodsubはアクティブにストリームを作成し、交渉が通過した後、本ノードの興味のあるtopicを相手に送信します.そのため、ここではswarmのコントロールが必要です.
    self.control.as_mut().unwrap().new_stream(pid, vec![FLOOD_SUB_ID]).await;

    まとめ
    libp 2 p-rsの上で簡単なプロトコルを開発して、ただ2歩だけ必要で、少し複雑なプロトコルに対してhandlerとcontrolのような小包channelの構造が必要で、メッセージをプロトコルメッセージ処理の主循環に送って、プロトコル全体の運転を駆動して、特定の機能を完成します.
    Netwarpsは、金融、電力、通信、インターネット業界で非常に豊富な経験を持つ国内のベテランクラウドコンピューティングと分散技術開発チームから構成されています.Netwarpsは現在、深セン、北京に研究開発センターを設立し、チームの規模は30+で、その大部分は10年以上の開発経験を持つ技術者で、それぞれインターネット、金融、クラウドコンピューティング、ブロックチェーン、科学研究機関などの専門分野から来ている.
    Netwarpsは安全なストレージ技術製品の研究開発と応用に専念し、主な製品はデセンタ化ファイルシステム(DFS)、デセンタ化コンピューティングプラットフォーム(DCP)があり、デセンタ化ネットワーク技術に基づいて実現した分布式ストレージと分布式コンピューティングプラットフォームを提供することに力を入れ、高利用可能、低消費電力、低ネットワークの技術特徴を持ち、モノのインターネット、工業インターネットなどのシーン.
    公衆番号:Netwarps