MySQL、MongoDBへの接続


📍 n/a.ターゲット


f 92に接続してデータを抽出
非同期SQLAlchemyの使用
MongoDBに接続してデータを抽出
データをインポートするための簡単なAPIの作成と配置

📍 fastapi-sqlalchemyパッケージでMySQL DBに接続


FastAPI関連パッケージを収集したGithub Repoでは、fastapi-sqlalchemyパッケージを参照して使用します.

必要なパッケージのインストール

$ pip install fastapi-sqlalchemy  # https://github.com/mfreeborn/fastapi-sqlalchemy
$ pip install pymysql

モデル宣言


薬学とWorker DB Tableを作成しました
# models.py

from sqlalchemy import Column, BigInteger, SmallInteger, String, DateTime
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Pharmacy(Base):
  __tablename__ = 'pharmacy'

  id = Column(BigInteger, primary_key=True, autoincrement=True)
  name = Column(String, nullable=False)
  address = Column(String, nullable=False)
  is_deleted = Column(SmallInteger, nullable=False)
  created_at = Column(DateTime, nullable=False)
  updated_at = Column(DateTime, nullable=False)

class Worker(Base):
  __tablename__ = 'worker'

  id = Column(BigInteger, primary_key=True, autoincrement=True)
  pharmacy_id = Column(BigInteger, nullable=False)
  type = Column(String, nullable=False)
  name = Column(String, nullable=False)
  is_deleted = Column(SmallInteger, nullable=False)
  created_at = Column(DateTime, nullable=False)
  updated_at = Column(DateTime, nullable=False)

fastapi-sqlalchemyの使用

# main.py

from fastapi import FastAPI
from fastapi_sqlalchemy import DBSessionMiddleware
from fastapi_sqlalchemy import db

from models import Pharmacy, Worker

HOSTNAME = ''
PORT = 
USERNAME = ''
PASSWORD = ''
DBNAME = ''
MYSQL_URL = f'mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DBNAME}'

app = FastAPI()

app.add_middleware(DBSessionMiddleware, db_url=MYSQL_URL)

@app.get('/worker')
async def select_worker():
  query = db.session.query(Worker)
  return query.all()

@app.get('/pharmacy')
async def select_pharmacy():
  query = db.session.query(Pharmacy)
  return query.all()

@app.get('/worker-pharmacy')
async def join_worker_pharmacy():
  query = db.session.query(
      Worker.id,
      Worker.pharmacy_id,
      Worker.name,
      Worker.type,
      Worker.created_at,
      Worker.updated_at,
      Worker.is_deleted,
      Pharmacy.id.label('ph-id'),
      Pharmacy.name.label('ph-name'),
      Pharmacy.address.label('ph-address'),
      Pharmacy.created_at.label('ph-created_at'),
      Pharmacy.updated_at.label('ph-updated_at')
  ).join(
      Pharmacy,
      Worker.pharmacy_id == Pharmacy.id
  )
  return query.all()

APIの検証

  • http://127.0.0.1:8000/worker
  • http://127.0.0.1:8000/pharmacy
  • http://127.0.0.1:8000/worker-pharmacy
  • 📍 誤り


    Timeout Error


    [エラー]
    データベースに接続し、ローカルでテストするときに必要な値を検証します.
    しかしAWS SAMによる配備で値のないエラーが発生した.
    [エラーの原因]
    Timeout Error
    [解決策]template.yamlファイルのTimeout値の変更

    Timeout最大値


    [エラー]template.yamlファイルでTimeout値を3600に変更し、配置コマンドを実行します.
    Resource handler returned message: "1 validation error detected: Value '3600' at 'timeout' failed to satisfy constraint: Member must have value less than or equal to 900
    [エラーの原因]
    Timeoutの最大値は900です
    [解決策]
    Timeout値を3600~900に変更

    DB Connection Error


    [エラー]
    ローカルからテストフェーズへの導入を完了します.
    ただし、/worker/pharmacy/worker-pharmacyのエンドポイントを実行中にエラーが発生しました.
    ログを表示すると、次の内容が表示されます.
    sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (2003, "Can't connect to MySQL server on ' My AWS RDS EndPoint Link ' (timed out)")
    [エラーの原因]
    データベースへのアクセス権がありません.
    [解決策]
    すべてがアクセス可能に変更されると、必要な結果が抽出されます.
    インバウンドルールに入れる必要があります.LambdaをVPCに入れる必要があります

    📍 非同期SQLAlchemyの使用


    FastAPIの利点の1つは、Fastapi-sqlalchemyを使用している場合でも、DBに関連するAPIは「同期」して実行できることです.そこで、非同期でデータベース関連APIを作成したいと思います.

    必要なパッケージのインストール

    $ pip install 'sqlalchemy[asyncio]'
    $ pip install aiomysql

    モデル宣言


    既存と同じ

    Async SQLAlchemyの使用

  • SQLAlchemy 2.0スタイルで作成する必要があります.
  • # main.py
    
    from asyncio import current_task
    from fastapi import FastAPI
    from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_scoped_session
    from sqlalchemy.future import select
    from sqlalchemy.orm import sessionmaker
    
    from models import Pharmacy, Worker
    
    HOSTNAME = ''
    PORT = 
    USERNAME = ''
    PASSWORD = ''
    DBNAME = ''
    MYSQL_URL = f'mysql+aiomysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DBNAME}'
    
    app = FastAPI()
    
    engine = create_async_engine(MYSQL_URL, echo=True)
    async_session = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
    session = async_scoped_session(async_session, scopefunc=current_task)
    
    @app.get('/worker')
    async def select_worker():
      query = select(Worker)
      result = await session.execute(query)
      return result.scalars().all()
    
    @app.get('/pharmacy')
    async def select_pharmacy():
      query = select(Pharmacy)
      result = await session.execute(query)
      return result.scalars().all()
    
    @app.get('/worker-pharmacy')
    async def join_worker_pharmacy():
      query = select(
          Worker.id,
          Worker.pharmacy_id,
          Worker.name,
          Worker.type,
          Worker.created_at,
          Worker.updated_at,
          Worker.is_deleted,
          Pharmacy.id.label('ph-id'),
          Pharmacy.name.label('ph-name'),
          Pharmacy.address.label('ph-address'),
          Pharmacy.created_at.label('ph-created_at'),
          Pharmacy.updated_at.label('ph-updated_at')
      ).join(
          Pharmacy,
          Worker.pharmacy_id == Pharmacy.id
      )
      result = await session.execute(query)
      return result.all()

    APIの検証

  • http://127.0.0.1:8000/worker
  • http://127.0.0.1:8000/pharmacy
  • http://127.0.0.1:8000/worker-pharmacy
  • リファレンス

  • https://blog.neonkid.xyz/269
  • https://www.hides.kr/1101
  • https://towardsdatascience.com/build-an-async-python-service-with-fastapi-sqlalchemy-196d8792fa08
  • 📍 誤り


    パッケージエラー


    [エラー]
    ValueError: the greenlet library is required to use this function. No module named 'greenlet’
    [エラーの原因]
    SQLAlchemy 1.4.x上でasync関連タスクを実行するにはgreenletパッケージが必要です.
    [解決策]
    SQLAlchemyパッケージをインストールすると、asyncに関連するパッケージが同時にインストールされます.
    # asyncio 안 붙이면 greenlet 설치 안 됨
    $ pip install sqlalchemy
    ...
    Installing collected packages: sqlalchemy
    Successfully installed sqlalchemy-1.4.35
    
    # 이와 같이 설치해야 함
    $ pip install 'sqlalchemy[asyncio]'
    ...
    Installing collected packages: sqlalchemy, greenlet
    Successfully installed greenlet-1.1.2 sqlalchemy-1.4.35

    📍 pymongoでMongoDBに接続する


    MySQLとともにMongoDBに接続されています.

    パッケージのインストール

    $ pip install pymongo

    データベースの作成とデータ・マウント


    このセクションをスキップ

    pymongoの使用

    from asyncio import current_task
    from fastapi import FastAPI
    from pymongo import MongoClient
    
    app = FastAPI()
    
    HOST = ''
    PORT = 
    
    client = MongoClient(HOST, PORT)
    # client = MongoClient(f'mongodb://{HOST}:{PORT}')
    
    db = client['mediscountDB']
    # db = client.mediscountDB
    
    @app.get('/mongo')
    async def get_users_in_mongo():
      users = db['users']
      return list(users.find().limit(10))

    TODO


    ▼Python MongoDB ORMチェック

    📍 の最後の部分


    MySQLとMongoDBを簡単に接続まず、保存したデータが抽出されたかどうかを確認し、次により多くの機能を使用してみます.
    「非同期SQLAlchemy」の部分を使うと、よく知っているdb.session.query()の文が書けないのが気まずいと思います.2.0バージョンを続けたら、慣れると思います:)
    pymongoは初めての使用ですが、MongoDBの文法にまだ疎いせいか、見慣れない感じです.しかし、書き続けると慣れてくると思います.