CamelにおけるSplitterとAgregatorの使用


最近会社はcamelで文字列を切り分けて集まって処理が完了したかどうかを判断します。容器はServicemixを使用します。
blue print.xml
<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" 
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
           xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd" xmlns:ns2="null" xmlns:ns3="null" xmlns:ns4="null" xmlns:ns5="null" xmlns:ns6="null" xmlns:ns7="null">
    
    <bean id="Aggregated" class="com.bosame.TicketCenter.Application.OrderProcedure.Aggregated">
    </bean>

    <bean id="aggregatorStrategy" class="com.bosame.TicketCenter.Application.OrderProcedure.StringAggregationStrategy"/>

    <bean id="OrderSplitBean" class="com.bosame.TicketCenter.Application.OrderProcedure.OrderSplit"></bean>

    <camelContext id="OrderCamel"   xmlns="http://camel.apache.org/schema/blueprint" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                    xsi:schemaLocation="http://camel.apache.org/schema/blueprint http://camel.apache.org/schema/blueprint/camel-blueprint.xsd">
        <route>
            <from uri="vm:ReceiveBatchOrder"/>
            <setHeader headerName="gid">
                <simple>${date:now:HH_mm_ss}</simple>
            </setHeader>
            <split>
                <method bean="OrderSplitBean" method="splitOrders"/>
                <setHeader headerName="gid">
                    <simple>${in.header.gid}</simple>
                </setHeader>
                <to uri="vm:ReceiveOrder"/>
            </split>
        </route>
        <route>
            <from uri="vm:ReceiveOrder"/>
            <to uri="bean:OrderVerifierBean" />
            <aggregate strategyRef="aggregatorStrategy" eagerCheckCompletion="true" completionTimeout="3000">
                <correlationExpression>
                    <header>gid</header>
                </correlationExpression>
                <completionSize>
                    <header>CamelSplitSize</header>
                </completionSize>
                <to uri="bean:Aggregated"/>
            </aggregate>
        </route>
    </camelContext>
</blueprint>

 
ここで分割する場合は、beanオブジェクトを使って切り分けします。正規表現や文字列を使ってもいいです。
これを「<tokenize token=」@’に変えて、切り分ける時は「@」で切るということです。
加入$        この行のコードは、集合の際に使いやすいようにしています。camelが集まるときは、頭と同じように集まるためです。
<corelationExpression>                   
gid
これは集約条件を表しています。頭と同じもので重合します。
<copletionSize>                   
CamelSplitySize
これは集約完了条件です。切り分けの大きさで完成条件です。Camel SplitySizeというパラメータはcamelの2.9以降にcamelのヘッドheaderに追加されました。 中にあります。詳細はリンクhttp://camel.apache.org/splitter.htmlを参照してください。
compleetion Timeout=「3000」というパラメータは、タイムアウト時間が過ぎても、3秒が終わっていない時にタイムアウトします。strate gyRef=「aggregegatostrategy」これは重合の策略です。自分でAgregationStrategyを継承するクラスを作成します。
中にはどのように集約するかと書いてありますが、ここでは簡単に2つの値を加算します。
<to uri=「bean:Agregated」/>これは最終的に重合が完了しました。ジャンプするbeanです。自分で勝手に書いてもいいです。
Order Split.java
package com.TicketCenter.Application.OrderProcedure;

import java.util.List;

/**
 *     
 * @author lym
 */
public class OrderSplit {
    public List<String> splitOrders(List<String> orderStrList) {
        return orderStrList;
    }
}
StringAgregationStrategy.java
package com.TicketCenter.Application.OrderProcedure;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

/**
 *
 * @author Administrator
 */
public class StringAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            return newExchange;
        }
        oldExchange.getIn().setBody(newExchange.getIn().getBody(Order.class));
        return oldExchange;
    }
}