Springbatch統合mybatisループ一括読み出しmysql


今回書く機能はspringbatchのjobが呼び出されるたびにjobでmysqlリストをクエリーするreaderを実行する機能です.バッチ・サービスが開始された後、jobが呼び出されるとspringbatchは常に1回のみ処理され、次回の呼び出し表示が完了します.初期化リストはサービス起動時に埋め込まれ、1回しか埋め込まれず、使用後は再びリストが埋め込まれないため、次回jobデータに入ると空になります.私たちは今、jobの呼び出しのたびにデータベースから最新のリストを取得し、このデータをバッチで処理するように、この問題を解決します.
前回のspringbatch読み取りrabbitmq記事に続いて書き続けます.
汎用設計を採用しているのでmybatisを使ってmysqlリストを読み込むのも同様です.
私たちが今回使った核心はMyBatisCursorItemReaderだとは言わないでください.MyBatisCursorItemReaderはmybatisがspringbatchカーソルの読み取りのために作成したものです.
まず、従来のBatchConfigクラスにsqlSessionFactoryを追加する方法を修正します.ここでは、使用例をインスタンス化します.使用回数が多いため、メモリリソースを節約するために設計されています.
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class BatchConfig {

    @Autowired
    protected JobBuilderFactory jobBuilderFactory;

    @Autowired
    protected StepBuilderFactory stepBuilderFactory;

    @Autowired
    protected RabbitTemplate amqpTemplate;

    @Autowired
    protected MongoTemplate mongoTemplate;

    @Autowired
    private MybatisProperties properties;

    private static SqlSessionFactoryBean sessionFactory;


    //    
    @Bean
    protected ThreadPoolTaskExecutor taskExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(15);
        executor.setKeepAliveSeconds(300);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
        executor.setQueueCapacity(10000);
        executor.setThreadGroupName("spring_batch");
        return executor;
    }
    public SqlSessionFactory sqlSessionFactory() {
        try {
            if(sessionFactory == null){
                sessionFactory = new SqlSessionFactoryBean();
                sessionFactory.setDataSource(SpringContextHolder.getBean("dataSource"));
                sessionFactory.setMapperLocations(properties.resolveMapperLocations());
                return sessionFactory.getObject();
            }
            return sessionFactory.getObject();
        } catch (Exception e) {
            e.printStackTrace();
            return  null;
        }
    }
}

mybatisループを使用してmysqlを読み込むためのリストも作成しました

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisCursorItemReader;
import org.springframework.batch.item.ItemReader;

import java.util.Map;

/**
 * Description:  mybatis  list      
 * @date: 2019/4/28 9:31
 */
public class ReadToListByMybatis {

    /**
     * Description:   mybatis list
     * @param:           ,queryId     mybatis   ID
     *          example:
     *          parameterValues.put("createDate", "2019-04-28");
     *          parameterValues.put("queryId", "com.xx.xx.batch.dao.XXDao.select");
     *
     * @date: 2019/4/28 9:28
     */
    public static ItemReader myBatisCursorItemReader(Map parameterValues, SqlSessionFactory sqlSessionFactory) {
        MyBatisCursorItemReader reader = new MyBatisCursorItemReader();
        reader.setSqlSessionFactory(sqlSessionFactory);
        reader.setParameterValues(parameterValues);
        reader.setQueryId(parameterValues.get("queryId").toString());
        return reader;
    }
}

カスタムconfigを変更しmybatisを使用してmysqlから読み込みます
import com.xx.xx.config.BatchConfig;
import com.xx.xx.batch.dao.UserDao;
import com.xx.xx.batch.entity.User;
import com.xx.xx.batch.listener.UserJobCompletionListener;
import com.xx.xx.batch.processor.UserProcessor;
import com.xx.xx.batch.read.RabbitRead;
import com.xx.xx.batch.writer.MongoWriter;
import com.xx.xx.batch.writer.MysqlWriter;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;


@Configuration
public class UserBatchConfig extends BatchConfig {

    @Resource
    private UserDao userDao;

    private Map param;

    @Bean
    public Step userStep(){
        param.put("queryId", "com.xx.xx.batch.dao.UserDao.select");
        return stepBuilderFactory.get("userStep").chunk(10000)
                .reader(ReadToListByMybatis.myBatisCursorItemReader(param,sqlSessionFactory())).processor(new UserProcessor()).writer(new MysqlWriter(userDao)).taskExecutor(taskExecutor())
                .build();
    }
    @Bean
    public Step userStep2(){
        //       
        /*param.put("queryId", "com.xx.xx.batch.dao.UserDao.select");
        return stepBuilderFactory.get("userStep").chunk(10000)
                .reader(ReadToListByMybatis.myBatisCursorItemReader(param,sqlSessionFactory())).processor(new UserProcessor()).writer(new MysqlWriter(userDao)).taskExecutor(taskExecutor())
                .build();*/
    }
    @Bean
    public Job processJob1(){
        param = new HashMap();
        param.put("createDate", "2019-04-28");
        //   userStep2         step       step       step2
        return jobBuilderFactory.get("processJob1").
                incrementer(new RunIdIncrementer()).listener(listener()).
                flow(userStep()).next(userStep2()).end().build();
    }
    //     
    @Bean
    public JobExecutionListener listener() {
        return new UserJobCompletionListener();
    }
}

次に、UserDaoによるidの追加方法を修正します.
@Mapper
public interface UserDao extends MysqlCommonDao{

    @Override
    Integer add(T user);
    
    @Override
    List select(Map map);
}

mapperファイルの変更



    
        insert into user_batch_test values(#{id},#{age},#{name})
    
    
    

最後に、プロジェクトを開始するとspringbatchはタスクおよびタスク内のreaderを初期化し、タスクjobを呼び出すとmysqlから検出されたリストをバッチで処理します.
次にタスクjobを呼び出すと、mysqlクエリの最新リストが再読み込みされ、処理されます.