Akka-CQRS(5)-CQRS Writer Actorの導入とテスト
72754 ワード
前編ではWriterActorの例を示したが,主な目的はWriterActorがクラスタスライスとしてpersistentActor特性とevent-sourcingモードでCQRSの書き込み機能を実現する方法を示すことである.クラスタスライスである以上、WriterActorの導入とテストについて説明します.この中には注意すべき点があるからです.次に、WriteActor(cluster-sharding)の配備コードを示します.
注意handOffStopMessageパラメータを持つstart関数はallocationStrategyを同時に提供する必要があります.このパラメータはpassivationメッセージタイプを提供します.
クラスタ全体のスライス配置コードは次のとおりです.
以上、hostおよびportはmainのパラメータから解析されたものであり、例えば192.168.11.162:2551であり、本ノードのhostおよびportを表す.akka.cluster.rolesは、poswriterがその1つであるこのノードでサポートされているロールを表します.一方、クライストチャーディングセツ(system).withRole(「poswriter」)は、このスライスshardがposwriterロールをサポートするノードにしか配備できないことを表します.間違って実行していると、Shardingが起動できないことに気づきます.上記のプログラムは、poswriterロールをサポートするノードを表します.このノード(入力されたIPアドレス)には、poswriterロールを持つ「POSHARD」というcluster-shardingが配備されています.
もし私が複数のマシンでこのコードを実行したら、現在のマシンのIP+PORTを入力すると、このような複数のマシンに「POSHord」スライスが配備されていることを意味します.上のClusterMonitorはクラスタ状態モニタactorです.
クラスタノードの接続状態を監視できます.
では、いくつかのマシンで構成されたクラスタの各ノードに「POSShard」スライスを配備したと仮定すると、この「POSShard」スライスにPOSSMessageを送信するクライアントを設計します.
このクライアントは、同じクラスタ、すなわちクラスタのノードの1つである必要があります.そうしないと、「POSHord」スライスが配備されている他のノードと情報交換できません.しかし、remoteのhostnameとportがすでに占有されているため、「POSHord」が導入されたノードと同じ場所にはありません.そのため、クライアントを「POSHARD」が配備されていないノードに置いて、ClusterSharding(system)を使用するしかない.startProxyは、スライス仲介を開始します.
このproxyのroleはSome(「poswriter」)でなければならないことに注意してください.このようにしてこそ、他のノードの「POSShard」を呼び出すことができます.彼らの役割はすべて「poswriter」です.WriterActorと対話するにはactorが必要です.WriterActorはsender()で結果を返します.このsender()はActorRefです.
WriterActorを指揮するには、次の方法があります.
次に、サービス側のスライス配置ソースコードを示します.
resources/application.conf
POSRouter.scala
次は、このテスト項目のソースコードです.
build.sbt
resources/application.conf
resources/logback.xml
ClientDemo.scala
client/Commands.scala
client/States.scala
client/POSClient.scala
client/Responses.scala
client/DataAccess.scala
logging/Log.scala
logging/LogSupport.scala
logging/ClusterMonitor.scala
ClusterSharding(system).start(
typeName = shardName,
entityProps = writerProps,
settings = cpsSettings,
extractEntityId = getPOSId,
extractShardId = getShopId,
allocationStrategy = ClusterSharding(system).defaultShardAllocationStrategy(cpsSettings),
handOffStopMessage = PassivatePOS
)
注意handOffStopMessageパラメータを持つstart関数はallocationStrategyを同時に提供する必要があります.このパラメータはpassivationメッセージタイプを提供します.
クラスタ全体のスライス配置コードは次のとおりです.
object POSRouter extends LogSupport {
def main(args: Array[String]) {
import WriterActor._
import Commands._
val argsPat = "(.*):(.*)".r
val (host, port) = args(0) match {
case argsPat(h, p) => (h, p)
case _ => ("localhost", "2551")
}
val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=\"" + port + "\"")
.withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.hostname=\"" + host + "\""))
//roles can be deployed on this node
.withFallback(ConfigFactory.parseString("akka.cluster.roles = [poswriter]"))
.withFallback(ConfigFactory.load())
log.info(s"******* hostname = $host, port = $port *******")
val shardName = "POSShard"
case class POSMessage(id: Long, cmd: POSCommand) {
def shopId = id.toString.head.toString
def posId = id.toString
}
val getPOSId: ShardRegion.ExtractEntityId = {
case posCommand: POSMessage => (posCommand.posId, posCommand.cmd)
}
val getShopId: ShardRegion.ExtractShardId = {
case posCommand: POSMessage => posCommand.shopId
}
val system = ActorSystem("cloud-pos-server", config)
val role = "poswriter" //role of this shard
val cpsSettings = ClusterShardingSettings(system).withRole(role) //.withPassivateIdleAfter(10 minutes)
ClusterSharding(system).start(
typeName = shardName,
entityProps = writerProps,
settings = cpsSettings,
extractEntityId = getPOSId,
extractShardId = getShopId,
allocationStrategy = ClusterSharding(system).defaultShardAllocationStrategy(cpsSettings),
handOffStopMessage = PassivatePOS
)
system.actorOf(ClusterMonitor.props, "cps-cluster-monitor")
}
}
以上、hostおよびportはmainのパラメータから解析されたものであり、例えば192.168.11.162:2551であり、本ノードのhostおよびportを表す.akka.cluster.rolesは、poswriterがその1つであるこのノードでサポートされているロールを表します.一方、クライストチャーディングセツ(system).withRole(「poswriter」)は、このスライスshardがposwriterロールをサポートするノードにしか配備できないことを表します.間違って実行していると、Shardingが起動できないことに気づきます.上記のプログラムは、poswriterロールをサポートするノードを表します.このノード(入力されたIPアドレス)には、poswriterロールを持つ「POSHARD」というcluster-shardingが配備されています.
もし私が複数のマシンでこのコードを実行したら、現在のマシンのIP+PORTを入力すると、このような複数のマシンに「POSHord」スライスが配備されていることを意味します.上のClusterMonitorはクラスタ状態モニタactorです.
package sdp.cluster.monitor
import akka.actor._
import akka.cluster.ClusterEvent._
import akka.cluster._
import sdp.logging.LogSupport
object ClusterMonitor {
def props = Props(new ClusterMonitor)
}
class ClusterMonitor extends Actor with LogSupport {
val cluster = Cluster(context.system)
override def preStart(): Unit = {
cluster.subscribe(self,initialStateMode = InitialStateAsEvents
,classOf[MemberEvent],classOf[UnreachableMember]) //
super.preStart()
}
override def postStop(): Unit = {
cluster.unsubscribe(self) //
super.postStop()
}
override def receive: Receive = {
case MemberJoined(member) =>
log.info(s"Member is Joining: {${member.address}}")
case MemberUp(member) =>
log.info(s"Member is Up: {${member.address}}")
case MemberLeft(member) =>
log.info(s"Member is Leaving: {${member.address}}")
case MemberExited(member) =>
log.info(s"Member is Exiting: {${member.address}}")
case MemberRemoved(member, previousStatus) =>
log.info(
s"Member is Removed: {${member.address}} after {${previousStatus}")
case UnreachableMember(member) =>
log.info(s"Member detected as unreachable: {${member.address}}")
cluster.down(member.address) // , auto-down
case _: MemberEvent => // ignore
}
}
クラスタノードの接続状態を監視できます.
では、いくつかのマシンで構成されたクラスタの各ノードに「POSShard」スライスを配備したと仮定すると、この「POSShard」スライスにPOSSMessageを送信するクライアントを設計します.
case class POSMessage(id: Long, cmd: POSCommand) {
def shopId = id.toString.head.toString
def posId = id.toString
}
val getPOSId: ShardRegion.ExtractEntityId = {
case posCommand: POSMessage => (posCommand.posId, posCommand.cmd)
}
val getShopId: ShardRegion.ExtractShardId = {
case posCommand: POSMessage => posCommand.shopId
}
このクライアントは、同じクラスタ、すなわちクラスタのノードの1つである必要があります.そうしないと、「POSHord」スライスが配備されている他のノードと情報交換できません.しかし、remoteのhostnameとportがすでに占有されているため、「POSHord」が導入されたノードと同じ場所にはありません.そのため、クライアントを「POSHARD」が配備されていないノードに置いて、ClusterSharding(system)を使用するしかない.startProxyは、スライス仲介を開始します.
//no shard deployed on this node 2558, use proxy
val posHandler = ClusterSharding(system).startProxy(
typeName = shardName,
role = Some("poswriter"),
extractEntityId = getPOSId,
extractShardId = getShopId
)
//val posHandler = ClusterSharding(system).shardRegion(shardName)
system.actorOf(POSClient.props(posHandler), "pos-client")
このproxyのroleはSome(「poswriter」)でなければならないことに注意してください.このようにしてこそ、他のノードの「POSShard」を呼び出すことができます.彼らの役割はすべて「poswriter」です.WriterActorと対話するにはactorが必要です.WriterActorはsender()で結果を返します.このsender()はActorRefです.
object POSClient {
def props(pos: ActorRef) = Props(new POSClient(pos))
}
class POSClient(posHandler: ActorRef) extends Actor with LogSupport {
override def receive: Receive = {
case msg @ POSMessage(_,_) => posHandler ! msg
case resp: POSResponse =>
log.info(s"response from server: $resp")
}
}
WriterActorを指揮するには、次の方法があります.
val posref = system.actorOf(POSClient.props(posHandler), "pos-client")
posref ! POSMessage(1021, LogSales(SALESTYPE.plu, "", apple.code, 1, 0))
posref ! POSMessage(2021, LogSales(SALESTYPE.plu, "", pineapple.code, 2, 0))
posref ! POSMessage(3021, LogSales(SALESTYPE.plu, "", banana.code, 1, 0))
posref ! POSMessage(4021, LogSales(SALESTYPE.plu, "", grape.code, 3, 0))
posref ! POSMessage(4021,Subtotal)
次に、サービス側のスライス配置ソースコードを示します.
resources/application.conf
akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off
akka {
loglevel = INFO
actor {
provider = "cluster"
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
cluster {
seed-nodes = [
"akka.tcp://[email protected]:2551"]
log-info = off
sharding {
role = "poswriter"
passivate-idle-entity-after = 5 m
}
}
persistence {
journal.plugin = "cassandra-journal"
snapshot-store.plugin = "cassandra-snapshot-store"
}
}
cassandra-journal {
contact-points = ["192.168.11.162"]
}
cassandra-snapshot-store {
contact-points = ["192.168.11.162"]
}
POSRouter.scala
package cloud.pos.server
import akka.actor._
import akka.cluster.sharding._
import akka.cluster.sharding.ClusterSharding
import com.typesafe.config.ConfigFactory
import sdp.cluster.monitor._
import sdp.logging._
object POSRouter extends LogSupport {
def main(args: Array[String]) {
import WriterActor._
import Commands._
val argsPat = "(.*):(.*)".r
val (host, port) = args(0) match {
case argsPat(h, p) => (h, p)
case _ => ("localhost", "2551")
}
val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=\"" + port + "\"")
.withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.hostname=\"" + host + "\""))
//roles can be deployed on this node
.withFallback(ConfigFactory.parseString("akka.cluster.roles = [poswriter]"))
.withFallback(ConfigFactory.load())
log.info(s"******* hostname = $host, port = $port *******")
val shardName = "POSShard"
case class POSMessage(id: Long, cmd: POSCommand) {
def shopId = id.toString.head.toString
def posId = id.toString
}
val getPOSId: ShardRegion.ExtractEntityId = {
case posCommand: POSMessage => (posCommand.posId, posCommand.cmd)
}
val getShopId: ShardRegion.ExtractShardId = {
case posCommand: POSMessage => posCommand.shopId
}
val system = ActorSystem("cloud-pos-server", config)
val role = "poswriter" //role of this shard
val cpsSettings = ClusterShardingSettings(system).withRole(role) //.withPassivateIdleAfter(10 minutes)
ClusterSharding(system).start(
typeName = shardName,
entityProps = writerProps,
settings = cpsSettings,
extractEntityId = getPOSId,
extractShardId = getShopId,
allocationStrategy = ClusterSharding(system).defaultShardAllocationStrategy(cpsSettings),
handOffStopMessage = PassivatePOS
)
system.actorOf(ClusterMonitor.props, "cps-cluster-monitor")
}
}
次は、このテスト項目のソースコードです.
build.sbt
name := "cloud-pos-client"
version := "0.1"
scalaVersion := "2.12.8"
libraryDependencies := Seq(
"com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.19",
"com.typesafe.akka" %% "akka-persistence" % "2.5.19",
"com.typesafe.akka" %% "akka-persistence-cassandra" % "0.93",
"com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.93" % Test,
"ch.qos.logback" % "logback-classic" % "1.2.3"
)
resources/application.conf
akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off
akka {
loglevel = INFO
actor {
provider = "cluster"
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "192.168.11.162"
port = 2558
}
}
cluster {
seed-nodes = [
"akka.tcp://[email protected]:2551"]
log-info = off
}
}
resources/logback.xml
"1.0" encoding="UTF-8"?>
"STDOUT" class="ch.qos.logback.core.ConsoleAppender">
%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n
"debug">
ref ref="STDOUT" />
ClientDemo.scala
package cloud.pos.client
import akka.actor._
import akka.cluster.sharding.ClusterSharding
import sdp.cluster.monitor._
import sdp.logging._
import Commands._
import States._
import Items._
import akka.cluster.sharding._
object POSClientDemo extends LogSupport {
def main(args: Array[String]) {
val system = ActorSystem("cloud-pos-server")
val shardName = "POSShard"
val getPOSId: ShardRegion.ExtractEntityId = {
case posCommand: POSMessage => (posCommand.posId, posCommand.cmd)
}
val getShopId: ShardRegion.ExtractShardId = {
case posCommand: POSMessage => posCommand.shopId
}
//no shard deployed on this node 2558, use proxy
val posHandler = ClusterSharding(system).startProxy(
typeName = shardName,
role = Some("poswriter"),
extractEntityId = getPOSId,
extractShardId = getShopId
)
//val posHandler = ClusterSharding(system).shardRegion(shardName)
system.actorOf(ClusterMonitor.props, "cps-cluster-monitor")
val posref = system.actorOf(POSClient.props(posHandler), "pos-client")
posref ! POSMessage(1021, LogSales(SALESTYPE.plu, "", apple.code, 1, 0))
posref ! POSMessage(2021, LogSales(SALESTYPE.plu, "", pineapple.code, 2, 0))
posref ! POSMessage(3021, LogSales(SALESTYPE.plu, "", banana.code, 1, 0))
posref ! POSMessage(4021, LogSales(SALESTYPE.plu, "", grape.code, 3, 0))
posref ! POSMessage(4021,Subtotal)
scala.io.StdIn.readLine()
system.terminate()
}
}
client/Commands.scala
package cloud.pos.client
object Commands {
sealed trait POSCommand {}
case class LogOn(opr: String, passwd: String) extends POSCommand
case object LogOff extends POSCommand
case class SuperOn(su: String, passwd: String) extends POSCommand
case object SuperOff extends POSCommand
case class MemberOn(cardnum: String, passwd: String) extends POSCommand
case object MemberOff extends POSCommand //remove member status for the voucher
case object RefundOn extends POSCommand
case object RefundOff extends POSCommand
case object VoidOn extends POSCommand
case object VoidOff extends POSCommand
case object VoidAll extends POSCommand
case object Suspend extends POSCommand
case class VoucherNum(vnum: Int) extends POSCommand
case class LogSales(salesType: Int, dpt: String, code: String, qty: Int, price: Int) extends POSCommand
case object Subtotal extends POSCommand
case class Discount(code: String, percent: Int) extends POSCommand
case class OfflinePay(acct: String, num: String, amount: Int) extends POSCommand //settlement
//read only command, no event process
case class VCBalance(acct: String, num: String, passwd: String) extends POSCommand
case class VCPay(acct: String, num: String, passwd: String, amount: Int) extends POSCommand
case class AliPay(acct: String, num: String, amount: Int) extends POSCommand
case class WxPay(acct: String, num: String, amount: Int) extends POSCommand
// read only command, no update event
case class Plu(itemCode: String) extends POSCommand //read only
case class POSMessage(id: Long, cmd: POSCommand) {
def shopId = id.toString.head.toString
def posId = id.toString
}
}
client/States.scala
package cloud.pos.client
object States {
object TXNTYPE {
val sales: Int = 0
val refund: Int = 1
val void: Int = 2
val voided: Int = 3
val voidall: Int = 4
val subtotal: Int = 5
val logon: Int = 6
val supon: Int = 7 // super user on/off
val suspend: Int = 8
}
object SALESTYPE {
val plu: Int = 0
val dpt: Int = 1
val cat: Int = 2
val brd: Int = 3
val ra: Int = 4
val sub: Int = 5
val ttl: Int = 6
val dsc: Int = 7
val crd: Int = 8
}
case class TxnItem(
txndate: String = ""
,txntime: String = ""
,opr: String = ""//
,num: Int = 0 //
,seq: Int = 1 //
,txntype: Int = TXNTYPE.sales//
,salestype: Int = SALESTYPE.plu //
,qty: Int = 1 //
,price: Int = 0 // ( )
,amount: Int = 0 // ( )
,dscamt: Int = 0 // : net = amount + dscamt
,member: String = "" //
,code: String = "" // ( 、 ...)
,desc: String = "" //
,dpt: String = ""
,department: String = ""
,cat: String = ""
,category: String = ""
,brd: String = ""
,brand: String = ""
)
case class VchStatus( //
qty: Int = 1,
refund: Boolean = false,
void: Boolean = false)
case class VchStates(
opr: String = "", //
jseq: BigInt = 0, //begin journal sequence for read-side replay
num: Int = 0, //
seq: Int = 1, //
void: Boolean = false, //
refd: Boolean = false, //
due: Boolean = true, //
su: String = "",
mbr: String = ""
)
}
client/POSClient.scala
package cloud.pos.client
import akka.actor._
import sdp.logging._
import Responses._
import Commands._
object POSClient {
def props(pos: ActorRef) = Props(new POSClient(pos))
}
class POSClient(posHandler: ActorRef) extends Actor with LogSupport {
override def receive: Receive = {
case msg @ POSMessage(_,_) => posHandler ! msg
case resp: POSResponse =>
log.info(s"response from server: $resp")
}
}
client/Responses.scala
package cloud.pos.client
import States._
object Responses {
object STATUS {
val OK: Int = 0
val FAIL: Int = -1
}
case class POSResponse (sts: Int, msg: String, voucher: VchStates, txnItems: List[TxnItem])
}
client/DataAccess.scala
package cloud.pos.client
import java.time.LocalDate
import java.time.format.DateTimeFormatter
case class Item(
brd: String
,dpt: String
,cat: String
,code: String
,name: String
,price: Int
)
object Items {
val apple = Item("01","02","01","001", "green apple", 820)
val grape = Item("01","02","01","002", "red grape", 1050)
val orage = Item("01","02","01","003", "sunkist orage", 350)
val banana = Item("01","02","01","004", "demon banana", 300)
val pineapple = Item("01","02","01","005", "hainan pineapple", 1300)
val peach = Item("01","02","01","006", "xinjiang peach", 2390)
val tblItems = List(apple, grape, orage, banana, pineapple, peach)
sealed trait QueryItemsResult {}
case class QueryItemsOK(items: List[Item]) extends QueryItemsResult
case class QueryItemsFail(msg: String) extends QueryItemsResult
}
object Codes {
case class User(code: String, name: String, passwd: String)
case class Department(code: String, name: String)
case class Category(code: String, name: String)
case class Brand(code: String, name: String)
case class Ra(code: String, name: String)
case class Account(code: String, name: String)
case class Disc(code: String, best: Boolean, aggr: Boolean, group: Boolean)
val ras = List(Ra("01","Delivery"),Ra("02","Cooking"))
val dpts = List(Department("01","Fruit"),Department("02","Grocery"))
val cats = List(Category("0101","Fresh Fruit"),Category("0201","Dry Grocery"))
val brds = List(Brand("01","Sunkist"),Brand("02","Demon"))
val accts = List(Account("001","Cash"),Account("002","Value Card"), Account("003", "Visa")
,Account("004","Alipay"),Account("005","WXPay"))
val users = List(User("1001","Tiger", "123"),User("1002","John", "123"),User("1003","Maria", "123"))
def getDpt(code: String) = dpts.find(d => d.code == code)
def getCat(code: String) = cats.find(d => d.code == code)
def getBrd(code: String) = brds.find(b => b.code == code)
def getAcct(code: String) = accts.find(a => a.code == code)
def getRa(code: String) = ras.find(a => a.code == code)
}
object DAO {
import Items._
import Codes._
def getItem(code: String): QueryItemsResult = {
val optItem = tblItems.find(it => it.code == code)
optItem match {
case Some(item) => QueryItemsOK(List(item))
case None => QueryItemsFail("Invalid item code!")
}
}
def validateDpt(code: String) = dpts.find(d => d.code == code)
def validateCat(code: String) = cats.find(d => d.code == code)
def validateBrd(code: String) = brds.find(b => b.code == code)
def validateRa(code: String) = ras.find(ac => ac.code == code)
def validateAcct(code: String) = accts.find(ac => ac.code == code)
def validateUser(userid: String, passwd: String) = users.find(u => (u.code == userid && u.passwd == passwd))
def lastSecOfDateStr(ldate: LocalDate): String = {
ldate.format(DateTimeFormatter.ofPattern( "yyyy-MM-dd"))+" 23:59:59"
}
}
logging/Log.scala
package sdp.logging
import org.slf4j.Logger
/**
* Logger which just wraps org.slf4j.Logger internally.
*
* @param logger logger
*/
class Log(logger: Logger) {
// use var consciously to enable squeezing later
var isDebugEnabled: Boolean = logger.isDebugEnabled
var isInfoEnabled: Boolean = logger.isInfoEnabled
var isWarnEnabled: Boolean = logger.isWarnEnabled
var isErrorEnabled: Boolean = logger.isErrorEnabled
def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = {
level match {
case 'debug | 'DEBUG => debug(msg)
case 'info | 'INFO => info(msg)
case 'warn | 'WARN => warn(msg)
case 'error | 'ERROR => error(msg)
case _ => // nothing to do
}
}
def debug(msg: => String): Unit = {
if (isDebugEnabled && logger.isDebugEnabled) {
logger.debug(msg)
}
}
def debug(msg: => String, e: Throwable): Unit = {
if (isDebugEnabled && logger.isDebugEnabled) {
logger.debug(msg, e)
}
}
def info(msg: => String): Unit = {
if (isInfoEnabled && logger.isInfoEnabled) {
logger.info(msg)
}
}
def info(msg: => String, e: Throwable): Unit = {
if (isInfoEnabled && logger.isInfoEnabled) {
logger.info(msg, e)
}
}
def warn(msg: => String): Unit = {
if (isWarnEnabled && logger.isWarnEnabled) {
logger.warn(msg)
}
}
def warn(msg: => String, e: Throwable): Unit = {
if (isWarnEnabled && logger.isWarnEnabled) {
logger.warn(msg, e)
}
}
def error(msg: => String): Unit = {
if (isErrorEnabled && logger.isErrorEnabled) {
logger.error(msg)
}
}
def error(msg: => String, e: Throwable): Unit = {
if (isErrorEnabled && logger.isErrorEnabled) {
logger.error(msg, e)
}
}
}
logging/LogSupport.scala
package sdp.logging
import org.slf4j.LoggerFactory
trait LogSupport {
/**
* Logger
*/
protected val log = new Log(LoggerFactory.getLogger(this.getClass))
}
logging/ClusterMonitor.scala
package sdp.cluster.monitor
import akka.actor._
import akka.cluster.ClusterEvent._
import akka.cluster._
import sdp.logging.LogSupport
object ClusterMonitor {
def props = Props(new ClusterMonitor())
}
class ClusterMonitor extends Actor with LogSupport {
val cluster = Cluster(context.system)
override def preStart(): Unit = {
cluster.subscribe(self,initialStateMode = InitialStateAsEvents
,classOf[MemberEvent],classOf[UnreachableMember]) //
super.preStart()
}
override def postStop(): Unit = {
cluster.unsubscribe(self) //
super.postStop()
}
override def receive: Receive = {
case MemberJoined(member) =>
log.info(s"Member is Joining: {${member.address}}")
case MemberUp(member) =>
log.info(s"Member is Up: {${member.address}}")
case MemberLeft(member) =>
log.info(s"Member is Leaving: {${member.address}}")
case MemberExited(member) =>
log.info(s"Member is Exiting: {${member.address}}")
case MemberRemoved(member, previousStatus) =>
log.info(
s"Member is Removed: {${member.address}} after {${previousStatus}")
case UnreachableMember(member) =>
log.info(s"Member detected as unreachable: {${member.address}}")
cluster.down(member.address) // , auto-down
case _: MemberEvent => // ignore
}
}