Pipeline Processing
TinyMVC provides a powerful pipeline processing system through the Pipeline
class,
allowing you to create sequences of operations (pipes) that process data sequentially with support
for middleware, error handling, and context management.
Basic Usage
// Create a pipeline with initial data
$result = Pipeline::make($userData)
->through(
ValidateUser::class,
[SanitizeData::class, 'process'],
function($payload, $next) {
// Custom processing
$payload['processed'] = true;
return $next($payload);
}
)
->then(function($processedData) {
// Final processing
return saveUser($processedData);
});
Pipeline Creation
// Create pipeline with initial payload
$pipeline = Pipeline::make($initialData);
// Alternative: create and set payload separately
$pipeline = new Pipeline();
$pipeline->send($data);
Adding Pipes
// Add pipes (multiple formats supported)
$pipeline->through(
'ClassName', // Class with handle() or __invoke()
[ClassName::class, 'method'], // Class method
function($payload, $next) { // Closure
// Process payload
return $next($payload);
}
);
// Alias method
$pipeline->pipe(SanitizeStep::class);
// Conditional pipes
$pipeline->when($condition, LogOperation::class);
$pipeline->unless($condition, SkipStep::class);
Execution Methods
// Execute with final callback
$result = $pipeline->then(function($processedData) {
return finalProcessing($processedData);
});
// Alias method
$result = $pipeline->execute($finalCallback);
// Collect all intermediate results
$results = $pipeline->collect();
// Execute asynchronously (returns Generator)
foreach ($pipeline->async() as $index => $result) {
// Process each result as it becomes available
}
Error Handling
// Set custom error handler
$pipeline->onError(function($exception, $payload, $context) {
logError($exception->getMessage());
return fallbackProcessing($payload);
}, $stopOnError = true);
// Example with error handling
$result = Pipeline::make($data)
->through(RiskyOperation::class, AnotherStep::class)
->onError(function($e) {
return ['error' => $e->getMessage()];
})
->then();
Middleware and Context
// Add middleware that wraps entire pipeline
$pipeline->middleware(function($payload, $next) {
// Pre-processing
$start = microtime(true);
$result = $next($payload);
// Post-processing
$duration = microtime(true) - $start;
logDuration($duration);
return $result;
});
// Add context data available to all pipes
$pipeline->withContext([
'requestId' => uniqid(),
'userId' => Auth::id()
]);
Debugging and Logging
// Enable debug mode
$pipeline->debug(true);
// Execute pipeline
$result = $pipeline->then();
// Get debug logs
$logs = $pipeline->getLogs();
foreach ($logs as $log) {
echo "{$log['timestamp']} [{$log['level']}] {$log['message']}";
}
Pipe Interface
Create custom pipes by implementing the PipeInterface:
use Spark\Contracts\PipeInterface;
class ValidationPipe implements PipeInterface
{
public function handle($payload, Closure $next)
{
// Validate payload
if (!isValid($payload)) {
throw new ValidationException('Invalid data');
}
// Pass to next pipe
return $next($payload);
}
}
// Usage
$pipeline->through(new ValidationPipe());
Advanced Usage
Pipeline Reuse
// Create a base pipeline with common configuration
$basePipeline = Pipeline::make()
->through(LogStep::class, AuditStep::class)
->middleware(TimingMiddleware::class)
->debug(config('debug'));
// Clone for specific use cases
$userPipeline = $basePipeline->clone()
->send($userData)
->through(ValidateUser::class, ProcessUser::class);
$orderPipeline = $basePipeline->clone()
->send($orderData)
->through(ValidateOrder::class, ProcessOrder::class);
// Reset pipeline for reuse
$pipeline->reset()->send($newData)->through($newPipes);
Complex Conditional Logic
Pipeline::make($data)
->through(BaseValidation::class)
->when($isPremiumUser, PremiumFeatures::class)
->when($hasDiscount, [DiscountApplication::class, 'apply'])
->unless($isGuest, UserPersonalization::class)
->then(function($processed) {
return finalize($processed);
});
Full Examples
User Registration Pipeline
public function registerUser(Request $request)
{
try {
$user = Pipeline::make($request->all())
->through(
[UserValidator::class, 'validate'],
[UserSanitizer::class, 'sanitize'],
function($data, $next) {
// Custom transformation
$data['registration_ip'] = $request->ip();
return $next($data);
},
[UserCreator::class, 'create'],
[WelcomeEmail::class, 'send']
)
->withContext(['request' => $request])
->onError(function($e) use ($request) {
Log::error("Registration failed: {$e->getMessage()}");
return null;
})
->then();
return $user ? response()->json($user) : abort(500);
} catch (Exception $e) {
return response()->json(['error' => $e->getMessage()], 500);
}
}
Data Processing Workflow
public function processImportFile($filePath)
{
$results = Pipeline::make($filePath)
->debug(true)
->through(
[FileValidator::class, 'validate'],
[FileReader::class, 'read'],
[DataTransformer::class, 'transform'],
[DatabaseInserter::class, 'insert']
)
->middleware(function($payload, $next) {
// Track memory usage
$startMemory = memory_get_usage();
$result = $next($payload);
$endMemory = memory_get_usage();
$this->logMemoryUsage($endMemory - $startMemory);
return $result;
})
->collect(); // Get results from each step
return [
'success' => true,
'processed' => count($results),
'logs' => Pipeline::getLogs()
];
}
Best Practices
- Keep pipes focused on single responsibilities
- Use the PipeInterface for complex pipe logic
- Always implement error handling for production pipelines
- Use context for passing metadata without polluting payload
- Enable debug mode during development but disable in production
- Consider cloning base pipelines for similar workflows