【kafka KSQL】ゲームログ統計分析(1)


【kafka KSQL】ゲームログ統計分析(1)
ゲームの決済ログを例にとって、KSQLを利用してログを統計的に分析するプロセスを示します。
スタートアップ
cd ~/Documents/install/confluent-5.0.1/

bin/confluent start
kafkaのテーマリストを表示します。
bin/kafka-topics --list --zookeeper localhost:2181
ゲームの決済を受けるログを作成するtopic
bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic score-normalized
生産者命令ツールを使ってtopicに日誌を書きます。
bin/kafka-console-producer --broker-list localhost:9092 --topic score-normalized

> 

{"cost":7, "epoch":1512342568296,"gameId":"2017-12-04_07:09:28_  1 _200_015_185175","gameType":"situan","gamers": [{"balance":4405682,"delta":-60,"username":"0791754000"}, {"balance":69532,"delta":-60,"username":"70837999"}, {"balance":972120,"delta":-60,"username":"abc6378303"}, {"balance":23129,"delta":180,"username":"a137671268"}],"reason":"xiayu"}
消費者コマンドラインツールを使ってログが正常に書き込まれているかを確認します。
bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic score-normalized --from-beginning

;;     

{"cost":7, "epoch":1512342568296,"gameId":"2017-12-04_07:09:28_  1 _200_015_185175","gameType":"situan","gamers": [{"balance":4405682,"delta":-60,"username":"0791754000"}, {"balance":69532,"delta":-60,"username":"70837999"}, {"balance":972120,"delta":-60,"username":"abc6378303"}, {"balance":23129,"delta":180,"username":"a137671268"}],"reason":"xiayu"}
KSQLクライアントを起動します
bin/ksql http://localhost:8088
ksql起動後のアイコンと操作端末が見えます。
ksql端末はkafka topicリストを調べます。
ksql> show topics;
topicのメッセージを印刷します。
PRINT 'score-normalized';
見られます
Format:STRING
19-1-5   11 59 31  , NULL , {"cost":7, "epoch":1512342568296,"gameId":"2017-12-04_07:09:28_\xE9\xAB\x98\xE6\x89\x8B1\xE5\x8C\xBA_200_015_185175","gameType":"situan","gamers": [{"balance":4405682,"delta":-60,"username":"0791754000"}, {"balance":69532,"delta":-60,"username":"70837999"}, {"balance":972120,"delta":-60,"username":"abc6378303"}, {"balance":23129,"delta":180,"username":"a137671268"}],"reason":"xiayu"}
その中:
  • 最初のカンマ19-1-5 11 59 31 はメッセージ時間を表しています。
  • 番目のコンマNULLは、kafka-console-producerからプッシュされたもので、デフォルトはNULLです。
  • の後ろには、押してきたメッセージの内容があります。
  • topic score-normalizedからStreamを作成します。
    CREATE STREAM SCORE_EVENT \
     (epoch BIGINT, \
      gameType VARCHAR, \
      cost INTEGER, \
      gamers ARRAY< \
                  STRUCT< \
                          username VARCHAR, \
                          balance BIGINT, \
                          delta BIGINT \
                          > \
                   >, \
      gameId VARCHAR, \
      tax BIGINT, \
      reason VARCHAR) \
      WITH ( KAFKA_TOPIC='score-normalized', \
             VALUE_FORMAT='JSON', \
             TIMESTAMP='epoch');
    TIMESTAMP='epoch'は、epochの時間をイベントとするタイムスタンプを表す。
    STREAMを一つ削除します
    DROP  STREAM stream_name ;
    クエリ文がストリームを調べていると、エラーが発生します。
    Cannot drop USER_SCORE_EVENT. 
    The following queries read from this source: []. 
    The following queries write into this source: [CSAS_USER_SCORE_EVENT_2, InsertQuery_4, InsertQuery_5, InsertQuery_3]. 
    You need to terminate them before dropping USER_SCORE_EVENT.
    TERMINATEコマンドでこれらのクエリステートメントを停止し、ストリームを削除する必要があります。
    TERMINATE CSAS_USER_SCORE_EVENT_2;
    TERMINATE InsertQuery_4;
    最初のレコードから検索を開始します。
    ksql> SET 'auto.offset.reset' = 'earliest';
    Streamからすべてのデータを調べます。
    ksql> SELECT * FROM SCORE_EVENT;
    見られます
    1546702389664 | null | 1512342568296 | situan | 7 | [{USERNAME=0791754000, BALANCE=4405682, DELTA=-60}, {USERNAME=70837999, BALANCE=69532, DELTA=-60}, {USERNAME=abc6378303, BALANCE=972120, DELTA=-60}, {USERNAME=a137671268, BALANCE=23129, DELTA=180}] | 2017-12-04_07:09:28_  1 _200_015_185175 | null | xiayu
    その中:
  • 第1列は記録のタイムスタンプです。
  • 第2列は記録のkeyである。
  • 第3列以降は、メッセージ内の各フィールドの値であり、ストリームを作成する際の順序に対応する。
  • 最後から2番目の列のnullは、メッセージ中のtaxフィールドが存在しないからです。
  • 統計2017-12-04日の対局総数
    ;;     game_date  ,    
    CREATE STREAM SCORE_EVENT_WITH_DATE AS \
        SELECT SUBSTRING(gameId, 0, 10) AS game_date, * \
        FROM SCORE_EVENT;
        
    SELECT game_date, COUNT(*) \
        FROM SCORE_EVENT_WITH_DATE \
        WHERE game_date = '2017-12-04' AND reason = 'game' \
        GROUP BY game_date;
    
    現在KSQLはまだ以下のような問い合わせをサポートしていません。
    SELECT COUNT(*) \
      FROM SCORE_EVENT \
      WHERE gameId LIKE '2017-12-04_%';
    対局に参加する総プレイヤー数を集計します。
    一つのログには複数のプレイヤーの対局情報が含まれていますので、各プレイヤーを個別のイベントに分割したいです。
  • は、各プレイヤーのイベントを統合して、同じストリームUSER_SCORE_EVENT:
  • に進む。
    CREATE STREAM USER_SCORE_EVENT AS \
        SELECT epoch, gameType, cost, gameId, tax, reason, gamers[0]->username AS username, gamers[0]->balance AS balance, gamers[0]->delta AS delta \
        FROM SCORE_EVENT;
        
    INSERT INTO USER_SCORE_EVENT \
        SELECT epoch, gameType, cost, gameId, tax, reason, gamers[1]->username AS username, gamers[1]->balance AS balance, gamers[1]->delta AS delta \
        FROM SCORE_EVENT;
        
    INSERT INTO USER_SCORE_EVENT \
        SELECT epoch, gameType, cost, gameId, tax, reason, gamers[2]->username AS username, gamers[2]->balance AS balance, gamers[2]->delta AS delta \
        FROM SCORE_EVENT;
        
    INSERT INTO USER_SCORE_EVENT \
        SELECT epoch, gameType, cost, gameId, tax, reason, gamers[3]->username AS username, gamers[3]->balance AS balance, gamers[3]->delta AS delta \
        FROM SCORE_EVENT;
  • は、ユーザー名usernameの接続JOINクエリを後続するために、Key:
  • を再設定する必要があります。
    CREATE STREAM USER_SCORE_EVENT_REKEY AS \ 
    SELECT * FROM USER_SCORE_EVENT \
    PARTITION BY username;
    出力:
    ksql> SELECT * FROM USER_SCORE_EVENT_REKEY;
    
    
    4000 | lzc | 4000 | situan | 7 | 2017-12-04_07:09:28_  2 _500_015_185175 | null | game | lzc | 972120 | -60
    4000 | lzb | 4000 | situan | 7 | 2017-12-04_07:09:28_  2 _500_015_185175 | null | game | lzb | 69532 | -60
    注意:
    実践の過程で、STREAMのfieldに対してPATION BYを行うだけで発効することができます。
  • は各プレイヤーの対局数、勝ち負け総数、貢献の総税金を統計して、表USER_SCORE_TABLE:
  • を作成します。
    CREATE TABLE USER_SCORE_TABLE AS \
        SELECT username, COUNT(*) AS game_count, SUM(delta) AS delta_sum, SUM(tax) AS tax_sum \
        FROM USER_SCORE_EVENT_REKEY \
        WHERE reason = 'game' \
        GROUP BY username;
    USER_SCORE_TABLEのすべてのデータを参照してください。
    ksql> SELECT * FROM USER_SCORE_TABLE;
    1546709338711 | 70837999 | 70837999 | 4 | -240 | 0
    1546709352758 | 0791754000 | 0791754000 | 4 | -240 | 0
    1546709338711 | a137671268 | a137671268 | 4 | 720 | 0
    1546709352758 | abc6378303 | abc6378303 | 4 | -240 | 0
  • は、あるプレーヤーの対局数、勝ち負け総数、貢献の総税金を調べます。
    ksql> SELECT * FROM USER_SCORE_TABLE WHERE username = '70837999';
    出力:
    1546709338711 | 70837999 | 70837999 | 4 | -240 | 0
    合計プレイヤー数(デバッグ)
  • 統計用の を追加します。
  • CREATE TABLE USER_SCORE_WITH_TAG AS \
        SELECT 1 AS tag, * FROM USER_SCORE_TABLE;
  • 統計によると、重いプレイヤー数は
  • です。
    SELECT tag, COUNT(username) \
    FROM USER_SCORE_WITH_TAG \
    GROUP BY tag;
    続きをつける
    KSQL WINDOW機能。