队列
简介
在构建网页应用程序时,您可能会遇到一些任务,例如解析和存储上传的 CSV 文件,这些任务在传统的网络请求中执行需要太长时间。幸运的是,Laravel 允许您轻松创建排队作业,这些作业可以在后台处理。通过将耗时的任务移至队列中,您的应用程序可以以快速响应网页请求,并为用户提供更好的体验。
Laravel 队列提供了一个统一的队列 API,可以连接不同的队列后端,例如 Amazon SQS,Redis,甚至是关系型数据库。
Laravel 的队列配置选项存储在应用程序的 config/queue.php
配置文件中。在这个文件中,您可以找到每个队列驱动程序的连接配置,这些驱动程序包括与框架一起提供的数据库,Amazon SQS,Redis 和 Beanstalkd 驱动程序,以及一个立即执行作业的同步驱动程序(用于本地开发)。还包括一个空队列驱动程序,它可以丢弃排队的作业。
Laravel 现在提供了一个名为 Horizon 的漂亮的仪表板和配置系统,用于您 Redis 驱动的队列。详细的 Horizon 文档可在此处查看。
连接与队列
在开始使用 Laravel 队列之前,重要的是要了解“连接”和“队列”之间的区别。在 config/queue.php
配置文件中,有一个 connections
配置数组。该选项定义了与后端队列服务(如 Amazon SQS,Beanstalk 或 Redis)的连接。但是,任何给定的队列连接可能具有多个“队列”,可以将其视为不同的堆栈或排队作业的堆叠。
请注意,在队列配置文件中的每个连接配置示例中都包含一个 queue
属性。这是作业默认要派发到的队列。换句话说,如果您派发一个没有明确指定要派发到哪个队列的作业,该作业将被放置在队列配置的队列属性中定义的队列上:
use App\Jobs\ProcessPodcast;
// 这个作业将发送到默认连接的默认队列...
ProcessPodcast::dispatch();
// 这个作业将发送到默认连接的"emails"队列...
ProcessPodcast::dispatch()->onQueue('emails');
某些应用程序可能不需要将作业推送到多个队列,而是更喜欢只使用一个简单的队列。但是,将作业推送到多个队列对于希望按优先级或段分割处理作业的应用程序特别有用,因为 Laravel 队列工作进程允许您指定应按优先级处理的队列。例如,如果您将作业推送到高优先级队列,可以运行一个赋予它们更高处理优先级的工作进程:
php artisan queue:work --queue=high,default
驱动程序与先决条件
数据库
为了使用数据库队列驱动程序,您将需要一个数据库表来保存作业。要生成创建此表的迁移,请运行queue:table
Artisan命令。创建完迁移后,您可以使用 migrate
命令迁移数据库:
php artisan queue:table
php artisan migrate
最后,请不要忘记通过更新应用程序的 .env
文件中的 QUEUE_CONNECTION
变量来指示应用程序使用数据库驱动程序:
QUEUE_CONNECTION=database
Redis
为了使用 Redis 队列驱动程序,您应该在 config/database.php
配置文件中配置一个 Redis 数据库连接。
Redis Cluster
如果您的 Redis 队列连接使用 Redis Cluster,则您的队列名称必须包含键哈希标记。这是为了确保给定队列的所有 Redis 键都放置在同一个哈希槽中:
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => '{default}',
'retry_after' => 90,
],
阻塞
在使用 Redis 队列时,您可以使用 block_for
配置选项来指定驱动程序在迭代工作进程循环并重新轮询 Redis 数据库之前应等待作业变得可用的时间。
根据队列负载调整此值可能比持续轮询 Redis 数据库获取新作业更高效。例如,您可以将该值设置为 5
,表示驱动程序在等待作业变得可用时应阻塞五秒钟:
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => 'default',
'retry_after' => 90,
'block_for' => 5,
],
将
block_for
设置为0
会导致队列工作进程无限期地阻塞,直到作业可用。这也将阻止处理下一个作业之前处理诸如SIGTERM
之类的信号。
其他驱动程序先决条件
所列队驱动程序所需的以下依赖项可以通过 Composer 软件包管理器安装:
- Amazon SQS:
aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~4.0
- Redis:
predis/predis ~1.0
或phpredis
PHP 扩展
创建作业
生成作业类
默认情况下,应用程序的所有可排队作业都存储在 app/Jobs
目录中。如果 app/Jobs
目录不存在,则在运行make:job
Artisan命令时将创建该目录:
php artisan make:job ProcessPodcast
生成的类将实现 Illuminate\Contracts\Queue\ShouldQueue
接口,指示 Laravel 作业应异步推入队列中运行。
作业样本可以使用桩发布进行自定义。
类结构
作业类非常简单,通常只包含一个在作业由队列处理时调用的 handle
方法。下面是一个示例作业类。在这个示例中,我们假设我们管理一个播客发布服务,并且需要在发布之前处理上传的播客文件:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* Create a new job instance.
*/
public function __construct(
public Podcast $podcast,
) {}
/**
* Execute the job.
*/
public function handle(AudioProcessor $processor): void
{
// Process uploaded podcast...
}
}
请注意,在这个示例中,我们可以直接将一个 Eloquent 模型传递给排队的作业的构造函数。由于作业使用了SerializesModels
特性,当作业正在处理时,Eloquent 模型及其加载的关系将被优雅地序列化和反序列化。
如果您的排队作业在构造函数中接受一个Eloquent模型,那么只有该模型的标识符将被序列化到队列上。当实际处理作业时,队列系统将自动从数据库中重新检索完整的模型实例及其加载的关系。此方法允许将较小的作业负载发送到队列驱动程序。
处理方法的依赖项注入
当作业由队列处理时,将调用处理方法。请注意,我们可以在处理方法中使用类型提示来注入处理方法所需的依赖项。Laravel 服务容器会自动注入这些依赖项。
如果您想完全控制容器如何将依赖项注入处理方法,可以使用容器的 bindMethod
方法。bindMethod
方法接受一个回调函数,该回调函数接收作业和容器。在回调函数中,您可以以任何您希望的方式调用处理方法。通常,应该在 App\Providers\AppServiceProvider
服务提供者的 boot
方法中调用此方法:
use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;
$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
return $job->handle($app->make(AudioProcessor::class));
});
在传递给队列的作业之前,将包含二进制数据(如原始图像内容)传递给
base64_encode
函数。否则,作业在被放置在队列上时可能无法正确序列化为 JSON。
排队的关系
由于在作业排队时加载的所有 Eloquent 模型关系也会被序列化,所以序列化的作业字符串有时会变得非常大。此外,当反序列化作业并重新从数据库中检索模型关系时,它们将完全被检索。在作业排队过程中序列化之前应用的任何先前关系约束将不会在作业反序列化时应用。因此,如果要使用给定关系的子集,应在排队作业中重新约束该关系。
或者,为了防止关系被序列化,您可以在模型设置属性值时调用该模型的 withoutRelations
方法。此方法将返回一个没有加载关系的模型实例:
/**
* Create a new job instance.
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast->withoutRelations();
}
如果您使用了 PHP 构造函数属性提升,且希望指示某个 Eloquent 模型不应序列化其关系,则可以使用WithoutRelations
属性:
use Illuminate\Queue\Attributes\WithoutRelations;
/**
* Create a new job instance.
*/
public function __construct(
#[WithoutRelations]
public Podcast $podcast
) {
}
如果作业接收的是 Eloquent 模型的集合或数组,而不是单个模型,则在作业反序列化和执行时不会恢复该集合中的模型关系。这是为了防止处理大量模型的作业使用过多资源。
唯一作业
唯一作业需要支持锁的缓存驱动程序。目前,
memcached
,redis
,dynamodb
,database
,file
和array
缓存驱动程序都支持原子锁。此外,唯一作业约束不适用于批处理中的作业。
有时,您可能只希望确保特定作业的一个实例在任何时间点都存在于队列中。您可以通过在作业类上实现ShouldBeUnique
接口来实现此目的。这个接口不需要您在类上定义任何额外的方法:
<?php
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
...
}
在上面的示例中,UpdateSearchIndex
作业是唯一的。因此,如果作业的另一个实例已经在队列中并且尚未完成处理,作业将不会被派发。
在某些情况下,您可能想要定义一个使作业唯一的特定“键”,或者可能想要指定超过该时间的超时时间。为此,可以在作业类上定义 uniqueId
和 uniqueFor
属性或方法:
<?php
use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
/**
* The product instance.
*
* @var \App\Product
*/
public $product;
/**
* The number of seconds after which the job's unique lock will be released.
*
* @var int
*/
public $uniqueFor = 3600;
/**
* Get the unique ID for the job.
*/
public function uniqueId(): string
{
return $this->product->id;
}
}
在上面的示例中,UpdateSearchIndex
作业是根据产品 ID 唯一的。因此,如果以相同的产品 ID 重新派发作业,直到现有作业处理完成之前,新派发的作业将被忽略。此外,如果现有的作业在一小时内没有处理完成,唯一锁将被释放,可以将具有相同唯一键的另一个作业派发到队列中。
如果应用程序从多个 Web 服务器或容器派发作业,您应确保所有服务器都与同一个中央缓存服务器通信,以便Laravel 可以准确判断作业是否唯一。
保持作业唯一,直到处理开始
默认情况下,作业在处理完成或失败所有重试尝试之前会“解锁”。但是,可能存在作业解锁后立即开始处理的情况。为了实现这一点,作业类应实现 ShouldBeUniqueUntilProcessing
合同,而不是 ShouldBeUnique
合同:
<?php
use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
// ...
}
唯一作业锁
在幕后,当派发 ShouldBeUnique
作业时,Laravel 尝试使用 uniqueId
键获得锁。如果锁没有被获取,作业将不会派发。此锁在作业完成处理或失败所有重试尝试后被释放。默认情况下,Laravel 将使用默认缓存驱动程序来获取此锁。但是,如果您希望使用其他驱动程序来获取锁,可以调用 uniqueVia
方法,并返回应使用的缓存驱动程序:
use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
...
/**
* Get the cache driver for the unique job lock.
*/
public function uniqueVia(): Repository
{
return Cache::driver('redis');
}
}
如果您只需要限制作业的并发处理,请使用
WithoutOverlapping
作业中间件。
加密作业
Laravel 允许您通过加密来确保作业数据的隐私和完整性。要开始使用,请将 ShouldBeEncrypted
接口添加到作业类。一旦将此接口添加到类中,Laravel 将自动对作业进行加密,然后将其推送到队列中:
<?php
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;
class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
{
// ...
}
作业中间件
作业中间件允许您在队列作业的执行过程中包装自定义逻辑,减少作业本身的样板代码。例如,考虑下面的处理方法,它利用 Laravel 的 Redis 速率限制功能,每五秒钟只允许处理一个作业:
use Illuminate\Support\Facades\Redis;
/**
* Execute the job.
*/
public function handle(): void
{
Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
info('Lock obtained...');
// 处理任务...
}, function () {
// 无法获得锁...
return $this->release(5);
});
}
虽然这段代码是有效的,但处理方法的实现变得很嘈杂,因为它杂乱地包含了 Redis 速率限制逻辑。此外,这种速率限制逻辑必须在任何希望限制速率的其他作业中进行复制。
与其在处理方法中进行速率限制,不如定义一个作业中间件来处理速率限制。Laravel 没有为作业中间件定义默认位置,因此您可以将作业中间件放置在应用程序中的任何位置。在这个示例中,我们将作业中间件放置在app/Jobs/Middleware
目录中:
<?php
namespace App\Jobs\Middleware;
use Closure;
use Illuminate\Support\Facades\Redis;
class RateLimited
{
/**
* Process the queued job.
*
* @param \Closure(object): void $next
*/
public function handle(object $job, Closure $next): void
{
Redis::throttle('key')
->block(0)->allow(1)->every(5)
->then(function () use ($job, $next) {
// Lock obtained...
$next($job);
}, function () use ($job) {
// Could not obtain lock...
$job->release(5);
});
}
}
如您所见,与路由中间件一样,作业中间件在作业正在处理时接收作业和应该调用以继续处理作业的回调。
创建作业中间件后,可以通过从作业的 middleware
方法返回它们将它们附加到作业中。默认情况下,此方法在使用 make:job
Artisan命令生成的作业中不存在,因此您需要手动将其添加到作业类中:
use App\Jobs\Middleware\RateLimited;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited];
}
作业中间件也可以分配给可排队的事件侦听器、可发送的邮件和通知。
限速
虽然我们刚刚演示了如何编写自己的限速作业中间件,但 Laravel 实际上包含一个可用于限制速率的中间件,您可以使用它来限制作业的速率。与路由限流器一样,作业限流器也是使用 RateLimiter
门面的 for
方法来定义的。
例如,您可能希望允许用户每小时备份一次数据,而对高级客户则不进行此限制。为了实现这一点,您可以在 AppServiceProvider
的 boot
方法中定义一个 RateLimiter
:
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;
/**
* 启动任何应用程序服务。
*/
public function boot(): void
{
RateLimiter::for('backups', function (object $job) {
return $job->user->vipCustomer()
? Limit::none()
: Limit::perHour(1)->by($job->user->id);
});
}
在上面的示例中,我们定义了一个每小时限速;但是,您可以使用 perMinute
方法轻松定义基于分钟的限速。此外,您可以将任何值传递给 rate limit 的 by
方法,但是该值最常用于按客户分段限速:
return Limit::perMinute(50)->by($job->user->id);
完成限速定义后,您可以使用 Illuminate\Queue\Middleware\RateLimited
中间件将限速器附加到作业上。每次作业超过限速时,此中间件都会根据限速持续时间在适当的延迟后将作业释放回队列。
use Illuminate\Queue\Middleware\RateLimited;
/**
* 获取作业应通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited('backups')];
}
将限速作业释放回队列将仍会增加作业的总尝试次数。您可能希望相应地调整作业类的 tries
和 maxExceptions
属性。或者,您可以使用 retryUntil
方法定义作业不再尝试的时间。
如果您不希望在速率限制时重新尝试作业,可以使用 dontRelease
方法:
/**
* 获取作业应通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new RateLimited('backups'))->dontRelease()];
}
如果您正在使用 Redis,则可以使用
Illuminate\Queue\Middleware\RateLimitedWithRedis
中间件,该中间件针对 Redis 进行了优化,并且比基本的限速中间件效率更高。
防止作业重叠
Laravel 包含 Illuminate\Queue\Middleware\WithoutOverlapping
中间件,该中间件使您可以基于任意键来防止作业重叠。当队列作业修改只允许一次修改的资源时,这很有用。
例如,假设您有一个队列作业,用于更新用户的信用评分,您希望防止相同用户 ID 的信用评分更新作业重叠。为了实现此目的,您可以在作业的中间件方法中返回 WithoutOverlapping
中间件:
use Illuminate\Queue\Middleware\WithoutOverlapping;
/**
* 获取作业应通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new WithoutOverlapping($this->user->id)];
}
相同类型的任何重叠作业都将被释放回队列。您还可以指定在尝试再次执行释放的作业之前必须经过的秒数:
/**
* 获取作业应通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}
如果希望立即删除重叠的作业,以便它们不会重新尝试,可以使用 dontRelease
方法:
/**
* 获取作业应通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}
WithoutOverlapping
中间件采用 Laravel 的原子锁功能。有时,作业可能会意外失败或超时,以至于锁定未被释放。因此,您可以使用 expireAfter
方法明确定义锁定的到期时间。例如,下面的示例将指示 Laravel 在作业开始处理后的三分钟内释放 WithoutOverlapping
锁定:
/**
* 获取作业应通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
}
WithoutOverlapping
中间件需要支持锁的缓存驱动程序。目前,memcached
、redis
、dynamodb
、database
、file
和array
缓存驱动程序支持原子锁。
在作业类之间共享锁键
WithoutOverlapping
中间件默认只能防止相同类别的重叠作业。因此,尽管两个不同的作业类可能使用相同的锁键,但它们不会避免重叠。但是,您可以通过使用 shared
方法指示 Laravel 在作业类之间应用相同的键来更改此行为:
use Illuminate\Queue\Middleware\WithoutOverlapping;
class ProviderIsDown
{
// ...
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}
class ProviderIsUp
{
// ...
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}
限制异常
Laravel 包含 Illuminate\Queue\Middleware\ThrottlesExceptions
中间件,使您可以对异常进行节流。一旦作业引发了给定数量的异常,后续执行作业的尝试将被推迟,直到经过指定的时间间隔。这种中间件对于与不稳定的第三方服务交互的作业特别有用。
例如,假设有一个与第三方 API 交互的队列作业开始引发异常。为了进行异常节流,您可以在作业的中间件方法中返回 ThrottlesExceptions
中间件。通常,此中间件应与实现基于时间的尝试的作业配对使用:
use DateTime;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* 获取作业应通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new ThrottlesExceptions(10, 5)];
}
/**
* 确定作业应超时的时间。
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(5);
}
中间件的第一个构造函数参数是作业在被限制之前可以引发的异常次数,而第二个构造函数参数是作业在节流后经过的分钟数。在上面的代码示例中,如果作业在 5 分钟内引发 10 次异常,我们将在 5 分钟后再次尝试作业。
当作业引发异常但尚未达到异常阈值时,作业通常会立即重试。但是,当附加中间件到作业时,您可以通过调用 backoff
方法来指定等待的分钟数:
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* 获取作业应通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 5))->backoff(5)];
}
在内部,此中间件使用 Laravel 的缓存系统实现速率限制,作业的类名被用作缓存键。通过在附加到作业的中间件中调用 by
方法,您可以覆盖此键,如果您的多个作业与同一第三方服务进行交互,并且您希望它们共享一个常见的限制桶,那么这可能很有用:
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* 获取作业应通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10))->by('key')];
}
如果您正在使用 Redis,则可以使用
Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis
中间件,该中间件针对 Redis 进行了优化,并且比基本的异常节流中间件效率更高。
调度作业
编写完作业类后,您可以使用作业本身的 dispatch
方法将其调度。传递给 dispatch
方法的参数将传递给作业的构造函数:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储新的播客。
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// ...
ProcessPodcast::dispatch($podcast);
return redirect('/podcasts');
}
}
如果要有条件地调度作业,可以使用 dispatchIf
和 dispatchUnless
方法:
ProcessPodcast::dispatchIf($accountActive, $podcast);
ProcessPodcast::dispatchUnless($accountSuspended, $podcast);
在新的 Laravel 应用程序中,默认的队列驱动程序是 sync
驱动程序。该驱动程序在当前请求的前台同步执行作业,这在本地开发期间非常方便。如果要开始对作业进行后台处理并真正开始排队作业,可以在应用程序的 config/queue.php
配置文件中指定不同的队列驱动程序。
延迟调度
如果您希望指定作业不会立即可供队列工作进程处理,可以在调度作业时使用 delay
方法。例如,让我们指定作业在派发后的 10 分钟后才可供处理:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储新的播客。
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// ...
ProcessPodcast::dispatch($podcast)
->delay(now()->addMinutes(10));
return redirect('/podcasts');
}
}
队列服务 Amazon SQS 的最大延迟时间为 15 分钟。
在响应发送到浏览器后调度
或者,使用 dispatchAfterResponse
方法可以将作业的调度延迟到 HTTP 响应发送给用户的后面,如果您的 Web 服务器正在使用 FastCGI。这将允许用户在排队的作业仍在执行时开始使用应用程序。这通常只用于作业,例如发送电子邮件,这些作业大约需要一秒钟的时间。由于它们在当前 HTTP 请求中处理,以这种方式调度的作业不需要运行队列工作进程来进行处理:
use App\Jobs\SendNotification;
SendNotification::dispatchAfterResponse();
您还可以调度闭包,并将 afterResponse
方法链接到 dispatch
助手上,在将 HTTP 响应发送到浏览器后执行闭包:
use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;
dispatch(function () {
Mail::to('taylor@example.com')->send(new WelcomeMessage);
})->afterResponse();
同步调度
如果您希望立即(同步)调度作业,可以使用 dispatchSync
方法。使用此方法时,作业不会排队,而是立即在当前进程中执行:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储新的播客。
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// ...
ProcessPodcast::dispatchSync($podcast);
return redirect('/podcasts');
}
}
作业和数据库事务
在事务内部调度作业是可以的,但是您应该格外小心,以确保作业实际上可以成功执行。当在事务内部调度作业时,作业可能会在父事务提交之前被工作进程处理。当发生这种情况时,在数据库事务(或事务)期间对模型或数据库记录的任何更新可能尚未在数据库中反映出来。此外,事务(或事务)中创建的任何模型或数据库记录可能不存在于数据库中。
幸运的是,Laravel 提供了几种解决此问题的方法。首先,您可以在队列连接的配置数组中设置 after_commit
连接选项:
'redis' => [
'driver' => 'redis',
// ...
'after_commit' => true,
],
当 after_commit
选项为 true
时,您可以在数据库事务内部调度作业,但是 Laravel 将在实际调度作业之前等待打开的父数据库事务提交。当然,如果当前没有打开的数据库事务,则会立即调度作业。
如果事务由于在事务内部发生的异常而回滚,则在事务中调度的作业将被丢弃。
将
after_commit
配置选项设置为true
还会导致在所有打开的数据库事务提交之后,所有排队的事件监听器、可发送邮件、通知和广播事件都将被分发。
指明内联提交调度行为
如果你没有将 after_commit
队列连接配置选项设置为 true
,你也可以指示一个具体的 job 应在所有开放的数据库事务提交后被调度。为了实现这一点,你可以将 afterCommit
方法链接到你的调度操作中:
use App\Jobs\ProcessPodcast;
ProcessPodcast::dispatch($podcast)->afterCommit();
同样,如果 after_commit
配置选项设置为 true
,你也可以指示一个特定的作业应立即调度,而无需等待任何未提交的数据库事务:
ProcessPodcast::dispatch($podcast)->beforeCommit();
作业链
作业链允许你指定在主作业成功执行后应顺序运行的队列作业列表。如果序列中的一个作业失败,序列中的其余作业将不会运行。要执行一个队列作业链,你可以使用 Bus
门面提供的 chain
方法。Laravel 的命令总线是队列作业调度建立在其上的较低级别的组件:
use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->dispatch();
除了链接作业类实例,你还可以链接闭包:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
function () {
Podcast::update(/* ... */);
},
])->dispatch();
在作业中使用
$this->delete()
方法删除作业不会阻止链接的作业被处理。只有当链中的一项作业失败时,链才会停止执行。
链连接和队列
如果你想指定链接的作业应使用的连接和队列,你可以使用 onConnection
和 onQueue
方法。这些方法指定应使用的队列连接和队列名称,除非队列作业被明确定义了不同的连接/队列:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();
链故障
当链接作业时,你可以使用 catch
方法指定如果链中的作业失败应调用的闭包。给定的回调将接收到导致作业失败的 Throwable
实例:
use Illuminate\Support\Facades\Bus;
use Throwable;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->catch(function (Throwable $e) {
// The chain of work has failed...
})->dispatch();
由于链回调被序列化并在稍后的时间由 Laravel 队列执行,所以你不应在链回调中使用
$this
变量。
定制队列和连接
向特定队列调度
通过将作业推入不同的队列,你可以“分类”你的队列作业,并甚至优先指定你为各种队列分配的作业数量。请注意,这并不将作业推送到你的队列配置文件定义的不同的队列“连接”,而只是向单个连接中的特定队列推送。要指定队列,使用 onQueue
方法调度作业:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Save the new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// create a podcast...
ProcessPodcast::dispatch($podcast)->onQueue('processing');
return redirect('/podcasts');
}
}
或者,你可以在作业的构造函数中通过调用 onQueue
方法来指定作业的队列:
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* Create a new job instance.
*/
public function __construct()
{
$this->onQueue('processing');
}
}
向特定连接调度
如果你的应用程序与多个队列连接进行交互,你可以使用 onConnection
方法指定将作业推送到哪个连接:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Save the new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// create a podcast...
ProcessPodcast::dispatch($podcast)->onConnection('sqs');
return redirect('/podcasts');
}
}
你可以将 onConnection
和 onQueue
方法链接在一起,以指定作业的连接和队列:
ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');
或者,你可以在作业的构造函数中通过调用 onConnection
方法来指定作业的连接:
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* Create a new job instance.
*/
public function __construct()
{
$this->onConnection('sqs');
}
}
指定最大尝试/超时值
最大尝试次数
如果你的一个队列作业遇到错误,你可能不希望它无限制地继续重试。因此,Laravel 提供了多种方式来指定一个作业可能试图执行的次数或时间。
一种指定一个作业可能尝试的最大次数的方法是使用 Artisan 命令行上的 --tries
开关。这将适用于由 worker 处理的所有作业,除非被处理的作业指定了它可能尝试执行的次数:
php artisan queue:work --tries=3
如果一个作业超过了其最大尝试次数,那么它将被视为一个“失败的”作业。有关处理失败的作业的更多信息,可以查看处理失败的作业文档。如果向 queue:work
命令提供了 --tries=0
,则作业将无限制地被重试。
你可以采取更细致的方法,通过在作业类本身上定义一个作业可能尝试的最大次数。如果在作业上指定了最大尝试次数,那么它将优先于命令行提供的 --tries
的值:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 5;
}
基于时间的尝试
作为定义一个作业在失败之前可能尝试多少次的另一种方式,你可以定义一个作业不应再试图执行的时间。这允许一个作业在给定的时间段内尝试任何次数。要定义作业不应再试图执行的时间,在你的作业类中加入 retryUntil
方法。这个方法应返回一个 DateTime
实例:
use DateTime;
/**
* Determine the time at which the job should timeout.
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(10);
}
你也可以在你的队列事件监听器上定义
tries
属性或retryUntil
方法。
最大异常
有时你可能希望指定一个作业可以尝试很多次,但是如果重试被给定数量的未处理的异常触发,则应该失败(而不是直接通过 release
方法发布)。要实现这一点,你可以在你的作业类上定义一个 maxExceptions
属性:
<?php
namespace App\Jobs;
use Illuminate\Support\Facades\Redis;
class ProcessPodcast implements ShouldQueue
{
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 25;
/**
* The maximum number of unhandled exceptions to allow before failing.
*
* @var int
*/
public $maxExceptions = 3;
/**
* Execute the job.
*/
public function handle(): void
{
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// Lock obtained, process the podcast...
}, function () {
// Unable to obtain lock...
return $this->release(10);
});
}
}
在这个例子中,如果应用程序无法获取 Redis 锁,那么作业将被放开十秒,然后将继续被尝试到 25 次。然而,如果在作业中抛出了三个未处理的异常,那么作业将失败。
超时
要指定作业超时,必须安装
pcntl
PHP 扩展。
通常,你知道你的队列作业大概需要多长时间。因此,Laravel 允许你指定一个“超时”值。默认情况下,超时值为 60 秒。如果作业的处理时间长于超时值所指定的秒数,正在处理作业的 worker 将以错误退出。通常,worker 将由你的服务器上配置的进程管理器自动重启。
可以使用 Artisan 命令行上的 --timeout
开关指定作业可以运行的最大秒数:
php artisan queue:work --timeout=30
如果作业由于持续超时而超过其最大尝试次数,它将被标记为失败。
你也可以在作业类本身定义作业应该被允许运行的最大秒数。如果在作业上指定了超时,那么它将优先于任何在命令行上指定的超时:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* The number of seconds the job can run before timing out.
*
* @var int
*/
public $timeout = 120;
}
有时候,IO 阻塞过程,如套接字或外出的 HTTP 连接,可能不会尊重你指定的超时时间。因此,在使用这些功能时,你应该总是尝试使用他们的 API 指定一个超时值。例如,当使用 Guzzle 时,你应该总是指定一个连接和请求超时值。
超时失败
如果你想指明一个作业应在超时时标记为失败,你可以在工作类上定义 $failOnTimeout
属性:
/**
* Indicate if the job should be marked as failed if it times out.
*
* @var bool
*/
public $failOnTimeout = true;
错误处理
如果在处理作业时遇到异常,作业将自动被重新放回队列,以便再次尝试。作业将继续被释放,直到它已经尝试了你的应用程序允许的最大次数。可以使用 queue:work
Artisan 命令的 --tries
开关定义最大尝试次数。或者,最大尝试次数也可以在作业类本身定义。可以在下面找到更多关于运行队列 worker 的信息。
在调度操作时手动释放作业
有时,您可能希望手动将作业释放回队列,以便它可以在以后的某个时间再次尝试。通过调用 `release 方法,您可以实现这一点:
/**
* 执行作业。
*/
public function handle(): void
{
// ...
$this->release();
}
默认情况下,release
方法会将作业立即释放回队列以进行处理。但是,您可以通过向 release
方法传递整数或日期实例来指示队列在给定的秒数之后再次将作业加入处理队列:
$this->release(10);
$this->release(now()->addSeconds(10));
手动将作业标记为失败
有时您可能需要手动将作业标记为“失败”。您可以调用 fail
方法来实现这一点:
/**
* 执行作业
*/
public function handle(): void
{
// ...
$this->fail();
}
如果要将作业标记为失败,是因为您捕获到了异常,可以将异常传递给 fail
方法。或者,为方便起见,您可以传递一个字符串错误消息,该消息将被转换为异常:
$this->fail($exception);
$this->fail('Something went wrong.');
有关处理失败作业的更多信息,请查阅有关处理作业失败的文档。
作业批处理
Laravel 的作业批处理功能允许您轻松执行一批作业,然后在批处理作业完成后执行某个操作。在开始之前,您应该创建一个数据库迁移,用于构建一个包含有关作业批次的元信息的表,例如它们的完成百分比。可以使用 queue:batches-table
Artisan 命令生成此迁移:
php artisan queue:batches-table
php artisan migrate
定义可批处理的作业
要定义一个可批处理的作业,可以像正常情况下创建一个可队列化的作业一样;但是,您应该将 Illuminate\Bus\Batchable
trait 添加到作业类。这个 trait 提供了访问当前作业所在的批次的 batch
方法:
<?php
namespace App\Jobs;
use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ImportCsv implements ShouldQueue
{
use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* 执行作业。
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
// 确定批次是否被取消...
return;
}
// 导入 CSV 文件的一部分...
}
}
调度批处理
要调度一批作业,您需要使用 Bus
门面的 batch
方法。当然,批处理主要是与完成回调结合使用时才有用。因此,您可以使用 then
、catch
和 finally
方法为批处理定义完成回调。这些回调在调用时将接收一个 Illuminate\Bus\Batch
实例。在此示例中,我们设想我们正在排队一批作业,每个作业都从一个 CSV 文件中处理给定数量的行:
use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;
$batch = Bus::batch([
new ImportCsv(1, 100),
new ImportCsv(101, 200),
new ImportCsv(201, 300),
new ImportCsv(301, 400),
new ImportCsv(401, 500),
])->then(function (Batch $batch) {
// 所有作业都成功完成...
})->catch(function (Batch $batch, Throwable $e) {
// 检测到第一个批处理作业失败...
})->finally(function (Batch $batch) {
// 批处理已完成执行...
})->dispatch();
return $batch->id;
通过 $batch->id
属性访问的批次 ID 可以在调度批次后使用 Laravel 命令总线查询有关批次的信息。
由于批处理回调会在稍后由 Laravel 队列执行进行序列化和执行,因此您不应在批处理回调中使用
$this
变量。
命名批处理
如果将批处理命名,某些工具(如 Laravel Horizon 和 Laravel Telescope)可以为批处理提供更用户友好的调试信息。为了为批处理分配一个任意名称,可以在定义批处理时调用名称方法:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// 所有作业都成功完成...
})->name('Import CSV')->dispatch();
批处理连接和队列
如果要指定用于批处理作业的连接和队列,请使用 onConnection
和 onQueue
方法。批处理作业必须在相同的连接和队列内执行:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// 所有作业都成功完成...
})->onConnection('redis')->onQueue('podcasts')->dispatch();
在批处理中使用链式作业
可以通过将链式作业放在一个数组中来在批处理中定义一组链式作业。例如,我们可以并行执行两个作业链,并在两个作业链都完成处理时执行回调:
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
Bus::batch([
[
new ReleasePodcast(1),
new SendPodcastReleaseNotification(1),
],
[
new ReleasePodcast(2),
new SendPodcastReleaseNotification(2),
],
])->then(function (Batch $batch) {
// ...
})->dispatch();
将作业添加到批处理
有时从批处理作业内部添加其他作业可能会很有用。当您需要批量处理数量较多的作业时,这种模式非常有用,因为在 Web 请求期间将这些作业连接起来可能需要太长时间。因此,您可能希望调度一批初始的“加载器”作业,以便更多地使用这批“加载器”作业填充批处理:
$batch = Bus::batch([
new LoadImportBatch,
new LoadImportBatch,
new LoadImportBatch,
])->then(function (Batch $batch) {
// 所有作业都成功完成...
})->name('Import Contacts')->dispatch();
在此示例中,我们将使用 LoadImportBatch
作业将批处理中添加更多作业。要实现这一点,可以使用可以通过作业的 batch
方法访问的批处理实例上的 add
方法:
use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;
/**
* Execute the job.
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
return;
}
$this->batch()->add(Collection::times(1000, function () {
return new ImportContacts;
}));
}
只能从属于同一个批次的作业内部向批次添加作业。
检查批处理
Illuminate\Bus\Batch
实例提供了多种属性和方法来帮助您与某个批处理作业进行交互和检查:
// 批处理的 UUID...
$batch->id;
// 批处理的名称(如果适用)...
$batch->name;
// 分配给批处理的作业数量...
$batch->totalJobs;
// 尚未由队列处理的作业数量...
$batch->pendingJobs;
// 失败的作业数量...
$batch->failedJobs;
// 到目前为止已处理的作业数...
$batch->processedJobs();
// 批次的完成百分比(0-100)...
$batch->progress();
// 指示批处理是否已完成执行...
$batch->finished();
// 取消批处理的执行...
$batch->cancel();
// 指示批处理是否已取消...
$batch->cancelled();
从路由返回批处理
所有的 Illuminate\Bus\Batch
实例都可以 JSON 序列化,这意味着您可以直接从应用程序的某个路由返回它们,以检索包含有关批处理的信息的 JSON 负载。这使得在应用程序的 UI 中轻松显示有关批处理完成进度的信息变得很方便。
要通过批次 ID 检索批次,可以使用 Bus
门面的 findBatch
方法:
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;
Route::get('/batch/{batchId}', function (string $batchId) {
return Bus::findBatch($batchId);
});
取消批处理
有时您可能需要取消给定批次的执行。这可以通过在 Illuminate\Bus\Batch
实例上调用 cancel
方法来实现:
/**
* Execute the job.
*/
public function handle(): void
{
if ($this->user->exceedsImportLimit()) {
return $this->batch()->cancel();
}
if ($this->batch()->cancelled()) {
return;
}
}
正如您可能已经注意到的那样,在上面的示例中,批处理作业应通常在继续执行之前确定其对应的批处理是否已被取消。但是,为了方便起见,您可以将 SkipIfBatchCancelled
中间件分配给作业。如其名称所示,此中间件将指示 Laravel,如果所对应的批次已被取消,则不处理作业:
use Illuminate\Queue\Middleware\SkipIfBatchCancelled;
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [new SkipIfBatchCancelled];
}
批处理失败
当批处理作业失败时,将调用 catch
回调(如果已分配)。仅对批处理中的第一个失败作业调用此回调。
允许失败
当批处理的作业失败时,Laravel 将自动将批处理标记为“已取消”。如果希望,您可以禁用此行为,使作业失败不会自动标记批处理为取消。可以通过在调度批处理时调用 allowFailures
方法来实现此目的:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// 所有作业都成功完成...
})->allowFailures()->dispatch();
重试失败的批处理作业
为了方便起见,Laravel 提供了一个 queue:retry-batch
Artisan 命令,允许您轻松地重试给定批次的所有失败作业。queue:retry-batch
命令接受所需重试其失败作业的批次的 UUID 参数:
php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5
修剪批次
如果不进行修剪,则批次表可以非常快地累积记录。为了缓解这个问题,您应该将 queue:prune-batches
Artisan 命令调度为每天运行:
$schedule->command('queue:prune-batches')->daily();
默认情况下,将修剪超过 24 小时的已完成批次。在调用命令时,可以使用 hours
选项来确定保留批次数据的时间长度。例如,以下命令将删除超过 48 小时完成的所有批次:
$schedule->command('queue:prune-batches --hours=48')->daily();
有时,您的 jobs_batches
表可能会累积尚未成功完成的批次记录,例如,如果作业失败且该作业尚未成功重试,则可能积累这些未完成的批次记录。您可以使用 queue:prune-batches
命令通过使用 unfinished
选项来清理这些未完成的批次记录:
$schedule->command('queue:prune-batches --hours=48 --unfinished=72')->daily();
同样,jobs_batches
表也可能会累积已取消的批次记录。您可以使用 queue:prune-batches
命令通过使用 cancelled
选项来清理这些取消的批次记录:
$schedule->command('queue:prune-batches --hours=48 --cancelled=72')->daily();
队列闭包
可以将闭包调度到队列中,而不是调度作业类。这对于需要在当前请求周期之外执行的快速、简单的任务非常有用。当将闭包调度到队列时,闭包的代码内容将进行加密签名,以防止在传输过程中进行修改:
$podcast = App\Podcast::find(1);
dispatch(function () use ($podcast) {
$podcast->publish();
});
通过使用 catch
方法,您可以提供一个在排除了队列的任何配置的重试尝试后失败的排队闭包中执行的闭包:
use Throwable;
dispatch(function () use ($podcast) {
$podcast->publish();
})->catch(function (Throwable $e) {
// 此作业失败了...
});
由于
catch
回调会在稍后由 Laravel 队列执行进行序列化和执行,因此在catch
回调中不应使用$this
变量。
运行队列工作进程
队列 :work
命令
Laravel 包含一个 Artisan 命令,它将启动一个队列工作进程,并在将新作业推送到队列时处理它们。您可以使用 queue:work
Artisan命令来运行工作进程。注意,一旦 queue:work
命令启动,它将持续运行,直到您手动停止它或关闭终端:
php artisan queue:work
为了使
queue:work
进程持续在后台运行,您应该使用像 Supervisor 之类的进程监视器,以确保队列工作进程不会停止运行。
在调用 queue:work
命令时,如果希望在命令输出中包括处理的作业 ID,可以使用 -v
标志:
php artisan queue:work -v
请记住,队列工作进程是长时间运行的进程,并将启动的应用程序状态存储在内存中。因此,一旦启动后,它们将不会注意到代码库中的变动。因此,在部署过程中,请确保重新启动队列工作进程。此外,请记住,应用程序创建或修改的任何静态状态都不会在作业之间自动重置。
或者,您可以运行 queue:listen
命令。使用 queue:listen
命令时,当您想要重新加载更新的代码或重置应用程序状态时,无需手动重新启动工作进程。但是,请注意,这个命令比 queue:work
命令要低效得多:
php artisan queue:listen
运行多个队列工作进程
要将多个工作进程分配给队列并同时处理作业,您只需启动多个 queue:work
进程即可。可以通过在终端中使用多个选项卡或在生产环境中使用进程管理器的配置设置来完成。在使用 Supervisor 时,您可以使用 numprocs
配置值。
指定连接和队列
您还可以指定工作进程应该使用的队列连接。传递给 work
命令的连接名称应与 config/queue.php
配置文件中定义的连接之一相对应:
php artisan queue:work redis
默认情况下,queue:work
命令仅处理给定连接上默认队列的作业。但是,您可以通过仅处理给定连接上的特定队列来进一步自定义队列工作进程。例如,如果您所有的电子邮件都在 redis
队列连接的电子邮件队列中处理,请发出以下命令来启动仅处理该队列的工作进程:
php artisan queue:work redis --queue=emails
处理指定数量的作业
凭借 --once
选项,可以指示工作仅处理队列中的一个作业:
php artisan queue:work --once
--max-jobs
选项可用于指示工作进程处理给定数量的作业,然后退出。当与 Supervisor 结合使用时,这个选项可以确保在处理给定数量的作业后自动重新启动工作进程,释放可能积累的任何内存:
php artisan queue:work --max-jobs=1000
处理所有排队的作业后退出
--stop-when-empty
选项可用于指示工作进程处理所有作业,然后优雅地退出。如果希望在 Docker 容器中处理 Laravel 队列后关闭容器,这个选项非常有用:
php artisan queue:work --stop-when-empty
指定处理作业的秒数
--max-time
选项可用于指示工作进程处理给定秒数的作业,然后退出。当与 Supervisor 结合使用时,这个选项可以确保在处理给定时间量的作业后自动重新启动工作进程,释放可能积累的任何内存:
# 处理作业一个小时,然后退出...
php artisan queue:work --max-time=3600
工作进程休眠时间
当队列中有作业可用时,工作进程将不断处理各个作业,作业之间没有延迟。但是,sleep
选项决定了工作进程在没有作业可用时的“休眠”秒数。当工作进程休眠时,将不会处理任何新的作业:
php artisan queue:work --sleep=3
资源考虑
守护程序队列工作进程在处理每个作业之前不会“重启”框架。因此,在每个作业完成处理后,您应该释放任何使用的重量级资源,如使用 GD 库进行的图像处理,应该使用 imagedestroy
释放内存。
队列优先级
有时,您可能希望为队列的处理设置优先级。例如,在 config/queue.php
配置文件中,您可以将默认队列设置为 redis
连接的 low
队列。但是,偶尔,您可能希望将作业推送到高优先级队列,如下所示:
dispatch((new Job)->onQueue('high'));
为了启动一个工作进程,在继续处理低优先级队列之前,可以将队列名称列表以逗号分隔的形式传递给 work
命令:
php artisan queue:work --queue=high,low
队列工作进程与部署
由于队列工作进程是长时间运行的进程,它们不会在不重新启动的情况下注意到代码的更改。因此,使用队列工作进程部署应用程序的最简单方法是在部署过程中重新启动工作进程。您可以通过发出 queue:restart
命令优雅地重新启动所有工作进程:
php artisan queue:restart
这个命令将指示所有队列工作进程在完成当前作业处理后优雅地退出,以确保不会丢失任何已存在的作业。由于队列工作进程将在执行 queue:restart
命令时退出,您应该运行类似 Supervisor 的进程管理器,以自动重新启动队列工作进程。
队列使用缓存来存储重启信号,在使用此功能之前,请确保为应用程序正确配置了缓存驱动程序。
作业到期和超时
作业到期
在 config/queue.php
配置文件中,每个队列连接都定义了一个 retry_after
选项。此选项指定队列连接在重新尝试正被处理的作业之前应等待的秒数。例如,如果 retry_after
的值设置为 90
,如果作业在处理了 90 秒而没有被释放或删除,它将被重新放入队列。通常情况下,应将 retry_after
的值设置为您的作业合理完成处理所需的最长时间。
唯一不包含
retry_after
值的队列连接是 Amazon SQS。SQS 将根据在 AWS 控制台中管理的默认可见性超时来重试作业。
进程超时
queue:work
Artisan命令公开了一个 --timeout
选项。默认情况下,--timeout
的值为 60 秒。如果作业处理的时间超过超时值指定的秒数,处理该作业的进程将退出并报错。通常情况下,进程将由服务器上配置的进程管理器自动重新启动:
php artisan queue:work --timeout=60
retry_after
配置选项和 --timeout
命令行选项是不同的,但它们一起工作,以确保作业不会丢失,并且作业仅在成功处理一次时才能处理。
--timeout
值应始终至少比retry_after
配置值短几秒。这将确保在工作进程处理冻结作业之前,工作进程总是被终止。如果--timeout
选项的值超过retry_after
配置值,您的作业可能会被处理两次。
Supervisor 配置
在生产环境中,您需要一种方法来保持您的 queue:work
进程运行。queue:work
进程可能因为多种原因而停止运行,比如超过工作进程超时或执行 queue:restart
命令。
因此,您需要配置一个进程监视器,可以检测到 queue:work
进程何时退出,并自动重新启动它们。此外,进程监视器还可以让您指定您想要同时运行多少个 queue:work
进程。Supervisor 是 Linux 环境常用的一个进程监视器,下面的文档中我们将讨论如何配置它。
安装 Supervisor
Supervisor 是 Linux 操作系统的进程监视器,如果它们失败,它将自动重新启动您的 queue:work
进程。要在Ubuntu 上安装 Supervisor,可以使用以下命令:
sudo apt-get install supervisor
如果你觉得配置和管理 Supervisor 自己很困难,可以考虑使用 Laravel Forge,它会自动为您的生产Laravel 项目安装和配置 Supervisor。
配置 Supervisor
Supervisor 配置文件通常存储在 /etc/supervisor/conf.d
目录中。在这个目录下,您可以创建任意数量的配置文件,指示 supervisor 如何监视您的进程。例如,我们可以创建一个 laravel-worker.conf
文件,用于启动和监视 queue:work
进程:
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600
在此示例中,numprocs
指令将指示 Supervisor 运行八个 queue:work
进程并监视它们全部,如果它们失败,将自动重新启动它们。您应该根据您所需的队列连接和工作进程选项更改配置文件的 command
指令。
您应确保
stopwaitsecs
的值大于最长运行时间所消耗的秒数。否则,Supervisor 可能会在作业完成处理之前终止作业。
启动 Supervisor
配置文件创建后,您可以使用以下命令更新 Supervisor 配置并启动进程:
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start laravel-worker:*
有关 Supervisor 的更多信息,请参考 Supervisor 文档。
处理失败的作业
有时候,您的排队作业会失败。别担心,事情并不总是按计划进行!Laravel 提供了一种方便的方法来指定作业的最大尝试次数。异步作业超过此次数后,会被插入到 failed_jobs
数据库表中。同步调度的作业如果失败,则不会存储在这个表中,并且其异常将被应用程序立即处理。
通常情况下,在新的 Laravel 应用程序中,创建一个用于 failed_jobs
表的迁移已经存在。但是,如果您的应用程序没有为此表创建迁移,您可以使用 queue:failed-table
命令创建迁移:
php artisan queue:failed-table
php artisan migrate
在运行队列工作进程时,您可以使用 queue:work
命令的 --tries
开关指定作业应尝试的最大次数。如果未为 --tries
选项指定值,则作业将只尝试一次或尝试次数由作业类的 $tries
属性指定:
php artisan queue:work redis --tries=3
使用 --backoff
选项,可以指定 Laravel 等待多少秒后重试遇到异常的作业。默认情况下,作业会立即重新放入队列,以便可以再次尝试:
php artisan queue:work redis --tries=3 --backoff=3
如果要根据每个作业动态配置 Laravel 在遇到异常后等待多少秒后重试作业,可以在作业类上定义一个 backoff
属性:
/**
* The number of seconds to wait before retrying the job.
*
* @var int
*/
public $backoff = 3;
如果需要更复杂的逻辑来确定作业的退避时间,可以在作业类上定义一个 backoff
方法:
/**
* Calculate the number of seconds to wait before retrying the job.
*/
public function backoff(): int
{
return 3;
}
您可以通过从 backoff
方法返回退避值的数组来轻松配置“指数”退避。在下面的示例中,第一次重试的延迟将为1秒,第二次重试的延迟将为5秒,第三次重试的延迟将为10秒:
/**
* Calculate the number of seconds to wait before retrying the job.
*
* @return array<int, int>
*/
public function backoff(): array
{
return [1, 5, 10];
}
清理失败的作业
当某个作业失败时,您可能希望向用户发送警报或撤销作业部分完成的任何操作。为了实现这一点,您可以在作业类上定义一个 failed
方法。导致作业失败的 Throwable
实例将传递给 failed
方法:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Throwable;
class ProcessPodcast implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
/**
* Create a new job instance.
*/
public function __construct(
public Podcast $podcast,
) {}
/**
* Execute the job.
*/
public function handle(AudioProcessor $processor): void
{
// Process uploaded podcast...
}
/**
* Handle a job failure.
*/
public function failed(Throwable $exception): void
{
// Send user notification of failure, etc...
}
}
在调用
failed
方法之前,会实例化作业的一个新实例。因此,在handle
方法中发生的任何类属性修改将会丢失。
重试失败的作业
要查看插入到 failed_jobs
数据库表中的所有失败作业,可以使用 queue:failed
Artisan 命令:
php artisan queue:failed
queue:failed
命令将列出作业 ID、连接、队列、失败时间和关于作业的其他信息。作业 ID 可以用于重试失败的作业。例如,要重试一个ID为 ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
的失败作业,请发出以下命令:
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
如果需要,可以将多个 ID 传递给命令:
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d
您还可以重试特定队列中的所有失败作业:
php artisan queue:retry --queue=name
要重试所有失败的作业,请执行 queue:retry
命令并将 all
作为 ID 传递:
php artisan queue:retry all
如果要删除失败的作业,可以使用 queue:forget
命令:
php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d
当使用 Horizon 时,应使用
horizon:forget
命令删除失败的作业,而不是queue:forget
命令。
要从 failed_jobs
表中删除所有失败的作业,可以使用 queue:flush
命令:
php artisan queue:flush
忽略丢失的模型
当将一个 Eloquent 模型注入到作业中时,模型将在放入队列之前自动序列化,并在作业处理时重新从数据库检索。但是,如果模型在作业等待工作进程处理时被删除,您的作业可能会因为找不到模型而失败并抛出ModelNotFoundException
。
为方便起见,您可以通过将作业的 deleteWhenMissingModels
属性设置为 true
来自动删除丢失模型的作业。当此属性设置为 true
时,Laravel 将悄悄地丢弃作业而不会引发异常:
/**
* Delete the job if its models no longer exist.
*
* @var bool
*/
public $deleteWhenMissingModels = true;
修剪失败的作业
您可以通过调用 queue:prune-failed
Artisan 命令来修剪应用程序的 failed_jobs
表中的记录:
php artisan queue:prune-failed
默认情况下,将修剪所有超过 24 小时的失败作业记录。如果为该命令提供了 --hours
选项,将只保留最后 N 小时内插入的失败作业记录。例如,以下命令将删除所有插入时间超过 48 小时的失败作业记录:
php artisan queue:prune-failed --hours=48
在 DynamoDB 中存储失败的作业
Laravel 还支持在 DynamoDB 中存储失败的作业记录,而不是使用关系型数据库表。但是,您必须创建一个DynamoDB 表来存储所有失败的作业记录。通常,此表应命名为 failed_jobs
,但您应根据应用程序队列配置文件中的 queue.failed.table
配置值命名表。
failed_jobs
表应具有名为 application
的字符串主分区键和名为 uuid
的字符串主排序键。键的应用程序部分将包含您的应用程序名称,如应用程序的 name
配置值所定义的那样。由于应用程序名称是 DynamoDB 表键的一部分,您可以使用相同的表存储多个 Laravel 应用程序的失败作业。
此外,请确保安装了 AWS SDK,以便您的 Laravel 应用程序可以与 Amazon DynamoDB 进行通信:
composer require aws/aws-sdk-php
接下来,将 queue.failed.driver
配置选项的值设置为 dynamodb
。此外,您应该在失败的 job
配置数组中定义 key
,secret
和 region
配置选项。这些选项将用于与 AWS 进行身份验证。当使用 dynamodb
驱动程序时,queue.failed.database
配置选项是不必要的:
'failed' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'failed_jobs',
]
禁用失败作业存储
您可以通过将 queue.failed.driver
配置选项的值设置为 null
来指示 Laravel 在不存储它们的情况下丢弃失败的作业。通常,可以使用 QUEUE_FAILED_DRIVER
环境变量来实现此目的:
QUEUE_FAILED_DRIVER=null
失败作业事件
您可以注册一个事件监听器,当作业失败时将调用该事件监听器,您可以使用 Queue
门面的 failing
方法。例如,我们可以在 Laravel 中包含的 AppServiceProvider
的 boot
方法中注册一个在事件发生时附加一个闭包到这个事件上:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;
class AppServiceProvider extends ServiceProvider
{
/**
* Register any application services.
*/
public function register(): void
{
// ...
}
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
}
从队列中清除作业
当使用 Horizon 时,应使用
horizon:clear
命令清除队列中的作业,而不是queue:clear
命令。
如果要删除默认连接的默认队列中的所有作业,可以使用 queue:clear
Artisan命令:
php artisan queue:clear
您还可以通过提供 connection
参数和 queue
选项来删除特定连接和队列中的作业:
php artisan queue:clear redis --queue=emails
只有 SQS、Redis 和数据库队列驱动程序支持清除队列中的作业。此外,SQS 消息删除过程需要最多60秒,因此在清空队列后的60秒内发送到 SQS 队列的作业也可能会被删除。
监控您的队列
如果队列突然接收到大量作业,可能会导致其不堪重负,导致作业完成的等待时间过长。如果需要,Laravel 可以在作业数量超过指定阈值时向您发出警报。
要开始,您应该将 queue:monitor
命令安排在每分钟运行一次。该命令接受您希望监视的队列的名称以及您期望的作业计数阈值:
php artisan queue:monitor redis:default,redis:deployments --max=100
仅仅安排这个命令还不足以触发警报来提醒您队列的负荷过重。当命令遇到作业计数超过阈值的队列时,将触发一个 Illuminate\Queue\Events\QueueBusy
事件。您可以在应用程序的 EventServiceProvider
中监听此事件,以向您或开发团队发送通知:
use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;
/**
* Register any other events for your application.
*/
public function boot(): void
{
Event::listen(function (QueueBusy $event) {
Notification::route('mail', 'dev@example.com')
->notify(new QueueHasLongWaitTime(
$event->connection,
$event->queue,
$event->size
));
});
}
测试
在测试分发作业的代码时,您可能希望指示 Laravel 不实际执行作业本身,因为作业的代码可以单独和分离于分发它的代码进行测试。当然,要测试作业本身,您可以在测试中实例化一个作业实例,并在测试中直接调用handle
方法。
您可以使用 Queue
门面的 fake
方法来阻止将排队的作业实际推送到队列中。在调用 Queue
门面的 fake
方法之后,您可以断言应用程序试图将作业推送到队列:
<?php
namespace Tests\Feature;
use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;
class ExampleTest extends TestCase
{
public function test_orders_can_be_shipped(): void
{
Queue::fake();
// Perform order shipping...
// Assert that no jobs were pushed...
Queue::assertNothingPushed();
// Assert a job was pushed to a given queue...
Queue::assertPushedOn('queue-name', ShipOrder::class);
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
// Assert a job was not pushed...
Queue::assertNotPushed(AnotherJob::class);
// Assert that a Closure was pushed to the queue...
Queue::assertClosurePushed();
}
}
您可以在 assertPushed或assertNotPushed
方法中传递一个闭包,以便断言有一个通过给定“真实测试”作业而推送的作业。如果至少有一个作业通过给定的真实测试而被推送,那么断言将成功:
Queue::assertPushed(function (ShipOrder $job) use ($order) {
return $job->order->id === $order->id;
});
伪造一组作业
如果只需要伪造特定的作业,同时允许其他作业正常执行,您可以将应该被伪造的作业的类名传递给 fake
方法:
public function test_orders_can_be_shipped(): void
{
Queue::fake([
ShipOrder::class,
]);
// Perform order shipping...
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
}
您可以使用 except
方法伪造除了一组指定的作业之外的所有作业:
Queue::fake()->except([
ShipOrder::class,
]);
测试作业链
要测试作业链,您将需要利用 Bus
门面的伪造能力。Bus
门面的 assertChained
方法可用于断言已调度作业的一个链:
use App\Jobs\RecordShipment;
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Support\Facades\Bus;
Bus::fake();
// ...
Bus::assertChained([
ShipOrder::class,
RecordShipment::class,
UpdateInventory::class
]);
如上面的示例所示,链作业的数组可以是作业的类名数组。但是,您也可以使用实际作业实例的数组。在这样做时,Laravel 将确保作业实例是相同类且具有与应用程序调度的链作业相同的属性值:
Bus::assertChained([
new ShipOrder,
new RecordShipment,
new UpdateInventory,
]);
您可以使用 assertDispatchedWithoutChain
方法,断言未将作业推送到链中:
Bus::assertDispatchedWithoutChain(ShipOrder::class);
测试作业批处理
Bus
门面的 assertBatched
方法可用于断言一批作业已调度。传递给 assertBatched
方法的闭包会接收一个Illuminate\Bus\PendingBatch
实例,该实例可用于检查批处理中的作业:
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
Bus::fake();
// ...
Bus::assertBatched(function (PendingBatch $batch) {
return $batch->name == 'import-csv' &&
$batch->jobs->count() === 10;
});
测试作业/批处理交互
另外,您可能需要偶尔测试单个作业与其底层批处理的交互。例如,您可能需要测试作业是否取消了其批处理的进一步处理。为了实现这一点,您需要通过 withFakeBatch
方法将一个伪造的批处理分配给作业。withFakeBatch
方法返回一个包含作业实例和伪造批处理的元组:
[$job, $batch] = (new ShipOrder)->withFakeBatch();
$job->handle();
$this->assertTrue($batch->cancelled());
$this->assertEmpty($batch->added);
作业事件
使用 Queue
门面的 before
和 after
方法,您可以指定在处理排队作业之前或之后执行的回调。这些回调是执行其他日志记录或增加仪表盘统计数据的绝佳机会。通常情况下,您应该在服务提供者的 boot
方法中调用这些方法。例如,我们可以使用 Laravel 中包含的 AppServiceProvider
:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
class AppServiceProvider extends ServiceProvider
{
/**
* Register any application services.
*/
public function register(): void
{
// ...
}
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
}
使用 Queue
门面的 looping
方法,您可以指定在工作进程尝试从队列中获取作业之前执行的回调。例如,您可以注册一个闭包来回滚之前失败的作业留下未关闭的任何事务:
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Queue;
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});
No Comments