django-rest-framework Django Model でPostgreSQL の楽観的排他制御でデータの同時更新を防ぐ


はじめに

Django のモデルには楽観的排他制御を行う仕組みがありません。(ない・・・よね?)
ここでは、PostgreSQLで楽観的排他制御を行うための実装サンプルについて記載します。

  • 行バージョン(row_version)管理用のフィールドには PostgreSQLのシステム列のxminを利用します

注)
* django-rest-framework 動的にserializerのフィールドを変更するをベースに作成しています。
* あまり凝ったquerysetを使っていると動かないかもしれません。

同時実行制御モデルの作成

xmin 列の取得

PostgreSQLのシステム列を取得するためにクエリ式を拡張するサブクラスを作成します。

from django.db.models import Expression, PositiveIntegerField


class XMin(Expression):
    output_field = PositiveIntegerField()

    def as_postgresql(self, compiler, connection):
        return f'"{compiler.query.base_table}"."xmin"', ()

同時実行制御マネージャクラスの作成

get_querysetをオーバーライドし、annotateで行バージョン(row_version)列を追加します。

from django.db.models import Manager


class ConcurrentManager(Manager):
    def get_queryset(self):
        super_query = super().get_queryset().annotate(row_version=XMin())
        return super_query

同時実行時の例外処理

同時実行制御時にエラーとなる場合は以下のカスタムExceptionを発行します。

class DbUpdateConcurrencyException(Exception):
    pass

同時実行制御モデルの作成

同時実行制御マネージャクラスを変更し、saveメソッドをオーバーライドし同時実行制御を実装します。
更新する対象の行が見つからない場合は、DbUpdateConcurrencyExceptionを発行します。

from django.db.models import Model


class ConcurrentModel(Model):
    objects = ConcurrentManager()

    class Meta:
        abstract = True

        base_manager_name = 'objects'

    def save(self, **kwargs):
        cls = self.__class__
        if self.pk and not kwargs.get('force_insert', None):
            rows = cls.objects.filter(
                pk=self.pk, row_version=self.row_version)
            if not rows:
                raise DbUpdateConcurrencyException(cls.__name__, self.pk)

        super().save(**kwargs)

モデルの変更

継承元をModelからConcurrentModelに変更します。

class Customer(ConcurrentModel):
    id = models.UUIDField(primary_key=True, default=uuid4, editable=False)
    code = models.CharField(verbose_name='コード', help_text='コード', max_length=10)
    name = models.CharField(verbose_name='名称', help_text='名称', max_length=50)

シリアライザの変更

行バージョン(row_version)を追加します。

class CustomerSerializer(DynamicFieldsModelSerializer):
    row_version = serializers.IntegerField()

    class Meta:
        model = Customer

        fields = (
            'id',
            'code',
            'name',
            'row_version',
        )

同時実行制御の動作確認

データの取得

行バージョンが取得できていることを確認できます。

curl -s -X GET "http://localhost:18000/api/customers/" -H "accept: application/json" | jq .
[
  {
    "id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx1",
    "code": "001",
    "name": "test1",
    "row_version": 588
  },
  {
    "id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx2",
    "code": "002",
    "name": "test2",
    "row_version": 592
  }
]

1行目のデータ取得

curl -s -X GET "http://localhost:18000/api/customers/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx1/" -H "accept: application/json" | jq .
{
  "id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx1",
  "code": "001",
  "name": "test",
  "row_version": 588
}

違う行バージョンを与えた場合

同時実行制御によりエラーとなるため、500が返ってきます。

curl -X PUT "http://localhost:18000/api/customers/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx1/" -H "accept: application/json" -H "Content-Type: application/json" -d "{ \"code\": \"001\", \"name\": \"test2\", \"row_version\": 0}"

<!doctype html>
<html lang="en">
<head>
  <title>Server Error (500)</title>
</head>
<body>
  <h1>Server Error (500)</h1><p></p>
</body>
</html>

同じ行バージョンを与えた場合

正常に登録できました。

curl -X PUT "http://localhost:18000/api/customers/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx1/" -H "accept: application/json" -H "Content-Type: application/json" -d "{ \"code\": \"001\", \"name\": \"test2\", \"row_version\": 588}" | jq .

{
  "id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx1",
  "code": "001",
  "name": "test2",
  "row_version": 588
}

エラーハンドリング

このままでは500エラーで返ってきてしまうので、400で返ってくるように制御します。
django-rest-frameworkには例外処理をカスタマイズする機能があります。

これを利用し、APIビューで発生した同時実行制御エラーのレスポンスを制御します。

api/handlers.custom_exception_handler
from rest_framework import status
from rest_framework.validators import ValidationError
from rest_framework.response import Response
from rest_framework.views import exception_handler
from xxxxx import DbUpdateConcurrencyException


def custom_exception_handler(exc, context):
    response = exception_handler(exc, context)

    if isinstance(exc, DbUpdateConcurrencyException):
        return Response(ValidationError({'db_update_concurrency': ['他のユーザーにより変更されています。']}).detail, status=status.HTTP_400_BAD_REQUEST)

    return response
app/settings.py
REST_FRAMEWORK = {
  'EXCEPTION_HANDLER': 'api.handlers.custom_exception_handler',
}

動作確認2

違う行バージョンを与えた場合

以下の形で400で返ってきます。

curl -X PUT "http://localhost:18000/api/customers/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx1/" -H "accept: application/json" -H "Content-Type: application/json" -d "{ \"code\": \"001\", \"name\": \"test2\", \"row_version\": 0}"

{
  "db_update_concurrency": [
    "他のユーザーにより変更されています。"
  ]
}