Colloquially better known as the command bus pattern, this library distinguishes between Commands and Queries and lets you enforce that Command Handlers return no value, which keeps you in line with the CQRS pattern.
This is a stand-alone library; its only two dependencies are the PSR-11 Container and PSR-3 Log interfaces, which allows for better interoperability.
Install the library using composer:
composer require sco/message-bus
You will need to follow the PSR-4 autoloading standard and provide a Service Container. You can either create your own container class, which is a matter of implementing Psr\Container\ContainerInterface and can be as simple as the Sco\MessageBus\Tests\Stub\Container\InMemoryContainer the library uses for its test suite, or you can composer require a Service Container library that adheres to the PSR-11 standard, such as PHP-DI.
require 'vendor/autoload.php';
$container = new InMemoryContainer($services);
$bus = new \Sco\MessageBus\Bus($container);
$bus->dispatch(new FindPostByIdQuery(1));
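A hand-rolled container of the kind described above can be very small. The following is a minimal sketch, not the library's InMemoryContainer: the class name ArrayContainer and the empty FindPostByIdHandler are hypothetical, and to stay runnable without Composer the class does not declare `implements \Psr\Container\ContainerInterface`, which a real container would need to do.

```php
<?php

// Hypothetical stand-in for a PSR-11 container. In a real project this class
// would declare `implements \Psr\Container\ContainerInterface` and throw
// exceptions that implement the PSR-11 exception interfaces.
final class ArrayContainer
{
    /** @param array<string, object> $services service id => instance */
    public function __construct(private array $services = [])
    {
    }

    // Reports whether a service is registered under the given id.
    public function has(string $id): bool
    {
        return isset($this->services[$id]);
    }

    // Returns the registered service or fails loudly when it is missing.
    public function get(string $id): mixed
    {
        if (!$this->has($id)) {
            throw new RuntimeException("Service '{$id}' not found.");
        }

        return $this->services[$id];
    }
}

// Hypothetical handler, present only so the example has something to store.
final class FindPostByIdHandler
{
}

$container = new ArrayContainer([
    FindPostByIdHandler::class => new FindPostByIdHandler(),
]);
```

Registering handlers under their class name is an assumption made for this sketch; check which ids the Bus actually requests from the container.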
We can use two approaches here: decorating the Bus class provided by the library, or injecting a Service Locator. For more information, see the Symfony docs.
We can create a new Decorator class which implements Symfony's Symfony\Contracts\Service\ServiceSubscriberInterface:
use Sco\MessageBus\Bus;
use Sco\MessageBus\Message;
use Sco\MessageBus\Result;
use Psr\Container\ContainerInterface;
use Symfony\Contracts\Service\ServiceSubscriberInterface;

class MessageBus implements ServiceSubscriberInterface
{
    private Bus $bus;

    public function __construct(ContainerInterface $locator)
    {
        // UuidV4Identity is not defined in this example; it is assumed to be
        // your own Identity implementation.
        $this->bus = new Bus($locator, [], null, new UuidV4Identity());
    }

    public function dispatch(Message $message): Result
    {
        return $this->bus->dispatch($message);
    }

    public static function getSubscribedServices(): array
    {
        return [
            FindPostByIdHandler::class,
            SavePostHandler::class,
        ];
    }
}
With this approach, all handlers in your application have to be added to the array returned by getSubscribedServices, since services in Symfony are not public by default (and they really shouldn't be). Unless you add a handler to this array, the bus will not be able to find it once the mapper is done mapping, and a service-not-found container exception will be thrown.
A different approach would be to inject a Service Locator with all the handlers into the library's Bus. This would be done in the service registration yaml files.
Anonymous service locator:
services:
    _defaults:
        autowire: true
        autoconfigure: true
    # Anonymous Service Locator
    Sco\MessageBus\Bus:
        arguments:
            $container: !service_locator
                handler_one: '@FindPostByIdHandler'
                handler_two: '@SavePostHandler'
Explicit service locator definition:
services:
    _defaults:
        autowire: true
        autoconfigure: true
    # Explicit Service Locator
    message_handler_service_locator:
        class: Symfony\Component\DependencyInjection\ServiceLocator
        arguments:
            - '@FindPostByIdHandler'
            - '@SavePostHandler'
    Sco\MessageBus\Bus:
        arguments:
            $container: '@message_handler_service_locator'
Let's expand these configurations and use the tags feature of Symfony's service container to automatically add handlers to the Bus:
Using !tagged_locator:
services:
    _defaults:
        autowire: true
        autoconfigure: true
    _instanceof:
        Sco\MessageBus\Handler:
            tags: ['message_handler']
    # Anonymous Service Locator
    Sco\MessageBus\Bus:
        arguments:
            $container: !tagged_locator message_handler
Explicit service locator definition:
services:
    _defaults:
        autowire: true
        autoconfigure: true
    _instanceof:
        Sco\MessageBus\Handler:
            tags: ['message_handler']
    # Explicit Service Locator
    message_handler_service_locator:
        class: Symfony\Component\DependencyInjection\ServiceLocator
        arguments:
            - !tagged_iterator message_handler
    Sco\MessageBus\Bus:
        arguments:
            $container: '@message_handler_service_locator'
To use it effectively with the Laravel framework, all you have to do is register the Bus in Laravel's Service Container and provide the container as an argument to the library's Bus class:
$this->app->bind(\Sco\MessageBus\Bus::class, function ($app) {
    return new \Sco\MessageBus\Bus($app);
});
Each Command or Query and its respective Result object will be assigned a unique Identity, e.g. a Command and its respective Result object will both have an identity of 00000001.
This can be useful for logging, auditing or debugging purposes.
The default Identity generation strategy is a simple Sco\MessageBus\Identity\RandomString generator, which keeps external dependencies to a minimum. To use something else, you could require a library like https://github.com/ramsey/uuid and implement the Sco\MessageBus\Identity interface.
use Ramsey\Uuid\Uuid;
use Sco\MessageBus\Identity;

class UuidIdentity implements Identity
{
    public function generate(): string
    {
        return Uuid::uuid7()->toString();
    }
}
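If you would rather not pull in an extra package, an identity strategy can also be built on PHP's own random_bytes. The sketch below is only an illustration: RandomHexIdentity is a hypothetical name, it mirrors the generate(): string method shown above, and to stay runnable on its own it does not declare `implements Sco\MessageBus\Identity`, which a real strategy would need to do.

```php
<?php

// Hypothetical identity generator; a real one would declare
// `implements Sco\MessageBus\Identity`.
final class RandomHexIdentity
{
    public function __construct(private int $bytes = 16)
    {
        if ($bytes < 1) {
            throw new InvalidArgumentException('At least one byte is required.');
        }
    }

    // Returns a hex string that is twice as long as the number of random bytes.
    public function generate(): string
    {
        return bin2hex(random_bytes($this->bytes));
    }
}

$identity = (new RandomHexIdentity())->generate();
```

With the default of 16 bytes the generated identity is a 32-character hexadecimal string.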
FindPostByIdQuery will get mapped to FindPostByIdHandler, or a SavePostCommand will get mapped to SavePostHandler.
Add #[IsCommand(handler: SavePostHandler::class)] or #[IsQuery(handler: FindPostByIdHandler::class)] to your Command/Query class. The handler parameter name can be omitted; it's up to your personal preference.
Implement the Sco\MessageBus\Mapper interface.
Each command will be passed through a chain of Middlewares. By default the chain is empty, but the library does offer some Middleware out of the box:
The transaction middleware's begin, commit and rollback steps are plain Closure objects, so you can use whichever ORM or persistence approach you prefer.
To create your own custom middleware, you need to implement the Sco\MessageBus\Middleware interface and provide it to the bus:
use Sco\MessageBus\Bus;
use Sco\MessageBus\Message;
use Sco\MessageBus\Middleware;

class CustomMiddleware implements Middleware
{
    public function __invoke(Message $message, Closure $next): mixed
    {
        // Do something before message handling
        $result = $next($message);
        // Do something after message handling
        return $result;
    }
}

$bus = new Bus(middlewares: [new CustomMiddleware()]);
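To picture how a chain of middlewares wraps the final handler, the following self-contained sketch folds a list of callables into a single Closure. It is an illustration of the general technique under stated assumptions, not the library's internals: buildChain and the two logging middlewares are hypothetical, and plain objects stand in for Message instances.

```php
<?php

/**
 * Hypothetical helper: folds middlewares and a final handler into one Closure.
 *
 * @param list<callable(object, Closure): mixed> $middlewares
 */
function buildChain(array $middlewares, Closure $handler): Closure
{
    // Wrap from the last middleware inwards so the first one runs first.
    return array_reduce(
        array_reverse($middlewares),
        fn (Closure $next, callable $middleware): Closure =>
            fn (object $message): mixed => $middleware($message, $next),
        $handler
    );
}

$log = [];

$first = function (object $message, Closure $next) use (&$log): mixed {
    $log[] = 'first:before';
    $result = $next($message);
    $log[] = 'first:after';

    return $result;
};

$second = function (object $message, Closure $next) use (&$log): mixed {
    $log[] = 'second:before';
    $result = $next($message);
    $log[] = 'second:after';

    return $result;
};

// The innermost callable plays the role of the message handler.
$chain = buildChain([$first, $second], function (object $message) use (&$log): string {
    $log[] = 'handler';

    return 'handled';
});

$result = $chain(new stdClass());
```

Each middleware sees the message before and after everything nested inside it, which is why the "before" entries are logged in list order and the "after" entries in reverse.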
If you add the Sco\MessageBus\Middleware\EventMiddleware, you will be able to subscribe to the following events:
MessageReceivedEvent - raised when the message is received but before being handled.
use Sco\MessageBus\Event\Subscriber;
use Sco\MessageBus\Event\MessageReceivedEvent;

$subscriber = new Subscriber();
$subscriber->addListener(MessageReceivedEvent::class, function (MessageReceivedEvent $event) {
    $event->getName(); // Name of the Event
    $event->getMessage(); // Command or Query that has been received
});
MessageHandledEvent - raised after the message has been handled successfully.
use Sco\MessageBus\Event\Subscriber;
use Sco\MessageBus\Event\MessageHandledEvent;

$subscriber = new Subscriber();
$subscriber->addListener(MessageHandledEvent::class, function (MessageHandledEvent $event) {
    $event->getName(); // Name of the Event
    $event->getMessage(); // Command or Query being handled
    $event->getResult(); // Result for the handled message
});
MessageFailedEvent - raised when the message handling fails and an exception gets thrown.
use Sco\MessageBus\Event\Subscriber;
use Sco\MessageBus\Event\MessageFailedEvent;

$subscriber = new Subscriber();
$subscriber->addListener(MessageFailedEvent::class, function (MessageFailedEvent $event) {
    $event->getName(); // Name of the Event
    $event->getMessage(); // Command or Query being handled
    $event->getError(); // Captured Exception
});
Transaction Middleware accepts three function arguments, one for each stage of the transaction: begin, commit, and rollback. This approach allows you to use any ORM you prefer, or even the native PDO object, to interact with your persistence layer.
$pdo = new PDO('{connection_dsn}');
$transaction = new \Sco\MessageBus\Middleware\TransactionMiddleware(
    fn(): bool => $pdo->beginTransaction(),
    fn(): bool => $pdo->commit(),
    fn(Throwable $error): bool => $pdo->rollBack(),
);
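The three closures can be pictured as hooks around the rest of the chain. The sketch below shows one plausible way such a middleware could use them; it is not the library's TransactionMiddleware. The class name TransactionSketch is hypothetical, plain objects stand in for messages, and rethrowing the error after the rollback is an assumption.

```php
<?php

// Hypothetical illustration of the begin/commit/rollback flow.
final class TransactionSketch
{
    public function __construct(
        private Closure $begin,
        private Closure $commit,
        private Closure $rollback,
    ) {
    }

    public function __invoke(object $message, Closure $next): mixed
    {
        ($this->begin)();

        try {
            $result = $next($message);
            ($this->commit)();

            return $result;
        } catch (Throwable $error) {
            // Undo the work, then let the caller see the original failure.
            ($this->rollback)($error);

            throw $error;
        }
    }
}

$steps = [];

$middleware = new TransactionSketch(
    function () use (&$steps): void { $steps[] = 'begin'; },
    function () use (&$steps): void { $steps[] = 'commit'; },
    function (Throwable $error) use (&$steps): void { $steps[] = 'rollback'; },
);

// Successful handling: begin, then commit.
$ok = $middleware(new stdClass(), fn (object $message): string => 'saved');

// Failed handling: begin, then rollback, and the exception is rethrown.
try {
    $middleware(new stdClass(), function (object $message) {
        throw new RuntimeException('boom');
    });
} catch (RuntimeException $error) {
    $caught = $error->getMessage();
}
```

Because the steps are plain closures, the same wrapper works with PDO, an ORM's entity manager, or a test double that only records calls.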
The library wraps Handler return values in Result value objects to provide a consistent API, so you can depend on return values always being of the same type.
All Result value objects extend the Sco\MessageBus\Result abstract class and can be divided into 3 groups:
Sco\MessageBus\Result\Boolean
Sco\MessageBus\Result\Integer
Sco\MessageBus\Result\Numeric
Sco\MessageBus\Result\Text
Sco\MessageBus\Result\None (wraps null values)
Sco\MessageBus\Result\Delegated, which wraps objects and delegates calls to properties and methods to the underlying object
Sco\MessageBus\Result\Collection and Sco\MessageBus\Result\Map, which wrap number-indexed arrays (lists) and string-indexed arrays (maps) and implement the Countable, ArrayAccess and IteratorAggregate interfaces
You can also add your own custom Result value objects by extending the abstract class Sco\MessageBus\Result and returning them in the appropriate handler.
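The delegation described for the Delegated result can be pictured with PHP's magic methods. This is a sketch of the general technique only: DelegatingWrapper and Post are hypothetical names, the wrapper does not extend Sco\MessageBus\Result, and the library's actual implementation may differ.

```php
<?php

// Hypothetical wrapper that forwards property reads and method calls.
final class DelegatingWrapper
{
    public function __construct(private object $inner)
    {
    }

    // Forwards reads of properties the wrapper does not define itself.
    public function __get(string $name): mixed
    {
        return $this->inner->{$name};
    }

    // Forwards calls to methods the wrapper does not define itself.
    public function __call(string $method, array $arguments): mixed
    {
        return $this->inner->{$method}(...$arguments);
    }
}

// Hypothetical object a handler might return.
final class Post
{
    public function __construct(public int $id, public string $title)
    {
    }

    public function slug(): string
    {
        return strtolower(str_replace(' ', '-', $this->title));
    }
}

$result = new DelegatingWrapper(new Post(1, 'Hello World'));
```

Calling code can then treat the wrapper as if it were the wrapped object, for example by reading `$result->title` or calling `$result->slug()`.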
The library follows the PSR-12 standard.