RustのActixで常駐プログラムを作る


(2020/03/12 ループ周りで気に入らなかった、すぐに反映されない問題を修正しました。)

目的

RustのActix0.9系で常駐プログラムを作ります。
常駐プログラムで必要なものは以下の3つと考えました。

  • Graceful Stop
  • ループ
  • DBアクセス

Graceful Stop

ActixではActorのstopが呼ばれるとstoppingのboolの戻り値で停止の可否を制御できます。
停止を拒否した後は、またstopを呼ばない止まってくれないので、少し使いづらいです。
そこで各Actorには終了のメッセージを送信して、停止できる状態になったら自分でactor.stop()します。
すべてのstopを検知した時にactix::System::current().stop()を呼ぶようにしたいです。

またhangup, interrupt, quit, terminateのシグナルをハンドリングしたいです。

ループ

常駐プログラムなので、停止のシグナルを受け付けるまでは、動き続けてもらいたいです。
ビジーループは良くないので、ループはあるタスクを実行してその戻り値の時間だけスリープする構造にします。
例えばキューにデータがある場合はどんどん処理してほしいのでスリープは0秒、キューが空の場合はスリープ1秒のようにします。

DBアクセス

色々な仕事はあると思いますが、とりあえずDBアクセスはいつでもどこでも必要になります。自分はPostgreSQLとRedisをよく使います。

crates

actix-daemon-utils

Graceful Stopとループをサポートします。自作です。以下のプログラムは指定した時間毎に指定した文字を表示するプログラムです。ctrl+cで止まります。

Cargo.toml
[package]
name = "delayer"
version = "0.1.0"
edition = "2018"

[dependencies]
actix = "^0.9"
actix-daemon-utils = "^0.3"
main.rs
use actix::prelude::*;
use actix_daemon_utils::{
    graceful_stop::{GracefulStop},
    delayer::{Delayer, Task, Timing},
};
use std::time::Duration;

struct MyActor { msg: String, seconds: u64 }

impl Actor for MyActor {
    type Context = Context<Self>;
}

impl Handler<Task> for MyActor {
    type Result = ();

    fn handle(&mut self, task: Task, _ctx: &mut Self::Context) -> Self::Result {
        println!("{}", self.msg);
        task.0.do_send(Timing::Later(Duration::from_secs(self.seconds)));
    }
}

fn main() {
    let sys = actix::System::new("main");
    let graceful_stop = GracefulStop::new();
    let actor1 = MyActor { msg: "x".to_string(), seconds: 1 }.start();
    let actor2 = MyActor { msg: "y".to_string(), seconds: 3 }.start();
    let delayer1 = Delayer::new(actor1.recipient(), graceful_stop.clone_system_terminator(), Duration::from_secs(10)).start();
    let delayer2 = Delayer::new(actor2.recipient(), graceful_stop.clone_system_terminator(), Duration::from_secs(10)).start();
    graceful_stop
        .subscribe(delayer1.recipient())
        .subscribe(delayer2.recipient())
        .start();

    let _ = sys.run();
    println!("main terminated");
}

actix-postgres

PostgreSQLへのDBアクセスをサポートします。自作です。

Cargo.toml
[package]
name = "pg"
version = "0.1.0"
edition = "2018"

[dependencies]
actix = "^0.9"
actix-postgres = "^0.1"
actix-rt = "1.0"
main.rs
use actix::prelude::*;
use actix_postgres::{bb8_postgres::tokio_postgres::tls::NoTls, PostgresActor, PostgresMessage};

#[actix_rt::main]
async fn main() {
    let path = std::env::var("PG_PATH").unwrap();
    let pg_actor = PostgresActor::start(&path, NoTls).unwrap();
    let task = PostgresMessage::new(|pool| {
        Box::pin(async move {
            let connection = pool.get().await?;
            connection
                .query("SELECT NOW()::TEXT as c", &vec![])
                .await
                .map_err(|err| err.into())
        })
    });
    let res = pg_actor.send(task).await.unwrap().unwrap();
    let val: &str = res[0].get(0);
    println!("{}", val);
    System::current().stop();
}

actix-redis

Actix本家が提供しているRedisアクセス。actix-postgresはこれを参考に作りました。

常駐プログラムサンプル

機能

Redisのキューにデータが入ると、そのデータをPostgreSQLに渡して処理をします。キューにデータがある間はループの待ち時間0で処理していきます。キューが無い場合は3秒まってから実行します。

Cargo.toml
[package]
name = "daemon"
version = "0.1.0"
edition = "2018"

[dependencies]
actix = "^0.9"
actix-daemon-utils = "^0.3"
actix-postgres = "^0.1"
actix-redis = "^0.8"
futures = "^0.3"
redis-async = "^0.6"
main.rs
#[macro_use]
extern crate redis_async;

use actix::prelude::*;
use actix_daemon_utils::{
    graceful_stop::{GracefulStop},
    delayer::{Delayer, Task, Timing},
};
use actix_postgres::{
    bb8_postgres::tokio_postgres::tls::NoTls,
    PostgresActor,
    PostgresMessage,
};
use actix_redis::{
    Command,
    RedisActor,
    RespValue,
};
use std::time::Duration;

struct MyActor {
    pg: Addr<PostgresActor<NoTls>>,
    redis: Addr<RedisActor>,
}

impl Actor for MyActor {
    type Context = Context<Self>;
}

impl Handler<Task> for MyActor {
    type Result = ();

    fn handle(&mut self, task: Task, ctx: &mut Self::Context) -> Self::Result {
        let redis = self.redis.clone();
        let pg = self.pg.clone();
        async move {
            let resp = redis.send(Command(resp_array!["LPOP", "queue"])).await.unwrap().unwrap();
            println!("resp={:?}", resp);
            match resp {
                RespValue::Nil => {
                    // キューにデータが無いので、しばらくお休みする
                    task.0.do_send(Timing::Later(Duration::from_secs(3)));
                },
                RespValue::BulkString(val) => {
                    // まだキューにデータがあるかもしれないので、直ぐに次の処理を依頼する
                    task.0.do_send(Timing::Immediately);
                    let task = PostgresMessage::new(|pool| {
                        Box::pin(async move {
                            let val = std::str::from_utf8(&val).unwrap();
                            let connection = pool.get().await?;
                            connection
                                .query("SELECT NOW()::TEXT || $1", &[&val])
                                .await
                                .map_err(|err| err.into())
                        })
                    });
                    let res = pg.send(task).await.unwrap().unwrap();
                    let val: &str = res[0].get(0);
                    println!("string={}", val);
                },
                _ => {
                    task.0.do_send(Timing::Later(Duration::from_secs(3)));
                },
            }
        }.into_actor(self)
        .wait(ctx);
    }
}

fn main() {
    let pg_path = std::env::var("PG_PATH").unwrap();
    let redis_path = std::env::var("REDIS_PATH").unwrap();
    let sys = actix::System::new("main");
    let graceful_stop = GracefulStop::new();
    let pg = PostgresActor::start(&pg_path, NoTls).unwrap();
    let redis = RedisActor::start(&redis_path);
    let actor = MyActor {
        pg: pg,
        redis: redis,
    }.start();
    let delayer = Delayer::new(actor.recipient(), graceful_stop.clone_system_terminator(), Duration::from_secs(1)).start();
    graceful_stop
        .subscribe(delayer.recipient())
        .start();

    let _ = sys.run();
    println!("main terminated");
}

キューにデータを入れるのは以下の通りです。

$ redis-cli -c rpush queue テスト

まとめ

以前はactix_daemon_utils::looper::Looperを使っていて、Taskのハンドラーですぐに待ち時間が設定できなかったのが、不満だったんですが、actix_daemon_utils::delayer::DelayerでTaskのハンドラーの中で次の処理を回すようにすることで解消できました。