JPA同時性の問題

25711 ワード


背景

  • パイプラインの運用とパイプライン上のTask結果の更新のためのAPIの開発中に同期性の問題が発生したので、これらの問題を解決するために得られた知識をまとめる.
  • Tekton運転後、Calback関数を指定することで、PipelinerunとTaskRunの状態変化に関するCloudEventの情報をJson形式で入手できます.( https://tekton.dev/docs/pipelines/events/ )
  • CloudEventデータがパケット化された後、TaskRunId、Status、Messageが取得され、TaskRunの状態が更新され、update APIが同時に呼び出されると情報が失われるという問題が発生する.
  • 障害スキャンにより、これらの問題は複数のトランザクションの同時実行時のリフレッシュ損失の問題であり、楽観的または悲観的に解決できることが分かった.
  • コード#コード#


    ろんり

  • 「作成」状態でstartTime、endTime、duration値nullのTaskを作成
  • startTimeが空の場合startTimeを更新
  • 「運転」状態に更新すると、状態のみ更新
  • 「Succeded」状態に更新するにはendTime、duration updateが使用可能
  • 「起動」、「運転」、「Succeded」の状態で、3つの更新APIを順次呼び出し、sleepTime変数を加え、意図的にリフレッシュ損失の問題を引き起こす.指定したsleep関数時間内にスレッドを停止する
  • DTO

    		@Getter
        public static class CreateUpdate{
    
            private String status;
        }

    Service

    		public TaskEntity findTask(Long id) {
    
            Optional<TaskEntity> findTask = taskRepository.findById(id);
            return findTask.orElse(null);
        }	
    
    		@Transactional
        public TaskEntity updateTask(Long id, TaskDto.CreateUpdate createUpdate, int sleepTime){
    
            TaskEntity taskEntity = findTask(id);
            threadSleep(sleepTime);
            taskEntity.updateStatus(createUpdate.getStatus());
            taskEntity.updateTime(createUpdate.getStatus());
            return taskEntity;
        }
    
    		public void threadSleep(int sleepTime){
            try {
                Thread.sleep(sleepTime);
            } catch (Exception e){
                log.info(e.getMessage());
            }
        }

    Repository

    public interface TaskRepository extends JpaRepository<TaskEntity, Long> 
    

    Domain

    @Getter
    @NoArgsConstructor(access = AccessLevel.PROTECTED)
    @Entity
    public class TaskEntity {
    
        @Id
        @GeneratedValue(strategy = GenerationType.AUTO)
        private Long id;
    
        private String status;
    
        @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "Asia/Seoul")
        private LocalDateTime startTime;
    
        @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "Asia/Seoul")
        private LocalDateTime endTime;
    
        private Long duration;
    
        @Builder
        public TaskEntity(String status){
            this.status = status;
        }
    
        public void updateStatus(String status){
                this.status = status;
        }
    
        public void updateTime(String status) {
    
            if (this.startTime == null){
                this.startTime = LocalDateTime.now().withNano(0);
            }
    
            if (status.equals("Succeeded")){
                this.endTime = LocalDateTime.now().withNano(0);
                this.duration = Duration.between(this.startTime,this.endTime).getSeconds();
            }
        }
    }

    Test

    @Test
        @DisplayName("Lock 적용하기 전")
        void test1() throws Exception {
    
            // given
            TaskEntity newTask = TaskEntity.builder()
                    .status("Created")
                    .build();
            taskService.createTask(newTask);
    
            // when
            final ExecutorService executor = Executors.newFixedThreadPool(3);
    
            executor.execute(()->taskService.updateTask(newTask.getId(), TaskDto.CreateUpdate.builder().status("Started").build(), 2000));
            Thread.sleep(500);
    
            executor.execute(()->taskService.updateTask(newTask.getId(), TaskDto.CreateUpdate.builder().status("Running").build(), 1000));
            Thread.sleep(500);
    
            executor.execute(()->taskService.updateTask(newTask.getId(), TaskDto.CreateUpdate.builder().status("Succeeded").build(), 100));
    
            // Thread 작업이 다 끝날때까지 최대 10초 대기
            executor.shutdown();
            executor.awaitTermination(10, TimeUnit.SECONDS);
    
            //then
            TaskEntity findTask = taskService.findTask(newTask.getId());
    
            assertAll(
                    ()-> assertEquals("Started", findTask.getStatus()),
                    ()-> assertNotEquals(null,findTask.getStartTime()),
                    ()-> assertEquals(null,findTask.getEndTime()),
                    ()-> assertEquals(null,findTask.getDuration())
            );
    
        }
  • 「作成」状態でstartTime、endTime、duration値nullのTaskがある
  • 「起動」状態に更新されたトランザクションを実行するのに2秒かかる
  • 0.5秒後に「実行」状態に更新されたトランザクションを実行-1秒必要
  • 0.5秒後に「Succeed」状態に更新するトランザクションを実行するには0.1秒
  • 「起動」、「運転」および「Succeded」の状態で順次3つの更新APIが呼び出されたが、各APIの処理速度は最終更新の情報に影響を与える.
  • この場合、まず「起動」状態に更新された関数を呼び出すが、トランザクションが最も遅く完了したため、最終的には「起動」状態、時間をすべてnullに更新する
  • statusが変更され、startTime、endTime、durationもすべてnullに更新された.理由はJPAのDirty Checkingである.
  • JPAはデータを問合せ、永続性コンテキストに保存し、トランザクション終了時に初期状態と最終状態を比較してupdate問合せ-Dirty Checkingを発行する
  • Dirty Checkingによって生成されたupdateクエリは基本的にすべてのフィールドを更新する
  • 数式化



    n/a.結論

  • 1つのエンティティで2つ以上の更新トランザクションが同時に行われた場合、トランザクション完了後のタスクの変更がデータベースに反映されないという問題が最初に発生する
  • データベースの内容は、最後に開始したトランザクションの操作ではなく、トランザクションの終了時間に依存します.
  • 完了したタスクでも、ステータスが運転または起動の場合がある