Yii 2フレームワークに非同期daemonを追加する

44557 ワード

PHPのYii 2フレームワークでは、非同期キューの機能が必要になる場合があります。この記事では主に、いくつかのファイルを自分で追加して、redisのlistをベースにした非同期キュープロセスを作成する方法を説明します。Yii 2フレームワークでphp-cliプロセスを使用するには、コントローラをcommandsフォルダに置き、php ./yii ... で呼び出す必要があります。
まず、非同期キューを開始するためのControllerを作成します.コードは次のとおりです.

namespace app\commands\daemon;

use yii\console\Controller;
use Yii;

/**
 * Console controller that runs a master daemon supervising a pool of forked
 * worker processes.
 *
 * The master reads `Yii::$app->params["daemon_config"]`, keeps the configured
 * number of workers alive for every entry under "daemons", and shuts the pool
 * down when it receives a termination signal.
 *
 * Commands: `start` (daemonize and supervise), `stop` (SIGTERM the master
 * recorded in the pid file), `restart` (stop, wait, start).
 */
class DaemonController extends Controller
{
    /** @var array Live worker PIDs, keyed by daemon name: [name => [pid, ...]]. */
    private $running_children_pid = [];

    /** @var bool Supervision loop flag; cleared by a termination signal. */
    private $running = true;

    /** @var int PID most recently written to the pid file. */
    private static $_master_pid;

    /** @var string Path of the file holding the master PID. */
    public static $pid_file;

    /** @var int Number of tracked workers that exited while the master was still running. */
    private $alarm_times = 0;

    /** @var int PID of the current process (used as a log prefix). */
    private $pid = 0;

    /**
     * Verifies PCNTL support and fixes the pid file location before the
     * regular Yii controller construction.
     *
     * @param string $id
     * @param \yii\base\Module $module
     * @param array $config
     */
    public function __construct($id, $module, $config = array())
    {
        $this->checkPcntl();
        static::$pid_file = __DIR__ . "/../../runtime/dasheng_yii_daemon.pid";
        parent::__construct($id, $module, $config);
    }

    /**
     * Aborts when the PCNTL extension is missing.
     *
     * @throws \Exception when pcntl_signal() is not available
     */
    private function checkPcntl()
    {
        // Make sure PHP has support for pcntl
        if (!function_exists('pcntl_signal')) {
            $message = 'PHP does not appear to be compiled with the PCNTL extension.  This is neccesary for daemonization';
            echo $message;
            throw new \Exception($message);
        }
    }

    /**
     * Sends SIGQUIT to every tracked worker.
     */
    private function killallChildren()
    {
        foreach ($this->running_children_pid as $pname => $pids) {
            foreach ($pids as $pid) {
                posix_kill($pid, SIGQUIT);
            }
        }
    }

    /**
     * Mails a warning through the application mailer. Best effort: a mailer
     * failure is logged as a warning and never propagates.
     *
     * @param string $msg HTML body of the mail
     */
    private function sendWarningMail($msg)
    {
        try {
            Yii::$app->mailer->compose()
                ->setFrom('***')
                ->setTo('***')
                ->setSubject('daemon     ')
                ->setHtmlBody("$msg")
                ->send();
        } catch (\Exception $e) {
            \Yii::warning("【daemon     】        >>>" . $e->getMessage());
        }
    }

    /**
     * Signal handler of the master process.
     *
     * SIGCHLD reaps exited children; SIGTERM/SIGINT/SIGHUP/SIGQUIT stop the
     * supervision loop and ask every worker to quit.
     *
     * @param int $signo received signal number
     */
    public function signalHandler($signo)
    {
        EchoFormat::out("" . $this->pid . " signalHandler($signo)");
        switch ($signo) {
            case SIGCHLD:
                // Pending SIGCHLDs coalesce, so one delivery may stand for several
                // exited children: reap until nothing is left to collect.
                while (($stopped_pid = pcntl_wait($status, WNOHANG)) > 0) {
                    EchoFormat::out($this->pid . " wait pid=" . $stopped_pid);
                    $was_tracked = $this->removePid($stopped_pid);
                    // Only a tracked worker dying while we are supposed to be running
                    // is abnormal. Workers stopped on purpose are untracked before
                    // they are signalled, so they never raise an alarm.
                    if ($was_tracked && $this->running) {
                        $this->alarmUnexpectedExit();
                    }
                }
                break;
            // Termination requests.
            case SIGTERM:
            case SIGINT:
            case SIGHUP:
            case SIGQUIT:
                $this->running = false;
                EchoFormat::out("master stopping");
                $this->killallChildren();
                break;
            default:
                break;
        }
        EchoFormat::out($this->pid . "signalHandler over");
    }

    /**
     * Counts an unexpected worker exit and mails a warning on the 1st, 10th
     * and 100th occurrence.
     */
    private function alarmUnexpectedExit()
    {
        EchoFormat::out("running");
        ++$this->alarm_times;
        if (in_array($this->alarm_times, array(1, 10, 100), true)) {
            $ip_info = shell_exec('ifconfig');
            $this->sendWarningMail("\n$ip_info\n");
        }
    }

    /**
     * Drops a PID from the worker table, removing the daemon entry once it
     * has no PIDs left.
     *
     * @param int $stopped_pid
     * @return bool true when the PID was tracked
     */
    private function removePid($stopped_pid)
    {
        foreach ($this->running_children_pid as $pname => $pids) {
            $index = array_search($stopped_pid, $pids, true);
            if ($index === false) {
                continue;
            }
            EchoFormat::out("unset running_children_pid[$pname][$index]");
            unset($this->running_children_pid[$pname][$index]);
            EchoFormat::out("running_children_pid=" . print_r($this->running_children_pid, true));
            if (count($this->running_children_pid[$pname]) == 0) {
                unset($this->running_children_pid[$pname]);
            }
            return true;
        }
        return false;
    }

    /**
     * Master main loop: once per second bring every configured daemon to its
     * target worker count, then, after a stop request, wait for the workers.
     */
    private function onStart()
    {
        // Install the handlers once. restart_syscalls=false lets a signal cut
        // the sleep() below short.
        foreach (array(SIGTERM, SIGINT, SIGQUIT, SIGHUP, SIGCHLD) as $signo) {
            pcntl_signal($signo, array($this, "signalHandler"), false);
        }

        while ($this->running) {
            // The config is re-read on every pass.
            $daemon_config = Yii::$app->params["daemon_config"];
            $process_name = $daemon_config["process_name"];
            foreach ($daemon_config["daemons"] as $pname => $pparams) {
                $this->reconcileWorkers($process_name, $pname, $pparams);
            }
            pcntl_signal_dispatch();
            sleep(1);
        }

        // Stop requested: collect the workers that were asked to quit.
        while (count($this->running_children_pid)) {
            pcntl_signal_dispatch();
            $stopped_pid = pcntl_wait($status);
            echo "loop wait pid=".$stopped_pid,"";
            if ($stopped_pid > 0) {
                $this->removePid($stopped_pid);
            } elseif (pcntl_get_last_error() == PCNTL_ECHILD) {
                // No child is left to wait for (already reaped by the SIGCHLD
                // handler); without this exit the loop would spin forever.
                $this->running_children_pid = array();
                break;
            }
            usleep(100);
        }
    }

    /**
     * Brings one daemon to its configured worker count by signalling surplus
     * workers or forking missing ones.
     *
     * @param string $process_name prefix for the worker process title
     * @param string $pname daemon name (key under "daemons")
     * @param array $pparams daemon settings: controller, init_params, count, key
     */
    private function reconcileWorkers($process_name, $pname, $pparams)
    {
        $count = intval($pparams["count"]);
        $action = $pparams["controller"] . "/run";
        $params = array($pparams["init_params"], $pparams["key"]);

        $pids = array();
        if (isset($this->running_children_pid[$pname])) {
            $pids = array_values($this->running_children_pid[$pname]);
        }
        $running_count = count($pids);

        if ($running_count > $count) {
            // Too many workers: ask the oldest ones to quit and stop tracking them.
            for ($i = $running_count; $i > $count; --$i) {
                $stopping_pid = array_shift($pids);
                posix_kill($stopping_pid, SIGQUIT);
                EchoFormat::out("reduce daemon=". $pname. ", pid=".$stopping_pid);
            }
            if (count($pids) > 0) {
                $this->running_children_pid[$pname] = $pids;
            } else {
                // Never keep an empty entry: the shutdown loop counts entries.
                unset($this->running_children_pid[$pname]);
            }
            return;
        }

        for ($i = $running_count; $i < $count; ++$i) {
            $child_pid = pcntl_fork();
            if ($child_pid < 0) {
                EchoFormat::out("fork error");
                continue;
            }
            if (0 == $child_pid) {
                $this->runWorker($process_name, $pname, $action, $params);
            }
            // Master side: remember the new worker.
            $this->running_children_pid[$pname][] = $child_pid;
        }
    }

    /**
     * Body of a freshly forked worker; never returns.
     *
     * @param string $process_name
     * @param string $pname
     * @param string $action console route to run
     * @param array $params positional arguments for the action
     */
    private function runWorker($process_name, $pname, $action, $params)
    {
        $this->pid = getmypid();
        // Drop what was inherited from the master so the worker can never
        // signal its siblings or run the master's handlers.
        $this->running_children_pid = array();
        foreach (array(SIGTERM, SIGINT, SIGQUIT, SIGHUP, SIGCHLD) as $signo) {
            pcntl_signal($signo, SIG_DFL);
        }
        EchoFormat::out("fork=". $pname . ", pid=".$this->pid);
        cli_set_process_title($process_name.": worker process: ". $pname);
        Yii::$app->runAction($action, $params);
        exit(0);
    }

    /**
     * Reads the PID stored in the pid file.
     *
     * @return int the stored PID, or 0 when the file is missing or unreadable
     */
    private function readMasterPid()
    {
        if (!is_file(self::$pid_file)) {
            return 0;
        }
        $raw = file_get_contents(self::$pid_file);
        return $raw === false ? 0 : intval(trim($raw));
    }

    /**
     * Tells whether a process with the given PID can be signalled.
     *
     * @param int $pid
     * @return bool
     */
    private function isProcessAlive($pid)
    {
        // Signal 0 performs the existence/permission check without delivering anything.
        return $pid > 0 && posix_kill($pid, 0);
    }

    /**
     * `start` command: daemonize, redirect output to log files and run the
     * supervision loop until a stop is requested.
     */
    public function actionStart()
    {
        if (php_sapi_name() != "cli") {
            die("only run in command line mode");
        }
        $daemon_config = Yii::$app->params["daemon_config"];
        $process_name = $daemon_config["process_name"];
        if ($this->isProcessAlive($this->readMasterPid())) {
            echo $process_name.": already running";
            exit;
        }
        set_time_limit(0);
        umask(0); // Clear the file mode creation mask.
        $master_pid = pcntl_fork();
        if ($master_pid < 0) {
            echo "fork master failed!";
            // Do not fall through and run the master in the foreground.
            exit(1);
        } else if ($master_pid) {
            // Original process: the forked child carries on as the daemon.
            exit(0);
        }
        posix_setsid(); // Become a session leader, detached from the terminal.
        $this->pid = getmypid();
        cli_set_process_title($process_name.": master process");
        self::saveMasterPid();

        // Prepare the log directory before the standard streams are closed.
        $log_dir = __DIR__ . "/../../runtime/logs/console/";
        if (is_dir($log_dir) == false) {
            mkdir($log_dir, 0777, true);
        }
        chmod($log_dir, 0777);

        // Close stdout/stderr and immediately open the two log files. They are
        // expected to take over descriptors 1 and 2 (the lowest free ones), so
        // later output lands in the logs. The handles are kept in globals so
        // they stay open for the life of the process.
        fclose(STDOUT);
        fclose(STDERR);
        global $stdOUT, $stdERR;
        $stdOUT = fopen($log_dir."console_echo-". date("Y-m-d") . ".log", "ab");
        $stdERR = fopen($log_dir."console_err-". date("Y-m-d") . ".log", "ab");

        $this->onStart();

        // Remove the pid file so a recycled PID is not mistaken for a live master.
        if (is_file(self::$pid_file)) {
            unlink(self::$pid_file);
        }
        EchoFormat::out("master stopped");
    }

    /**
     * `stop` command: send SIGTERM to the master recorded in the pid file.
     */
    public function actionStop()
    {
        EchoFormat::out("daemon stop..");
        $master_pid = $this->readMasterPid();
        if ($master_pid <= 0) {
            // A missing pid file must not become posix_kill(0, ...), which would
            // signal the caller's whole process group.
            EchoFormat::out("daemon is not running");
            return;
        }
        posix_kill($master_pid, SIGTERM);
    }

    /**
     * Writes the current PID to the pid file.
     *
     * @throws \Exception when the pid file cannot be written
     */
    protected static function saveMasterPid()
    {
        self::$_master_pid = posix_getpid();
        if (false === @file_put_contents(self::$pid_file, self::$_master_pid)) {
            throw new \Exception('can not save pid to ' . self::$pid_file);
        }
    }

    /**
     * `restart` command: stop the master, wait until it is gone, start again.
     */
    public function actionRestart()
    {
        // Read the PID first: the pid file may be gone once the master stops.
        $master_pid = $this->readMasterPid();
        $this->actionStop();
        sleep(1);
        while ($this->isProcessAlive($master_pid)) {
            sleep(1);
        }
        $this->actionStart();
    }
}
onStart ではループが実行され、サブプロセスのステータスをチェックし続けます。サブプロセスが稼働していない場合は新たにforkされ、子プロセス側で次のように実行します。
Yii::$app->runAction($action, $params);

これで個々のdaemonプロセスのcontrollerに処理が移ります。各controllerが継承する基底クラスを次のように定義します。


namespace app\commands\daemon;

use app\components\DbMethods;
use app\components\RedisLink;
use yii\console\Controller;

/**
 * Base class for queue workers started by the daemon master.
 *
 * A concrete worker implements perform(); actionRun() pops JSON jobs from a
 * Redis list and hands them to perform() until a termination signal arrives.
 */
abstract class DaemonBaseController extends Controller
{
    /** @var bool Work loop flag; cleared by a termination signal. */
    private $running = true;

    /**
     * Signal handler of the worker: any termination signal stops the loop
     * after the job in progress.
     *
     * @param int $signo received signal number
     */
    public function signalHandler($signo)
    {
        echo "" . getmypid() . " signalHandler($signo)\n";
        switch ($signo) {
            // Termination requests.
            case SIGTERM:
            case SIGINT:
            case SIGHUP:
            case SIGQUIT:
                $this->running = false;
                break;
            default:
                break;
        }
    }

    /**
     * Worker entry point: consume jobs from the Redis list `$key`.
     *
     * Each job is a JSON object with "id" and "data". A missing field yields
     * id 0 / data null, and perform() is still called.
     *
     * @param array $init_params passed through to perform() unchanged
     * @param string $key name of the Redis list to pop from
     */
    public function actionRun($init_params = [], $key = "")
    {
        pcntl_signal(SIGTERM, array($this, "signalHandler"), false);
        pcntl_signal(SIGINT, array($this, "signalHandler"), false);
        pcntl_signal(SIGQUIT, array($this, "signalHandler"), false);

        $redis_pipe = RedisLink::getRedis("pipe");

        while ($this->running) {
            $has_data = false;
            // Check the flag before popping so no job is taken off the queue
            // and then dropped during shutdown.
            while ($this->running && ($str_pop_data = $redis_pipe->rpop($key))) {
                $pop_data = json_decode($str_pop_data, true);
                $id = intval(isset($pop_data["id"]) ? $pop_data["id"] : 0);
                $data = isset($pop_data["data"]) ? $pop_data["data"] : null;
                EchoFormat::out("class=" . get_class($this) . " perform id=".$id." data=".print_r($data, true));
                $this->perform($id, $data, $init_params);
                $has_data = true;
                // Deliver pending signals between jobs so a busy queue cannot
                // hold off a stop request.
                pcntl_signal_dispatch();
            }
            if ($has_data) {
                // Close database connections after a batch of jobs.
                DbMethods::closeAll();
            }
            usleep(100000);
            pcntl_signal_dispatch();
        }
    }

    /**
     * Processes one job.
     *
     * @param int $id job type taken from the payload
     * @param mixed $data job payload
     * @param array $init_params value given to actionRun()
     */
    abstract protected function perform($id, $data, $init_params);
}

サブプロセスはactionRun に入り、$redis_pipe->rpop($key)を実行してredisのキューからデータを取り出し、派生クラスが実装する必要があるabstract protected function perform($id, $data, $init_params);にデータを渡します。redisのキューにデータを入れるコードは次のとおりで、componentsフォルダに置いておけば任意のビジネスロジックから呼び出すことができます。


namespace app\components;

/**
 * Producer side of the asynchronous queue.
 */
class AsynQueue
{
    /**
     * Encodes a job as JSON ({"id": ..., "data": ...}) and pushes it onto the
     * Redis list `$key` with LPUSH.
     *
     * @param string $key name of the Redis list
     * @param int $id job type
     * @param array $data job payload
     */
    public static function pushData($key, $id, $data)
    {
        $queue = RedisLink::getRedis("pipe");
        $queue->lpush($key, json_encode(["id" => $id, "data" => $data]));
    }
}

configフォルダにdaemon用の設定ファイルconfig.phpを1つ置きます:


namespace app\config;

// Daemon settings; DaemonController reads them from Yii::$app->params["daemon_config"].
return [
    // Prefix of the process titles ("<name>: master process", "<name>: worker process: <daemon>").
    "process_name" => "test_daemon",
    // One entry per worker pool, keyed by daemon name.
    "daemons" => [
        "sms"  => [
            // Console controller route; the master runs "<controller>/run" in each worker.
            "controller" => "daemon/sms-daemon",
            // First argument passed to the worker's run action.
            "init_params" => [],
            // Number of worker processes the master keeps alive.
            "count" => 1,
            // Second argument passed to the run action: the Redis list the worker pops from.
            "key" => "sms_pipe",
        ],
    ]
];

最後に、daemon controllerの実装例を1つ書きます。


namespace app\commands\daemon;
use Yii;

/**
 * Example worker: dispatches queued jobs by their numeric id.
 */
class UserDaemonController extends DaemonBaseController
{
    /**
     * Handles one job popped from the queue. Every branch is currently a
     * placeholder that does nothing.
     *
     * @param int $id job type
     * @param mixed $data job payload
     * @param array $init_params value given to the run action
     */
    protected function perform($id, $data, $init_params)
    {
        switch ($id) {
            case 1:
                // Placeholder for job type 1.
                break;

            case 2:
                // Placeholder for job type 2.
                break;

            case 3:
                // Placeholder for job type 3.
                break;

            default:
                // Unknown job types are ignored.
                break;
        }
    }
}