CloudPubSubでhashデータをpublishするときにハマった


Cloud Pubsubにhashでメッセージを送ったときに、全部文字列になってハマったよって話

結論

hashデータはto_jsonするなどしてstring化してpublishしよう

何が起きたか

RubyでPubsubにpublishする際、色んなパラメータを渡したいのでHashに色々突っ込んで送っていた


Google::Cloud::Pubsub(params).topic(:hoge_topic).publish data

これでpubsubが普通にkickできるのだが、Cloud Functionsのattributesで受け取った際にevent['attributes']['hoge'] == Trueのような比較で想定通りに通らなくなった。

よくよくデータを見てみたところ、Ruby側でHashを送るとkey:valueのペアごとにstring化されてしまっている。

publish側

require 'google/cloud/pubsub'

pubsub = Google::Cloud::Pubsub.new({ project_id: 'consulting-cloud-dev', credentials: 'credential.json'})
pubsub.topic(:hoge_topic).publish({ 'true': true, 'false': false, 'nil': nil, 'num': 1, 'float': 3.4 })

subscribe側 (python on Cloud Functions)

def hello_pubsub(event, context):
    print(event['attributes'])

Stackdriver上のログ

{'false': 'false', 'float': '3.4', 'nil': '', 'num': '1', 'true': 'true'}

全部''で囲われていて文字列扱いになっている。うーん。

ライブラリの実装を見る

ライブラリ側の実装を見ると、Hashの場合はそれぞれstring化されてる。
https://github.com/googleapis/google-cloud-ruby/blob/master/google-cloud-pubsub/lib/google/cloud/pubsub/batch_publisher.rb#L91

コメントにはprotobuf definitionと書いてあるのでprotobufを想定しているということなのか。
なるほどー。確かにJSONとは限らないですね。

def create_pubsub_message data, attributes
  attributes ||= {}
  if data.is_a?(::Hash) && attributes.empty?
    attributes = data
    data = nil
  end
  # Convert IO-ish objects to strings
  if data.respond_to?(:read) && data.respond_to?(:rewind)
    data.rewind
    data = data.read
  end
  # Convert data to encoded byte array to match the protobuf defn
  data_bytes = \
    String(data).dup.force_encoding(Encoding::ASCII_8BIT).freeze

  # Convert attributes to strings to match the protobuf definition
  attributes = Hash[attributes.map { |k, v| [String(k), String(v)] }]

  Google::Pubsub::V1::PubsubMessage.new data: data_bytes,
                                        attributes: attributes
end

地味に何故メッセージ送ってるのに、Cloud Functions側でmessageに入らずattributesに入ってくるのかもここで把握。
この関数で切り替わってたのか。

json化すればok

to_jsonつけた

require 'google/cloud/pubsub'

pubsub = Google::Cloud::Pubsub.new({ project_id: 'consulting-cloud-dev', credentials: 'credential.json'})
pubsub.topic(:hoge_topic).publish({ 'true': true, 'false': false, 'nil': nil, 'num': 1, 'float': 3.4 }.to_json)

subscribe側のログ

{"true":true,"false":false,"nil":null,"num":1,"float":3.4}

nilはnullになってるし、値も元の型になってる。

messageはstring化して送りましょう。