akka-typed(8)-CQRS読み書き分離モード
18435 ワード
イベントソース(EventSource)とクラスタ(cluster)を先に紹介しましたが、CQRSについて議論する時になりました.CQRSすなわち読み書き分離モードは,独立した書き方プログラムと読み方プログラムからなり,具体的な原理は以前のブログで紹介した.akka-typedは自然にCQRSモードをサポートすべきで、少なくとも自身がライターのプログラミングをサポートしていることはEventSourcedBehaviorから知ることができる.akka-typedは新しいEventSourcedBehavior-Actorを提供し、persistentActorの応用開発を極めて便利にしたが、プログラミング者にもいくつかの制限を与えた.手動で状態を変えるのはさらに難しくなり、EventSourcedBehaviorは多層式のpersistをサポートしていません.つまり、persistの特定のeventを通じてevent-handlerプログラムで状態処理を行うことは不可能です.ここに例があります.ショッピングカートのアプリです.支払いが終わったらスナップショット(snapshot)を取る必要があります.次はこのsnapshotのコードです.
イベントタイプを判断するのは問題ありません.現在のイベントだからですが、もう一つの条件はカートが空でなければならないことです.これはちょっと困ります.この状態はこのいくつかのevent演算の結果に依存して確定しなければならないので、つまり次のステップですが、確定結果はカートの内容を計算する必要があります.死の循環のようです.akka-classicではevent演算結果を判断した後,状態を変える必要がある場合に特殊なeventをpersistし,このeventのhandlerで状態処理を行うことができる.仕方ありません.EventSourcedBehaviorは多層persistをサポートしていません.そうするしかありません.
私はまず現在の状態を保存して、結単演算をして、それからショッピングカートを空にして、このようにsnapshotは順調に行うことができます.
さて、akkaの読み方プログラミングはPersistentQueryによって実現されています.readerの役割はeventをデータベースから読み出して具体的なデータフォーマットに復元することです.このアプリケーションのreaderの実装の詳細についてreaderの呼び出しから説明します.
このreaderはクラスタスライス,sharding−entityであることがわかる.購入が完了するたびにentity、このentity、reader機能が完了すると自動的に終了し、すぐに占有されたリソースを解放することを考えています.reader-actorの定義は次のとおりです.
readerは普通のactorです.読者プログラムは膨大で複雑なプログラムであり、複数のモジュールに分割する必要があることに注意してください.そのため、次のモジュールは上のモジュールで発生した結果を継続する必要がある可能性があります.actorではスレッドのブロックを絶対に回避し、すべてのモジュールがFutureに戻り、for-yieldで直列に接続することを覚えておいてください.ctxを使いましたpipeToSelfは、Future演算が完了した後にReader Finishメッセージを自分に送信し、停止を通知します.
この例ではreaderタスクを次のように分けます.
1、データベースからイベントを読み込む
2、イベントの再演による状態データの生成(カート内容)
3、形成されたショッピングカートの内容を取引文書項目としてデータベースに保存する
4、ユーザーに提供されたrestapiに取引データを出力する
event読み取りはcassandra-persistence-pluginによって実現されます.
この部分は比較的簡単である:PersistenceQueryを定義し、それを用いてSourceを生成し、runというSourceはFuture[List[Any]]を取得する.
再演イベントは取引データを生成します.
List[Event]を繰り返し、List[TxnItem]を生成します.
データベースにList[TxnItem]:
注意結果タイプFuture[Seq[TxnItem]]を返します.for-yieldでこれらの動作をつなぎます.
注意結果タイプFuture[Seq[TxnItem]]を返します.for-yieldでこれらの動作をつなぎます.
注意:このforが返すFuture[List[TxnItem]]は、restapi出力機能に提供されます.そこでList[TxnItem]はpostのパケット埋め込みデータとしてjsonに変換される.
すべてのサブタスクの戻り結果タイプがFutureになりました.forで串刺しすることができます
EventSourcedBehaviorといえば、cassandra-pluginを使っていたので、プロファイルの新旧に大きな違いがあることをふと思い出しました.今のアプリケーションはconfはこうです.
akka.persitence.cassandra段落ではkeyspace名を定義できます.これにより、新しいバージョンのアプリケーションはcassandraを共有しながらオンラインにすることができます.
snapshotWhen {
(state,evt,seqNr) => CommandHandler.takeSnapshot(state,evt,seqNr)
}
...
def takeSnapshot(state: Voucher, evt: Events.Action, lstSeqNr: Long)(implicit pid: PID) = {
if (evt.isInstanceOf[Events.PaymentMade]
|| evt.isInstanceOf[Events.VoidVoucher.type]
|| evt.isInstanceOf[Events.SuspVoucher.type])
if (state.items.isEmpty) {
log.step(s"#${state.header.num} taking snapshot at [$lstSeqNr] ...")
true
} else
false
else
false
}
イベントタイプを判断するのは問題ありません.現在のイベントだからですが、もう一つの条件はカートが空でなければならないことです.これはちょっと困ります.この状態はこのいくつかのevent演算の結果に依存して確定しなければならないので、つまり次のステップですが、確定結果はカートの内容を計算する必要があります.死の循環のようです.akka-classicではevent演算結果を判断した後,状態を変える必要がある場合に特殊なeventをpersistし,このeventのhandlerで状態処理を行うことができる.仕方ありません.EventSourcedBehaviorは多層persistをサポートしていません.そうするしかありません.
case PaymentMade(acct, dpt, num, ref,amount) =>
...
writerInternal.lastVoucher = Voucher(vchs, vItems)
endVoucher(Voucher(vchs,vItems),TXNTYPE.sales)
Voucher(vchs.nextVoucher, List())
...
私はまず現在の状態を保存して、結単演算をして、それからショッピングカートを空にして、このようにsnapshotは順調に行うことができます.
さて、akkaの読み方プログラミングはPersistentQueryによって実現されています.readerの役割はeventをデータベースから読み出して具体的なデータフォーマットに復元することです.このアプリケーションのreaderの実装の詳細についてreaderの呼び出しから説明します.
val readerShard = writerInternal.optSharding.get
val readerRef = readerShard.entityRefFor(POSReader.EntityKey, s"$pid.shopId:$pid.posId")
readerRef ! Messages.PerformRead(pid.shopid, pid.posid,writerInternal.lastVoucher.header.num,writerInternal.lastVoucher.header.opr,bseq,eseq,txntype,writerInternal.expurl,writerInternal.expacct,writerInternal.exppass)
このreaderはクラスタスライス,sharding−entityであることがわかる.購入が完了するたびにentity、このentity、reader機能が完了すると自動的に終了し、すぐに占有されたリソースを解放することを考えています.reader-actorの定義は次のとおりです.
object POSReader extends LogSupport {
val EntityKey: EntityTypeKey[Command] = EntityTypeKey[Command]("POSReader")
def apply(nodeAddress: String, trace: Boolean): Behavior[Command] = {
log.stepOn = trace
implicit var pid: PID = PID("","")
Behaviors.supervise(
Behaviors.setup[Command] { ctx =>
Behaviors.withTimers { timer =>
implicit val ec = ctx.executionContext
Behaviors.receiveMessage {
case PerformRead(shopid, posid, vchnum, opr, bseq, eseq, txntype, xurl, xacct, xpass) =>
pid = PID(shopid, posid)
log.step(s"POSReader: PerformRead($shopid,$posid,$vchnum,$opr,$bseq,$eseq,$txntype,$xurl,$xacct,$xpass)")(PID(shopid, posid))
val futReadSaveNExport = for {
txnitems ActionReader.readActions(ctx, vchnum, opr, bseq, eseq, trace, nodeAddress, shopid, posid, txntype)
_ Events.TXNTYPE.suspend,
{ if(txntype == Events.TXNTYPE.voidall)
txnitems.map (_.copy(txntype=Events.TXNTYPE.voidall))
else txnitems },
trace)(ctx.system.toClassic, pid)
} yield ()
ctx.pipeToSelf(futReadSaveNExport) {
case Success(_) => {
timer.startSingleTimer(ReaderFinish(shopid, posid, vchnum), readInterval.seconds)
StopReader
}
case Failure(err) =>
log.error(s"POSReader: Error: ${err.getMessage}")
timer.startSingleTimer(ReaderFinish(shopid, posid, vchnum), readInterval.seconds)
StopReader
}
Behaviors.same
case StopReader =>
Behaviors.same
case ReaderFinish(shopid, posid, vchnum) =>
Behaviors.stopped(
() => log.step(s"POSReader: {$shopid,$posid} finish reading voucher#$vchnum and stopped")(PID(shopid, posid))
)
}
}
}
).onFailure(SupervisorStrategy.restart)
}
readerは普通のactorです.読者プログラムは膨大で複雑なプログラムであり、複数のモジュールに分割する必要があることに注意してください.そのため、次のモジュールは上のモジュールで発生した結果を継続する必要がある可能性があります.actorではスレッドのブロックを絶対に回避し、すべてのモジュールがFutureに戻り、for-yieldで直列に接続することを覚えておいてください.ctxを使いましたpipeToSelfは、Future演算が完了した後にReader Finishメッセージを自分に送信し、停止を通知します.
この例ではreaderタスクを次のように分けます.
1、データベースからイベントを読み込む
2、イベントの再演による状態データの生成(カート内容)
3、形成されたショッピングカートの内容を取引文書項目としてデータベースに保存する
4、ユーザーに提供されたrestapiに取引データを出力する
event読み取りはcassandra-persistence-pluginによって実現されます.
val query =
PersistenceQuery(classicSystem).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
// issue query to journal
val source: Source[EventEnvelope, NotUsed] =
query.currentEventsByPersistenceId(s"${pid.shopid}:${pid.posid}", startSeq, endSeq)
// materialize stream, consuming events
val readActions: Future[List[Any]] = source.runFold(List[Any]()) { (lstAny, evl) => evl.event :: lstAny }
この部分は比較的簡単である:PersistenceQueryを定義し、それを用いてSourceを生成し、runというSourceはFuture[List[Any]]を取得する.
再演イベントは取引データを生成します.
def buildVoucher(actions: List[Any]): List[TxnItem] = {
log.step(s"POSReader: read actions: $actions")
val (voidtxns,onlytxns) = actions.asInstanceOf[Seq[Action]].pickOut(_.isInstanceOf[Voided])
val listOfActions = onlytxns.reverse zip (LazyList from 1) //zipWithIndex
listOfActions.foreach { case (txn,idx) =>
txn.asInstanceOf[Action] match {
case Voided(_) =>
case ti@_ =>
curTxnItem = EventHandlers.buildTxnItem(ti.asInstanceOf[Action],vchState).copy(opr=cshr)
if(voidtxns.exists(a => a.asInstanceOf[Voided].seq == idx)) {
curTxnItem = curTxnItem.copy(txntype = TXNTYPE.voided, opr=cshr)
log.step(s"POSReader: voided txnitem: $curTxnItem")
}
val vch = EventHandlers.updateState(ti.asInstanceOf[Action],vchState,vchItems,curTxnItem,true)
vchState = vch.header
vchItems = vch.txnItems
log.step(s"POSReader: built txnitem: ${vchItems.txnitems.head}")
}
}
log.step(s"POSReader: voucher built with state: $vchState, items: ${vchItems.txnitems}")
vchItems.txnitems
}
List[Event]を繰り返し、List[TxnItem]を生成します.
データベースにList[TxnItem]:
def writeTxnsToDB(vchnum: Int, txntype: Int, bseq: Long, eseq: Long, txns: List[TxnItem])(
implicit system: akka.actor.ActorSystem, session: CassandraSession, pid: PID): Future[Seq[TxnItem]] = ???
注意結果タイプFuture[Seq[TxnItem]]を返します.for-yieldでこれらの動作をつなぎます.
val txnitems: Future[List[Events.TxnItem]] = for {
lst1 //read list from Source
lstTxns if (lst1.length < (endSeq -startSeq)) //if imcomplete list read again
readActions
else FastFuture.successful(lst1)
items FastFuture.successful( buildVoucher(lstTxns) )
_ JournalTxns.writeTxnsToDB(vchnum,txntype,startSeq,endSeq,items)
_ session.close(ec)
} yield items
注意結果タイプFuture[Seq[TxnItem]]を返します.for-yieldでこれらの動作をつなぎます.
val txnitems: Future[List[Events.TxnItem]] = for {
lst1 //read list from Source
lstTxns if (lst1.length < (endSeq -startSeq)) //if imcomplete list read again
readActions
else FastFuture.successful(lst1)
items FastFuture.successful( buildVoucher(lstTxns) )
_ JournalTxns.writeTxnsToDB(vchnum,txntype,startSeq,endSeq,items)
_ session.close(ec)
} yield items
注意:このforが返すFuture[List[TxnItem]]は、restapi出力機能に提供されます.そこでList[TxnItem]はpostのパケット埋め込みデータとしてjsonに変換される.
すべてのサブタスクの戻り結果タイプがFutureになりました.forで串刺しすることができます
val futReadSaveNExport = for {
txnitems ActionReader.readActions(ctx, vchnum, opr, bseq, eseq, trace, nodeAddress, shopid, posid, txntype)
_ Events.TXNTYPE.suspend,
{ if(txntype == Events.TXNTYPE.voidall)
txnitems.map (_.copy(txntype=Events.TXNTYPE.voidall))
else txnitems },
trace)(ctx.system.toClassic, pid)
} yield ()
EventSourcedBehaviorといえば、cassandra-pluginを使っていたので、プロファイルの新旧に大きな違いがあることをふと思い出しました.今のアプリケーションはconfはこうです.
akka {
loglevel = INFO
actor {
provider = cluster
serialization-bindings {
"com.datatech.pos.cloud.CborSerializable" = jackson-cbor
}
}
remote {
artery {
canonical.hostname = "192.168.11.189"
canonical.port = 0
}
}
cluster {
seed-nodes = [
"akka://[email protected]:2551"]
sharding {
passivate-idle-entity-after = 5 m
}
}
# use Cassandra to store both snapshots and the events of the persistent actors
persistence {
journal.plugin = "akka.persistence.cassandra.journal"
snapshot-store.plugin = "akka.persistence.cassandra.snapshot"
}
}
akka.persistence.cassandra {
# don't use autocreate in production
journal.keyspace = "poc2g"
journal.keyspace-autocreate = on
journal.tables-autocreate = on
snapshot.keyspace = "poc2g_snapshot"
snapshot.keyspace-autocreate = on
snapshot.tables-autocreate = on
}
datastax-java-driver {
basic.contact-points = ["192.168.11.189:9042"]
basic.load-balancing-policy.local-datacenter = "datacenter1"
}
akka.persitence.cassandra段落ではkeyspace名を定義できます.これにより、新しいバージョンのアプリケーションはcassandraを共有しながらオンラインにすることができます.