NSQソース分析(1)-nsqdメッセージの生産

4082 ワード

NSQは、topicによって異なるメッセージキューを区別し、各topicは異なるchannelを有し、同じtopicの下の各メッセージは各channelにブロードキャストされる.
情報生産者から消費者への道
nsqはHTTPプロトコルとTCPプロトコルを同時にサポートし、クライアントはtcpを通じて特定のプロトコルを通じてnsqの指定topicにメッセージを発行したり、httpプロトコルの指定インタフェースを通じて発行したりすることができる.
まず、クライアントからNSQに送信されたメッセージのtopicについて見てみましょう.
topicからchannelへ
簡単なフローチャートを次に示します.
Alt text
httpでもtcpでもnsqd/topicが呼び出されます.go/Topic.PutMessageメソッド.内部にはmemoryMsgChanというBuffered Channelが入っています.bufferのサイズは構成によって設定され、bufferサイズを超えるメッセージはbackend、すなわちdiskqに書き込まれる.これでputメッセージの同期操作が完了する、残りの作業はこのtopicのコヒーレント非同期で完了し、このコヒーレントはnsqd/topicを実行する.go/Topic.MessagePumpメソッド.このメソッドのソースコードは次のとおりです.
// messagePump memoryMsgChan  diskq   message,      topic    Channel  。
func (t *Topic) messagePump() {    
    var msg *Message
    var buf []byte
    var err error
    var chans []*Channel
    var memoryMsgChan chan *Message
    var backendChan chan []byte

    t.RLock()
    for _, c := range t.channelMap {
        chans = append(chans, c)
    }
    t.RUnlock()

    if len(chans) > 0 {
        memoryMsgChan = t.memoryMsgChan
        backendChan = t.backend.ReadChan()
    }

    for {
        select {
        case msg =  0 {
                chanMsg = NewMessage(msg.ID, msg.Body)
                chanMsg.Timestamp = msg.Timestamp
                chanMsg.deferred = msg.deferred
            }
            if chanMsg.deferred != 0 {
                channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
                continue
            }
            err := channel.PutMessage(chanMsg)
            if err != nil {
                t.ctx.nsqd.logf(LOG_ERROR,
                    "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
                    t.name, msg.ID, channel.name, err)
            }
        }
    }

exit:
    t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
}


このコードは非常に簡単ですが、この非同期の操作は、スレッドプールに置いてコードを実行するなど、多くの伝統的な言語の実装とは異なります.
NSQのこの方式は,高同時環境では多くのロックを加えず,channelと単一コヒーレンス操作による重要なデータ構造の方式で実現される.channelは、各データ構造オブジェクト(高い同時操作が必要な関連データのセット)が作成されると、作成の初めにメンテナンス・ペイント(messagePump)を開始し、他のコンポーネントがこの構造に送信したメッセージ(対数データが必要な操作を含む)をselectで傍受し、競合なしにこのデータのセットを操作します.このような操作は、共有データに対するすべての操作をシリアル化し、ロックの大量使用を回避します.なお、ここでは、これらのデータに対するシリアル動作はいずれも読み書きデータ構造であり、また、他のchannelに書いて通信するなどの動作は、特に時間のかかる計算や同期のIOを避けるべきであり、そうしないとchannelのブロックを招くことになる.
これもgolangの下で同時開発された比較的一般的なモデルであり、golangが推奨する同期方式は共有メモリではなく通信であり、このモデルもこのような思想の体現である.詳しくはEffective Go-Concurrencyの部分を見てみましょう.
Share by communicating Concurrent programming is a large topic and there is space only for some Go-specific highlights here. Concurrent programming in many environments is made difficult by the subtleties required to implement correct access to shared variables. Go encourages a different approach in which shared values are passed around on channels and, in fact, never actively shared by separate threads of execution. Only one goroutine has access to the value at any given time. Data races cannot occur, by design. To encourage this way of thinking we have reduced it to a slogan: Do not communicate by sharing memory; instead, share memory by communicating. This approach can be taken too far. Reference counts may be best done by putting a mutex around an integer variable, for instance. But as a high-level approach, using channels to control access makes it easier to write clear, correct programs. One way to think about this model is to consider a typical single-threaded program running on one CPU. It has no need for synchronization primitives. Now run another such instance; it too needs no synchronization. Now let those two communicate; if the communication is the synchronizer, there's still no need for other synchronization. Unix pipelines, for example, fit this model perfectly. Although Go's approach to concurrency originates in Hoare's Communicating Sequential Processes (CSP), it can also be seen as a type-safe generalization of Unix pipes.
NSQコードもロックを使うことがわかりますが、いつロックを使うのか、いつchannelを使うのか.最も簡単な原則は、どちらを使うかは自然にどちらを使うか、どちらを簡単に使うか、どの効率が高いかということです.