通俗地讲,它被称为command bus模式,但该库对命令和查询进行了区分,并允许您在命令处理程序中强制不返回值,以保持与 CQRS 模式保持一致。
这是一个独立的库,唯一的两个依赖项是 PSR-11 Container 和 PSR-3 Log 接口,以实现更好的互操作性。
目录:
使用 Composer 安装库:
composer require sco/message-bus
您需要遵循 PSR-4 自动加载标准,并创建自己的服务容器类,这是实现PsrContainerContainerInterface
的问题,并且可以像库用于其测试套件ScoMessageBusTestsStubContainerInMemoryContainer
一样简单ScoMessageBusTestsStubContainerInMemoryContainer
,或者您可以编写器需要一个遵循 PSR-11 标准(如 PHP-DI)的服务容器库。
require ' vendor/autoload.php '
$ container = new InMemoryContainer( $ services )
$ bus = new Sco MessageBus Bus ( $ container );
$ bus -> dispatch ( new FindPostByIdQuery ( 1 ))
我们在这里可以使用两种方法,装饰库提供的 Bus 类,或者注入服务定位器。有关更多信息,您可以阅读 Symfony 文档
我们可以创建一个新的 Decorator 类,它将实现 Symfony 的SymfonyContractsServiceServiceSubscriberInterface
接口:
use Sco MessageBus Bus ;
use Sco MessageBus Message ;
use Sco MessageBus Result ;
use Psr Container ContainerInterface ;
use Symfony Contracts Service ServiceSubscriberInterface ;
class MessageBus implements ServiceSubscriberInterface
{
private Bus $ bus ;
public function __construct ( ContainerInterface $ locator )
{
$ this -> bus = new Bus ( $ locator , [], null , new UuidV4Identity ());
}
public function dispatch ( Sco MessageBus Message $ message ): Result
{
return $ this -> bus -> dispatch ( $ message );
}
public static function getSubscribedServices (): array
{
return [
FindPostByIdHandler::class,
SavePostHandler::class
];
}
}
使用这种方法,应用程序中的所有处理程序都必须添加到getSubscribedServices
返回的数组中,因为 Symfony 中的服务默认情况下不是公开的,而且它们确实不应该公开,所以除非您在映射器时将处理程序添加到此数组中完成映射后,它将无法找到处理程序,并且将抛出服务未找到容器异常。
另一种方法是将带有所有处理程序的服务定位器注入到库的总线中。这将在服务注册 yaml 文件中完成。
匿名服务定位器:
services :
_defaults :
autowire : true
autoconfigure : true
# Anonymous Service Locator
ScoMessageBusBus :
arguments :
$container : !service_locator
' @FindPostByIdHandler ' : ' handler_one '
' @SavePostHandler ' : ' handler_two '
显式服务定位器定义:
services :
_defaults :
autowire : true
autoconfigure : true
# Explicit Service Locator
message_handler_service_locator :
class : SymfonyComponentDependencyInjectionServiceLocator
arguments :
- ' @FindPostByIdHandler '
- ' @SavePostHandler '
ScoMessageBusBus :
arguments :
$container : ' @message_handler_service_locator '
让我们扩展这些配置并使用 Symfony 服务容器的标签功能自动将处理程序添加到总线:
使用!tagged_locator
:
services :
_defaults :
autowire : true
autoconfigure : true
_instanceof :
ScoMessageBusHandler :
tags : ['message_handler']
# Anonymous Service Locator
ScoMessageBusBus :
arguments :
$container : !tagged_locator message_handler
显式服务定位器定义:
services :
_defaults :
autowire : true
autoconfigure : true
_instanceof :
ScoMessageBusHandler :
tags : ['message_handler']
# Explicit Service Locator
message_handler_service_locator :
class : SymfonyComponentDependencyInjectionServiceLocator
arguments :
- !tagged_iterator message_handler
ScoMessageBusBus :
arguments :
$container : ' @message_handler_service_locator '
要在 Laravel 框架中有效地使用它,您所要做的就是在 Laravel 的服务容器中注册总线,并将该容器作为参数提供给库的 Bus 类:
$ this -> app -> bind ( Sco MessageBus Bus::class, function ( $ app ) {
return new Sco MessageBus Bus ( $ app );
});
每个命令或查询及其各自的结果对象组合将被分配一个唯一的标识,例如命令,并且其各自的结果对象将具有标识00000001
。这对于日志记录、审计或调试目的非常有用。
默认的身份生成策略是一个简单的ScoMessageBusIdentityRandomString
生成器,以将外部依赖性保持在最低限度。要使用其他东西,您可能需要像 https://github.com/ramsey/uuid 这样的库并实现ScoMessageBusIdentity
。
use Sco MessageBus Identity ;
class UuidIdentity implements Identity
{
public function generate () : string
{
return Uuid:: uuid7 ()-> toString ();
}
}
FindPostByIdQuery
将被映射到FindPostByIdHandler
,或者SavePostCommand
将被映射到SavePostHandler
。#[IsCommand(handler: SavePostHandler::class)]
或#[IsQuery(handler: FindPostByIdHandler::class)]
添加到您的命令/查询类。 handler
参数名称可以省略,这取决于您的个人喜好。ScoMessageBusMapper
接口来实现。每个命令都将通过一系列中间件传递。默认情况下,该链是空的,但该库确实提供了一些开箱即用的中间件:
begin
、 commit
和rollback
步骤是普通的Closure
对象,因此您可以使用您喜欢的任何 ORM 或持久性方法。要创建您自己的自定义中间件,您需要实现ScoMessageBusMiddleware
接口并将其提供给总线:
use Sco MessageBus Bus ;
use Sco MessageBus Message ;
use Sco MessageBus Middleware ;
class CustomMiddleware implements Middleware
{
public function __invoke ( Message $ message , Closure $ next ) : mixed
{
// Do something before message handling
$ result = $ next ( $ message );
// Do something after message handling
return $ result ;
}
}
$ bus = new Bus (middlewares: [ new CustomMiddleware ()]);
如果添加ScoMessageBusMiddlewareEventMiddleware
您将能够订阅以下事件:
MessageReceivedEvent - 在收到消息但在处理之前引发。
use Sco MessageBus Event Subscriber ;
use Sco MessageBus Event MessageReceivedEvent ;
$ subscriber = new Subscriber ();
$ subscriber -> addListener (MessageReceivedEvent::class, function ( MessageReceivedEvent $ event ) {
$ event -> getName (); // Name of the Event
$ event -> getMessage ();; // Command or Query that has been received
});
MessageHandledEvent - 成功处理消息后引发。
use Sco MessageBus Event Subscriber ;
use Sco MessageBus Event MessageHandledEvent ;
$ subscriber = new Subscriber ();
$ subscriber -> addListener (MessageHandledEvent::class, function ( MessageHandledEvent $ event ) {
$ event -> getName (); // Name of the Event
$ event -> getMessage (); // Command or Query being handled
$ event -> getResult (); // Result for the handled message
});
MessageFailedEvent - 当消息处理失败并引发异常时引发。
use Sco MessageBus Event Subscriber ;
use Sco MessageBus Event MessageFailedEvent ;
$ subscriber = new Subscriber ();
$ subscriber -> addListener (MessageFailedEvent::class, function ( MessageFailedEvent $ event ) {
$ event -> getName (); // Name of the Event
$ event -> getMessage (); // Command or Query being handled
$ event -> getError (); // Captured Exception
});
事务中间件接受三个函数参数,每个参数对应事务的每个阶段:开始、提交和回滚。采用这种方法允许您使用您喜欢的任何 ORM,甚至使用本机 PDO 对象与持久层进行交互。
$ pdo = new PDO ( ' {connection_dsn} ' )
$ transaction = new Sco MessageBus Middleware TransactionMiddleware (
fn (): bool => $ pdo -> beginTransaction (),
fn (): bool => $ pdo -> commit (),
fn ( Throwable $ error ): bool => $ pdo -> rollBack (),
);
库将 Handler 返回值包装到Result 值对象中以提供一致的 API,以便您可以依赖始终具有相同类型的返回值。
所有 Result 值对象都扩展了ScoMessageBusResult
抽象类,并且可以分为 3 组:
ScoMessageBusResultBoolean
ScoMessageBusResultInteger
ScoMessageBusResultNumeric
ScoMessageBusResultText
ScoMessageBusResultNone
(包含空值)ScoMessageBusResultDelegated
包装对象并将对属性和方法的调用委托给底层对象ScoMessageBusResultCollection
和ScoMessageBusResultMap
包装数字索引数组(列表)和字符串索引数组(映射)并实现Countable
、 ArrayAccess
和IteratorAggregate
接口您还可以通过扩展抽象类ScoMessageBusResult
并在适当的处理程序中返回它们来添加自己的自定义 Result 值对象。
库遵循 PSR-12 标准。