TNxHorizon
类是完全线程安全的NXHorizon.Instance
是线程安全的,可以从任何线程使用声明事件类型:
事件按类型信息 - TypeInfo
进行分类。每个单独的事件类别都需要不同的类型。
type
TFoo = class
...
end ;
TOtherFoo = type TFoo;
TIntegerEvent = type Integer;
TStringEvent = type string;
TFooEvent = INxEvent<TFoo>;
TOtherFooEvent = INxEvent<TOtherFoo>;
订阅/取消订阅事件:
订阅事件可以添加到任何现有的类中。
type
TSubscriber = class
protected
// subscriptions
fIntegerSubscription: INxEventSubscription;
fStringSubscription: INxEventSubscription;
// event handlers
procedure OnIntegerEvent ( const aEvent: TIntegerEvent);
procedure OnStringEvent ( const aEvent: TStringEvent);
public
constructor Create;
destructor Destroy; override;
end ;
constructor TSubscriber.Create;
begin
fIntegerSubscription := NxHorizon.Instance.Subscribe<TIntegerEvent>(Async, OnIntegerEvent);
fStringSubscription := NxHorizon.Instance.Subscribe<TStringEvent>(Sync, OnStringEvent);
end ;
destructor TSubscriber.Destroy;
begin
fIntegerSubscription.WaitFor;
fStringSubscription.WaitFor;
NxHorizon.Instance.Unsubscribe(fIntegerSubscription);
NxHorizon.Instance.Unsubscribe(fStringSubscription);
inherited ;
end ;
procedure TSubscriber.OnIntegerEvent ( const aEvent: TIntegerEvent);
begin
Writeln(aEvent);
end ;
procedure TSubscriber.OnStringEvent ( const aEvent: TStringEvent);
begin
Writeln(aEvent);
end ;
发送消息:
NxHorizon.Instance.Post<TIntegerEvent>( 5 );
NxHorizon.Instance.Send<TStringEvent>( ' abc ' , Async);
或者
var
IntEvent: TIntegerEvent;
StrEvent: TStringEvent;
IntEvent := 5 ;
StrEvent := ' abc ' ;
NxHorizon.Instance.Post(IntEvent);
NxHorizon.Instance.Send(StrEvent, Async);
事件处理程序方法必须符合以下声明,其中T
可以是任何类型。异步传递需要具有自动内存管理的类型或值类型。您还可以使用手动管理的长期对象实例作为事件,但在这种情况下,您必须确保它们在已分派的消息完全处理之前不会被销毁。
procedure( const aEvent: T) of object ;
TNxHorizonDelivery
类型声明了四种交付选项:
Sync
- 在当前线程中同步Async
- 在随机后台线程中异步MainSync
- 在主线程上同步MainAsync
- 主线程异步Sync
和MainSync
是 BLOCKING 操作,事件处理程序将在当前线程的上下文中立即执行,或者与主线程同步。这将阻止使用同一事件总线实例调度其他事件,直到事件处理程序完成。不要在默认事件总线实例上使用它(或仅在短期执行时谨慎使用)。
如果发送事件是从主线程上下文完成的, MainAsync
传递将使用TThread.ForceQueue
在主线程上下文中异步运行事件处理程序。
订阅事件处理程序会构造一个新的INxEventSubscription
实例。您应该存储返回的实例,以便稍后取消订阅。
取消订阅有两种方法: Unsubscribe
和UnsubscribeAsync
。
这两种方法都会取消订阅并将其从事件总线中维护的订阅集合中删除。该集合正在Post
和Send
方法内迭代。此时不允许进行任何修改,并且可能会导致意外行为。
为了避免在迭代期间修改订阅者集合,如果要取消订阅在同步分派的事件处理程序中运行的代码,则应使用UnsubscribeAsync
,这将立即取消订阅,但会延迟从集合中实际删除,在外部运行它。调度迭代。
异步分派的事件处理程序始终在分派迭代之外运行,并且它们允许使用Unsubscribe
方法。但是,处理程序的分派方式可以通过不相关的外部代码进行更改,如果您不能绝对保证异步分派,则可以使用UnsubscribeAsync
。
Unsubscribe
和UnsubscribeAsync
还会在从订阅集合中删除订阅之前取消订阅。通常,取消订阅之前不需要显式取消订阅,但是如果您出于某种特殊原因想要在取消订阅之前的某个时刻取消订阅,则可以调用其Cancel
方法。可以安全地多次调用Cancel
。一旦取消订阅,其状态就无法恢复。
由于异步事件分派,当您取消或取消订阅特定订阅时,可能会有一个已经分派的事件处理程序。如果您取消订阅析构函数(您的订阅者类析构函数),这可能会导致您在订阅者实例销毁过程中或销毁后访问订阅者实例。为了防止出现这种情况,您可以在订阅上调用WaitFor
,这将立即取消订阅并阻塞,直到所有分派的事件处理程序执行完毕。
如果您从主线程的上下文中调用WaitFor
,并且您的事件处理程序运行了很长时间,这将导致您的应用程序在该段时间内停止响应。
BeginWork
和EndWork
方法是订阅等待机制的一部分。如果您需要在其他线程的事件处理程序内运行某些代码,并且需要确保也等待该代码,则可以在启动此类线程之前调用BeginWork
,并在其完成后调用EndWork
。确保所有代码路径最终都会调用匹配的EndWork
,因为不这样做会在调用WaitFor
时导致死锁。
procedure TSubscriber.OnLongEvent ( const aEvent: TIntegerEvent);
begin
fIntegerSubscription.BeginWork;
try
TTask.Run(
procedure
begin
try
...
finally
fIntegerSubscription.EndWork;
end ;
end );
except
fIntegerSubscription.EndWork;
raise;
end ;
end ;
procedure Post <T>( const aEvent: T);
procedure Send <T>( const aEvent: T; aDelivery: TNxHorizonDelivery);
Post
方法用于发布事件,其中传递选项将取决于订阅事件时设置的订阅传递选项。
Send
方法重写订阅传递选项,并以由传递的aDelivery
参数确定的方式分派事件。如果订阅指定在主线程上下文中分派,则Send
方法将满足该要求,因此您不必担心这些事件处理程序中的同步。
Post
或Send
是否会阻止呼叫取决于所使用的传递选项。当您使用Post
时,请注意,同一事件类型的不同订阅可以配置不同的传递选项。
TNxHorizon
是一个手动管理的、完全线程安全的类。您可以根据需要创建任意多个单独的事件总线实例。实例是完全线程安全的,只要您以只读模式使用引用,就不需要任何额外的保护 - 一旦初始化引用并开始跨线程使用该实例,就不允许修改引用变量本身。您可以从任何线程自由地调用此类引用上的任何方法。
如果您需要支持不同的通道(额外的事件分类),您可以通过为每个通道创建单独的事件总线实例来实现此类功能。
TNxHorizon
类的功能不能直接公开为接口,因为它使用接口不支持的参数化方法。
除了通过NxHorizon.Instance
提供单例实例之外,还可以将单独的总线实例用于其他目的,但生命周期要短得多。为了简化这些实例的生命管理并避免在多线程环境中访问悬空指针,您可以使用INxHorizon
安全地保存和共享此类事件总线实例。
这也开启了使用事件总线实例的可能性,事件总线实例作为观察者模式中的调度机制相当轻量级,其中可观察的主题保存并公开其INxHorizon
引用,观察者可以附加到该引用。订阅时,观察者应该存储他们正在订阅的INxHorizon
实例,这样即使主题本身已同时被释放,他们也可以安全地取消订阅。
这允许以线程安全的方式对非自动管理实例的主题使用观察者模式。此外,保持对事件总线实例的强(线程安全)引用而不是直接使用主题,可以避免使用托管对象实例时潜在的引用循环,而不是使用线程不安全的弱引用。
INxHorizon.Instance
返回包装的TNxHorizon
实例,该实例由容器手动管理。只要订阅者对其容器持有强引用,就可以安全地使用它。
主体需要在清理过程中对其INxHorizon
引用调用ShutDown
方法。这会将IsActive
标志设置为False
并将TNxHorizonShutDownEvent
发送给其订阅者,以便他们可以执行适当的清理。 TNxHorizonShutDownEvent
包含包装的TNxHorizon
实例,因此订阅者可以使用单个关闭事件处理程序来管理多个主题。
调用ShutDown
不会对总线发送和发布消息的能力产生任何影响。如果您需要确保在清理过程中没有调度新事件,您可以在调用Post
或Send
之前检查IsActive
标志。
该事件总线利用 PPL 中的TTask
在 XE7 和更新的 Delphi 版本中异步分派事件。这些任务在默认线程池上运行。这是设计使然。这是基于这样的前提:任何使用默认线程池的代码都应该运行得非常快并且不应该引起争用。
如果事件处理程序中的代码或其他代码使用默认池来执行可能导致问题的长时间运行的任务,那么正确的做法是在单独的专用线程池上运行该特定的长时间运行的代码,而不是创建多个线程池将服务于需要运行某些任务的框架的不同部分。
对于长时间运行的事件处理程序,问题的最快解决方案是使用同步分派并在事件处理程序代码内启动一个新任务,然后可以使用其他一些非默认线程池。这样,您将可以更好地控制代码,并可以自由地更改特定处理程序的行为,而不会影响同一事件总线实例上运行的所有其他处理程序:
procedure TSubscriber.OnLongEvent ( const aEvent: TLongEvent);
begin
TTask.Run(
procedure
begin
...
end , DedicatedThreadPool);
end ;
该事件总线实现的主要特点是线程安全、速度和简单性。任何附加功能和扩展都不得损害这些最初的目标和意图。
这个实现也是基于我自己的需求和代码,并且某些部分可能不完全满足其他一些常见的代码工作流程。
由于速度基于Post
和Send
方法的当前实现,因此我预计这些方面不会有太多变化。然而,在这两种方法之外改进或支持不同的订阅工作流程是可能的。
https://dalija.prasnikar.info