需要对N多个队列进行数据处理,每个队列一次处理一个数据耗时较长。要尽可能短的时间里处理一个队列数据,需要开M个进程消耗一个队列数据。N个队列需要对应 N * M 个进程处理。
但是还需要有一个限流的功能,要求单个队列每分钟最多处理100数据。
使用 Swoole 的多进程 + 进程池的方案进行进程调度处理。
队列1 <----> 进程A(建立进程池)
|-- 子进程A1
|-- 子进程A2
|-- 子进程 ...
队列2 <----> 进程B(建立进程池)
|-- 子进程B1
|-- 子进程B2
涉及到的知识点:
限流,使用模拟令牌桶算法
利用 Swoole 进程池自动管理工作进程,柔性重启
利用 Redis 的 bRPop 阻塞队列
监听系统信号,实现进程的优雅重启、退出
使用 Redis + 定时任务模拟实现一个令牌桶用于每分钟限流:
<?php
// [queueName => workConfig] 可作为整体配置,统一引入
$redisQueue = [
'queue1' => ['workerNum' => 2, 'rateMin' => 100,],
'queue2' => ['workerNum' => 3, 'rateMin' => 100,],
'retry_queue1' => ['workerNum' => 1, 'rateMin' => 1,],
];
$curMin = date('YmdHi');
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 模拟令牌桶
foreach ($redisQueue as $k => $v) {
$_key = 'rateMin_' . $k . '_' . $curMin;
$fillList = range(1, $v['rateMin']);
$redis->lpush($_key, ...$fillList);
$redis->expire($_key, 60);
}
利用Swoole进程池自动管理工作进程
创建运行主文件,如:multiSwoolePool.php
<?php
// [queueName => workConfig] 可作为整体配置,统一引入
// retry 前缀为重试队列
// workNum 每个队列对应的工作进程数量
// rateMin 每个队列的消耗速率 单位 分
$config = [
'queue1' => ['workerNum' => 2, 'rateMin' => 100,],
'retry_queue1' => ['workerNum' => 1, 'rateMin' => 1,],
];
$queueLen = count($config);
if (empty($config)) exit('empty queue');
use Swoole\Process;
// 用于保存子进程pid,方便后续进程重启和停止
$processPidFile = 'process_pid.data';
file_put_contents($processPidFile, '');
foreach ($config as $configQueueName => $configParams) {
// 启动多个子进程
$process = new Process(function () use ($configQueueName, $configParams, $processPidFile) {
echo date('Y-m-d H:i:s') . ' Child #' . getmypid() . " start {$configQueueName}" . PHP_EOL;
file_put_contents($processPidFile, getmypid() . PHP_EOL, FILE_APPEND);
// 子进程启动进程池
$workerNum = $configParams['workerNum'];
$pool = new Swoole\Process\Pool($workerNum);
// $pool->set(['enable_coroutine' => true]); // 是否开启协程
$pool->on("WorkerStart", function ($pool, $workerId) use ($configQueueName, $configParams) {
echo date('Y-m-d H:i:s') . " Worker #{$workerId}:{$configQueueName} is started\n";
try {
// 使用 include 便于业务重载
include "your_logic.php";
} catch (\Exception $e) {
// 记录错误日志
echo $e->getMessage() . PHP_EOL;
exit; // 让出错进程主动退出,进程池会自动拉起新进程(由业务侧保证数据)
}
});
$pool->on("WorkerStop", function ($pool, $workerId) {
echo date('Y-m-d H:i:s') . " Worker #{$workerId} is stopped\n";
});
$pool->start();
echo date('Y-m-d H:i:s') . ' Child #' . getmypid() . ' exit' . PHP_EOL;
});
$process->start();
}
// 监听多进程
for ($i = 0; $i < $queueLen; $i++) {
$status = Process::wait(true);
echo date('Y-m-d H:i:s') . " Recycled #{$status['pid']}, code={$status['code']}, signal={$status['signal']}" . PHP_EOL;
}
echo date('Y-m-d H:i:s') . ' Parent #' . getmypid() . ' exit' . PHP_EOL;
// Swoole\Process::daemon();
进程的优雅重启、退出,通过监听系统信号,文件 your_logic.php
<?php
/**
* 进程内传入参数
* $pool, $workerId
* $configQueueName, $configParams
*
*/
$redis = new Redis();
$redis->pconnect('127.0.0.1', 6379);
$logic = new MyBusiness();
// 安装退出进程信号,实现优雅退出
$running = true;
pcntl_signal(SIGTERM, function () use (&$running, $pool, $workerId, $redis) {
$running = false;
echo date('Y-m-d H:i:s') . " Worker #{$pool->getProcess($workerId)->pid}-{$workerId} TERM TERM TERM !!!\n";
// 关闭 redis、mysql 连接等
$redis->close();
exit;
});
while ($running) {
// 响应退出信号防止逻辑丢失
pcntl_signal_dispatch();
$result = $redis->bRPop($configQueueName, 10); // 10s 超时
if ($result == null) continue;
[$_queueName, $_params] = $result;
// step 0 队列调度,防止业务逻辑耗时进程异常导致数据丢失
if (!preg_match('/^retry/', $configQueueName)) $redis->lPush($configQueueName, $_params);
// step 1 检查限流/分钟(模拟令牌桶算法)
$rateMinKey = 'rateMin_' . $configQueueName . '_' . date('YmdHi');
$rateValue = $redis->rPop($rateMinKey);
if (empty($rateValue)) {
// 限流时等待下一次任务
$redis->rPush($configQueueName, $_params);
$redis->lPop($configQueueName);
// 限流时: redis 操作结束也可响应退出信号
pcntl_signal_dispatch();
$rand = mt_rand(2, 10);
echo date('Y-m-d H:i:s') . " Worker #{$workerId}:{$configQueueName}:RateLimit will sleep {$rand}s\n";
sleep($rand);
continue;
}
// step 2 处理业务
echo date('Y-m-d H:i:s') . " Worker #{$workerId}:{$configQueueName} Start deal logic, current TokenBucketValue:{$rateValue}\n";
try {
$logic->manage($configQueueName, $_params);
} catch (\Exception $e) {
// 业务异常时的处理
echo $e->getMessage() . PHP_EOL;
}
// step 3 清理变量
}
// 业务处理
class MyBusiness
{
public function manage($queueName, $queueParams)
{
$params = json_encode($queueParams);
// 模拟业务处理耗时
echo date('Y-m-d H:i:s') . " Deal Logic Start:{$queueName}-{$params}==========>>\n";
sleep(2);
// 模拟异常
// if ($params%9==0) throw new Exception("Exception #queueName:{$queueName}, Value is {$params}");
// 业务处理失败,丢弃或推送到重试队列
$usedMemory = memory_get_usage();
$usedMemory = round($usedMemory / 1024 / 1024, 3);
echo date('Y-m-d H:i:s') . " Deal Logic Success #queueName:{$queueName}-params:{$params}-usedMemory:{$usedMemory}Mb==========>>\n";
}
}
进程启动时记录下自己的pid,使用系统命令发送信号
# reloadMultiSwoolePool.sh
# -10 SIGUSR1 柔性重启
cat process_pid.data| xargs kill -SIGUSR1
# stopMultiSwoolePool.sh
# -15 SIGTERM 柔性停止
cat process_pid.data| xargs kill -SIGTERM