Rust:actorモードとActivxライブラリ
8848 ワード
ずっとrustのactor同時モードを理解したいと思っています.Activxライブラリはrustで有名なライブラリです.Activxライブラリの説明を見てactorに入ります.このライブラリの重要ないくつかの概念:1、actorの任意の実装Actor traitのタイプは、actorです.Actorにはライフサイクルがあり、いくつかのステータスがあります.
(1)Started
(2) Running
(3)Stopping
(4)Stopped
Actor trait:start()、start_が入っていますdefault()などパラメータを持たない関数は,ほとんどがAddrを返す.Addrタイプを見ました.すべてのActorsにはメールアドレスAddrがあり、Actors間の通信はmessagesによって実現されるため、Actors間ではmessagesアドレスしか知らず、相手の内部に侵入することはできない.
カスタムクラス、Actorの実装:
2、message 、handler、 address、Recipient
(1)任意の実装Message traitタイプは、messageである.
(2)すべてのActors間の通信はmessagesによって実現される.通信するメッセージは宛先メールボックスに送信され、Actorsはメッセージhandlers(ハンドル)を呼び出し、コンテキスト(context)を実行する.
(3)handlerって何?
実装handler:
(4)クラスをカスタマイズし、Messageを実現する:
(5)メッセージの送信方法まずAddr objectを使います.具体的には、actors間でmessageを送信する方法はいくつかあります.
Addr::do_send(M)-メールボックスがいっぱいになったり、閉じたりする可能性があるため、エラーを返す方法を無視します.Addr::try_send(M)-エラーが発生した場合、SendErrorが返されます.Addr::send(M)-処理プロセスの結果情報を含むfuture objectが返されます.
(6)Recipient-受信者は、1つのActorが別の異なるタイプのActorに送信されたときのアドレスである.例えば、購読者と送信者.
3、context(コンテキスト)、Mailbox
(1)Contextは文字通りコンテキストである.具体的には何がありますか.ソースコードを見てください.
ContextPartsとOptionの2つの部分で構成されています.(2)ContextPartsのソースコードが見つかりました.
(3)Mailboxのデザインを見てみましょう.
4、Arbiter 、SyncArbiter
Actorsには非同期実行のコンテキスト環境が提供され、1つのactorが実行されると、Arbitersはactorが特定の実行状況を含むコンテキスト環境を制御します.Arbitersは、システムスレッドの関数、イベントポーリング、イベントポーリングタスクの非同期配布、非同期タスクのサポートなど、多くの関数を実行する必要があります.actorを起動すると、システムのスレッドで実行され、効率が高くなります.すなわち、1つのスレッドがN個のactorに対して存在する可能性がある.イベントポーリングイベントポーリングでは、対応するArbiterがイベントポーリングイベントプールのスレッドを制御します.Arbiterはタスクキューを並べます.Arbiterを「single-threaded event loop」と見なすことが多いです.
5、futureはFutureライブラリから見ることができる:
(1)Started
(2) Running
(3)Stopping
(4)Stopped
Actor trait:start()、start_が入っていますdefault()などパラメータを持たない関数は,ほとんどがAddr
pub trait Actor: Sized + 'static {
type Context: ActorContext;
fn started(&mut self, ctx: &mut Self::Context) {}
fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
Running::Stop
}
fn start(self) -> Addr
where
Self: Actor>,
{
Context::new().run(self)
}
fn start_default() -> Addr
where
Self: Actor> + Default,
{
Self::default().start()
}
/// Start new actor in arbiter's thread.
fn start_in_arbiter(arb: &Arbiter, f: F) -> Addr
where
Self: Actor>,
F: FnOnce(&mut Context) -> Self + Send + 'static,
{
let (tx, rx) = channel::channel(DEFAULT_CAPACITY);
// create actor
arb.exec_fn(move || {
let mut ctx = Context::with_receiver(rx);
let act = f(&mut ctx);
let fut = ctx.into_future(act);
actix_rt::spawn(fut);
});
Addr::new(tx)
}
fn create(f: F) -> Addr
where
Self: Actor>,
F: FnOnce(&mut Context) -> Self + 'static,
{
let mut ctx = Context::new();
let act = f(&mut ctx);
ctx.run(act)
}
}
カスタムクラス、Actorの実装:
use actix::prelude::*;
struct MyActor {
count: usize,
}
impl Actor for MyActor {
type Context = Context;
// , ,
fn started(&mut self, ctx: &mut Self::Context) {
ctx.set_mailbox_capacity(1);
}
}
let addr = MyActor.start();
2、message 、handler、 address、Recipient
(1)任意の実装Message traitタイプは、messageである.
pub trait Message {
/// The type of value that this message will resolved with if it is
/// successful.
type Result: 'static;
}
(2)すべてのActors間の通信はmessagesによって実現される.通信するメッセージは宛先メールボックスに送信され、Actorsはメッセージhandlers(ハンドル)を呼び出し、コンテキスト(context)を実行する.
(3)handlerって何?
pub trait Handler
where
Self: Actor,
M: Message,
{
/// The type of value that this handler will return.
type Result: MessageResponse;
/// This method is called for every message received by this actor.
fn handle(&mut self, msg: M, ctx: &mut Self::Context) -> Self::Result;
}
実装handler:
struct Ping(usize);
impl Handler for MyActor {
type Result = usize;
fn handle(&mut self, msg: Ping, ctx: &mut Context) -> Self::Result {
self.count += msg.0;
self.count
}
}
(4)クラスをカスタマイズし、Messageを実現する:
use actix::prelude::*;
struct Ping(usize);
impl Message for Ping {
type Result = usize;
}
(5)メッセージの送信方法まずAddr objectを使います.具体的には、actors間でmessageを送信する方法はいくつかあります.
Addr::do_send(M)-メールボックスがいっぱいになったり、閉じたりする可能性があるため、エラーを返す方法を無視します.Addr::try_send(M)-エラーが発生した場合、SendErrorが返されます.Addr::send(M)-処理プロセスの結果情報を含むfuture objectが返されます.
(6)Recipient-受信者は、1つのActorが別の異なるタイプのActorに送信されたときのアドレスである.例えば、購読者と送信者.
3、context(コンテキスト)、Mailbox
(1)Contextは文字通りコンテキストである.具体的には何がありますか.ソースコードを見てください.
pub struct Contextwhere
A: Actor>,
{
parts: ContextParts,
mb: Option>,
}
implContextwhere
A: Actor,
{
#[inline]
pub(crate) fn new() -> Self {
let mb = Mailbox::default();
Self {
parts: ContextParts::new(mb.sender_producer()),
mb: Some(mb),
}
}
#[inline]
pub fn with_receiver(rx: AddressReceiver) -> Self {
let mb = Mailbox::new(rx);
Self {
parts: ContextParts::new(mb.sender_producer()),
mb: Some(mb),
}
}
#[inline]
pub fn run(self, act: A) -> Addr{
let fut = self.into_future(act);
let addr = fut.address();
actix_rt::spawn(fut);
addr
}
pub fn into_future(mut self, act: A) -> ContextFut{
let mb = self.mb.take().unwrap();
ContextFut::new(self, act, mb)
}
pub fn handle(&self) -> SpawnHandle {
self.parts.curr_handle()
}
pub fn set_mailbox_capacity(&mut self, cap: usize) {
self.parts.set_mailbox_capacity(cap)
}
}
implAsyncContextPartsfor Contextwhere
A: Actor,
{
fn parts(&mut self) -> &mut ContextParts{
&mut self.parts
}
}
pub trait ContextFutureSpawnerwhere
A: Actor,
A::Context: AsyncContext,
{
fn spawn(self, ctx: &mut A::Context);
fn wait(self, ctx: &mut A::Context);
}
implContextFutureSpawnerfor T
where
A: Actor,
A::Context: AsyncContext,
T: ActorFuture + 'static,
{
#[inline]
fn spawn(self, ctx: &mut A::Context) {
let _ = ctx.spawn(self);
}
#[inline]
fn wait(self, ctx: &mut A::Context) {
ctx.wait(self);
}
}
ContextPartsとOptionの2つの部分で構成されています.(2)ContextPartsのソースコードが見つかりました.
pub struct ContextPartswhere
A: Actor,
A::Context: AsyncContext,
{
addr: AddressSenderProducer,
flags: ContextFlags,
wait: SmallVec; 2]>,
items: SmallVec; 3]>,
handles: SmallVec,
}
(3)Mailboxのデザインを見てみましょう.
pub struct Mailboxwhere
A: Actor,
A::Context: AsyncContext,
{
msgs: AddressReceiver,
}
implMailboxwhere
A: Actor,
A::Context: AsyncContext,
{
#[inline]
pub fn new(msgs: AddressReceiver) -> Self {
Self { msgs }
}
pub fn capacity(&self) -> usize {
self.msgs.capacity()
}
pub fn set_capacity(&mut self, cap: usize) {
self.msgs.set_capacity(cap);
}
#[inline]
pub fn connected(&self) -> bool {
self.msgs.connected()
}
pub fn address(&self) -> Addr{
Addr::new(self.msgs.sender())
}
pub fn sender_producer(&self) -> AddressSenderProducer{
self.msgs.sender_producer()
}
pub fn poll(&mut self, act: &mut A, ctx: &mut A::Context) {
#[cfg(feature = "mailbox_assert")]
let mut n_polls = 0u16;
loop {
let mut not_ready = true;
// sync messages
loop {
if ctx.waiting() {
return;
}
match self.msgs.poll() {
Ok(Async::Ready(Some(mut msg))) => {
not_ready = false;
msg.handle(act, ctx);
}
Ok(Async::Ready(None)) | Ok(Async::NotReady) | Err(_) => break,
}
#[cfg(feature = "mailbox_assert")]
{
n_polls += 1;
assert!(n_polls < MAX_SYNC_POLLS, "Too many messages are being processed. Use Self::Context::notify() instead of direct use of address");
}
}
if not_ready {
return;
}
}
}
}
4、Arbiter 、SyncArbiter
Actorsには非同期実行のコンテキスト環境が提供され、1つのactorが実行されると、Arbitersはactorが特定の実行状況を含むコンテキスト環境を制御します.Arbitersは、システムスレッドの関数、イベントポーリング、イベントポーリングタスクの非同期配布、非同期タスクのサポートなど、多くの関数を実行する必要があります.actorを起動すると、システムのスレッドで実行され、効率が高くなります.すなわち、1つのスレッドがN個のactorに対して存在する可能性がある.イベントポーリングイベントポーリングでは、対応するArbiterがイベントポーリングイベントプールのスレッドを制御します.Arbiterはタスクキューを並べます.Arbiterを「single-threaded event loop」と見なすことが多いです.
5、futureはFutureライブラリから見ることができる:
pub trait Future {
/// The type of value produced on completion.
#[stable(feature = "futures_api", since = "1.36.0")]
type Output;
#[stable(feature = "futures_api", since = "1.36.0")]
fn poll(self: Pin, cx: &mut Context) -> Poll<:output>;
}