CEP(siddhi)複雑なイベントフローエンジン


詳細
Siddhiは軽量級で、簡単なオープンソースの複雑なイベントフローエンジンです.クラスSQLの言語でイベントフロータスクを記述し、拡張可能で構成可能なフロータスク実行エンジンの開発をサポートします.従来の設計では、異なるアラート・ルール・タイプをサポートするために、異なるビジネス・ロジック・コードを記述する必要がありますが、Siddhiを使用すると、異なるストリーム・タスクSiddhiqlを構成するだけで、異なるアラート・ビジネスをサポートできます.
 
Why use Siddhi:
It is fast. UBER uses it to process 20 Billion events per day (300,000 events per second). 
It is lightweight (<2MB), and embeddable in Android and RaspberryPi.
It has over 40 Siddhi Extensions
It is used by over 60 companies including many Fortune 500 companies in production. Following are some examples:
WSO2 uses Siddhi for the following purposes:
To provide stream processing capabilities in their products such as WSO2 Stream Processor.
As the edge analytics library of WSO2 IoT Server.
As the core of WSO2 API Manager's throttling. 
As the core of WSO2 products' analytics.

UBER uses Siddhi for fraud analytics.
Apache Eagle uses Siddhi as a policy engine.

Solutions based on Siddhi have been finalists at ACM DEBS Grand Challenge Stream Processing competitions in 2014, 2015, 2016, 2017.
Siddhi has been the basis of many academic research projects and has over 60 citations.
Overview
 
 1.maven依存

    org.wso2.siddhi
    siddhi-core
    4.2.0



    org.wso2.siddhi
    siddhi-query-api
    4.2.0



    org.wso2.siddhi
    siddhi-query-compiler
    4.2.0

 2、siddhiコード実行例
// Creating Siddhi Manager
SiddhiManager siddhiManager = new SiddhiManager();

String siddhiApp = "define stream cseEventStream (symbol string, price float, volume long); " +
        "" +
        "@info(name = 'query1') " +
        "from cseEventStream#window.length(0) " +
        "select symbol, price, avg(price) as ap, sum(price) as sp, count(price) as cp " +
        "group by symbol " +
        "output first every 4000 milliseconds "+
        "insert into outputStream;";

// Generating runtime
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
// Starting event processing
siddhiAppRuntime.start();
// Adding callback to retrieve output events from query
siddhiAppRuntime.addCallback("query1", new QueryCallback()
{
    @Override
    public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents)
    {
        System.out.println("============query callback============");
        EventPrinter.print(timeStamp, inEvents, removeEvents);
        /*System.out.println(timeStamp);
        System.out.println(inEvents);*/
    }
});
siddhiAppRuntime.addCallback("cseEventStream", new StreamCallback() {
    @Override
    public void receive(Event[] events) {
        System.out.println("============input stream callback============");
        EventPrinter.print(events);
    }
});


// Retrieving InputHandler to push events into Siddhi
InputHandler inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream");


int i = 1;
while (i <= 10) {
    float p = i*10;
    inputHandler.send(new Object[]{"WSO2", p, 100});
    System.out.println("\"WSO2\", " + p);
    inputHandler.send(new Object[] {"IBM", p, 100});
    System.out.println("\"IBM\", " + p);
    Thread.sleep(1000);
    i++;
}

// Shutting down the runtime
siddhiAppRuntime.shutdown();

// Shutting down Siddhi
siddhiManager.shutdown();