SpringBootプロジェクトで大量のredisMessageListenerContailner-X線路を作成することによるメモリオーバーフローの問題の分析と解決策


具体的な問題の説明:
プロジェクトが採用するspring cloudマイクロサービスアーキテクチャは、spring session(redisストレージ方式)を使用して各マイクロサービス間のsession共有、すなわちプロジェクト起動内に追加される
@EnableRedisHttpSession  。

中にはメールを送信する公共サービスmail-serviceがあり、ある業務モジュールは毎日定時タスクを実行してメールサービスを呼び出し、大量のメールを送信し、大量のr e d i s M s s a g e ListenerContailner-X線路を作成し、最終的にメールサービスメモリがオーバーフローする!
原因分析:
Spring Session(redis)のコンフィギュレーションクラスソース(R e d i s H t p SessionConfiguration):
@Autowired(
    required = false    //              ,           SimpleAsyncTaskExecutor   
)
@Qualifier("springSessionRedisTaskExecutor")
public void setRedisTaskExecutor(Executor redisTaskExecutor) {
    this.redisTaskExecutor = redisTaskExecutor;
}

SpringSessionRedisTaskExecutorは必須ではありません.カスタマイズしない場合、springのデフォルトではSimpleAsyncTaskExecutorスレッドプールが使用されます.
余談:
SimpleAsyncTaskExecutor:毎回新しいスレッドが作成されます(「スレッドプール」と言いますが、実際にはプール化されていませんが、最大同時スレッド数を設定できます.)@EnableAsyncは非同期メソッドを開き、背後でデフォルトで使用されているのがこのスレッドプールです.非同期メソッドを使用する場合、ビジネスシーンで頻繁に呼び出される場合(非同期メソッド)、スレッドプールをカスタマイズして、頻繁にスレッドを作成することによるパフォーマンスの消費を防止します.この非同期メソッドがブロックされている場合、呼び出し量が大きい場合は、OOM(スレッドがまだ終了していないし、より多くのスレッドが増加し、メモリオーバーフローを招く可能性がある)に注意してください.@Async注記は、カスタムスレッドプールの使用を選択できます.
SimpleAsyncTaskExecutorを作成しました
RedisHttpSessionConfigurationについてお話しします.
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer() {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(this.redisConnectionFactory);
    if (this.redisTaskExecutor != null) {
        container.setTaskExecutor(this.redisTaskExecutor);
    }

    if (this.redisSubscriptionExecutor != null) {
        container.setSubscriptionExecutor(this.redisSubscriptionExecutor);
    }

    container.addMessageListener(this.sessionRepository(), Arrays.asList(new PatternTopic("__keyevent@*:del"), new PatternTopic("__keyevent@*:expired")));
    container.addMessageListener(this.sessionRepository(), Collections.singletonList(new PatternTopic(this.sessionRepository().getSessionCreatedChannelPrefix() + "*")));
    return container;
}

RedisMessageListenerはリスニングを処理するクラスで、RedisMessageListenerは空ではないredisTaskExecutorを設定しています.spring sessionはデフォルトでこのExecutorを構成していないので、RedisMessageListenerはリスニングを処理する際にどのようにスレッドを使用しますか?次に、RedisMessageListenerContainerのソースコードを見てみましょう.
public void afterPropertiesSet() {
    if (this.taskExecutor == null) {
        this.manageExecutor = true;
        this.taskExecutor = this.createDefaultTaskExecutor();
    }

    if (this.subscriptionExecutor == null) {
        this.subscriptionExecutor = this.taskExecutor;
    }

    this.initialized = true;
}

protected TaskExecutor createDefaultTaskExecutor() {
    String threadNamePrefix = this.beanName != null ? this.beanName + "-" : DEFAULT_THREAD_NAME_PREFIX;
    return new SimpleAsyncTaskExecutor(threadNamePrefix);
}

afterPropertiesSet()この方法はよく知られているでしょう.この方法はすべての属性が初期化された後に呼び出されます(InitializingBeanインタフェースの詳細はここでは省略します).したがって、ユーザーがspringSessionRedisTaskExecutorを定義していない場合、Spring sessionはcreateDefaultTaskExecutor()メソッドを呼び出してSimpleAsyncTaskExecutorスレッドプールを作成します.このスレッドプールでは、タスクを処理するたびに新しいスレッドが作成されます.多くのredisMessageListenerContailner-X線路が見つかります
ソリューション:
springセッションにspringSessionRedisTaskExecutorスレッドプールを追加します.
/**
 *   spring session,          
 * @return
 */
@Bean
public ThreadPoolTaskExecutor springSessionRedisTaskExecutor(){
    ThreadPoolTaskExecutor springSessionRedisTaskExecutor = new ThreadPoolTaskExecutor();
    springSessionRedisTaskExecutor.setCorePoolSize(8);
    springSessionRedisTaskExecutor.setMaxPoolSize(16);
    springSessionRedisTaskExecutor.setKeepAliveSeconds(10);
    springSessionRedisTaskExecutor.setQueueCapacity(1000);
    springSessionRedisTaskExecutor.setThreadNamePrefix("Spring session redis executor thread: ");
    return springSessionRedisTaskExecutor;
}