Springbatch統合mybatisループ一括読み出しmysql
7084 ワード
今回書く機能はspringbatchのjobが呼び出されるたびにjobでmysqlリストをクエリーするreaderを実行する機能です.バッチ・サービスが開始された後、jobが呼び出されるとspringbatchは常に1回のみ処理され、次回の呼び出し表示が完了します.初期化リストはサービス起動時に埋め込まれ、1回しか埋め込まれず、使用後は再びリストが埋め込まれないため、次回jobデータに入ると空になります.私たちは今、jobの呼び出しのたびにデータベースから最新のリストを取得し、このデータをバッチで処理するように、この問題を解決します.
前回のspringbatch読み取りrabbitmq記事に続いて書き続けます.
汎用設計を採用しているのでmybatisを使ってmysqlリストを読み込むのも同様です.
私たちが今回使った核心はMyBatisCursorItemReaderだとは言わないでください.MyBatisCursorItemReaderはmybatisがspringbatchカーソルの読み取りのために作成したものです.
まず、従来のBatchConfigクラスにsqlSessionFactoryを追加する方法を修正します.ここでは、使用例をインスタンス化します.使用回数が多いため、メモリリソースを節約するために設計されています.
mybatisループを使用してmysqlを読み込むためのリストも作成しました
カスタムconfigを変更しmybatisを使用してmysqlから読み込みます
次に、UserDaoによるidの追加方法を修正します.
mapperファイルの変更
最後に、プロジェクトを開始するとspringbatchはタスクおよびタスク内のreaderを初期化し、タスクjobを呼び出すとmysqlから検出されたリストをバッチで処理します.
次にタスクjobを呼び出すと、mysqlクエリの最新リストが再読み込みされ、処理されます.
前回のspringbatch読み取りrabbitmq記事に続いて書き続けます.
汎用設計を採用しているのでmybatisを使ってmysqlリストを読み込むのも同様です.
私たちが今回使った核心はMyBatisCursorItemReaderだとは言わないでください.MyBatisCursorItemReaderはmybatisがspringbatchカーソルの読み取りのために作成したものです.
まず、従来のBatchConfigクラスにsqlSessionFactoryを追加する方法を修正します.ここでは、使用例をインスタンス化します.使用回数が多いため、メモリリソースを節約するために設計されています.
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class BatchConfig {
@Autowired
protected JobBuilderFactory jobBuilderFactory;
@Autowired
protected StepBuilderFactory stepBuilderFactory;
@Autowired
protected RabbitTemplate amqpTemplate;
@Autowired
protected MongoTemplate mongoTemplate;
@Autowired
private MybatisProperties properties;
private static SqlSessionFactoryBean sessionFactory;
//
@Bean
protected ThreadPoolTaskExecutor taskExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(15);
executor.setKeepAliveSeconds(300);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
executor.setQueueCapacity(10000);
executor.setThreadGroupName("spring_batch");
return executor;
}
public SqlSessionFactory sqlSessionFactory() {
try {
if(sessionFactory == null){
sessionFactory = new SqlSessionFactoryBean();
sessionFactory.setDataSource(SpringContextHolder.getBean("dataSource"));
sessionFactory.setMapperLocations(properties.resolveMapperLocations());
return sessionFactory.getObject();
}
return sessionFactory.getObject();
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
mybatisループを使用してmysqlを読み込むためのリストも作成しました
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisCursorItemReader;
import org.springframework.batch.item.ItemReader;
import java.util.Map;
/**
* Description: mybatis list
* @date: 2019/4/28 9:31
*/
public class ReadToListByMybatis {
/**
* Description: mybatis list
* @param: ,queryId mybatis ID
* example:
* parameterValues.put("createDate", "2019-04-28");
* parameterValues.put("queryId", "com.xx.xx.batch.dao.XXDao.select");
*
* @date: 2019/4/28 9:28
*/
public static ItemReader myBatisCursorItemReader(Map parameterValues, SqlSessionFactory sqlSessionFactory) {
MyBatisCursorItemReader reader = new MyBatisCursorItemReader();
reader.setSqlSessionFactory(sqlSessionFactory);
reader.setParameterValues(parameterValues);
reader.setQueryId(parameterValues.get("queryId").toString());
return reader;
}
}
カスタムconfigを変更しmybatisを使用してmysqlから読み込みます
import com.xx.xx.config.BatchConfig;
import com.xx.xx.batch.dao.UserDao;
import com.xx.xx.batch.entity.User;
import com.xx.xx.batch.listener.UserJobCompletionListener;
import com.xx.xx.batch.processor.UserProcessor;
import com.xx.xx.batch.read.RabbitRead;
import com.xx.xx.batch.writer.MongoWriter;
import com.xx.xx.batch.writer.MysqlWriter;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
@Configuration
public class UserBatchConfig extends BatchConfig {
@Resource
private UserDao userDao;
private Map param;
@Bean
public Step userStep(){
param.put("queryId", "com.xx.xx.batch.dao.UserDao.select");
return stepBuilderFactory.get("userStep").chunk(10000)
.reader(ReadToListByMybatis.myBatisCursorItemReader(param,sqlSessionFactory())).processor(new UserProcessor()).writer(new MysqlWriter(userDao)).taskExecutor(taskExecutor())
.build();
}
@Bean
public Step userStep2(){
//
/*param.put("queryId", "com.xx.xx.batch.dao.UserDao.select");
return stepBuilderFactory.get("userStep").chunk(10000)
.reader(ReadToListByMybatis.myBatisCursorItemReader(param,sqlSessionFactory())).processor(new UserProcessor()).writer(new MysqlWriter(userDao)).taskExecutor(taskExecutor())
.build();*/
}
@Bean
public Job processJob1(){
param = new HashMap();
param.put("createDate", "2019-04-28");
// userStep2 step step step2
return jobBuilderFactory.get("processJob1").
incrementer(new RunIdIncrementer()).listener(listener()).
flow(userStep()).next(userStep2()).end().build();
}
//
@Bean
public JobExecutionListener listener() {
return new UserJobCompletionListener();
}
}
次に、UserDaoによるidの追加方法を修正します.
@Mapper
public interface UserDao extends MysqlCommonDao{
@Override
Integer add(T user);
@Override
List select(Map map);
}
mapperファイルの変更
insert into user_batch_test values(#{id},#{age},#{name})
最後に、プロジェクトを開始するとspringbatchはタスクおよびタスク内のreaderを初期化し、タスクjobを呼び出すとmysqlから検出されたリストをバッチで処理します.
次にタスクjobを呼び出すと、mysqlクエリの最新リストが再読み込みされ、処理されます.