15.Fabric 1.0ソースコード分析(15)gossip(流言アルゴリズム)

174783 ワード

Fabric 1.0ソースノートのgossip(流言アルゴリズム)
1、gossipの概要
gossipは、デマに翻訳され、最終的に一致するアルゴリズムである.最終的に一致する別の意味は、同時に一致することを保証しないことです.gossipには3つの基本的な操作があります.
  • push-Aノードはデータ(key,value,version)および対応するバージョン番号をBノードにプッシュし、BノードはAの中で自分より新しいデータ
  • を更新する.
  • pull-Aはデータkey,versionのみをBに,BはAよりも新しいデータ(Key,value,version)をAに,Aはローカル
  • を更新する.
  • push/pull-pullと似ていますが、一歩増えただけで、AはBより新しいデータをBにプッシュし、Bはローカル
  • を更新します.
    gossipのFabricでの役割:
  • は、組織内のノードおよびチャネル情報を管理し、ノードがオンラインまたはオフラインであるかどうかを測定する.
  • はデータをブロードキャストし、組織内の同じchannelのノードに同じデータを同期させる.
  • は、新しく追加されたノードを管理し、新しいノードにデータを同期する.

  • gossipコードはgossip、peer/gossipディレクトリの下に分布し、ディレクトリ構造は以下の通りである.
  • gossipディレクトリ:
  • サービスディレクトリ、GossipServiceインタフェースの定義と実装.
  • integrationディレクトリ、NewGossipComponentツール関数.
  • gossipディレクトリ、Gossipインタフェースの定義と実装.
  • commディレクトリ、GossipServerインタフェース実装.
  • stateディレクトリ、GossipStateProviderインタフェース定義および実装(ステータスレプリケーション).
  • apiディレクトリ:メッセージ暗号化サービスインタフェース定義.
  • crypto.go,MessageCryptoServiceインタフェース定義.
  • channel.go,SecurityAdvisorインタフェース定義.


  • peer/gossipディレクトリ:
  • mcs.go,MessageCryptoServiceインタフェース実装,すなわちmspMessageCryptoService構造体および方法.
  • sa.go,SecurityAdvisorインタフェース実装,すなわちmspSecurityAdvisor構造体および方法.


  • GossipServerの詳細、参考:Fabric 1.0ソースノートのgossip(噂アルゴリズム)#GossipServer(Gossipサービス側)
    2、GossipServiceインタフェースの定義と実現
    2.1、GossipServiceインタフェースの定義
    type GossipService interface {
        gossip.Gossip
        NewConfigEventer() ConfigProcessor
        InitializeChannel(chainID string, committer committer.Committer, endpoints []string)
        GetBlock(chainID string, index uint64) *common.Block
        AddPayload(chainID string, payload *proto.Payload) error
    }
    //   gossip/service/gossip_service.go
    

    補足Gossip:
    type Gossip interface {
        Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer)
        Peers() []discovery.NetworkMember
        PeersOfChannel(common.ChainID) []discovery.NetworkMember
        UpdateMetadata(metadata []byte)
        UpdateChannelMetadata(metadata []byte, chainID common.ChainID)
        Gossip(msg *proto.GossipMessage)
        Accept(acceptor common.MessageAcceptor, passThrough bool) (chan *proto.GossipMessage, chan proto.ReceivedMessage)
        JoinChan(joinMsg api.JoinChannelMessage, chainID common.ChainID)
        SuspectPeers(s api.PeerSuspector)
        Stop()
    }
    //   gossip/gossip/gossip.go
    

    2.2、GossipServiceインタフェースの実現
    GossipServiceインタフェース実装、すなわちgossipServiceImpl構造体および方法.
    type gossipSvc gossip.Gossip
    
    type gossipServiceImpl struct {
        gossipSvc
        chains          map[string]state.GossipStateProvider // 
        leaderElection  map[string]election.LeaderElectionService //    
        deliveryService deliverclient.DeliverService
        deliveryFactory DeliveryServiceFactory
        lock            sync.RWMutex
        idMapper        identity.Mapper
        mcs             api.MessageCryptoService
        peerIdentity    []byte
        secAdv          api.SecurityAdvisor
    }
    
    //   Gossip Service,  InitGossipServiceCustomDeliveryFactory()
    func InitGossipService(peerIdentity []byte, endpoint string, s *grpc.Server, mcs api.MessageCryptoService,secAdv api.SecurityAdvisor, secureDialOpts api.PeerSecureDialOpts, bootPeers ...string) error
    //   Gossip Service
    func InitGossipServiceCustomDeliveryFactory(peerIdentity []byte, endpoint string, s *grpc.Server,factory DeliveryServiceFactory, mcs api.MessageCryptoService, secAdv api.SecurityAdvisor,secureDialOpts api.PeerSecureDialOpts, bootPeers ...string) error
    //  gossipServiceInstance
    func GetGossipService() GossipService 
    //   newConfigEventer(g)
    func (g *gossipServiceImpl) NewConfigEventer() ConfigProcessor 
    //     
    func (g *gossipServiceImpl) InitializeChannel(chainID string, committer committer.Committer, endpoints []string) 
    func (g *gossipServiceImpl) configUpdated(config Config) 
    func (g *gossipServiceImpl) GetBlock(chainID string, index uint64) *common.Block 
    func (g *gossipServiceImpl) AddPayload(chainID string, payload *proto.Payload) error 
    //  gossip  
    func (g *gossipServiceImpl) Stop() 
    func (g *gossipServiceImpl) newLeaderElectionComponent(chainID string, callback func(bool)) election.LeaderElectionService 
    func (g *gossipServiceImpl) amIinChannel(myOrg string, config Config) bool 
    func (g *gossipServiceImpl) onStatusChangeFactory(chainID string, committer blocksprovider.LedgerInfo) func(bool) 
    func orgListFromConfig(config Config) []string 
    //   gossip/service/gossip_service.go
    

    2.2.1、func InitGossipService(peerIdentity []byte, endpoint string, s *grpc.Server, mcs api.MessageCryptoService,secAdv api.SecurityAdvisor, secureDialOpts api.PeerSecureDialOpts, bootPeers …string) error
    func InitGossipService(peerIdentity []byte, endpoint string, s *grpc.Server, mcs api.MessageCryptoService,
        secAdv api.SecurityAdvisor, secureDialOpts api.PeerSecureDialOpts, bootPeers ...string) error {
        util.GetLogger(util.LoggingElectionModule, "")
        return InitGossipServiceCustomDeliveryFactory(peerIdentity, endpoint, s, &deliveryFactoryImpl{},
            mcs, secAdv, secureDialOpts, bootPeers...)
    }
    
    func InitGossipServiceCustomDeliveryFactory(peerIdentity []byte, endpoint string, s *grpc.Server,
        factory DeliveryServiceFactory, mcs api.MessageCryptoService, secAdv api.SecurityAdvisor,
        secureDialOpts api.PeerSecureDialOpts, bootPeers ...string) error {
        var err error
        var gossip gossip.Gossip
        once.Do(func() {
            //peer.gossip.endpoint,        gossip id,    peerEndpoint.Address
            if overrideEndpoint := viper.GetString("peer.gossip.endpoint"); overrideEndpoint != "" {
                endpoint = overrideEndpoint
            }
            idMapper := identity.NewIdentityMapper(mcs, peerIdentity)
            gossip, err = integration.NewGossipComponent(peerIdentity, endpoint, s, secAdv,
                mcs, idMapper, secureDialOpts, bootPeers...)
            gossipServiceInstance = &gossipServiceImpl{
                mcs:             mcs,
                gossipSvc:       gossip,
                chains:          make(map[string]state.GossipStateProvider),
                leaderElection:  make(map[string]election.LeaderElectionService),
                deliveryFactory: factory,
                idMapper:        idMapper,
                peerIdentity:    peerIdentity,
                secAdv:          secAdv,
            }
        })
        return err
    }
    
    //   gossip/service/gossip_service.go
    

    2.2.2、func (g *gossipServiceImpl) Stop()
    func (g *gossipServiceImpl) Stop() {
        g.lock.Lock()
        defer g.lock.Unlock()
        for _, ch := range g.chains {
            logger.Info("Stopping chain", ch)
            ch.Stop()
        }
    
        for chainID, electionService := range g.leaderElection {
            logger.Infof("Stopping leader election for %s", chainID)
            electionService.Stop()
        }
        g.gossipSvc.Stop()
        if g.deliveryService != nil {
            g.deliveryService.Stop()
        }
    }
    //   gossip/service/gossip_service.go
    

    2.2.3、func (g *gossipServiceImpl) InitializeChannel(chainID string, committer committer.Committer, endpoints []string)
    func (g *gossipServiceImpl) InitializeChannel(chainID string, committer committer.Committer, endpoints []string) {
        g.lock.Lock()
        defer g.lock.Unlock()
    
        //  GossipStateProviderImpl,   goroutine   orderer        block
        g.chains[chainID] = state.NewGossipStateProvider(chainID, g, committer, g.mcs)
        if g.deliveryService == nil {
            var err error
            g.deliveryService, err = g.deliveryFactory.Service(gossipServiceInstance, endpoints, g.mcs)
        }
        if g.deliveryService != nil {
            //          
            leaderElection := viper.GetBool("peer.gossip.useLeaderElection") //          
            isStaticOrgLeader := viper.GetBool("peer.gossip.orgLeader") //              
    
            if leaderElection {
                //      
                g.leaderElection[chainID] = g.newLeaderElectionComponent(chainID, g.onStatusChangeFactory(chainID, committer))
            } else if isStaticOrgLeader {
                //            ,   Deliver client
                g.deliveryService.StartDeliverForChannel(chainID, committer, func() {})
            }
    }
    
    //   gossip/service/gossip_service.go
    

    2.2.4、func (g *gossipServiceImpl) AddPayload(chainID string, payload *proto.Payload) error
    func (g *gossipServiceImpl) AddPayload(chainID string, payload *proto.Payload) error {
        g.lock.RLock()
        defer g.lock.RUnlock()
        return g.chains[chainID].AddPayload(payload)
    }
    //   gossip/service/gossip_service.go
    

    3、NewGossipComponentツール関数
    func NewGossipComponent(peerIdentity []byte, endpoint string, s *grpc.Server,
        secAdv api.SecurityAdvisor, cryptSvc api.MessageCryptoService, idMapper identity.Mapper,
        secureDialOpts api.PeerSecureDialOpts, bootPeers ...string) (gossip.Gossip, error) {
    
        //peer.gossip.externalEndpoint               
        externalEndpoint := viper.GetString("peer.gossip.externalEndpoint")
    
        //endpoint,  peer.address,          
        //bootPeers  peer.gossip.bootstrap,         gossip       ,    
        conf, err := newConfig(endpoint, externalEndpoint, bootPeers...)
        gossipInstance := gossip.NewGossipService(conf, s, secAdv, cryptSvc, idMapper,
            peerIdentity, secureDialOpts)
    
        return gossipInstance, nil
    }
    
    func newConfig(selfEndpoint string, externalEndpoint string, bootPeers ...string) (*gossip.Config, error) {
        _, p, err := net.SplitHostPort(selfEndpoint)
        port, err := strconv.ParseInt(p, 10, 64) //         
    
        var cert *tls.Certificate
        if viper.GetBool("peer.tls.enabled") {
            certTmp, err := tls.LoadX509KeyPair(config.GetPath("peer.tls.cert.file"), config.GetPath("peer.tls.key.file"))
            cert = &certTmp
        }
    
        return &gossip.Config{
            BindPort:                   int(port),
            BootstrapPeers:             bootPeers,
            ID:                         selfEndpoint,
            MaxBlockCountToStore:       util.GetIntOrDefault("peer.gossip.maxBlockCountToStore", 100),
            MaxPropagationBurstLatency: util.GetDurationOrDefault("peer.gossip.maxPropagationBurstLatency", 10*time.Millisecond),
            MaxPropagationBurstSize:    util.GetIntOrDefault("peer.gossip.maxPropagationBurstSize", 10),
            PropagateIterations:        util.GetIntOrDefault("peer.gossip.propagateIterations", 1),
            PropagatePeerNum:           util.GetIntOrDefault("peer.gossip.propagatePeerNum", 3),
            PullInterval:               util.GetDurationOrDefault("peer.gossip.pullInterval", 4*time.Second),
            PullPeerNum:                util.GetIntOrDefault("peer.gossip.pullPeerNum", 3),
            InternalEndpoint:           selfEndpoint,
            ExternalEndpoint:           externalEndpoint,
            PublishCertPeriod:          util.GetDurationOrDefault("peer.gossip.publishCertPeriod", 10*time.Second),
            RequestStateInfoInterval:   util.GetDurationOrDefault("peer.gossip.requestStateInfoInterval", 4*time.Second),
            PublishStateInfoInterval:   util.GetDurationOrDefault("peer.gossip.publishStateInfoInterval", 4*time.Second),
            SkipBlockVerification:      viper.GetBool("peer.gossip.skipBlockVerification"),
            TLSServerCert:              cert,
        }, nil
    }
    //   gossip/integration/integration.go
    

    4、Gossipインタフェースの定義と実現
    4.1、Gossipインタフェース定義
    type Gossip interface {
        //       
        Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer)
        //      
        Peers() []discovery.NetworkMember
        //           
        PeersOfChannel(common.ChainID) []discovery.NetworkMember
        //  metadata
        UpdateMetadata(metadata []byte)
        //    metadata
        UpdateChannelMetadata(metadata []byte, chainID common.ChainID)
        //            
        Gossip(msg *proto.GossipMessage)
        Accept(acceptor common.MessageAcceptor, passThrough bool) (chan *proto.GossipMessage, chan proto.ReceivedMessage)
        //    
        JoinChan(joinMsg api.JoinChannelMessage, chainID common.ChainID)
        //        ,       
        SuspectPeers(s api.PeerSuspector)
        //  Gossip  
        Stop()
    }
    //   gossip/gossip/gossip.go
    

    4.2、Config構造体定義
    type Config struct {
        BindPort            int      //     ,     
        ID                  string   //   id,          
        BootstrapPeers      []string //         
        PropagateIterations int      //  peer.gossip.propagateIterations,       ,   1
        PropagatePeerNum    int      //  peer.gossip.propagatePeerNum,          ,   3
        MaxBlockCountToStore int //  peer.gossip.maxBlockCountToStore,             ,  100
        MaxPropagationBurstSize    int           //  peer.gossip.maxPropagationBurstSize,         ,          ,   10
        MaxPropagationBurstLatency time.Duration //  peer.gossip.maxPropagationBurstLatency,         ,          ,   10  
    
        PullInterval time.Duration //  peer.gossip.pullInterval,         ,   4 
        PullPeerNum  int           //  peer.gossip.pullPeerNum,            ,  3 
    
        SkipBlockVerification bool //  peer.gossip.skipBlockVerification,            ,  false    
        PublishCertPeriod        time.Duration    //  peer.gossip.publishCertPeriod,              ,  10s
        PublishStateInfoInterval time.Duration    //  peer.gossip.publishStateInfoInterval,                ,  4s
        RequestStateInfoInterval time.Duration    //  peer.gossip.requestStateInfoInterval,                ,  4s
        TLSServerCert            *tls.Certificate //   TLS   
    
        InternalEndpoint string //            
        ExternalEndpoint string //             
    }
    //   gossip/gossip/gossip.go
    

    4.3、Gossipインタフェースの実現
    Gossipインタフェースインタフェース実装、すなわちgossipServiceImpl構造体および方法.
    4.3.1、gossipServiceImpl構造体及び方法
    type gossipServiceImpl struct {
        selfIdentity          api.PeerIdentityType
        includeIdentityPeriod time.Time
        certStore             *certStore
        idMapper              identity.Mapper
        presumedDead          chan common.PKIidType
        disc                  discovery.Discovery
        comm                  comm.Comm
        incTime               time.Time
        selfOrg               api.OrgIdentityType
        *comm.ChannelDeMultiplexer
        logger            *logging.Logger
        stopSignal        *sync.WaitGroup
        conf              *Config
        toDieChan         chan struct{}
        stopFlag          int32
        emitter           batchingEmitter
        discAdapter       *discoveryAdapter
        secAdvisor        api.SecurityAdvisor
        chanState         *channelState
        disSecAdap        *discoverySecurityAdapter
        mcs               api.MessageCryptoService
        stateInfoMsgStore msgstore.MessageStore
    }
    
    func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvisor,
    func NewGossipServiceWithServer(conf *Config, secAdvisor api.SecurityAdvisor, mcs api.MessageCryptoService,
    func (g *gossipServiceImpl) JoinChan(joinMsg api.JoinChannelMessage, chainID common.ChainID)
    func (g *gossipServiceImpl) SuspectPeers(isSuspected api.PeerSuspector)
    func (g *gossipServiceImpl) Gossip(msg *proto.GossipMessage)
    func (g *gossipServiceImpl) Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer)
    func (g *gossipServiceImpl) Peers() []discovery.NetworkMember
    func (g *gossipServiceImpl) PeersOfChannel(channel common.ChainID) []discovery.NetworkMember
    func (g *gossipServiceImpl) Stop()
    func (g *gossipServiceImpl) UpdateMetadata(md []byte)
    func (g *gossipServiceImpl) UpdateChannelMetadata(md []byte, chainID common.ChainID)
    func (g *gossipServiceImpl) Accept(acceptor common.MessageAcceptor, passThrough bool) (chan *proto.GossipMessage, chan proto.ReceivedMessage)
    
    func (g *gossipServiceImpl) newStateInfoMsgStore() msgstore.MessageStore
    func (g *gossipServiceImpl) selfNetworkMember() discovery.NetworkMember
    func newChannelState(g *gossipServiceImpl) *channelState
    func createCommWithoutServer(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,
    func createCommWithServer(port int, idStore identity.Mapper, identity api.PeerIdentityType,
    func (g *gossipServiceImpl) toDie() bool
    func (g *gossipServiceImpl) periodicalIdentityValidationAndExpiration()
    func (g *gossipServiceImpl) periodicalIdentityValidation(suspectFunc api.PeerSuspector, interval time.Duration)
    func (g *gossipServiceImpl) learnAnchorPeers(orgOfAnchorPeers api.OrgIdentityType, anchorPeers []api.AnchorPeer)
    func (g *gossipServiceImpl) handlePresumedDead()
    func (g *gossipServiceImpl) syncDiscovery()
    func (g *gossipServiceImpl) start()
    func (g *gossipServiceImpl) acceptMessages(incMsgs chan proto.ReceivedMessage)
    func (g *gossipServiceImpl) handleMessage(m proto.ReceivedMessage)
    func (g *gossipServiceImpl) forwardDiscoveryMsg(msg proto.ReceivedMessage)
    func (g *gossipServiceImpl) validateMsg(msg proto.ReceivedMessage) bool
    func (g *gossipServiceImpl) sendGossipBatch(a []interface{})
    func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage)
    func (g *gossipServiceImpl) sendAndFilterSecrets(msg *proto.SignedGossipMessage, peers ...*comm.RemotePeer)
    func (g *gossipServiceImpl) gossipInChan(messages []*proto.SignedGossipMessage, chanRoutingFactory channelRoutingFilterFactory)
    func selectOnlyDiscoveryMessages(m interface{}) bool
    func (g *gossipServiceImpl) newDiscoverySecurityAdapter() *discoverySecurityAdapter
    func (sa *discoverySecurityAdapter) ValidateAliveMsg(m *proto.SignedGossipMessage) bool
    func (sa *discoverySecurityAdapter) SignMessage(m *proto.GossipMessage, internalEndpoint string) *proto.Envelope
    func (sa *discoverySecurityAdapter) validateAliveMsgSignature(m *proto.SignedGossipMessage, identity api.PeerIdentityType) bool
    func (g *gossipServiceImpl) createCertStorePuller() pull.Mediator
    func (g *gossipServiceImpl) sameOrgOrOurOrgPullFilter(msg proto.ReceivedMessage) func(string) bool
    func (g *gossipServiceImpl) connect2BootstrapPeers()
    func (g *gossipServiceImpl) createStateInfoMsg(metadata []byte, chainID common.ChainID) (*proto.SignedGossipMessage, error)
    func (g *gossipServiceImpl) hasExternalEndpoint(PKIID common.PKIidType) bool
    func (g *gossipServiceImpl) isInMyorg(member discovery.NetworkMember) bool
    func (g *gossipServiceImpl) getOrgOfPeer(PKIID common.PKIidType) api.OrgIdentityType
    func (g *gossipServiceImpl) validateLeadershipMessage(msg *proto.SignedGossipMessage) error
    func (g *gossipServiceImpl) validateStateInfoMsg(msg *proto.SignedGossipMessage) error
    func (g *gossipServiceImpl) disclosurePolicy(remotePeer *discovery.NetworkMember) (discovery.Sieve, discovery.EnvelopeFilter)
    func (g *gossipServiceImpl) peersByOriginOrgPolicy(peer discovery.NetworkMember) filter.RoutingFilter
    func partitionMessages(pred common.MessageAcceptor, a []*proto.SignedGossipMessage) ([]*proto.SignedGossipMessage, []*proto.SignedGossipMessage)
    func extractChannels(a []*proto.SignedGossipMessage) []common.ChainID
    
    func (g *gossipServiceImpl) newDiscoveryAdapter() *discoveryAdapter
    func (da *discoveryAdapter) close()
    func (da *discoveryAdapter) toDie() bool
    func (da *discoveryAdapter) Gossip(msg *proto.SignedGossipMessage)
    func (da *discoveryAdapter) SendToPeer(peer *discovery.NetworkMember, msg *proto.SignedGossipMessage)
    func (da *discoveryAdapter) Ping(peer *discovery.NetworkMember) bool
    func (da *discoveryAdapter) Accept() chan *proto.SignedGossipMessage
    func (da *discoveryAdapter) PresumedDead() chan common.PKIidType
    func (da *discoveryAdapter) CloseConn(peer *discovery.NetworkMember)
    //   gossip/gossip/gossip_impl.go
    

    4.3.2、func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvisor,mcs api.MessageCryptoService, idMapper identity.Mapper, selfIdentity api.PeerIdentityType,secureDialOpts api.PeerSecureDialOpts) Gossip
    grpcに添付を作成する.サーバ上のGossipインスタンス.
    func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvisor,
        mcs api.MessageCryptoService, idMapper identity.Mapper, selfIdentity api.PeerIdentityType,
        secureDialOpts api.PeerSecureDialOpts) Gossip {
    
        var c comm.Comm
        var err error
    
        lgr := util.GetLogger(util.LoggingGossipModule, conf.ID)
        if s == nil {      gRPC Server,    GossipServer  
            c, err = createCommWithServer(conf.BindPort, idMapper, selfIdentity, secureDialOpts)
        } else { // GossipServer     peerServer
            c, err = createCommWithoutServer(s, conf.TLSServerCert, idMapper, selfIdentity, secureDialOpts)
        }
    
        g := &gossipServiceImpl{
            selfOrg:               secAdvisor.OrgByPeerIdentity(selfIdentity),
            secAdvisor:            secAdvisor,
            selfIdentity:          selfIdentity,
            presumedDead:          make(chan common.PKIidType, presumedDeadChanSize),
            idMapper:              idMapper,
            disc:                  nil,
            mcs:                   mcs,
            comm:                  c,
            conf:                  conf,
            ChannelDeMultiplexer:  comm.NewChannelDemultiplexer(),
            logger:                lgr,
            toDieChan:             make(chan struct{}, 1),
            stopFlag:              int32(0),
            stopSignal:            &sync.WaitGroup{},
            includeIdentityPeriod: time.Now().Add(conf.PublishCertPeriod),
        }
        g.stateInfoMsgStore = g.newStateInfoMsgStore()
        g.chanState = newChannelState(g)
        g.emitter = newBatchingEmitter(conf.PropagateIterations,
            conf.MaxPropagationBurstSize, conf.MaxPropagationBurstLatency,
            g.sendGossipBatch)
        g.discAdapter = g.newDiscoveryAdapter()
        g.disSecAdap = g.newDiscoverySecurityAdapter()
        g.disc = discovery.NewDiscoveryService(g.selfNetworkMember(), g.discAdapter, g.disSecAdap, g.disclosurePolicy)
        g.certStore = newCertStore(g.createCertStorePuller(), idMapper, selfIdentity, mcs)
    
        go g.start()
        go g.periodicalIdentityValidationAndExpiration()
        go g.connect2BootstrapPeers()
        return g
    }
    //   gossip/gossip/gossip_impl.go
    

    4.3.3、go g.start()
    func (g *gossipServiceImpl) start() {
        go g.syncDiscovery()
        go g.handlePresumedDead()
    
        msgSelector := func(msg interface{}) bool {
            gMsg, isGossipMsg := msg.(proto.ReceivedMessage)
            if !isGossipMsg {
                return false
            }
            isConn := gMsg.GetGossipMessage().GetConn() != nil
            isEmpty := gMsg.GetGossipMessage().GetEmpty() != nil
            return !(isConn || isEmpty)
        }
    
        incMsgs := g.comm.Accept(msgSelector)
        go g.acceptMessages(incMsgs)
    }
    //   gossip/gossip/gossip_impl.go
    

    go g.acceptMessages(incMsgs)コードは以下の通りである.
    func (g *gossipServiceImpl) acceptMessages(incMsgs chan proto.ReceivedMessage) {
        defer g.logger.Debug("Exiting")
        g.stopSignal.Add(1)
        defer g.stopSignal.Done()
        for {
            select {
            case s := g.toDieChan:
                g.toDieChan  s
                return
            case msg := incMsgs:
                g.handleMessage(msg) //     gc.HandleMessage(m),      
            }
        }
    }
    //   gossip/gossip/gossip_impl.go
    

    gc.HandleMessage(m)コードは次のとおりです.
    func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
        m := msg.GetGossipMessage()
        orgID := gc.GetOrgOfPeer(msg.GetConnectionInfo().ID)
        if m.IsDataMsg() || m.IsStateInfoMsg() {
            added := false
    
            if m.IsDataMsg() {
                added = gc.blockMsgStore.Add(msg.GetGossipMessage())
            } else {
                added = gc.stateInfoMsgStore.Add(msg.GetGossipMessage())
            }
    
            if added {
                gc.Gossip(msg.GetGossipMessage()) //       ,     func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage)      
                gc.DeMultiplex(m)
                if m.IsDataMsg() {
                    gc.blocksPuller.Add(msg.GetGossipMessage())
                }
            }
            return
        }
        //...
    }
    //   gossip/gossip/channel/channel.go
    

    func(g*gossipServiceImpl)gossipBatch(msgs[]*proto.SignedGossipMessage)コードは次のとおりです.
    func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage) {
        var blocks []*proto.SignedGossipMessage
        var stateInfoMsgs []*proto.SignedGossipMessage
        var orgMsgs []*proto.SignedGossipMessage
        var leadershipMsgs []*proto.SignedGossipMessage
    
        isABlock := func(o interface{}) bool {
            return o.(*proto.SignedGossipMessage).IsDataMsg()
        }
        isAStateInfoMsg := func(o interface{}) bool {
            return o.(*proto.SignedGossipMessage).IsStateInfoMsg()
        }
        isOrgRestricted := func(o interface{}) bool {
            return aliveMsgsWithNoEndpointAndInOurOrg(o) || o.(*proto.SignedGossipMessage).IsOrgRestricted()
        }
        isLeadershipMsg := func(o interface{}) bool {
            return o.(*proto.SignedGossipMessage).IsLeadershipMsg()
        }
    
        // Gossip blocks
        blocks, msgs = partitionMessages(isABlock, msgs)
        g.gossipInChan(blocks, func(gc channel.GossipChannel) filter.RoutingFilter {
            return filter.CombineRoutingFilters(gc.EligibleForChannel, gc.IsMemberInChan, g.isInMyorg)
        })
    
        // Gossip Leadership messages
        leadershipMsgs, msgs = partitionMessages(isLeadershipMsg, msgs)
        g.gossipInChan(leadershipMsgs, func(gc channel.GossipChannel) filter.RoutingFilter {
            return filter.CombineRoutingFilters(gc.EligibleForChannel, gc.IsMemberInChan, g.isInMyorg)
        })
    
        // Gossip StateInfo messages
        stateInfoMsgs, msgs = partitionMessages(isAStateInfoMsg, msgs)
        for _, stateInfMsg := range stateInfoMsgs {
            peerSelector := g.isInMyorg
            gc := g.chanState.lookupChannelForGossipMsg(stateInfMsg.GossipMessage)
            if gc != nil && g.hasExternalEndpoint(stateInfMsg.GossipMessage.GetStateInfo().PkiId) {
                peerSelector = gc.IsMemberInChan
            }
    
            peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership(), peerSelector)
            g.comm.Send(stateInfMsg, peers2Send...)
        }
    
        // Gossip messages restricted to our org
        orgMsgs, msgs = partitionMessages(isOrgRestricted, msgs)
        peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership(), g.isInMyorg)
        for _, msg := range orgMsgs {
            g.comm.Send(msg, peers2Send...)
        }
    
        // Finally, gossip the remaining messages
        for _, msg := range msgs {
            selectByOriginOrg := g.peersByOriginOrgPolicy(discovery.NetworkMember{PKIid: msg.GetAliveMsg().Membership.PkiId})
            peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership(), selectByOriginOrg)
            g.sendAndFilterSecrets(msg, peers2Send...)
        }
    }
    //   gossip/gossip/gossip_impl.go
    

    5、GossipStateProviderインタフェースの定義と実現(状態コピー)
    5.1、GossipStateProviderインタフェース定義
    欠落したブロックをステータスコピーで埋め込み、他のノードから欠落したブロックを取得する要求を送信します.
    type GossipStateProvider interface {
        //      
        GetBlock(index uint64) *common.Block
        //   
        AddPayload(payload *proto.Payload) error
        //      
        Stop()
    }
    //   gossip/state/state.go
    

    5.2、GossipStateProviderインタフェース実現
    type GossipStateProviderImpl struct {
        mcs api.MessageCryptoService //      
        chainID string //Chain id
        gossip GossipAdapter //gossiping service
        gossipChan chan *proto.GossipMessage
        commChan chan proto.ReceivedMessage
        payloads PayloadsBuffer //Payloads    
        committer committer.Committer
        stateResponseCh chan proto.ReceivedMessage
        stateRequestCh chan proto.ReceivedMessage
        stopCh chan struct{}
        done sync.WaitGroup
        once sync.Once
        stateTransferActive int32
    }
    
    func (s *GossipStateProviderImpl) GetBlock(index uint64) *common.Block
    func (s *GossipStateProviderImpl) AddPayload(payload *proto.Payload) error
    func (s *GossipStateProviderImpl) Stop()
    
    func NewGossipStateProvider(chainID string, g GossipAdapter, committer committer.Committer, mcs api.MessageCryptoService) GossipStateProvider
    func (s *GossipStateProviderImpl) listen()
    func (s *GossipStateProviderImpl) directMessage(msg proto.ReceivedMessage)
    func (s *GossipStateProviderImpl) processStateRequests()
    func (s *GossipStateProviderImpl) handleStateRequest(msg proto.ReceivedMessage)
    func (s *GossipStateProviderImpl) handleStateResponse(msg proto.ReceivedMessage) (uint64, error)
    func (s *GossipStateProviderImpl) queueNewMessage(msg *proto.GossipMessage)
    func (s *GossipStateProviderImpl) deliverPayloads()
    func (s *GossipStateProviderImpl) antiEntropy()
    func (s *GossipStateProviderImpl) maxAvailableLedgerHeight() uint64
    func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64)
    func (s *GossipStateProviderImpl) stateRequestMessage(beginSeq uint64, endSeq uint64) *proto.GossipMessage
    func (s *GossipStateProviderImpl) selectPeerToRequestFrom(height uint64) (*comm.RemotePeer, error)
    func (s *GossipStateProviderImpl) filterPeers(predicate func(peer discovery.NetworkMember) bool) []*comm.RemotePeer
    func (s *GossipStateProviderImpl) hasRequiredHeight(height uint64) func(peer discovery.NetworkMember) bool
    func (s *GossipStateProviderImpl) addPayload(payload *proto.Payload, blockingMode bool) error
    func (s *GossipStateProviderImpl) commitBlock(block *common.Block) error
    
    //   gossip/state/state.go
    

    5.2.1、func (s *GossipStateProviderImpl) AddPayload(payload *proto.Payload) error
    func (s *GossipStateProviderImpl) AddPayload(payload *proto.Payload) error {
        blockingMode := blocking
        if viper.GetBool("peer.gossip.nonBlockingCommitMode") { //       
            blockingMode = false
        }
        return s.addPayload(payload, blockingMode)
    }
    //   gossip/state/state.go
    

    s.addPayload(payload,blockingMode)コードは以下の通りである.
    func (s *GossipStateProviderImpl) addPayload(payload *proto.Payload, blockingMode bool) error {
        height, err := s.committer.LedgerHeight()
        return s.payloads.Push(payload) //    
    }
    //   gossip/state/state.go
    

    5.2.2、func NewGossipStateProvider(chainID string, g GossipAdapter, committer committer.Committer, mcs api.MessageCryptoService) GossipStateProvider
    func NewGossipStateProvider(chainID string, g GossipAdapter, committer committer.Committer, mcs api.MessageCryptoService) GossipStateProvider {
        logger := util.GetLogger(util.LoggingStateModule, "")
    
        gossipChan, _ := g.Accept(func(message interface{}) bool {
            return message.(*proto.GossipMessage).IsDataMsg() &&
                bytes.Equal(message.(*proto.GossipMessage).Channel, []byte(chainID))
        }, false)
    
        remoteStateMsgFilter := func(message interface{}) bool {
            receivedMsg := message.(proto.ReceivedMessage)
            msg := receivedMsg.GetGossipMessage()
            connInfo := receivedMsg.GetConnectionInfo()
            authErr := mcs.VerifyByChannel(msg.Channel, connInfo.Identity, connInfo.Auth.Signature, connInfo.Auth.SignedData)
            return true
        }
    
        _, commChan := g.Accept(remoteStateMsgFilter, true)
    
        height, err := committer.LedgerHeight()
    
        s := &GossipStateProviderImpl{
            mcs: mcs,
            chainID: chainID,
            gossip: g,
            gossipChan: gossipChan,
            commChan: commChan,
            payloads: NewPayloadsBuffer(height),
            committer: committer,
            stateResponseCh: make(chan proto.ReceivedMessage, defChannelBufferSize),
            stateRequestCh: make(chan proto.ReceivedMessage, defChannelBufferSize),
            stopCh: make(chan struct{}, 1),
            stateTransferActive: 0,
            once: sync.Once{},
        }
    
        nodeMetastate := NewNodeMetastate(height - 1)
        b, err := nodeMetastate.Bytes()
        s.done.Add(4)
    
        go s.listen()
        go s.deliverPayloads() //   orderer    
        go s.antiEntropy() //     
        go s.processStateRequests() //        
        return s
    }
    //   gossip/state/state.go
    

    5.2.3、func (s *GossipStateProviderImpl) deliverPayloads()
    func (s *GossipStateProviderImpl) deliverPayloads() {
        defer s.done.Done()
    
        for {
            select {
            case s.payloads.Ready():
                for payload := s.payloads.Pop(); payload != nil; payload = s.payloads.Pop() {
                    rawBlock := &common.Block{}
                    err := pb.Unmarshal(payload.Data, rawBlock)
                    err := s.commitBlock(rawBlock)
                }
            case s.stopCh:
                s.stopCh  struct{}{}
                logger.Debug("State provider has been stoped, finishing to push new blocks.")
                return
            }
        }
    }
    
    func (s *GossipStateProviderImpl) commitBlock(block *common.Block) error {
        err := s.committer.Commit(block)
        nodeMetastate := NewNodeMetastate(block.Header.Number)
        b, err := nodeMetastate.Bytes()
        return nil
    }
    //   gossip/state/state.go
    

    5.2.4、func (s *GossipStateProviderImpl) antiEntropy()
    現在の高さとノードの最大高さの差をタイミングで取得
    func (s *GossipStateProviderImpl) antiEntropy() {
        defer s.done.Done()
    
        for {
            select {
            case s.stopCh:
                s.stopCh  struct{}{}
                return
            case time.After(defAntiEntropyInterval):
                current, err := s.committer.LedgerHeight()
                max := s.maxAvailableLedgerHeight() //    
                s.requestBlocksInRange(uint64(current), uint64(max))
            }
        }
    }
    //   gossip/state/state.go
    

    s.requestBlocksInRange(uint 64(current)、uint 64(max))コードは次のとおりです.
    func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64) {
        atomic.StoreInt32(&s.stateTransferActive, 1)
        defer atomic.StoreInt32(&s.stateTransferActive, 0)
    
        for prev := start; prev <= end; {
            next := min(end, prev+defAntiEntropyBatchSize) //    10 
            gossipMsg := s.stateRequestMessage(prev, next) //  GossipMessage_StateRequest
    
            responseReceived := false
            tryCounts := 0
    
            for !responseReceived {
                peer, err := s.selectPeerToRequestFrom(next) //  peer
                s.gossip.Send(gossipMsg, peer)
                tryCounts++
                select {
                case msg := s.stateResponseCh:
                    if msg.GetGossipMessage().Nonce != gossipMsg.Nonce {
                        continue
                    }
                    index, err := s.handleStateResponse(msg)
                    prev = index + 1
                    responseReceived = true
                case time.After(defAntiEntropyStateResponseTimeout):
                case s.stopCh:
                    s.stopCh  struct{}{}
                    return
                }
            }
        }
    }
    //   gossip/state/state.go
    

    index,err:=s.handleStateResponse(msg)コードは次のとおりです.
    func (s *GossipStateProviderImpl) handleStateResponse(msg proto.ReceivedMessage) (uint64, error) {
        max := uint64(0)
        response := msg.GetGossipMessage().GetStateResponse()
        for _, payload := range response.GetPayloads() {
            err := s.mcs.VerifyBlock(common2.ChainID(s.chainID), payload.SeqNum, payload.Data)
            err := s.addPayload(payload, blocking) //    
        }
        return max, nil
    }
    //   gossip/state/state.go
    

    committer詳細、参考:Fabric 1.0ソースノートのPeer#committer(提出者)
    5.2.5、func (s *GossipStateProviderImpl) listen()
    func (s *GossipStateProviderImpl) listen() {
        defer s.done.Done()
    
        for {
            select {
            case msg := s.gossipChan:
                //               
                go s.queueNewMessage(msg)
            case msg := s.commChan:
                logger.Debug("Direct message ", msg)
                go s.directMessage(msg)
            case s.stopCh:
                s.stopCh  struct{}{}
                return
            }
        }
    }
    //   gossip/state/state.go
    

    go s.queueNewMessage(msg)コードは次のとおりです.
    func (s *GossipStateProviderImpl) queueNewMessage(msg *proto.GossipMessage) {
        dataMsg := msg.GetDataMsg()
        if dataMsg != nil {
            err := s.addPayload(dataMsg.GetPayload(), nonBlocking) //    
        }
    }
    //   gossip/state/state.go
    

    10、MessageCryptoServiceインタフェース及び実現
    MessageCryptoServiceインタフェース定義:メッセージ暗号化サービス.
    type MessageCryptoService interface {
        //  Peer   PKI ID(Public Key Infrastructure,      )
        GetPKIidOfCert(peerIdentity PeerIdentityType) common.PKIidType
        //     
        VerifyBlock(chainID common.ChainID, seqNum uint64, signedBlock []byte) error
        //  Peer          
        Sign(msg []byte) ([]byte, error)
        //           
        Verify(peerIdentity PeerIdentityType, signature, message []byte) error
        //                 
        VerifyByChannel(chainID common.ChainID, peerIdentity PeerIdentityType, signature, message []byte) error
        //  Peer  
        ValidateIdentity(peerIdentity PeerIdentityType) error
    }
    //   gossip/api/crypto.go
    

    MessageCryptoServiceインタフェースの実装、すなわちmspMessageCryptoService構造体および方法:
    type mspMessageCryptoService struct {
        channelPolicyManagerGetter policies.ChannelPolicyManagerGetter //       ,type ChannelPolicyManagerGetter interface
        localSigner                crypto.LocalSigner //     ,type LocalSigner interface
        deserializer               mgmt.DeserializersManager //       ,type DeserializersManager interface
    }
    
    //  mspMessageCryptoService
    func NewMCS(channelPolicyManagerGetter policies.ChannelPolicyManagerGetter, localSigner crypto.LocalSigner, deserializer mgmt.DeserializersManager) api.MessageCryptoService
    //  s.getValidatedIdentity(peerIdentity)
    func (s *mspMessageCryptoService) ValidateIdentity(peerIdentity api.PeerIdentityType) error
    func (s *mspMessageCryptoService) GetPKIidOfCert(peerIdentity api.PeerIdentityType) common.PKIidType
    func (s *mspMessageCryptoService) VerifyBlock(chainID common.ChainID, seqNum uint64, signedBlock []byte) error
    func (s *mspMessageCryptoService) Sign(msg []byte) ([]byte, error)
    func (s *mspMessageCryptoService) Verify(peerIdentity api.PeerIdentityType, signature, message []byte) error
    func (s *mspMessageCryptoService) VerifyByChannel(chainID common.ChainID, peerIdentity api.PeerIdentityType, signature, message []byte) error
    func (s *mspMessageCryptoService) getValidatedIdentity(peerIdentity api.PeerIdentityType) (msp.Identity, common.ChainID, error)
    //   peer/gossip/mcs.go