# 任务调度

# 前言

herosphp/crontab (opens new window) 类似linux的crontab,不同的是herosphp/crontab支持秒级任务调度。

时间说明:

0   1   2   3   4   5
|   |   |   |   |   |
|   |   |   |   |   +------ day of week (0 - 6) (Sunday=0)
|   |   |   |   +------ month (1 - 12)
|   |   |   +-------- day of month (1 - 31)
|   |   +---------- hour (0 - 23)
|   +------------ min (0 - 59)
+-------------- sec (0-59)[可省略,如果没有0位,则最小时间粒度是分钟]

# 安装

composer install herosphp/crontab

# 使用

# 1. 新建进程文件 process/CrontabWorker.php

<?php
declare(strict_types=1);

namespace process;

use Exception;
use Workerman\Crontab\Crontab;

class CrontabWorker
{
    /**
     * @param Worker $worker
     * @return void
     */
    public function onWorkerStart(Worker $worker): void
    {
        //每秒钟执行一次
        new Crontab('* * * * * *', function(){
            echo date('Y-m-d H:i:s')."\n";
        });
    }

}

# 2. 注册调度器

<?php
declare(strict_types=1);
use process\CrontabWorker;

return [
    //仅能1个进程
    'crontab' => [
        'enable' => true,
        'handler' => CrontabWorker::class
    ],
];

# 3. 重新启动

    php process.php restart -d

注意:定时任务不会马上执行,所有定时任务进入下一分钟才会开始计时执行。

# 最佳实践

通过异步任务配合Crontab实现任务调度系统。

# 1. 配置

   //仅能1个进程
    'crontab' => [
        'enable' => true,
        'handler' => CrontabWorker::class
    ],

    //大量任务的时候,通过投递到异步任务完成
    'async_worker' => [
        'enable' => true,
        'listen' => 'tcp://127.0.0.1:8182',
        'handler' => AsyncTaskWorker::class,
        'count' => 1
    ],

# 2.CrontabWorker和AsyncTaskWorker

CrontabWorker.php

<?php

declare(strict_types=1);
/**
 * This file is part of Heros-Worker.
 *
 * @contact  chenzf@pvc123.com
 */

namespace process;

use Exception;
use herosCron\Crontab;
use herosphp\utils\Lock;
use Workerman\Connection\AsyncTcpConnection;
use Workerman\Worker;

/**
 * 采用异步通知消息来完成定时任务
 * 主要解决是如果定时任务定义间隙过短、任务执行过久,导致部分任务跳过。
 * 参考[http://doc.workerman.net/faq/async-task.html].
 */
class CrontabWorker
{
    // 定时任务列表 memo:定时任务的备注,该属性为可选属性,没有任何逻辑上的意义,仅供开发人员查阅帮助对该定时任务的理解。
    protected static array $cronList = [
       [
           'rule' => '* * * * * *',  //支持秒
           'task' => [AsyncTask::class, 'run'],  //处理类和执行方法
           'memo' => 'say Hello'  //备注可选
       ]
    ];

    /**
     * @param Worker $worker
     * @return void
     */
    public function onWorkerStart(Worker $worker): void
    {
        foreach (static::$cronList ?? [] as $cron) {
            new Crontab($cron['rule'], static function () use ($cron) {
                static::delivery($cron['task'][0], $cron['task'][1], $cron['memo']);
            });
        }
    }

    /**
     * 投递到异步进程. 一个定时任务执行比较久,间隔设置时间比较短,加锁。一个任务一个时刻仅有一个运行.
     *
     * @throws Exception
     */
    private static function delivery(string $clazz, string $method, string $memo): void
    {
        $lock = Lock::get("{$clazz}{$method}");
        if ($lock->tryLock()) {
            $taskConnection = new AsyncTcpConnection('tcp://127.0.0.1:8182');
            $taskConnection->send(json_encode(['clazz' => $clazz, 'method' => $method]));
            $taskConnection->onMessage = function (AsyncTcpConnection $asyncTcpConnection, $taskResult) use ($lock) {
                $asyncTcpConnection->close();
                $lock->unlock();
            };
            $taskConnection->connect();
        }
    }
}

AsyncTaskWorker.php

<?php
declare(strict_types=1);
namespace process;

use Workerman\Connection\TcpConnection;

class AsyncTaskWorker
{
    public function onMessage(TcpConnection $connection, string $data): void
    {
        $class = json_decode($data, true);
        if (isset($class['clazz'], $class['method']) && class_exists($class['clazz']) && method_exists($class['clazz'], $class['method'])) {
            call_user_func([new $class['clazz'](), $class['method']]);
        }
        $connection->send('ok');
    }
}

上次更新: 10/27/2022, 11:18:25 AM