Ignite Messaging and Events
1. Topic Based Messaging
1.1 Overview
1.2 IgniteMessaging
1.3 Publish Messages
1.3.1 Ordered Messages
1.3.2 Unordered Messages
1.4 Subscribe for Messages
1.4.1 Local Listen
1.4.2 Remote Listen
1.5 Example
2. Local and Remote Events
2.1 Overview
2.2 IgniteEvents
2.3 Subscribe for Events
2.3.1 Local Events
2.3.2 Remote Events
2.4 Query for Events
2.4.1 Local Query
2.4.2 Remote Query
2.5 Configuration
3. Automatic Batching
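1.1 Overview
Ignite distributed messaging provides topic-based, cluster-wide communication between all nodes. Messages with a specified topic can be distributed to all nodes, or to a sub-group of nodes, that have subscribed to that topic.
Ignite messaging is based on the publish-subscribe paradigm, in which publishers and subscribers are connected by a common topic. When a node sends a message A for topic T, it is published on all nodes that have subscribed to T.
PS: Any new node joining the cluster is automatically subscribed to all the topics that the other nodes in the cluster (or cluster group) are subscribed to.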
1.2 IgniteMessaging
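Distributed messaging in Ignite is provided through the IgniteMessaging interface. You can obtain an IgniteMessaging instance as follows: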
Ignite ignite = Ignition.ignite();
// Messaging instance over this cluster.
IgniteMessaging msg = ignite.message();
// Messaging instance over given cluster group (in this case, remote nodes).
IgniteMessaging rmtMsg = ignite.message(ignite.cluster().forRemotes());
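1.3 Publish Messages
The send methods send (publish) messages with a specified topic to all nodes. Messages can be sent in an ordered or an unordered manner.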
1.3.1 Ordered Messages
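To receive messages in the order in which they were sent, use the sendOrdered(…) method. A timeout parameter specifies how long a message stays in the queue waiting for messages that were supposed to be sent before it. If the timeout expires, all messages for that topic that have not yet arrived on that node are ignored.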
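The ordering guarantee can be pictured with a small plain-Java sketch. It is a conceptual model only: the class OrderedInbox, its sequence numbers, and its methods are hypothetical and do not reflect how Ignite implements sendOrdered(…) internally, and the timeout is not modelled. The receiver buffers messages that arrive early and releases them once the gap before them is filled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical receiver-side buffer that releases messages in sequence order; not Ignite code. */
final class OrderedInbox {
    /** Messages that arrived ahead of their turn, keyed by sequence number. */
    private final TreeMap<Long, String> pending = new TreeMap<>();

    /** Messages already handed to the application, in send order. */
    private final List<String> delivered = new ArrayList<>();

    /** Sequence number of the next message that may be delivered. */
    private long nextSeq = 0;

    /** Accepts a message that may arrive out of order and delivers everything that is now in sequence. */
    void receive(long seq, String msg) {
        if (seq < nextSeq)
            return; // Already delivered; ignore the duplicate.

        pending.put(seq, msg);

        while (!pending.isEmpty() && pending.firstKey() == nextSeq) {
            delivered.add(pending.pollFirstEntry().getValue());
            nextSeq++;
        }
    }

    List<String> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        OrderedInbox inbox = new OrderedInbox();

        inbox.receive(1, "B"); // Arrives early and is held back.
        inbox.receive(0, "A"); // Fills the gap, so A and B are released.
        inbox.receive(2, "C");

        System.out.println(inbox.delivered()); // prints [A, B, C]
    }
}
```

In this model a message that never arrives would block everything behind it forever, which is the situation the timeout parameter described above is meant to bound.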
1.3.2 Unordered Messages
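The send(…) methods do not guarantee message ordering. That is, when you send message A and then message B, the target node is not guaranteed to receive A first and B second.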
1.4 Subscribe for Messages
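The listen methods listen (subscribe) for messages. When one of these methods is called, a listener with the specified topic is registered on all nodes (or on a sub-group of nodes) to listen for new messages. A predicate is passed to the listen methods; it returns a boolean value that tells the listener whether to continue or to stop listening for new messages.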
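The meaning of the predicate's return value can be illustrated without a cluster. The sketch below is a plain-Java, in-memory stand-in: TopicBusSketch and its methods are hypothetical names, not part of the Ignite API. A listener stays registered while its predicate returns true and is dropped the first time it returns false.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

/** Hypothetical in-memory stand-in for a topic bus; not part of Ignite. */
final class TopicBusSketch {
    /** Listeners per topic; each predicate receives (senderId, message). */
    private final Map<String, List<BiPredicate<String, Object>>> listeners = new HashMap<>();

    /** Registers a listener for the given topic. */
    void listen(String topic, BiPredicate<String, Object> lsnr) {
        listeners.computeIfAbsent(topic, t -> new ArrayList<>()).add(lsnr);
    }

    /** Delivers a message to every listener on the topic and removes listeners that return false. */
    void send(String topic, String senderId, Object msg) {
        List<BiPredicate<String, Object>> subs = listeners.get(topic);

        if (subs != null)
            subs.removeIf(lsnr -> !lsnr.test(senderId, msg));
    }

    /** Number of listeners still registered for the topic. */
    int listenerCount(String topic) {
        List<BiPredicate<String, Object>> subs = listeners.get(topic);

        return subs == null ? 0 : subs.size();
    }

    public static void main(String[] args) {
        TopicBusSketch bus = new TopicBusSketch();
        List<Object> received = new ArrayList<>();

        // Keep listening until the message "stop" arrives.
        bus.listen("T", (from, msg) -> {
            received.add(msg);
            return !"stop".equals(msg);
        });

        bus.send("T", "node-1", "a");
        bus.send("T", "node-1", "stop");
        bus.send("T", "node-1", "b"); // No listener is left, so this message is not received.

        System.out.println(received); // prints [a, stop]
    }
}
```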
1.4.1 Local Listen
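The localListen(…) method registers a message listener with the specified topic on the local node only, and listens for messages from any node in this cluster group.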
1.4.2 Remote Listen
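The remoteListen(…) method registers message listeners with the specified topic on all nodes in this cluster group, and listens for messages from any node in this cluster group.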
1.5 Example
The following examples show message exchange between remote nodes.
// Ordered messaging
Ignite ignite = Ignition.ignite();
IgniteMessaging rmtMsg = ignite.message(ignite.cluster().forRemotes());
// Add listener for ordered messages on all remote nodes.
rmtMsg.remoteListen("MyOrderedTopic", (nodeId, msg) -> {
    System.out.println("Received ordered message [msg=" + msg + ", from=" + nodeId + ']');
    return true; // Return true to continue listening.
});
// Send ordered messages to remote nodes; the third argument is the timeout.
for (int i = 0; i < 10; i++)
    rmtMsg.sendOrdered("MyOrderedTopic", Integer.toString(i), 0);
// Unordered messaging
Ignite ignite = Ignition.ignite();
IgniteMessaging rmtMsg = ignite.message(ignite.cluster().forRemotes());
// Add listener for unordered messages on all remote nodes.
rmtMsg.remoteListen("MyUnOrderedTopic", (nodeId, msg) -> {
    System.out.println("Received unordered message [msg=" + msg + ", from=" + nodeId + ']');
    return true; // Return true to continue listening.
});
// Send unordered messages to remote nodes.
for (int i = 0; i < 10; i++)
    rmtMsg.send("MyUnOrderedTopic", Integer.toString(i));
//java7
Ignite ignite = Ignition.ignite();
// Get cluster group of remote nodes.
ClusterGroup rmtPrj = ignite.cluster().forRemotes();
// Get messaging instance over remote nodes.
IgniteMessaging msg = ignite.message(rmtPrj);
// Add message listener for specified topic on all remote nodes.
msg.remoteListen("myOrderedTopic", new IgniteBiPredicate<UUID, String>() {
    @Override public boolean apply(UUID nodeId, String msg) {
        System.out.println("Received ordered message [msg=" + msg + ", from=" + nodeId + ']');
        return true; // Return true to continue listening.
    }
});
// Send ordered messages to all remote nodes.
for (int i = 0; i < 10; i++)
    msg.sendOrdered("myOrderedTopic", Integer.toString(i), 0);
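2. Local and Remote Events
2.1 Overview
Ignite distributed events allow applications to receive notifications when a variety of events occur in the distributed grid environment. You can be notified automatically of task executions and of read, write, or query operations occurring on local or remote nodes within the cluster.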
2.2 IgniteEvents
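Distributed events functionality is provided through the IgniteEvents interface. You can obtain an IgniteEvents instance from Ignite as follows:
Ignite ignite = Ignition.ignite();
IgniteEvents events = ignite.events();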
2.3 Subscribe for Events
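The listen methods can be used to receive notifications of specified events happening in the cluster. These methods register a listener on the local or remote nodes for the specified events. Whenever such an event occurs on a node, the listener is notified.
2.3.1 Local Events
The localListen(...) method registers event listeners for the specified events on the local node only.
2.3.2 Remote Events
The remoteListen(…) method registers event listeners for the specified events on all nodes within the cluster or cluster group. The following examples show each method: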
// Local listen
Ignite ignite = Ignition.ignite();
// Local listener that listens to local events.
IgnitePredicate<CacheEvent> locLsnr = evt -> {
    System.out.println("Received event [evt=" + evt.name() + ", key=" + evt.key() +
        ", oldVal=" + evt.oldValue() + ", newVal=" + evt.newValue() + ']');
    return true; // Continue listening.
};
// Subscribe to specified cache events occurring on local node.
ignite.events().localListen(locLsnr,
    EventType.EVT_CACHE_OBJECT_PUT,
    EventType.EVT_CACHE_OBJECT_READ,
    EventType.EVT_CACHE_OBJECT_REMOVED);
// Get an instance of named cache.
final IgniteCache<Integer, String> cache = ignite.cache("cacheName");
// Generate cache events.
for (int i = 0; i < 20; i++)
    cache.put(i, Integer.toString(i));
// Remote listen
Ignite ignite = Ignition.ignite();
// Get an instance of named cache.
final IgniteCache<Integer, String> cache = ignite.cache("cacheName");
// Sample remote filter which only accepts events for keys
// that are greater than or equal to 10.
IgnitePredicate<CacheEvent> rmtLsnr = evt -> evt.<Integer>key() >= 10;
// Subscribe to specified cache events on all nodes that have cache running.
ignite.events(ignite.cluster().forCacheNodes("cacheName")).remoteListen(null, rmtLsnr,
    EventType.EVT_CACHE_OBJECT_PUT,
    EventType.EVT_CACHE_OBJECT_READ,
    EventType.EVT_CACHE_OBJECT_REMOVED);
// Generate cache events.
for (int i = 0; i < 20; i++)
    cache.put(i, Integer.toString(i));
//java7
Ignite ignite = Ignition.ignite();
// Get an instance of named cache.
final IgniteCache<Integer, String> cache = ignite.cache("cacheName");
// Sample remote filter which only accepts events for keys
// that are greater than or equal to 10.
IgnitePredicate<CacheEvent> rmtLsnr = new IgnitePredicate<CacheEvent>() {
    @Override public boolean apply(CacheEvent evt) {
        System.out.println("Cache event: " + evt);
        int key = evt.key();
        return key >= 10;
    }
};
// Subscribe to specified cache events occurring on
// all nodes that have the specified cache running.
ignite.events(ignite.cluster().forCacheNodes("cacheName")).remoteListen(null, rmtLsnr,
    EventType.EVT_CACHE_OBJECT_PUT,
    EventType.EVT_CACHE_OBJECT_READ,
    EventType.EVT_CACHE_OBJECT_REMOVED);
// Generate cache events.
for (int i = 0; i < 20; i++)
    cache.put(i, Integer.toString(i));
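In the examples above, EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_READ, and EVT_CACHE_OBJECT_REMOVED are predefined event type constants defined in the EventType interface.
PS: The EventType interface defines the event type constants that can be used with the listen methods. For the complete list of event types, refer to the javadoc.
PS: Event types passed to the localListen(…) and remoteListen(…) methods must also be configured in IgniteConfiguration. See the configuration example in section 2.5.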
2.4 Query for Events
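All events generated in the system are stored locally on the local node. The IgniteEvents API provides methods to query for these events.
2.4.1 Local Query
The localQuery(…) method queries for events on the local node using the passed-in predicate filter. If all predicates are satisfied, a collection of the events that occurred on the local node is returned.
2.4.2 Remote Query
The remoteQuery(…) method asynchronously queries for events on the remote nodes in this cluster group using the passed-in predicate filter. This operation is distributed, so it can fail at the communication layer and generally takes much longer than a local event query. Note that this method does not block; it returns immediately with a future.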
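The difference between the two query styles, a blocking local filter versus a non-blocking call that returns a future, can be sketched in plain Java. Everything in the block is a hypothetical model: EventQuerySketch, its method signatures, and the use of strings as events are assumptions made for illustration and are not the IgniteEvents API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Hypothetical model of local and remote event queries; plain Java, not the Ignite API. */
final class EventQuerySketch {
    /** Synchronous query: filters the events stored on one node. */
    static List<String> localQuery(List<String> store, Predicate<String> filter) {
        return store.stream().filter(filter).collect(Collectors.toList());
    }

    /** Asynchronous query: starts one filter task per node and returns a future immediately. */
    static CompletableFuture<List<String>> remoteQuery(List<List<String>> nodeStores, Predicate<String> filter) {
        List<CompletableFuture<List<String>>> parts = new ArrayList<>();

        for (List<String> store : nodeStores)
            parts.add(CompletableFuture.supplyAsync(() -> localQuery(store, filter)));

        // Complete once every node has answered, then merge the results in node order.
        return CompletableFuture.allOf(parts.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> parts.stream()
                .flatMap(part -> part.join().stream())
                .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<String> node1 = Arrays.asList("PUT:1", "READ:1");
        List<String> node2 = Arrays.asList("PUT:2", "REMOVED:2");

        // The call returns at once; the caller decides when to wait for the result.
        CompletableFuture<List<String>> fut = remoteQuery(Arrays.asList(node1, node2), e -> e.startsWith("PUT"));

        System.out.println(fut.join()); // prints [PUT:1, PUT:2]
    }
}
```

Returning a future lets the caller keep working while the slower, distributed part of the query is still in flight, and a failure on any node surfaces when the future is joined.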
2.5 Configuration
To be notified of task or cache events occurring within the cluster, the includeEventTypes property of IgniteConfiguration must be set.
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
    ...
    <property name="includeEventTypes">
        <util:constant static-field="org.apache.ignite.events.EventType.EVTS_CACHE"/>
    </property>
    ...
</bean>
IgniteConfiguration cfg = new IgniteConfiguration();
// Enable cache events.
cfg.setIncludeEventTypes(EventType.EVTS_CACHE);
// Start Ignite node.
Ignition.start(cfg);
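By default, event notifications are turned off for performance reasons.
PS: Because thousands of events can be generated per second, they place an additional load on the system. This can lead to significant performance degradation. It is therefore highly recommended to enable only the events that your application logic requires.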
3. Automatic Batching
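Ignite automatically groups, or batches, the event notifications generated by cache events occurring within the cluster.
Each cache activity can cause an event notification to be generated and sent. On systems with high cache activity, being notified of every single event can be network intensive and may reduce the performance of cache operations in the grid.
In Ignite, event notifications can be grouped together and sent in batches or at timed intervals. Here is an example of how this can be done: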
Ignite ignite = Ignition.ignite();
// Get an instance of named cache.
final IgniteCache<Integer, String> cache = ignite.cache("cacheName");
// Sample remote filter which only accepts events for keys
// that are greater than or equal to 10.
IgnitePredicate<CacheEvent> rmtLsnr = new IgnitePredicate<CacheEvent>() {
    @Override public boolean apply(CacheEvent evt) {
        System.out.println("Cache event: " + evt);
        int key = evt.key();
        return key >= 10;
    }
};
// Subscribe to cache events occurring on all nodes
// that have the specified cache running.
// Send notifications in batches of 10.
ignite.events(ignite.cluster().forCacheNodes("cacheName")).remoteListen(
    10 /* batch size */, 0 /* time interval */, false /* auto-unsubscribe */, null, rmtLsnr, EventType.EVTS_CACHE);
// Generate cache events.
for (int i = 0; i < 20; i++)
    cache.put(i, Integer.toString(i));
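The effect of the batch size can be seen in a minimal plain-Java sketch of size-based batching. The class EventBatcher is hypothetical and only illustrates the idea; it is not how Ignite buffers events internally. The explicit flush() call stands in for the time-interval trigger, which this sketch does not implement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical size-based batcher; illustrates the idea only, not Ignite's implementation. */
final class EventBatcher<E> {
    private final int batchSize;
    private final Consumer<List<E>> sink;
    private List<E> buf = new ArrayList<>();

    EventBatcher(int batchSize, Consumer<List<E>> sink) {
        if (batchSize < 1)
            throw new IllegalArgumentException("batchSize must be >= 1");

        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Buffers an event and sends the whole buffer once it reaches the batch size. */
    void add(E evt) {
        buf.add(evt);

        if (buf.size() >= batchSize)
            flush();
    }

    /** Sends whatever is buffered, for example when a time interval elapses. */
    void flush() {
        if (buf.isEmpty())
            return;

        List<E> batch = buf;
        buf = new ArrayList<>();
        sink.accept(batch);
    }

    public static void main(String[] args) {
        List<Integer> sentSizes = new ArrayList<>();
        EventBatcher<Integer> batcher = new EventBatcher<>(10, batch -> sentSizes.add(batch.size()));

        for (int i = 0; i < 25; i++)
            batcher.add(i);

        batcher.flush(); // Send the 5 events still waiting in the buffer.

        System.out.println(sentSizes); // prints [10, 10, 5]
    }
}
```

With a batch size of 10, the 25 events above travel as three notifications instead of 25, which is the network saving that batching is meant to provide.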