# 消息队列

# 简介

基于Redis的消息队列🎉🎉🎉,支持消息延迟处理。依赖redis扩展。另外基于Redis-Stream实现的方案正在研发中,相信不久之后会跟大家见面。

# 安装

# 1. 安装扩展

composer install herosphp/redis-queue

# 2. 生成配置文件queue.config.php

composer vendor:publish "herosphp/redis-queue"

配置文件:

<?php
return [
    'default' => [
        'host' => 'redis://172.28.1.3:6379',
        'options' => [
            'auth' => null,    // 密码,可选参数
            'db' => 0,      // 数据库
            'max_attempts' => 10, // 消费失败后,重试次数
            'retry_seconds' => 5, // 重试间隔,单位秒
        ],
    ],
];

# 3. 启动消费者进程

config\process.config.php中,增加redis-queue配置,重启进程。

<?php

declare(strict_types=1);

use herosphp\plugin\queue\Consumer;

return [
    'redis-queue' => [
        'enable' => true,
        'handler' => Consumer::class,
        'count' => 4,
        'constructor' => [
            'consumer_dir' => APP_PATH.'queue',
        ],
    ],
];

# 使用

# 投递消息(同步)

   Client::send('demo', [1,2,3]);

注意

延迟队列消费时间可能会出现误差,例如消费速度小于生产速度导致队列积压,进而导致消费延迟,缓解办法是多开一些消费进程。

# 投递消息(异步)

    Client::send('demo', [1,2,3], 5); //// 投递延迟消息,消息会在5秒后处理

Client::send() 没有返回值,它属于异步推送,它不保证消息100%送达redis。

注意

Client::send()原理是在本地内存建立一个内存队列,异步将消息同步到redis(同步速度很快,每秒大概1万笔消息)。如果进程重启,恰好本地内存队列里数据没有同步完毕,会造成消息丢失。Client::send()异步投递适合投递不重要的消息。

# 在其他项目投递消息

如其他FPM类PHP项目

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting';
    $queue_delay = '{redis-queue}-delayed';
    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);

# 消费

消费者目录在app/queue/下。 新建 app/queue/MySmsSend.php (类名任意,符合psr4规范即可)。

<?php

namespace app\queue;

use herosphp\plugin\queue\ConsumerInterface;

class MySmsSend implements ConsumerInterface
{
    // 要消费的队列名
    public $queue = 'send-sms';

    // 连接名,对应 config/queue.php 里的连接`
    public $connection = 'default';

    // 消费
    public function consume($data)
    {
        // 无需反序列化
        var_export($data); // 输出 ['to' => '13888888888', 'content' => 'hello']
    }
}

注意

消费过程中没有抛出异常和Error视为消费成功,否则消费失败,进入重试队列。 redis-queue没有ack机制,你可以把它看作是自动ack(没有产生异常或Error)。如果消费过程中想标记当前消息消费不成功,可以手动抛出异常,让当前消息进入重试队列。这实际上和ack机制没有区别。

提示 虽然消费者是多进程的,但是一个消息只会有一个进程进行消费,不会出现多个进程同时消费一个消息的情况。消费过的消息会自动从队列删除,无需手动删除。

提示 消费进程可以同时消费多种不同的队列,新增队列不需要修改process.php中的配置,新增队列消费者时只需要在app/queue/下新增对应的Consumer类即可,并用类属性$queue指定要消费的队列名

# 消费失败重试

如果消费失败(发生了异常),则消息会放入延迟队列,等待下次重试。重试次数通过参数max_attempts控制,重试间隔由 retry_secondsmax_attempts共同控制。比如max_attempts为5,retry_seconds为10,第1次重试间隔为110秒,第2次重试时间间隔为 210秒,第3次重试时间间隔为3*10秒,以此类推直到重试5次。如果超过了max_attempts设置测重试次数,则消息放入key{redis-queue}-failed的失败队列。

# 为不同的队列设置不同的消费进程

默认情况下,所有的消费者共用相同的消费进程。但有时我们需要将一些队列的消费独立出来(政企、白酒),例如政企的业务放到一组进程中消费,白酒的业务放到另外一组进程消费。为此我们可以将消费者分为两个目录,例如 'app/queue/redis/zq' 和 'app/queue/redis/bj' (注意消费类的命名空间需要做相应的更改),则配置如下:

use herosphp\plugin\queue\Consumer\Consumer;
return [
    ...这里省略了其它配置...

    'redis_consumer_zq'  => [
        'handler'     => Consumer::class,
        'count'       => 4,
        'constructor' => [
            // 消费者类目录
            'consumer_dir' => BASE_PATH . 'app/queue/redis/zq'
        ]
    ],
    'redis_consumer_bj'  => [
        'handler'     => Consumer::class,
        'count'       => 2,
        'constructor' => [
            // 消费者类目录
            'consumer_dir' => BASE_PATH . 'app/queue/redis/bj'
        ]
    ]
];

# 多redis配置

config/quque.php文件

<?php
return [
    'default' => [
        'host' => 'redis://192.168.0.1:6379',
        'options' => [
            'auth' => null,       // 密码,字符串类型,可选参数
            'db' => 0,            // 数据库
            'max_attempts'  => 5, // 消费失败后,重试次数
            'retry_seconds' => 5, // 重试间隔,单位秒
        ]
    ],
    'other' => [
        'host' => 'redis://192.168.0.2:6379',
        'options' => [
            'auth' => null,       // 密码,字符串类型,可选参数
            'db' => 0,             // 数据库
            'max_attempts'  => 5, // 消费失败后,重试次数
            'retry_seconds' => 5, // 重试间隔,单位秒
        ]
    ],
];

注意配置里增加了一个other为key的redis配置

# 多redis投递消息

// 向 `default` 为key的队列投递消息
Client::connection('default')->send($queue, $data);
//  等同于
Client::send($queue, $data);

// 向 `other` 为key的队列投递消息
Client::connection('other')->send($queue, $data);

# 多redis消费

namespace app\queue;

use herosphp\plugin\queue\ConsumerInterface;

class MySmsSend implements ConsumerInterface
{
    // 要消费的队列名
    public $queue = 'send-mail';

    // === 这里设置为other,代表消费配置里other为key的队列 ===
    public $connection = 'other';

    // 消费
    public function consume($data)
    {
        // 无需反序列化
        var_export($data);
    }
}
上次更新: 10/27/2022, 11:18:25 AM