RustのActixで常駐プログラムを作る
(2020/03/12 ループ周りで気に入らなかった、すぐに反映されない問題を修正しました。)
目的
RustのActix0.9系で常駐プログラムを作ります。
常駐プログラムで必要なものは以下の3つと考えました。
- Graceful Stop
- ループ
- DBアクセス
Graceful Stop
ActixではActorのstopが呼ばれるとstoppingのboolの戻り値で停止の可否を制御できます。
停止を拒否した後は、またstopを呼ばない止まってくれないので、少し使いづらいです。
そこで各Actorには終了のメッセージを送信して、停止できる状態になったら自分でactor.stop()します。
すべてのstopを検知した時にactix::System::current().stop()を呼ぶようにしたいです。
またhangup, interrupt, quit, terminateのシグナルをハンドリングしたいです。
ループ
常駐プログラムなので、停止のシグナルを受け付けるまでは、動き続けてもらいたいです。
ビジーループは良くないので、ループはあるタスクを実行してその戻り値の時間だけスリープする構造にします。
例えばキューにデータがある場合はどんどん処理してほしいのでスリープは0秒、キューが空の場合はスリープ1秒のようにします。
DBアクセス
色々な仕事はあると思いますが、とりあえずDBアクセスはいつでもどこでも必要になります。自分はPostgreSQLとRedisをよく使います。
crates
actix-daemon-utils
Graceful Stopとループをサポートします。自作です。以下のプログラムは指定した時間毎に指定した文字を表示するプログラムです。ctrl+cで止まります。
[package]
name = "delayer"
version = "0.1.0"
edition = "2018"
[dependencies]
actix = "^0.9"
actix-daemon-utils = "^0.3"
use actix::prelude::*;
use actix_daemon_utils::{
graceful_stop::{GracefulStop},
delayer::{Delayer, Task, Timing},
};
use std::time::Duration;
struct MyActor { msg: String, seconds: u64 }
impl Actor for MyActor {
type Context = Context<Self>;
}
impl Handler<Task> for MyActor {
type Result = ();
fn handle(&mut self, task: Task, _ctx: &mut Self::Context) -> Self::Result {
println!("{}", self.msg);
task.0.do_send(Timing::Later(Duration::from_secs(self.seconds)));
}
}
fn main() {
let sys = actix::System::new("main");
let graceful_stop = GracefulStop::new();
let actor1 = MyActor { msg: "x".to_string(), seconds: 1 }.start();
let actor2 = MyActor { msg: "y".to_string(), seconds: 3 }.start();
let delayer1 = Delayer::new(actor1.recipient(), graceful_stop.clone_system_terminator(), Duration::from_secs(10)).start();
let delayer2 = Delayer::new(actor2.recipient(), graceful_stop.clone_system_terminator(), Duration::from_secs(10)).start();
graceful_stop
.subscribe(delayer1.recipient())
.subscribe(delayer2.recipient())
.start();
let _ = sys.run();
println!("main terminated");
}
actix-postgres
PostgreSQLへのDBアクセスをサポートします。自作です。
[package]
name = "pg"
version = "0.1.0"
edition = "2018"
[dependencies]
actix = "^0.9"
actix-postgres = "^0.1"
actix-rt = "1.0"
use actix::prelude::*;
use actix_postgres::{bb8_postgres::tokio_postgres::tls::NoTls, PostgresActor, PostgresMessage};
#[actix_rt::main]
async fn main() {
let path = std::env::var("PG_PATH").unwrap();
let pg_actor = PostgresActor::start(&path, NoTls).unwrap();
let task = PostgresMessage::new(|pool| {
Box::pin(async move {
let connection = pool.get().await?;
connection
.query("SELECT NOW()::TEXT as c", &vec![])
.await
.map_err(|err| err.into())
})
});
let res = pg_actor.send(task).await.unwrap().unwrap();
let val: &str = res[0].get(0);
println!("{}", val);
System::current().stop();
}
actix-redis
Actix本家が提供しているRedisアクセス。actix-postgresはこれを参考に作りました。
常駐プログラムサンプル
機能
Redisのキューにデータが入ると、そのデータをPostgreSQLに渡して処理をします。キューにデータがある間はループの待ち時間0で処理していきます。キューが無い場合は3秒まってから実行します。
[package]
name = "daemon"
version = "0.1.0"
edition = "2018"
[dependencies]
actix = "^0.9"
actix-daemon-utils = "^0.3"
actix-postgres = "^0.1"
actix-redis = "^0.8"
futures = "^0.3"
redis-async = "^0.6"
#[macro_use]
extern crate redis_async;
use actix::prelude::*;
use actix_daemon_utils::{
graceful_stop::{GracefulStop},
delayer::{Delayer, Task, Timing},
};
use actix_postgres::{
bb8_postgres::tokio_postgres::tls::NoTls,
PostgresActor,
PostgresMessage,
};
use actix_redis::{
Command,
RedisActor,
RespValue,
};
use std::time::Duration;
struct MyActor {
pg: Addr<PostgresActor<NoTls>>,
redis: Addr<RedisActor>,
}
impl Actor for MyActor {
type Context = Context<Self>;
}
impl Handler<Task> for MyActor {
type Result = ();
fn handle(&mut self, task: Task, ctx: &mut Self::Context) -> Self::Result {
let redis = self.redis.clone();
let pg = self.pg.clone();
async move {
let resp = redis.send(Command(resp_array!["LPOP", "queue"])).await.unwrap().unwrap();
println!("resp={:?}", resp);
match resp {
RespValue::Nil => {
// キューにデータが無いので、しばらくお休みする
task.0.do_send(Timing::Later(Duration::from_secs(3)));
},
RespValue::BulkString(val) => {
// まだキューにデータがあるかもしれないので、直ぐに次の処理を依頼する
task.0.do_send(Timing::Immediately);
let task = PostgresMessage::new(|pool| {
Box::pin(async move {
let val = std::str::from_utf8(&val).unwrap();
let connection = pool.get().await?;
connection
.query("SELECT NOW()::TEXT || $1", &[&val])
.await
.map_err(|err| err.into())
})
});
let res = pg.send(task).await.unwrap().unwrap();
let val: &str = res[0].get(0);
println!("string={}", val);
},
_ => {
task.0.do_send(Timing::Later(Duration::from_secs(3)));
},
}
}.into_actor(self)
.wait(ctx);
}
}
fn main() {
let pg_path = std::env::var("PG_PATH").unwrap();
let redis_path = std::env::var("REDIS_PATH").unwrap();
let sys = actix::System::new("main");
let graceful_stop = GracefulStop::new();
let pg = PostgresActor::start(&pg_path, NoTls).unwrap();
let redis = RedisActor::start(&redis_path);
let actor = MyActor {
pg: pg,
redis: redis,
}.start();
let delayer = Delayer::new(actor.recipient(), graceful_stop.clone_system_terminator(), Duration::from_secs(1)).start();
graceful_stop
.subscribe(delayer.recipient())
.start();
let _ = sys.run();
println!("main terminated");
}
キューにデータを入れるのは以下の通りです。
$ redis-cli -c rpush queue テスト
まとめ
以前はactix_daemon_utils::looper::Looperを使っていて、Taskのハンドラーですぐに待ち時間が設定できなかったのが、不満だったんですが、actix_daemon_utils::delayer::DelayerでTaskのハンドラーの中で次の処理を回すようにすることで解消できました。
Author And Source
この問題について(RustのActixで常駐プログラムを作る), 我々は、より多くの情報をここで見つけました https://qiita.com/aoyagikouhei/items/dadd38734e47babfbe68著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .