ipfs pubsubコード解読

16999 ワード

Pubsub:Publish-subscribe配信購読モード

実行環境


バージョン:[email protected] [email protected]
本明細書では、ubuntuで、windowsで、ubuntuエンドのipfsをipfs 1、windowsエンドのipfsをipfs 2、windowsエンドのipfsをipfs 2で表す2つのノードを実行する.2つのノードは、互いのアドレスを互いのbootstrapに追加し、2つのノードからなるテストネットワークを形成する.
両者のpeerIDはそれぞれ:
ipfs1: QmUB36eFCLEN4PvwSQaJ2tsEBr9epTm5h1rATuY11baZ6o
ipfs2: Qmco9fPhEC9aYsFxY3ZekoUMZucmNa9soWDXh6xgr6FsJy

両方のノードがdaemonを起動するときにパラメータを追加します:–enable-pubsub-experiment
$ ipfs daemon --enable-pubsub-experiment

両方のノードはbitswap/dht/namesys/pubsubのlogレベルをdebugに設定します
$ ipfs log level bitswap debug & ipfs log level dht debug & ipfs log level namesys debug & ipfs log level pubsub debug

ipfs pubsubコマンド実操

$ ipfs pubsub
USAGE
  ipfs pubsub - An experimental publish-subscribe system on ipfs.

  ipfs pubsub

  ipfs pubsub allows you to publish messages to a given topic, and also to
  subscribe to new messages on a given topic.
  
  This is an experimental feature. It is not intended in its current state
  to be used in a production environment.
  
  To use, the daemon must be run with '--enable-pubsub-experiment'.

SUBCOMMANDS
  ipfs pubsub ls                    - List subscribed topics by name.
  ipfs pubsub peers [<topic>]       - List peers we are currently pubsubbing with.
  ipfs pubsub pub <topic> <data>... - Publish a message to a given pubsub topic.
  ipfs pubsub sub <topic>           - Subscribe to messages on a given topic.

  For more information about each command, use:
  'ipfs pubsub  --help'


まずipfs 2は「phone」というtopicを購読します
$ ipfs pubsub sub phone


続いてipfs 1 publish」、topicは「phone」、内容は「Moring」
$ ipfs pubsub pub phone "Moring"

最後にipfs 2の端末を見て
$ ipfs pubsub sub phone
Moring

ipfs 2に別の端末を開き、ノードが現在購読しているtopicをクエリーします.
$ ipfs pubsub ls
phone

ipfs 1は「phone」というtopicを購読するpeerを検索し、topicが空の場合、デフォルトはすべてのtopicを検索します.
ipfs pubsub peers phone
QmUB36eFCLEN4PvwSQaJ2tsEBr9epTm5h1rATuY11baZ6o

コードの説明


ipfs configファイルのpubsubのデフォルト構成について
"Pubsub": {
    "DisableSigning": false,
    "Router": "",
    "StrictSignatureVerification": false
  }

DisableSigningはfalseであり、本ノードpublishメッセージを表す場合に署名が必要であり、StrictSignatureVerificationはfalseであり、署名なしpublishメッセージを受け入れることを示す.
Routerには3種類あり、デフォルトではFloodSubRouterが使用されていますが、RandomSubRouterは現在ipfsでは使用されていません.
FloodSubRouter
GossipSubRouter
RandomSubRouter

Floodは、その名の通り、すべてのサブスクリプションノードに公開するという意味です.gossipは私語の意味で、6つの購読ノードに公開されます.randomはランダムという意味で,6つのサブスクリプションノードにランダムにパブリッシュする.注目すべきは、FloodSubRouterがベースプロトコルであり、GossipSubRouterとRandomSubRouterがFloodSubRouterをサポートしていることです.
明らかに、GossipSubRouterは最も使いやすく、最も複雑で、次は主にGossipSubRouterについて説明します.

GossipSubRouter


GossipSubRouter構造:
type GossipSubRouter struct {
	p       *PubSub
	peers   map[peer.ID]protocol.ID         // peer protocols
	mesh    map[string]map[peer.ID]struct{} // topic meshes
	fanout  map[string]map[peer.ID]struct{} // topic fanout
	lastpub map[string]int64                // last publish time for fanout topics
	gossip  map[peer.ID][]*pb.ControlIHave  // pending gossip
	control map[peer.ID]*pb.ControlMessage  // pending control messages
	mcache  *MessageCache
}

GossipSubはtopic単位で2つのネットワークを維持し、それぞれmeshとfanoutであり、両方ともこのtopicを購読するノードを含み、自身が1つのtopicを購読した場合、mesh[topic]は空ではなく、fanoutは空である.サブスクリプションがない場合、mesh[topic]は空です.

publishプロセス


publishの場合、まず自分がこのtopicを購読しているかどうかを判断します.すなわち、mesh[topic]が空であるかどうかを判断し、空でなければ、mesh[topic]のノードにpublishメッセージを送信します.空であればfanout[topic]が空であるか否かを判断し、空でなければfanout[topic]のノードにpublishメッセージを送信する.空の場合はpubsubから.topic[topic]でfanout[topic]にノードを6つまで補充し、最後にこれらのノードにpublishメッセージを送信します.
他のノードはpublishメッセージを受信すると、まずこのpublishメッセージを受信したかどうかを判断し、受信したら破棄する.以前に受信しなかった場合は、まずメッセージを検証し(publishメッセージに署名が付いている場合は署名を検証する)、ローカルのtopicを購読したオブジェクトにプッシュするだけでなく、そのメッセージをそのままpublishして、プロセスと同様にして、メッセージ伝達プロセスを完了するのが典型的なgossipプロセスです.
空間を回収するために,1つのtopicに対して1分間publishを行わないとdelete fanout[topic]となる.
go-libp2p-pubsub/gossipsub.go
	// overlay parameters
	GossipSubD   = 6
	GossipSubDlo = 4
	GossipSubDhi = 12

	// gossip parameters
	GossipSubHistoryLength = 5
	GossipSubHistoryGossip = 3

	// heartbeat interval
	GossipSubHeartbeatInitialDelay = 100 * time.Millisecond
	GossipSubHeartbeatInterval     = 1 * time.Second

	// fanout ttl
	GossipSubFanoutTTL = 60 * time.Second

Subscribeプロセス


Subscribeがtopicである場合、まず自分がこのtopicを購読したかどうかを判断し、pubsubを判断する.myTopics[topic]が空かどうか、空でなければ、自分が購読したことを表明して、直接return;空の場合、join gossip meshは、まずmesh[topic]が空であるかどうか、空でない場合は、以前にjoinしていたことを示し、直接return;空であればfanout[topic]が空かどうかを判断し、空でなければ1 min以内にpublishがこのtopicを通過したことを示すとfanout[topic]をmesh[topic]に移動しdelete fanout[topic]となる.空の場合はpubsubから.topics[topic]は6つのノードをmesh[topic]に補充し、これらの補充されたノードにiGraftメッセージを送信し、mesh[topic]に追加すると伝え、他のノードがiGraftメッセージを受信した後もmesh[topic]に追加します.
購読をキャンセルすると、まず自分がこのtopicを購読しているかどうかを判断します.すなわち、mesh[topic]が空であるかどうかを判断し、空であれば、自分が購読していないことを表明し、直接returnします.空でない場合は、自分が購読したことを示すメッセージを削除し、削除されたノードにiPruneメッセージを送信し、メッセージh[topic]ネットワークから削除したことを伝え、他のノードがiPruneメッセージを受信すると、メッセージh[topic]から削除します.

メッセージ交換&ネットワーク制御


ネットワークノードがオンラインであることを確認するために、gossipは1 Sごとにハートビートパケットを送信し、ネットワークを維持する.ハートビートパックのもう一つの機能はiHave、iGraft、iPruneなどの情報交換です.
他のノードがハートビートパケットを受信すると,iHaveが自分の希望するメッセージを持っているか否かを判断し,もしあればiWantメッセージを送信する.
gossipSubは1 Sごとにmesh[topic]のノード数をチェックし、6ノード未満ではpubsubからチェックする.topics[topic]で補足し、これらの補足したノードにiGraftメッセージを送信し、mesh[topic]に追加すると伝え、他のノードがiGraftメッセージを受信した後もmesh[topic]に追加します.
6ノード以上であれば、余分なノードをランダムに削除し、削除されたノードにiPruneメッセージを送信し、mesh[topic]ネットワークから削除したことを伝え、他のノードがiPruneメッセージを受信すると、mesh[topic]から削除します.

RPCメッセージ


pubsubはRPC通信を使用する
// pubsub.go
type RPC struct {
	pb.RPC

	// unexported on purpose, not sending this over the wire
	from peer.ID
}

// rpc.pb.go
type RPC struct {
	Subscriptions        []*RPC_SubOpts  `protobuf:"bytes,1,rep,name=subscriptions" json:"subscriptions,omitempty"`
	Publish              []*Message      `protobuf:"bytes,2,rep,name=publish" json:"publish,omitempty"`
	Control              *ControlMessage `protobuf:"bytes,3,opt,name=control" json:"control,omitempty"`
}

type RPC_SubOpts struct {
	Subscribe            *bool    `protobuf:"varint,1,opt,name=subscribe" json:"subscribe,omitempty"`
	Topicid              *string  `protobuf:"bytes,2,opt,name=topicid" json:"topicid,omitempty"`
}

type Message struct {
	From                 []byte   `protobuf:"bytes,1,opt,name=from" json:"from,omitempty"`
	Data                 []byte   `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"`
	Seqno                []byte   `protobuf:"bytes,3,opt,name=seqno" json:"seqno,omitempty"`
	TopicIDs             []string `protobuf:"bytes,4,rep,name=topicIDs" json:"topicIDs,omitempty"`
	Signature            []byte   `protobuf:"bytes,5,opt,name=signature" json:"signature,omitempty"`
}

type ControlMessage struct {
	Ihave                []*ControlIHave `protobuf:"bytes,1,rep,name=ihave" json:"ihave,omitempty"`
	Iwant                []*ControlIWant `protobuf:"bytes,2,rep,name=iwant" json:"iwant,omitempty"`
	Graft                []*ControlGraft `protobuf:"bytes,3,rep,name=graft" json:"graft,omitempty"`
	Prune                []*ControlPrune `protobuf:"bytes,4,rep,name=prune" json:"prune,omitempty"`
}

type ControlIHave struct {
	TopicID              *string  `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"`
	MessageIDs           []string `protobuf:"bytes,2,rep,name=messageIDs" json:"messageIDs,omitempty"`
}

type ControlIWant struct {
	MessageIDs           []string `protobuf:"bytes,1,rep,name=messageIDs" json:"messageIDs,omitempty"`
}

type ControlGraft struct {
	TopicID              *string  `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"`
}

type ControlPrune struct {
	TopicID              *string  `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"`
}

RPC_SubOptsは、あるtopicを購読するかどうかという購読情報をカプセル化しています.新しいpeerが接続されると、ipfs 1は新しいpeerと握手(Helo)し、握手内容はSubscriptions、すなわち現在購読されているtopicである.他のノードは、ipfs 1からのhello(rpc)を受信と、そのpeerIDをpubsubに加える.topics[topic]では、後続のpublishまたはSubscribeがtopicの場合、購読したpeerが見つかります.
Messageはpublishメッセージをカプセル化し、Fromはpublisherである.Dataはコンテンツデータです.Seqnoはシーケンス番号であり、バージョンを識別するために使用され、publishごとにSeqnoに1(pubsub.connter++)が加算される.TopicIDsはtopicセットです.Signatureは署名です.
ControlMessageは、IWant、IHave、IGraft、IPruneの制御メッセージをカプセル化し、後者3つはtopic単位である.