ItemReader


ItemReader


Chunk Tasklet
  • ItemReaderによるデータ読み出し
  • ItemProcessorによるサービスロジック
  • の実行
  • ItemWriterバッチ処理
  • のロジックで行います.その中のリーダーを勉強しましょう.

    CursorItemReader


    CursorItemReaderは、Streamingを使用してデータを処理します.簡単に考えて、dbとアプリケーションの間にチャネルを接続し、一つ一つインポートすればいいです.

    queryを実行して実験を行う

    create table pay (
      id         bigint not null auto_increment,
      amount     bigint,
      tx_name     varchar(255),
      tx_date_time datetime,
      primary key (id)
    ) engine = InnoDB;
    
    insert into pay (amount, tx_name, tx_date_time) VALUES (1000, 'trade1', '2018-09-10 00:00:00');
    insert into pay (amount, tx_name, tx_date_time) VALUES (2000, 'trade2', '2018-09-10 00:00:00');
    insert into pay (amount, tx_name, tx_date_time) VALUES (3000, 'trade3', '2018-09-10 00:00:00');
    insert into pay (amount, tx_name, tx_date_time) VALUES (4000, 'trade4', '2018-09-10 00:00:00');
    まずdbにデータを登録します.

    Payの作成

    @ToString
    @Getter
    @Setter
    @NoArgsConstructor
    public class Pay {
        private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss");
    
        private Long id;
        private Long amount;
        private String txName;
        private LocalDateTime txDateTime;
    
        public Pay(Long amount, String txName, String txDateTime) {
            this.amount = amount;
            this.txName = txName;
            this.txDateTime = LocalDateTime.parse(txDateTime, FORMATTER);
        }
    
        public Pay(Long id, Long amount, String txName, String txDateTime) {
            this.id = id;
            this.amount = amount;
            this.txName = txName;
            this.txDateTime = LocalDateTime.parse(txDateTime, FORMATTER);
        }
    }
    クラスを作成してpayをdbからオブジェクトにインポート
    @Slf4j
    @RequiredArgsConstructor
    @Configuration
    public class JdbcCursorItemReaderJobConfiguration {
        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
        private final DataSource dataSource;
    
        private static final int chunkSize = 10;
    
        @Bean
        public Job jdbcCursorItemReaderJob(){
            return jobBuilderFactory.get("jdbcCursorItemReaderJob")     //jdbcCursorItemReaderJob 이름으로 job 생성
                    .start(jdbcCursorItemReaderStep())  //step 실행
                    .build();
        }
    
        @Bean
        public Step jdbcCursorItemReaderStep() {
            return stepBuilderFactory.get("jdbcCursorItemReaderStep")   //jdbcCursorItemReaderStep 이름으로 step 생성
                    .<Pay, Pay>chunk(chunkSize)     //chunk size 지정 (=transaction 범위)
                    .reader(jdbcCursorItemReader()) //reader 실행
                    .writer(jdbcCursorItemWriter()) //writer 실행
                    .build();
        }
    
        @Bean
        public ItemReader<? extends Pay> jdbcCursorItemReader() {       //itemReader 구현
            return new JdbcCursorItemReaderBuilder<Pay>()
                    .fetchSize(chunkSize)       //db에서 읽어오는 데이터 양
                    .dataSource(dataSource)     //db 설정
                    .rowMapper(new BeanPropertyRowMapper<>(Pay.class))  //반환되는 데이터를 Pay 객체로 변환
                    .sql("select id, amount, tx_name, tx_date_time from pay")
                    .name("jdbcCursorItemReader")
                    .build();
        }
    
        private ItemWriter<? super Pay> jdbcCursorItemWriter() {        //writer 구현
            return list -> {
                for(Pay pay : list){
                    log.info("Current Pay={}",pay);
                }
            };
        }
    }
    StepのReaderとWriterを実装し、実行結果を直接表示します.
    intellijサーバの実行時
    --job.name=jdbcCursorItemReaderJob version=7
    以下に設定した値に送信すればよい.

    確認できる結果は以下の通りです.
    プロセス()は必ずしも実現しなくてもよい部分であり,サービスロジックを担当する部分である.

    PagingItemReader

    @Slf4j
    @RequiredArgsConstructor
    @Configuration
    public class JdbcPagingItemReaderJobConfiguration {
        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
        private final DataSource dataSource; // DataSource DI
    
        private static final int chunkSize = 10;
    
        @Bean
        public Job jdbcPagingItemReaderJob() throws Exception{
            return jobBuilderFactory.get("jdbcPagingItemReaderJob")     //jdbcPagingItemReaderJob 이름으로 Job 생성
                    .start(jdbcPagingItemReaderStep())
                    .build();
        }
    
        @Bean
        public Step jdbcPagingItemReaderStep() throws Exception{
            return stepBuilderFactory.get("jdbcPagingItemReaderStep")   //jdbcPagingItemReaderStep 이름으로 Step 생성
                    .<Pay, Pay>chunk(chunkSize)
                    .reader(jdbcPagingItemReader())
                    .writer(jdbcPagingItemWriter())
                    .build();
        }
    
        @Bean
        public ItemReader<Pay> jdbcPagingItemReader() throws Exception{
            Map<String, Object> param = new HashMap<>();    //parameter 생성
            param.put("amount", 2000);
    
            return new JdbcPagingItemReaderBuilder<Pay>()
                    .pageSize(chunkSize)
                    .fetchSize(chunkSize)
                    .dataSource(dataSource)
                    .rowMapper(new BeanPropertyRowMapper<>(Pay.class))
                    .queryProvider(createQueryProvider())   //각 DB에 정의된 provider를 사용해도 되지만 spring에서 적용 가능한 더 범용성 있는 코드 작성
                    .parameterValues(param)                 //parameter
                    .name("jdbcPagingItemReader")
                    .build();
        }
    
        private ItemWriter<Pay> jdbcPagingItemWriter() {    //writer 실행
            return list -> {
                for(Pay pay : list){
                    log.info("Current Pay={}",pay);
                }
            };
        }
    
        @Bean
        public PagingQueryProvider createQueryProvider() throws Exception {
            SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
            queryProvider.setDataSource(dataSource);    //db에 맞는 pagingProvider 자동 선택
            queryProvider.setSelectClause("id, amount, tx_name, tx_date_time"); //select 문
            queryProvider.setFromClause("from pay");                            //from 절
            queryProvider.setWhereClause("where amount >= :amount");            //where 절
    
            Map<String, Order> sortKey = new HashMap<>(1);          //sort 정의
            sortKey.put("id", Order.ASCENDING);
            queryProvider.setSortKeys(sortKey);
    
            return queryProvider.getObject();   //결과값 반환
        }
    }
    PagingItemReaderの注意事項
    ! ソートを含める必要があります