Igniteメッセージとイベント

15089 ワード

1.Topic Based Messaging(トピックベースのメッセージ)
                

1.1概要
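Ignite distributed messaging allows topic-based, cluster-wide communication between all nodes. Messages with a specified topic can be distributed to all nodes, or to a sub-group of nodes, that have subscribed to that topic.
Ignite messaging is based on the publish-subscribe paradigm, in which publishers and subscribers are connected by a common topic. When a node sends a message A for topic T, it is published on all nodes that have subscribed to T.
PS: Any new node joining the cluster is automatically subscribed to all the topics that the other nodes in the cluster (or cluster group) are subscribed to.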

1.2 IgniteMessaging
Distributed messaging functionality is provided via the IgniteMessaging interface. You can get an instance of IgniteMessaging like so:
    Ignite ignite = Ignition.ignite();

    // Cluster-wide messaging facade.
    IgniteMessaging msg = ignite.message();

    // Messaging facade restricted to a cluster group -- here, only the remote nodes.
    ClusterGroup remoteNodes = ignite.cluster().forRemotes();
    IgniteMessaging rmtMsg = ignite.message(remoteNodes);

1.3 Publish Messages(発表メッセージ)
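Send methods send/publish messages with a specified topic to all nodes. Messages can be sent in an ordered or an unordered manner.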

1.3.1 Ordered Messages
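If you want messages to be received in the order they were sent, use the sendOrdered(…) method. A timeout parameter specifies how long a message stays in the queue waiting for messages that were supposed to be sent before it. If the timeout expires, all messages for that topic that have not yet arrived on that node are ignored.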

1.3.2 Unordered Messages
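send(…) methods do not guarantee message ordering. That is, when you send message A and then message B, the target node is not guaranteed to receive A first and then B.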

1.4 Subscribe for Messages
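listen methods listen/subscribe for messages. When these methods are called, a listener for the specified topic is registered on all (or a sub-group of) nodes, which then listen for new messages. A predicate is passed to the listen methods; it returns a boolean value that tells the listener whether to continue or stop listening for new messages.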

1.4.1 Local Listen
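localListen(…) registers a message listener for the specified topic on the local node only, and listens for messages from any node in this cluster group.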

1.4.2 Remote Listen
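remoteListen(…) registers message listeners for the specified topic on all nodes in this cluster group, and listens for messages from any node in this cluster group.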

1.5サンプル
// Ordered messages (Java 8 lambda).
Ignite ignite = Ignition.ignite();

// Messaging instance over the remote nodes only.
IgniteMessaging rmtMsg = ignite.message(ignite.cluster().forRemotes());

// Add listener for ordered messages on all remote nodes.
rmtMsg.remoteListen("MyOrderedTopic", (nodeId, msg) -> {
    System.out.println("Received ordered message [msg=" + msg + ", from=" + nodeId + ']');

    return true; // Return true to continue listening.
});

// Send ordered messages to remote nodes. sendOrdered(...) takes a third
// argument, the message timeout in milliseconds; 0 selects the default.
for (int i = 0; i < 10; i++) {
    rmtMsg.sendOrdered("MyOrderedTopic", Integer.toString(i), 0);
}
// Unordered messages (Java 8 lambda).
Ignite ignite = Ignition.ignite();

// Messaging facade limited to the remote nodes of the cluster.
IgniteMessaging rmtMsg = ignite.message(ignite.cluster().forRemotes());

// Register a listener for the unordered topic on every remote node.
rmtMsg.remoteListen("MyUnOrderedTopic", (senderId, payload) -> {
    System.out.println("Received unordered message [msg=" + payload + ", from=" + senderId + ']');

    // Returning true keeps the listener subscribed.
    return true;
});

// Publish ten messages with send(...), the unordered variant (section 1.3.2).
for (int n = 0; n < 10; n++) {
    rmtMsg.send("MyUnOrderedTopic", Integer.toString(n));
}
// Ordered messages (Java 7 anonymous class).
Ignite ignite = Ignition.ignite();

// Get cluster group of remote nodes.
ClusterGroup rmtPrj = ignite.cluster().forRemotes();

// Get messaging instance over remote nodes.
IgniteMessaging msg = ignite.message(rmtPrj);

// Add message listener for specified topic on all remote nodes.
// The type arguments are required: against the raw IgniteBiPredicate,
// apply(UUID, String) overrides nothing and @Override is rejected.
msg.remoteListen("myOrderedTopic", new IgniteBiPredicate<UUID, String>() {
    @Override public boolean apply(UUID nodeId, String payload) {
        System.out.println("Received ordered message [msg=" + payload + ", from=" + nodeId + ']');

        return true; // Return true to continue listening.
    }
});

// Send ordered messages to all remote nodes (timeout 0 = use the default).
for (int i = 0; i < 10; i++) {
    msg.sendOrdered("myOrderedTopic", Integer.toString(i), 0);
}

2.Local and Remote Events(ローカルおよびリモートイベント)

2.1概要
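Ignite distributed events functionality allows applications to receive notifications when a variety of events occur in the distributed grid environment. You can automatically be notified of task executions and of read, write, or query operations occurring on local or remote nodes within the cluster.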

2.2 IgniteEvents
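Distributed events functionality is provided via the IgniteEvents interface. You can get an instance of IgniteEvents from Ignite as follows: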
Ignite ignite = ignition.ignite()
IgniteEvents events = ignite.events();

2.3 Subscribe for Events
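Listen methods can be used to receive notifications for specified events happening in the cluster. These methods register a listener on local or remote nodes for the specified events. Whenever such an event occurs on a node, the listener is notified.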

2.3.1 Local Events
localListen(...) registers event listeners for the specified events on the local node only.

2.3.2 Remote Events
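remoteListen(…) registers event listeners for the specified events on all nodes within the cluster or cluster group. The following is an example of each method: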
// Local listen (Java 8 lambda).
Ignite ignite = Ignition.ignite();

// Local listener that listens to local events. The CacheEvent type argument
// is required; with a raw IgnitePredicate, evt would be a plain Object and
// name()/key()/oldValue()/newValue() would not resolve.
IgnitePredicate<CacheEvent> locLsnr = evt -> {
  System.out.println("Received event [evt=" + evt.name() + ", key=" + evt.key() +
    ", oldVal=" + evt.oldValue() + ", newVal=" + evt.newValue() + ']');

  return true; // Continue listening.
};

// Subscribe to specified cache events occurring on local node.
ignite.events().localListen(locLsnr,
  EventType.EVT_CACHE_OBJECT_PUT,
  EventType.EVT_CACHE_OBJECT_READ,
  EventType.EVT_CACHE_OBJECT_REMOVED);

// Get an instance of named cache.
final IgniteCache<Integer, String> cache = ignite.cache("cacheName");

// Generate cache events.
for (int i = 0; i < 20; i++)
  cache.put(i, Integer.toString(i));
// Remote listen (Java 8 lambda).
Ignite ignite = Ignition.ignite();

// Get an instance of named cache.
final IgniteCache<Integer, String> cache = ignite.cache("cacheName");

// Sample remote filter which only accepts events for keys
// that are greater than or equal to 10. The <Integer> type witness is
// needed: key() is generic and a comparison gives it no target type.
IgnitePredicate<CacheEvent> rmtLsnr = evt -> evt.<Integer>key() >= 10;

// Subscribe to specified cache events on all nodes that have cache running.
// null is passed in the local-listener position; only the remote filter is supplied.
ignite.events(ignite.cluster().forCacheNodes("cacheName")).remoteListen(null, rmtLsnr,
  EventType.EVT_CACHE_OBJECT_PUT,
  EventType.EVT_CACHE_OBJECT_READ,
  EventType.EVT_CACHE_OBJECT_REMOVED);

// Generate cache events.
for (int i = 0; i < 20; i++)
  cache.put(i, Integer.toString(i));
// Remote listen (Java 7 anonymous class).
Ignite ignite = Ignition.ignite();

// Get an instance of named cache.
final IgniteCache<Integer, String> cache = ignite.cache("cacheName");

// Sample remote filter which only accepts events for keys
// that are greater than or equal to 10. The CacheEvent type argument is
// required: against the raw IgnitePredicate, apply(CacheEvent) overrides
// nothing and @Override is rejected.
IgnitePredicate<CacheEvent> rmtLsnr = new IgnitePredicate<CacheEvent>() {
    @Override public boolean apply(CacheEvent evt) {
        System.out.println("Cache event: " + evt);

        int key = evt.key();

        return key >= 10;
    }
};

// Subscribe to specified cache events occurring on
// all nodes that have the specified cache running.
ignite.events(ignite.cluster().forCacheNodes("cacheName")).remoteListen(null, rmtLsnr,
    EventType.EVT_CACHE_OBJECT_PUT,
    EventType.EVT_CACHE_OBJECT_READ,
    EventType.EVT_CACHE_OBJECT_REMOVED);

// Generate cache events.
for (int i = 0; i < 20; i++)
    cache.put(i, Integer.toString(i));
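In the examples above, EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_READ, and EVT_CACHE_OBJECT_REMOVED are pre-defined event type constants declared in the EventType interface.
PS: The EventType interface defines the various event type constants that can be used with the listen methods. For the complete list of event types, refer to the javadoc.
PS: Event types passed as parameters to localListen(…) and remoteListen(…) must also be enabled in IgniteConfiguration. See the configuration example below.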

2.4 Query for Events
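All events generated in the system are kept locally on the node where they occurred. The IgniteEvents API provides methods to query for these events.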

2.4.1 Local Query
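localQuery(…) queries for events on the local node using the passed-in predicate filter. If the predicate is satisfied, a collection of the matching events that occurred on the local node is returned.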

2.4.2 Remote Query
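remoteQuery(…) asynchronously queries for events on the remote nodes of this cluster group using the passed-in predicate filter. The operation is distributed, so it can fail at the communication layer, and it generally takes much longer than local event notification. Note that this method does not block; it returns immediately with a future.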

2.5 Configuration
To be notified of task or cache events occurring within the cluster, the includeEventTypes property of IgniteConfiguration must be set.
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
    ...

    <!-- Enable cache events by listing them in includeEventTypes. -->
    <property name="includeEventTypes">
        <util:constant static-field="org.apache.ignite.events.EventType.EVTS_CACHE"/>
    </property>
    ...
</bean>
// Build the node configuration.
IgniteConfiguration nodeCfg = new IgniteConfiguration();

// Turn on the cache event types for this node.
nodeCfg.setIncludeEventTypes(EventType.EVTS_CACHE);

// Launch the Ignite node with that configuration.
Ignition.start(nodeCfg);
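By default, event notifications are turned off for performance reasons.
PS: Since thousands of events can be generated per second, they put an additional load on the system. This can lead to significant performance degradation. Therefore, it is highly recommended to enable only those events that your application logic requires.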

3. Automatic Batching
    ,               。
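Ignite automatically groups, or batches, the event notifications that are generated as a result of cache events occurring within the cluster.
Each activity in a cache can result in an event notification being generated and sent. On systems where cache activity is high, being notified of every single event can be network intensive, possibly decreasing the performance of cache operations in the grid.
In Ignite, event notifications can be grouped together and sent in batches or at timed intervals. Here is an example of how this can be done: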
Ignite ignite = Ignition.ignite();

// Get an instance of named cache.
final IgniteCache<Integer, String> cache = ignite.cache("cacheName");

// Sample remote filter which only accepts events for keys
// that are greater than or equal to 10. The CacheEvent type argument is
// required: against the raw IgnitePredicate, apply(CacheEvent) overrides
// nothing and @Override is rejected.
IgnitePredicate<CacheEvent> rmtLsnr = new IgnitePredicate<CacheEvent>() {
    @Override public boolean apply(CacheEvent evt) {
        System.out.println("Cache event: " + evt);

        int key = evt.key();

        return key >= 10;
    }
};

// Subscribe to cache events occurring on all nodes
// that have the specified cache running.
// Send notifications in batches of 10.
ignite.events(ignite.cluster().forCacheNodes("cacheName")).remoteListen(
        10 /*batch size*/, 0 /*time intervals*/, false, null, rmtLsnr, EventType.EVTS_CACHE);

// Generate cache events.
for (int i = 0; i < 20; i++)
    cache.put(i, Integer.toString(i));