pd.DataFrameのto_sql()で失敗した行をスキップする方法について


はじめに

Pandasを使っていて、pd.DataFrameをDBにINSERTする際にto_sql()メソッドでやりたいことがうまく出来なかったのが解決できたのでメモとして残す。
※この方法で数万レコードをINSERTしようとすると時間がかなりかかったので要注意

やりたいこと

pd.DataFrameをDBにINSERTする際にNotNull制約違反や主キー制約違反が発生した場合にその行をスキップしてINSERTしたい。
具体的には、以下のようなデータフレームを主キー制約がcompanyにNotNull制約がestablished_inにかけられたテーブルにto_sql()メソッドでINSERTしようとするとエラーになり全くINSERTされないが、INSERTできるレコードはINSERTして失敗したレコードはINSERTしないようにしたい。

company established_in founder
0 A 1900.0 Alice
1 B 1925.0 Bob
2 C NaN Charlie
3 A 1995.0 Ahn
4 D 2010.0 David

ちなみに、普通に、

with engine.begin() as conn:
    df_fail.to_sql('company', if_exists='append', index=False, con=conn)

してあげると、

(psycopg2.errors.NotNullViolation) null value in column "established_in" violates not-null constraint
DETAIL:  Failing row contains (C, null, Charlie).

[SQL: INSERT INTO company (company, established_in, founder) VALUES (%(company)s, %(established_in)s, %(founder)s)]
[parameters: ({'company': 'A', 'established_in': 1900.0, 'founder': 'Alice'}, {'company': 'B', 'established_in': 1925.0, 'founder': 'Bob'}, {'company': 'C', 'established_in': None, 'founder': 'Charlie'}, {'company': 'A', 'established_in': 1995.0, 'founder': 'Ahn'}, {'company': 'D', 'established_in': 2010.0, 'founder': 'David'})]
(Background on this error at: http://sqlalche.me/e/14/gkpj)

となり、全くINSERTされない。

ソースコード

JupyterLabで以下を実行した。なお、DB側でcompanyには主キー制約がestablished_inにはNotNull制約がかけられている。

import numpy as np
import pandas as pd
import psycopg2
import sqlalchemy
from IPython.display import display
engine = sqlalchemy.create_engine(url='postgresql://postgres:[email protected]/mydb1')
df_fail = pd.DataFrame({'company': ['A', 'B', 'C', 'A', 'D'], 
                        'established_in': [1900, 1925, np.nan, 1995, 2010],
                        'founder': ['Alice', 'Bob', 'Charlie', 'Ahn', 'David' ]})
df_fail
# 主キー制約違反・NotNull制約違反
company established_in founder
0 A 1900.0 Alice
1 B 1925.0 Bob
2 C NaN Charlie
3 A 1995.0 Ahn
4 D 2010.0 David
for row in range(len(df_fail)):
    record = df_fail.iloc[[row]]
    try:
        with engine.begin() as conn:
            record.to_sql('company', if_exists='append', index=False, con=conn)
            print('Pass')
    except sqlalchemy.exc.IntegrityError as e:
        print('IntegrityError')
        print(e)
    except Exception as e:
        print('Error')
        print(e)
    finally:
        display(record)
        print('\n\n')
Pass
company established_in founder
0 A 1900.0 Alice
Pass
company established_in founder
1 B 1925.0 Bob
IntegrityError
(psycopg2.errors.NotNullViolation) null value in column "established_in" violates not-null constraint
DETAIL:  Failing row contains (C, null, Charlie).

[SQL: INSERT INTO company (company, established_in, founder) VALUES (%(company)s, %(established_in)s, %(founder)s)]
[parameters: {'company': 'C', 'established_in': None, 'founder': 'Charlie'}]
(Background on this error at: http://sqlalche.me/e/14/gkpj)
company established_in founder
2 C NaN Charlie
IntegrityError
(psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "constraint_pkey"
DETAIL:  Key (company)=(A) already exists.

[SQL: INSERT INTO company (company, established_in, founder) VALUES (%(company)s, %(established_in)s, %(founder)s)]
[parameters: {'company': 'A', 'established_in': 1995.0, 'founder': 'Ahn'}]
(Background on this error at: http://sqlalche.me/e/14/gkpj)
company established_in founder
3 A 1995.0 Ahn
Pass
company established_in founder
4 D 2010.0 David
with engine.begin() as conn:
    data_on_db = pd.read_sql('SELECT * FROM company;', con=conn)
data_on_db
company established_in founder
0 A 1900 Alice
1 B 1925 Bob
2 D 2010 David

確かに例外処理を使ってINSERTできるものだけINSERTできた(ただし、主キー制約違反の場合上の行からINSERTされてしまう点に注意)。

おわりに

失敗した行だけ集めたpd.DataFrameを作っておけば、どこがINSERTされなかったのかわかるので便利なはず。