PagingItemReader Exercise


The Problem



A success_state column has been added to the existing pay data. We need to read every transaction row whose success_state is false and update it to true.

Redefining the DTO

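import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@ToString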
@Getter
@Setter
@NoArgsConstructor
public class Pay {
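    // HH is the 24-hour hour of day; hh (12-hour clock) cannot be parsed into a LocalDateTime without an AM/PM marker
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");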

    private Long id;
    private Long amount;
    private String txName;
    private LocalDateTime txDateTime;
    private boolean successState;

    public Pay(Long amount, String txName, String txDateTime) {
        this.amount = amount;
        this.txName = txName;
        this.txDateTime = LocalDateTime.parse(txDateTime, FORMATTER);
    }

    public Pay(Long id, Long amount, String txName, String txDateTime) {
        this.id = id;
        this.amount = amount;
        this.txName = txName;
        this.txDateTime = LocalDateTime.parse(txDateTime, FORMATTER);
    }

    public Pay(Long id, Long amount, String txName, LocalDateTime txDateTime, boolean successState) {
        this.id = id;
        this.amount = amount;
        this.txName = txName;
        this.txDateTime = txDateTime;
        this.successState = successState;
    }
}
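
The two String-based constructors depend entirely on the formatter pattern, so it is worth checking how the hour letters behave. The following is a minimal sketch using only the JDK; the class name TimestampPatternSketch and its helper methods are made up for illustration and are not part of the project. It parses the same text with HH (hour of day, 0-23) and with hh (clock hour of AM/PM, 1-12) and reports whether parsing succeeds.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical helper class for illustration only; not part of the original project.
class TimestampPatternSketch {
    // HH = hour of day (0-23)
    static final DateTimeFormatter HOUR_OF_DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    // hh = clock hour of AM/PM (1-12); the pattern has no AM/PM letter
    static final DateTimeFormatter CLOCK_HOUR =
            DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss");

    // Parses a timestamp the way the Pay constructors are meant to.
    static LocalDateTime parse(String text) {
        return LocalDateTime.parse(text, HOUR_OF_DAY);
    }

    // Returns true only if the text can be parsed with the 12-hour pattern.
    static boolean parsesWithClockHour(String text) {
        try {
            LocalDateTime.parse(text, CLOCK_HOUR);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String text = "2021-03-05 14:30:00";
        System.out.println("HH parse result: " + parse(text));
        System.out.println("hh parse succeeds: " + parsesWithClockHour(text));
    }
}
```

With a 24-hour value such as 14:30:00, only the HH pattern yields a LocalDateTime, which is why the DTO should use HH for timestamps stored in 24-hour form.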

Writing the Batch Job

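import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.PagingQueryProvider;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;

@Slf4j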
@RequiredArgsConstructor
@Configuration
public class SuccessStateJob {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;    // database connection shared by the reader and the writer

    private static final int chunkSize = 10;

    @Bean
    public Job successStateJob() throws Exception{
        return jobBuilderFactory.get("successStateJob")
                .start(successStateStep())
                .build();
    }

    @Bean
    public Step successStateStep() throws Exception{
        return stepBuilderFactory.get("successStateStep")
                .<Pay, Pay>chunk(chunkSize)
                .reader(successStateReader())
                .processor(successStateProcessor())   // transform each item if needed
                .writer(successStateWriter())
                .build();
    }

    @Bean
    public ItemReader<? extends Pay> successStateReader() throws Exception {
        Map<String, Object> param = new HashMap<>();
        param.put("success_state", false);   // bind a boolean rather than the string "false"

        return new JdbcPagingItemReaderBuilder<Pay>()
                .pageSize(chunkSize)
                .fetchSize(chunkSize)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Pay.class))
                .queryProvider(createProvider())
                .parameterValues(param)
                .name("jdbcPagingItemReader")
                .build();
    }

    @Bean
    public PagingQueryProvider createProvider() throws Exception{
        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
        provider.setDataSource(dataSource);
        provider.setSelectClause("id, amount, tx_name, tx_date_time, success_state");
        provider.setFromClause("from pay");
        provider.setWhereClause("where success_state = :success_state");

        Map<String, Order> sortKey = new HashMap<>(1);
        sortKey.put("id", Order.ASCENDING);
        provider.setSortKeys(sortKey);

        return provider.getObject();
    }

    @Bean
    @StepScope
    public ItemProcessor<? super Pay, ? extends Pay> successStateProcessor() { // sets success_state to true
        return item -> {
            item.setSuccessState(true);   // the field is a boolean, so pass true, not "true"
            return item;
        };
    }

    @Bean
    public JdbcBatchItemWriter<? super Pay> successStateWriter() {
        return new JdbcBatchItemWriterBuilder<Pay>()
                .dataSource(dataSource)
                .sql("update pay set success_state = :successState where id = :id")
                .beanMapped()
                .build();
    }
}

Running It

After running the job, I could confirm that the data changed as I expected.
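
One detail is worth a closer look: the writer updates the very column the reader filters on. As far as I understand, JdbcPagingItemReader does not fetch later pages by offset. It adds a condition on the sort key (here id) so that each page starts after the last id it has already read. The sketch below is a simplified in-memory model of that idea, not Spring Batch code. The class KeysetPagingSketch, the Row type, and the sample data are all assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for the pay table; not Spring Batch code.
class KeysetPagingSketch {
    static final class Row {
        final long id;
        boolean successState;

        Row(long id, boolean successState) {
            this.id = id;
            this.successState = successState;
        }
    }

    // Roughly models:
    //   where success_state = false and id > :lastId order by id limit :pageSize
    // The table list is assumed to be sorted by id already.
    static List<Row> readPage(List<Row> table, long lastId, int pageSize) {
        List<Row> page = new ArrayList<>();
        for (Row row : table) {
            if (!row.successState && row.id > lastId) {
                page.add(row);
                if (page.size() == pageSize) {
                    break;
                }
            }
        }
        return page;
    }

    // Reads a page, flips the flag on each row (the processor and writer in
    // the real job), then continues after the last id seen.
    static int run(List<Row> table, int pageSize) {
        int updated = 0;
        long lastId = Long.MIN_VALUE;
        while (true) {
            List<Row> page = readPage(table, lastId, pageSize);
            if (page.isEmpty()) {
                break;
            }
            for (Row row : page) {
                row.successState = true;
                updated++;
            }
            lastId = page.get(page.size() - 1).id;
        }
        return updated;
    }

    // Builds ids 1..size where every third row is already true.
    static List<Row> sampleTable(int size) {
        List<Row> table = new ArrayList<>();
        for (long id = 1; id <= size; id++) {
            table.add(new Row(id, id % 3 == 0));
        }
        return table;
    }

    public static void main(String[] args) {
        List<Row> table = sampleTable(35);
        System.out.println("updated rows: " + run(table, 10));
    }
}
```

Because each page is anchored on the last id rather than on a row offset, rows that have just been flipped to true dropping out of the filter do not shift the following pages. An offset-based reader would need more care in the same situation.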