- 定义job interface,代码参考
<?php
/**
* JobInterface.php class file.
* @author karnc
* @date 2020/12/31
*/
namespace common\drivers\interfaces;
/**
* Contract for a unit of work that can be placed on the queue.
*
* RedisQueue::push() only accepts implementations of this interface, and
* RedisQueue::pull() discards any payload that does not unserialize to one.
*/
interface JobInterface
{
/**
* Execute the job.
*
* The worker invokes this method with no arguments and does not use the
* return value.
* @return mixed
*/
public function handle();
}
- 定义redis queue 驱动,代码参考
<?php
namespace common\drivers\queue;
/**
* RedisQueue.php class file.
* @author karnc
*/
use common\drivers\interfaces\JobInterface;
use Yii;
class RedisQueue
{
    /**
     * Redis connection used for every queue operation (taken from Yii::$app->redis).
     * @var mixed
     */
    protected $redis;

    /**
     * Queue name; an empty value selects the default queue.
     * @var string|null
     */
    protected $queue_name;

    /**
     * Bind the queue to the application's Redis component.
     *
     * @param string|null $queue_name Queue name; empty means the default queue.
     */
    public function __construct($queue_name = '')
    {
        $this->redis = Yii::$app->redis;
        $this->queue_name = $queue_name;
    }

    /**
     * Enqueue a job by pushing its serialized form onto the head of the list.
     *
     * @param JobInterface $job
     * @return mixed Whatever the Redis client returns for LPUSH.
     */
    public function push(JobInterface $job)
    {
        return $this->redis->lpush($this->getRedisKey(), serialize($job));
    }

    /**
     * Dequeue the next job from the tail of the list (FIFO together with push()).
     *
     * @return JobInterface|false The job, or false when the queue is empty or the
     *                            payload does not unserialize to a JobInterface.
     */
    public function pull()
    {
        $payload = $this->redis->rpop($this->getRedisKey());

        // An empty list yields a non-string reply (presumably null with yii2-redis).
        // Feeding that to unserialize() is deprecated on PHP 8.1+ and a TypeError
        // under strict types, so bail out before touching it.
        if (!is_string($payload) || $payload === '') {
            return false;
        }

        // NOTE(review): unserialize() can instantiate any autoloadable class found in
        // the payload. This is only safe while nothing untrusted can write to this
        // Redis key; consider the `allowed_classes` option or a JSON envelope.
        $job = unserialize($payload);

        return $job instanceof JobInterface ? $job : false;
    }

    /**
     * Number of jobs currently waiting in the queue.
     *
     * @return mixed Whatever the Redis client returns for LLEN.
     */
    public function count()
    {
        return $this->redis->llen($this->getRedisKey());
    }

    /**
     * Build the Redis key that stores this queue.
     *
     * @return string "queues:" followed by the queue name (or the default name).
     */
    protected function getRedisKey()
    {
        $name = !empty($this->queue_name) ? $this->queue_name : $this->getRedisDefaultKeyName();

        return 'queues:' . $name;
    }

    /**
     * Name used when no queue name was supplied.
     *
     * @return string
     */
    protected function getRedisDefaultKeyName()
    {
        return 'default';
    }
}
- 定义redis驱动实例化管理
<?php
/**
* Manager.php class file.
* @author karnc
*/
namespace common\drivers\queue;
use Yii;
class Manager
{
    /**
     * Shared queue instances, keyed by queue name ('' for the default queue).
     * @var RedisQueue[]
     */
    protected static $_static = [];

    /**
     * Create a fresh, unshared queue object.
     *
     * @param string|null $queue_name Queue name; empty selects the default queue.
     * @return RedisQueue
     */
    public static function factory($queue_name = null)
    {
        return new RedisQueue($queue_name);
    }

    /**
     * Return the shared queue object for the given queue name, creating it on first use.
     *
     * Instances are cached per name. A single global instance would make every later
     * call return whichever queue happened to be requested first, regardless of the
     * name passed in.
     *
     * @param string|null $queue_name Queue name; null and '' both map to the default queue.
     * @return RedisQueue
     */
    public static function instance($queue_name = null)
    {
        $cacheKey = (string) $queue_name;

        if (!isset(self::$_static[$cacheKey])) {
            self::$_static[$cacheKey] = self::factory($queue_name);
        }

        return self::$_static[$cacheKey];
    }
}
- 一个简单的job参考
<?php
/**
* SimpleJob class file.
* @author karnc
*/
namespace common\jobs;
class SimpleJob implements \common\drivers\interfaces\JobInterface
{
    /**
     * Arbitrary payload carried by the job.
     * @var array
     */
    public $args = [];

    /**
     * @param array $args Payload to carry through the queue.
     */
    public function __construct(array $args = [])
    {
        $this->args = $args;
    }

    /**
     * Demo handler: dumps the payload followed by a greeting line to standard output.
     */
    public function handle(): void
    {
        echo print_r($this->args, true), "Hello World\n";
    }
}
- job助手类封装参考
<?php
/**
* JobHelper.php class file.
* @author karnc
*/
use common\drivers\queue\Manager;
use common\jobs\SimpleJob;
use Yii;
class JobHelper
{
    /**
     * Wrap the arguments in a SimpleJob and push it onto the queue whose name is
     * configured under the `queue.simple` application parameter.
     *
     * @param array $args Payload handed to the SimpleJob constructor.
     */
    public static function toSimple(array $args)
    {
        $job = new SimpleJob($args);
        $queueName = Yii::$app->params['queue']['simple'];

        Manager::factory($queueName)->push($job);
    }
}
- 队列消费与守护进程相关代码
Daemon.php
<?php
/**
* Daemon.php class file.
* @author karnc
*/
namespace console\components\daemons\simple;
use Yii;
class Daemon
{
    /**
     * Start the daemon.
     *
     * Any processes recorded in the pid file by a previous run are sent SIGTERM
     * first, then a new Master is created and run.
     *
     * @param bool $daemon Passed through to Master; true requests daemon mode.
     * @return bool Always true once Master::run() has returned.
     */
    public static function start(bool $daemon = false): bool
    {
        self::terminateRecordedProcesses(self::pidFilePath());

        $masterProcessor = new Master($daemon);
        $masterProcessor->run();

        return true;
    }

    /**
     * Stop the daemon: send SIGTERM to every recorded process and remove the pid file.
     */
    public static function stop(): void
    {
        $pidFile = self::pidFilePath();
        if (!is_file($pidFile)) {
            return;
        }

        self::terminateRecordedProcesses($pidFile);

        // Best effort: the file may already have been removed by a concurrent stop.
        @unlink($pidFile);
    }

    /**
     * Resolve the configured pid file alias to a filesystem path.
     *
     * @return string Resolved path ('' if the alias could not be resolved).
     */
    private static function pidFilePath()
    {
        return (string) Yii::getAlias(Yii::$app->params['daemon']['simple']['masterPidFilePath']);
    }

    /**
     * Send SIGTERM to every process id listed (one per line) in the pid file.
     *
     * Lines that are not a positive decimal integer are skipped, so a corrupted
     * file can never hand a non-integer to posix_kill(). Failures to signal
     * (for example a process that has already exited) are ignored.
     *
     * @param string $pidFile Path of the pid file; a missing file is a no-op.
     */
    private static function terminateRecordedProcesses(string $pidFile): void
    {
        if (!is_file($pidFile)) {
            return;
        }

        $lines = file($pidFile, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES);
        if ($lines === false) {
            return;
        }

        foreach ($lines as $line) {
            $line = trim($line);
            if (!ctype_digit($line)) {
                continue;
            }

            $processId = (int) $line;
            if ($processId > 0) {
                posix_kill($processId, SIGTERM);
            }
        }
    }
}
Master.php
<?php
/**
* Master.php class file.
* @author karnc
*/
namespace console\components\daemons\simple;
use Yii;
use swoole\Process;
class Master
{
    /**
     * PIDs of the worker processes started by this master, keyed by PID.
     * @var array
     */
    protected $processList = [];

    /**
     * Id of the periodic timer created in run().
     * @var int
     */
    protected $timerId;

    /**
     * Whether the master was asked to run as a daemon.
     * @var bool
     */
    protected $daemon = false;

    /**
     * Resolved path of the pid file (master PID first, then one worker PID per line).
     * @var string
     */
    protected $processIdPath;

    /**
     * Optionally daemonize, set the process title and record the master PID.
     *
     * @param bool $daemon True to detach via Process::daemon().
     * @throws \RuntimeException When the pid file directory cannot be created.
     */
    public function __construct(bool $daemon = false)
    {
        $this->daemon = $daemon;
        $this->processIdPath = Yii::getAlias(Yii::$app->params['daemon']['simple']['masterPidFilePath']);

        // The pid directory is not guaranteed to exist on a fresh checkout. Create it
        // before daemonizing so that a failure is still reported to the caller.
        $pidDirectory = dirname((string) $this->processIdPath);
        if (!is_dir($pidDirectory) && !mkdir($pidDirectory, 0775, true) && !is_dir($pidDirectory)) {
            throw new \RuntimeException('Unable to create pid directory: ' . $pidDirectory);
        }

        if ($this->daemon) {
            Process::daemon();
        }

        swoole_set_process_name(Yii::$app->params['daemon']['simple']['masterName']);

        // Written after daemonizing so the recorded PID is the one of the surviving process.
        file_put_contents($this->processIdPath, posix_getpid() . PHP_EOL);
    }

    /**
     * Spawn the configured number of workers and install a periodic no-op timer.
     *
     * NOTE(review): the SIGCHLD/SIGTERM registrations below are disabled in the
     * original code, so handleForSIGCHLD() and handleForSIGTERM() are never invoked
     * by this class as written. Confirm whether they should be re-enabled.
     */
    public function run(): void
    {
        $this->createWorker(Yii::$app->params['daemon']['simple']['workerNumber']);

        // Process::signal(SIGCHLD, [$this, 'handleForSIGCHLD']);
        // Process::signal(SIGTERM, [$this, 'handleForSIGTERM']);

        // The callback does nothing; presumably the timer only keeps the master's
        // event loop alive.
        $this->timerId = swoole_timer_tick(6000, function ($timerId, $params = null) {
        });
    }

    /**
     * Start worker processes and append their PIDs to the pid file.
     *
     * The master is daemonized once, in the constructor. Calling Process::daemon()
     * here for every worker would re-fork the master each time and leave a stale
     * master PID in the pid file, so it is not repeated.
     *
     * @param int $number How many workers to start.
     */
    protected function createWorker(int $number): void
    {
        for ($remaining = $number; $remaining > 0; $remaining--) {
            $process = new Process(
                static function (Process $process): void {
                    $worker = new Worker();
                    $worker->run();
                },
                false,
                0 // no inter-process communication is needed inside the child
            );

            $processId = $process->start();
            file_put_contents($this->processIdPath, $processId . PHP_EOL, FILE_APPEND);
            $this->processList[$processId] = $processId;
        }
    }

    /**
     * SIGCHLD handler: reap every exited child and start a replacement for each.
     *
     * @param int $singalNumber Signal number passed by the dispatcher (unused).
     */
    public function handleForSIGCHLD(int $singalNumber): void
    {
        while ($result = Process::wait(false)) {
            unset($this->processList[$result['pid']]);
            $this->createWorker(1);
        }
    }

    /**
     * SIGTERM handler: terminate the children, stop the timer, wait for the
     * children to exit and then end the application.
     *
     * @param int $singalNumber Signal number passed by the dispatcher (unused).
     * @throws \yii\base\ExitException
     */
    public function handleForSIGTERM(int $singalNumber): void
    {
        $this->killChildren();
        swoole_timer_clear($this->timerId);

        while ($result = Process::wait(true)) {
            // Nothing to do; just reap until no child is left.
        }

        Yii::$app->end();
    }

    /**
     * Send SIGTERM to every worker started by this master.
     */
    protected function killChildren(): void
    {
        foreach ($this->processList as $processId) {
            Process::kill($processId, SIGTERM);
        }
    }
}
Worker.php
<?php
/**
* Worker.php class file.
* @author karnc
*/
namespace console\components\daemons\simple;
use common\drivers\queue\Manager;
use yii\helpers\FileHelper;
use Yii;
// Enable asynchronous signal dispatch as soon as this file is loaded.
// NOTE(review): no pcntl_signal() handler is installed anywhere in this file.
\pcntl_async_signals(true);
class Worker
{
    /**
     * Set the process title of the worker from the `daemon.simple.workerName` parameter.
     */
    public function __construct()
    {
        swoole_set_process_name(Yii::$app->params['daemon']['simple']['workerName']);
    }

    /**
     * Consume the queue forever, sleeping two seconds whenever no job was available.
     */
    public function run(): void
    {
        while (true) {
            if (!$this->process()) {
                sleep(2);
            }
        }
    }

    /**
     * Run one polling cycle: record a heartbeat, pull one job and execute it.
     *
     * @return mixed The job that was handled, or false when the queue had nothing usable.
     */
    private function process()
    {
        // Heartbeat: timestamp of the most recent poll, kept for one day.
        Yii::$app->redis->setex('super_simple_work_time', 86400, time());
        $this->writeSpiderLog('进程成功得到执行');

        $job = Manager::instance(Yii::$app->params['queue']['simple'])->pull();
        if (!$job) {
            return $job;
        }

        // Only claim success once a job has actually been pulled.
        $this->writeSpiderLog('成功消费到redis数据');

        try {
            $job->handle();
        } catch (\Throwable $e) {
            // Catch \Error as well as \Exception: one broken job must not end the
            // worker loop. The failure is reported on stdout and in the log file.
            echo $e->getMessage();
            $this->writeSpiderLog('job failed: ' . get_class($e) . ': ' . $e->getMessage());
        }

        return $job;
    }

    /**
     * Append a timestamped line to the worker's log file under @runtime/api/simple/.
     *
     * NOTE(review): the file name contains date('Ymd H:i'), so a new file (with a
     * space and a colon in its name) is started every minute. Confirm this is intended.
     *
     * @param string $msg Message to record.
     */
    public function writeSpiderLog($msg)
    {
        $path = Yii::getAlias('@runtime/api/simple/');

        // Create the log directory on first use.
        if (!is_dir($path)) {
            FileHelper::createDirectory($path);
        }

        $file = $path . 'log-' . date('Ymd H:i') . '.log';
        $line = "【" . date('Y-m-d H:i:s') . "】---" . $msg . "\r\n";

        // A single append call: there is no file handle that could be false or leak,
        // and LOCK_EX keeps lines from concurrent workers from interleaving.
        file_put_contents($file, $line, FILE_APPEND | LOCK_EX);
    }
}
- Yii2控制台消息队列消费相关代码 CloudController.php
<?php
/**
* CloudController.php class file.
* @author karnc
*/
namespace console\controllers;
use console\components\daemons\simple\Daemon;
use console\components\daemons\simple\Worker;
use yii\console\Controller;
use yii\console\ExitCode;
/**
* 消费队列
* Class CloudController
* @package console\controllers
*
*/
class CloudController extends Controller
{
    /**
     * Whether to run in daemon mode (set with --daemon on the command line).
     * @var bool
     */
    public $daemon = false;

    /**
     * Start consuming the queue.
     *
     * With --daemon the work is delegated to Daemon::start(), which records the
     * process ids that actionStop() later terminates. Without it a single Worker
     * runs in the foreground, exactly as before.
     *
     * @return int Exit code.
     */
    public function actionStart()
    {
        // Command-line values arrive as strings ("1", "true", "0", ...).
        if (filter_var($this->daemon, FILTER_VALIDATE_BOOLEAN)) {
            return Daemon::start(true) ? ExitCode::OK : ExitCode::UNSPECIFIED_ERROR;
        }

        (new Worker())->run();

        // Worker::run() loops forever; reaching this line means it ended unexpectedly.
        return ExitCode::UNSPECIFIED_ERROR;
    }

    /**
     * Expose the `daemon` option in addition to the inherited ones.
     *
     * @param string $actionID
     * @return array
     */
    public function options($actionID)
    {
        return array_merge(parent::options($actionID), ['daemon']);
    }

    /**
     * Stop the service by terminating every process recorded in the pid file.
     *
     * @return int Exit code.
     */
    public function actionStop()
    {
        Daemon::stop();

        return ExitCode::OK;
    }
}
- 公共配置参数参考
<?php
// Application parameters read by the queue and daemon classes via Yii::$app->params.
return [
/**
* Daemon settings
*/
'daemon' => [
// Settings for the "simple" daemon
'simple' => [
// Path alias of the pid file; the master PID and every worker PID are written to it
'masterPidFilePath' => '@runtime/pid/simple.process.pid',
'masterName' => 'simple', // process title set on the master process
'workerNumber' => 10, // number of worker processes the master starts
'workerName' => 'simple_work' // process title set on each worker process
],
],
// Queue names; RedisQueue prefixes them with "queues:" to form the Redis key
'queue' => [
'simple' => 'jobs:simple', // queue used by JobHelper::toSimple() and consumed by Worker
],
];
以上就是在Yii框架中实现消息队列投递与消费的相关代码。