コンテンツにスキップ

Workflow/Pipelineパターン実装ガイド

概要

EC-Spokeシステムにおいて、決済・在庫連携など複数手順を跨ぐ処理を、Laravel PipelineとWorkflowパターンを用いてパイプライン化し、各ステップをAction/ジョブに分離する設計と実装方法を説明します。

関連ドキュメント: 全体アーキテクチャ


1. 基本概念

1.1 Laravel Pipelineパターンとは

Laravel Pipelineは、リクエスト処理の各段階を独立したPipeクラスに分離し、順次実行するパターンです。HTTPミドルウェアの仕組みを一般化したもので、任意の処理フローに適用できます。

特徴: - 各ステップが独立したPipeクラスとして実装される - パスオブジェクト(Context)を通じてデータを伝播 - 各ステップで処理を中断・継続・変更可能 - エラーハンドリングとロールバックが容易

1.2 Workflowパターンとは

Workflowパターンは、複数のステップからなるビジネスプロセスを管理するパターンです。各ステップの状態を追跡し、非同期処理やリトライ、失敗処理をサポートします。

特徴: - 各ステップの状態を永続化して管理 - 非同期処理(ジョブ)との統合が容易 - リトライと失敗処理の仕組みを内包 - 進捗状況の追跡と可視化が可能

1.3 使い分けの基準

観点 Pipelineパターン Workflowパターン
処理方式 同期処理 同期・非同期両対応
実行時間 短時間(数秒以内) 長時間(数分〜数時間)
状態管理 メモリ内のみ データベースに永続化
エラー処理 即座にロールバック リトライ・手動復旧対応
進捗追跡 不要 必要
適用例 カート計算、価格計算、バリデーション 注文確定、決済処理、配送手配、在庫連携

選択の指針: - Pipeline: ユーザーリクエスト内で完結する処理、即座に結果が必要な処理 - Workflow: 外部API呼び出しが多い処理、時間がかかる処理、進捗を追跡したい処理

1.4 同期処理 vs 非同期処理(ジョブ)の選択

同期処理が適している場合: - ユーザーが結果を待つ必要がある - 処理時間が短い(数秒以内) - 即座にエラーフィードバックが必要 - トランザクション内で完結させたい

非同期処理(ジョブ)が適している場合: - 処理に時間がかかる(数秒以上) - 外部API呼び出しが多い - ユーザーが結果を待つ必要がない - リトライが必要な処理 - バッチ処理や大量データ処理

ハイブリッドアプローチ: - 最初の数ステップは同期(Pipeline)で実行 - 時間がかかるステップは非同期(ジョブ)に変換 - Workflowパターンで状態を管理


2. Laravel Pipelineパターンの詳細

2.1 基本構造

use Illuminate\Pipeline\Pipeline;

class OrderPlacementPipeline
{
    public function __construct(
        private Pipeline $pipeline
    ) {}

    public function process(Order $order): OrderPlacementResult
    {
        $passable = new OrderPlacementContext($order);

        return $this->pipeline
            ->send($passable)
            ->through([
                ReserveInventoryPipe::class,
                ProcessPaymentPipe::class,
                ArrangeShippingPipe::class,
                SendNotificationsPipe::class,
            ])
            ->then(function ($passable) {
                return $passable->getResult();
            });
    }
}

2.2 Pipeクラスの実装

各Pipeはhandleメソッドを実装し、パスオブジェクトを受け取り、次のPipeに渡します。

declare(strict_types=1);

namespace App\Workflows\OrderPlacement\Pipes;

use App\Workflows\OrderPlacement\OrderPlacementContext;
use Closure;

class ReserveInventoryPipe
{
    public function __construct(
        private InventoryService $inventoryService
    ) {}

    public function handle(OrderPlacementContext $context, Closure $next): OrderPlacementContext
    {
        // 在庫確保のロジック
        foreach ($context->order->items as $item) {
            if (!$this->inventoryService->reserve($item->product_id, $item->quantity)) {
                throw new InsufficientInventoryException("在庫不足: {$item->product_id}");
            }
            $context->addReservedInventory($item->product_id, $item->quantity);
        }

        return $next($context);
    }
}

Pipeクラスの特徴: - handleメソッドは必須 - 第1引数はパスオブジェクト(Context) - 第2引数はClosure $next(次のPipeを呼び出す) - 次のPipeに進む場合は$next($context)を呼び出す - 処理を中断する場合は例外をスロー

2.3 パスオブジェクト(Context)の設計

パスオブジェクトは、各Pipe間でデータを伝播するためのオブジェクトです。詳細は後述の「パスオブジェクトの設計」セクションを参照してください。

設計原則: - 不変データと可変データを分離 - 型安全性を確保 - 各ステップの結果を段階的に追加 - エラー情報を管理

2.4 エラーハンドリングとロールバック

各Pipeでエラーが発生した場合、既に実行されたステップのロールバックが必要です。

class ReserveInventoryPipe
{
    public function handle(OrderPlacementContext $context, Closure $next): OrderPlacementContext
    {
        try {
            // 在庫確保処理
            foreach ($context->order->items as $item) {
                $this->inventoryService->reserve($item->product_id, $item->quantity);
                $context->addReservedInventory($item->product_id, $item->quantity);
            }

            return $next($context);
        } catch (\Exception $e) {
            // 既に確保した在庫を解放
            $this->releaseReservedInventory($context);
            throw $e;
        }
    }

    private function releaseReservedInventory(OrderPlacementContext $context): void
    {
        foreach ($context->getReservedInventories() as $reserved) {
            $this->inventoryService->release($reserved['product_id'], $reserved['quantity']);
        }
    }
}

ロールバック戦略: 1. 逆順ロールバック: 実行順の逆順で各ステップのロールバックを実行 2. 補償トランザクション: 各ステップに対応する補償アクションを定義 3. Sagaパターン: 分散トランザクション管理パターン(高度)

2.5 ミドルウェア的な処理の実装

ロギング、認証、認可などの横断的関心事をPipeとして実装できます。

class LoggingPipe
{
    public function handle(OrderPlacementContext $context, Closure $next): OrderPlacementContext
    {
        \Log::info('Order placement started', [
            'order_id' => $context->order->id,
        ]);

        $result = $next($context);

        \Log::info('Order placement completed', [
            'order_id' => $context->order->id,
        ]);

        return $result;
    }
}

class AuthorizationPipe
{
    public function handle(OrderPlacementContext $context, Closure $next): OrderPlacementContext
    {
        if (!$context->user->can('place-order')) {
            throw new UnauthorizedException('Order placement not allowed');
        }

        return $next($context);
    }
}

3. Workflowパターンの詳細

3.1 ディレクトリ構造

app/Workflows/
└── OrderPlacement/
    ├── OrderPlacementWorkflow.php      # ワークフローオーケストレーター
    ├── OrderPlacementPipeline.php      # パイプラインオーケストレーター(オプション)
    ├── OrderPlacementContext.php       # ワークフロー専用のコンテキスト(Pipeline用)
    ├── OrderPlacementState.php         # ワークフロー専用の状態管理モデル(Workflow用)
    ├── Actions/                        # ビジネスロジック(再利用可能)
    │   ├── ReserveInventoryAction.php
    │   ├── ProcessPaymentAction.php
    │   └── ArrangeShippingAction.php
    ├── Jobs/                          # 非同期処理
    │   ├── ReserveInventoryJob.php
    │   ├── ProcessPaymentJob.php
    │   └── ArrangeShippingJob.php
    └── Pipes/                         # Pipeline用のPipe(オプション)
        ├── ReserveInventoryPipe.php
        └── ProcessPaymentPipe.php

app/Domains/
└── Order/
    └── ValueObjects/                  # ドメインの概念を表現する値オブジェクト
        ├── ReservedInventory.php      # ドメイン層に配置
        ├── PaymentResult.php          # ドメイン層に配置
        └── ShippingArrangement.php    # ドメイン層に配置

配置の原則: - ワークフロー固有のデータ構造(Context、State等)は app/Workflows/{WorkflowName}/ 配下に配置 - ドメインの概念を表現する値オブジェクト(ReservedInventory、PaymentResult等)は app/Domains/{Domain}/ValueObjects/ に配置 - ワークフローはドメイン層の値オブジェクトを利用する

3.2 Workflowクラスの実装

Workflowクラスは、各ステップの実行順序と状態遷移を管理します。

declare(strict_types=1);

namespace App\Workflows\OrderPlacement;

use App\Models\Order;
use App\Workflows\OrderPlacement\Jobs\ReserveInventoryJob;

class OrderPlacementWorkflow
{
    private const STEPS = [
        'inventory_reserved',
        'payment_processed',
        'shipping_arranged',
        'notifications_sent',
        'completed',
    ];

    public function start(Order $order): OrderPlacementState
    {
        // ワークフロー状態を記録
        $workflowState = OrderPlacementState::create([
            'order_id' => $order->id,
            'current_step' => 'inventory_reserved',
            'status' => 'processing',
            'data' => [],
        ]);

        // 最初のステップを実行(ジョブとして投入)
        ReserveInventoryJob::dispatch($workflowState);

        return $workflowState;
    }

    public function proceedToNextStep(OrderPlacementState $state, string $currentStep): void
    {
        $nextStep = $this->getNextStep($currentStep);

        if ($nextStep === null) {
            $state->update(['status' => 'completed']);
            return;
        }

        $state->update(['current_step' => $nextStep]);

        // 次のステップのジョブを投入
        match($nextStep) {
            'payment_processed' => ProcessPaymentJob::dispatch($state),
            'shipping_arranged' => ArrangeShippingJob::dispatch($state),
            'notifications_sent' => SendNotificationsJob::dispatch($state),
            default => throw new \InvalidArgumentException("Unknown step: {$nextStep}"),
        };
    }

    private function getNextStep(string $currentStep): ?string
    {
        $index = array_search($currentStep, self::STEPS);
        return $index !== false && isset(self::STEPS[$index + 1])
            ? self::STEPS[$index + 1]
            : null;
    }
}

3.3 Step/Actionクラスの分離設計

各ステップのビジネスロジックはActionクラスに分離し、ジョブから呼び出します。

declare(strict_types=1);

namespace App\Workflows\OrderPlacement\Actions;

use App\Models\Order;
use App\Domains\Order\ValueObjects\ReservedInventory;

class ReserveInventoryAction
{
    public function __construct(
        private InventoryService $inventoryService
    ) {}

    public function execute(Order $order): array
    {
        $reserved = [];

        foreach ($order->items as $item) {
            if (!$this->inventoryService->reserve($item->product_id, $item->quantity)) {
                // 既に確保した在庫を解放
                $this->releaseAll($reserved);
                throw new InsufficientInventoryException("在庫不足: {$item->product_id}");
            }

            $reserved[] = new ReservedInventory(
                productId: $item->product_id,
                quantity: $item->quantity,
                reservationId: $this->inventoryService->getLastReservationId(),
                reservedAt: now(),
            );
        }

        return $reserved;
    }

    private function releaseAll(array $reserved): void
    {
        foreach ($reserved as $item) {
            $this->inventoryService->release($item->reservationId);
        }
    }
}

Actionクラスの特徴: - 単一責任の原則に従う - ビジネスロジックのみを含む(ジョブ関連の処理は含まない) - 再利用可能(他のワークフローからも呼び出せる) - テストが容易

3.4 状態管理と進行状況の追跡

状態はデータベースに永続化し、進捗を追跡します。

// マイグレーション
Schema::create('workflow_states', function (Blueprint $table) {
    $table->id();
    $table->string('workflow_type'); // 'order_placement'
    $table->morphs('subject'); // order_id
    $table->string('current_step');
    $table->string('status'); // 'processing', 'completed', 'failed'
    $table->json('data')->nullable(); // 各ステップの結果
    $table->text('error')->nullable();
    $table->timestamps();

    $table->index(['workflow_type', 'status']);
});

// モデル
class OrderPlacementState extends Model
{
    protected $fillable = [
        'order_id',
        'current_step',
        'status',
        'data',
        'error',
    ];

    protected function casts(): array
    {
        return [
            'data' => 'array',
        ];
    }

    public function order(): BelongsTo
    {
        return $this->belongsTo(Order::class);
    }
}

3.5 リトライと失敗処理

ジョブのリトライ機能と失敗処理を実装します。

declare(strict_types=1);

namespace App\Workflows\OrderPlacement\Jobs;

use App\Workflows\OrderPlacement\Actions\ReserveInventoryAction;
use App\Workflows\OrderPlacement\OrderPlacementWorkflow;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ReserveInventoryJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public int $tries = 3;
    public int $backoff = 60; // 60秒後にリトライ

    public function __construct(
        public OrderPlacementState $workflowState
    ) {}

    public function handle(
        ReserveInventoryAction $action,
        OrderPlacementWorkflow $workflow
    ): void {
        $order = Order::findOrFail($this->workflowState->order_id);

        try {
            $result = $action->execute($order);

            // 結果を保存
            $this->workflowState->update([
                'data' => array_merge($this->workflowState->data ?? [], [
                    'inventory_reserved' => array_map(
                        fn($item) => $item->toArray(),
                        $result
                    ),
                ]),
            ]);

            // 次のステップへ
            $workflow->proceedToNextStep($this->workflowState, 'inventory_reserved');

        } catch (InsufficientInventoryException $e) {
            // 失敗処理(リトライしない)
            $this->workflowState->update([
                'status' => 'failed',
                'error' => $e->getMessage(),
            ]);

            // 通知を送信
            NotifyOrderFailureJob::dispatch($order, $e);

            throw $e; // ジョブを失敗としてマーク
        }
    }

    public function failed(\Throwable $exception): void
    {
        // ジョブ失敗時の処理(最大リトライ回数に達した場合)
        $this->workflowState->update([
            'status' => 'failed',
            'error' => $exception->getMessage(),
        ]);

        // 管理者に通知
        \Log::error('Order placement workflow failed', [
            'workflow_state_id' => $this->workflowState->id,
            'error' => $exception->getMessage(),
            'trace' => $exception->getTraceAsString(),
        ]);
    }
}

4. 決済処理の具体例

4.1 注文確定から決済完了までのフロー

1. 在庫確保(ReserveInventory)
   ↓ 成功
2. 注文計算(CalculateOrder)
   ↓ 成功
3. 決済処理(ProcessPayment)
   ↓ 成功
4. 注文確定(ConfirmOrder)
   ↓ 成功
5. 配送手配(ArrangeShipping)
   ↓ 成功
6. 通知送信(SendNotifications)
   ↓ 完了

4.2 各ステップの分離

各ステップを独立したAction/ジョブとして実装します。

// Action: 決済処理
class ProcessPaymentAction
{
    public function execute(Order $order, PaymentMethod $method): PaymentResult
    {
        $gateway = $this->getGateway($method);

        $result = $gateway->charge(
            amount: $order->grand_total,
            currency: $order->currency,
            paymentMethod: $method,
        );

        return new PaymentResult(
            transactionId: $result->transaction_id,
            status: $result->success ? 'success' : 'failed',
            errorMessage: $result->error_message,
            processedAt: now(),
        );
    }
}

// Job: 決済処理ジョブ
class ProcessPaymentJob implements ShouldQueue
{
    public function handle(
        ProcessPaymentAction $action,
        OrderPlacementWorkflow $workflow
    ): void {
        $order = Order::findOrFail($this->workflowState->order_id);
        $paymentMethod = PaymentMethod::find($this->workflowState->data['payment_method_id']);

        $result = $action->execute($order, $paymentMethod);

        if (!$result->isSuccess()) {
            // 決済失敗時の処理
            $this->handlePaymentFailure($order, $result);
            return;
        }

        // 結果を保存
        $this->workflowState->update([
            'data' => array_merge($this->workflowState->data ?? [], [
                'payment_result' => $result->toArray(),
            ]),
        ]);

        // 次のステップへ
        $workflow->proceedToNextStep($this->workflowState, 'payment_processed');
    }
}

4.3 エラー時のロールバック処理

決済処理で失敗した場合、既に実行されたステップをロールバックします。

private function handlePaymentFailure(Order $order, PaymentResult $result): void
{
    // 1. 在庫を解放
    $reservedInventories = $this->workflowState->data['inventory_reserved'] ?? [];
    foreach ($reservedInventories as $reserved) {
        $this->inventoryService->release($reserved['reservation_id']);
    }

    // 2. ワークフロー状態を更新
    $this->workflowState->update([
        'status' => 'failed',
        'error' => $result->errorMessage,
    ]);

    // 3. ユーザーに通知
    NotifyPaymentFailureJob::dispatch($order, $result);
}

4.4 非同期処理への移行ポイント

決済処理は外部API呼び出しが多いため、非同期処理に適しています。

// 同期処理(Pipeline)から非同期処理(Workflow)への移行
class OrderPlacementService
{
    public function placeOrder(Order $order): OrderPlacementResult
    {
        // 最初の数ステップは同期で実行(高速)
        $context = app(OrderPlacementPipeline::class)
            ->process($order);

        // 時間がかかる処理は非同期に移行
        if ($context->canProceedToPayment()) {
            $workflow = app(OrderPlacementWorkflow::class);
            $workflowState = $workflow->start($order);

            // 既に実行済みの結果を設定
            $workflowState->update([
                'data' => [
                    'inventory_reserved' => $context->getReservedInventories(),
                    'calculation_result' => $context->getCalculationResult()->toArray(),
                ],
            ]);

            // 決済処理から非同期で実行
            ProcessPaymentJob::dispatch($workflowState);

            return new OrderPlacementResult(
                order: $order,
                workflowState: $workflowState,
                isAsync: true,
            );
        }

        return $context->getResult();
    }
}

5. 在庫連携の具体例

5.1 在庫確保 → 在庫減算 → 外部システム連携のフロー

1. 在庫確保(ReserveInventory)
   - 楽観的ロックで在庫を一時確保
   ↓ 成功
2. 決済完了確認(ConfirmPayment)
   ↓ 成功
3. 在庫減算(DeductInventory)
   - 確保した在庫を確定
   ↓ 成功
4. 外部システム連携(SyncToExternalSystem)
   - ERP/WMSへの在庫情報同期
   ↓ 成功
5. 完了

5.2 楽観的ロックとペシミスティックロックの使い分け

楽観的ロック: - 在庫確保時は楽観的ロックを使用 - バージョン番号で競合を検出 - 競合時はリトライ

class ReserveInventoryAction
{
    public function execute(Order $order): array
    {
        $reserved = [];

        foreach ($order->items as $item) {
            $productVar = ProductVar::find($item->product_var_id);

            // 楽観的ロックで在庫確保
            $success = DB::transaction(function () use ($productVar, $item, &$reserved) {
                $productVar->refresh(); // 最新の状態を取得

                if ($productVar->stock_quantity < $item->quantity) {
                    return false;
                }

                // 在庫を減算(バージョンチェック付き)
                $affected = ProductVar::where('id', $productVar->id)
                    ->where('version', $productVar->version) // バージョンチェック
                    ->where('stock_quantity', '>=', $item->quantity)
                    ->update([
                        'stock_quantity' => DB::raw("stock_quantity - {$item->quantity}"),
                        'version' => DB::raw('version + 1'),
                    ]);

                if ($affected === 0) {
                    // 競合が発生(他の処理が在庫を変更)
                    return false;
                }

                $reserved[] = new ReservedInventory(
                    productVarId: $productVar->id,
                    quantity: $item->quantity,
                    reservationId: Str::uuid()->toString(),
                    reservedAt: now(),
                );

                return true;
            });

            if (!$success) {
                $this->releaseAll($reserved);
                throw new InsufficientInventoryException("在庫不足: {$productVar->id}");
            }
        }

        return $reserved;
    }
}

ペシミスティックロック: - 在庫確定時はペシミスティックロックを使用 - 確実に在庫を確定する必要がある場合

class DeductInventoryAction
{
    public function execute(array $reservedInventories): void
    {
        foreach ($reservedInventories as $reserved) {
            DB::transaction(function () use ($reserved) {
                // ペシミスティックロックで在庫を確定
                $productVar = ProductVar::lockForUpdate()
                    ->find($reserved->productVarId);

                // 在庫確定処理
                $productVar->update([
                    'reserved_quantity' => DB::raw("reserved_quantity - {$reserved->quantity}"),
                    'sold_quantity' => DB::raw("sold_quantity + {$reserved->quantity}"),
                ]);
            });
        }
    }
}

5.3 在庫不足時の処理フロー

在庫不足が検出された場合の処理フロー:

1. 在庫確保試行
   ↓ 在庫不足
2. エラーを記録
3. 既に確保した在庫を解放
4. ユーザーに通知
5. ワークフローを失敗状態に

5.4 バッチ処理との連携

大量の在庫更新はバッチ処理で実行します。

class SyncInventoryToExternalSystemJob implements ShouldQueue
{
    public function handle(): void
    {
        // バッチ処理で在庫情報を外部システムに同期
        ProductVar::chunk(100, function ($products) {
            $data = $products->map(fn($p) => [
                'product_id' => $p->product_id,
                'variant_id' => $p->id,
                'stock_quantity' => $p->stock_quantity,
                'reserved_quantity' => $p->reserved_quantity,
                'updated_at' => $p->updated_at->toIso8601String(),
            ])->toArray();

            $this->externalSystem->syncInventory($data);
        });
    }
}

6. Action/ジョブへの分離

6.1 Actionクラスの設計(単一責任の原則)

各Actionは単一の責任を持ち、再利用可能に設計します。

declare(strict_types=1);

namespace App\Workflows\OrderPlacement\Actions;

/**
 * 在庫確保アクション
 *
 * 責任: 注文に必要な在庫を確保する
 */
class ReserveInventoryAction
{
    public function __construct(
        private InventoryService $inventoryService
    ) {}

    /**
     * 在庫を確保する
     *
     * @param Order $order 注文
     * @return ReservedInventory[] 確保した在庫のリスト
     * @throws InsufficientInventoryException 在庫不足の場合
     */
    public function execute(Order $order): array
    {
        // ビジネスロジックのみ
        // ジョブ関連の処理は含まない
    }
}

Actionクラスの設計原則: - 単一責任: 1つのActionは1つのことだけを行う - 再利用性: 他のワークフローからも呼び出せる - テスト容易性: 依存関係を注入可能にする - 純粋性: 副作用を最小限に(可能な限り)

6.2 ジョブへの変換タイミング

以下の場合にジョブに変換します:

  1. 処理時間が長い: 数秒以上かかる処理
  2. 外部API呼び出し: ネットワークI/Oが発生する処理
  3. リトライが必要: 一時的な失敗が想定される処理
  4. バッチ処理: 大量データを処理する場合
  5. ユーザー待機不要: ユーザーが結果を待つ必要がない処理
// Action(同期処理)
class CalculateOrderAction
{
    public function execute(array $cartItems): OrderCalculationResult
    {
        // 高速な計算処理
        return $this->calculationService->calculate($cartItems);
    }
}

// Job(非同期処理)
class ProcessPaymentJob implements ShouldQueue
{
    public function handle(ProcessPaymentAction $action): void
    {
        // 時間がかかる外部API呼び出し
        $result = $action->execute($this->order, $this->paymentMethod);
        // ...
    }
}

6.3 キュー投入の戦略

即時投入:

ProcessPaymentJob::dispatch($workflowState);

遅延投入:

// 5分後に実行
ProcessPaymentJob::dispatch($workflowState)
    ->delay(now()->addMinutes(5));

バッチ投入:

use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;

$batch = Bus::batch([
    new ProcessPaymentJob($workflowState1),
    new ProcessPaymentJob($workflowState2),
    new ProcessPaymentJob($workflowState3),
])->then(function (Batch $batch) {
    // すべてのジョブが成功した場合
})->catch(function (Batch $batch, \Throwable $e) {
    // ジョブが失敗した場合
})->finally(function (Batch $batch) {
    // すべてのジョブが完了した場合(成功・失敗問わず)
})->dispatch();

6.4 ジョブチェーンとジョブバッチの活用

ジョブチェーン: 順次実行が必要な場合に使用します。

use Illuminate\Support\Facades\Bus;

Bus::chain([
    new ReserveInventoryJob($workflowState),
    new ProcessPaymentJob($workflowState),
    new ArrangeShippingJob($workflowState),
])->catch(function (\Throwable $e) {
    // チェーンのどこかで失敗した場合
    \Log::error('Job chain failed', ['error' => $e->getMessage()]);
})->dispatch();

ジョブバッチ: 並列実行可能な場合に使用します。

Bus::batch([
    new SyncInventoryToExternalSystemJob(),
    new SendOrderConfirmationEmailJob($order),
    new UpdateSearchIndexJob($order),
])->then(function (Batch $batch) {
    // すべてのジョブが成功した場合
})->dispatch();

7. ディレクトリ構造と命名規則

7.1 app/Workflows/の構造

app/Workflows/
└── {WorkflowName}/                    # ワークフロー名(パスカルケース)
    ├── {WorkflowName}Workflow.php     # ワークフローオーケストレーター
    ├── {WorkflowName}Pipeline.php     # パイプラインオーケストレーター(オプション)
    ├── {WorkflowName}Context.php      # ワークフロー専用のコンテキスト(Pipeline用)
    ├── {WorkflowName}State.php        # ワークフロー専用の状態管理モデル(Workflow用)
    ├── Actions/                        # ビジネスロジック
    │   ├── {ActionName}Action.php
    │   └── ...
    ├── Jobs/                          # 非同期処理
    │   ├── {StepName}Job.php
    │   └── ...
    └── Pipes/                         # Pipeline用のPipe(オプション)
        ├── {StepName}Pipe.php
        └── ...

app/Domains/
└── {Domain}/
    └── ValueObjects/                  # ドメインの概念を表現する値オブジェクト
        ├── {ValueObjectName}.php
        └── ...

:

app/Workflows/
└── OrderPlacement/
    ├── OrderPlacementWorkflow.php
    ├── OrderPlacementPipeline.php
    ├── OrderPlacementContext.php      # ワークフロー専用のコンテキスト
    ├── OrderPlacementState.php        # ワークフロー専用の状態管理
    ├── Actions/
    │   ├── ReserveInventoryAction.php
    │   ├── ProcessPaymentAction.php
    │   └── ArrangeShippingAction.php
    ├── Jobs/
    │   ├── ReserveInventoryJob.php
    │   ├── ProcessPaymentJob.php
    │   └── ArrangeShippingJob.php
    └── Pipes/
        ├── ReserveInventoryPipe.php
        └── ProcessPaymentPipe.php

app/Domains/
└── Order/
    └── ValueObjects/                  # ドメインの概念を表現する値オブジェクト
        ├── ReservedInventory.php      # ドメイン層に配置
        ├── PaymentResult.php          # ドメイン層に配置
        └── ShippingArrangement.php    # ドメイン層に配置

配置の原則: - ワークフロー固有のデータ構造(Context、State等)は app/Workflows/{WorkflowName}/ 配下に配置 - ドメインの概念を表現する値オブジェクト(ReservedInventory、PaymentResult等)は app/Domains/{Domain}/ValueObjects/ に配置 - ワークフローはドメイン層の値オブジェクトを利用する

7.2 app/Actions/の構造(グローバル)

ドメイン横断的に使用されるActionはapp/Actions/に配置します。

app/Actions/
└── {Domain}/                          # ドメイン名(オプション)
    ├── {ActionName}Action.php
    └── ...

:

app/Actions/
├── Order/
│   ├── CalculateOrderAction.php
│   └── ValidateOrderAction.php
└── Payment/
    ├── ProcessPaymentAction.php
    └── RefundPaymentAction.php

7.3 app/Jobs/の構造(グローバル)

ドメイン横断的に使用されるジョブはapp/Jobs/に配置します。

app/Jobs/
└── {Domain}/                          # ドメイン名(オプション)
    ├── {JobName}Job.php
    └── ...

7.4 命名規則

種別 命名規則
Workflowクラス {WorkflowName}Workflow OrderPlacementWorkflow
Pipelineクラス {WorkflowName}Pipeline OrderPlacementPipeline
Contextクラス {WorkflowName}Context OrderPlacementContext
Stateモデル {WorkflowName}State OrderPlacementState
Actionクラス {ActionName}Action ReserveInventoryAction
Jobクラス {StepName}Job ReserveInventoryJob
Pipeクラス {StepName}Pipe ReserveInventoryPipe
ValueObject {ValueObjectName} ReservedInventory

8. 実装例(コードサンプル)

8.1 注文確定Workflowの完全な実装例

declare(strict_types=1);

namespace App\Workflows\OrderPlacement;

use App\Models\Order;
use App\Workflows\OrderPlacement\Jobs\ReserveInventoryJob;

class OrderPlacementWorkflow
{
    private const STEPS = [
        'inventory_reserved',
        'order_calculated',
        'payment_processed',
        'order_confirmed',
        'shipping_arranged',
        'notifications_sent',
        'completed',
    ];

    public function __construct(
        private OrderPlacementState $stateModel
    ) {}

    public function start(Order $order, array $cartItems): OrderPlacementState
    {
        $workflowState = $this->stateModel->create([
            'order_id' => $order->id,
            'current_step' => 'inventory_reserved',
            'status' => 'processing',
            'data' => [
                'cart_items' => $cartItems,
            ],
        ]);

        ReserveInventoryJob::dispatch($workflowState);

        return $workflowState;
    }

    public function proceedToNextStep(OrderPlacementState $state, string $currentStep): void
    {
        $nextStep = $this->getNextStep($currentStep);

        if ($nextStep === null) {
            $state->update(['status' => 'completed']);
            event(new OrderPlacementCompleted($state->order));
            return;
        }

        $state->update(['current_step' => $nextStep]);

        match($nextStep) {
            'order_calculated' => CalculateOrderJob::dispatch($state),
            'payment_processed' => ProcessPaymentJob::dispatch($state),
            'order_confirmed' => ConfirmOrderJob::dispatch($state),
            'shipping_arranged' => ArrangeShippingJob::dispatch($state),
            'notifications_sent' => SendNotificationsJob::dispatch($state),
            default => throw new \InvalidArgumentException("Unknown step: {$nextStep}"),
        };
    }

    private function getNextStep(string $currentStep): ?string
    {
        $index = array_search($currentStep, self::STEPS);
        return $index !== false && isset(self::STEPS[$index + 1])
            ? self::STEPS[$index + 1]
            : null;
    }
}

8.2 Pipelineを使った決済処理の実装例

declare(strict_types=1);

namespace App\Workflows\OrderPlacement;

use Illuminate\Pipeline\Pipeline;
use App\Models\Order;

class OrderPlacementPipeline
{
    public function __construct(
        private Pipeline $pipeline
    ) {}

    public function process(Order $order, array $cartItems): OrderPlacementResult
    {
        $context = new OrderPlacementContext($order, $cartItems);

        return $this->pipeline
            ->send($context)
            ->through([
                ValidateOrderPipe::class,
                CalculateOrderPipe::class,
                ReserveInventoryPipe::class,
            ])
            ->then(function ($context) {
                return $context->getResult();
            });
    }
}

8.3 Actionクラスの実装例

declare(strict_types=1);

namespace App\Workflows\OrderPlacement\Actions;

use App\Models\Order;
use App\Domains\Order\ValueObjects\ReservedInventory;

class ReserveInventoryAction
{
    public function __construct(
        private InventoryService $inventoryService
    ) {}

    /**
     * 在庫を確保する
     *
     * @param Order $order 注文
     * @return ReservedInventory[] 確保した在庫のリスト
     * @throws InsufficientInventoryException 在庫不足の場合
     */
    public function execute(Order $order): array
    {
        $reserved = [];

        try {
            foreach ($order->items as $item) {
                $reservationId = $this->inventoryService->reserve(
                    productVarId: $item->product_var_id,
                    quantity: $item->quantity
                );

                $reserved[] = new ReservedInventory(
                    productId: $item->product_id,
                    productVarId: $item->product_var_id,
                    quantity: $item->quantity,
                    reservationId: $reservationId,
                    reservedAt: now(),
                );
            }

            return $reserved;
        } catch (InsufficientInventoryException $e) {
            // 既に確保した在庫を解放
            $this->releaseAll($reserved);
            throw $e;
        }
    }

    private function releaseAll(array $reserved): void
    {
        foreach ($reserved as $item) {
            $this->inventoryService->release($item->reservationId);
        }
    }
}

8.4 ジョブへの変換例

declare(strict_types=1);

namespace App\Workflows\OrderPlacement\Jobs;

use App\Workflows\OrderPlacement\Actions\ReserveInventoryAction;
use App\Workflows\OrderPlacement\OrderPlacementWorkflow;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ReserveInventoryJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public int $tries = 3;
    public int $backoff = 60;

    public function __construct(
        public OrderPlacementState $workflowState
    ) {}

    public function handle(
        ReserveInventoryAction $action,
        OrderPlacementWorkflow $workflow
    ): void {
        $order = Order::findOrFail($this->workflowState->order_id);

        try {
            $result = $action->execute($order);

            $this->workflowState->update([
                'data' => array_merge($this->workflowState->data ?? [], [
                    'inventory_reserved' => array_map(
                        fn($item) => $item->toArray(),
                        $result
                    ),
                ]),
            ]);

            $workflow->proceedToNextStep($this->workflowState, 'inventory_reserved');

        } catch (InsufficientInventoryException $e) {
            $this->workflowState->update([
                'status' => 'failed',
                'error' => $e->getMessage(),
            ]);

            NotifyOrderFailureJob::dispatch($order, $e);
            throw $e;
        }
    }

    public function failed(\Throwable $exception): void
    {
        $this->workflowState->update([
            'status' => 'failed',
            'error' => $exception->getMessage(),
        ]);

        \Log::error('ReserveInventoryJob failed', [
            'workflow_state_id' => $this->workflowState->id,
            'error' => $exception->getMessage(),
        ]);
    }
}

9. テスト戦略

9.1 Workflow/Pipelineのユニットテスト

declare(strict_types=1);

namespace Tests\Unit\Workflows\OrderPlacement;

use App\Models\Order;
use App\Workflows\OrderPlacement\OrderPlacementWorkflow;
use App\Workflows\OrderPlacement\OrderPlacementState;
use Tests\TestCase;
use Illuminate\Foundation\Testing\RefreshDatabase;

class OrderPlacementWorkflowTest extends TestCase
{
    use RefreshDatabase;

    public function test_workflow_starts_correctly(): void
    {
        $order = Order::factory()->create();
        $workflow = app(OrderPlacementWorkflow::class);

        $state = $workflow->start($order, []);

        $this->assertInstanceOf(OrderPlacementState::class, $state);
        $this->assertEquals('inventory_reserved', $state->current_step);
        $this->assertEquals('processing', $state->status);
    }

    public function test_workflow_proceeds_to_next_step(): void
    {
        $state = OrderPlacementState::factory()->create([
            'current_step' => 'inventory_reserved',
            'status' => 'processing',
        ]);

        $workflow = app(OrderPlacementWorkflow::class);
        $workflow->proceedToNextStep($state, 'inventory_reserved');

        $state->refresh();
        $this->assertEquals('order_calculated', $state->current_step);
    }
}

9.2 Actionクラスのテスト

declare(strict_types=1);

namespace Tests\Unit\Workflows\OrderPlacement\Actions;

use App\Models\Order;
use App\Workflows\OrderPlacement\Actions\ReserveInventoryAction;
use App\Services\InventoryService;
use Tests\TestCase;
use Mockery;

class ReserveInventoryActionTest extends TestCase
{
    public function test_execute_reserves_inventory(): void
    {
        $order = Order::factory()->create();
        $order->items()->create([
            'product_id' => 1,
            'product_var_id' => 1,
            'quantity' => 2,
        ]);

        $inventoryService = Mockery::mock(InventoryService::class);
        $inventoryService->shouldReceive('reserve')
            ->once()
            ->with(1, 2)
            ->andReturn('reservation-123');

        $action = new ReserveInventoryAction($inventoryService);
        $result = $action->execute($order);

        $this->assertCount(1, $result);
        $this->assertEquals('reservation-123', $result[0]->reservationId);
    }

    public function test_execute_releases_inventory_on_failure(): void
    {
        $order = Order::factory()->create();
        $order->items()->create([
            'product_id' => 1,
            'product_var_id' => 1,
            'quantity' => 2,
        ]);

        $inventoryService = Mockery::mock(InventoryService::class);
        $inventoryService->shouldReceive('reserve')
            ->once()
            ->andReturn('reservation-123');
        $inventoryService->shouldReceive('reserve')
            ->once()
            ->andThrow(new InsufficientInventoryException());
        $inventoryService->shouldReceive('release')
            ->once()
            ->with('reservation-123');

        $action = new ReserveInventoryAction($inventoryService);

        $this->expectException(InsufficientInventoryException::class);
        $action->execute($order);
    }
}

9.3 ジョブのテスト

declare(strict_types=1);

namespace Tests\Unit\Workflows\OrderPlacement\Jobs;

use App\Models\Order;
use App\Workflows\OrderPlacement\Jobs\ReserveInventoryJob;
use App\Workflows\OrderPlacement\OrderPlacementState;
use App\Workflows\OrderPlacement\OrderPlacementWorkflow;
use Tests\TestCase;
use Illuminate\Foundation\Testing\RefreshDatabase;
use Illuminate\Support\Facades\Queue;

class ReserveInventoryJobTest extends TestCase
{
    use RefreshDatabase;

    public function test_job_processes_successfully(): void
    {
        Queue::fake();

        $order = Order::factory()->create();
        $state = OrderPlacementState::factory()->create([
            'order_id' => $order->id,
            'current_step' => 'inventory_reserved',
        ]);

        $job = new ReserveInventoryJob($state);
        $job->handle(
            app(ReserveInventoryAction::class),
            app(OrderPlacementWorkflow::class)
        );

        $state->refresh();
        $this->assertNotNull($state->data['inventory_reserved']);
    }
}

9.4 統合テストの考え方

統合テストでは、実際のデータベースと外部サービスを使用して、ワークフロー全体をテストします。

declare(strict_types=1);

namespace Tests\Integration\Workflows\OrderPlacement;

use App\Models\Order;
use App\Workflows\OrderPlacement\OrderPlacementWorkflow;
use Tests\TestCase;
use Illuminate\Foundation\Testing\RefreshDatabase;

class OrderPlacementWorkflowIntegrationTest extends TestCase
{
    use RefreshDatabase;

    public function test_complete_order_placement_workflow(): void
    {
        $order = Order::factory()->create();
        $workflow = app(OrderPlacementWorkflow::class);

        $state = $workflow->start($order, []);

        // 各ステップのジョブを実行
        $this->artisan('queue:work', ['--once' => true]);

        $state->refresh();
        $this->assertEquals('completed', $state->status);
    }
}

10. ベストプラクティスと注意点

10.1 パフォーマンス考慮事項

Pipelineパターン: - 各Pipeの処理時間を最小限に - 不要なデータベースクエリを避ける - キャッシュを活用

Workflowパターン: - ジョブの実行時間を監視 - キューワーカーの数を適切に設定 - バッチ処理で大量データを効率的に処理

10.2 デバッグとロギング

各ステップで適切にログを記録します。

class ReserveInventoryPipe
{
    public function handle(OrderPlacementContext $context, Closure $next): OrderPlacementContext
    {
        \Log::info('ReserveInventoryPipe started', [
            'order_id' => $context->order->id,
        ]);

        try {
            $result = $next($context);

            \Log::info('ReserveInventoryPipe completed', [
                'order_id' => $context->order->id,
                'reserved_count' => count($context->getReservedInventories()),
            ]);

            return $result;
        } catch (\Exception $e) {
            \Log::error('ReserveInventoryPipe failed', [
                'order_id' => $context->order->id,
                'error' => $e->getMessage(),
            ]);
            throw $e;
        }
    }
}

10.3 監視とアラート

ワークフローの状態を監視し、異常時にアラートを送信します。

// 定期的に実行されるコマンド
class MonitorWorkflowsCommand extends Command
{
    public function handle(): void
    {
        // 長時間処理中のワークフローを検出
        $stuckWorkflows = OrderPlacementState::where('status', 'processing')
            ->where('updated_at', '<', now()->subHours(1))
            ->get();

        if ($stuckWorkflows->isNotEmpty()) {
            \Log::warning('Stuck workflows detected', [
                'count' => $stuckWorkflows->count(),
            ]);

            // アラートを送信
            $this->sendAlert($stuckWorkflows);
        }
    }
}

10.4 将来の拡張性

プラグイン化: - 各ステップをプラグインとして実装 - 設定で有効/無効を切り替え可能に

バージョニング: - ワークフローのバージョンを管理 - 既存のワークフローと互換性を保ちながら拡張

マルチテナント対応: - テナントごとに異なるワークフローを実行可能に - テナント設定でワークフローをカスタマイズ


11. 今後想定されるWorkflow/Pipeline候補

11.1 Workflow候補(非同期・状態管理が必要)

以下のワークフローは、複数のステップを跨ぎ、状態管理と非同期処理が必要なため、Workflowパターンが適しています。

注文確定ワークフロー

  • 目的: 在庫仮押さえ → 決済 → 注文確定 → 配送指示 → 通知送信までを一連で管理
  • 特徴: 失敗時の補償処理や再試行を制御
  • 優先度: 高(Phase 1で実装予定)

外部在庫同期ワークフロー

  • 目的: 倉庫・ERPなど外部システムからの在庫イベントを受信し、SKUごとの差分反映 → 整合性検証 → 検索インデックス更新までをバッチ/チェーンで処理
  • 特徴: 大量データ処理、リトライが必要
  • 優先度: 中(Phase 2以降で実装予定)

配送手配ワークフロー

  • 目的: 複数配送業者とのラベル発行 → 追跡番号取得 → ステータス更新 → 顧客通知を段階的に連携
  • 特徴: 外部API呼び出しが多い、進捗追跡が必要
  • 優先度: 高(Phase 1で実装予定)

サブスクリプション課金ワークフロー

  • 目的: 請求生成 → 決済 → 契約更新 → 失敗時のリトライ/ステータス変更 → 顧客通知を周期的に実行
  • 特徴: 定期実行、リトライロジックが重要
  • 優先度: 中(Phase 2で実装予定)

返品・返金ワークフロー

  • 目的: 返品受付 → 在庫検品 → 返金処理 → 帳票更新 → 通知をステップ管理
  • 特徴: 部分返金や在庫戻しを安全に処理
  • 優先度: 中(Phase 2以降で実装予定)

アカウント審査/承認ワークフロー(B2B想定)

  • 目的: 申請受領 → 書類確認 → 承認/差戻し → 権限付与 → 通知をイベント駆動で管理
  • 特徴: 承認フロー、複数担当者による処理
  • 優先度: 低(Phase 3で実装予定)

11.2 Pipeline候補(同期・即時レスポンスが必要)

以下のパイプラインは、ユーザーリクエスト内で完結し、即座に結果が必要なため、Pipelineパターンが適しています。

カート価格計算パイプライン

  • 目的: アイテム正規化 → プロモーション適用 → 送料計算 → 税額按分 → 合計算出を同期で連鎖
  • 特徴: ユーザーがカート画面で即座に結果を確認する必要がある
  • 優先度: 高(Phase 1で実装予定)

注文バリデーションパイプライン

  • 目的: カート状態チェック → 在庫即時照会 → 決済手段制約チェック → 不正検知ヒューリスティクスを順次評価
  • 特徴: 注文確定前に即座にエラーを検出する必要がある
  • 優先度: 高(Phase 1で実装予定)

商品公開前チェックパイプライン

  • 目的: マスタ整合性 → 画像/コンテンツ検証 → SEOメタ情報 → 承認ルール確認をFilament操作内で実行
  • 特徴: 管理画面での操作時に即座に検証結果を表示
  • 優先度: 中(Phase 2以降で実装予定)

顧客セグメント判定パイプライン

  • 目的: 属性収集 → 購買履歴解析 → スコアリング → セグメント割当を同期で行い、マーケティング施策に反映
  • 特徴: リアルタイムでのセグメント判定が必要
  • 優先度: 低(Phase 3以降で実装予定)

注記: カード決済は事前に選定した単一ゲートウェイを利用する方針のため、「決済ゲートウェイ選定パイプライン」は対象外とします。


まとめ

Workflow/Pipelineパターンにより、複数手順を跨ぐ処理を以下のように管理できます:

  1. 責務の分離: 各ステップが独立したAction/Pipeとして実装される
  2. 再利用性: Actionクラスを他のワークフローからも利用可能
  3. テスト容易性: 各コンポーネントを個別にテスト可能
  4. 拡張性: ステップの追加・削除が容易
  5. 可観測性: 各ステップのログ・メトリクスを取得可能
  6. エラーハンドリング: 各ステップで適切にエラーを処理可能

このパターンを適用することで、EC-Spokeシステムの複雑なビジネスプロセスを、保守性と拡張性の高いコードとして実装できます。


関連ドキュメント: - 全体アーキテクチャ - 注文計算方式 - 価格計算

更新日: 2025-01-XX バージョン: 1.0 作成者: EC-Spoke開発チーム