StreamlitとGoogle BigQueryの接続方法 / Connect Streamlit to Google BigQuery


streamlitは素晴らしいサービス且つ、ドキュメントサイトも充実していて基本ここ見とけば不自由しないんですけど、DB接続周りだけ情報の更新が最新じゃなくてうまくいかなかったのでここにやり方をメモします。

Introduction & Create a BigQuery database

このガイドでは、StreamlitCloudからBigQueryデータベースに安全にアクセスする方法について説明します。google-cloud-bigqueryライブラリとStreamlitのシークレット管理を使用します。

Enable the BigQuery API

BigQueryへのプログラムによるアクセスは、Google CloudPlatformを介して制御されます。アカウントを作成するか、サインインして、APIとサービスのダッシュボードに移動します(求められた場合は、プロジェクトを選択または作成します)。以下に示すように、BigQueryAPIを検索して有効にします。また、読み込みを高速化するために、BigQuery Storage APIも検索して有効にすることをオススメします。



Create a service account & key file

StreamlitCloudのBigQueryAPIを使用するには、Google Cloud Platformサービスアカウント(プログラムによるデータアクセス用の特別なアカウントタイプ)が必要です。[サービスアカウント]ページに移動し、閲覧者権限でアカウントを作成します(これにより、アカウントはデータにアクセスできますが、変更はできません)。


Add the key file to your local app secrets

公式ドキュメントでは、.streamlit/secrets.tomlでアプリのルートディレクトリにあるファイルからシークレットを読み取る様に指示していますが、GCPの「IAMと管理」の「サービスアカウント」からJSON情報を発行しているので、JSONファイルでの読み込み、もしくはローカルで実行する場合に限り、そのまま辞書型で読み込んで良いと思います

※ このファイルを.gitignoreに記載して、コミットしない様に気をつけましょう

    gcs_service_account = {
      "type": "service_account",
      "project_id": "project-291031",
      "private_key_id": "464564c7f86786afsa453345dsf234vr32",
      "private_key": "-----BEGIN PRIVATE KEY-----\ndD\n-----END PRIVATE KEY-----\n",
      "client_email": "[email protected]",
      "client_id": "543423423542344334",
      "auth_uri": "https://accounts.google.com/o/oauth2/auth",
      "token_uri": "https://oauth2.googleapis.com/token",
      "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
      "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/d453/[email protected]"
    }

    # こんな感じの使い方のイメージ
    credentials = service_account.Credentials.from_service_account_info(gcs_service_account)
    bqstorage_client = BigQueryReadClient(credentials=credentials)

Copy your app secrets to the cloud

Streamlitは、HerokuやGCPなどサーバを自分で立てなくてもGithubにコードを載せるだけで、Streamlit Cloudというサービスを使ってデプロイすることができます。上記のsecrets.tomlファイルはGithubにコミットされていないため、そのコンテンツをデプロイされたアプリ(Streamlit Cloud上)に個別に渡す必要があります。アプリのダッシュボードに移動し、アプリのドロップダウンメニューで[シークレットの編集]をクリックします。の内容をsecrets.tomlテキスト領域にコピーします。

Add google-cloud-bigquery to your requirements file

GCP関連のパッケージをファイルに追加します。できれば、そのバージョンを固定します(インストールしたいバージョンrequirements.txtに置き換えます)。

pip install --upgrade 'google-cloud-bigquery[bqstorage,pandas]'
pyarrow==6.0.1
google-api-core==2.5.0
google-auth==2.6.0
google-auth-oauthlib==0.4.1
google-cloud-bigquery==2.34.0
google-cloud-bigquery-storage==2.12.0
google-cloud-core==2.2.2
google-crc32c==1.3.0
google-pasta==0.1.8
google-resumable-media==2.3.0
googleapis-common-protos==1.55.0
googletransx==2.4.2

Write your Streamlit app

以下のコードをStreamlitアプリにコピーして実行します。ここがハマりポイントなのですが、公式ドキュメントが記載してる@st.cache(ttl=600)とかではうまくいきません。こんな感じのエラーが出てきます。データ量が多いときにキャッシュとして持たせたいんですが、データベースと接続して接続してデータをキャッシュさせようとするとここのエラーが結構発生しました。

ここら辺の解決策として、streamlitのエラーメッセージで提案されるのは、@st.cache(allow_output_mutation=True)や、@st.cache(hash_funcs={pd.DataFrame: lambda _: None})などですが、どれも解決しません。ちゃんと調べていくと、streamlit v1.0から@st.experimental_memoが追加されて、DB関連のデータを返すものには、これを使うと良いという感じで紹介されており、クラウドでデプロイを試したらちゃんと動作しました。

import pyarrow as pa
from google.oauth2 import service_account
from google.cloud.bigquery_storage import types
from google.cloud.bigquery_storage import BigQueryReadClient

'''
ローカルで実行する場合
gcs_service_account = {
      "type": "service_account",
      "project_id": "project-291031",
      "private_key_id": "464564c7f86786afsa453345dsf234vr32",
      "private_key": "-----BEGIN PRIVATE KEY-----\ndD\n-----END PRIVATE KEY-----\n",
      "client_email": "[email protected]",
      "client_id": "543423423542344334",
      "auth_uri": "https://accounts.google.com/o/oauth2/auth",
      "token_uri": "https://oauth2.googleapis.com/token",
      "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
      "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/d453/[email protected]"
    }
'''

@st.experimental_memo
def fetch_bq_data():
    '''
    ローカルで実行する場合
    credentials = service_account.Credentials.from_service_account_info(gcs_service_account)
    '''
    
    #クラウド上で実行する場合
    credentials = service_account.Credentials.from_service_account_info(st.secrets["gcp_service_account"])
    bqstorage_client = BigQueryReadClient(credentials=credentials)
    
    gcp_project_id = 'gcp-project-id'
    project_id = "bigquery-public-data"
    dataset_id = "new_york_trees"
    table_id = "tree_species"
    table = f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"

    read_options = types.ReadSession.TableReadOptions(
        selected_fields=["species_common_name", "fall_color"]
    )

    parent = "projects/{}".format(gcp_project_id)

    requested_session = types.ReadSession(
        table=table,
        data_format=types.DataFormat.ARROW,
        read_options=read_options,
    )

    read_session = bqstorage_client.create_read_session(parent=parent, read_session=requested_session, max_stream_count=1)
    stream = read_session.streams[0]
    reader = bqstorage_client.read_rows(stream.name)

    frames = []
    for message in reader.rows().pages:
        frames.append(message.to_arrow())
    pa_table = pa.Table.from_batches(frames)
    dataframe = pa_table.to_pandas()
    return dataframe

参考文献

Connect Streamlit to Google BigQuery
st.experimental_memo
Optimize performance with st.cache
UnhashableTypeError: Cannot hash object of type _thread.RLock, found in the body of Scheme_Names(). #1627
キャディでの Streamlit 活用事例