AzureFunctionsでCosmosDBに登録するAzureEventhubsのConsumer作成


GYAOのtsです。
我々のチームは、オールパブリッククラウドで、Microservice Architectureを採用した次期バックエンドを設計中です。

経緯

前回の投稿でPartitionKeyを設定することで、Eventhubsへ発行するeventの順序性が保証された。
あとはFunctionsでConsumerを作成してCosmosDBのDocumentをいじくってみるのみ。

やりたいこと

イベント発行される内容(bodyのjson)は下記の通り

Event
{
  "Id": "user1",
  "ContentsId": "content01",
  "Type": "ADD",
  "Time": 1500016145
}

TypeはADDとDELETEが発行される。
CosmosDBにはIdをKeyにしたドキュメントが下記のように登録される。
要はContents配列にADDされたり配列からDELETEされたりする。
※PkeyはCosmosDBのPartitionKey。

CosmosDBのDocument
{
  "Contents": [
    {
      "ContentsId": "content01",
      "Time": 1500016145
    },
    {
      "ContentsId": "content02",
      "Time": 1500016162
    }
  ],
  "Pkey": "reserve",
  "id": "user1"
}

統合設定

DocumentをId検索して、あるかないかで挙動が変わってくるため、下記の様になる。

また、今回はCosmosDBにもCosmosDBのPartitionKeyを指定しているので、下記のようにfunction.jsonでpartitionKeyを指定してあげないとエラーになる。

function.json

    {
      "type": "documentDB",
      "name": "inputDocument",
      "databaseName": "xxxxdb",
      "collectionName": "contents",
      "partitionKey": "reserve",
      "id": "{Id}",
      "connection": "xxxxdatabase_DOCUMENTDB",
      "direction": "in"
    },
    {
      "type": "documentDB",
      "name": "outputDocument",
      "databaseName": "xxxxdb",
      "partitionKey": "reserve",
      "collectionName": "contents",
      "createIfNotExists": false,
      "connection": "xxxxdatabase_DOCUMENTDB",
      "direction": "out"
    }

CosmosDB入力バインド

今回は明示的にPOCOにバインド。

#r "Newtonsoft.Json"

using System;
using System.Net;
using Newtonsoft.Json;

public class Input
{
    public string ContentsId { get; set; }
    public int Time { get; set; }
    public string Id { get; set; }
    public string Type { get; set; }
}

public class Content
{
    public string ContentsId { get; set; }
    public int Time { get; set; }
}

public class Document
{
    public List<Content> Contents { get; set; }
    public string Pkey { get; set; }
    public string id { get; set; }
}

TypeはADDとDELETEのみなのでEnumにした方がいいね。あとでそうします。

ADD

初回

Document自体がないので、作成する。


    if (input.Type == "ADD" && inputDocument == null) 
    {
        var doc = new Document()
        {
            id = input.Id,
            Pkey = "reserve",
            Contents = new List<Content>()            
        };
        var content = new Content()
        {
            ContentsId = input.ContentsId,
            Time = input.Time
        };
        doc.Contents.Add(content);
        outputDocument = doc;
    }

初回ではない場合(すでにDocumentがある場合)

ただ配列にADDするのみ

    else if (input.Type == "ADD" && inputDocument != null)
    {
        var content = new Content()
        {
            ContentsId = input.ContentsId,
            Time = input.Time
        };
        inputDocument.Contents.Add(content);
        outputDocument = inputDocument;
    }

DELETE

removeするのみ。ラムダ式ではないが。

    else if (input.Type == "DELETE" && inputDocument != null)
    {
        Content targetContent = inputDocument.Contents.Find(
               delegate(Content content) { return content.ContentsId == input.ContentsId; });
        inputDocument.Contents.Remove(targetContent);
        outputDocument = inputDocument;
    }

所感

順序性の部分を除けば、ごくごくシンプルなことしかしていない。
EventhubsのPartitionKeyとCosmosDBのPartitionKeyがあるが、同じにしても面白そうかなと思った。新しいサービス用に拡張するときにPartitionKeyを新たに作るみたいな形にするとかかな?複雑だから後で考よう。。。