AthenaからGoogle スプレッドシートへの連携


背景

デジタル広告事業でデータレイクとかデータマートみたいなことを引き続きやってます
もともとTableau Serverで可視化したくて作ったのですが、データレイクからのデータマートみたいなものはあるので、可視化の先は何でも良い訳です。

人も増え、要件によっては、スプレッドシートに出力したい、ということが増えました。

やったこと

  • Atehna向けのSQLを置いておくと日次でスプレッドシートが更新されていくやつ
    • SELECTされる1列目はシート名というルールにし、group by 単位でシート作成
    • SELECTした列の数だけ良い感じにシートに貼り付ける
    • 数値は数値型で、文字列は文字列型でシートに貼り付ける

SQLさえ書いてしまえば、それ以降の運用はエンジニアから手が離れる、というのは良い点かと思います。システム変更依頼ではなくデータ抽出依頼にする ことで、工数も心理的ハードルも下げられるのではないかと思ってます。
また、仕組上、既に運用で利用しているシートなどにそのまま乗っけられることもポイントかと思います。

INPUT

・スプレッドシートのシートのID
・SQLファイル

シートのIDは、URLから取れるやつです。
実際にはスプレッドシートのURLを貰って、こちらでシートのIDを抽出して、連携用に発行したサービスアカウントIDの編集権限を付けてから実施してます。

code

ShellScriptでAthenaの結果を取得し、その結果ファイルをPythonを通してスプレッドシートに反映させる感じにします。

抜粋.sh
# INPUT:QUERY=SQLのファイルがあるパス
# INPUT:SHEETID=スプレッドシートのID
TIMEOUT=20
INTERVAL=5
QUERYSTR=`cat $QUERY`
RES=`aws athena start-query-execution --query-string "${QUERYSTR}"  --result-configuration OutputLocation=s3://xxxxxxxxxx/xxxxxxx/xxxxxxx/`

for i in `seq 1 ${TIMEOUT}`
do
  if [ `aws athena get-query-execution --query-execution-id ${RES}|grep SUCCEEDED|wc -l` -ne 0 ];then
    break
  else

    if [ `aws athena get-query-execution --query-execution-id ${RES}|grep FAILED|wc -l` -ne 0 ];then
      aws athena get-query-execution --query-execution-id ${RES}|grep FAILED
      exit 1
    fi
    sleep ${INTERVAL}
  fi
done
S3PATH=`aws athena get-query-execution --query-execution-id ${RES} |grep RESULTCONFIGURATION|awk '{print $2}'`

OUTPUTFILEPATH=./output/${SHEETID}.csv
/usr/bin/aws s3 cp ${S3PATH} ${OUTPUTFILEPATH}
/usr/bin/python3 push_data.py  ${SHEETID}  ${OUTPUTFILEPATH}

日次処理を前提としているので、sleepのインターバルは適当です。

以下、Pythonのコードです。
色々ご容赦ください(特に、カラム数に対して横を決めるの、コードの可読性も含めた上手い方法ってあるんでしょうか。。。)

※既存のシートは消さず更新だけするため、SQLの内容次第ではSQLの結果=スプレッドシートの内容とならず不整合が起きるのでご注意ください。

push_data.py
import sys
import csv
import datetime
import gspread
from oauth2client.service_account import ServiceAccountCredentials

args = sys.argv

scope = ['https://spreadsheets.google.com/feeds',
         'https://www.googleapis.com/auth/drive']

TARGET_SHEET_KEY=args[1]
DATACSV=args[2]
credentials = ServiceAccountCredentials.from_json_keyfile_name('service.json', scope)
gc = gspread.authorize(credentials)
wkb = gc.open_by_key(TARGET_SHEET_KEY)
sheetname_set= set()
for sheet in wkb.worksheets():
    sheetname_set.add(sheet.title)

sheetnameset=set()
pre_sheetname="dummy"
idx=0
rowcount=0;
linecount=0;
rowalphabet={1 : 'A',2 : 'B',3 : 'C',4 : 'D',5 : 'E',6 : 'F',7 : 'G',8 : 'H',9 : 'I',10 : 'J',11 : 'K',12 : 'L',13 : 'M',14 : 'N',15 : 'O',16 : 'P',17 : 'Q',18 : 'R',19 : 'S',20 : 'T',21 : 'U',22 : 'V',23 : 'W',24 : 'X',25 : 'Y',26 : 'Z',27 : 'AA',28 : 'AB',29 : 'AC',30 : 'AD',31 : 'AE',32 : 'AF',33 : 'AG',34 : 'AH',35 : 'AI',36 : 'AJ',37 : 'AK',38 : 'AL',39 : 'AM',40 : 'AN',41 : 'AO',42 : 'AP',43 : 'AQ',44 : 'AR',45 : 'AS',46 : 'AT',47 : 'AU',48 : 'AV',49 : 'AW',50 : 'AX',51 : 'AY',52 : 'AZ'}
header=[]
isHeader=True;
with open(DATACSV, 'r', encoding='utf8') as f_in:
        reader = csv.reader(f_in)
        for line in reader:
            linecount+=1
            for i in range(len(line)):
                # 1列目はシート名が入っている必要がある
                sheetname = line[0]
                # 1行目はヘッダーが入っている必要がある
                if linecount == 1:
                    header.append(line[i])
                    continue
                if sheetname != pre_sheetname :
                    if pre_sheetname != "dummy" :
                        wks.update_cells(cell_list)
                    if not sheetname in sheetname_set :
                        wkb.add_worksheet(title=sheetname, rows=5000, cols=30)
                        sheetname_set.add(sheetname)
                    wks = wkb.worksheet(sheetname)
                    srange ='A1:'+rowalphabet[len(header)]+'3000'
                    cell_list = wks.range(srange)
                    idx=0
                    for f in header:
                        cell_list[idx].value = f
                        idx+=1

                sheetnameset.add(sheetname)
                c_val = line[i]

                try :
                    c_val = float(c_val)
                except :
                    try:
                        c_val = int(c_val)
                    except :
                        pass
                cell_list[idx].value = c_val #1列目はsheetname
                idx+=1
                pre_sheetname = sheetname
wks.update_cells(cell_list)

感想

いろいろ楽になりました。
ここから更にGoogleデータポータルで可視化、というのも出来ると思うので夢は広がります。