ストリームを見る


last story , 私は分散クラウド計算をどのように観測するかを論じた.ここでは、私は少し類似した話題に集中します.主に、どのようにあなたのシステムに流れるデータのストリームを観察する方法、正確にどのように特定の瞬間には、データが流れていないことを決定するために、我々は後処理を行う準備ができています.
これはおそらく、ストリームを扱う一般的なアプローチと矛盾しているようです.しかし、以下の状況を考えてみましょう.
  • 最初のイベントは、ストリームにデータをプッシュするための一組の生産者を呼び出す
  • プロデューサーは同時にデータをストリームにプッシュします.それから、消費者がS 3のような若干の永続記憶装置に記録を保存すると仮定してください
  • すべての期待されたデータが永続的なストレージにある場合にのみ、次の変換を実行できます
  • 非常に単純なアプローチは、生産者の実行を追跡することですprevious story ). 番目のステップは、すべての生産者が計算を終了する場合にのみ実行されます.畝
    このアプローチの主な欠点は、ストリームを通して記録を通過し、永久記憶装置にそれらを格納することに関連するどんな遅延も考慮に入れていない.これは、最終的に、部分的に完成したデータセットのコンピューティング集約につながることができました.
    良いアイデアは、ストレージに永続的に格納されているレコードの数を追跡するオブザーバーを使用することができます.
    以下のアーキテクチャを考えてみましょう.
    図全体を以下に示す

    我々はいくつかのデータを生成し、キネシスファイヤーホースにそれらをプッシュラムダの束を持っている.これをS 3バケットに保存する.
    ファイアウォールがS 3バケットで新しいオブジェクトを作成するたびに、モニタラムダがトリガされます.我々はそのようなイベントをカウントし、DynamoDBテーブルのカウンタを更新します.レコードの主キーは観測されたバケツ名です.
    そのようなイベントのプロトタイプハンドラは次のように定義できます.
    import boto3
    import os
    
    dynamodb = boto3.resource('dynamodb')
    
    observerTableName = os.environ.get('observerTableName')
    table = dynamodb.Table(observerTableName)
    
    
    def monitor(event, context):
        for record in event.get('Records', []):
            if record.get('eventName', '') == 'ObjectCreated:Put':
                bucketName = record['s3']['bucket']['name']
                table.update_item(
                    Key={'id': bucketName},
                    UpdateExpression='ADD num_records :val',
                    ExpressionAttributeValues={':val': 1}
                )
    
    
    図の2番目の部分がオブザーバーです.
    このラムダは、SQSキューを使用して、再帰的に実行し、ストレージカウンタDynamOdbテーブルの内容を読み込みます.
    オブザーバの背後にあるアルゴリズムは非常に簡単であり、以下のダイアグラムで説明できる

    これは次のように実装できます
    def observer(event, context):
        for record in event['Records']:
            payload = record['body']
            message = json.loads(payload)
            if message.get('repeated', 0) >= MAX_NUM_REPEAT:
                # call external service, ready to handle the data in storage.
                print(f'{message=} finished - calling external service')
                continue
    
            res = table.get_item(Key={'id': message['bucket']})
            if 'Item' in res:
                item = res['Item']
                num_records = int(item['num_records'])
                if num_records == message['last_num_records']:
                    message['repeated'] += 1
                else:
                    message['repeated'] = 0  # Reset the repeat counter.
                message['last_num_records'] = num_records
                sqs.send_message(QueueUrl=os.environ['selfSQSURL'], MessageBody=json.dumps(message), DelaySeconds=30)
    
    以下の構造のSQSメッセージで
    {
      "repeated": 0,
      "last_num_records": 0,
      "bucket": "bucket name"
    }
    
    つのパラメータを調整する必要があります.最初は観測ウィンドウの長さ(上記のコードで宣言されている)ですMAX_NUM_REPEAT ). 2番目のパラメータはDynamODBテーブルからの読み込み間の遅延です30 秒.
    これら二つのパラメータについてコメントします.
    生産者が遅いプロセスであり、我々があまりに速い(短い遅れ時間で)サンプルをとるならば、我々はプロセスを終えなければならないと誤って考えることができます.
    一方、プロデューサーが速いプロセスならば、我々は不必要に待つことができますMAX_NUM_REPEAT * delay 観測者がデータが準備される通知を送る前の秒.
    遅延時間に異なる最適化戦略を使用できます.

  • 初期遅延時間をrepeated カウンター
    delay = 30 if repeated == 0 else int(30 / repeated)
    

  • 指数関数の使用
    delay = numpy.ceil(30*numpy.exp(-repeated)).astype(int)
    
  • それは、パラメタが適切である生産プロセスの性質に依存します.
    私がここで提示した方法はデータ処理において有用であり、ここではデータストレージの準備ができた後(例えばS 3、エラスティックサーチなど)、後処理タスクを実行する必要があります.
    それは適用できるようにいくつかのチューニングを必要とし、1つだけでなく、どのように偽の場合に対処するために考慮する必要があります.
    カバー画像について,ベルギー,ルーベン近郊のディル川