「Spring Batch+Spring Scheduler+MongoDb」を使用した簡単な説明と例
12898 ワード
サンプル設定
駆動目標:Movie Open APIを通じて今日の人気急上昇映画のリストを獲得して、それからローカルDBに保存します
プロジェクト初期設定
application.yml
spring:
data:
mongodb:
uri: mongodb://localhost:27017/local # 로컬 DB경로
batch:
job:
enabled: false # 애플리케이션 구동과 동시에 배치 작업이 실행되는 것을 방지
# Movie Open Api 관련
movie:
openApi:
trending:
uri: https://api.themoviedb.org/3 # Default 경로 설정
apiKey : cbfa4*007409cc**86bfeaf** # API키
[APIサイトを利用]実際に利用されている方は会員加入後にAPIキーを入力してください dependencies
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-batch'
implementation 'org.springframework.boot:spring-boot-starter-data-mongodb'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
testImplementation 'org.springframework.batch:spring-batch-test'
}
OpenAPIから受信したデータをバインドするドメインオブジェクト
OpenAPIから受信した応答の変数名をバインドするオブジェクトの変数名と一致させてください.一致しない場合はNULLに入ります.
TrendingObject.Class
@Getter
@ToString
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class TrendingObject {
private TrendingMovie[] results;
@Builder
public TrendingObject(TrendingMovie[] results) {
this.results = results;
}
}
TrendingMobie.Class
@Data
@Document(collection = "TrendingMovie") // MongoDb에 저장되어 있는 collection을 정의해줍니다.
@NoArgsConstructor(access = AccessLevel.PROTECTED) // 빈 생성자가 만들어지는 것을 방지해줍니다.
public class TrendingMovie {
@Id
private int id;
private boolean adult;
private String overview;
private int[] genre_ids;
private String title;
private int vote_count;
private int vote_average;
@Builder
public TrendingMovie(int id, boolean adult, String overview, int[] genre_ids, String title, int vote_count,
int vote_average) {
this.id = id;
this.adult = adult;
this.overview = overview;
this.genre_ids = genre_ids;
this.title = title;
this.vote_count = vote_count;
this.vote_average = vote_average;
}
}
WebClient Config
@Configuration
public class WebClientConfig {
@Bean
public WebClient.Builder webClientBuilder() {
return WebClient.builder();
}
}
Webクライアントを使用しているにもかかわらず、バッチ・タスクの特性上、ブロックを使用しないテンプレートを使用して、限られたデータを処理することをお勧めします.Spring Batch
Spring Batchを使用する前に、Batchの意味を理解してください.
マッチバー
リアルタイムではなくデータを集中的に処理
Spring Batch
Spring Batchは、これらのBatch機能を実現するためのオープンソースフレームワークです.
Spring Baschのいくつかの有用な機能
Spring Batchの構造と用語
Job
バッチ・プロシージャをユニットとして作成するオブジェクトとして、Jobはバッチ・プロシージャの最上位レベルに存在する概念である.簡単な例を挙げると、あるウェブサイト上のすべてのメンバーの情報が読み出され、1年後のメンバーを終了するためにデータ消去処理の論理がある場合、このプロセスはJobになります.
JobLauncher
JobLauncherは、JobとJobParametersを使用してJobを実行するオブジェクトです.
JobInstance
Jobを駆動する概念を区別する.ジョブが同じパラメータで実行されたときにExceptionが起動します
JobParameters
同じパラメータをJobInstanceで異なる方法で入力すると、同じオブジェクトとして認識されます.JobParametersはString、Double、Long、Dateの4つのフォーマットしかサポートしていません.
Step
ジョブのバッチを定義し、順序ステップを分割します.
JobRepository
上記のすべてのバッチ情報を処理するやつ.ジョブを実行する場合は、ジョブレポートにジョブ実行とStep実行を作成します.
JobExecution
上記の例では、1月1日に同じパラメータで実行された場合、Spring Batchは同じジョブインスタンスを実行するジョブ実行情報のオブジェクトを保持します.ただし、2回目の実行のためにオブジェクトが個別に作成され、データベースに保存されます.ジョブを再起動するなど、過去に実行されたジョブを操作できます.
StepExecution
ジョブExecutionと同様に、Step実行試行のオブジェクトが表示されますが、前のステップのStepに失敗した場合、次のStepに対してStep Executionは作成されません.Step Executionでは、データの読み出し回数、データの作成回数(txtファイルの作成またはデータベースへの保存)、Stepを無視した回数も格納します.
ExecutionContext
Jobはデータのデータストアを共有することができる.このExecutionContextには、JobExecutionContextとStep ExecutionContextの2種類がありますが、保存時間は異なります.ジョブExecutionの場合、コミットポイントとStep ExecutionはStep間に格納されます.
Spring Batchの使用
ブートストラップクラスで@EnableBaschProcessing宣言を使用する
@SpringBootApplication
@EnableBatchProcessing // 배치 인프라스트럭처 사용을 위한 대부분의 빈을 등록해줍니다.
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
JobConfig Classの作成とJobの登録
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import lombok.RequiredArgsConstructor;
import mongo.movie.domain.TrendingMovie;
import reactor.core.publisher.Mono;
@Configuration
@RequiredArgsConstructor
public class OpenApiJob {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
// Job build 및 순서 정의
@Bean
public Job trendingMovieJob() {
Job trendingMovieJob = jobBuilderFactory.get("trendingMovieJob")
.start(openApiFristStep()) // 데이터를 한번만 받으면 되기 때문에 단일 스텝으로 구성
.build();
return trendingMovieJob;
}
@Bean
@JobScope // JobParameter를 보내므로 설정
public Step openApiFristStep() {
return stepBuilderFactory.get("openApiFristStep")
.<Mono<TrendingMovie[]>, TrendingMovie[]>chunk(1) // Input, Output, chunk 사이즈
.reader(openApiReader())
.processor(dataEditProcessor())
.writer(dataInsertWrite())
.build();
}
// 데이터를 읽어오는 ItemReader 인터페이스의 커스텀 구현체
@Bean
@StepScope
public OpenApiReader openApiReader(){
return new TrendingMovieItemReader();
}
// 읽어온 데이터를 가공 후 반환하는 ItemProcessor 인터페이스의 커스텀 구현체
@Bean
@StepScope
public OpenApiProcessor dataEditProcessor() {
return new OpenApiProcessor();
}
// 가공 되어진 데이터들(Chunk)를 DB 혹은 특정 파일에 작성하는 ItemWriter 인터페이스의 커스텀 구현체
@Bean
@StepScope
public OpenApiWriter dataInsertWrite() {
return new OpenApiWriter();
}
}
上で作成したItemReader、ItemProcessor、およびItemWriterのカスタムインプリメンテーション。
ItemReader
public class OpenApiReader implements ItemReader<Mono<TrendingMovie[]>> {
@Value("${movie.openApi.trending.uri}")
private final String TRENDING_MOVIE_URL;
@Value("${movie.openApi.apiKey}")
private final String API_KEY;
@Autowired
private WebClient.Builder wcBuilder;
private int cnt = 0;
@Override
public Mono<TrendingMovie[]> read()
throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
cnt++;
return cnt == 1
? wcBuilder.build().get()
.uri(TRENDING_MOVIE_URL+"/trending/movie/day?api_key={API_KEY}",API_KEY)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(TrendingObject.class)
.map(trendingObject -> trendingObject.getResults())
: null;
}
}
リーダーを介してデータを読み込む場合、NULLが最終的に返されない場合は、無限ループが回転していることを知らないため、非常に困難です.ItemProcessor
// exception이 발생하였을 때 Roll Back
// 적용된 범위에서는 트랜잭션 기능이 포함된 프록시 객체가 생성되어 자동으로 commit 혹은 rollback을 진행해준다.
@Transactional(rollbackFor = Exception.class)
public class OpenApiProcessor implements ItemProcessor<Mono<TrendingMovie[]>, TrendingMovie[]>{
@Override
public TrendingMovie[] process(Mono<TrendingMovie[]> item) throws Exception {
return item.block();
}
}
ItemWriter@Slf4j
public class OpenApiWriter implements ItemWriter<TrendingMovie[]> {
@Autowired
private MongoOperations mongoOperations;
@Override
public void write(List<? extends TrendingMovie[]> items) throws Exception {
// chunk 사이즈가 1이므로 한번만 돌음
for(int i = 0; i < items.size(); i++) {
TrendingMovie[] movies = items.get(i);
for(TrendingMovie movie : movies) {
log.info("movie : {}", movie.toString());
mongoOperations.save(movie);
}
}
}
}
Spring Scheduler
バッチ処理を支援するSpring Batchについて説明した.では、スケジューラとは何でしょうか.
Scheduler
Baschがバッチである場合、スケジューラは、一定の時間間隔または一定の時間間隔で特定の論理を回転させるために使用される.
Spring SchedulerとSpring QuartzはSpringでこれらのScheduler機能を使用する2つの方法であり、Spring Quartzは独立した依存性を必要とし、より複雑で困難である必要がある.
Spring Schedulerの使用
@EnableScheduling
@EnableBatchProcessing
@SpringBootApplication
public class Application() {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
ブートストラップクラスで@EnableScheduling
操作が設定されている場合、準備作業は終了します.Spring Schedulerを使用する際の注意点
実際の使用
@Slf4j
@Component
@RequiredArgsConstructor
public class JobScheduler {
private final JobLauncher jobLauncher;
private final OpenApiJob openApiJob;
@Scheduled(cron = "0 0/3 * * * ?") // 3분에 1번씩 실행
public void openApiRequestSchedule() {
// 넘기는 파라미터를 매번 다르게 해서 별개의 JobInstance로 인식하게 함
Map<String, JobParameter> confMap = new HashMap<>();
confMap.put("time", new JobParameter(System.currentTimeMillis()));
JobParameters jobParameters = new JobParameters(confMap);
try {
JobExecution jobExecution = jobLauncher.run(openApiJob.trendingMovieJob(), jobParameters);
log.info("Job Execution: " + jobExecution.getStatus());
log.info("Job getJobConfigurationName: " + jobExecution.getJobConfigurationName());
log.info("Job getJobId: " + jobExecution.getJobId());
log.info("Job getExitStatus: " + jobExecution.getExitStatus());
log.info("Job getJobInstance: " + jobExecution.getJobInstance());
log.info("Job getStepExecutions: " + jobExecution.getStepExecutions());
log.info("Job getLastUpdated: " + jobExecution.getLastUpdated());
log.info("Job getFailureExceptions: " + jobExecution.getFailureExceptions());
} catch (JobExecutionAlreadyRunningException
|JobRestartException
|JobInstanceAlreadyCompleteException
|JobParametersInvalidException e) {
e.getMessage();
}
}
}
結果
画像が小さすぎます
Spring Batchプロセス画像ソース
Reference
この問題について(「Spring Batch+Spring Scheduler+MongoDb」を使用した簡単な説明と例), 我々は、より多くの情報をここで見つけました https://velog.io/@kdj9878/Spring-Batch-및-Spring-Schedulerテキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol