AzureコスモスDBにおける変化フィードプルモデルの第一歩


AzureコスモスDBのあなたの好きな特徴は何ですか?私は変更フィードが一番好きです.変更フィードは非常に柔軟な機能であり、我々は時々超高性能保証オーダーキューとしてそれを使用します.
公式文書には様々なユースケースがあります.少なくとも一度それを読んでください.
  • Change feed design patterns in Azure Cosmos DB | Microsoft Docs
  • 変更フィードの美しさは、コンテナーが存在する限り更新される順序でデータを読み取ることができ、APIがプルモデルであるため、複数の読者が同時に変更フィードを読むことができます.
    この記事では、変更フィードを読む新しい方法についてお話します.

    プッシュモデル(変更フィードプロセッサ)


    最も一般的に使用される変更フィードのメソッドはプッシュモデルです.
    チェンジフィードプロセッサとアジュール機能では、変更フィードを読み込むタイミングで、メソッドと機能が実行され、プッシュモデルとなる.
  • Change feed processor in Azure Cosmos DB | Microsoft Docs
  • プッシュモデルはこれを行うにはとても簡単な方法ですが、変更フィードからどれだけ読むかを制御する場合に使用されます.
    私の例では、変更フィードから読み込んだデータの量と頻度を制御し、データレイクストレージGen 2に書き込んだ.

    プルモデル(イテレータ形式)


    Pushモデルでカバーされていないユースケースでは、NET用のCosmos DB SDK V 3は、変更フィードを読むためのプルモデルを実装します.
  • Change feed pull model in Azure Cosmos DB | Microsoft Docs
  • プルモデルは、変更フィードプロセッサに比べて非常に薄いAPIですが、開発者はより多くの制御と柔軟性を持つことができます.
    おそらく、あなたがプルモデルを使用してコードを見るならば、それは理解するのがより簡単でしょう.
    class Program
    {
        static async Task Main(string[] args)
        {
            var connectionString = "AccountEndpoint=https://***.documents.azure.com:443/;AccountKey=***;";
    
            var cosmosClient = new CosmosClient(connectionString, new CosmosClientOptions
            {
                SerializerOptions = new CosmosSerializationOptions
                {
                    PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase
                }
            });
    
            var container = cosmosClient.GetContainer("HackAzure", "TodoItem");
    
            // Read the Change Feed from the current time
            var iterator = container.GetChangeFeedIterator<TodoItem>(ChangeFeedStartFrom.Now());
    
            // HasMoreResults will always be true
            while (iterator.HasMoreResults)
            {
                try
                {
                    // Read data from Change Feed
                    var items = await iterator.ReadNextAsync();
    
                    foreach (var todoItem in items)
                    {
                        Console.WriteLine($"{DateTime.Now}: {todoItem.Id},{todoItem.Title},{todoItem.Body}");
                    }
                }
                catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotModified)
                {
                    // If Change Feed is empty, an exception will be thrown with StatusCode set to NotModified
                    await Task.Delay(5000);
                }
            }
        }
    }
    
    基本的な使用法はGetChangeFeedIterator<T> メソッドとループを返しますFeedIterator<T> .
    変更フィードの実際の読み込みはReadNextAsync . このメソッドを実行する頻度で変更フィードを読む速度を制御できます.

    私は実際に変更フィードの動作をチェックしています.既定では、プルモデルはコンテナ全体の変更フィードを読み取ります.

    変更開始位置


    サンプルコードでは、現在の時刻以降に更新されたデータから変更フィードを読み込むように設定しました.変更フィードの開始を設定する方法はいくつかあります.ChangeFeedStartFrom クラス.
    以下の4つの方法がある.
  • Beginning
  • ContinuationToken
  • Now
  • Time
  • を除くContinuationToken , 設定の残りの部分は、変更フィードプロセッサまたはAzure関数を使用してプッシュモデルを使用することとは大きく異なりません.The ContinuationToken 後述する.
    その前に、オーバーロードされた方法について少し説明しましょう.

    全法以外ContinuationToken 受け入れる過負荷があるFeedRange . The FeedRange コスモスDBの物理的なパーティションを表します.すなわち、並列処理の単位でもある.
    あなたがそれを知っているならばLeases プッシュモデルでは、各物理パーティションごとに作成され、それは同じであることを理解しやすいです.
    プルモデルは薄いラッパーであるので、変更フィードの並列処理は少し複雑ですが、代わりに変更フィードで特定のパーティションキーのデータだけを読むことができました.
    // Read only the data whose partition key is "shibayan"
    var startFrom = ChangeFeedStartFrom.Now(FeedRange.FromPartitionKey(new PartitionKey("shibayan")));
    var iterator = container.GetChangeFeedIterator<TodoItem>(startFrom);
    
    を使用してFeedRange.FromPartitionKey 特定のパーティションキーの変更フィードを使用してデータを読み込むことができます.これはプッシュモデルでは不可能でした.

    ハンドル継続トークン


    最後に、継続トークンを説明します.プルモデルはリースモデルのようなリースモデルを使用して読み取り位置を管理する機能を提供しません.あなたは自分でリースDBに対応する機能を作成する必要があります.
    class Program
    {
        static async Task Main(string[] args)
        {
            var connectionString = "AccountEndpoint=https://***.documents.azure.com:443/;AccountKey=***;";
    
            var cosmosClient = new CosmosClient(connectionString, new CosmosClientOptions
            {
                SerializerOptions = new CosmosSerializationOptions
                {
                    PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase
                }
            });
    
            var container = cosmosClient.GetContainer("HackAzure", "TodoItem");
    
            // Read the previous continuation token
            var continuationToken = LoadContinuationToken();
    
            // If there is no continuation token, start from the current time
            var startFrom = continuationToken != null ? ChangeFeedStartFrom.ContinuationToken(continuationToken) : ChangeFeedStartFrom.Now();
            var iterator = container.GetChangeFeedIterator<TodoItem>(startFrom);
    
            while (iterator.HasMoreResults)
            {
                try
                {
                    var items = await iterator.ReadNextAsync();
    
                    // Always update the continuation token after reading the Change Feed
                    continuationToken = items.ContinuationToken;
    
                    foreach (var todoItem in items)
                    {
                        Console.WriteLine($"{DateTime.Now}: {todoItem.Id},{todoItem.User.Id}");
                    }
                }
                catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotModified)
                {
                    // Update only if there is no continuation token
                    continuationToken ??= ex.Headers.ContinuationToken;
                    break;
                }
            }
    
            // Save the last obtained continuation token
            SaveContinuationToken(continuationToken);
        }
    
        private static string LoadContinuationToken()
        {
            // TODO: Need to implement your own.
        }
    
        private static void SaveContinuationToken(string continuationToken)
        {
            // TODO: Need to implement your own.
        }
    }
    
    ロードとセーブ継続トークンはユースケースに従って実装する必要があります.pushモデルに似たリースdbを作成することでこれを解決しました.
    ワンタイムデータマイグレーションなどの場合を除き、連続トークンを格納する必要がある.
    コスモDBの変更フィードをお楽しみください!