「Spring Batch+Spring Scheduler+MongoDb」を使用した簡単な説明と例


サンプル設定


駆動目標:Movie Open APIを通じて今日の人気急上昇映画のリストを獲得して、それからローカルDBに保存します

プロジェクト初期設定

  • Springbootバージョン:2.6.6
  • Javaバージョン:openJDK 1.8
  • 構築ツール:Gradle 7.4.1
  • MongoDBバージョン:5.0.6
  • application.yml

    spring:
      data:
        mongodb:
          uri: mongodb://localhost:27017/local # 로컬 DB경로
      batch:
        job:
          enabled: false # 애플리케이션 구동과 동시에 배치 작업이 실행되는 것을 방지
          
    # Movie Open Api 관련
    movie:
      openApi:
        trending:
          uri: https://api.themoviedb.org/3	# Default 경로 설정
        apiKey : cbfa4*007409cc**86bfeaf**	# API키
    [APIサイトを利用]実際に利用されている方は会員加入後にAPIキーを入力してください

    dependencies

    dependencies {
    	implementation 'org.springframework.boot:spring-boot-starter-batch'
    	implementation 'org.springframework.boot:spring-boot-starter-data-mongodb'
    	implementation 'org.springframework.boot:spring-boot-starter-webflux'
    	
    	compileOnly 'org.projectlombok:lombok'
    	developmentOnly 'org.springframework.boot:spring-boot-devtools'
    	annotationProcessor 'org.projectlombok:lombok'
    	testImplementation 'org.springframework.boot:spring-boot-starter-test'
    	testImplementation 'io.projectreactor:reactor-test'
    	testImplementation 'org.springframework.batch:spring-batch-test'
    }

    OpenAPIから受信したデータをバインドするドメインオブジェクト



    OpenAPIから受信した応答の変数名をバインドするオブジェクトの変数名と一致させてください.一致しない場合はNULLに入ります.

    TrendingObject.Class

    @Getter
    @ToString
    @NoArgsConstructor(access = AccessLevel.PROTECTED)
    public class TrendingObject {
    	
    	private TrendingMovie[] results;
    	
    	@Builder
    	public TrendingObject(TrendingMovie[] results) {
    		this.results = results;
    	}
    }

    TrendingMobie.Class

    @Data
    @Document(collection = "TrendingMovie")	// MongoDb에 저장되어 있는 collection을 정의해줍니다.
    @NoArgsConstructor(access = AccessLevel.PROTECTED)	// 빈 생성자가 만들어지는 것을 방지해줍니다.
    public class TrendingMovie {
    	
    	@Id
    	private int id;
    	private boolean adult;
    	private String overview;
    	private int[] genre_ids;
    	private String title;
    	private int vote_count;
    	private int vote_average;
    	
    	@Builder
    	public TrendingMovie(int id, boolean adult, String overview, int[] genre_ids, String title, int vote_count,
    			int vote_average) {
    		this.id = id;
    		this.adult = adult;
    		this.overview = overview;
    		this.genre_ids = genre_ids;
    		this.title = title;
    		this.vote_count = vote_count;
    		this.vote_average = vote_average;
    	}
    
    }
    

    WebClient Config

    @Configuration
    public class WebClientConfig {
    
      @Bean
      public WebClient.Builder webClientBuilder() {
        return WebClient.builder();
      }
    }
    Webクライアントを使用しているにもかかわらず、バッチ・タスクの特性上、ブロックを使用しないテンプレートを使用して、限られたデータを処理することをお勧めします.

    Spring Batch


    Spring Batchを使用する前に、Batchの意味を理解してください.

    マッチバー


    リアルタイムではなくデータを集中的に処理

    Spring Batch


    Spring Batchは、これらのBatch機能を実現するためのオープンソースフレームワークです.

    Spring Baschのいくつかの有用な機能

  • 異常と非正常動作防御機能
  • サポート
  • タスクが失敗し、最初からではなく再起動時に失敗点から実行されます.
  • 成功経験のあるJobに対して同じパラメータを実行し、繰り返し実行されるExceptionを防止する(
  • )

    Spring Batchの構造と用語



    Job


    バッチ・プロシージャをユニットとして作成するオブジェクトとして、Jobはバッチ・プロシージャの最上位レベルに存在する概念である.簡単な例を挙げると、あるウェブサイト上のすべてのメンバーの情報が読み出され、1年後のメンバーを終了するためにデータ消去処理の論理がある場合、このプロセスはJobになります.

    JobLauncher


    JobLauncherは、JobとJobParametersを使用してJobを実行するオブジェクトです.
  • レビュー、実行方法、検証パラメータ等の実行
  • 他の設定がない場合、ジョブ
  • は、アプリケーションの起動時に直ちに実行する.

    JobInstance


    Jobを駆動する概念を区別する.ジョブが同じパラメータで実行されたときにExceptionが起動します

    JobParameters


    同じパラメータをJobInstanceで異なる方法で入力すると、同じオブジェクトとして認識されます.JobParametersはString、Double、Long、Dateの4つのフォーマットしかサポートしていません.

    Step


    ジョブのバッチを定義し、順序ステップを分割します.

    JobRepository


    上記のすべてのバッチ情報を処理するやつ.ジョブを実行する場合は、ジョブレポートにジョブ実行とStep実行を作成します.

    JobExecution


    上記の例では、1月1日に同じパラメータで実行された場合、Spring Batchは同じジョブインスタンスを実行するジョブ実行情報のオブジェクトを保持します.ただし、2回目の実行のためにオブジェクトが個別に作成され、データベースに保存されます.ジョブを再起動するなど、過去に実行されたジョブを操作できます.

    StepExecution


    ジョブExecutionと同様に、Step実行試行のオブジェクトが表示されますが、前のステップのStepに失敗した場合、次のStepに対してStep Executionは作成されません.Step Executionでは、データの読み出し回数、データの作成回数(txtファイルの作成またはデータベースへの保存)、Stepを無視した回数も格納します.

    ExecutionContext


    Jobはデータのデータストアを共有することができる.このExecutionContextには、JobExecutionContextとStep ExecutionContextの2種類がありますが、保存時間は異なります.ジョブExecutionの場合、コミットポイントとStep ExecutionはStep間に格納されます.

    Spring Batchの使用


    ブートストラップクラスで@EnableBaschProcessing宣言を使用する

    @SpringBootApplication
    @EnableBatchProcessing	// 배치 인프라스트럭처 사용을 위한 대부분의 빈을 등록해줍니다.
    public class DemoApplication {
    
    	public static void main(String[] args) {
    		SpringApplication.run(DemoApplication.class, args);
    	}
    
    }
    

    JobConfig Classの作成とJobの登録

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.JobScope;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepScope;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import lombok.RequiredArgsConstructor;
    import mongo.movie.domain.TrendingMovie;
    import reactor.core.publisher.Mono;
    
    @Configuration
    @RequiredArgsConstructor
    public class OpenApiJob {
    	
    	private final JobBuilderFactory jobBuilderFactory;
    	private final StepBuilderFactory stepBuilderFactory;
    	
        // Job build 및 순서 정의
    	@Bean
    	public Job trendingMovieJob() {
    		
    		Job trendingMovieJob = jobBuilderFactory.get("trendingMovieJob")
    				.start(openApiFristStep())	// 데이터를 한번만 받으면 되기 때문에 단일 스텝으로 구성
    				.build();
    		
    		return trendingMovieJob;
    	}
    	
    	@Bean
        @JobScope	// JobParameter를 보내므로 설정
    	public Step openApiFristStep() {
    		return stepBuilderFactory.get("openApiFristStep")
    				.<Mono<TrendingMovie[]>, TrendingMovie[]>chunk(1) // Input, Output, chunk 사이즈
    				.reader(openApiReader())
    				.processor(dataEditProcessor())
    				.writer(dataInsertWrite())
    				.build();
    	}
    	
        // 데이터를 읽어오는 ItemReader 인터페이스의 커스텀 구현체
        @Bean
        @StepScope
    	public OpenApiReader openApiReader(){
    		return new TrendingMovieItemReader();
    	}
        
        // 읽어온 데이터를 가공 후 반환하는 ItemProcessor 인터페이스의 커스텀 구현체
    	@Bean
        @StepScope
    	public OpenApiProcessor dataEditProcessor() {
    		return new OpenApiProcessor();
    	}
    	
        // 가공 되어진 데이터들(Chunk)를 DB 혹은 특정 파일에 작성하는 ItemWriter 인터페이스의 커스텀 구현체
    	@Bean
        @StepScope
    	public OpenApiWriter dataInsertWrite() {
    		return new OpenApiWriter();
    	}
    }

    上で作成したItemReader、ItemProcessor、およびItemWriterのカスタムインプリメンテーション。


    ItemReader
    public class OpenApiReader implements ItemReader<Mono<TrendingMovie[]>> {
    													
    	@Value("${movie.openApi.trending.uri}")
    	private final String TRENDING_MOVIE_URL;
    	
    	@Value("${movie.openApi.apiKey}")
    	private final String API_KEY;
    	
    	@Autowired
    	private WebClient.Builder wcBuilder;
    	
    	private int cnt = 0;
    	
    	@Override
    	public Mono<TrendingMovie[]> read()
    			throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
    	    cnt++;
    	    return cnt == 1 
    	    ? wcBuilder.build().get()
    	              .uri(TRENDING_MOVIE_URL+"/trending/movie/day?api_key={API_KEY}",API_KEY)
    	              .accept(MediaType.APPLICATION_JSON)
    	              .retrieve()
    	              .bodyToMono(TrendingObject.class)
    	              .map(trendingObject -> trendingObject.getResults())
    	    : null;
    	}
    }
    
    リーダーを介してデータを読み込む場合、NULLが最終的に返されない場合は、無限ループが回転していることを知らないため、非常に困難です.
    ItemProcessor
    // exception이 발생하였을 때 Roll Back
    // 적용된 범위에서는 트랜잭션 기능이 포함된 프록시 객체가 생성되어 자동으로 commit 혹은 rollback을 진행해준다.
    @Transactional(rollbackFor = Exception.class)
    public class OpenApiProcessor implements ItemProcessor<Mono<TrendingMovie[]>, TrendingMovie[]>{
    														
    	@Override
    	public TrendingMovie[] process(Mono<TrendingMovie[]> item) throws Exception {
    		return item.block();
    	}
    }
    
    ItemWriter
    @Slf4j
    public class OpenApiWriter implements ItemWriter<TrendingMovie[]> {
    
    	@Autowired
    	private MongoOperations mongoOperations;
    	
    	@Override
    	public void write(List<? extends TrendingMovie[]> items) throws Exception {
    		// chunk 사이즈가 1이므로 한번만 돌음
    		for(int i = 0; i < items.size(); i++) {
    			TrendingMovie[] movies = items.get(i);
    			for(TrendingMovie movie : movies) {
    				log.info("movie : {}", movie.toString());
    				mongoOperations.save(movie);
    			}
    		}
    	}
    }

    Spring Scheduler


    バッチ処理を支援するSpring Batchについて説明した.では、スケジューラとは何でしょうか.

    Scheduler


    Baschがバッチである場合、スケジューラは、一定の時間間隔または一定の時間間隔で特定の論理を回転させるために使用される.
    Spring SchedulerとSpring QuartzはSpringでこれらのScheduler機能を使用する2つの方法であり、Spring Quartzは独立した依存性を必要とし、より複雑で困難である必要がある.

    Spring Schedulerの使用

    @EnableScheduling
    @EnableBatchProcessing
    @SpringBootApplication
    public class Application() {
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }
    }
    ブートストラップクラスで@EnableScheduling操作が設定されている場合、準備作業は終了します.

    Spring Schedulerを使用する際の注意点

  • メソッドにはvoidの戻りタイプが必要です.
  • メソッドはパラメータを持つことができません.
  • 実際の使用

    @Slf4j
    @Component
    @RequiredArgsConstructor
    public class JobScheduler {
    
    	private final JobLauncher jobLauncher;
    	private final OpenApiJob openApiJob;
    	
    	@Scheduled(cron = "0 0/3 * * * ?") // 3분에 1번씩 실행
    	public void openApiRequestSchedule() {
    		
    		// 넘기는 파라미터를 매번 다르게 해서 별개의 JobInstance로 인식하게 함
    		Map<String, JobParameter> confMap = new HashMap<>();
            confMap.put("time", new JobParameter(System.currentTimeMillis()));
            JobParameters jobParameters = new JobParameters(confMap);
    		try {
    			JobExecution jobExecution = jobLauncher.run(openApiJob.trendingMovieJob(), jobParameters);
    			
    			log.info("Job Execution: " + jobExecution.getStatus());
    			log.info("Job getJobConfigurationName: " + jobExecution.getJobConfigurationName());
    			log.info("Job getJobId: " + jobExecution.getJobId());
    			log.info("Job getExitStatus: " + jobExecution.getExitStatus());
    			log.info("Job getJobInstance: " + jobExecution.getJobInstance());
    			log.info("Job getStepExecutions: " + jobExecution.getStepExecutions());
    			log.info("Job getLastUpdated: " + jobExecution.getLastUpdated());
    			log.info("Job getFailureExceptions: " + jobExecution.getFailureExceptions());
    			
    		} catch (JobExecutionAlreadyRunningException
    				|JobRestartException
    				|JobInstanceAlreadyCompleteException
    				|JobParametersInvalidException e) {
    			e.getMessage();
    		}
    	}
    }
    

    結果



    画像が小さすぎます
    Spring Batchプロセス画像ソース