• 定义job interface,代码参考
<?php
/**
 * JobInterface.php class file.
 * @author karnc
 * @date 2020/12/31
 */

namespace common\drivers\interfaces;

/**
 * Contract for a unit of work: implementations expose a single handle()
 * method that performs the job.
 */
interface JobInterface
{

    /**
     * Execute the job.
     *
     * @return mixed implementation-defined result
     */
    public function handle();
}
  • 定义redis queue 驱动,代码参考
<?php

namespace common\drivers\queue;
/**
 * RedisQueue.php class file.
 * @author karnc
 */

use common\drivers\interfaces\JobInterface;
use Yii;

/**
 * Minimal job queue stored in a Redis list.
 *
 * Jobs are serialized with serialize() and pushed with LPUSH; consumers take
 * them from the opposite end with RPOP, so jobs leave in the order they
 * were pushed.
 */
class RedisQueue
{
    /**
     * Redis connection component, taken from Yii::$app->redis.
     * @var mixed
     */
    protected $redis;

    /**
     * Queue name; an empty value selects the default queue name.
     * @var string|null
     */
    protected $queue_name;


    /**
     * Bind the queue to the application's Redis component.
     *
     * @param string|null $queue_name queue name; empty means "use the default"
     */
    public function __construct($queue_name = '')
    {
        $this->redis = Yii::$app->redis;
        $this->queue_name = $queue_name;
    }


    /**
     * Enqueue a job.
     *
     * @param JobInterface $job job to serialize and store
     * @return mixed whatever the Redis component returns for LPUSH
     */
    public function push(JobInterface $job)
    {
        return $this->redis->lpush($this->getRedisKey(), serialize($job));
    }

    /**
     * Dequeue the next job.
     *
     * @return JobInterface|false the job, or false when the queue is empty or
     *                            the stored payload is not a JobInterface
     */
    public function pull()
    {
        $payload = $this->redis->rpop($this->getRedisKey());

        // An empty queue produces a non-string reply; bail out before
        // unserialize(), which expects a string argument.
        if (!is_string($payload) || $payload === '') {
            return false;
        }

        // NOTE(review): unserialize() can instantiate any class named in the
        // payload. This is only safe while every writer to this Redis key is
        // trusted — confirm, or switch to a data-only format.
        $job = unserialize($payload);

        return $job instanceof JobInterface ? $job : false;
    }

    /**
     * Number of jobs currently waiting in the queue.
     *
     * @return mixed whatever the Redis component returns for LLEN
     */
    public function count()
    {
        return $this->redis->llen($this->getRedisKey());
    }

    /**
     * Build the Redis key under which this queue's list is stored.
     *
     * @return string "queues:" followed by the queue name (or the default name)
     */
    protected function getRedisKey()
    {
        $name = ! empty($this->queue_name) ? $this->queue_name : $this->getRedisDefaultKeyName();

        return 'queues:' . $name;
    }

    /**
     * Queue name used when none was supplied.
     *
     * @return string
     */
    protected function getRedisDefaultKeyName()
    {
        return 'default';
    }
}
  • 定义redis驱动实例化管理
<?php
/**
 * Manager.php class file.
 * @author karnc
 */

namespace common\drivers\queue;

use Yii;

/**
 * Creates RedisQueue objects and hands out shared instances per queue name.
 */
class Manager
{
    /**
     * First shared instance created by instance(). Kept so that subclasses
     * reading this property continue to work.
     * @var RedisQueue|null
     */
    protected static $_static = null;

    /**
     * Shared instances keyed by queue name ('' stands for the default queue).
     * @var RedisQueue[]
     */
    protected static $_instances = [];

    /**
     * Build a new, unshared queue object.
     *
     * @param string|null $queue_name queue name; null/empty selects the default queue
     * @return RedisQueue
     */
    public static function factory($queue_name = null)
    {
        return new RedisQueue($queue_name);
    }

    /**
     * Return the shared queue object for the given queue name, creating it on
     * first use.
     *
     * One instance is cached per name, so asking for a second queue name no
     * longer returns the queue created for the first one.
     *
     * @param string|null $queue_name queue name; null/empty selects the default queue
     * @return RedisQueue
     */
    public static function instance($queue_name = null)
    {
        // null and '' both mean "default queue", so they share one cache slot.
        $key = (string) $queue_name;

        if (!isset(self::$_instances[$key])) {
            self::$_instances[$key] = self::factory($queue_name);

            if (self::$_static === null) {
                self::$_static = self::$_instances[$key];
            }
        }

        return self::$_instances[$key];
    }
}
  • 一个简单的job参考
<?php
/**
 * SimpleJob class file.
 * @author karnc
 */

namespace common\jobs;

/**
 * Demonstration job: prints the arguments it was created with, followed by
 * a greeting line.
 */
class SimpleJob implements \common\drivers\interfaces\JobInterface
{
    /**
     * Payload supplied when the job was created.
     * @var array
     */
    public $args = [];

    /**
     * @param array $args payload stored on the job
     */
    public function __construct(array $args = [])
    {
        $this->args = $args;
    }

    /**
     * Run the job: dump the payload, then print the greeting.
     *
     * @return void
     */
    public function handle(): void
    {
        echo print_r($this->args, true), "Hello World\n";
    }
}
  • job助手类封装参考
<?php
/**
 * JobHelper.php class file.
 * @author karnc
 */

use common\drivers\queue\Manager;
use common\jobs\SimpleJob;
use Yii;

/**
 * Convenience helpers for putting jobs onto queues.
 */
class JobHelper
{

    /**
     * Wrap the arguments in a SimpleJob and push it onto the queue whose name
     * is configured under params['queue']['simple'].
     *
     * @param array $args payload forwarded to the SimpleJob constructor
     */
    public static function toSimple(array $args)
    {
        $job = new SimpleJob($args);
        $queueName = Yii::$app->params['queue']['simple'];

        $queue = Manager::factory($queueName);
        $queue->push($job);
    }

}
  • 队列消费与守护进程相关代码

Daemon.php

<?php
/**
 * Daemon.php class file.
 * @author karnc
 */
namespace console\components\daemons\simple;

use Yii;

/**
 * Entry points for starting and stopping the "simple" daemon.
 *
 * The PID file named by params['daemon']['simple']['masterPidFilePath'] holds
 * one process id per line; both operations send SIGTERM to every id listed.
 */
class Daemon
{
    /**
     * Start the daemon.
     *
     * If a PID file from an earlier run exists, the processes it lists are
     * asked to terminate first; then a new Master is created and run.
     *
     * @param bool $daemon passed to Master; true requests daemon mode
     * @return bool always true
     */
    public static function start(bool $daemon = false): bool
    {
        self::terminateRecordedProcesses(self::pidFilePath());

        $masterProcessor = new Master($daemon);
        $masterProcessor->run();

        return true;
    }

    /**
     * Stop the daemon: signal every recorded process, then remove the PID file.
     */
    public static function stop(): void
    {
        $processIdPath = self::pidFilePath();

        if (is_file($processIdPath)) {
            self::terminateRecordedProcesses($processIdPath);

            // Best effort: the file may already be gone by the time we get here.
            @unlink($processIdPath);
        }
    }

    /**
     * Resolve the configured PID file alias to a filesystem path.
     *
     * @return string|bool result of Yii::getAlias()
     */
    private static function pidFilePath()
    {
        return Yii::getAlias(Yii::$app->params['daemon']['simple']['masterPidFilePath']);
    }

    /**
     * Send SIGTERM to every process id listed in the PID file.
     *
     * Lines that are not plain positive integers are skipped, so a damaged
     * PID file cannot pass a non-integer to posix_kill().
     *
     * @param string $processIdPath path of the PID file
     */
    private static function terminateRecordedProcesses($processIdPath): void
    {
        if (!is_file($processIdPath)) {
            return;
        }

        $lines = file($processIdPath, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES);
        if ($lines === false) {
            return;
        }

        foreach ($lines as $line) {
            $line = trim($line);
            if (!ctype_digit($line)) {
                continue;
            }

            $processId = (int) $line;
            if ($processId > 0) {
                // Result ignored on purpose: the process may have exited already.
                posix_kill($processId, SIGTERM);
            }
        }
    }
}

Master.php

<?php
/**
 * Master.php class file.
 * @author karnc
 */

namespace console\components\daemons\simple;
use Yii;
use swoole\Process;

/**
 * Master process of the "simple" daemon: records its own PID, starts the
 * configured number of worker processes and records their PIDs as well.
 */
class Master
{
    /**
     * Worker process ids, keyed by process id.
     * @var array
     */
    protected $processList = [];

    /**
     * Id of the periodic timer created in run().
     * @var integer
     */
    protected $timerId;

    /**
     * Whether daemon mode was requested.
     * @var bool
     */
    protected $daemon = false;

    /**
     * Resolved path of the PID file.
     * @var string
     */
    protected $processIdPath;

    /**
     * Optionally daemonize, set the process name, and start a fresh PID file
     * containing this process's id.
     *
     * @param bool $daemon true to daemonize the current process
     */
    public function __construct(bool $daemon = false)
    {
        $this->daemon = $daemon;
        $this->processIdPath = Yii::getAlias(Yii::$app->params['daemon']['simple']['masterPidFilePath']);

        // Daemonize before reading the PID so the recorded id is the one the
        // process has after detaching.
        if ($this->daemon) {
            Process::daemon();
        }

        swoole_set_process_name(Yii::$app->params['daemon']['simple']['masterName']);
        file_put_contents($this->processIdPath, posix_getpid() . PHP_EOL);
    }

    /**
     * Start the workers and install the periodic timer.
     */
    public function run(): void
    {
        $this->createWorker(Yii::$app->params['daemon']['simple']['workerNumber']);

        // NOTE(review): the signal handlers below are not registered, so
        // handleForSIGCHLD() / handleForSIGTERM() are never invoked from here.
//        Process::signal(SIGCHLD, [$this, 'handleForSIGCHLD']);
//        Process::signal(SIGTERM, [$this, 'handleForSIGTERM']);

        // Timer with an empty callback; presumably it keeps the master's
        // event loop alive — confirm.
        $this->timerId = swoole_timer_tick(6000, function ($timerId, $params = null) {

        });
    }

    /**
     * Start worker processes and append their ids to the PID file.
     *
     * The process is daemonized once, in the constructor. It must not be
     * daemonized again here: doing so per worker re-forks the master, which
     * invalidates the master PID already written to the PID file.
     *
     * @param int $number how many workers to start
     */
    protected function createWorker(int $number): void
    {
        for ($started = 0; $started < $number; $started++) {
            $process = new Process(
                function (Process $process): void {
                    $worker = new Worker();
                    $worker->run();
                },
                false,
                0 // workers need no inter-process communication channel
            );

            $processId = $process->start();
            file_put_contents($this->processIdPath, $processId . PHP_EOL, FILE_APPEND);
            $this->processList[$processId] = $processId;
        }
    }

    /**
     * SIGCHLD handler: reap every exited worker and start a replacement.
     *
     * @param int $singalNumber signal number delivered to the handler
     */
    public function handleForSIGCHLD(int $singalNumber): void
    {
        while ($result = Process::wait(false)) {
            unset($this->processList[$result['pid']]);

            $this->createWorker(1);
        }
    }

    /**
     * SIGTERM handler: signal the workers, clear the timer, wait for all
     * children to exit, then end the application.
     *
     * @param int $singalNumber signal number delivered to the handler
     * @throws \yii\base\ExitException
     */
    public function handleForSIGTERM(int $singalNumber): void
    {
        $this->killChildren();

        swoole_timer_clear($this->timerId);

        while ($result = Process::wait(true)) {
            // nothing to do; the loop only reaps children
        }

        Yii::$app->end();
    }

    /**
     * Send SIGTERM to every worker in the process list.
     */
    protected function killChildren(): void
    {
        foreach ($this->processList as $processId) {
            Process::kill($processId, SIGTERM);
        }
    }
}
Worker.php
<?php
/**
 * Worker.php class file.
 * @author karnc
 */

namespace console\components\daemons\simple;

use common\drivers\queue\Manager;
use yii\helpers\FileHelper;
use Yii;

// Enable asynchronous signal handling, so signal handlers run without
// declare(ticks) or explicit pcntl_signal_dispatch() calls.
pcntl_async_signals(TRUE);

/**
 * Worker process: repeatedly pulls a job from the "simple" queue and runs it,
 * sleeping briefly whenever the queue is empty.
 */
class Worker
{

    /**
     * Set the worker's process name from params['daemon']['simple']['workerName'].
     */
    public function __construct()
    {
        swoole_set_process_name(Yii::$app->params['daemon']['simple']['workerName']);
    }


    /**
     * Main loop; never returns.
     */
    public function run(): void
    {
        while (true) {
            // Nothing to do: pause for 2 seconds before polling again.
            if (!$this->process()) {
                sleep(2);
            }
        }
    }


    /**
     * Perform one polling cycle: record a heartbeat timestamp in Redis, pull
     * one job and run it.
     *
     * A failing job must not take the worker down, so anything it throws —
     * \Error as well as \Exception — is caught, echoed and logged.
     *
     * @param array $args unused
     * @return bool|mixed the job that was handled, or false when none was available
     */
    private function process(array $args = [])
    {
        Yii::$app->redis->setex('super_simple_work_time', 86400, time());
        $this->writeSpiderLog('进程成功得到执行');

        $job = Manager::instance(Yii::$app->params['queue']['simple'])->pull();
        if (!$job) {
            return $job;
        }

        // Logged only when a job was actually pulled.
        $this->writeSpiderLog('成功消费到redis数据');

        try {
            $job->handle();
        } catch (\Throwable $e) {
            echo $e->getMessage();
            $this->writeSpiderLog('任务执行失败: ' . $e->getMessage());
        }

        return $job;
    }

    /**
     * Append a timestamped line to the worker log under @runtime/api/simple/.
     *
     * NOTE(review): the file name contains date('Ymd H:i'), i.e. one file per
     * minute with a space and a colon in its name — confirm this is intended.
     *
     * @param string $msg text to log
     * @throws \yii\base\Exception presumably raised by FileHelper::createDirectory()
     *                             when the directory cannot be created — confirm
     */
    public function writeSpiderLog($msg)
    {
        $path = Yii::getAlias('@runtime/api/simple/');

        // Create the log directory on first use.
        if (!is_dir($path)) {
            FileHelper::createDirectory($path);
        }

        $file = $path . 'log-' . date('Ymd H:i') . '.log';
        $txt = "【" . date('Y-m-d H:i:s') . "】---" . $msg . "\r\n";

        // A single append call: a failure to open the file yields a warning
        // instead of passing a failed handle on to fwrite()/fclose().
        file_put_contents($file, $txt, FILE_APPEND);
    }
}
  • Yii2控制台消息队列消费相关代码 CloudController.php
<?php
/**
 * CloudController.php class file.
 * @author karnc
 */

namespace console\controllers;

use console\components\daemons\simple\Daemon;
use console\components\daemons\simple\Worker;
use yii\console\Controller;
use yii\console\ExitCode;

/**
 * 消费队列
 * Class CloudController
 * @package console\controllers
 *
 */
/**
 * Console commands that start and stop the queue-consuming daemon.
 *
 * @package console\controllers
 */
class CloudController extends Controller
{
    /**
     * Whether to run in daemon mode (console option --daemon).
     * @var bool
     */
    public $daemon = false;

    /**
     * Start the service through Daemon::start(), which creates the Master,
     * writes the PID file and starts the workers. Going through Daemon makes
     * the --daemon option effective and gives actionStop() a PID file to use.
     *
     * @return int exit code
     */
    public function actionStart()
    {
        return Daemon::start((bool) $this->daemon)
            ? ExitCode::OK
            : ExitCode::UNSPECIFIED_ERROR;
    }


    /**
     * Expose the "daemon" property as a command-line option.
     *
     * @param string $actionID id of the action being run
     * @return string[] option names
     */
    public function options($actionID)
    {
        return array_merge(parent::options($actionID), ['daemon']);
    }


    /**
     * Stop the service.
     *
     * @return int exit code
     */
    public function actionStop()
    {
        Daemon::stop();
        return ExitCode::OK;
    }
}

公共配置参数参考

<?php
return [
    /**
     * Daemon settings
     */
    'daemon' => [
        // settings for the "simple" daemon
        'simple' => [
            'masterPidFilePath' => '@runtime/pid/simple.process.pid', // PID file (Yii alias); holds the master and worker process ids
            'masterName' => 'simple', // master process name
            'workerNumber' => 10, // number of worker processes
            'workerName' => 'simple_work' // worker process name
        ],
    ],
    'queue' => [
        'simple' => 'jobs:simple', // queue name
    ],
];

以上就是在 Yii 框架中实现消息队列投递与消费功能的完整代码。

最后修改:2021 年 07 月 09 日 02 : 16 PM
如果觉得我的文章对你有用,请随意赞赏