ActiveMQキューメッセージの蓄積問題の調査


サマリ
会社の運営・維持の同僚は、ActiveMQに対して2つの問題を提起した.そのうちの1つは、「キューが長時間監視されていない場合、自動的に削除される」ということだ.調査研究は3つの案を提出した.ここでは関連記録と説明です.
に質問
運行メンテナンスの同僚は生産環境で使用するActiveMQに対して関連監視を行った.このモニタリングは、あるキューにメッセージが溜まっている場合(実際のルールはより複雑で調整中)にメッセージアラームを送信します.運行維持はメールを受け取った後、開発責任者に通知します.開発責任者は、システムが正常に関連キューを傍受しているかどうかを確認します.しかし、これまでの経験から見ると、メッセージが溜まっているのは業務システムの故障によるものです.その他の状況(具体的なデータは集計されておらず、約5、6回)いずれも業務システムがこのキューを傍受しなくなったことによるものである.これにより、私たちの運営・開発の同僚は夜中に問題をチェックした結果、そのキューを削除するだけでよいことが分かった.特に、オンラインActiveMQがメッセージの持続化を構成しているため、このようなメッセージの蓄積は実際にはそうではないActiveMQにどれだけ影響を与えるかは、翌日の出勤後に処理することができます.皆さんの睡眠の質と夫婦感情を考慮して、JIRAでは3つの案を調査、検討しました.
シナリオ1:ActiveMQ自己構成
ActiveMQが公式に提供している機能のリストには、Delete Inactive Destinationという機能があります.「未処理メッセージがなく、消費者がいないDestination」を削除できます.
構成例
この構成は比較的簡単で、ActiveMQのプロファイルactivemq.xmlでは、以下のように変更すればよい.ここではqueueの構成を例に挙げます.topic構成は似ています.


        
            
                
                
            
        
    

上記の構成例は、このBrokerが「gcInactiveDestinations="true"と表記されたQueue(ここではqueue=">"を構成するため、実際にはすべてのQueueをスキャンする)を10000 ms毎にスキャンし、「メッセージが処理されず、消費者がなく、この状態が30000 msを超えた」ことを意味する.(inactiveTimoutBeforeGC構成で指定されています)」のキューが削除されます.少しめまいがします.各構成項目の具体的な説明は以下の通りです.
構成アイテムの説明
次の3つの構成項目のうち、s c h e d u l e P e r iodForDestinationPurgeとgcInactiveDestinationsは必須構成です.inactiveTimoutBeforeGCはオプション構成です.
schedulePeriodForDestinationPurge
これはBrokerの構成で、ミリ秒単位で「アイドルキューをスキャンするサイクル」を宣言します.既定値は0で、「スキャンしない」を意味します.なお、ここではスキャンタスクの起動サイクルのみを構成でき、起動遅延を構成できません.すなわち、構成が完了すると、ActiveMQサービスが起動するとすぐにスキャンされる.次に、指定した時間周期でスキャンします.
gcInactiveDestinations
これはDestinationの構成です.Brokerがアイドルキューをスキャンしたときに、このDestinationをスキャンするかどうかを宣言します(queue=「xxxx」で指定します).デフォルトはfalseです.
inactiveTimoutBeforeGC
これもDestinationの構成であり、このDestinationがどのくらいアイドルになってから削除できるかを宣言するために使用されます.単位ミリ秒、デフォルト時間60 s.この構成はgcInactiveDestinationsがtrueに設定されている場合に有効になる必要があります.
シナリオ分析
これだけ紹介されていますが、実際には、最初の言葉からこの案は私たちの問題を解決できないことがわかります.我々の問題は「メッセージが蓄積されているが、消費者がいないDestination」を処理することであり、この案は「メッセージが処理されていない、消費者がいないDestination」を削除するしかないからだ.それ以外に、これは最も簡単で信頼できる案だと思います.実際、多くのオリジナルQueueにとって、ビジネスシステムは生産者と消費者を同時にラインオフします.この案はこのような状況によく対応できる.
シナリオ2:ActiveMQプラグインのカスタマイズ
ActiveMQプラグイン(plugin)は、ドキュメントにインターセプタ(Interceptor)と呼ばれるものもあります.両者は相補的です.構成にはプラグインが必要です.実行にはブロックが必要です.ActiveMQ公式にはいくつかのプラグインが提供されています.(ログ、統計、タイムスタンプなど)は、公式の説明と開発ドキュメントを参照できます.公式の例を参照してプラグインを定義できます.
コンフィギュレーション
ActiveMQはactivemqを解析する.xmlの構成で、プラグインをロードします.そのため、構成から着手し、プラグインとブロッキングがどのように動作しているかを徐々に明らかにします.activemq.xmlの構成は簡単です.以下に示します.

    
    
        
        
    
    

上記の構成は、プラグインクラス名がnetであるプラグインを宣言する.loyintean.blog.jms.manage.PlugIn,idはlinjunPluginである.このクラスはActiveMQのclasspathパスの下に含める必要があります.私たちは自分でjarパッケージを打って、jarパッケージをActiveMQのlibパスの下に置くことができます.関連クラスパスを変更することもできます.つまりActiveMQがこのクラスにロードされることを保証する(およびその依存クラス).上記の構成では、プラグインにidを設定する必要はありません.ただし、プラグイン宣言にはidを使用する必要がある方法もあります.ここでは、開発ドキュメントを参照することは多くありません.構成中の注釈で述べたように、プラグインを宣言するために使用されるラベルや構文はspringから来ています.つまりspringの他のラベルも、ここではspringのサポートされています.ただし、@Autowiredなどの注釈のサポート方法はまだ見つかっていません.
プラグイン
spring bootを使用しているので、spring-boot-starter-activemqを1つ加えるだけで必要な依存jarパッケージを導入できます.spring bootを使用しない場合はactivemq-broker-x.y.z.jarを導入する必要があります.ActiveMQ仕様に従って、プラグインはBrokerPluginインタフェースを実装する必要があります.このインタフェースには、Broker installPlugin throws Exceptionという方法しかありません.サービスがプラグインを起動、ロードしたときに、現在起動しているborkerインスタンスを取得し、Brokerインスタンスを返します.たとえば、前述のlinjunPluginコードは次のようになります.
public class PlugIn implements BrokerPlugin {
    /**
    * @author linjun
    * @since 2017 10 30 
    * @param b
    * @return
    * @see org.apache.activemq.broker.BrokerPlugin#installPlugin(org.apache.activemq.broker.Broker)
    */
    @Override
    public Broker installPlugin(Broker b) {
        return new RemoveDestination(b);
    }
}

何かわけがわからないようで、しかし、「装飾者」の観点から理解すると楽になります.入参brokerはオリジナルのインスタンス(もちろん他のプラグインで「装飾」されたものもあります).出参は私たち自身のプラグインで「装飾」された、強化版のインスタンスです.一般的には、起動プロセスはあまり処理されません.処理ロジックは私たちの「装飾者」にあります.で、上のコードのRemoveDestinationのように.
ブロッキング
上記のように、「装飾」されたBrokerを提供する必要があります.しかしBrokerはインタフェースで50以上の方法がありますサービスの開始、リンクの作成、メッセージング、トランザクションのコミット、ロールバックなど、Brokerのサービス中のさまざまなイベントを処理します.インタフェースを直接実装するのはあまりにも醜いです.ActiveMQもこの点を考慮して、Broker Filterというアダプタを提供しています.コードは次のとおりです.
public class BrokerFilter implements Broker {
    //  “  ”     
    protected final Broker next;
    public BrokerFilter(Broker next) {
        this.next = next;
    }
    //         ,        next  。
}

このアダプタを使用すると、注目しているイベントの処理に専念できます.RemoveDestinationの場合、サービスの開始時にタイマーを登録し、必要に応じて無人の傍受キューを削除するだけです.コードは次のとおりです.
public class RemoveDestination extends BrokerFilter {
    private Timer timer;
    /**
     * @param next
     */
    public RemoveDestination(Broker next) {
        super(next);
        //        ,       activeMQ   
        this.timer = new Timer(true);
    }
    @Override
    public void start() throws Exception {
        super.start();
        // DONE linjun 2017-11-01       
        this.timer.schedule(new TimerTask() {
            @Override
            public void run() {
                RemoveDestination.this.remove();
            }
        }, 3000, 3000);
    }
    private void remove() {
        Map destinationMap = this
            .getDestinationMap();
        ConnectionContext context = BrokerSupport.getConnectionContext(this);
        destinationMap.entrySet().forEach(entry -> {
            Destination destination = entry.getValue();
            //      
            // DONE linjun 2017-11-01    queue,   topic
            if (destination.getDestinationStatistics().getConsumers()
                .getCount() == 0) {
                ActiveMQDestination activeMQDestination = entry.getKey();
                if (activeMQDestination.isQueue()) {
                    try {
                        this.removeDestination(context,
                            activeMQDestination, 1);
                    } catch (Exception e) {
                        //     ,          
                        e.printStackTrace();
                    }
                }
            }
        });
    }
}

Broker FilterというBrokerイベントに対してブロックしたり、装飾したりするクラスのほか、Destination向けのDestinationFilterもありますが、言うまでもありません.
特別説明
BrokerFilterでもDestinationFilterでも、親クラスのメソッドを書き換えるときは、superの対応メソッドを呼び出すことに注意してください.RemoveDestinationクラスがstart()メソッドを上書きするときにsuperを呼び出す.start()メソッド.この2つのクラスの各メソッドは、BrokerまたはDestinationのイベントの「処理スタック」に対応します.親メソッドを呼び出さないと、ベースのコードやキーのコードが実行されず、例外が発生する可能性があります.したがって、「ここまで実行するときは現在のイベントを中断する必要があります」が非常に確定していない場合は、superの対応するメソッドを呼び出す必要があります.
シナリオ分析
上記のコードは例用であり,さらに改善することができる.しかし、この案は需要を満たすことができる.しかし、このスキームには、Destinationを削除すると、これらのメッセージが永続化されていても、消費されていないすべてのメッセージが削除されるリスクがあります.あるビジネスシステムで長時間障害が発生し、ActiveMQに接続できない場合、ActiveMQがその間に傍受していたDestinationおよびそのメッセージを削除した場合......このリスク確率は小さいが、影響は大きすぎる.念のため、案2を放棄する.
シナリオ3:アラームスクリプトの調整
シナリオ3は運維の範疇に属する.JIRAで議論されているように、この問題の本当の「痛い点」は、列を廃棄するのではなく、緊急ではないのに夜中に警察に通報することです.そのため、運維の同僚がスクリプトを修正し、「消費者がいない」という問題の監視警報時間を調整すればいい.
小結
最後に選んだのは案3です.方案1は需要を満たすことができない;シナリオ2のリスクは大きい.案三直撃痛点、さっぱり.このことは、仕事をする前に目標をよく考え、計画してから行動することを示唆しています.