Workflow/Pipelineパターン実装ガイド
概要
EC-Spokeシステムにおいて、決済・在庫連携など複数手順を跨ぐ処理を、Laravel PipelineとWorkflowパターンを用いてパイプライン化し、各ステップをAction/ジョブに分離する設計と実装方法を説明します。
関連ドキュメント: 全体アーキテクチャ
1. 基本概念
1.1 Laravel Pipelineパターンとは
Laravel Pipelineは、リクエスト処理の各段階を独立したPipeクラスに分離し、順次実行するパターンです。HTTPミドルウェアの仕組みを一般化したもので、任意の処理フローに適用できます。
特徴: - 各ステップが独立したPipeクラスとして実装される - パスオブジェクト(Context)を通じてデータを伝播 - 各ステップで処理を中断・継続・変更可能 - エラーハンドリングとロールバックが容易
1.2 Workflowパターンとは
Workflowパターンは、複数のステップからなるビジネスプロセスを管理するパターンです。各ステップの状態を追跡し、非同期処理やリトライ、失敗処理をサポートします。
特徴: - 各ステップの状態を永続化して管理 - 非同期処理(ジョブ)との統合が容易 - リトライと失敗処理の仕組みを内包 - 進捗状況の追跡と可視化が可能
1.3 使い分けの基準
| 観点 | Pipelineパターン | Workflowパターン |
|---|---|---|
| 処理方式 | 同期処理 | 同期・非同期両対応 |
| 実行時間 | 短時間(数秒以内) | 長時間(数分〜数時間) |
| 状態管理 | メモリ内のみ | データベースに永続化 |
| エラー処理 | 即座にロールバック | リトライ・手動復旧対応 |
| 進捗追跡 | 不要 | 必要 |
| 適用例 | カート計算、価格計算、バリデーション | 注文確定、決済処理、配送手配、在庫連携 |
選択の指針: - Pipeline: ユーザーリクエスト内で完結する処理、即座に結果が必要な処理 - Workflow: 外部API呼び出しが多い処理、時間がかかる処理、進捗を追跡したい処理
1.4 同期処理 vs 非同期処理(ジョブ)の選択
同期処理が適している場合: - ユーザーが結果を待つ必要がある - 処理時間が短い(数秒以内) - 即座にエラーフィードバックが必要 - トランザクション内で完結させたい
非同期処理(ジョブ)が適している場合: - 処理に時間がかかる(数秒以上) - 外部API呼び出しが多い - ユーザーが結果を待つ必要がない - リトライが必要な処理 - バッチ処理や大量データ処理
ハイブリッドアプローチ: - 最初の数ステップは同期(Pipeline)で実行 - 時間がかかるステップは非同期(ジョブ)に変換 - Workflowパターンで状態を管理
2. Laravel Pipelineパターンの詳細
2.1 基本構造
use Illuminate\Pipeline\Pipeline;
class OrderPlacementPipeline
{
public function __construct(
private Pipeline $pipeline
) {}
public function process(Order $order): OrderPlacementResult
{
$passable = new OrderPlacementContext($order);
return $this->pipeline
->send($passable)
->through([
ReserveInventoryPipe::class,
ProcessPaymentPipe::class,
ArrangeShippingPipe::class,
SendNotificationsPipe::class,
])
->then(function ($passable) {
return $passable->getResult();
});
}
}
2.2 Pipeクラスの実装
各Pipeはhandleメソッドを実装し、パスオブジェクトを受け取り、次のPipeに渡します。
declare(strict_types=1);
namespace App\Workflows\OrderPlacement\Pipes;
use App\Workflows\OrderPlacement\OrderPlacementContext;
use Closure;
class ReserveInventoryPipe
{
public function __construct(
private InventoryService $inventoryService
) {}
public function handle(OrderPlacementContext $context, Closure $next): OrderPlacementContext
{
// 在庫確保のロジック
foreach ($context->order->items as $item) {
if (!$this->inventoryService->reserve($item->product_id, $item->quantity)) {
throw new InsufficientInventoryException("在庫不足: {$item->product_id}");
}
$context->addReservedInventory($item->product_id, $item->quantity);
}
return $next($context);
}
}
Pipeクラスの特徴:
- handleメソッドは必須
- 第1引数はパスオブジェクト(Context)
- 第2引数はClosure $next(次のPipeを呼び出す)
- 次のPipeに進む場合は$next($context)を呼び出す
- 処理を中断する場合は例外をスロー
2.3 パスオブジェクト(Context)の設計
パスオブジェクトは、各Pipe間でデータを伝播するためのオブジェクトです。詳細は後述の「パスオブジェクトの設計」セクションを参照してください。
設計原則: - 不変データと可変データを分離 - 型安全性を確保 - 各ステップの結果を段階的に追加 - エラー情報を管理
2.4 エラーハンドリングとロールバック
各Pipeでエラーが発生した場合、既に実行されたステップのロールバックが必要です。
class ReserveInventoryPipe
{
public function handle(OrderPlacementContext $context, Closure $next): OrderPlacementContext
{
try {
// 在庫確保処理
foreach ($context->order->items as $item) {
$this->inventoryService->reserve($item->product_id, $item->quantity);
$context->addReservedInventory($item->product_id, $item->quantity);
}
return $next($context);
} catch (\Exception $e) {
// 既に確保した在庫を解放
$this->releaseReservedInventory($context);
throw $e;
}
}
private function releaseReservedInventory(OrderPlacementContext $context): void
{
foreach ($context->getReservedInventories() as $reserved) {
$this->inventoryService->release($reserved['product_id'], $reserved['quantity']);
}
}
}
ロールバック戦略: 1. 逆順ロールバック: 実行順の逆順で各ステップのロールバックを実行 2. 補償トランザクション: 各ステップに対応する補償アクションを定義 3. Sagaパターン: 分散トランザクション管理パターン(高度)
2.5 ミドルウェア的な処理の実装
ロギング、認証、認可などの横断的関心事をPipeとして実装できます。
class LoggingPipe
{
public function handle(OrderPlacementContext $context, Closure $next): OrderPlacementContext
{
\Log::info('Order placement started', [
'order_id' => $context->order->id,
]);
$result = $next($context);
\Log::info('Order placement completed', [
'order_id' => $context->order->id,
]);
return $result;
}
}
class AuthorizationPipe
{
public function handle(OrderPlacementContext $context, Closure $next): OrderPlacementContext
{
if (!$context->user->can('place-order')) {
throw new UnauthorizedException('Order placement not allowed');
}
return $next($context);
}
}
3. Workflowパターンの詳細
3.1 ディレクトリ構造
app/Workflows/
└── OrderPlacement/
├── OrderPlacementWorkflow.php # ワークフローオーケストレーター
├── OrderPlacementPipeline.php # パイプラインオーケストレーター(オプション)
├── OrderPlacementContext.php # ワークフロー専用のコンテキスト(Pipeline用)
├── OrderPlacementState.php # ワークフロー専用の状態管理モデル(Workflow用)
├── Actions/ # ビジネスロジック(再利用可能)
│ ├── ReserveInventoryAction.php
│ ├── ProcessPaymentAction.php
│ └── ArrangeShippingAction.php
├── Jobs/ # 非同期処理
│ ├── ReserveInventoryJob.php
│ ├── ProcessPaymentJob.php
│ └── ArrangeShippingJob.php
└── Pipes/ # Pipeline用のPipe(オプション)
├── ReserveInventoryPipe.php
└── ProcessPaymentPipe.php
app/Domains/
└── Order/
└── ValueObjects/ # ドメインの概念を表現する値オブジェクト
├── ReservedInventory.php # ドメイン層に配置
├── PaymentResult.php # ドメイン層に配置
└── ShippingArrangement.php # ドメイン層に配置
配置の原則:
- ワークフロー固有のデータ構造(Context、State等)は app/Workflows/{WorkflowName}/ 配下に配置
- ドメインの概念を表現する値オブジェクト(ReservedInventory、PaymentResult等)は app/Domains/{Domain}/ValueObjects/ に配置
- ワークフローはドメイン層の値オブジェクトを利用する
3.2 Workflowクラスの実装
Workflowクラスは、各ステップの実行順序と状態遷移を管理します。
declare(strict_types=1);
namespace App\Workflows\OrderPlacement;
use App\Models\Order;
use App\Workflows\OrderPlacement\Jobs\ReserveInventoryJob;
class OrderPlacementWorkflow
{
private const STEPS = [
'inventory_reserved',
'payment_processed',
'shipping_arranged',
'notifications_sent',
'completed',
];
public function start(Order $order): OrderPlacementState
{
// ワークフロー状態を記録
$workflowState = OrderPlacementState::create([
'order_id' => $order->id,
'current_step' => 'inventory_reserved',
'status' => 'processing',
'data' => [],
]);
// 最初のステップを実行(ジョブとして投入)
ReserveInventoryJob::dispatch($workflowState);
return $workflowState;
}
public function proceedToNextStep(OrderPlacementState $state, string $currentStep): void
{
$nextStep = $this->getNextStep($currentStep);
if ($nextStep === null) {
$state->update(['status' => 'completed']);
return;
}
$state->update(['current_step' => $nextStep]);
// 次のステップのジョブを投入
match($nextStep) {
'payment_processed' => ProcessPaymentJob::dispatch($state),
'shipping_arranged' => ArrangeShippingJob::dispatch($state),
'notifications_sent' => SendNotificationsJob::dispatch($state),
default => throw new \InvalidArgumentException("Unknown step: {$nextStep}"),
};
}
private function getNextStep(string $currentStep): ?string
{
$index = array_search($currentStep, self::STEPS);
return $index !== false && isset(self::STEPS[$index + 1])
? self::STEPS[$index + 1]
: null;
}
}
3.3 Step/Actionクラスの分離設計
各ステップのビジネスロジックはActionクラスに分離し、ジョブから呼び出します。
declare(strict_types=1);
namespace App\Workflows\OrderPlacement\Actions;
use App\Models\Order;
use App\Domains\Order\ValueObjects\ReservedInventory;
class ReserveInventoryAction
{
public function __construct(
private InventoryService $inventoryService
) {}
public function execute(Order $order): array
{
$reserved = [];
foreach ($order->items as $item) {
if (!$this->inventoryService->reserve($item->product_id, $item->quantity)) {
// 既に確保した在庫を解放
$this->releaseAll($reserved);
throw new InsufficientInventoryException("在庫不足: {$item->product_id}");
}
$reserved[] = new ReservedInventory(
productId: $item->product_id,
quantity: $item->quantity,
reservationId: $this->inventoryService->getLastReservationId(),
reservedAt: now(),
);
}
return $reserved;
}
private function releaseAll(array $reserved): void
{
foreach ($reserved as $item) {
$this->inventoryService->release($item->reservationId);
}
}
}
Actionクラスの特徴: - 単一責任の原則に従う - ビジネスロジックのみを含む(ジョブ関連の処理は含まない) - 再利用可能(他のワークフローからも呼び出せる) - テストが容易
3.4 状態管理と進行状況の追跡
状態はデータベースに永続化し、進捗を追跡します。
// マイグレーション
Schema::create('workflow_states', function (Blueprint $table) {
$table->id();
$table->string('workflow_type'); // 'order_placement'
$table->morphs('subject'); // order_id
$table->string('current_step');
$table->string('status'); // 'processing', 'completed', 'failed'
$table->json('data')->nullable(); // 各ステップの結果
$table->text('error')->nullable();
$table->timestamps();
$table->index(['workflow_type', 'status']);
});
// モデル
class OrderPlacementState extends Model
{
protected $fillable = [
'order_id',
'current_step',
'status',
'data',
'error',
];
protected function casts(): array
{
return [
'data' => 'array',
];
}
public function order(): BelongsTo
{
return $this->belongsTo(Order::class);
}
}
3.5 リトライと失敗処理
ジョブのリトライ機能と失敗処理を実装します。
declare(strict_types=1);
namespace App\Workflows\OrderPlacement\Jobs;
use App\Workflows\OrderPlacement\Actions\ReserveInventoryAction;
use App\Workflows\OrderPlacement\OrderPlacementWorkflow;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ReserveInventoryJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public int $tries = 3;
public int $backoff = 60; // 60秒後にリトライ
public function __construct(
public OrderPlacementState $workflowState
) {}
public function handle(
ReserveInventoryAction $action,
OrderPlacementWorkflow $workflow
): void {
$order = Order::findOrFail($this->workflowState->order_id);
try {
$result = $action->execute($order);
// 結果を保存
$this->workflowState->update([
'data' => array_merge($this->workflowState->data ?? [], [
'inventory_reserved' => array_map(
fn($item) => $item->toArray(),
$result
),
]),
]);
// 次のステップへ
$workflow->proceedToNextStep($this->workflowState, 'inventory_reserved');
} catch (InsufficientInventoryException $e) {
// 失敗処理(リトライしない)
$this->workflowState->update([
'status' => 'failed',
'error' => $e->getMessage(),
]);
// 通知を送信
NotifyOrderFailureJob::dispatch($order, $e);
throw $e; // ジョブを失敗としてマーク
}
}
public function failed(\Throwable $exception): void
{
// ジョブ失敗時の処理(最大リトライ回数に達した場合)
$this->workflowState->update([
'status' => 'failed',
'error' => $exception->getMessage(),
]);
// 管理者に通知
\Log::error('Order placement workflow failed', [
'workflow_state_id' => $this->workflowState->id,
'error' => $exception->getMessage(),
'trace' => $exception->getTraceAsString(),
]);
}
}
4. 決済処理の具体例
4.1 注文確定から決済完了までのフロー
1. 在庫確保(ReserveInventory)
↓ 成功
2. 注文計算(CalculateOrder)
↓ 成功
3. 決済処理(ProcessPayment)
↓ 成功
4. 注文確定(ConfirmOrder)
↓ 成功
5. 配送手配(ArrangeShipping)
↓ 成功
6. 通知送信(SendNotifications)
↓ 完了
4.2 各ステップの分離
各ステップを独立したAction/ジョブとして実装します。
// Action: 決済処理
class ProcessPaymentAction
{
public function execute(Order $order, PaymentMethod $method): PaymentResult
{
$gateway = $this->getGateway($method);
$result = $gateway->charge(
amount: $order->grand_total,
currency: $order->currency,
paymentMethod: $method,
);
return new PaymentResult(
transactionId: $result->transaction_id,
status: $result->success ? 'success' : 'failed',
errorMessage: $result->error_message,
processedAt: now(),
);
}
}
// Job: 決済処理ジョブ
class ProcessPaymentJob implements ShouldQueue
{
public function handle(
ProcessPaymentAction $action,
OrderPlacementWorkflow $workflow
): void {
$order = Order::findOrFail($this->workflowState->order_id);
$paymentMethod = PaymentMethod::find($this->workflowState->data['payment_method_id']);
$result = $action->execute($order, $paymentMethod);
if (!$result->isSuccess()) {
// 決済失敗時の処理
$this->handlePaymentFailure($order, $result);
return;
}
// 結果を保存
$this->workflowState->update([
'data' => array_merge($this->workflowState->data ?? [], [
'payment_result' => $result->toArray(),
]),
]);
// 次のステップへ
$workflow->proceedToNextStep($this->workflowState, 'payment_processed');
}
}
4.3 エラー時のロールバック処理
決済処理で失敗した場合、既に実行されたステップをロールバックします。
private function handlePaymentFailure(Order $order, PaymentResult $result): void
{
// 1. 在庫を解放
$reservedInventories = $this->workflowState->data['inventory_reserved'] ?? [];
foreach ($reservedInventories as $reserved) {
$this->inventoryService->release($reserved['reservation_id']);
}
// 2. ワークフロー状態を更新
$this->workflowState->update([
'status' => 'failed',
'error' => $result->errorMessage,
]);
// 3. ユーザーに通知
NotifyPaymentFailureJob::dispatch($order, $result);
}
4.4 非同期処理への移行ポイント
決済処理は外部API呼び出しが多いため、非同期処理に適しています。
// 同期処理(Pipeline)から非同期処理(Workflow)への移行
class OrderPlacementService
{
public function placeOrder(Order $order): OrderPlacementResult
{
// 最初の数ステップは同期で実行(高速)
$context = app(OrderPlacementPipeline::class)
->process($order);
// 時間がかかる処理は非同期に移行
if ($context->canProceedToPayment()) {
$workflow = app(OrderPlacementWorkflow::class);
$workflowState = $workflow->start($order);
// 既に実行済みの結果を設定
$workflowState->update([
'data' => [
'inventory_reserved' => $context->getReservedInventories(),
'calculation_result' => $context->getCalculationResult()->toArray(),
],
]);
// 決済処理から非同期で実行
ProcessPaymentJob::dispatch($workflowState);
return new OrderPlacementResult(
order: $order,
workflowState: $workflowState,
isAsync: true,
);
}
return $context->getResult();
}
}
5. 在庫連携の具体例
5.1 在庫確保 → 在庫減算 → 外部システム連携のフロー
1. 在庫確保(ReserveInventory)
- 楽観的ロックで在庫を一時確保
↓ 成功
2. 決済完了確認(ConfirmPayment)
↓ 成功
3. 在庫減算(DeductInventory)
- 確保した在庫を確定
↓ 成功
4. 外部システム連携(SyncToExternalSystem)
- ERP/WMSへの在庫情報同期
↓ 成功
5. 完了
5.2 楽観的ロックとペシミスティックロックの使い分け
楽観的ロック: - 在庫確保時は楽観的ロックを使用 - バージョン番号で競合を検出 - 競合時はリトライ
class ReserveInventoryAction
{
public function execute(Order $order): array
{
$reserved = [];
foreach ($order->items as $item) {
$productVar = ProductVar::find($item->product_var_id);
// 楽観的ロックで在庫確保
$success = DB::transaction(function () use ($productVar, $item, &$reserved) {
$productVar->refresh(); // 最新の状態を取得
if ($productVar->stock_quantity < $item->quantity) {
return false;
}
// 在庫を減算(バージョンチェック付き)
$affected = ProductVar::where('id', $productVar->id)
->where('version', $productVar->version) // バージョンチェック
->where('stock_quantity', '>=', $item->quantity)
->update([
'stock_quantity' => DB::raw("stock_quantity - {$item->quantity}"),
'version' => DB::raw('version + 1'),
]);
if ($affected === 0) {
// 競合が発生(他の処理が在庫を変更)
return false;
}
$reserved[] = new ReservedInventory(
productVarId: $productVar->id,
quantity: $item->quantity,
reservationId: Str::uuid()->toString(),
reservedAt: now(),
);
return true;
});
if (!$success) {
$this->releaseAll($reserved);
throw new InsufficientInventoryException("在庫不足: {$productVar->id}");
}
}
return $reserved;
}
}
ペシミスティックロック: - 在庫確定時はペシミスティックロックを使用 - 確実に在庫を確定する必要がある場合
class DeductInventoryAction
{
public function execute(array $reservedInventories): void
{
foreach ($reservedInventories as $reserved) {
DB::transaction(function () use ($reserved) {
// ペシミスティックロックで在庫を確定
$productVar = ProductVar::lockForUpdate()
->find($reserved->productVarId);
// 在庫確定処理
$productVar->update([
'reserved_quantity' => DB::raw("reserved_quantity - {$reserved->quantity}"),
'sold_quantity' => DB::raw("sold_quantity + {$reserved->quantity}"),
]);
});
}
}
}
5.3 在庫不足時の処理フロー
在庫不足が検出された場合の処理フロー:
5.4 バッチ処理との連携
大量の在庫更新はバッチ処理で実行します。
class SyncInventoryToExternalSystemJob implements ShouldQueue
{
public function handle(): void
{
// バッチ処理で在庫情報を外部システムに同期
ProductVar::chunk(100, function ($products) {
$data = $products->map(fn($p) => [
'product_id' => $p->product_id,
'variant_id' => $p->id,
'stock_quantity' => $p->stock_quantity,
'reserved_quantity' => $p->reserved_quantity,
'updated_at' => $p->updated_at->toIso8601String(),
])->toArray();
$this->externalSystem->syncInventory($data);
});
}
}
6. Action/ジョブへの分離
6.1 Actionクラスの設計(単一責任の原則)
各Actionは単一の責任を持ち、再利用可能に設計します。
declare(strict_types=1);
namespace App\Workflows\OrderPlacement\Actions;
/**
* 在庫確保アクション
*
* 責任: 注文に必要な在庫を確保する
*/
class ReserveInventoryAction
{
public function __construct(
private InventoryService $inventoryService
) {}
/**
* 在庫を確保する
*
* @param Order $order 注文
* @return ReservedInventory[] 確保した在庫のリスト
* @throws InsufficientInventoryException 在庫不足の場合
*/
public function execute(Order $order): array
{
// ビジネスロジックのみ
// ジョブ関連の処理は含まない
}
}
Actionクラスの設計原則: - 単一責任: 1つのActionは1つのことだけを行う - 再利用性: 他のワークフローからも呼び出せる - テスト容易性: 依存関係を注入可能にする - 純粋性: 副作用を最小限に(可能な限り)
6.2 ジョブへの変換タイミング
以下の場合にジョブに変換します:
- 処理時間が長い: 数秒以上かかる処理
- 外部API呼び出し: ネットワークI/Oが発生する処理
- リトライが必要: 一時的な失敗が想定される処理
- バッチ処理: 大量データを処理する場合
- ユーザー待機不要: ユーザーが結果を待つ必要がない処理
// Action(同期処理)
class CalculateOrderAction
{
public function execute(array $cartItems): OrderCalculationResult
{
// 高速な計算処理
return $this->calculationService->calculate($cartItems);
}
}
// Job(非同期処理)
class ProcessPaymentJob implements ShouldQueue
{
public function handle(ProcessPaymentAction $action): void
{
// 時間がかかる外部API呼び出し
$result = $action->execute($this->order, $this->paymentMethod);
// ...
}
}
6.3 キュー投入の戦略
即時投入:
遅延投入:
バッチ投入:
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
$batch = Bus::batch([
new ProcessPaymentJob($workflowState1),
new ProcessPaymentJob($workflowState2),
new ProcessPaymentJob($workflowState3),
])->then(function (Batch $batch) {
// すべてのジョブが成功した場合
})->catch(function (Batch $batch, \Throwable $e) {
// ジョブが失敗した場合
})->finally(function (Batch $batch) {
// すべてのジョブが完了した場合(成功・失敗問わず)
})->dispatch();
6.4 ジョブチェーンとジョブバッチの活用
ジョブチェーン: 順次実行が必要な場合に使用します。
use Illuminate\Support\Facades\Bus;
Bus::chain([
new ReserveInventoryJob($workflowState),
new ProcessPaymentJob($workflowState),
new ArrangeShippingJob($workflowState),
])->catch(function (\Throwable $e) {
// チェーンのどこかで失敗した場合
\Log::error('Job chain failed', ['error' => $e->getMessage()]);
})->dispatch();
ジョブバッチ: 並列実行可能な場合に使用します。
Bus::batch([
new SyncInventoryToExternalSystemJob(),
new SendOrderConfirmationEmailJob($order),
new UpdateSearchIndexJob($order),
])->then(function (Batch $batch) {
// すべてのジョブが成功した場合
})->dispatch();
7. ディレクトリ構造と命名規則
7.1 app/Workflows/の構造
app/Workflows/
└── {WorkflowName}/ # ワークフロー名(パスカルケース)
├── {WorkflowName}Workflow.php # ワークフローオーケストレーター
├── {WorkflowName}Pipeline.php # パイプラインオーケストレーター(オプション)
├── {WorkflowName}Context.php # ワークフロー専用のコンテキスト(Pipeline用)
├── {WorkflowName}State.php # ワークフロー専用の状態管理モデル(Workflow用)
├── Actions/ # ビジネスロジック
│ ├── {ActionName}Action.php
│ └── ...
├── Jobs/ # 非同期処理
│ ├── {StepName}Job.php
│ └── ...
└── Pipes/ # Pipeline用のPipe(オプション)
├── {StepName}Pipe.php
└── ...
app/Domains/
└── {Domain}/
└── ValueObjects/ # ドメインの概念を表現する値オブジェクト
├── {ValueObjectName}.php
└── ...
例:
app/Workflows/
└── OrderPlacement/
├── OrderPlacementWorkflow.php
├── OrderPlacementPipeline.php
├── OrderPlacementContext.php # ワークフロー専用のコンテキスト
├── OrderPlacementState.php # ワークフロー専用の状態管理
├── Actions/
│ ├── ReserveInventoryAction.php
│ ├── ProcessPaymentAction.php
│ └── ArrangeShippingAction.php
├── Jobs/
│ ├── ReserveInventoryJob.php
│ ├── ProcessPaymentJob.php
│ └── ArrangeShippingJob.php
└── Pipes/
├── ReserveInventoryPipe.php
└── ProcessPaymentPipe.php
app/Domains/
└── Order/
└── ValueObjects/ # ドメインの概念を表現する値オブジェクト
├── ReservedInventory.php # ドメイン層に配置
├── PaymentResult.php # ドメイン層に配置
└── ShippingArrangement.php # ドメイン層に配置
配置の原則:
- ワークフロー固有のデータ構造(Context、State等)は app/Workflows/{WorkflowName}/ 配下に配置
- ドメインの概念を表現する値オブジェクト(ReservedInventory、PaymentResult等)は app/Domains/{Domain}/ValueObjects/ に配置
- ワークフローはドメイン層の値オブジェクトを利用する
7.2 app/Actions/の構造(グローバル)
ドメイン横断的に使用されるActionはapp/Actions/に配置します。
例:
app/Actions/
├── Order/
│ ├── CalculateOrderAction.php
│ └── ValidateOrderAction.php
└── Payment/
├── ProcessPaymentAction.php
└── RefundPaymentAction.php
7.3 app/Jobs/の構造(グローバル)
ドメイン横断的に使用されるジョブはapp/Jobs/に配置します。
7.4 命名規則
| 種別 | 命名規則 | 例 |
|---|---|---|
| Workflowクラス | {WorkflowName}Workflow |
OrderPlacementWorkflow |
| Pipelineクラス | {WorkflowName}Pipeline |
OrderPlacementPipeline |
| Contextクラス | {WorkflowName}Context |
OrderPlacementContext |
| Stateモデル | {WorkflowName}State |
OrderPlacementState |
| Actionクラス | {ActionName}Action |
ReserveInventoryAction |
| Jobクラス | {StepName}Job |
ReserveInventoryJob |
| Pipeクラス | {StepName}Pipe |
ReserveInventoryPipe |
| ValueObject | {ValueObjectName} |
ReservedInventory |
8. 実装例(コードサンプル)
8.1 注文確定Workflowの完全な実装例
declare(strict_types=1);
namespace App\Workflows\OrderPlacement;
use App\Models\Order;
use App\Workflows\OrderPlacement\Jobs\ReserveInventoryJob;
class OrderPlacementWorkflow
{
private const STEPS = [
'inventory_reserved',
'order_calculated',
'payment_processed',
'order_confirmed',
'shipping_arranged',
'notifications_sent',
'completed',
];
public function __construct(
private OrderPlacementState $stateModel
) {}
public function start(Order $order, array $cartItems): OrderPlacementState
{
$workflowState = $this->stateModel->create([
'order_id' => $order->id,
'current_step' => 'inventory_reserved',
'status' => 'processing',
'data' => [
'cart_items' => $cartItems,
],
]);
ReserveInventoryJob::dispatch($workflowState);
return $workflowState;
}
public function proceedToNextStep(OrderPlacementState $state, string $currentStep): void
{
$nextStep = $this->getNextStep($currentStep);
if ($nextStep === null) {
$state->update(['status' => 'completed']);
event(new OrderPlacementCompleted($state->order));
return;
}
$state->update(['current_step' => $nextStep]);
match($nextStep) {
'order_calculated' => CalculateOrderJob::dispatch($state),
'payment_processed' => ProcessPaymentJob::dispatch($state),
'order_confirmed' => ConfirmOrderJob::dispatch($state),
'shipping_arranged' => ArrangeShippingJob::dispatch($state),
'notifications_sent' => SendNotificationsJob::dispatch($state),
default => throw new \InvalidArgumentException("Unknown step: {$nextStep}"),
};
}
private function getNextStep(string $currentStep): ?string
{
$index = array_search($currentStep, self::STEPS);
return $index !== false && isset(self::STEPS[$index + 1])
? self::STEPS[$index + 1]
: null;
}
}
8.2 Pipelineを使った決済処理の実装例
declare(strict_types=1);
namespace App\Workflows\OrderPlacement;
use Illuminate\Pipeline\Pipeline;
use App\Models\Order;
class OrderPlacementPipeline
{
public function __construct(
private Pipeline $pipeline
) {}
public function process(Order $order, array $cartItems): OrderPlacementResult
{
$context = new OrderPlacementContext($order, $cartItems);
return $this->pipeline
->send($context)
->through([
ValidateOrderPipe::class,
CalculateOrderPipe::class,
ReserveInventoryPipe::class,
])
->then(function ($context) {
return $context->getResult();
});
}
}
8.3 Actionクラスの実装例
declare(strict_types=1);
namespace App\Workflows\OrderPlacement\Actions;
use App\Models\Order;
use App\Domains\Order\ValueObjects\ReservedInventory;
class ReserveInventoryAction
{
public function __construct(
private InventoryService $inventoryService
) {}
/**
* 在庫を確保する
*
* @param Order $order 注文
* @return ReservedInventory[] 確保した在庫のリスト
* @throws InsufficientInventoryException 在庫不足の場合
*/
public function execute(Order $order): array
{
$reserved = [];
try {
foreach ($order->items as $item) {
$reservationId = $this->inventoryService->reserve(
productVarId: $item->product_var_id,
quantity: $item->quantity
);
$reserved[] = new ReservedInventory(
productId: $item->product_id,
productVarId: $item->product_var_id,
quantity: $item->quantity,
reservationId: $reservationId,
reservedAt: now(),
);
}
return $reserved;
} catch (InsufficientInventoryException $e) {
// 既に確保した在庫を解放
$this->releaseAll($reserved);
throw $e;
}
}
private function releaseAll(array $reserved): void
{
foreach ($reserved as $item) {
$this->inventoryService->release($item->reservationId);
}
}
}
8.4 ジョブへの変換例
declare(strict_types=1);
namespace App\Workflows\OrderPlacement\Jobs;
use App\Workflows\OrderPlacement\Actions\ReserveInventoryAction;
use App\Workflows\OrderPlacement\OrderPlacementWorkflow;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ReserveInventoryJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public int $tries = 3;
public int $backoff = 60;
public function __construct(
public OrderPlacementState $workflowState
) {}
public function handle(
ReserveInventoryAction $action,
OrderPlacementWorkflow $workflow
): void {
$order = Order::findOrFail($this->workflowState->order_id);
try {
$result = $action->execute($order);
$this->workflowState->update([
'data' => array_merge($this->workflowState->data ?? [], [
'inventory_reserved' => array_map(
fn($item) => $item->toArray(),
$result
),
]),
]);
$workflow->proceedToNextStep($this->workflowState, 'inventory_reserved');
} catch (InsufficientInventoryException $e) {
$this->workflowState->update([
'status' => 'failed',
'error' => $e->getMessage(),
]);
NotifyOrderFailureJob::dispatch($order, $e);
throw $e;
}
}
public function failed(\Throwable $exception): void
{
$this->workflowState->update([
'status' => 'failed',
'error' => $exception->getMessage(),
]);
\Log::error('ReserveInventoryJob failed', [
'workflow_state_id' => $this->workflowState->id,
'error' => $exception->getMessage(),
]);
}
}
9. テスト戦略
9.1 Workflow/Pipelineのユニットテスト
declare(strict_types=1);
namespace Tests\Unit\Workflows\OrderPlacement;
use App\Models\Order;
use App\Workflows\OrderPlacement\OrderPlacementWorkflow;
use App\Workflows\OrderPlacement\OrderPlacementState;
use Tests\TestCase;
use Illuminate\Foundation\Testing\RefreshDatabase;
class OrderPlacementWorkflowTest extends TestCase
{
use RefreshDatabase;
public function test_workflow_starts_correctly(): void
{
$order = Order::factory()->create();
$workflow = app(OrderPlacementWorkflow::class);
$state = $workflow->start($order, []);
$this->assertInstanceOf(OrderPlacementState::class, $state);
$this->assertEquals('inventory_reserved', $state->current_step);
$this->assertEquals('processing', $state->status);
}
public function test_workflow_proceeds_to_next_step(): void
{
$state = OrderPlacementState::factory()->create([
'current_step' => 'inventory_reserved',
'status' => 'processing',
]);
$workflow = app(OrderPlacementWorkflow::class);
$workflow->proceedToNextStep($state, 'inventory_reserved');
$state->refresh();
$this->assertEquals('order_calculated', $state->current_step);
}
}
9.2 Actionクラスのテスト
declare(strict_types=1);
namespace Tests\Unit\Workflows\OrderPlacement\Actions;
use App\Models\Order;
use App\Workflows\OrderPlacement\Actions\ReserveInventoryAction;
use App\Services\InventoryService;
use Tests\TestCase;
use Mockery;
class ReserveInventoryActionTest extends TestCase
{
public function test_execute_reserves_inventory(): void
{
$order = Order::factory()->create();
$order->items()->create([
'product_id' => 1,
'product_var_id' => 1,
'quantity' => 2,
]);
$inventoryService = Mockery::mock(InventoryService::class);
$inventoryService->shouldReceive('reserve')
->once()
->with(1, 2)
->andReturn('reservation-123');
$action = new ReserveInventoryAction($inventoryService);
$result = $action->execute($order);
$this->assertCount(1, $result);
$this->assertEquals('reservation-123', $result[0]->reservationId);
}
public function test_execute_releases_inventory_on_failure(): void
{
$order = Order::factory()->create();
$order->items()->create([
'product_id' => 1,
'product_var_id' => 1,
'quantity' => 2,
]);
$inventoryService = Mockery::mock(InventoryService::class);
$inventoryService->shouldReceive('reserve')
->once()
->andReturn('reservation-123');
$inventoryService->shouldReceive('reserve')
->once()
->andThrow(new InsufficientInventoryException());
$inventoryService->shouldReceive('release')
->once()
->with('reservation-123');
$action = new ReserveInventoryAction($inventoryService);
$this->expectException(InsufficientInventoryException::class);
$action->execute($order);
}
}
9.3 ジョブのテスト
declare(strict_types=1);
namespace Tests\Unit\Workflows\OrderPlacement\Jobs;
use App\Models\Order;
use App\Workflows\OrderPlacement\Jobs\ReserveInventoryJob;
use App\Workflows\OrderPlacement\OrderPlacementState;
use App\Workflows\OrderPlacement\OrderPlacementWorkflow;
use Tests\TestCase;
use Illuminate\Foundation\Testing\RefreshDatabase;
use Illuminate\Support\Facades\Queue;
class ReserveInventoryJobTest extends TestCase
{
use RefreshDatabase;
public function test_job_processes_successfully(): void
{
Queue::fake();
$order = Order::factory()->create();
$state = OrderPlacementState::factory()->create([
'order_id' => $order->id,
'current_step' => 'inventory_reserved',
]);
$job = new ReserveInventoryJob($state);
$job->handle(
app(ReserveInventoryAction::class),
app(OrderPlacementWorkflow::class)
);
$state->refresh();
$this->assertNotNull($state->data['inventory_reserved']);
}
}
9.4 統合テストの考え方
統合テストでは、実際のデータベースと外部サービスを使用して、ワークフロー全体をテストします。
declare(strict_types=1);
namespace Tests\Integration\Workflows\OrderPlacement;
use App\Models\Order;
use App\Workflows\OrderPlacement\OrderPlacementWorkflow;
use Tests\TestCase;
use Illuminate\Foundation\Testing\RefreshDatabase;
class OrderPlacementWorkflowIntegrationTest extends TestCase
{
use RefreshDatabase;
public function test_complete_order_placement_workflow(): void
{
$order = Order::factory()->create();
$workflow = app(OrderPlacementWorkflow::class);
$state = $workflow->start($order, []);
// 各ステップのジョブを実行
$this->artisan('queue:work', ['--once' => true]);
$state->refresh();
$this->assertEquals('completed', $state->status);
}
}
10. ベストプラクティスと注意点
10.1 パフォーマンス考慮事項
Pipelineパターン: - 各Pipeの処理時間を最小限に - 不要なデータベースクエリを避ける - キャッシュを活用
Workflowパターン: - ジョブの実行時間を監視 - キューワーカーの数を適切に設定 - バッチ処理で大量データを効率的に処理
10.2 デバッグとロギング
各ステップで適切にログを記録します。
class ReserveInventoryPipe
{
public function handle(OrderPlacementContext $context, Closure $next): OrderPlacementContext
{
\Log::info('ReserveInventoryPipe started', [
'order_id' => $context->order->id,
]);
try {
$result = $next($context);
\Log::info('ReserveInventoryPipe completed', [
'order_id' => $context->order->id,
'reserved_count' => count($context->getReservedInventories()),
]);
return $result;
} catch (\Exception $e) {
\Log::error('ReserveInventoryPipe failed', [
'order_id' => $context->order->id,
'error' => $e->getMessage(),
]);
throw $e;
}
}
}
10.3 監視とアラート
ワークフローの状態を監視し、異常時にアラートを送信します。
// 定期的に実行されるコマンド
class MonitorWorkflowsCommand extends Command
{
public function handle(): void
{
// 長時間処理中のワークフローを検出
$stuckWorkflows = OrderPlacementState::where('status', 'processing')
->where('updated_at', '<', now()->subHours(1))
->get();
if ($stuckWorkflows->isNotEmpty()) {
\Log::warning('Stuck workflows detected', [
'count' => $stuckWorkflows->count(),
]);
// アラートを送信
$this->sendAlert($stuckWorkflows);
}
}
}
10.4 将来の拡張性
プラグイン化: - 各ステップをプラグインとして実装 - 設定で有効/無効を切り替え可能に
バージョニング: - ワークフローのバージョンを管理 - 既存のワークフローと互換性を保ちながら拡張
マルチテナント対応: - テナントごとに異なるワークフローを実行可能に - テナント設定でワークフローをカスタマイズ
11. 今後想定されるWorkflow/Pipeline候補
11.1 Workflow候補(非同期・状態管理が必要)
以下のワークフローは、複数のステップを跨ぎ、状態管理と非同期処理が必要なため、Workflowパターンが適しています。
注文確定ワークフロー
- 目的: 在庫仮押さえ → 決済 → 注文確定 → 配送指示 → 通知送信までを一連で管理
- 特徴: 失敗時の補償処理や再試行を制御
- 優先度: 高(Phase 1で実装予定)
外部在庫同期ワークフロー
- 目的: 倉庫・ERPなど外部システムからの在庫イベントを受信し、SKUごとの差分反映 → 整合性検証 → 検索インデックス更新までをバッチ/チェーンで処理
- 特徴: 大量データ処理、リトライが必要
- 優先度: 中(Phase 2以降で実装予定)
配送手配ワークフロー
- 目的: 複数配送業者とのラベル発行 → 追跡番号取得 → ステータス更新 → 顧客通知を段階的に連携
- 特徴: 外部API呼び出しが多い、進捗追跡が必要
- 優先度: 高(Phase 1で実装予定)
サブスクリプション課金ワークフロー
- 目的: 請求生成 → 決済 → 契約更新 → 失敗時のリトライ/ステータス変更 → 顧客通知を周期的に実行
- 特徴: 定期実行、リトライロジックが重要
- 優先度: 中(Phase 2で実装予定)
返品・返金ワークフロー
- 目的: 返品受付 → 在庫検品 → 返金処理 → 帳票更新 → 通知をステップ管理
- 特徴: 部分返金や在庫戻しを安全に処理
- 優先度: 中(Phase 2以降で実装予定)
アカウント審査/承認ワークフロー(B2B想定)
- 目的: 申請受領 → 書類確認 → 承認/差戻し → 権限付与 → 通知をイベント駆動で管理
- 特徴: 承認フロー、複数担当者による処理
- 優先度: 低(Phase 3で実装予定)
11.2 Pipeline候補(同期・即時レスポンスが必要)
以下のパイプラインは、ユーザーリクエスト内で完結し、即座に結果が必要なため、Pipelineパターンが適しています。
カート価格計算パイプライン
- 目的: アイテム正規化 → プロモーション適用 → 送料計算 → 税額按分 → 合計算出を同期で連鎖
- 特徴: ユーザーがカート画面で即座に結果を確認する必要がある
- 優先度: 高(Phase 1で実装予定)
注文バリデーションパイプライン
- 目的: カート状態チェック → 在庫即時照会 → 決済手段制約チェック → 不正検知ヒューリスティクスを順次評価
- 特徴: 注文確定前に即座にエラーを検出する必要がある
- 優先度: 高(Phase 1で実装予定)
商品公開前チェックパイプライン
- 目的: マスタ整合性 → 画像/コンテンツ検証 → SEOメタ情報 → 承認ルール確認をFilament操作内で実行
- 特徴: 管理画面での操作時に即座に検証結果を表示
- 優先度: 中(Phase 2以降で実装予定)
顧客セグメント判定パイプライン
- 目的: 属性収集 → 購買履歴解析 → スコアリング → セグメント割当を同期で行い、マーケティング施策に反映
- 特徴: リアルタイムでのセグメント判定が必要
- 優先度: 低(Phase 3以降で実装予定)
注記: カード決済は事前に選定した単一ゲートウェイを利用する方針のため、「決済ゲートウェイ選定パイプライン」は対象外とします。
まとめ
Workflow/Pipelineパターンにより、複数手順を跨ぐ処理を以下のように管理できます:
- 責務の分離: 各ステップが独立したAction/Pipeとして実装される
- 再利用性: Actionクラスを他のワークフローからも利用可能
- テスト容易性: 各コンポーネントを個別にテスト可能
- 拡張性: ステップの追加・削除が容易
- 可観測性: 各ステップのログ・メトリクスを取得可能
- エラーハンドリング: 各ステップで適切にエラーを処理可能
このパターンを適用することで、EC-Spokeシステムの複雑なビジネスプロセスを、保守性と拡張性の高いコードとして実装できます。
関連ドキュメント: - 全体アーキテクチャ - 注文計算方式 - 価格計算
更新日: 2025-01-XX バージョン: 1.0 作成者: EC-Spoke開発チーム