通过队列批处理退款订单


上篇教程我们看到了将单个耗时队列任务拆分成多个子任务的好处:

但是这也引入了新的问题,如果使用 Redis 作为队列驱动,会存在多次执行 Redis 命令推送队列任务的问题,而每次执行 Redis 命令都要建立网络连接,IO 开销很大。

批处理推送

为此,我们可以通过 Laravel 消息队列提供的批处理功能一次性推送多个任务来提升队列性能。

以退款业务为例,我们可以把之前通过 each 函数遍历所有参会者,然后在闭包中依次推送 RefundAttendee 队列任务操作调整为通过 map 函数将所有有效的 RefundAttendee 队列任务实例组合成一个 jobs 集合,然后通过 Bus::batch($jobs)->dispatch() 批量推送 $jobs 集合中的所有任务到消息队列执行:

为了让 RefundAttendee 任务类支持批处理的所有功能,可以在该类中引入 Batchable Trait,以便后续可以获取当前任务所属的批处理实例,以及对批处理进行取消等操作:

如果使用数据库作为队列驱动,需要引入新的数据表 job_batches

你可以参考 Laravel 队列文档了解队列任务批处理的更多细节。

取消批处理

可以在执行批处理推送后获取对应的批处理实例,然后将其 ID 持久化到 conference 模型实例的 refunds_batch 字段:

如果你想要在中途取消这个批处理操作的话,可以通过上面存储的批处理 ID 获取对应的批处理实例,然后在该实例上执行 cancel 方法即可将其取消:

不过,需要注意的是,这只会将存储在数据表中的批处理记录设置为 canceled,不能撤回已经推送到队列的任务。

要堵住这个漏洞,需要在执行队列任务前判断其所属的批处理是否有效,如果该批处理已经取消,则不再执行该任务:

处理批处理失败

默认配置是批处理中的任意任务执行失败会取消整个批处理,这意味着如果批处理 1000 个退款,但是执行到 300 个的时候出错,那么剩下的就会因为批处理已取消而不会被执行。

要强制批处理即使在某个任务失败也继续执行,可以通过在分发时通过 allowFailures() 方法进行设置:

这样一来,即使某个任务执行失败,也会继续执行批处理,然后将失败的任务 ID 存储到 job_batches 表。

你可以通过如下命令重试单个批处理的失败任务:

也可以一次性重试指定批处理中的所有失败任务:


点赞 取消点赞 收藏 取消收藏

<< 上一篇: 通过幂等设计和原子锁避免重复退款

>> 下一篇: 监控退款任务批处理过程