GitHub

Pipeline Processing

TinyMVC provides a pipeline processing system through the Pipeline class. It lets you pass a payload through a sequence of operations (pipes), one after another, with support for middleware, error handling, and context management.

Basic Usage

// Create a pipeline with initial data
// $userData is the payload handed to the pipes; whatever then() returns is stored in $result.
$result = Pipeline::make($userData)
    ->through(
        // Pipe given as a class name
        ValidateUser::class,
        // Pipe given as a [class, method] pair
        [SanitizeData::class, 'process'],
        // Pipe given as a closure: receives the payload and the $next callable
        function($payload, $next) {
            // Custom processing
            $payload['processed'] = true;
            // Hand the modified payload on to the next stage
            return $next($payload);
        }
    )
    ->then(function($processedData) {
        // Final processing
        return saveUser($processedData);
    });

Pipeline Creation

// Create pipeline with initial payload
$pipeline = Pipeline::make($initialData);

// Alternative: create and set payload separately
// (construct the pipeline first, then supply the payload through send())
$pipeline = new Pipeline();
$pipeline->send($data);

Adding Pipes

// Add pipes (multiple formats supported)
$pipeline->through(
    'ClassName',                  // Class with handle() or __invoke()
    [ClassName::class, 'method'], // Class method
    function($payload, $next) {   // Closure
        // Process payload
        // Returning $next($payload) passes the payload on to the following stage
        return $next($payload);
    }
);

// Alias method
// NOTE(review): presumably pipe() is the alias of through() — confirm against the Pipeline class.
$pipeline->pipe(SanitizeStep::class);

// Conditional pipes
// NOTE(review): presumably when() adds the pipe only if $condition is truthy and
// unless() only if it is falsy — confirm against the Pipeline class.
$pipeline->when($condition, LogOperation::class);
$pipeline->unless($condition, SkipStep::class);

Execution Methods

// Execute with final callback
// The closure receives the processed payload; its return value becomes $result.
$result = $pipeline->then(function($processedData) {
    return finalProcessing($processedData);
});

// Alias method
// $finalCallback is a placeholder for a callable such as the closure passed to then() above.
$result = $pipeline->execute($finalCallback);

// Collect all intermediate results
$results = $pipeline->collect();

// Execute asynchronously (returns Generator)
// Iterating the generator yields an index together with a result on each step.
foreach ($pipeline->async() as $index => $result) {
    // Process each result as it becomes available
}

Error Handling

// Set custom error handler
// The handler is given the exception, the payload and the context.
// The second argument is written as an assignment expression: it passes true
// and uses the variable name only to label the argument as the stop-on-error flag.
$pipeline->onError(function($exception, $payload, $context) {
    logError($exception->getMessage());
    return fallbackProcessing($payload);
}, $stopOnError = true);

// Example with error handling
// The handler here declares only the exception parameter and returns an
// array carrying the error message.
$result = Pipeline::make($data)
    ->through(RiskyOperation::class, AnotherStep::class)
    ->onError(function($e) {
        return ['error' => $e->getMessage()];
    })
    ->then();

Middleware and Context

// Add middleware that wraps entire pipeline
// Code before the $next() call runs as pre-processing, code after it as post-processing.
$pipeline->middleware(function($payload, $next) {
    // Pre-processing
    $start = microtime(true);
    
    $result = $next($payload);
    
    // Post-processing
    // Elapsed time in seconds (microtime(true) returns a float timestamp)
    $duration = microtime(true) - $start;
    logDuration($duration);
    
    // Return the inner result unchanged so the middleware only measures time
    return $result;
});

// Add context data available to all pipes
// Context is supplied as an associative array, kept separate from the payload.
$pipeline->withContext([
    'requestId' => uniqid(),
    'userId' => Auth::id()
]);

Debugging and Logging

// Enable debug mode
$pipeline->debug(true);

// Execute pipeline
$result = $pipeline->then();

// Get debug logs
// Each entry is read as an array with 'timestamp', 'level' and 'message' keys.
$logs = $pipeline->getLogs();
foreach ($logs as $log) {
    // Terminate every entry with a newline so consecutive entries do not run together
    echo "{$log['timestamp']} [{$log['level']}] {$log['message']}" . PHP_EOL;
}

Pipe Interface

Create custom pipes by implementing the PipeInterface:

use Spark\Contracts\PipeInterface;

/**
 * Pipe that lets only valid payloads continue through the pipeline.
 */
class ValidationPipe implements PipeInterface
{
    /**
     * Forwards a valid payload to the next pipe and rejects an invalid one.
     *
     * @param mixed   $payload Data travelling through the pipeline.
     * @param Closure $next    Callable that invokes the next pipe.
     * @return mixed Whatever the next pipe returns.
     * @throws ValidationException When isValid() rejects the payload.
     */
    public function handle($payload, Closure $next)
    {
        // Valid payloads are passed straight on to the next pipe
        if (isValid($payload)) {
            return $next($payload);
        }

        // Anything else stops the pipeline here
        throw new ValidationException('Invalid data');
    }
}

// Usage
// A pipe can also be passed as an already constructed object instead of a class name.
$pipeline->through(new ValidationPipe());

Advanced Usage

Pipeline Reuse

// Create a base pipeline with common configuration
// make() is called without a payload here; each clone below supplies its own through send().
$basePipeline = Pipeline::make()
    ->through(LogStep::class, AuditStep::class)
    ->middleware(TimingMiddleware::class)
    ->debug(config('debug'));

// Clone for specific use cases
// NOTE(review): assumes clone() returns an independent copy, so the pipes added
// below do not leak into $basePipeline — confirm against the Pipeline class.
$userPipeline = $basePipeline->clone()
    ->send($userData)
    ->through(ValidateUser::class, ProcessUser::class);

$orderPipeline = $basePipeline->clone()
    ->send($orderData)
    ->through(ValidateOrder::class, ProcessOrder::class);

// Reset pipeline for reuse
// NOTE(review): $newPipes is passed as a single argument, while the other examples
// pass pipes as separate arguments — confirm that through() accepts an array.
$pipeline->reset()->send($newData)->through($newPipes);

Complex Conditional Logic

// BaseValidation is always added; the remaining pipes depend on the flags.
// NOTE(review): presumably when() adds its pipe if the flag is truthy and
// unless() if it is falsy — confirm against the Pipeline class.
Pipeline::make($data)
    ->through(BaseValidation::class)
    ->when($isPremiumUser, PremiumFeatures::class)
    // A conditional pipe may also be given as a [class, method] pair
    ->when($hasDiscount, [DiscountApplication::class, 'apply'])
    ->unless($isGuest, UserPersonalization::class)
    ->then(function($processed) {
        return finalize($processed);
    });

Full Examples

User Registration Pipeline

/**
 * Registers a user by running the request data through a pipeline.
 *
 * A truthy pipeline result is returned as JSON and a falsy one triggers
 * abort(500). An exception that reaches the catch block is returned as a
 * JSON error with status 500.
 *
 * @param Request $request Incoming registration request.
 * @return mixed Result of response()->json(...) or abort(500).
 */
public function registerUser(Request $request)
{
    try {
        $user = Pipeline::make($request->all())
            ->through(
                [UserValidator::class, 'validate'],
                [UserSanitizer::class, 'sanitize'],
                // $request must be captured explicitly: a PHP closure does not
                // see variables of the enclosing scope unless they are listed in use().
                function($data, $next) use ($request) {
                    // Custom transformation
                    $data['registration_ip'] = $request->ip();
                    return $next($data);
                },
                [UserCreator::class, 'create'],
                [WelcomeEmail::class, 'send']
            )
            ->withContext(['request' => $request])
            // The handler logs the failure and yields null, which leads to abort(500) below
            ->onError(function($e) {
                Log::error("Registration failed: {$e->getMessage()}");
                return null;
            })
            ->then();

        return $user ? response()->json($user) : abort(500);

    } catch (Exception $e) {
        return response()->json(['error' => $e->getMessage()], 500);
    }
}

Data Processing Workflow

/**
 * Runs an import file through validation, reading, transformation and insertion.
 *
 * @param mixed $filePath Payload handed to the pipeline (the file to import).
 * @return array Keys 'success', 'processed' (number of collected results) and 'logs'.
 */
public function processImportFile($filePath)
{
    // Keep a handle on the pipeline: getLogs() is an instance method, so the
    // logs have to be read from the same object that ran the pipes.
    $pipeline = Pipeline::make($filePath)
        ->debug(true)
        ->through(
            [FileValidator::class, 'validate'],
            [FileReader::class, 'read'],
            [DataTransformer::class, 'transform'],
            [DatabaseInserter::class, 'insert']
        )
        ->middleware(function($payload, $next) {
            // Track memory usage
            $startMemory = memory_get_usage();
            $result = $next($payload);
            $endMemory = memory_get_usage();

            // Report the difference in memory usage across the wrapped call
            $this->logMemoryUsage($endMemory - $startMemory);
            return $result;
        });

    $results = $pipeline->collect(); // Get results from each step

    return [
        'success' => true,
        'processed' => count($results),
        'logs' => $pipeline->getLogs()
    ];
}

Best Practices

  • Keep pipes focused on single responsibilities
  • Use the PipeInterface for complex pipe logic
  • Always implement error handling for production pipelines
  • Use context for passing metadata without polluting payload
  • Enable debug mode during development, but disable it in production
  • Consider cloning base pipelines for similar workflows