TNxHorizon
полностью потокобезопасен.NXHorizon.Instance
, является потокобезопасным и может использоваться из любого потока. Объявите тип события:
События классифицируются по информации о типе — TypeInfo
. Для каждой отдельной категории событий требуется отдельный тип.
type
TFoo = class
...
end ;
TOtherFoo = type TFoo;
TIntegerEvent = type Integer;
TStringEvent = type string;
TFooEvent = INxEvent<TFoo>;
TOtherFooEvent = INxEvent<TOtherFoo>;
Подписаться/отписаться на событие:
Подписку на события можно добавить в любой существующий класс.
type
TSubscriber = class
protected
// subscriptions
fIntegerSubscription: INxEventSubscription;
fStringSubscription: INxEventSubscription;
// event handlers
procedure OnIntegerEvent ( const aEvent: TIntegerEvent);
procedure OnStringEvent ( const aEvent: TStringEvent);
public
constructor Create;
destructor Destroy; override;
end ;
constructor TSubscriber.Create;
begin
fIntegerSubscription := NxHorizon.Instance.Subscribe<TIntegerEvent>(Async, OnIntegerEvent);
fStringSubscription := NxHorizon.Instance.Subscribe<TStringEvent>(Sync, OnStringEvent);
end ;
destructor TSubscriber.Destroy;
begin
fIntegerSubscription.WaitFor;
fStringSubscription.WaitFor;
NxHorizon.Instance.Unsubscribe(fIntegerSubscription);
NxHorizon.Instance.Unsubscribe(fStringSubscription);
inherited ;
end ;
procedure TSubscriber.OnIntegerEvent ( const aEvent: TIntegerEvent);
begin
Writeln(aEvent);
end ;
procedure TSubscriber.OnStringEvent ( const aEvent: TStringEvent);
begin
Writeln(aEvent);
end ;
Отправлять сообщения:
NxHorizon.Instance.Post<TIntegerEvent>( 5 );
NxHorizon.Instance.Send<TStringEvent>( ' abc ' , Async);
или
var
IntEvent: TIntegerEvent;
StrEvent: TStringEvent;
IntEvent := 5 ;
StrEvent := ' abc ' ;
NxHorizon.Instance.Post(IntEvent);
NxHorizon.Instance.Send(StrEvent, Async);
Методы обработчика событий должны соответствовать следующему объявлению, где T
может быть любого типа. Для асинхронной доставки требуются типы с автоматическим управлением памятью или типы значений. Вы также можете использовать экземпляры долгоживущих объектов, управляемые вручную, в качестве событий, но в таких случаях вы должны гарантировать, что они не будут уничтожены до полной обработки уже отправленных сообщений.
procedure( const aEvent: T) of object ;
Тип TNxHorizonDelivery
объявляет четыре варианта доставки:
Sync
- синхронно в текущем потокеAsync
— асинхронно в случайном фоновом потокеMainSync
— синхронно в основном потокеMainAsync
— асинхронно в основном потоке Sync
и MainSync
— это БЛОКИРУЮЩИЕ операции, и обработчик событий будет выполняться немедленно в контексте текущего потока или синхронизироваться с основным потоком. Это заблокирует отправку других событий с использованием того же экземпляра шины событий до тех пор, пока обработчик событий не завершится. Не используйте его (или используйте его экономно только для коротких исполнений) в экземпляре шины событий по умолчанию.
Если отправка событий выполняется из контекста основного потока, доставка MainAsync
будет использовать TThread.ForceQueue
для асинхронного запуска обработчика событий в контексте основного потока.
При подписке на обработчик событий создается новый экземпляр INxEventSubscription
. Вам следует сохранить возвращенный экземпляр, чтобы в дальнейшем отказаться от подписки.
Существует два метода отказа от подписки: Unsubscribe
и UnsubscribeAsync
.
Оба метода отменяют подписку и удаляют ее из коллекции подписок, хранящейся в шине событий. Эта коллекция повторяется внутри методов Post
и Send
. Любые изменения в это время не допускаются и могут привести к неожиданному поведению.
Чтобы избежать изменения коллекции подписчиков во время итерации, если вы хотите отказаться от подписки на код, выполняющийся в синхронно отправляемом обработчике событий, вам следует использовать UnsubscribeAsync
, который немедленно отменит подписку, но задержит фактическое удаление из коллекции, запустив ее за пределами диспетчерская итерация.
Асинхронно отправляемые обработчики событий всегда выполняются вне итерации диспетчеризации и позволяют использовать метод Unsubscribe
. Однако способ отправки обработчиков может быть изменен несвязанным внешним кодом, и если вы не можете абсолютно гарантировать асинхронную отправку, использование UnsubscribeAsync
оправдано.
Unsubscribe
и UnsubscribeAsync
также отменяют подписку перед ее удалением из коллекции подписок. Обычно нет необходимости явно отменять подписку перед отменой подписки, но если у вас есть какая-то конкретная причина, по которой вы хотите отменить подписку в какой-то момент перед отменой подписки, вы можете вызвать его метод Cancel
. Cancel
можно безопасно вызывать несколько раз. После отмены подписки ее состояние невозможно восстановить.
Благодаря асинхронной диспетчеризации событий можно иметь уже отправленный обработчик событий в момент отмены или отказа от подписки на конкретную подписку. Если вы отписываетесь от деструктора, деструктора вашего класса подписчика, это может привести к тому, что вы получите доступ к экземпляру подписчика во время процесса его уничтожения или после его уничтожения. Чтобы предотвратить такой сценарий, вы можете вызвать WaitFor
для подписки, которая немедленно отменит подписку и заблокирует ее до тех пор, пока все отправленные обработчики событий не завершат выполнение.
Если вы вызываете WaitFor
из контекста основного потока и ваши обработчики событий работают в течение длительного времени, это приведет к тому, что ваше приложение перестанет отвечать на запросы в течение этого периода времени.
Методы BeginWork
и EndWork
являются частью механизма ожидания подписки. Если вам нужно запустить некоторый код внутри обработчика событий в каком-то другом потоке и вам нужно убедиться, что код также будет ожидаться, вы можете вызвать BeginWork
перед запуском такого потока и EndWork
после его завершения. Убедитесь, что все пути кода в конечном итоге вызовут соответствующий EndWork
, поскольку в противном случае при вызове WaitFor
произойдет взаимоблокировка.
procedure TSubscriber.OnLongEvent ( const aEvent: TIntegerEvent);
begin
fIntegerSubscription.BeginWork;
try
TTask.Run(
procedure
begin
try
...
finally
fIntegerSubscription.EndWork;
end ;
end );
except
fIntegerSubscription.EndWork;
raise;
end ;
end ;
procedure Post <T>( const aEvent: T);
procedure Send <T>( const aEvent: T; aDelivery: TNxHorizonDelivery);
Метод Post
используется для публикации событий, где вариант доставки будет зависеть от варианта доставки подписки, установленного при подписке на событие.
Метод Send
переопределяет параметр доставки по подписке и отправляет событие способом, определяемым переданным параметром aDelivery
. Если в подписке указана диспетчеризация в контексте основного потока, метод Send
будет соблюдать это требование, поэтому вам не придется беспокоиться о синхронизации в этих обработчиках событий.
Будет ли Post
или Send
блокировать вызовы, зависит от используемых вариантов доставки. При использовании Post
обратите внимание, что для разных подписок на один и тот же тип событий можно настроить разные варианты доставки.
TNxHorizon
— это полностью потокобезопасный класс, управляемый вручную. Вы можете создать столько отдельных экземпляров шины событий, сколько захотите. Экземпляры полностью потокобезопасны и не требуют никакой дополнительной защиты, пока вы используете ссылки в режиме только для чтения — как только вы инициализируете ссылку и начинаете использовать этот экземпляр в нескольких потоках, вам не разрешается изменять саму ссылочную переменную. . Вы можете свободно вызывать любые методы по такой ссылке из любого потока.
Если вам необходимо поддерживать разные каналы (дополнительная категоризация событий), такой функциональности можно добиться, создав отдельный экземпляр шины событий для каждого канала.
Функциональность класса TNxHorizon
не может быть напрямую представлена как интерфейс, поскольку он использует параметризованные методы, которые не поддерживаются для интерфейсов.
Помимо экземпляра Singleton, доступного через NxHorizon.Instance
, можно использовать отдельные экземпляры шины для других целей с гораздо более коротким сроком службы. Чтобы упростить управление жизнью этих экземпляров и избежать доступа к висячим указателям в многопоточной среде, вы можете использовать INxHorizon
для безопасного хранения и совместного использования таких экземпляров шины событий.
Это также открывает возможность использовать экземпляры шины событий, которые довольно легковесны, в качестве механизма диспетчеризации в шаблоне наблюдателя , где наблюдаемый субъект хранит и предоставляет свою ссылку INxHorizon
, к которой могут подключиться наблюдатели. При подписке наблюдатели должны сохранять экземпляр INxHorizon
, на который они подписаны, чтобы они могли безопасно отказаться от подписки, даже если сам субъект за это время был выпущен.
Это позволяет использовать шаблон наблюдателя потокобезопасным образом с субъектами, которые не являются автоматически управляемыми экземплярами. Кроме того, сохранение строгой (потокобезопасной) ссылки на экземпляр шины событий вместо субъекта напрямую позволяет избежать потенциальных циклов ссылок при использовании экземпляров управляемых объектов вместо использования потокобезопасных слабых ссылок.
INxHorizon.Instance
возвращает завернутый экземпляр TNxHorizon
, которым вручную управляет контейнер. Его можно безопасно использовать, пока подписчик строго ссылается на свой контейнер.
Субъекту необходимо вызвать метод ShutDown
по ссылке INxHorizon
во время процесса очистки. Это установит флаг IsActive
в False
и отправит TNxHorizonShutDownEvent
своим подписчикам, чтобы они могли выполнить правильную очистку. TNxHorizonShutDownEvent
содержит завернутый экземпляр TNxHorizon
, поэтому подписчики могут использовать один обработчик событий завершения работы для управления несколькими субъектами.
Вызов ShutDown
не оказывает никакого влияния на способность шины отправлять и публиковать сообщения. Если вам нужно убедиться, что вы не отправляете новые события во время процесса очистки, вы можете проверить флаг IsActive
перед вызовом Post
или Send
.
Эта шина событий использует TTask
из PPL для асинхронной отправки событий в XE7 и более новых версиях Delphi. Эти задачи выполняются в пуле потоков по умолчанию. Это сделано специально. Это основано на предположении, что любой код, использующий пул потоков по умолчанию, должен выполняться очень быстро и не вызывать конфликтов.
Если у вас есть код в обработчиках событий или другой код, который использует пул по умолчанию для долго выполняющихся задач, которые могут вызвать проблемы, тогда правильный курс действий — вместо этого запустить этот конкретный, долго выполняющийся код в отдельном выделенном пуле потоков. создания нескольких пулов потоков, которые будут обслуживать разные части фреймворков, которым необходимо выполнять некоторые задачи.
Для долго выполняющегося обработчика событий самым быстрым решением проблемы является использование синхронной диспетчеризации и запуск новой задачи внутри кода обработчика событий, которая затем может использовать какой-либо другой пул потоков, отличный от пула потоков по умолчанию. Таким образом, вы получите больше контроля над своим кодом и сможете свободно изменять поведение конкретного обработчика, не затрагивая все другие обработчики, работающие в том же экземпляре шины событий:
procedure TSubscriber.OnLongEvent ( const aEvent: TLongEvent);
begin
TTask.Run(
procedure
begin
...
end , DedicatedThreadPool);
end ;
Основными особенностями этой реализации шины событий являются потокобезопасность, скорость и простота. Любые дополнительные функции и расширения не должны ставить под угрозу первоначальные цели и намерения.
Эта реализация также основана на моих собственных требованиях и коде, и возможно, что некоторые части не полностью удовлетворяют другому общему рабочему процессу кода.
Поскольку скорость зависит от текущей реализации методов Post
и Send
, я не ожидаю больших изменений в этих областях. Однако возможно улучшение или поддержка различных рабочих процессов подписки за пределами этих двух методов.
https://dalija.prasnikar.info