パンダへのStreammachine輸出

11162 ワード

このポストは、ストリームマシンプラットフォームを使用する方法に関する一連のポストの2番目です.始めましょう.
今回は、Googleマシンのストレージバケットにストリームマシンのイベントを取得する方法を見て、そこからパンダのデータグラムに.始めましょう.
私は、あなたがどこかでGoogleクラウドバケットを持っていると仮定します、そして、あなたはサービスアカウントを作成しました.
  • サービスアカウントを作成する
  • ポストでは、サービスアカウント資格情報をファイルに保存しましたcredentials.json ワーキングディレクトリで.ファイルの内容は以下のようなものです.
    {
      "type": "service_account",
      "project_id": "stream-machine-development",
      "private_key_id": "bae39......d6efda",
      "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG......3kS+0=\n-----END PRIVATE KEY-----\n",
      ...
    }
    

    シンクとエクスポートを作成する


    イベントデータをクラウドバケットに保存するには
  • いわゆるバケツ(名前、資格情報、パス)を設定するsink エンティティ.
  • そのシンクにストリームからバッチエクスポートを作成します.
  • strm create sink demo strm-demo --credentials-file=credentials.json
    {
      "ref": { "billingId": "strmbart5986941267", "name": "demo" },
      "sinkType": "GCLOUD",
      "bucket": { "bucketName": "strm-demo", "credentials": "..." }
    }
    
    # batch exporter for the encrypted stream at path demo-in on the bucket
    strm create batch-exporter demo --interval 30 --path-prefix demo-in --sink demo
    
    # batch exporter for the decrypted level 2 stream at path demo-2 on the bucket
    strm create batch-exporter demo-2 --interval 30 --sink demo --path-prefix demo-2
    {
      "ref": {
        "billingId": "strmbart5986941267",
        "name": "demo-demo-2"
      },
      "streamRef": {
        "billingId": "strmbart5986941267",
        "name": "demo-2"
      },
      "interval": "30s",
      "sinkName": "demo",
      "pathPrefix": "demo-2"
    }
    

    データの送信を開始する


    CLIはボード上にシミュレータを持っています.CLIのバージョン1.4.0を使用してください.そうでなければ、別のシミュレートされたイベントを取得します.
    strm sim run-random demo --interval 100
    Starting to simulate random streammachine/demo/1.0.2 events to stream demo.
    Sending one event every 100 ms.
    Sent 50 events
    Sent 100 events
    Sent 150 events
    Sent 200 events
    ...
    
    我々のバケツにデータを見るべきです.
    gsutil ls gs://strm-demo/demo-in | head -5
    gs://strm-demo/demo-in/2021-08-12T09:38:00-stream-69c9bbcd-98c2-40e6-b041-fe1af8498752---0-1-2-3-4.jsonl
    gs://strm-demo/demo-in/2021-08-12T09:38:30-stream-69c9bbcd-98c2-40e6-b041-fe1af8498752---0-1-2-3-4.jsonl
    gs://strm-demo/demo-in/2021-08-12T11:09:00-stream-69c9bbcd-98c2-40e6-b041-fe1af8498752---0-1-2-3-4.jsonl
    gs://strm-demo/demo-in/2021-08-12T11:09:30-stream-69c9bbcd-98c2-40e6-b041-fe1af8498752---0-1-2-3-4.jsonl
    gs://strm-demo/demo-in/2021-08-12T11:10:00-stream-69c9bbcd-98c2-40e6-b041-fe1af8498752---0-1-2-3-4.jsonl
    
    gsutil ls gs://strm-demo/demo-2 | head -5
    gs://strm-demo/demo-2/2021-08-12T09:38:00-stream-4932a907-1972-4982-8018-45a5cd648003---0-1-2-3-4.jsonl
    gs://strm-demo/demo-2/2021-08-12T09:38:30-stream-4932a907-1972-4982-8018-45a5cd648003---0-1-2-3-4.jsonl
    gs://strm-demo/demo-2/2021-08-12T09:39:00-stream-4932a907-1972-4982-8018-45a5cd648003---0-1-2-3-4.jsonl
    gs://strm-demo/demo-2/2021-08-12T11:09:00-stream-4932a907-1972-4982-8018-45a5cd648003---0-1-2-3-4.jsonl
    gs://strm-demo/demo-2/2021-08-12T11:09:30-stream-4932a907-1972-4982-8018-45a5cd648003---0-1-2-3-4.jsonl
    
    そして、ファイルのうちの1つで、JSONイベントを見ることができます.
    gsutil cat gs://strm-demo/demo-in/.....jsonl | head -1
    
    {
      "strmMeta": {
        "eventContractRef": "streammachine/example/1.3.0",
        "nonce": -1557054268,
        "timestamp": 1628761079144,
        "keyLink": "1d960e7a-4169-4bcf-bece-e8fcc3243c06",
        "billingId": "strmbart5986941267",
        "consentLevels": [ 0, 1 ]
      },
      "uniqueIdentifier": "AQq8Ihq3DBOahkZNXpBfdky8m04pb6c02RIUNOHo",
      "consistentValue": "AQq8IhqopLqWCpIDd1+xZUw/KCtXObL7irK5NbgE1I4=",
      "someSensitiveValue": "AQq8Ihq62mxCv1fqEZ+0bcijMEFZ/VFnpA4EEs8XRp0P",
      "notSensitiveValue": "not-sensitive-30"
    }
    
    
    The demo-2 ディレクトリに復号データが含まれます.
    gsutil cat gs://strm-demo/demo-2/.....jsonl | head -1
    {
      "strmMeta": {
        "eventContractRef": "streammachine/example/1.3.0",
        "nonce": -62169113,
        "timestamp": 1628761077936,
        "keyLink": "1f118159-a27a-4468-a540-23a4938bce14",
        "billingId": "strmbart5986941267",
        "consentLevels": [ 0, 1, 2 ]
      },
      "uniqueIdentifier": "unique-25",
      "consistentValue": "session-488",
      "someSensitiveValue": "AXHCR9j0KLyYy7Bivvrk+xfU0D4pRJkIlHAE/PtvtsPx",
      "notSensitiveValue": "not-sensitive-19"
    }
    

    Jupyterとパンダに


    次のステップは、JupyterノートブックのパンダのDataFrameにこれらを取得することです.あなたが必要credentials.json これはGoogleクラウドサービスのアカウント資格を定義します.
    Jupyterノートの本質は次のとおりです.
    pip install jupyter gcsfs pandas
    
    import pandas as pd
    from pandas import json_normalize
    import gcsfs
    import json
    
    # set these to your own project and bucket
    bucket = "strm-demo"
    project = "stream-machine-development"
    
    データを読み込む
    fs = gcsfs.GCSFileSystem(project=project, token="credentials.json")
    
    def one_object_to_df(path):
        with fs.open(path, "r") as _f:
            df = json_normalize([json.loads(l) for l in _f.readlines()])
            df['strmMeta.timestamp'] = pd.to_datetime(df['strmMeta.timestamp'], unit='ms')
            df = df.set_index('strmMeta.timestamp').sort_index()
        return df
    
    def make_df(folder):
        print(folder)
        df = None
        for f in fs.find(f"{bucket}/{folder}"):
            if df is None:
                df = one_object_to_df(f)
            else:
                df = df.append(one_object_to_df(f))
        return df
    
    demo = make_df("demo-in")
    demo_2 = make_df("demo-2")
    
    データを表示する
    demo.consistentValue.value_counts()
    
    AScCSOgnaoVZw2nqRtTSBlwV8pWe5R7SXXJcL1tXC5M=    55
    AV3MqegMMQ3Bikr1klqdNN+X+6rykyNtftZizyRKA6U=    52
    AQMdVpGbhAScpImKh1I5Lx1FLhb19N97/1reQhZd0ig=    50
    AV4eJeE0kNrUw5svSpJTUryc+C4ZZ/zCdZL++VEgY6g=    48
    AX2OWCN2zha+odokuX9rTDwOkM47lbNBGnMDPbJZieU=    48
                                                    ..
    AV8dQPY3mopfROkxlMDeLqcYAxA3qqbYG89J0SSEDio=    19
    AVn0ykPCKcX5vtT07DjoV+tcTgUeLmknOytDWzcPAQ==    19
    ATKFsJ5+fXeCnsLOc1k+IA+VSYtWR+wT7Iq/4IfwPjA=    19
    AXRHjzm9RTN60ocpLNVgMvdW8mSEKKg0/fezhT/We78=    19
    ARtzjnb15acL/xVvsS6dslv3M7A8WHUkCmy94LmK5g==    19
    Name: consistentValue, Length: 1000, dtype: int64