# 任务调度
# 前言
herosphp/crontab (opens new window) 类似linux的crontab,不同的是
herosphp/crontab
支持秒级任务调度。
时间说明:
0 1 2 3 4 5
| | | | | |
| | | | | +------ day of week (0 - 6) (Sunday=0)
| | | | +------ month (1 - 12)
| | | +-------- day of month (1 - 31)
| | +---------- hour (0 - 23)
| +------------ min (0 - 59)
+-------------- sec (0-59)[可省略,如果没有0位,则最小时间粒度是分钟]
# 安装
composer install herosphp/crontab
# 使用
# 1. 新建进程文件 process/CrontabWorker.php
<?php
declare(strict_types=1);
namespace process;
use Exception;
use Workerman\Crontab\Crontab;
class CrontabWorker
{
/**
* @param Worker $worker
* @return void
*/
public function onWorkerStart(Worker $worker): void
{
//每秒钟执行一次
new Crontab('* * * * * *', function(){
echo date('Y-m-d H:i:s')."\n";
});
}
}
# 2. 注册调度器
<?php
declare(strict_types=1);
use process\CrontabWorker;
return [
//仅能1个进程
'crontab' => [
'enable' => true,
'handler' => CrontabWorker::class
],
];
# 3. 重新启动
php process.php restart -d
注意:定时任务不会马上执行,所有定时任务进入下一分钟才会开始计时执行。
# 最佳实践
通过异步任务配合Crontab
实现任务调度系统。
# 1. 配置
//仅能1个进程
'crontab' => [
'enable' => true,
'handler' => CrontabWorker::class
],
//大量任务的时候,通过投递到异步任务完成
'async_worker' => [
'enable' => true,
'listen' => 'tcp://127.0.0.1:8182',
'handler' => AsyncTaskWorker::class,
'count' => 1
],
# 2.CrontabWorker和AsyncTaskWorker
CrontabWorker.php
<?php
declare(strict_types=1);
/**
* This file is part of Heros-Worker.
*
* @contact chenzf@pvc123.com
*/
namespace process;
use Exception;
use herosCron\Crontab;
use herosphp\utils\Lock;
use Workerman\Connection\AsyncTcpConnection;
use Workerman\Worker;
/**
* 采用异步通知消息来完成定时任务
* 主要解决是如果定时任务定义间隙过短、任务执行过久,导致部分任务跳过。
* 参考[http://doc.workerman.net/faq/async-task.html].
*/
class CrontabWorker
{
// 定时任务列表 memo:定时任务的备注,该属性为可选属性,没有任何逻辑上的意义,仅供开发人员查阅帮助对该定时任务的理解。
protected static array $cronList = [
[
'rule' => '* * * * * *', //支持秒
'task' => [AsyncTask::class, 'run'], //处理类和执行方法
'memo' => 'say Hello' //备注可选
]
];
/**
* @param Worker $worker
* @return void
*/
public function onWorkerStart(Worker $worker): void
{
foreach (static::$cronList ?? [] as $cron) {
new Crontab($cron['rule'], static function () use ($cron) {
static::delivery($cron['task'][0], $cron['task'][1], $cron['memo']);
});
}
}
/**
* 投递到异步进程. 一个定时任务执行比较久,间隔设置时间比较短,加锁。一个任务一个时刻仅有一个运行.
*
* @throws Exception
*/
private static function delivery(string $clazz, string $method, string $memo): void
{
$lock = Lock::get("{$clazz}{$method}");
if ($lock->tryLock()) {
$taskConnection = new AsyncTcpConnection('tcp://127.0.0.1:8182');
$taskConnection->send(json_encode(['clazz' => $clazz, 'method' => $method]));
$taskConnection->onMessage = function (AsyncTcpConnection $asyncTcpConnection, $taskResult) use ($lock) {
$asyncTcpConnection->close();
$lock->unlock();
};
$taskConnection->connect();
}
}
}
AsyncTaskWorker.php
<?php
declare(strict_types=1);
namespace process;
use Workerman\Connection\TcpConnection;
class AsyncTaskWorker
{
public function onMessage(TcpConnection $connection, string $data): void
{
$class = json_decode($data, true);
if (isset($class['clazz'], $class['method']) && class_exists($class['clazz']) && method_exists($class['clazz'], $class['method'])) {
call_user_func([new $class['clazz'](), $class['method']]);
}
$connection->send('ok');
}
}