ipfs pubsubコード解読
16999 ワード
Pubsub:Publish-subscribe配信購読モード
バージョン:[email protected] [email protected]
本明細書では、ubuntuで、windowsで、ubuntuエンドのipfsをipfs 1、windowsエンドのipfsをipfs 2、windowsエンドのipfsをipfs 2で表す2つのノードを実行する.2つのノードは、互いのアドレスを互いのbootstrapに追加し、2つのノードからなるテストネットワークを形成する.
両者のpeerIDはそれぞれ:
両方のノードがdaemonを起動するときにパラメータを追加します:–enable-pubsub-experiment
両方のノードはbitswap/dht/namesys/pubsubのlogレベルをdebugに設定します
まずipfs 2は「phone」というtopicを購読します
続いてipfs 1 publish」、topicは「phone」、内容は「Moring」
最後にipfs 2の端末を見て
ipfs 2に別の端末を開き、ノードが現在購読しているtopicをクエリーします.
ipfs 1は「phone」というtopicを購読するpeerを検索し、topicが空の場合、デフォルトはすべてのtopicを検索します.
ipfs configファイルのpubsubのデフォルト構成について
DisableSigningはfalseであり、本ノードpublishメッセージを表す場合に署名が必要であり、StrictSignatureVerificationはfalseであり、署名なしpublishメッセージを受け入れることを示す.
Routerには3種類あり、デフォルトではFloodSubRouterが使用されていますが、RandomSubRouterは現在ipfsでは使用されていません.
Floodは、その名の通り、すべてのサブスクリプションノードに公開するという意味です.gossipは私語の意味で、6つの購読ノードに公開されます.randomはランダムという意味で,6つのサブスクリプションノードにランダムにパブリッシュする.注目すべきは、FloodSubRouterがベースプロトコルであり、GossipSubRouterとRandomSubRouterがFloodSubRouterをサポートしていることです.
明らかに、GossipSubRouterは最も使いやすく、最も複雑で、次は主にGossipSubRouterについて説明します.
GossipSubRouter構造:
GossipSubはtopic単位で2つのネットワークを維持し、それぞれmeshとfanoutであり、両方ともこのtopicを購読するノードを含み、自身が1つのtopicを購読した場合、mesh[topic]は空ではなく、fanoutは空である.サブスクリプションがない場合、mesh[topic]は空です.
publishの場合、まず自分がこのtopicを購読しているかどうかを判断します.すなわち、mesh[topic]が空であるかどうかを判断し、空でなければ、mesh[topic]のノードにpublishメッセージを送信します.空であればfanout[topic]が空であるか否かを判断し、空でなければfanout[topic]のノードにpublishメッセージを送信する.空の場合はpubsubから.topic[topic]でfanout[topic]にノードを6つまで補充し、最後にこれらのノードにpublishメッセージを送信します.
他のノードはpublishメッセージを受信すると、まずこのpublishメッセージを受信したかどうかを判断し、受信したら破棄する.以前に受信しなかった場合は、まずメッセージを検証し(publishメッセージに署名が付いている場合は署名を検証する)、ローカルのtopicを購読したオブジェクトにプッシュするだけでなく、そのメッセージをそのままpublishして、プロセスと同様にして、メッセージ伝達プロセスを完了するのが典型的なgossipプロセスです.
空間を回収するために,1つのtopicに対して1分間publishを行わないとdelete fanout[topic]となる.
go-libp2p-pubsub/gossipsub.go
Subscribeがtopicである場合、まず自分がこのtopicを購読したかどうかを判断し、pubsubを判断する.myTopics[topic]が空かどうか、空でなければ、自分が購読したことを表明して、直接return;空の場合、join gossip meshは、まずmesh[topic]が空であるかどうか、空でない場合は、以前にjoinしていたことを示し、直接return;空であればfanout[topic]が空かどうかを判断し、空でなければ1 min以内にpublishがこのtopicを通過したことを示すとfanout[topic]をmesh[topic]に移動しdelete fanout[topic]となる.空の場合はpubsubから.topics[topic]は6つのノードをmesh[topic]に補充し、これらの補充されたノードにiGraftメッセージを送信し、mesh[topic]に追加すると伝え、他のノードがiGraftメッセージを受信した後もmesh[topic]に追加します.
購読をキャンセルすると、まず自分がこのtopicを購読しているかどうかを判断します.すなわち、mesh[topic]が空であるかどうかを判断し、空であれば、自分が購読していないことを表明し、直接returnします.空でない場合は、自分が購読したことを示すメッセージを削除し、削除されたノードにiPruneメッセージを送信し、メッセージh[topic]ネットワークから削除したことを伝え、他のノードがiPruneメッセージを受信すると、メッセージh[topic]から削除します.
ネットワークノードがオンラインであることを確認するために、gossipは1 Sごとにハートビートパケットを送信し、ネットワークを維持する.ハートビートパックのもう一つの機能はiHave、iGraft、iPruneなどの情報交換です.
他のノードがハートビートパケットを受信すると,iHaveが自分の希望するメッセージを持っているか否かを判断し,もしあればiWantメッセージを送信する.
gossipSubは1 Sごとにmesh[topic]のノード数をチェックし、6ノード未満ではpubsubからチェックする.topics[topic]で補足し、これらの補足したノードにiGraftメッセージを送信し、mesh[topic]に追加すると伝え、他のノードがiGraftメッセージを受信した後もmesh[topic]に追加します.
6ノード以上であれば、余分なノードをランダムに削除し、削除されたノードにiPruneメッセージを送信し、mesh[topic]ネットワークから削除したことを伝え、他のノードがiPruneメッセージを受信すると、mesh[topic]から削除します.
pubsubはRPC通信を使用する
RPC_SubOptsは、あるtopicを購読するかどうかという購読情報をカプセル化しています.新しいpeerが接続されると、ipfs 1は新しいpeerと握手(Helo)し、握手内容はSubscriptions、すなわち現在購読されているtopicである.他のノードは、ipfs 1からのhello(rpc)を受信と、そのpeerIDをpubsubに加える.topics[topic]では、後続のpublishまたはSubscribeがtopicの場合、購読したpeerが見つかります.
Messageはpublishメッセージをカプセル化し、Fromはpublisherである.Dataはコンテンツデータです.Seqnoはシーケンス番号であり、バージョンを識別するために使用され、publishごとにSeqnoに1(pubsub.connter++)が加算される.TopicIDsはtopicセットです.Signatureは署名です.
ControlMessageは、IWant、IHave、IGraft、IPruneの制御メッセージをカプセル化し、後者3つはtopic単位である.
実行環境
バージョン:[email protected] [email protected]
本明細書では、ubuntuで、windowsで、ubuntuエンドのipfsをipfs 1、windowsエンドのipfsをipfs 2、windowsエンドのipfsをipfs 2で表す2つのノードを実行する.2つのノードは、互いのアドレスを互いのbootstrapに追加し、2つのノードからなるテストネットワークを形成する.
両者のpeerIDはそれぞれ:
ipfs1: QmUB36eFCLEN4PvwSQaJ2tsEBr9epTm5h1rATuY11baZ6o
ipfs2: Qmco9fPhEC9aYsFxY3ZekoUMZucmNa9soWDXh6xgr6FsJy
両方のノードがdaemonを起動するときにパラメータを追加します:–enable-pubsub-experiment
$ ipfs daemon --enable-pubsub-experiment
両方のノードはbitswap/dht/namesys/pubsubのlogレベルをdebugに設定します
$ ipfs log level bitswap debug & ipfs log level dht debug & ipfs log level namesys debug & ipfs log level pubsub debug
ipfs pubsubコマンド実操
$ ipfs pubsub
USAGE
ipfs pubsub - An experimental publish-subscribe system on ipfs.
ipfs pubsub
ipfs pubsub allows you to publish messages to a given topic, and also to
subscribe to new messages on a given topic.
This is an experimental feature. It is not intended in its current state
to be used in a production environment.
To use, the daemon must be run with '--enable-pubsub-experiment'.
SUBCOMMANDS
ipfs pubsub ls - List subscribed topics by name.
ipfs pubsub peers [<topic>] - List peers we are currently pubsubbing with.
ipfs pubsub pub <topic> <data>... - Publish a message to a given pubsub topic.
ipfs pubsub sub <topic> - Subscribe to messages on a given topic.
For more information about each command, use:
'ipfs pubsub --help'
まずipfs 2は「phone」というtopicを購読します
$ ipfs pubsub sub phone
続いてipfs 1 publish」、topicは「phone」、内容は「Moring」
$ ipfs pubsub pub phone "Moring"
最後にipfs 2の端末を見て
$ ipfs pubsub sub phone
Moring
ipfs 2に別の端末を開き、ノードが現在購読しているtopicをクエリーします.
$ ipfs pubsub ls
phone
ipfs 1は「phone」というtopicを購読するpeerを検索し、topicが空の場合、デフォルトはすべてのtopicを検索します.
ipfs pubsub peers phone
QmUB36eFCLEN4PvwSQaJ2tsEBr9epTm5h1rATuY11baZ6o
コードの説明
ipfs configファイルのpubsubのデフォルト構成について
"Pubsub": {
"DisableSigning": false,
"Router": "",
"StrictSignatureVerification": false
}
DisableSigningはfalseであり、本ノードpublishメッセージを表す場合に署名が必要であり、StrictSignatureVerificationはfalseであり、署名なしpublishメッセージを受け入れることを示す.
Routerには3種類あり、デフォルトではFloodSubRouterが使用されていますが、RandomSubRouterは現在ipfsでは使用されていません.
FloodSubRouter
GossipSubRouter
RandomSubRouter
Floodは、その名の通り、すべてのサブスクリプションノードに公開するという意味です.gossipは私語の意味で、6つの購読ノードに公開されます.randomはランダムという意味で,6つのサブスクリプションノードにランダムにパブリッシュする.注目すべきは、FloodSubRouterがベースプロトコルであり、GossipSubRouterとRandomSubRouterがFloodSubRouterをサポートしていることです.
明らかに、GossipSubRouterは最も使いやすく、最も複雑で、次は主にGossipSubRouterについて説明します.
GossipSubRouter
GossipSubRouter構造:
type GossipSubRouter struct {
p *PubSub
peers map[peer.ID]protocol.ID // peer protocols
mesh map[string]map[peer.ID]struct{} // topic meshes
fanout map[string]map[peer.ID]struct{} // topic fanout
lastpub map[string]int64 // last publish time for fanout topics
gossip map[peer.ID][]*pb.ControlIHave // pending gossip
control map[peer.ID]*pb.ControlMessage // pending control messages
mcache *MessageCache
}
GossipSubはtopic単位で2つのネットワークを維持し、それぞれmeshとfanoutであり、両方ともこのtopicを購読するノードを含み、自身が1つのtopicを購読した場合、mesh[topic]は空ではなく、fanoutは空である.サブスクリプションがない場合、mesh[topic]は空です.
publishプロセス
publishの場合、まず自分がこのtopicを購読しているかどうかを判断します.すなわち、mesh[topic]が空であるかどうかを判断し、空でなければ、mesh[topic]のノードにpublishメッセージを送信します.空であればfanout[topic]が空であるか否かを判断し、空でなければfanout[topic]のノードにpublishメッセージを送信する.空の場合はpubsubから.topic[topic]でfanout[topic]にノードを6つまで補充し、最後にこれらのノードにpublishメッセージを送信します.
他のノードはpublishメッセージを受信すると、まずこのpublishメッセージを受信したかどうかを判断し、受信したら破棄する.以前に受信しなかった場合は、まずメッセージを検証し(publishメッセージに署名が付いている場合は署名を検証する)、ローカルのtopicを購読したオブジェクトにプッシュするだけでなく、そのメッセージをそのままpublishして、プロセスと同様にして、メッセージ伝達プロセスを完了するのが典型的なgossipプロセスです.
空間を回収するために,1つのtopicに対して1分間publishを行わないとdelete fanout[topic]となる.
go-libp2p-pubsub/gossipsub.go
// overlay parameters
GossipSubD = 6
GossipSubDlo = 4
GossipSubDhi = 12
// gossip parameters
GossipSubHistoryLength = 5
GossipSubHistoryGossip = 3
// heartbeat interval
GossipSubHeartbeatInitialDelay = 100 * time.Millisecond
GossipSubHeartbeatInterval = 1 * time.Second
// fanout ttl
GossipSubFanoutTTL = 60 * time.Second
Subscribeプロセス
Subscribeがtopicである場合、まず自分がこのtopicを購読したかどうかを判断し、pubsubを判断する.myTopics[topic]が空かどうか、空でなければ、自分が購読したことを表明して、直接return;空の場合、join gossip meshは、まずmesh[topic]が空であるかどうか、空でない場合は、以前にjoinしていたことを示し、直接return;空であればfanout[topic]が空かどうかを判断し、空でなければ1 min以内にpublishがこのtopicを通過したことを示すとfanout[topic]をmesh[topic]に移動しdelete fanout[topic]となる.空の場合はpubsubから.topics[topic]は6つのノードをmesh[topic]に補充し、これらの補充されたノードにiGraftメッセージを送信し、mesh[topic]に追加すると伝え、他のノードがiGraftメッセージを受信した後もmesh[topic]に追加します.
購読をキャンセルすると、まず自分がこのtopicを購読しているかどうかを判断します.すなわち、mesh[topic]が空であるかどうかを判断し、空であれば、自分が購読していないことを表明し、直接returnします.空でない場合は、自分が購読したことを示すメッセージを削除し、削除されたノードにiPruneメッセージを送信し、メッセージh[topic]ネットワークから削除したことを伝え、他のノードがiPruneメッセージを受信すると、メッセージh[topic]から削除します.
メッセージ交換&ネットワーク制御
ネットワークノードがオンラインであることを確認するために、gossipは1 Sごとにハートビートパケットを送信し、ネットワークを維持する.ハートビートパックのもう一つの機能はiHave、iGraft、iPruneなどの情報交換です.
他のノードがハートビートパケットを受信すると,iHaveが自分の希望するメッセージを持っているか否かを判断し,もしあればiWantメッセージを送信する.
gossipSubは1 Sごとにmesh[topic]のノード数をチェックし、6ノード未満ではpubsubからチェックする.topics[topic]で補足し、これらの補足したノードにiGraftメッセージを送信し、mesh[topic]に追加すると伝え、他のノードがiGraftメッセージを受信した後もmesh[topic]に追加します.
6ノード以上であれば、余分なノードをランダムに削除し、削除されたノードにiPruneメッセージを送信し、mesh[topic]ネットワークから削除したことを伝え、他のノードがiPruneメッセージを受信すると、mesh[topic]から削除します.
RPCメッセージ
pubsubはRPC通信を使用する
// pubsub.go
type RPC struct {
pb.RPC
// unexported on purpose, not sending this over the wire
from peer.ID
}
// rpc.pb.go
type RPC struct {
Subscriptions []*RPC_SubOpts `protobuf:"bytes,1,rep,name=subscriptions" json:"subscriptions,omitempty"`
Publish []*Message `protobuf:"bytes,2,rep,name=publish" json:"publish,omitempty"`
Control *ControlMessage `protobuf:"bytes,3,opt,name=control" json:"control,omitempty"`
}
type RPC_SubOpts struct {
Subscribe *bool `protobuf:"varint,1,opt,name=subscribe" json:"subscribe,omitempty"`
Topicid *string `protobuf:"bytes,2,opt,name=topicid" json:"topicid,omitempty"`
}
type Message struct {
From []byte `protobuf:"bytes,1,opt,name=from" json:"from,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"`
Seqno []byte `protobuf:"bytes,3,opt,name=seqno" json:"seqno,omitempty"`
TopicIDs []string `protobuf:"bytes,4,rep,name=topicIDs" json:"topicIDs,omitempty"`
Signature []byte `protobuf:"bytes,5,opt,name=signature" json:"signature,omitempty"`
}
type ControlMessage struct {
Ihave []*ControlIHave `protobuf:"bytes,1,rep,name=ihave" json:"ihave,omitempty"`
Iwant []*ControlIWant `protobuf:"bytes,2,rep,name=iwant" json:"iwant,omitempty"`
Graft []*ControlGraft `protobuf:"bytes,3,rep,name=graft" json:"graft,omitempty"`
Prune []*ControlPrune `protobuf:"bytes,4,rep,name=prune" json:"prune,omitempty"`
}
type ControlIHave struct {
TopicID *string `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"`
MessageIDs []string `protobuf:"bytes,2,rep,name=messageIDs" json:"messageIDs,omitempty"`
}
type ControlIWant struct {
MessageIDs []string `protobuf:"bytes,1,rep,name=messageIDs" json:"messageIDs,omitempty"`
}
type ControlGraft struct {
TopicID *string `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"`
}
type ControlPrune struct {
TopicID *string `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"`
}
RPC_SubOptsは、あるtopicを購読するかどうかという購読情報をカプセル化しています.新しいpeerが接続されると、ipfs 1は新しいpeerと握手(Helo)し、握手内容はSubscriptions、すなわち現在購読されているtopicである.他のノードは、ipfs 1からのhello(rpc)を受信と、そのpeerIDをpubsubに加える.topics[topic]では、後続のpublishまたはSubscribeがtopicの場合、購読したpeerが見つかります.
Messageはpublishメッセージをカプセル化し、Fromはpublisherである.Dataはコンテンツデータです.Seqnoはシーケンス番号であり、バージョンを識別するために使用され、publishごとにSeqnoに1(pubsub.connter++)が加算される.TopicIDsはtopicセットです.Signatureは署名です.
ControlMessageは、IWant、IHave、IGraft、IPruneの制御メッセージをカプセル化し、後者3つはtopic単位である.