メインコンテンツまでスキップ

キュー

はじめに

ウェブアプリケーションを構築する際、通常のウェブリクエスト中に実行するのに時間がかかりすぎるような、アップロードされたCSVファイルの解析や保存などのタスクがあるかもしれません。幸いにも、Laravelでは、バックグラウンドで処理されるキューイングジョブを簡単に作成することができます。時間のかかるタスクをキューに移動することで、アプリケーションは高速にウェブリクエストに応答し、顧客により良いユーザーエクスペリエンスを提供することができます。

Laravelのキューは、Amazon SQSRedis、またはリレーショナルデータベースなど、さまざまなキューバックエンドにわたる統一されたキューイングAPIを提供します。

Laravelのキューの設定オプションは、アプリケーションの config/queue.php 構成ファイルに保存されています。このファイルには、フレームワークに含まれる各キュードライバーの接続構成が含まれており、データベース、Amazon SQSRedisBeanstalkd ドライバー、および即座にジョブを実行する同期ドライバーが含まれています(ローカル開発時に使用)。また、ジョブを破棄する null キュードライバーも含まれています。

注記

Laravelには、Redisを使用したキューのための美しいダッシュボードと構成システムであるHorizonが提供されています。詳細については、Horizonのドキュメントをご覧ください。

接続とキュー

Laravelキューを始める前に、「接続」と「キュー」の違いを理解することが重要です。config/queue.php 構成ファイルには、connections 構成配列があります。このオプションは、Amazon SQS、Beanstalk、またはRedisなどのバックエンドキューサービスへの接続を定義します。ただし、特定のキュー接続には複数の「キュー」が存在する可能性があり、これは異なるスタックや積まれたジョブの山と考えることができます。

queue 構成ファイル内の各接続構成例には、queue 属性が含まれています。これは、ジョブが特定の接続に送信されたときにディスパッチされるデフォルトのキューを定義します。言い換えると、ジョブを明示的にどのキューにディスパッチするかを定義しないでジョブをディスパッチした場合、そのジョブは接続構成の queue 属性で定義されたキューに配置されます。

    use App\Jobs\ProcessPodcast;

// This job is sent to the default connection's default queue...
ProcessPodcast::dispatch();

// This job is sent to the default connection's "emails" queue...
ProcessPodcast::dispatch()->onQueue('emails');

一部のアプリケーションでは、複数のキューにジョブを常にプッシュする必要がない場合があり、代わりに単純なキューを持つことを好むかもしれません。ただし、ジョブを複数のキューにプッシュすることは、ジョブの処理方法を優先順位付けしたりセグメント化したいアプリケーションに特に役立ちます。なぜなら、Laravelのキューワーカーは、処理するキューを優先順位で指定できるからです。たとえば、high キューにジョブをプッシュすると、それらをより高い処理優先度で処理するワーカーを実行できます:

php artisan queue:work --queue=high,default

ドライバーの注意事項と前提条件

データベース

database キュードライバーを使用するには、ジョブを保持するためのデータベーステーブルが必要です。通常、これはLaravelのデフォルトの 0001_01_01_000002_create_jobs_table.php データベースマイグレーション に含まれていますが、アプリケーションにこのマイグレーションが含まれていない場合は、make:queue-table Artisanコマンドを使用して作成できます:

php artisan make:queue-table

php artisan migrate

Redis

redis キュードライバーを使用するには、config/database.php 構成ファイルで Redis データベース接続を構成する必要があります。

警告

serializer および compression Redis オプションは redis キュードライバーではサポートされていません。

Redis クラスター

Redis キュー接続が Redis クラスターを使用している場合、キュー名に キーハッシュタグ を含める必要があります。これは、特定のキューのすべての Redis キーが同じハッシュスロットに配置されるようにするために必要です:

    'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', '{default}'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => null,
'after_commit' => false,
],

ブロッキング

Redis キューを使用する場合、block_for 構成オプションを使用して、ジョブが利用可能になるまでドライバーが待機する時間を指定できます。これにより、ワーカーループを繰り返し、Redis データベースを再ポーリングするよりも、キューロードに基づいてこの値を調整することが効率的です。たとえば、ジョブが利用可能になるまでドライバーが 5 秒間ブロックするように値を設定することができます:

    'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => 5,
'after_commit' => false,
],

警告

block_for0 に設定すると、ジョブが利用可能になるまでキューのワーカーが無期限にブロックされます。これにより、SIGTERM などのシグナルが次のジョブが処理されるまで処理されなくなります。

その他のドライバーの前提条件

以下の依存関係が、リストされたキュードライバーに必要です。これらの依存関係は、Composer パッケージマネージャーを介してインストールできます:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~5.0
  • Redis: predis/predis ~2.0 または phpredis PHP 拡張機能

ジョブの作成

ジョブクラスの生成

デフォルトでは、アプリケーションのすべてのキュー可能なジョブは app/Jobs ディレクトリに保存されます。app/Jobs ディレクトリが存在しない場合は、make:job Artisan コマンドを実行すると作成されます:

php artisan make:job ProcessPodcast

生成されたクラスは Illuminate\Contracts\Queue\ShouldQueue インターフェースを実装し、ジョブが非同期で実行されるべきであることを Laravel に示します。

注記

ジョブスタブは スタブの公開 を使用してカスタマイズできます。

クラス構造

ジョブクラスは非常にシンプルで、通常は、ジョブがキューによって処理されるときに呼び出される handle メソッドのみを含みます。始めるために、例としてジョブクラスを見てみましょう。この例では、ポッドキャスト配信サービスを管理し、公開前にアップロードされたポッドキャストファイルを処理する必要があるとします:

    <?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

/**
* Create a new job instance.
*/
public function __construct(
public Podcast $podcast,
) {}

/**
* Execute the job.
*/
public function handle(AudioProcessor $processor): void
{
// Process uploaded podcast...
}
}

この例では、キューにジョブを直接渡すことができることに注意してください。ジョブが使用している SerializesModels トレイトのおかげで、Eloquent モデルとその読み込まれたリレーションシップは、ジョブが処理されるときに優雅にシリアライズおよび非シリアライズされます。

キューにジョブが Eloquent モデルをコンストラクタに受け入れる場合、モデルの識別子のみがキューにシリアル化されます。実際にジョブが処理されるとき、キューシステムは自動的にデータベースから完全なモデルインスタンスとその読み込まれたリレーションシップを再取得します。このモデルのシリアル化方法により、より小さなジョブペイロードをキュードライバーに送信できます。


#### `handle`メソッドの依存関係の注入

`handle`メソッドは、ジョブがキューで処理されるときに呼び出されます。`handle`メソッドで依存関係を型ヒントすることができることに注意してください。Laravelの[サービスコンテナ](/docs/container)は、これらの依存関係を自動的に注入します。

コンテナが`handle`メソッドに依存関係を注入する方法を完全に制御したい場合は、コンテナの`bindMethod`メソッドを使用できます。`bindMethod`メソッドは、ジョブとコンテナを受け取るコールバックを受け入れます。コールバック内では、`handle`メソッドを自由に呼び出すことができます。通常、このメソッドを`App\Providers\AppServiceProvider`の`boot`メソッドから呼び出すべきです[サービスプロバイダ](/docs/providers):

```php
use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;

$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
return $job->handle($app->make(AudioProcessor::class));
});

警告

生の画像コンテンツなどのバイナリデータは、キューに配置される際にJSONに適切にシリアル化されない可能性があるため、キューに渡す前にbase64_encode関数を通す必要があります。

キュー関係

ジョブがキューに配置されると、読み込まれたEloquentモデルの関係もシリアル化されるため、シリアル化されたジョブ文字列はしばしば非常に大きくなります。さらに、ジョブがデシリアライズされ、モデルの関係が再度データベースから再取得されると、それらは完全に取得されます。モデルがジョブキューイングプロセス中にシリアル化される前に適用されていた以前の関係制約は、ジョブがデシリアライズされるときに適用されません。したがって、特定の関係のサブセットで作業する場合は、キューに入れたジョブ内でその関係を再制約する必要があります。

また、関係がシリアル化されないようにするために、プロパティ値を設定するときにモデルでwithoutRelationsメソッドを呼び出すことができます。このメソッドは、読み込まれた関係を持たないモデルのインスタンスを返します:

    /**
* Create a new job instance.
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast->withoutRelations();
}

PHPのコンストラクタプロパティプロモーションを使用しており、Eloquentモデルに関係がシリアル化されないようにする場合は、WithoutRelations属性を使用できます。

    use Illuminate\Queue\Attributes\WithoutRelations;

/**
* Create a new job instance.
*/
public function __construct(
#[WithoutRelations]
public Podcast $podcast
) {
}

ジョブが1つのモデルではなく、Eloquentモデルのコレクションや配列を受け取った場合、そのコレクション内のモデルは、ジョブが逆シリアル化されて実行されるときに関連付けが復元されません。これは、大量のモデルを扱うジョブで過剰なリソース使用を防ぐためです。

ユニークなジョブ

警告

ユニークなジョブには、ロックをサポートするキャッシュドライバが必要です。現在、memcachedredisdynamodbdatabasefilearray キャッシュドライバがアトミックロックをサポートしています。また、ユニークなジョブの制約はバッチ内のジョブには適用されません。

時々、特定のジョブのインスタンスが常にキューに1つだけ存在することを確認したい場合があります。そのような場合は、ジョブクラスでShouldBeUniqueインターフェースを実装することで実現できます。このインターフェースでは、クラスに追加のメソッドを定義する必要はありません。

    <?php

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
...
}

上記の例では、UpdateSearchIndexジョブはユニークです。そのため、既にキューに同じジョブのインスタンスが存在し、処理が完了していない場合は、ジョブはディスパッチされません。

特定の場合には、ジョブを一意にする特定の「キー」を定義したり、ジョブが一意でなくなるタイムアウトを指定したりしたい場合があります。これを実現するには、ジョブクラスでuniqueIdおよびuniqueForプロパティまたはメソッドを定義することができます。

    <?php

use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
/**
* The product instance.
*
* @var \App\Product
*/
public $product;

/**
* The number of seconds after which the job's unique lock will be released.
*
* @var int
*/
public $uniqueFor = 3600;

/**
* Get the unique ID for the job.
*/
public function uniqueId(): string
{
return $this->product->id;
}
}

上記の例では、UpdateSearchIndexジョブは製品IDによってユニークです。そのため、同じ製品IDでジョブを新たにディスパッチしても、既存のジョブが処理を完了するまで無視されます。さらに、既存のジョブが1時間以内に処理されない場合、ユニークロックが解放され、同じユニークキーを持つ別のジョブがキューにディスパッチされる可能性があります。

警告

アプリケーションが複数のWebサーバーやコンテナからジョブをディスパッチする場合は、すべてのサーバーが同じ中央キャッシュサーバーと通信していることを確認してください。これにより、Laravelがジョブが一意かどうかを正確に判断できます。

処理が開始するまでジョブを一意に保つ

デフォルトでは、一意のジョブは処理が完了するか、リトライ試行がすべて失敗した後に「ロック解除」されます。ただし、ジョブが処理される直前にジョブをすぐにロック解除したい場合があります。これを実現するには、ジョブは ShouldBeUnique コントラクトの代わりに ShouldBeUniqueUntilProcessing コントラクトを実装する必要があります:

    <?php

use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
// ...
}

一意のジョブロック

裏側では、ShouldBeUnique ジョブがディスパッチされると、Laravel は uniqueId キーで ロック を取得しようとします。ロックが取得できない場合、ジョブはディスパッチされません。このロックは、ジョブが処理を完了するか、リトライ試行がすべて失敗したときに解放されます。デフォルトでは、Laravel はこのロックを取得するためにデフォルトのキャッシュドライバを使用します。ただし、ロックを取得するために別のドライバを使用したい場合は、使用するキャッシュドライバを返す uniqueVia メソッドを定義することができます:

    use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
...

/**
* Get the cache driver for the unique job lock.
*/
public function uniqueVia(): Repository
{
return Cache::driver('redis');
}
}

注記

ジョブの同時処理を制限するだけでよい場合は、WithoutOverlapping ジョブミドルウェアを使用してください。

暗号化されたジョブ

Laravel では、暗号化 を介してジョブのデータのプライバシーと整合性を確保することができます。開始するには、ジョブクラスに ShouldBeEncrypted インターフェースを追加するだけです。このインターフェースがクラスに追加されると、Laravel はジョブをキューにプッシュする前に自動的にジョブを暗号化します:

    <?php

use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;

class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
{
// ...
}

ジョブミドルウェア

ジョブミドルウェアを使用すると、キューに入れられたジョブの実行を囲むカスタムロジックをラップすることができ、ジョブ自体の冗長性を減らすことができます。たとえば、次の handle メソッドを考えてみてください。このメソッドは、Laravel の Redis レート制限機能を活用して、5秒ごとに1つのジョブのみが処理されるようにします:

    use Illuminate\Support\Facades\Redis;

/**
* Execute the job.
*/
public function handle(): void
{
Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
info('Lock obtained...');

// Handle job...
}, function () {
// Could not obtain lock...

return $this->release(5);
});
}

このコードは有効ですが、handle メソッドの実装が Redis レート制限ロジックで混雑しているため、ノイズが多くなります。さらに、このレート制限ロジックは、他のレート制限を行いたいジョブにも複製する必要があります。

代わりに、handle メソッドでレート制限を行う代わりに、レート制限を処理するジョブミドルウェアを定義することができます。Laravel にはジョブミドルウェアのデフォルトの場所がないため、アプリケーションのどこにでもジョブミドルウェアを配置することができます。この例では、ミドルウェアを app/Jobs/Middleware ディレクトリに配置します:

    <?php

namespace App\Jobs\Middleware;

use Closure;
use Illuminate\Support\Facades\Redis;

class RateLimited
{
/**
* Process the queued job.
*
* @param \Closure(object): void $next
*/
public function handle(object $job, Closure $next): void
{
Redis::throttle('key')
->block(0)->allow(1)->every(5)
->then(function () use ($job, $next) {
// Lock obtained...

$next($job);
}, function () use ($job) {
// Could not obtain lock...

$job->release(5);
});
}
}

ルートミドルウェアのように、ジョブミドルウェアも処理されるジョブと、ジョブの処理を継続するために呼び出すべきコールバックを受け取ります。

ジョブミドルウェアを作成した後は、ジョブの middleware メソッドからそれらを返すことで、ジョブにアタッチすることができます。このメソッドは make:job Artisan コマンドで生成されたジョブには存在しないため、ジョブクラスに手動で追加する必要があります:

    use App\Jobs\Middleware\RateLimited;

/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited];
}

注記

ジョブミドルウェアは、キュー可能なイベントリスナーやメーラブル、通知にも割り当てることができます。

レート制限

自作のレート制限ジョブミドルウェアを書く方法を示しましたが、実際には Laravel にはジョブのレート制限を行うために利用できるレート制限ミドルウェアが含まれています。ルートのレート制限と同様に、ジョブのレート制限は RateLimiter ファサードの for メソッドを使用して定義されます。

たとえば、ユーザーにデータのバックアップを1時間に1回だけ許可し、プレミアム顧客にはそのような制限を課さないようにしたい場合、AppServiceProviderboot メソッドで RateLimiter を定義することができます:

    use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;

/**
* Bootstrap any application services.
*/
public function boot(): void
{
RateLimiter::for('backups', function (object $job) {
return $job->user->vipCustomer()
? Limit::none()
: Limit::perHour(1)->by($job->user->id);
});
}

上記の例では、1時間ごとのレート制限を定義しましたが、perMinute メソッドを使用して分単位でレート制限を簡単に定義することもできます。さらに、レート制限を by メソッドに渡すことができますが、この値は通常、顧客ごとにレート制限を区別するために使用されます:

    return Limit::perMinute(50)->by($job->user->id);

レート制限を定義したら、Illuminate\Queue\Middleware\RateLimited ミドルウェアを使用して、ジョブにレート制限をアタッチすることができます。ジョブがレート制限を超過するたびに、このミドルウェアは適切な遅延を持つジョブをキューに戻します。

リミットが設定されたジョブをキューに戻すと、ジョブの attempts 総数が増加します。ジョブクラスの triesmaxExceptions プロパティを調整したり、ジョブが試行されるまでの時間を定義するために retryUntil メソッド を使用することができます。

ジョブがレート制限されたときにジョブを再試行したくない場合は、dontRelease メソッドを使用できます:

注記

Redis を使用している場合は、Redis に最適化された Illuminate\Queue\Middleware\RateLimitedWithRedis ミドルウェアを使用することができます。これは、基本的なレート制限ミドルウェアよりも効率的です。

ジョブの重複を防ぐ

Laravel には、任意のキーに基づいてジョブの重複を防止する Illuminate\Queue\Middleware\WithoutOverlapping ミドルウェアが含まれています。これは、キューに入れられたジョブが一度に 1 つだけ変更されるべきリソースを変更するときに役立ちます。

たとえば、ユーザーの信用スコアを更新するキューに入れられたジョブがあり、同じユーザー ID の信用スコア更新ジョブの重複を防止したいとします。これを実現するために、ジョブの middleware メソッドから WithoutOverlapping ミドルウェアを返すことができます:

    use Illuminate\Queue\Middleware\WithoutOverlapping;

/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new WithoutOverlapping($this->user->id)];
}

同じタイプの重複するジョブはキューに戻されます。また、リリースされたジョブが再試行されるまでに経過する必要がある秒数を指定することもできます:

    /**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}

重複するジョブをすぐに削除して再試行されないようにしたい場合は、dontRelease メソッドを使用できます:

    /**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}

WithoutOverlapping ミドルウェアは Laravel のアトミックロック機能によって動作します。時々、ジョブが予期せず失敗したりタイムアウトしたりしてロックが解放されない場合があります。そのため、expireAfter メソッドを使用して明示的にロックの有効期限を定義することができます。たとえば、以下の例では、ジョブの処理が開始されてから 3 分後に WithoutOverlapping ロックを解放するように Laravel に指示します:

    /**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
}

警告

WithoutOverlappingミドルウェアは、locksをサポートするキャッシュドライバが必要です。現在、memcachedredisdynamodbdatabasefile、およびarrayキャッシュドライバがアトミックロックをサポートしています。

ジョブクラス間でのロックキーの共有

デフォルトでは、WithoutOverlappingミドルウェアは、同じクラスのジョブの重複を防ぎます。したがって、異なる2つのジョブクラスが同じロックキーを使用していても、重複を防ぐことはありません。ただし、sharedメソッドを使用して、Laravelにジョブクラス間でキーを適用するように指示することができます:

use Illuminate\Queue\Middleware\WithoutOverlapping;

class ProviderIsDown
{
// ...


public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}

class ProviderIsUp
{
// ...


public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}

例外のスロットリング

Laravelには、例外をスロットリングするIlluminate\Queue\Middleware\ThrottlesExceptionsミドルウェアが含まれており、一定数の例外をスローした後、ジョブを実行しようとするすべての試行が指定された時間間隔が経過するまで遅延されます。このミドルウェアは、不安定なサードパーティサービスとやり取りするジョブに特に有用です。

例えば、サードパーティAPIとやり取りするキューに入れられたジョブが例外をスローし始めたとします。例外をスロットリングするには、ジョブのmiddlewareメソッドからThrottlesExceptionsミドルウェアを返すことができます。通常、このミドルウェアは、time based attemptsを実装したジョブとペアにする必要があります:

    use DateTime;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new ThrottlesExceptions(10, 5)];
}

/**
* Determine the time at which the job should timeout.
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(5);
}

ミドルウェアが受け入れる最初のコンストラクタ引数は、ジョブがスローできる例外の数であり、2番目のコンストラクタ引数は、ジョブがスロットリングされた後に再試行されるまで経過するべき分数です。上記のコード例では、ジョブが5分以内に10回の例外をスローした場合、ジョブを再試行するまで5分待機します。

ジョブが例外をスローしたが、例外の閾値に達していない場合、通常はジョブがすぐに再試行されます。ただし、ミドルウェアをジョブにアタッチする際にbackoffメソッドを呼び出すことで、そのようなジョブを遅延させる分数を指定することができます。

    use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 5))->backoff(5)];
}

このミドルウェアは、内部的にはLaravelのキャッシュシステムを使用してレート制限を実装し、ジョブのクラス名がキャッシュの「キー」として使用されます。ジョブにミドルウェアをアタッチする際にbyメソッドを呼び出すことで、このキーをオーバーライドすることができます。これは、同じサードパーティーサービスとやり取りする複数のジョブがあり、それらが共通のスロットリング「バケツ」を共有したい場合に便利です。

    use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10))->by('key')];
}

デフォルトでは、このミドルウェアはすべての例外に対してスロットリングを行います。whenメソッドを呼び出すことで、ジョブにミドルウェアをアタッチする際にこの動作を変更することができます。whenメソッドに提供されたクロージャがtrueを返す場合にのみ、例外がスロットリングされます。

    use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10))->when(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}

スロットリングされた例外をアプリケーションの例外ハンドラに報告したい場合は、ジョブにミドルウェアをアタッチする際にreportメソッドを呼び出すことができます。オプションで、reportメソッドにクロージャを提供し、指定されたクロージャがtrueを返す場合にのみ例外が報告されます。

    use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10))->report(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}

注記

Redisを使用している場合は、Redis向けに調整されたIlluminate\Queue\Middleware\ThrottlesExceptionsWithRedisミドルウェアを使用することができます。これは、基本的な例外スロットリングミドルウェアよりもRedisに適しており、効率的です。

ジョブのディスパッチ

ジョブクラスを作成したら、そのジョブ自体でdispatchメソッドを使用してディスパッチできます。dispatchメソッドに渡される引数は、ジョブのコンストラクタに渡されます。

    <?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);

// ...

ProcessPodcast::dispatch($podcast);

return redirect('/podcasts');
}
}

条件付きでジョブをディスパッチしたい場合は、dispatchIfメソッドとdispatchUnlessメソッドを使用できます。

    ProcessPodcast::dispatchIf($accountActive, $podcast);

ProcessPodcast::dispatchUnless($accountSuspended, $podcast);

新しいLaravelアプリケーションでは、syncドライバがデフォルトのキュードライバです。このドライバは、ジョブを現在のリクエストの前景で同期的に実行します。これは、ローカル開発中に便利です。バックグラウンド処理のためにジョブを実際にキューに入れたい場合は、アプリケーションのconfig/queue.php構成ファイルで異なるキュードライバを指定することができます。

遅延ディスパッチ

ジョブがキューのワーカーによってすぐに処理されないように指定したい場合は、ジョブをディスパッチする際に delay メソッドを使用できます。たとえば、ジョブがディスパッチされてから10分後まで処理できないように指定したい場合は、次のようにします:

    <?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);

// ...

ProcessPodcast::dispatch($podcast)
->delay(now()->addMinutes(10));

return redirect('/podcasts');
}
}

警告

Amazon SQS キューサービスの最大遅延時間は15分です。

レスポンスがブラウザに送信された後のディスパッチ

代わりに、dispatchAfterResponse メソッドは、FastCGI を使用している場合に、HTTP レスポンスがユーザーのブラウザに送信された後にジョブをディスパッチすることを遅延させます。これにより、ユーザーはアプリケーションの使用を開始できるようになりますが、キューに入れられたジョブはまだ実行中です。これは通常、1秒程度かかるようなジョブ(たとえば、メールの送信)にのみ使用するべきです。この方法でディスパッチされたジョブは、現在の HTTP リクエスト内で処理されるため、それらを処理するためにはキューのワーカーが実行中である必要はありません:

    use App\Jobs\SendNotification;

SendNotification::dispatchAfterResponse();

また、クロージャを dispatch し、dispatch ヘルパーに afterResponse メソッドをチェーンして、HTTP レスポンスがブラウザに送信された後にクロージャを実行することもできます:

    use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;

dispatch(function () {
Mail::to('taylor@example.com')->send(new WelcomeMessage);
})->afterResponse();

同期ディスパッチ

ジョブを即座に(同期的に)ディスパッチしたい場合は、dispatchSync メソッドを使用できます。このメソッドを使用すると、ジョブはキューに入れられず、現在のプロセス内で直ちに実行されます:

    <?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);

// Create podcast...

ProcessPodcast::dispatchSync($podcast);

return redirect('/podcasts');
}
}

ジョブとデータベーストランザクション

データベーストランザクション内でジョブをディスパッチすることは問題ありませんが、ジョブが実際に正常に実行されることを確認するために特別な注意を払う必要があります。トランザクション内でジョブをディスパッチすると、親トランザクションがコミットされる前にジョブがワーカーによって処理される可能性があります。これが起こると、データベーストランザクション中にモデルやデータベースレコードに対して行った更新がまだデータベースに反映されていない可能性があります。さらに、トランザクション内で作成されたモデルやデータベースレコードはデータベースに存在しないかもしれません。

幸いにも、Laravelにはこの問題を解決するためのいくつかの方法が用意されています。まず、キュー接続の構成配列でafter_commit接続オプションを設定することができます:

    'redis' => [
'driver' => 'redis',
// ...
'after_commit' => true,
],

after_commitオプションがtrueの場合、データベーストランザクション内でジョブをディスパッチすることができます。ただし、Laravelは、実際にジョブをディスパッチする前に、開いている親データベーストランザクションがコミットされるまで待機します。もちろん、現在開いているデータベーストランザクションがない場合は、ジョブはすぐにディスパッチされます。

トランザクションが例外によってロールバックされると、そのトランザクション中にディスパッチされたジョブは破棄されます。

注記

after_commit構成オプションをtrueに設定すると、オープンしているすべてのデータベーストランザクションがコミットされた後に、キューに入れられたイベントリスナーやメーラブル、通知、ブロードキャストイベントがディスパッチされるようになります。

コミットディスパッチ動作のインライン指定

after_commitキュー接続構成オプションをtrueに設定しない場合でも、特定のジョブがすべてのオープンなデータベーストランザクションがコミットされた後にディスパッチされるように指定することができます。これを実現するために、ディスパッチ操作にafterCommitメソッドをチェーンすることができます:

    use App\Jobs\ProcessPodcast;

ProcessPodcast::dispatch($podcast)->afterCommit();

同様に、after_commit構成オプションがtrueに設定されている場合、特定のジョブがオープンなデータベーストランザクションがコミットされるのを待たずにすぐにディスパッチされるように指定することができます:

    ProcessPodcast::dispatch($podcast)->beforeCommit();

ジョブチェーン

ジョブチェーンを使用すると、主要なジョブが正常に実行された後に、シーケンスで実行する必要があるキューに入れられたジョブのリストを指定できます。シーケンス内の1つのジョブが失敗すると、残りのジョブは実行されません。キューに入れられたジョブチェーンを実行するには、Busファサードが提供するchainメソッドを使用できます。Laravelのコマンドバスは、キューに入れられたジョブのディスパッチングが構築されている上位コンポーネントです。

    use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;

Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->dispatch();

ジョブクラスのインスタンスを連鎖させるだけでなく、クロージャも連鎖させることができます:

    Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
function () {
Podcast::update(/* ... */);
},
])->dispatch();

警告

ジョブ内で$this->delete()メソッドを使用しても、連鎖されたジョブが処理されるのを防ぐことはできません。連鎖は、連鎖内のジョブが失敗した場合にのみ実行が停止します。

連鎖接続とキュー

連鎖されたジョブで使用する接続とキューを指定したい場合は、onConnectionおよびonQueueメソッドを使用できます。これらのメソッドは、指定された接続とキュー名を指定します。別の接続/キューが明示的に割り当てられていない限り、使用されるキュー接続とキュー名を指定します:

    Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();

連鎖にジョブを追加

時折、連鎖内の別のジョブから既存のジョブ連鎖にジョブを追加したり、追加したりする必要がある場合があります。これは、prependToChainおよびappendToChainメソッドを使用して行うことができます:

/**
* Execute the job.
*/
public function handle(): void
{
// ...

// Prepend to the current chain, run job immediately after current job...
$this->prependToChain(new TranscribePodcast);

// Append to the current chain, run job at end of chain...
$this->appendToChain(new TranscribePodcast);
}

連鎖の失敗

ジョブを連鎖させる際に、catchメソッドを使用して、連鎖内のジョブが失敗した場合に呼び出すべきクロージャを指定できます。指定されたコールバックは、ジョブの失敗の原因となったThrowableインスタンスを受け取ります:

    use Illuminate\Support\Facades\Bus;
use Throwable;

Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->catch(function (Throwable $e) {
// A job within the chain has failed...
})->dispatch();

警告

連鎖コールバックはシリアル化され、後でLaravelキューによって実行されるため、連鎖コールバック内で$this変数を使用しないでください。

キューと接続のカスタマイズ

特定のキューにディスパッチ

異なるキューにジョブをプッシュすることで、キューにジョブを「カテゴリー分け」し、さまざまなキューに割り当てるワーカーの数を優先順位付けすることができます。ただし、これはキュー構成ファイルで定義された異なるキュー「接続」にジョブをプッシュするのではなく、単一の接続内の特定のキューにのみジョブをプッシュします。キューを指定するには、ジョブをディスパッチする際にonQueueメソッドを使用します:

    <?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);

// Create podcast...

ProcessPodcast::dispatch($podcast)->onQueue('processing');

return redirect('/podcasts');
}
}

または、ジョブのキューを指定する場合は、ジョブのコンストラクタ内で onQueue メソッドを呼び出すことができます:

    <?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

/**
* Create a new job instance.
*/
public function __construct()
{
$this->onQueue('processing');
}
}

特定の接続先へのディスパッチ

アプリケーションが複数のキュー接続とやり取りする場合、onConnection メソッドを使用してジョブをプッシュする接続を指定することができます:

    <?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);

// Create podcast...

ProcessPodcast::dispatch($podcast)->onConnection('sqs');

return redirect('/podcasts');
}
}

onConnection メソッドと onQueue メソッドを連結して、ジョブの接続とキューを指定することができます:

    ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');

または、ジョブの接続を指定する場合は、ジョブのコンストラクタ内で onConnection メソッドを呼び出すことができます:

    <?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

/**
* Create a new job instance.
*/
public function __construct()
{
$this->onConnection('sqs');
}
}

最大ジョブ試行回数 / タイムアウト値の指定

最大試行回数

キューに入れられたジョブの1つがエラーに遭遇した場合、それを無期限にリトライし続けることは望ましくないでしょう。そのため、Laravel では、ジョブが何回試行されるか、またはどれくらいの期間試行されるかを指定する方法がいくつか提供されています。

ジョブが試行される最大回数を指定するアプローチの1つは、Artisan コマンドラインの --tries スイッチを使用することです。これは、処理されるすべてのジョブに適用されますが、処理されるジョブが試行される回数を指定していない場合は、この値が適用されます:

php artisan queue:work --tries=3

ジョブが最大試行回数を超えると、「失敗した」ジョブと見なされます。失敗したジョブの処理についての詳細は、失敗したジョブのドキュメントを参照してください。queue:work コマンドに --tries=0 が指定された場合、ジョブは無期限にリトライされます。

ジョブクラス自体でジョブが試行される最大回数を定義することで、より詳細なアプローチを取ることができます。ジョブで最大試行回数が指定されている場合、コマンドラインで提供された --tries の値よりも優先されます:

    <?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 5;
}

特定のジョブの最大試行回数を動的に制御する必要がある場合は、ジョブに tries メソッドを定義することができます:

    /**
* Determine number of times the job may be attempted.
*/
public function tries(): int
{
return 5;
}

タイムベースの試行

ジョブが失敗する前に何回試行できるかを定義する代わりに、ジョブが試行されるべき時刻を定義することもできます。これにより、指定された時間枠内で何回でもジョブを試行できます。ジョブが試行されるべき時刻を定義するには、ジョブクラスに retryUntil メソッドを追加します。このメソッドは DateTime インスタンスを返す必要があります:

    use DateTime;

/**
* Determine the time at which the job should timeout.
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(10);
}

注記

キューに入れられたイベントリスナーtries プロパティまたは retryUntil メソッドを定義することもできます。

最大例外数

時々、ジョブが多くの回数試行されることを指定したいが、再試行が特定の未処理例外の数によってトリガーされた場合には失敗するようにしたいことがあります(release メソッドによってリリースされるのではなく)。これを実現するために、ジョブクラスに maxExceptions プロパティを定義することができます:

    <?php

namespace App\Jobs;

use Illuminate\Support\Facades\Redis;

class ProcessPodcast implements ShouldQueue
{
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 25;

/**
* The maximum number of unhandled exceptions to allow before failing.
*
* @var int
*/
public $maxExceptions = 3;

/**
* Execute the job.
*/
public function handle(): void
{
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// Lock obtained, process the podcast...
}, function () {
// Unable to obtain lock...
return $this->release(10);
});
}
}

この例では、アプリケーションがRedisロックを取得できない場合、ジョブは10秒間リリースされ、最大25回まで再試行されます。ただし、ジョブが3つの未処理例外をスローした場合にはジョブが失敗します。

タイムアウト

通常、キューに入れられたジョブがどのくらいの時間を要するかはおおよそわかっています。そのため、Laravelでは「タイムアウト」値を指定できます。デフォルトでは、タイムアウト値は60秒です。ジョブがタイムアウト値で指定された秒数よりも長い時間処理されている場合、そのジョブを処理しているワーカーはエラーで終了します。通常、ワーカーは自動的に再起動されます。サーバーに構成されたプロセスマネージャによって。

ジョブが実行できる最大秒数は、Artisanコマンドラインで --timeout スイッチを使用して指定できます:

php artisan queue:work --timeout=30

ジョブが継続的にタイムアウトして最大試行回数を超える場合、そのジョブは失敗としてマークされます。

ジョブクラス自体で、ジョブが実行されることが許可される最大秒数を定義することもできます。ジョブにタイムアウトが指定されている場合、コマンドラインで指定されたタイムアウトよりも優先されます:

    <?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
/**
* The number of seconds the job can run before timing out.
*
* @var int
*/
public $timeout = 120;
}

時には、ソケットや送信中のHTTP接続などのIOブロッキングプロセスは、指定されたタイムアウトを尊重しないことがあります。したがって、これらの機能を使用する際には、常にそれらのAPIを使用してタイムアウトを指定することを試みるべきです。たとえば、Guzzleを使用する場合は、常に接続タイムアウト値とリクエストタイムアウト値を指定する必要があります。

警告

ジョブのタイムアウトを指定するには、pcntl PHP拡張機能をインストールする必要があります。さらに、ジョブの「タイムアウト」値は常にその"有効期限後"の値よりも小さくする必要があります。そうしないと、ジョブは実際に実行が完了する前に再試行される可能性があります。

タイムアウト時の失敗

ジョブがタイムアウトした場合にジョブを失敗としてマークする必要がある場合は、ジョブクラスで$failOnTimeoutプロパティを定義することができます:

/**
* Indicate if the job should be marked as failed on timeout.
*
* @var bool
*/
public $failOnTimeout = true;

エラーハンドリング

ジョブの処理中に例外がスローされた場合、ジョブは自動的にキューに戻され、再試行されることになります。ジョブは、アプリケーションで許可される最大試行回数まで繰り返しリリースされ続けます。試行回数の最大値は、queue:work Artisanコマンドで使用される--triesスイッチで定義されます。また、試行回数の最大値はジョブクラス自体で定義することもできます。キューワーカーの実行に関する詳細は、以下で見つけることができます

手動でジョブをリリースする

時には、ジョブを手動でキューに戻して、後で再試行できるようにしたい場合があります。これは、releaseメソッドを呼び出すことで行うことができます:

    /**
* Execute the job.
*/
public function handle(): void
{
// ...

$this->release();
}

デフォルトでは、releaseメソッドはジョブを直ちに処理するためにキューに戻します。ただし、releaseメソッドに整数または日付インスタンスを渡すことで、指定した秒数が経過するまでジョブを処理可能にしないように指示することもできます。

    $this->release(10);

$this->release(now()->addSeconds(10));

ジョブの手動失敗

時折、ジョブを手動で「失敗」とマークする必要があるかもしれません。その場合は、fail メソッドを呼び出すことができます:

    /**
* Execute the job.
*/
public function handle(): void
{
// ...

$this->fail();
}

例外をキャッチしたためにジョブを失敗させたい場合は、例外を fail メソッドに渡すことができます。または、便宜上、例外に変換される文字列エラーメッセージを渡すこともできます:

    $this->fail($exception);

$this->fail('Something went wrong.');

注記

ジョブの失敗に関する詳細情報については、ジョブの失敗に対処するドキュメントを参照してください。

ジョブのバッチ処理

Laravel のジョブのバッチ処理機能を使用すると、一括のジョブを簡単に実行し、その一括のジョブの実行が完了したときにアクションを実行できます。始める前に、ジョブのバッチに関するメタ情報(完了率など)を含むテーブルを作成するためのデータベースマイグレーションを作成する必要があります。このマイグレーションは、make:queue-batches-table Artisan コマンドを使用して生成できます:

php artisan make:queue-batches-table

php artisan migrate

バッチ処理可能なジョブの定義

バッチ処理可能なジョブを定義するには、通常通りに キュー可能なジョブを作成 する必要がありますが、ジョブクラスに Illuminate\Bus\Batchable トレイトを追加する必要があります。このトレイトは、ジョブが実行されている現在のバッチを取得するために使用できる batch メソッドへのアクセスを提供します:

    <?php

namespace App\Jobs;

use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ImportCsv implements ShouldQueue
{
use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

/**
* Execute the job.
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
// Determine if the batch has been cancelled...

return;
}

// Import a portion of the CSV file...
}
}

バッチのディスパッチ

ジョブのバッチをディスパッチするには、Bus ファサードの batch メソッドを使用する必要があります。もちろん、バッチ処理は主に完了コールバックと組み合わせて使用すると便利です。そのため、バッチの完了コールバックを定義するために thencatchfinally メソッドを使用できます。これらのコールバックは、呼び出されるときに Illuminate\Bus\Batch インスタンスを受け取ります。この例では、CSV ファイルから一定数の行を処理する一括のジョブをキューイングしていると想定します。

    use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;

$batch = Bus::batch([
new ImportCsv(1, 100),
new ImportCsv(101, 200),
new ImportCsv(201, 300),
new ImportCsv(301, 400),
new ImportCsv(401, 500),
])->before(function (Batch $batch) {
// The batch has been created but no jobs have been added...
})->progress(function (Batch $batch) {
// A single job has completed successfully...
})->then(function (Batch $batch) {
// All jobs completed successfully...
})->catch(function (Batch $batch, Throwable $e) {
// First batch job failure detected...
})->finally(function (Batch $batch) {
// The batch has finished executing...
})->dispatch();

return $batch->id;

バッチのIDは、$batch->id プロパティを介してアクセスでき、バッチがディスパッチされた後にバッチに関する情報をクエリするために使用できます。

警告

バッチコールバックはシリアライズされ、後でLaravelキューによって実行されるため、コールバック内で$this変数を使用しないでください。さらに、バッチジョブはデータベーストランザクション内にラップされているため、暗黙のコミットをトリガーするデータベースステートメントはジョブ内で実行してはいけません。

バッチの名前付け

Laravel HorizonやLaravel Telescopeなどのツールは、バッチが名前付けされている場合にバッチに関するより使いやすいデバッグ情報を提供する場合があります。バッチに任意の名前を割り当てるには、バッチを定義する際にnameメソッドを呼び出すことができます:

    $batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->name('Import CSV')->dispatch();

バッチの接続とキュー

バッチジョブで使用する接続とキューを指定したい場合は、onConnectionメソッドとonQueueメソッドを使用できます。すべてのバッチジョブは同じ接続とキュー内で実行する必要があります:

    $batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->onConnection('redis')->onQueue('imports')->dispatch();

チェーンとバッチ

バッチ内でチェーンされたジョブのセットを定義するには、チェーンされたジョブを配列内に配置します。たとえば、2つのジョブチェーンを並行して実行し、両方のジョブチェーンが処理を完了したときにコールバックを実行することができます:

    use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;

Bus::batch([
[
new ReleasePodcast(1),
new SendPodcastReleaseNotification(1),
],
[
new ReleasePodcast(2),
new SendPodcastReleaseNotification(2),
],
])->then(function (Batch $batch) {
// ...
})->dispatch();

逆に、チェーン内でジョブのバッチを実行するには、チェーン内でバッチを定義します。たとえば、最初に複数のポッドキャストをリリースするためのジョブバッチを実行し、次にリリース通知を送信するためのジョブバッチを実行できます:

    use App\Jobs\FlushPodcastCache;
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Support\Facades\Bus;

Bus::chain([
new FlushPodcastCache,
Bus::batch([
new ReleasePodcast(1),
new ReleasePodcast(2),
]),
Bus::batch([
new SendPodcastReleaseNotification(1),
new SendPodcastReleaseNotification(2),
]),
])->dispatch();

バッチにジョブを追加する

時には、バッチジョブ内から追加のジョブをバッチに追加することが役立つことがあります。このパターンは、ウェブリクエスト中にディスパッチするのに時間がかかりすぎるかもしれない数千のジョブをバッチ化する必要がある場合に便利です。その代わりに、さらに多くのジョブでバッチを補充する初期バッチの「ローダー」ジョブをディスパッチしたい場合があります:

    $batch = Bus::batch([
new LoadImportBatch,
new LoadImportBatch,
new LoadImportBatch,
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->name('Import Contacts')->dispatch();

この例では、LoadImportBatch ジョブを使用してバッチに追加のジョブを組み込みます。これを達成するために、ジョブの batch メソッドを介してアクセスできるバッチインスタンス上の add メソッドを使用することができます:

    use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;

/**
* Execute the job.
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
return;
}

$this->batch()->add(Collection::times(1000, function () {
return new ImportContacts;
}));
}

警告

同じバッチに属するジョブからのみ、バッチにジョブを追加できます。

バッチの検査

バッチ完了コールバックに提供される Illuminate\Bus\Batch インスタンスには、特定のジョブバッチを操作および検査するのに役立つさまざまなプロパティとメソッドがあります:

    // The UUID of the batch...
$batch->id;

// The name of the batch (if applicable)...
$batch->name;

// The number of jobs assigned to the batch...
$batch->totalJobs;

// The number of jobs that have not been processed by the queue...
$batch->pendingJobs;

// The number of jobs that have failed...
$batch->failedJobs;

// The number of jobs that have been processed thus far...
$batch->processedJobs();

// The completion percentage of the batch (0-100)...
$batch->progress();

// Indicates if the batch has finished executing...
$batch->finished();

// Cancel the execution of the batch...
$batch->cancel();

// Indicates if the batch has been cancelled...
$batch->cancelled();

ルートからバッチを返す

すべての Illuminate\Bus\Batch インスタンスは JSON シリアライズ可能であり、アプリケーションのルートの1つから直接返すことで、バッチに関する情報を含む JSON ペイロードを取得できます。これにより、アプリケーションの UI にバッチの完了進捗状況に関する情報を表示するのが便利になります。

ID によってバッチを取得するには、Bus ファサードの findBatch メソッドを使用できます:

    use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;

Route::get('/batch/{batchId}', function (string $batchId) {
return Bus::findBatch($batchId);
});

バッチのキャンセル

時々、特定のバッチの実行をキャンセルする必要があるかもしれません。これは、Illuminate\Bus\Batch インスタンス上の cancel メソッドを呼び出すことで達成できます:

    /**
* Execute the job.
*/
public function handle(): void
{
if ($this->user->exceedsImportLimit()) {
return $this->batch()->cancel();
}

if ($this->batch()->cancelled()) {
return;
}
}

前の例で気づいたように、バッチ処理されたジョブは通常、実行を続行する前に対応するバッチがキャンセルされたかどうかを判断する必要があります。ただし、便宜上、ジョブに SkipIfBatchCancelled ミドルウェア を割り当てることもできます。その名前が示すように、このミドルウェアは、対応するバッチがキャンセルされている場合にジョブを処理しないように Laravel に指示します:

    use Illuminate\Queue\Middleware\SkipIfBatchCancelled;

/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [new SkipIfBatchCancelled];
}

バッチの失敗

バッチ処理されたジョブが失敗すると、最初に失敗したジョブに割り当てられた catch コールバックが呼び出されます。

失敗を許可する

バッチ内のジョブが失敗した場合、Laravel は自動的にバッチを "キャンセル" とマークします。希望する場合は、ジョブの失敗が自動的にバッチをキャンセルしたりしないようにこの動作を無効にすることができます。これは、バッチをディスパッチする際に allowFailures メソッドを呼び出すことで達成できます:

    $batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->allowFailures()->dispatch();

失敗したバッチジョブの再試行

便宜上、Laravel は queue:retry-batch Artisan コマンドを提供しており、指定されたバッチのすべての失敗したジョブを簡単に再試行できます。queue:retry-batch コマンドは、失敗したジョブを再試行する必要があるバッチの UUID を受け入れます:

php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5

バッチの整理

整理を行わないと、job_batches テーブルは非常に速くレコードを蓄積します。これを緩和するために、queue:prune-batches Artisan コマンドを毎日実行するように スケジュール する必要があります:

    use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches')->daily();

デフォルトでは、24 時間以上経過したすべての完了したバッチが整理されます。バッチデータを保持する期間を決定するためにコマンドを呼び出す際に hours オプションを使用できます。たとえば、次のコマンドは、48 時間以上前に終了したすべてのバッチを削除します:

    use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48')->daily();

時々、jobs_batches テーブルには、成功裏に完了しなかったバッチのレコードが蓄積される場合があります。たとえば、ジョブが失敗し、そのジョブが再試行されずに成功しなかったバッチの場合などです。queue:prune-batches コマンドに unfinished オプションを使用して、これらの未完了のバッチレコードを整理するように指示できます:

    use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();

同様に、jobs_batches テーブルには、キャンセルされたバッチのレコードも蓄積される場合があります。queue:prune-batches コマンドに cancelled オプションを使用して、これらのキャンセルされたバッチレコードを整理するように指示できます:

    use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();

DynamoDB にバッチを保存する

Laravel は、リレーショナルデータベースの代わりに DynamoDB にバッチのメタ情報を保存するサポートも提供しています。ただし、すべてのバッチレコードを保存するための DynamoDB テーブルを手動で作成する必要があります。

通常、このテーブルは job_batches という名前にする必要がありますが、アプリケーションの queue 構成ファイル内の queue.batching.table 構成値に基づいてテーブルの名前を付ける必要があります。

DynamoDB バッチテーブルの構成

job_batches テーブルには、application という文字列のプライマリパーティションキーと id という文字列のプライマリソートキーが必要です。キーの application 部分には、アプリケーションの app 構成ファイル内の name 構成値で定義されたアプリケーション名が含まれます。アプリケーション名がDynamoDBテーブルのキーの一部であるため、同じテーブルを使用して複数のLaravelアプリケーションのジョブバッチを保存できます。

さらに、DynamoDB でバッチの剪定 を活用したい場合は、テーブルに ttl 属性を定義することができます。

DynamoDB の構成

次に、AWS SDK をインストールして、Laravel アプリケーションが Amazon DynamoDB と通信できるようにします:

composer require aws/aws-sdk-php

次に、queue.batching.driver 構成オプションの値を dynamodb に設定します。さらに、batching 構成配列内で keysecretregion 構成オプションを定義する必要があります。これらのオプションは AWS との認証に使用されます。dynamodb ドライバを使用する場合、queue.batching.database 構成オプションは不要です:

'batching' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
],

DynamoDB でのバッチの剪定

ジョブバッチ情報を保存するために DynamoDB を利用する場合、リレーショナルデータベースに保存されているバッチを剪定するために通常使用される剪定コマンドは機能しません。代わりに、古いバッチのレコードを自動的に削除するために DynamoDB のネイティブ TTL 機能 を利用できます。

DynamoDB テーブルに ttl 属性を定義した場合、Laravel にバッチレコードを剪定する方法を指示する構成パラメータを定義できます。queue.batching.ttl_attribute 構成値は TTL を保持する属性の名前を定義し、queue.batching.ttl 構成値は、レコードが最後に更新された時点から DynamoDB テーブルからバッチレコードを削除できる秒数を定義します:

'batching' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
'ttl_attribute' => 'ttl',
'ttl' => 60 * 60 * 24 * 7, // 7 days...
],

クロージャのキューイング

ジョブクラスをキューにディスパッチする代わりに、クロージャをディスパッチすることもできます。これは、現在のリクエストサイクルの外で実行する必要がある素早く簡単なタスクに適しています。クロージャをキューにディスパッチする際、クロージャのコード内容は暗号化され、途中で変更されないようになっています:

    $podcast = App\Podcast::find(1);

dispatch(function () use ($podcast) {
$podcast->publish();
});

catch メソッドを使用すると、キューに入れられたクロージャがすべてのキューの設定されたリトライ試行を使い果たしても正常に完了しなかった場合に実行されるクロージャを指定できます:

    use Throwable;

dispatch(function () use ($podcast) {
$podcast->publish();
})->catch(function (Throwable $e) {
// This job has failed...
});

警告

catch コールバックはシリアライズされ、後でLaravelキューによって実行されるため、catch コールバック内で$this変数を使用しないでください。

キューワーカーの実行

queue:work コマンド

Laravelには、新しいジョブがキューにプッシュされるたびにキューワーカーを起動し、ジョブを処理するArtisanコマンドが含まれています。queue:work Artisanコマンドを使用してワーカーを実行できます。queue:workコマンドを開始すると、コマンドが手動で停止されるか、ターミナルを閉じるまで実行が続行されます:

php artisan queue:work
注記

queue:workプロセスをバックグラウンドで永続的に実行するためには、Supervisorなどのプロセスモニターを使用して、キューワーカーが停止しないようにする必要があります。

queue:workコマンドを呼び出す際に、処理されたジョブIDをコマンドの出力に含めたい場合は、-vフラグを含めることができます:

php artisan queue:work -v

キューワーカーは長寿命のプロセスであり、起動されたアプリケーションの状態をメモリに保存します。そのため、起動後にコードベースに変更があっても気づかないでしょう。したがって、デプロイプロセス中には、キューワーカーを再起動することを忘れないでください。また、アプリケーションによって作成または変更された静的状態は、ジョブ間で自動的にリセットされないことを覚えておいてください。

または、queue:listen コマンドを実行することもできます。queue:listen コマンドを使用すると、更新されたコードを再読み込みしたりアプリケーションの状態をリセットしたりする際に、ワーカーを手動で再起動する必要はありません。ただし、このコマンドは queue:work コマンドよりも効率がかなり低いです:

php artisan queue:listen

複数のキューワーカーを実行する

複数のワーカーをキューに割り当ててジョブを並行して処理するには、単純に複数の queue:work プロセスを開始すればよいです。これは、ターミナルの複数のタブを使用してローカルで行うか、プロセスマネージャの構成設定を使用して本番環境で行うことができます。Supervisor を使用する場合numprocs 構成値を使用できます。

接続とキューの指定

ワーカーが利用するキュー接続を指定することもできます。work コマンドに渡す接続名は、config/queue.php 構成ファイルで定義されている接続のいずれかに対応している必要があります:

php artisan queue:work redis

デフォルトでは、queue:work コマンドは指定された接続のデフォルトキューのジョブのみを処理します。ただし、特定のキューのみを処理するようにキューワーカーをさらにカスタマイズすることもできます。たとえば、すべてのメールが redis キュー接続の emails キューで処理される場合、次のコマンドを使用してそのキューのみを処理するワーカーを開始できます:

php artisan queue:work redis --queue=emails

指定された数のジョブを処理する

--once オプションを使用して、ワーカーに対してキューから単一のジョブのみを処理するように指示できます:

php artisan queue:work --once

--max-jobs オプションを使用して、指定された数のジョブを処理してから終了するようにワーカーに指示できます。このオプションは、Supervisorと組み合わせて使用すると便利であり、一定数のジョブを処理した後にワーカーが自動的に再起動され、蓄積されたメモリが解放されるようになります。

php artisan queue:work --max-jobs=1000

すべてのキューに入れられたジョブを処理してから終了する

--stop-when-empty オプションを使用して、ワーカーにすべてのジョブを処理してから gracefully 終了するように指示することができます。このオプションは、Laravel キューを Docker コンテナ内で処理し、キューが空になった後にコンテナをシャットダウンしたい場合に便利です:

php artisan queue:work --stop-when-empty

指定された秒数の間ジョブを処理する

--max-time オプションを使用して、ワーカーに指定された秒数の間ジョブを処理してから終了するように指示することができます。このオプションは、Supervisor と組み合わせて使用すると便利で、指定された時間の間ジョブを処理した後にワーカーが自動的に再起動され、蓄積されたメモリが解放されます:

# Process jobs for one hour and then exit...
php artisan queue:work --max-time=3600

ワーカーのスリープ期間

キューにジョブがある場合、ワーカーはジョブの間に遅延なしでジョブを処理し続けます。ただし、sleep オプションは、ジョブが利用できない場合にワーカーが何秒間 "スリープ" するかを決定します。もちろん、スリープ中は新しいジョブは処理されません:

php artisan queue:work --sleep=3

メンテナンスモードとキュー

アプリケーションが メンテナンスモード にある間は、キューに入れられたジョブは処理されません。アプリケーションがメンテナンスモードから出ると、ジョブは通常通り処理されます。

メンテナンスモードが有効になっている場合でも、キューのワーカーにジョブを処理させるには --force オプションを使用することができます:

php artisan queue:work --force

リソースに関する考慮事項

デーモンキューワーカーは各ジョブを処理する前にフレームワークを "再起動" しません。したがって、各ジョブが完了した後に重いリソースを解放する必要があります。たとえば、GD ライブラリを使用して画像処理を行っている場合は、画像処理が完了したら imagedestroy でメモリを解放する必要があります。

キューの優先順位

時には、キューの処理方法を優先順位付けしたい場合があります。たとえば、config/queue.php 構成ファイルで、redis 接続のデフォルトの queuelow に設定することができます。ただし、時折、ジョブを high 優先度のキューにプッシュしたい場合があります。以下のように行います:

    dispatch((new Job)->onQueue('high'));

work コマンドにキュー名のコンマ区切りリストを渡すことで、low キューのジョブに進む前にすべての high キューのジョブが処理されるようにするワーカーを起動します:

php artisan queue:work --queue=high,low

キューワーカーとデプロイメント

キューワーカーは長寿命プロセスであるため、コードの変更を検出するには再起動する必要があります。そのため、キューワーカーを使用するアプリケーションをデプロイする最も簡単な方法は、デプロイプロセス中にワーカーを再起動することです。queue:restart コマンドを実行して、すべてのワーカーを優雅に再起動できます:

php artisan queue:restart

このコマンドは、現在のジョブの処理が完了した後に、すべてのキューワーカーに優雅に終了するように指示します。queue:restart コマンドが実行されると、キューワーカーは終了するため、Supervisor のようなプロセスマネージャを実行して、キューワーカーを自動的に再起動する必要があります。

注記

キューは再起信号を保存するために cache を使用するため、この機能を使用する前にアプリケーションに適切に構成されたキャッシュドライバがあることを確認する必要があります。

ジョブの有効期限とタイムアウト

ジョブの有効期限

config/queue.php 構成ファイルでは、各キュー接続が retry_after オプションを定義します。このオプションは、処理中のジョブを再試行するまでの秒数を指定します。たとえば、retry_after の値が 90 に設定されている場合、ジョブは処理中に90秒経過してもリリースされず削除されない場合、キューに再リリースされます。通常、retry_after の値は、ジョブが合理的に完了するのにかかる最大秒数に設定する必要があります。

警告

retry_after 値を含まない唯一のキュー接続は Amazon SQS です。SQS は、デフォルトの可視性タイムアウト に基づいてジョブを再試行します。この値は AWS コンソール内で管理されます。

ワーカータイムアウト

queue:work Artisan コマンドには --timeout オプションが公開されています。デフォルトでは、--timeout の値は 60 秒です。ジョブがタイムアウト値で指定された秒数よりも長く処理されている場合、ジョブを処理しているワーカーはエラーで終了します。通常、サーバーに構成されたプロセスマネージャによってワーカーは自動的に再起動されます。

php artisan queue:work --timeout=60

retry_after 構成オプションと --timeout CLI オプションは異なりますが、ジョブが失われず、ジョブが正常に一度だけ処理されるように協力しています。

警告

--timeout の値は常に retry_after 構成値よりも数秒短くする必要があります。これにより、フリーズしたジョブを処理しているワーカーが常にジョブが再試行される前に終了されることが保証されます。--timeout オプションが retry_after 構成値よりも長い場合、ジョブが二重に処理される可能性があります。

Supervisor の構成

本番環境では、queue:work プロセスを実行し続ける方法が必要です。queue:work プロセスは、ワーカータイムアウトが超過した場合や queue:restart コマンドの実行など、さまざまな理由で停止する可能性があります。

そのため、queue:work プロセスが終了したときを検出し、自動的に再起動するプロセスモニターを構成する必要があります。さらに、プロセスモニターは、同時に実行する queue:work プロセスの数を指定できるようにすることができます。Supervisor は Linux 環境で一般的に使用されるプロセスモニターであり、以下のドキュメントでその構成方法について説明します。

Supervisor のインストール

SupervisorはLinuxオペレーティングシステムのプロセス監視ツールであり、queue:workプロセスが失敗した場合に自動的に再起動します。UbuntuにSupervisorをインストールするには、次のコマンドを使用できます:

sudo apt-get install supervisor
注記

Supervisorの設定と管理が複雑に感じられる場合は、Laravel Forgeを使用することを検討してください。これにより、プロダクション環境のLaravelプロジェクトに自動的にSupervisorがインストールおよび構成されます。

Supervisorの設定

Supervisorの設定ファイルは通常、/etc/supervisor/conf.dディレクトリに保存されます。このディレクトリ内で、プロセスの監視方法をSupervisorに指示する様々な設定ファイルを作成できます。例えば、laravel-worker.confファイルを作成して、queue:workプロセスを開始および監視するように指示することができます:

[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600

この例では、numprocsディレクティブはSupervisorに8つのqueue:workプロセスを実行し、すべてを監視し、失敗した場合に自動的に再起動するように指示します。設定のcommandディレクティブを、希望するキュー接続とワーカーオプションに合わせて変更する必要があります。

警告

stopwaitsecsの値が、最も長い実行時間を要するジョブの秒数よりも大きいことを確認する必要があります。そうでない場合、Supervisorはジョブを処理が完了する前に終了させる可能性があります。

Supervisorの起動

設定ファイルが作成されたら、次のコマンドを使用してSupervisorの設定を更新し、プロセスを起動できます:

sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start "laravel-worker:*"

Supervisorに関する詳細情報については、Supervisorのドキュメントを参照してください。

失敗したジョブの処理

時々、キューに入れたジョブが失敗することがあります。心配しないでください、すべてが計画通りにはいかないこともあります! Laravelには、ジョブが試行される最大回数を指定する便利な方法が含まれています。非同期ジョブがこの試行回数を超えると、failed_jobsデータベーステーブルに挿入されます。同期的にディスパッチされたジョブが失敗した場合は、このテーブルに保存されず、その例外はアプリケーションによって直ちに処理されます。

新しいLaravelアプリケーションには通常、failed_jobsテーブルを作成するためのマイグレーションがすでに存在しています。ただし、このテーブルのためのマイグレーションがアプリケーションに含まれていない場合は、make:queue-failed-tableコマンドを使用してマイグレーションを作成できます:

php artisan make:queue-failed-table

php artisan migrate

キューワーカーを実行する際に、queue:workコマンドの--triesスイッチを使用して、ジョブが試行される最大回数を指定できます。--triesオプションに値を指定しない場合、ジョブはジョブクラスの$triesプロパティで指定された回数だけ試行されます:

php artisan queue:work redis --tries=3

--backoffオプションを使用すると、例外が発生したジョブを再試行する前にLaravelが待機する秒数を指定できます。デフォルトでは、ジョブはすぐに再度キューに戻されて再試行されます:

php artisan queue:work redis --tries=3 --backoff=3

例外が発生したジョブを再試行する前にLaravelが待機する秒数をジョブごとに設定したい場合は、ジョブクラスでbackoffプロパティを定義することができます:

    /**
* The number of seconds to wait before retrying the job.
*
* @var int
*/
public $backoff = 3;

ジョブのバックオフ時間を決定するためのより複雑なロジックが必要な場合は、ジョブクラスでbackoffメソッドを定義することができます:

    /**
* Calculate the number of seconds to wait before retrying the job.
*/
public function backoff(): int
{
return 3;
}

backoffメソッドからバックオフ値の配列を返すことで、「指数的」なバックオフを簡単に設定できます。この例では、最初の再試行の遅延は1秒、2回目の再試行は5秒、3回目の再試行は10秒であり、それ以降の再試行は残りの試行がある場合はすべて10秒になります:

    /**
* Calculate the number of seconds to wait before retrying the job.
*
* @return array<int, int>
*/
public function backoff(): array
{
return [1, 5, 10];
}

失敗したジョブの後始末

特定のジョブが失敗した場合、ユーザーにアラートを送信したり、ジョブによって部分的に完了したアクションを元に戻したりしたい場合があります。これを実現するために、ジョブクラスでfailedメソッドを定義することができます。ジョブの失敗原因となったThrowableインスタンスがfailedメソッドに渡されます:

    <?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Throwable;

class ProcessPodcast implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;

/**
* Create a new job instance.
*/
public function __construct(
public Podcast $podcast,
) {}

/**
* Execute the job.
*/
public function handle(AudioProcessor $processor): void
{
// Process uploaded podcast...
}

/**
* Handle a job failure.
*/
public function failed(?Throwable $exception): void
{
// Send user notification of failure, etc...
}
}

警告

ジョブの failed メソッドを呼び出す前に、handle メソッド内で発生したクラスプロパティの変更は失われる可能性があるため、ジョブの新しいインスタンスが生成されます。

失敗したジョブの再試行

failed_jobs データベーステーブルに挿入されたすべての失敗したジョブを表示するには、queue:failed Artisan コマンドを使用できます:

php artisan queue:failed

queue:failed コマンドは、ジョブID、接続、キュー、失敗時刻などのジョブに関する情報をリストします。ジョブIDは、失敗したジョブを再試行するために使用できます。たとえば、ID が ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece の失敗したジョブを再試行するには、次のコマンドを発行します:

php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece

必要に応じて、複数のIDをコマンドに渡すこともできます:

php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d

特定のキューのすべての失敗したジョブを再試行することもできます:

php artisan queue:retry --queue=name

すべての失敗したジョブを再試行するには、queue:retry コマンドを実行し、ID として all を渡します:

php artisan queue:retry all

失敗したジョブを削除する場合は、queue:forget コマンドを使用できます:

php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d
注記

Horizon を使用する場合は、queue:forget コマンドの代わりに horizon:forget コマンドを使用して、失敗したジョブを削除する必要があります。

failed_jobs テーブルからすべての失敗したジョブを削除するには、queue:flush コマンドを使用できます:

php artisan queue:flush

存在しないモデルを無視する

Eloquent モデルをジョブに注入すると、モデルは自動的にシリアル化され、キューに配置される前にデータベースから再取得されます。ただし、ジョブがワーカーによって処理される待機中にモデルが削除された場合、ジョブは ModelNotFoundException と失敗する可能性があります。

便宜上、ジョブの deleteWhenMissingModels プロパティを true に設定することで、存在しないモデルを持つジョブを自動的に削除するように選択できます。このプロパティが true に設定されている場合、Laravel は例外を発生させずにジョブを静かに破棄します:

    /**
* Delete the job if its models no longer exist.
*
* @var bool
*/
public $deleteWhenMissingModels = true;

フェイルしたジョブの剪定

アプリケーションの failed_jobs テーブルのレコードを剪定するには、queue:prune-failed Artisan コマンドを実行します:

php artisan queue:prune-failed

デフォルトでは、24時間以上前に挿入されたすべてのフェイルしたジョブレコードが剪定されます。コマンドに --hours オプションを指定すると、最後の N 時間以内に挿入されたフェイルしたジョブレコードのみが保持されます。たとえば、次のコマンドは、48時間以上前に挿入されたすべてのフェイルしたジョブレコードを削除します:

php artisan queue:prune-failed --hours=48

DynamoDB にフェイルしたジョブを保存

Laravel は、リレーショナルデータベーステーブルの代わりに DynamoDB にフェイルしたジョブレコードを保存するサポートも提供しています。ただし、すべてのフェイルしたジョブレコードを保存するために DynamoDB テーブルを手動で作成する必要があります。通常、このテーブルは failed_jobs という名前にする必要がありますが、アプリケーションの queue 構成ファイル内の queue.failed.table 構成値に基づいてテーブルの名前を付ける必要があります。

failed_jobs テーブルには、application という名前の文字列プライマリパーティションキーと uuid という名前の文字列プライマリソートキーが必要です。キーの application 部分には、アプリケーションの名前が含まれます。これは、アプリケーションの app 構成ファイル内の name 構成値で定義されます。アプリケーション名が DynamoDB テーブルのキーの一部であるため、同じテーブルを複数の Laravel アプリケーションのフェイルしたジョブを保存するために使用できます。

さらに、AWS SDK をインストールして、Laravel アプリケーションが Amazon DynamoDB と通信できるようにしてください:

composer require aws/aws-sdk-php

次に、queue.failed.driver 構成オプションの値を dynamodb に設定します。さらに、フェイルしたジョブ構成配列内で keysecretregion 構成オプションを定義する必要があります。これらのオプションは AWS と認証するために使用されます。dynamodb ドライバを使用する場合、queue.failed.database 構成オプションは不要です。```

'failed' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'failed_jobs',
],

失敗したジョブの保存を無効にする

queue.failed.driver 構成オプションの値を null に設定することで、Laravel に失敗したジョブを保存せずに破棄するよう指示することができます。通常、これは QUEUE_FAILED_DRIVER 環境変数を介して行われます:

QUEUE_FAILED_DRIVER=null

失敗したジョブのイベント

ジョブが失敗したときに呼び出されるイベントリスナーを登録したい場合は、Queue ファサードの failing メソッドを使用することができます。たとえば、Laravel に含まれる AppServiceProviderboot メソッドからこのイベントにクロージャをアタッチすることができます:

    <?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;

class AppServiceProvider extends ServiceProvider
{
/**
* Register any application services.
*/
public function register(): void
{
// ...
}

/**
* Bootstrap any application services.
*/
public function boot(): void
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
}

キューからジョブをクリアする

注記

Horizon を使用する場合は、queue:clear コマンドの代わりに horizon:clear コマンドを使用してキューからジョブをクリアする必要があります。

デフォルトの接続のデフォルトキューからすべてのジョブを削除したい場合は、queue:clear Artisan コマンドを使用することができます:

php artisan queue:clear

特定の接続とキューからジョブを削除するには、connection 引数と queue オプションを指定することもできます:

php artisan queue:clear redis --queue=emails
警告

キューからジョブをクリアする機能は、SQS、Redis、およびデータベースのキュードライバーでのみ利用可能です。さらに、SQS メッセージの削除プロセスには最大 60 秒かかるため、キューをクリアした後に最大 60 秒以内に SQS キューに送信されたジョブも削除される可能性があります。

キューのモニタリング

キューが急激に多くのジョブを受信すると、ジョブの完了までの待ち時間が長くなり、オーバーロード状態になる可能性があります。必要に応じて、Laravel はキューのジョブ数が指定された閾値を超えたときにアラートを送信することができます。

開始するには、queue:monitor コマンドを 1 分ごとに実行 するようスケジュールする必要があります。このコマンドは、モニタリングしたいキューの名前と、希望するジョブ数の閾値を受け入れます:

php artisan queue:monitor redis:default,redis:deployments --max=100

このコマンドをスケジュールするだけでは、キューが過負荷状態であることを通知するアラートをトリガーするには十分ではありません。コマンドが閾値を超えるジョブ数を持つキューに遭遇すると、Illuminate\Queue\Events\QueueBusy イベントがディスパッチされます。このイベントをアプリケーションの AppServiceProvider 内でリッスンして、通知を送信するために使用することができます。

use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;

/**
* Bootstrap any application services.
*/
public function boot(): void
{
Event::listen(function (QueueBusy $event) {
Notification::route('mail', 'dev@example.com')
->notify(new QueueHasLongWaitTime(
$event->connection,
$event->queue,
$event->size
));
});
}

テスト

ジョブをディスパッチするコードをテストする際には、ジョブ自体を実際に実行しないように Laravel に指示することができます。ジョブのコードは、それをディスパッチするコードとは別に直接テストすることができます。もちろん、ジョブ自体をテストするには、テスト内でジョブインスタンスをインスタンス化し、handle メソッドを直接呼び出すことができます。

Queue ファサードの fake メソッドを使用して、キューにジョブが実際にプッシュされないようにすることができます。Queue ファサードの fake メソッドを呼び出した後、アプリケーションがキューにジョブをプッシュしようとしたことをアサートすることができます。

<?php

use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;

test('orders can be shipped', function () {
Queue::fake();

// Perform order shipping...

// Assert that no jobs were pushed...
Queue::assertNothingPushed();

// Assert a job was pushed to a given queue...
Queue::assertPushedOn('queue-name', ShipOrder::class);

// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);

// Assert a job was not pushed...
Queue::assertNotPushed(AnotherJob::class);

// Assert that a Closure was pushed to the queue...
Queue::assertClosurePushed();

// Assert the total number of jobs that were pushed...
Queue::assertCount(3);
});
<?php

namespace Tests\Feature;

use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;

class ExampleTest extends TestCase
{
public function test_orders_can_be_shipped(): void
{
Queue::fake();

// Perform order shipping...

// Assert that no jobs were pushed...
Queue::assertNothingPushed();

// Assert a job was pushed to a given queue...
Queue::assertPushedOn('queue-name', ShipOrder::class);

// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);

// Assert a job was not pushed...
Queue::assertNotPushed(AnotherJob::class);

// Assert that a Closure was pushed to the queue...
Queue::assertClosurePushed();

// Assert the total number of jobs that were pushed...
Queue::assertCount(3);
}
}

assertPushed または assertNotPushed メソッドにクロージャを渡すことで、指定された "真理テスト" をパスするジョブがプッシュされたことをアサートすることができます。少なくとも指定された真理テストをパスするジョブがプッシュされた場合、アサーションは成功します。

    Queue::assertPushed(function (ShipOrder $job) use ($order) {
return $job->order->id === $order->id;
});

ジョブの一部を偽装する

他のジョブを通常通り実行させながら特定のジョブのみを偽装したい場合は、fake メソッドに偽装する必要のあるジョブのクラス名を渡すことができます。

test('orders can be shipped', function () {
Queue::fake([
ShipOrder::class,
]);

// Perform order shipping...

// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
});
public function test_orders_can_be_shipped(): void
{
Queue::fake([
ShipOrder::class,
]);

// Perform order shipping...

// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
}

except メソッドを使用して、指定されたジョブのセットを除くすべてのジョブを偽装することができます。

    Queue::fake()->except([
ShipOrder::class,
]);

ジョブチェーンのテスト

ジョブチェーンをテストするには、Bus ファサードの偽装機能を利用する必要があります。Bus ファサードの assertChained メソッドを使用して、ジョブチェーン がディスパッチされたことをアサートすることができます。assertChained メソッドは、最初の引数としてチェーンされたジョブの配列を受け入れます。

    use App\Jobs\RecordShipment;
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Support\Facades\Bus;

Bus::fake();

// ...

Bus::assertChained([
ShipOrder::class,
RecordShipment::class,
UpdateInventory::class
]);

上記の例のように、チェーンされたジョブの配列はジョブのクラス名の配列である場合があります。ただし、実際のジョブインスタンスの配列を提供することもできます。その場合、Laravelはアプリケーションによってディスパッチされたチェーンされたジョブのクラスが同じであり、同じプロパティ値を持っていることを保証します:

    Bus::assertChained([
new ShipOrder,
new RecordShipment,
new UpdateInventory,
]);

ジョブがジョブのチェーンなしでプッシュされたことをアサートするには、assertDispatchedWithoutChain メソッドを使用できます:

    Bus::assertDispatchedWithoutChain(ShipOrder::class);

チェーンの変更のテスト

チェーンされたジョブが既存のチェーンにジョブを追加または追加する場合、ジョブの assertHasChain メソッドを使用して、ジョブが残りのジョブの予想されるチェーンを持っていることをアサートできます:

$job = new ProcessPodcast;

$job->handle();

$job->assertHasChain([
new TranscribePodcast,
new OptimizePodcast,
new ReleasePodcast,
]);

assertDoesntHaveChain メソッドを使用して、ジョブの残りのチェーンが空であることをアサートできます:

$job->assertDoesntHaveChain();

チェーンされたバッチのテスト

ジョブチェーンにジョブのバッチが含まれている場合、Bus::chainedBatch 定義をチェーンアサーション内に挿入することで、チェーンされたバッチが期待通りであることをアサートできます:

    use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;

Bus::assertChained([
new ShipOrder,
Bus::chainedBatch(function (PendingBatch $batch) {
return $batch->jobs->count() === 3;
}),
new UpdateInventory,
]);

ジョブバッチのテスト

Bus ファサードの assertBatched メソッドを使用して、ジョブのバッチがディスパッチされたことをアサートできます。assertBatched メソッドに与えられたクロージャは、Illuminate\Bus\PendingBatch のインスタンスを受け取り、バッチ内のジョブを検査するために使用できます:

    use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;

Bus::fake();

// ...

Bus::assertBatched(function (PendingBatch $batch) {
return $batch->name == 'import-csv' &&
$batch->jobs->count() === 10;
});

assertBatchCount メソッドを使用して、指定されたバッチ数がディスパッチされたことをアサートできます:

    Bus::assertBatchCount(3);

assertNothingBatched を使用して、バッチがディスパッチされなかったことをアサートできます:

    Bus::assertNothingBatched();

ジョブ / バッチの相互作用のテスト

さらに、個々のジョブがその基礎となるバッチとの相互作用をテストする必要がある場合があります。たとえば、ジョブがそのバッチのさらなる処理をキャンセルするかどうかをテストする必要があるかもしれません。これを達成するには、withFakeBatch メソッドを使用してジョブに偽のバッチを割り当てる必要があります。withFakeBatch メソッドは、ジョブインスタンスと偽のバッチを含むタプルを返します:

    [$job, $batch] = (new ShipOrder)->withFakeBatch();

$job->handle();

$this->assertTrue($batch->cancelled());
$this->assertEmpty($batch->added);

ジョブ / キューの相互作用のテスト

時々、キューに入れられたジョブが自分自身をキューに戻すことをテストする必要があるかもしれません。また、ジョブが自身を削除することをテストする必要があるかもしれません。これらのキューの相互作用をテストするには、ジョブをインスタンス化し、withFakeQueueInteractions メソッドを呼び出すことができます。

ジョブのキューの相互作用が偽装された後は、ジョブのhandle メソッドを呼び出すことができます。ジョブを呼び出した後、assetReleasedassertDeletedassertFailed メソッドを使用して、ジョブのキューの相互作用に対するアサーションを行うことができます:

use App\Jobs\ProcessPodcast;

$job = (new ProcessPodcast)->withFakeQueueInteractions();

$job->handle();

$job->assertReleased(delay: 30);
$job->assertDeleted();
$job->assertFailed();

ジョブイベント

Queue ファサードbefore メソッドと after メソッドを使用すると、キューに入れられたジョブが処理される前または後に実行されるコールバックを指定できます。これらのコールバックは、追加のログ記録やダッシュボードの統計の増分を実行する絶好の機会です。通常、これらのメソッドは、サービスプロバイダboot メソッドから呼び出すべきです。たとえば、Laravel に含まれる AppServiceProvider を使用することができます:

    <?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class AppServiceProvider extends ServiceProvider
{
/**
* Register any application services.
*/
public function register(): void
{
// ...
}

/**
* Bootstrap any application services.
*/
public function boot(): void
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});

Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
}

Queue ファサードlooping メソッドを使用すると、ワーカーがキューからジョブを取得しようとする前に実行されるコールバックを指定できます。たとえば、以前に失敗したジョブによって開かれたトランザクションをロールバックするためのクロージャを登録することができます:

    use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Queue;

Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});