Akka-CQRS(5)-CQRS Writer Actorの導入とテスト

72754 ワード

前回はWriterActorの例を示しました。その主な目的は、WriterActorがクラスタシャーディング(cluster-sharding)のエンティティとして、PersistentActorの特性とevent-sourcingモードによってCQRSの書き込み側の機能を実現する方法を示すことでした。クラスタシャーディングである以上、いくつか注意すべき点があるため、今回はWriterActorのデプロイとテストについて説明します。まず、WriterActor(cluster-sharding)のデプロイコードを示します。
    // Start the "writer" shard region on this node.
    // NOTE(review): system, shardName, writerProps, cpsSettings, getPOSId, getShopId and
    // PassivatePOS are not defined in this excerpt; they come from the complete listing.
    ClusterSharding(system).start(
      typeName = shardName,
      entityProps = writerProps,
      settings = cpsSettings,
      extractEntityId = getPOSId,
      extractShardId = getShopId,
      // this overload takes handOffStopMessage, so an allocation strategy is passed explicitly
      allocationStrategy = ClusterSharding(system).defaultShardAllocationStrategy(cpsSettings),
      handOffStopMessage = PassivatePOS
    )

注意:handOffStopMessageパラメータを取るstart関数では、allocationStrategyも同時に指定する必要があります。handOffStopMessageにはpassivation用のメッセージを指定します。
クラスタシャーディング全体のデプロイコードは次のとおりです。
/**
  * Entry point of a cluster node that hosts the "POSShard" shard region.
  *
  * The first command-line argument is expected in the form `host:port`
  * (e.g. `192.168.11.162:2551`). When it is missing or does not match that form,
  * the node falls back to `localhost:2551`.
  */
object POSRouter extends LogSupport {
  def main(args: Array[String]): Unit = {
    import WriterActor._
    import Commands._

    val argsPat = "(.*):(.*)".r
    // headOption instead of args(0): starting without arguments must reach the default
    // branch rather than fail with ArrayIndexOutOfBoundsException.
    val (host, port) = args.headOption match {
      case Some(argsPat(h, p)) => (h, p)
      case _ => ("localhost", "2551")
    }

    // Settings parsed here take precedence over application.conf (the last fallback).
    val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=\"" + port + "\"")
      .withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.hostname=\"" + host + "\""))
      //roles can be deployed on this node
      .withFallback(ConfigFactory.parseString("akka.cluster.roles = [poswriter]"))
      .withFallback(ConfigFactory.load())

    log.info(s"******* hostname = $host,  port = $port *******")

    val shardName = "POSShard"

    // Routing envelope: `id` identifies the target entity, `cmd` is the payload delivered to it.
    // NOTE(review): this class is local to main; confirm it is the same class the sending
    // side uses, otherwise the extractors below will not match incoming messages.
    case class POSMessage(id: Long, cmd: POSCommand) {
      // shard id = first character of the decimal id
      def shopId = id.toString.head.toString

      // entity id = the decimal id itself
      def posId = id.toString
    }

    // Unwraps the envelope: the entity receives only the inner command.
    val getPOSId: ShardRegion.ExtractEntityId = {
      case posCommand: POSMessage => (posCommand.posId, posCommand.cmd)
    }
    val getShopId: ShardRegion.ExtractShardId = {
      case posCommand: POSMessage => posCommand.shopId
    }

    val system = ActorSystem("cloud-pos-server", config)
    val role = "poswriter"    //role of this shard
    val cpsSettings = ClusterShardingSettings(system).withRole(role) //.withPassivateIdleAfter(10 minutes)

    ClusterSharding(system).start(
      typeName = shardName,
      entityProps = writerProps,
      settings = cpsSettings,
      extractEntityId = getPOSId,
      extractShardId = getShopId,
      allocationStrategy = ClusterSharding(system).defaultShardAllocationStrategy(cpsSettings),
      handOffStopMessage = PassivatePOS
    )

    system.actorOf(ClusterMonitor.props, "cps-cluster-monitor")

  }
}

上記のhostとportはmainの引数(例:192.168.11.162:2551)から解析したもので、このノードのhostとportを表します。akka.cluster.rolesはこのノードがサポートするロールを表し、poswriterはその1つです。一方、ClusterShardingSettings(system).withRole("poswriter")は、このシャードがposwriterロールをサポートするノードにしかデプロイできないことを表します。ここを間違えると、実行時にShardingが起動できないことに気づくはずです。つまり上記のプログラムは、poswriterロールをサポートするノード(引数で渡したIPアドレス)に、poswriterロールを持つ「POSShard」というcluster-shardingをデプロイすることを表します。
このコードを複数のマシンで、それぞれのマシンのIPとPORTを引数に指定して実行すれば、それら複数のマシンに「POSShard」シャードがデプロイされることになります。上のClusterMonitorはクラスタ状態を監視するactorです。
package sdp.cluster.monitor

import akka.actor._
import akka.cluster.ClusterEvent._
import akka.cluster._
import sdp.logging.LogSupport

object ClusterMonitor {
  /** Props used to create a [[ClusterMonitor]] actor. */
  def props = Props(new ClusterMonitor)
}

/**
  * Actor that logs cluster membership changes and downs members reported as unreachable.
  */
class ClusterMonitor extends Actor with LogSupport {
  val cluster = Cluster(context.system)

  override def preStart(): Unit = {
    // Subscribe to membership and unreachability events; the initial state is requested as events.
    cluster.subscribe(self,initialStateMode = InitialStateAsEvents
      ,classOf[MemberEvent],classOf[UnreachableMember])
    super.preStart()
  }

  override def postStop(): Unit = {
    // Drop the subscription when this actor stops.
    cluster.unsubscribe(self)
    super.postStop()
  }

  override def receive: Receive = {
    case MemberJoined(member) =>
      log.info(s"Member is Joining: {${member.address}}")
    case MemberUp(member) =>
      log.info(s"Member is Up: {${member.address}}")
    case MemberLeft(member) =>
      log.info(s"Member is Leaving: {${member.address}}")
    case MemberExited(member) =>
      log.info(s"Member is Exiting: {${member.address}}")
    case MemberRemoved(member, previousStatus) =>
      // closing brace added: the message previously ended with an unbalanced "{"
      log.info(
        s"Member is Removed: {${member.address}} after {${previousStatus}}")
    case UnreachableMember(member) =>
      log.info(s"Member detected as unreachable: {${member.address}}")
      // Manual auto-down: the member is downed as soon as it is reported unreachable.
      // NOTE(review): immediate downing can split the cluster during a network partition — confirm this is intended.
      cluster.down(member.address)
    case _: MemberEvent => // ignore
  }

}

これでクラスタ各ノードの接続状態を監視できます。
では、複数のマシンで構成されたクラスタの各ノードに「POSShard」シャードをデプロイ済みだと仮定して、この「POSShard」シャードにPOSMessageを送信するクライアントを設計します。
    // Routing envelope: wraps a command together with the numeric id used for sharding.
    case class POSMessage(id: Long, cmd: POSCommand) {
      // entity id: the decimal text of the id
      def posId = id.toString

      // shard id: the first character of that decimal text
      def shopId = posId.take(1)
    }

    // Entity extractor: routes by posId and hands only the inner command to the entity.
    val getPOSId: ShardRegion.ExtractEntityId = {
      case envelope: POSMessage => (envelope.posId, envelope.cmd)
    }

    // Shard extractor: routes by shopId.
    val getShopId: ShardRegion.ExtractShardId = {
      case envelope: POSMessage => envelope.shopId
    }

このクライアントは同じクラスタに属している、すなわちクラスタのノードの1つである必要があります。そうでないと、「POSShard」シャードがデプロイされている他のノードとメッセージをやり取りできません。しかし、remoteのhostnameとportはすでに使用されているため、「POSShard」がデプロイされたノードと同じアドレスでは起動できません。そのため、クライアントは「POSShard」がデプロイされていないノードに置き、ClusterSharding(system).startProxyでシャードのプロキシを起動して利用するしかありません。
   // No shard is deployed on this node (2558), so the shard region is reached through a proxy.
   // NOTE(review): system, shardName, getPOSId and getShopId come from the surrounding listing.
    val posHandler = ClusterSharding(system).startProxy(
      typeName = shardName,
      role = Some("poswriter"),
      extractEntityId = getPOSId,
      extractShardId = getShopId
    )

    // Alternative kept from the original, disabled: look up a region started on this node.
    //val posHandler = ClusterSharding(system).shardRegion(shardName)

    // Client actor that sends through the proxy.
    system.actorOf(POSClient.props(posHandler), "pos-client")

このproxyのroleはSome("poswriter")でなければならないことに注意してください。そうして初めて、他のノード上の「POSShard」を呼び出すことができます(それらのロールはすべて「poswriter」です)。また、WriterActorとのやり取りにはactorが必要です。WriterActorは結果をsender()に返しますが、このsender()はActorRefだからです。
object POSClient {
  /** Props for a client that sends its commands to `pos`. */
  def props(pos: ActorRef): Props = Props(new POSClient(pos))
}

/**
  * Client-side actor: passes POSMessage envelopes on to `posHandler` (with itself as the
  * sender) and logs every POSResponse it receives.
  */
class POSClient(posHandler: ActorRef)  extends Actor with LogSupport {

  // Outgoing direction: relay the envelope unchanged.
  private def relayCommand: Receive = {
    case envelope: POSMessage => posHandler ! envelope
  }

  // Incoming direction: just log the reply.
  private def logReply: Receive = {
    case resp: POSResponse => log.info(s"response from server: $resp")
  }

  override def receive: Receive = relayCommand orElse logReply
}

WriterActorにコマンドを送るには、次のようにします。
    // Create the client actor; it sends everything it is given to posHandler.
    val posref = system.actorOf(POSClient.props(posHandler), "pos-client")
    
    // One LogSales command for each of four different ids ...
    posref ! POSMessage(1021, LogSales(SALESTYPE.plu, "", apple.code, 1, 0))
    posref ! POSMessage(2021, LogSales(SALESTYPE.plu, "", pineapple.code, 2, 0))
    posref ! POSMessage(3021, LogSales(SALESTYPE.plu, "", banana.code, 1, 0))
    posref ! POSMessage(4021, LogSales(SALESTYPE.plu, "", grape.code, 3, 0))
    // ... followed by a Subtotal for id 4021.
    posref ! POSMessage(4021,Subtotal)

 
次に、サーバー側のシャードデプロイのソースコードを示します。
resources/application.conf
# Server-side configuration for the nodes that host the "POSShard" shard region.
akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off

akka {
  loglevel = INFO
  actor {
    provider = "cluster"
  }

  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      # Defaults only: POSRouter overrides hostname and port from its host:port argument.
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    # Seed node address has the form akka.tcp://<actor-system-name>@<host>:<port>.
    # The system name must match ActorSystem("cloud-pos-server").
    # NOTE(review): this line was garbled in the published text; host reconstructed from the
    # article's example address 192.168.11.162:2551 — confirm.
    seed-nodes = [
      "akka.tcp://cloud-pos-server@192.168.11.162:2551"]
    log-info = off
    sharding {
      role = "poswriter"
      # "5 m" is a HOCON duration: 5 minutes
      passivate-idle-entity-after = 5 m
    }
  }

  persistence {
    journal.plugin = "cassandra-journal"
    snapshot-store.plugin = "cassandra-snapshot-store"
  }

}

cassandra-journal {
  contact-points = ["192.168.11.162"]
}

cassandra-snapshot-store {
  contact-points = ["192.168.11.162"]
}

POSRouter.scala
package cloud.pos.server

import akka.actor._
import akka.cluster.sharding._
import akka.cluster.sharding.ClusterSharding
import com.typesafe.config.ConfigFactory
import sdp.cluster.monitor._
import sdp.logging._

/**
  * Entry point of a cluster node that hosts the "POSShard" shard region.
  *
  * The first command-line argument is expected in the form `host:port`
  * (e.g. `192.168.11.162:2551`). When it is missing or does not match that form,
  * the node falls back to `localhost:2551`.
  */
object POSRouter extends LogSupport {
  def main(args: Array[String]): Unit = {
    import WriterActor._
    import Commands._

    val argsPat = "(.*):(.*)".r
    // headOption instead of args(0): starting without arguments must reach the default
    // branch rather than fail with ArrayIndexOutOfBoundsException.
    val (host, port) = args.headOption match {
      case Some(argsPat(h, p)) => (h, p)
      case _ => ("localhost", "2551")
    }


    // Settings parsed here take precedence over application.conf (the last fallback).
    val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=\"" + port + "\"")
      .withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.hostname=\"" + host + "\""))
      //roles can be deployed on this node
      .withFallback(ConfigFactory.parseString("akka.cluster.roles = [poswriter]"))
      .withFallback(ConfigFactory.load())


    log.info(s"******* hostname = $host,  port = $port *******")

    val shardName = "POSShard"

    // Routing envelope: `id` identifies the target entity, `cmd` is the payload delivered to it.
    // NOTE(review): this class is local to main; confirm it is the same class the sending
    // side uses, otherwise the extractors below will not match incoming messages.
    case class POSMessage(id: Long, cmd: POSCommand) {
      // shard id = first character of the decimal id
      def shopId = id.toString.head.toString

      // entity id = the decimal id itself
      def posId = id.toString
    }

    // Unwraps the envelope: the entity receives only the inner command.
    val getPOSId: ShardRegion.ExtractEntityId = {
      case posCommand: POSMessage => (posCommand.posId, posCommand.cmd)
    }
    val getShopId: ShardRegion.ExtractShardId = {
      case posCommand: POSMessage => posCommand.shopId
    }


    val system = ActorSystem("cloud-pos-server", config)
    val role = "poswriter"    //role of this shard
    val cpsSettings = ClusterShardingSettings(system).withRole(role) //.withPassivateIdleAfter(10 minutes)

    ClusterSharding(system).start(
      typeName = shardName,
      entityProps = writerProps,
      settings = cpsSettings,
      extractEntityId = getPOSId,
      extractShardId = getShopId,
      allocationStrategy = ClusterSharding(system).defaultShardAllocationStrategy(cpsSettings),
      handOffStopMessage = PassivatePOS
    )

    system.actorOf(ClusterMonitor.props, "cps-cluster-monitor")

  }
}

次は、今回のテストプロジェクト(クライアント側)のソースコードです。
build.sbt
name := "cloud-pos-client"

version := "0.1"

scalaVersion := "2.12.8"

// Single place to bump the Akka modules together.
val akkaVersion = "2.5.19"

// Append (++=) rather than assign (:=) so that dependencies already contributed by sbt
// settings or plugins are not discarded.
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
  "com.typesafe.akka" %% "akka-persistence" % akkaVersion,
  "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.93",
  "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.93" % Test,
  "ch.qos.logback"  %  "logback-classic"   % "1.2.3"
)

resources/application.conf
# Client-side configuration: a cluster member that hosts no shard and talks through a proxy.
akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off

akka {
  loglevel = INFO
  actor {
    provider = "cluster"
  }

  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      # Address of this client node.
      hostname = "192.168.11.162"
      port = 2558
    }
  }

  cluster {
    # Seed node address has the form akka.tcp://<actor-system-name>@<host>:<port>.
    # The system name must match ActorSystem("cloud-pos-server").
    # NOTE(review): this line was garbled in the published text; host reconstructed from the
    # article's example address 192.168.11.162:2551 — confirm.
    seed-nodes = [
      "akka.tcp://cloud-pos-server@192.168.11.162:2551"]
    log-info = off
  }

}

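resources/logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>
                %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n
            </pattern>
        </encoder>
    </appender>

    <root level="debug">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>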

ClientDemo.scala
package cloud.pos.client
import akka.actor._
import akka.cluster.sharding.ClusterSharding
import sdp.cluster.monitor._
import sdp.logging._
import Commands._
import States._
import Items._
import akka.cluster.sharding._


/**
  * Demo client: joins the cluster as a node without shards, opens a proxy to the
  * "POSShard" region, sends a handful of commands and shuts down after a line is read
  * from standard input.
  */
object POSClientDemo extends LogSupport {
  def main(args: Array[String]): Unit = {

    val system = ActorSystem("cloud-pos-server")
    val shardName = "POSShard"

    // Entity extractor: routes by posId and hands only the inner command to the entity.
    val getPOSId: ShardRegion.ExtractEntityId = {
      case envelope: POSMessage => (envelope.posId, envelope.cmd)
    }
    // Shard extractor: routes by shopId.
    val getShopId: ShardRegion.ExtractShardId = {
      case envelope: POSMessage => envelope.shopId
    }

    //no shard deployed on this node  2558, use proxy
    val posHandler = ClusterSharding(system).startProxy(
      typeName = shardName,
      role = Some("poswriter"),
      extractEntityId = getPOSId,
      extractShardId = getShopId
    )

    //val posHandler = ClusterSharding(system).shardRegion(shardName)

    system.actorOf(ClusterMonitor.props, "cps-cluster-monitor")

    val posref = system.actorOf(POSClient.props(posHandler), "pos-client")

    // The commands are sent in exactly this order.
    val script = Seq(
      POSMessage(1021, LogSales(SALESTYPE.plu, "", apple.code, 1, 0)),
      POSMessage(2021, LogSales(SALESTYPE.plu, "", pineapple.code, 2, 0)),
      POSMessage(3021, LogSales(SALESTYPE.plu, "", banana.code, 1, 0)),
      POSMessage(4021, LogSales(SALESTYPE.plu, "", grape.code, 3, 0)),
      POSMessage(4021, Subtotal)
    )
    script.foreach(envelope => posref ! envelope)

    // Keep the node alive until a line is entered, then leave.
    scala.io.StdIn.readLine()

    system.terminate()

  }
}

client/Commands.scala
package cloud.pos.client

/**
  * Command protocol of the POS client, plus the envelope (POSMessage) used to route a
  * command to a shard/entity.
  *
  * NOTE(review): these classes are exchanged between cluster nodes; keep their shape in
  * sync with the receiving side.
  */
object Commands {

  // Base type of every command; sealed, so all commands are declared in this object.
  sealed trait POSCommand {}

  // --- on/off style commands (operator, supervisor, member, refund, void) ---
  case class LogOn(opr: String, passwd: String) extends POSCommand
  case object LogOff extends POSCommand
  case class SuperOn(su: String, passwd: String) extends POSCommand
  case object SuperOff extends POSCommand
  case class MemberOn(cardnum: String, passwd: String) extends POSCommand
  case object MemberOff extends POSCommand   //remove member status for the voucher
  case object RefundOn extends POSCommand
  case object RefundOff extends POSCommand
  case object VoidOn extends POSCommand
  case object VoidOff extends POSCommand
  case object VoidAll extends POSCommand
  case object Suspend extends POSCommand

  case class VoucherNum(vnum: Int) extends POSCommand


  // --- sales entry ---
  // salesType is an Int code; callers in this project pass SALESTYPE values.
  case class LogSales(salesType: Int, dpt: String, code: String, qty: Int, price: Int) extends POSCommand
  case object Subtotal extends POSCommand
  case class Discount(code: String, percent: Int) extends POSCommand

  // --- payment ---
  case class OfflinePay(acct: String, num: String, amount: Int) extends POSCommand          //settlement    
  //read only command, no event process
  case class VCBalance(acct: String, num: String, passwd: String) extends POSCommand
  case class VCPay(acct: String, num: String, passwd: String, amount: Int) extends POSCommand
  case class AliPay(acct: String, num: String, amount: Int) extends POSCommand
  case class WxPay(acct: String, num: String, amount: Int) extends POSCommand


  // read only command, no update event
  case class Plu(itemCode: String) extends POSCommand  //read only

  // Routing envelope: `id` selects the target, `cmd` is the payload.
  case class POSMessage(id: Long, cmd: POSCommand) {
    // first character of the decimal id
    def shopId = id.toString.head.toString
    // the decimal id itself
    def posId = id.toString
  }


}

client/States.scala
package cloud.pos.client

/**
  * Integer code tables and state/record types that appear in server responses.
  *
  * NOTE(review): most field comments were empty in the published listing; the hedged
  * notes below are inferred from field names only and must be confirmed.
  */
object States {

  // Transaction type codes (default for TxnItem.txntype is `sales`).
  object TXNTYPE {
    val sales: Int = 0
    val refund: Int = 1
    val void: Int = 2
    val voided: Int = 3
    val voidall: Int = 4
    val subtotal: Int = 5
    val logon: Int = 6
    val supon: Int = 7       // super user on/off
    val suspend: Int = 8

  }

  // Sales type codes (default for TxnItem.salestype is `plu`).
  object SALESTYPE {
    val plu: Int = 0
    val dpt: Int = 1
    val cat: Int = 2
    val brd: Int = 3
    val ra:  Int = 4
    val sub: Int = 5
    val ttl: Int = 6
    val dsc: Int = 7
    val crd: Int = 8
  }


  // One transaction line; every field has a default, so TxnItem() is a valid empty line.
  case class TxnItem(
                      txndate: String = ""
                      ,txntime: String = ""
                      ,opr: String = ""// presumably the operator id — TODO confirm
                      ,num: Int = 0 // presumably the voucher number — TODO confirm
                      ,seq: Int = 1 // presumably the line sequence within the voucher — TODO confirm
                      ,txntype: Int = TXNTYPE.sales// defaults to the sales transaction type
                      ,salestype: Int = SALESTYPE.plu // defaults to the plu sales type
                      ,qty: Int =  1 // quantity
                      ,price: Int = 0 // price (unit not shown here — TODO confirm)
                      ,amount: Int = 0 // amount (unit not shown here — TODO confirm)
                      ,dscamt: Int = 0 // discount amount:   net  = amount + dscamt
                      ,member: String = "" // presumably the member card number — TODO confirm
                      ,code: String = "" // code (which kind of code is not shown here — TODO confirm)
                      ,desc: String = "" // description
                      ,dpt: String = ""
                      ,department: String = ""
                      ,cat: String = ""
                      ,category: String = ""
                      ,brd: String = ""
                      ,brand: String = ""
                    )


  case class VchStatus( // NOTE(review): original comment lost; purpose to be confirmed
                        qty: Int = 1,
                        refund: Boolean = false,
                        void: Boolean = false)

  // Voucher-level state returned in POSResponse.voucher.
  case class VchStates(
                        opr: String = "",      // presumably the operator id — TODO confirm
                        jseq: BigInt = 0,      //begin journal sequence for read-side replay
                        num: Int = 0,          // presumably the voucher number — TODO confirm
                        seq: Int = 1,          // presumably the next line sequence — TODO confirm
                        void: Boolean = false, // presumably "void mode on" — TODO confirm
                        refd: Boolean = false, // presumably "refund mode on" — TODO confirm
                        due: Boolean = true,   // NOTE(review): meaning not shown here — confirm
                        su: String = "",
                        mbr: String = ""
                      )


}

client/POSClient.scala
package cloud.pos.client

import akka.actor._
import sdp.logging._
import Responses._
import Commands._

object POSClient {
  /** Props for a client that sends its commands to `pos`. */
  def props(pos: ActorRef): Props = Props(new POSClient(pos))
}

/**
  * Client-side actor: passes POSMessage envelopes on to `posHandler` (with itself as the
  * sender) and logs every POSResponse it receives.
  */
class POSClient(posHandler: ActorRef)  extends Actor with LogSupport {

  // Outgoing direction: relay the envelope unchanged.
  private def relayCommand: Receive = {
    case envelope: POSMessage => posHandler ! envelope
  }

  // Incoming direction: just log the reply.
  private def logReply: Receive = {
    case resp: POSResponse => log.info(s"response from server: $resp")
  }

  override def receive: Receive = relayCommand orElse logReply
}

client/Responses.scala
package cloud.pos.client

import States._
/**
  * Reply type the client receives and logs.
  *
  * NOTE(review): POSResponse is received from another node; keep its shape in sync with the sender.
  */
object Responses {

  // Status codes; presumably the values carried in POSResponse.sts — TODO confirm.
  object STATUS {
    val OK: Int = 0
    val FAIL: Int = -1
  }

  // sts: status code, msg: text message, voucher: voucher state, txnItems: transaction lines.
  case class POSResponse (sts: Int, msg: String, voucher: VchStates, txnItems: List[TxnItem])
}

client/DataAccess.scala
package cloud.pos.client

import java.time.LocalDate
import java.time.format.DateTimeFormatter


/**
  * A sellable item of the in-memory demo catalogue.
  *
  * @param brd   brand code
  * @param dpt   department code
  * @param cat   category code
  * @param code  item code (used for lookups)
  * @param name  display name
  * @param price price as an integer amount
  */
case class Item(
  brd: String,
  dpt: String,
  cat: String,
  code: String,
  name: String,
  price: Int
)
/** In-memory item table for the demo, plus the result type of item queries. */
object Items {
  val apple: Item     = Item("01", "02", "01", "001", "green apple", 820)
  val grape: Item     = Item("01", "02", "01", "002", "red grape", 1050)
  val orage: Item     = Item("01", "02", "01", "003", "sunkist orage", 350)
  val banana: Item    = Item("01", "02", "01", "004", "demon banana", 300)
  val pineapple: Item = Item("01", "02", "01", "005", "hainan pineapple", 1300)
  val peach: Item     = Item("01", "02", "01", "006", "xinjiang peach", 2390)

  /** All items, in declaration order. */
  val tblItems: List[Item] = apple :: grape :: orage :: banana :: pineapple :: peach :: Nil

  /** Outcome of an item query: either the matching items or a failure message. */
  sealed trait QueryItemsResult

  case class QueryItemsOK(items: List[Item]) extends QueryItemsResult

  case class QueryItemsFail(msg: String) extends QueryItemsResult

}


/** Reference-code tables for the demo (in memory) and lookups by code. */
object Codes {
  case class User(code: String, name: String, passwd: String)
  case class Department(code: String, name: String)
  case class Category(code: String, name: String)
  case class Brand(code: String, name: String)
  case class Ra(code: String, name: String)
  case class Account(code: String, name: String)
  case class Disc(code: String, best: Boolean, aggr: Boolean, group: Boolean)

  val ras: List[Ra] = List(Ra("01", "Delivery"), Ra("02", "Cooking"))
  val dpts: List[Department] = List(Department("01", "Fruit"), Department("02", "Grocery"))
  val cats: List[Category] = List(Category("0101", "Fresh Fruit"), Category("0201", "Dry Grocery"))
  val brds: List[Brand] = List(Brand("01", "Sunkist"), Brand("02", "Demon"))
  val accts: List[Account] = List(
    Account("001", "Cash"),
    Account("002", "Value Card"),
    Account("003", "Visa"),
    Account("004", "Alipay"),
    Account("005", "WXPay")
  )

  val users: List[User] = List(
    User("1001", "Tiger", "123"),
    User("1002", "John", "123"),
    User("1003", "Maria", "123")
  )

  // Each lookup returns the first entry whose code equals the argument, or None.
  def getDpt(code: String): Option[Department] = dpts.find(_.code == code)
  def getCat(code: String): Option[Category] = cats.find(_.code == code)
  def getBrd(code: String): Option[Brand] = brds.find(_.code == code)
  def getAcct(code: String): Option[Account] = accts.find(_.code == code)
  def getRa(code: String): Option[Ra] = ras.find(_.code == code)
}

/** Query and validation helpers over the in-memory Items and Codes tables. */
object DAO {
  import Items._
  import Codes._

  /** Looks up one item by code: a single-element QueryItemsOK on a hit, QueryItemsFail otherwise. */
  def getItem(code: String): QueryItemsResult =
    tblItems
      .find(_.code == code)
      .fold[QueryItemsResult](QueryItemsFail("Invalid item code!"))(found => QueryItemsOK(List(found)))

  // The validators return the matching table entry, or None when the code is unknown.
  def validateDpt(code: String): Option[Department] = dpts.find(_.code == code)
  def validateCat(code: String): Option[Category] = cats.find(_.code == code)
  def validateBrd(code: String): Option[Brand] = brds.find(_.code == code)
  def validateRa(code: String): Option[Ra] = ras.find(_.code == code)
  def validateAcct(code: String): Option[Account] = accts.find(_.code == code)

  /** Returns the user whose code and password both match, if any. */
  def validateUser(userid: String, passwd: String): Option[User] =
    users.find(candidate => candidate.code == userid && candidate.passwd == passwd)

  /** Formats the date as yyyy-MM-dd followed by " 23:59:59". */
  def lastSecOfDateStr(ldate: LocalDate): String = {
    val day = DateTimeFormatter.ofPattern( "yyyy-MM-dd").format(ldate)
    s"$day 23:59:59"
  }


}

logging/Log.scala
package sdp.logging

import org.slf4j.Logger

/**
  * Logger which just wraps org.slf4j.Logger internally.
  *
  * @param logger logger
  */
/**
  * Logger which just wraps org.slf4j.Logger internally.
  *
  * Each call is guarded twice: by the local `isXxxEnabled` switch and by the wrapped logger's
  * own level check. Messages are by-name, so they are only evaluated when actually logged.
  *
  * @param logger logger
  */
class Log(logger: Logger) {

  // use var consciously to enable squeezing later
  var isDebugEnabled: Boolean = logger.isDebugEnabled
  var isInfoEnabled: Boolean = logger.isInfoEnabled
  var isWarnEnabled: Boolean = logger.isWarnEnabled
  var isErrorEnabled: Boolean = logger.isErrorEnabled

  /**
    * Logs at the level named by `level` (`debug`, `info`, `warn` or `error`, lower- or
    * upper-case); any other symbol is ignored.
    *
    * @param level symbol naming the level
    * @param msg   message, evaluated only if the level is enabled
    * @param e     optional cause; when non-null it is logged together with the message
    *              (it used to be accepted but silently dropped)
    */
  def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = {
    // Option(null) is None, so the default argument selects the message-only overloads.
    val cause = Option(e)
    level.name match {
      case "debug" | "DEBUG" => cause.fold(debug(msg))(t => debug(msg, t))
      case "info" | "INFO" => cause.fold(info(msg))(t => info(msg, t))
      case "warn" | "WARN" => cause.fold(warn(msg))(t => warn(msg, t))
      case "error" | "ERROR" => cause.fold(error(msg))(t => error(msg, t))
      case _ => // nothing to do
    }
  }

  /** Logs `msg` at DEBUG if enabled. */
  def debug(msg: => String): Unit = {
    if (isDebugEnabled && logger.isDebugEnabled) {
      logger.debug(msg)
    }
  }

  /** Logs `msg` with cause `e` at DEBUG if enabled. */
  def debug(msg: => String, e: Throwable): Unit = {
    if (isDebugEnabled && logger.isDebugEnabled) {
      logger.debug(msg, e)
    }
  }

  /** Logs `msg` at INFO if enabled. */
  def info(msg: => String): Unit = {
    if (isInfoEnabled && logger.isInfoEnabled) {
      logger.info(msg)
    }
  }

  /** Logs `msg` with cause `e` at INFO if enabled. */
  def info(msg: => String, e: Throwable): Unit = {
    if (isInfoEnabled && logger.isInfoEnabled) {
      logger.info(msg, e)
    }
  }

  /** Logs `msg` at WARN if enabled. */
  def warn(msg: => String): Unit = {
    if (isWarnEnabled && logger.isWarnEnabled) {
      logger.warn(msg)
    }
  }

  /** Logs `msg` with cause `e` at WARN if enabled. */
  def warn(msg: => String, e: Throwable): Unit = {
    if (isWarnEnabled && logger.isWarnEnabled) {
      logger.warn(msg, e)
    }
  }

  /** Logs `msg` at ERROR if enabled. */
  def error(msg: => String): Unit = {
    if (isErrorEnabled && logger.isErrorEnabled) {
      logger.error(msg)
    }
  }

  /** Logs `msg` with cause `e` at ERROR if enabled. */
  def error(msg: => String, e: Throwable): Unit = {
    if (isErrorEnabled && logger.isErrorEnabled) {
      logger.error(msg, e)
    }
  }

}

logging/LogSupport.scala
package sdp.logging

import org.slf4j.LoggerFactory

/** Mix-in that gives a class a ready-to-use `log`. */
trait LogSupport {

  /** Logger obtained from slf4j for the runtime class of the instance mixing in this trait. */
  protected val log = {
    val underlying = LoggerFactory.getLogger(getClass)
    new Log(underlying)
  }

}

logging/ClusterMonitor.scala
package sdp.cluster.monitor

import akka.actor._
import akka.cluster.ClusterEvent._
import akka.cluster._
import sdp.logging.LogSupport

object ClusterMonitor {
  /** Props used to create a [[ClusterMonitor]] actor. */
  def props = Props(new ClusterMonitor())
}

/**
  * Actor that logs cluster membership changes and downs members reported as unreachable.
  */
class ClusterMonitor extends Actor with LogSupport {
  val cluster = Cluster(context.system)

  override def preStart(): Unit = {
    // Subscribe to membership and unreachability events; the initial state is requested as events.
    cluster.subscribe(self,initialStateMode = InitialStateAsEvents
      ,classOf[MemberEvent],classOf[UnreachableMember])
    super.preStart()
  }

  override def postStop(): Unit = {
    // Drop the subscription when this actor stops.
    cluster.unsubscribe(self)
    super.postStop()
  }

  override def receive: Receive = {
    case MemberJoined(member) =>
      log.info(s"Member is Joining: {${member.address}}")
    case MemberUp(member) =>
      log.info(s"Member is Up: {${member.address}}")
    case MemberLeft(member) =>
      log.info(s"Member is Leaving: {${member.address}}")
    case MemberExited(member) =>
      log.info(s"Member is Exiting: {${member.address}}")
    case MemberRemoved(member, previousStatus) =>
      // closing brace added: the message previously ended with an unbalanced "{"
      log.info(
        s"Member is Removed: {${member.address}} after {${previousStatus}}")
    case UnreachableMember(member) =>
      log.info(s"Member detected as unreachable: {${member.address}}")
      // Manual auto-down: the member is downed as soon as it is reported unreachable.
      // NOTE(review): immediate downing can split the cluster during a network partition — confirm this is intended.
      cluster.down(member.address)
    case _: MemberEvent => // ignore
  }

}