该库旨在帮助 Swift 在新领域取得进展:集群多节点分布式系统。
通过这个库,我们提供了可重用的运行时无关的成员协议实现,可以在各种集群用例中采用。
集群成员协议是分布式系统的关键构建块,例如计算密集型集群、调度程序、数据库、键值存储等。随着此软件包的发布,我们的目标是使构建此类系统变得更简单,因为它们不再需要依赖外部服务来处理服务成员资格。我们还想邀请社区合作并制定其他会员协议。
从本质上讲,成员协议需要为“谁是我的(实时)同伴?”这个问题提供答案。这个看似简单的任务在分布式系统中事实证明根本不那么简单,其中延迟或丢失的消息、网络分区以及无响应但仍然“活动”的节点是日常的面包和黄油。集群成员协议的作用就是为这个问题提供可预测的、可靠的答案。
在实施会员协议时可以采取各种权衡,并且它仍然是一个有趣的研究和持续改进领域。因此,集群成员包的目的不是专注于单一实现,而是作为该领域中各种分布式算法的协作空间。
要更深入地讨论协议和此实施中的修改,我们建议阅读 SWIM API 文档以及下面链接的相关论文。
可扩展的弱一致感染式流程组成员算法(也称为“SWIM”),以及 2018 年《救生员:本地健康意识以实现更准确的故障检测》论文中记录的一些值得注意的协议扩展。
SWIM 是一种八卦协议,其中对等节点定期交换有关其对其他节点状态的观察的信息位,最终将信息传播到集群中的所有其他成员。这类分布式算法对于任意消息丢失、网络分区和类似问题具有很强的弹性。
从高层次来看,SWIM 的工作原理如下:
.ack
来实现此目的。请参见下图中A
最初如何探测B
payload
,它是有关消息发送者所知道的其他对等点的(部分)信息,以及它们的成员资格状态( .alive
、 .suspect
等).ack
,则对等方被视为仍然.alive
。否则,目标对等点可能已终止/崩溃或由于其他原因而无响应。.pingRequest
消息来询问其他一些对等点有关无响应对等点的状态,然后这些对等点向该对等点发出直接 ping(探测对等点)下图中的E)。.suspect
,.nack
(“否定确认”)消息来通知 ping 请求源:中介确实收到了这些.pingRequest
消息,但目标似乎没有响应。我们使用此信息来调整本地运行状况乘数,这会影响超时的计算方式。要了解更多信息,请参阅 API 文档和 Lifeguard 论文。上述机制不仅充当故障检测机制,而且充当八卦机制,其携带有关集群的已知成员的信息。通过这种方式,成员最终可以了解同事的状态,即使没有将他们全部列出来。然而,值得指出的是,这种成员资格观点是弱一致的,这意味着如果所有成员在任何给定时间点对成员资格具有相同的确切观点,则无法保证(或在没有附加信息的情况下知道)。然而,它是高级工具和系统的绝佳构建块,可以在此基础上构建更强大的保证。
一旦故障检测机制检测到无响应的节点,它最终会被标记为 .dead,从而导致其不可撤销地从集群中删除。我们的实现提供了一个可选的扩展,将 .unreachable 状态添加到可能的状态中,但是大多数用户不会发现它是必要的,并且默认情况下它是禁用的。有关合法状态转换的详细信息和规则,请参阅 SWIM.Status 或下图:
Swift Cluster Membership 实现协议的方式是提供它们的“ Instances
”。例如,SWIM 实现封装在与运行时无关的SWIM.Instance
中,需要通过网络运行时和实例本身之间的某些粘合代码来“驱动”或“解释”。我们将实现的这些粘合部分称为“ Shell
”,并且该库附带了使用 SwiftNIO 的DatagramChannel
实现的SWIMNIOShell
,该 SWIMNIOShell 通过 UDP 异步执行所有消息传递。替代实现可以使用完全不同的传输,或者在其他一些现有的八卦系统上搭载 SWIM 消息等。
SWIM 实例还内置支持发出指标(使用 swift-metrics),并且可以配置为通过传递 swift-log Logger
来记录有关内部详细信息的详细信息。
该库的主要目的是在需要某种形式的进程内成员资格服务的各种实现之间共享SWIM.Instance
实现。项目的自述文件 (https://github.com/apple/swift-cluster-membership/) 中详细记录了实现自定义运行时的情况,因此,如果您有兴趣通过某些不同的传输方式实现 SWIM,请查看此处。
实施新的交通方式可以归结为“填空”练习:
首先,必须使用目标传输来实现对等协议(https://github.com/apple/swift-cluster-membership/blob/main/Sources/SWIM/Peer.swift):
/// SWIM peer which can be initiated contact with, by sending ping or ping request messages.
public protocol SWIMPeer : SWIMAddressablePeer {
/// Perform a probe of this peer by sending a `ping` message.
///
/// <... more docs here - please refer to the API docs for the latest version ...>
func ping (
payload : SWIM . GossipPayload ,
from origin : SWIMPingOriginPeer ,
timeout : DispatchTimeInterval ,
sequenceNumber : SWIM . SequenceNumber
) async throws -> SWIM . PingResponse
// ...
}
这通常意味着包装一些连接、通道或其他身份,能够发送消息并在适用时调用适当的回调。
然后,在对等方的接收端,必须实现接收这些消息并调用在SWIM.Instance
(分组在 SWIMProtocol 下)上定义的所有相应on<SomeMessage>(...)
回调。
下面列出了 SWIMProtocol 的一部分,供您了解:
public protocol SWIMProtocol {
/// MUST be invoked periodically, in intervals of `self.swim.dynamicLHMProtocolInterval`.
///
/// MUST NOT be scheduled using a "repeated" task/timer", as the interval is dynamic and may change as the algorithm proceeds.
/// Implementations should schedule each next tick by handling the returned directive's `scheduleNextTick` case,
/// which includes the appropriate delay to use for the next protocol tick.
///
/// This is the heart of the protocol, as each tick corresponds to a "protocol period" in which:
/// - suspect members are checked if they're overdue and should become `.unreachable` or `.dead`,
/// - decisions are made to `.ping` a random peer for fault detection,
/// - and some internal house keeping is performed.
///
/// Note: This means that effectively all decisions are made in interval sof protocol periods.
/// It would be possible to have a secondary periodic or more ad-hoc interval to speed up
/// some operations, however this is currently not implemented and the protocol follows the fairly
/// standard mode of simply carrying payloads in periodic ping messages.
///
/// - Returns: `SWIM.Instance.PeriodicPingTickDirective` which must be interpreted by a shell implementation
mutating func onPeriodicPingTick ( ) -> [ SWIM . Instance . PeriodicPingTickDirective ]
mutating func onPing ( ... ) -> [ SWIM . Instance . PingDirective ]
mutating func onPingRequest ( ... ) -> [ SWIM . Instance . PingRequestDirective ]
mutating func onPingResponse ( ... ) -> [ SWIM . Instance . PingResponseDirective ]
// ...
}
这些调用在内部执行所有 SWIM 协议特定任务,并返回指令,这些指令很容易将“命令”解释为关于如何对消息做出反应的实现。例如,在接收到.pingRequest
消息后,返回的指令可能指示 shell 向某些节点发送 ping。该指令准备了所有适当的目标、超时和附加信息,使简单地遵循其指令并正确实现调用变得更加简单,例如如下所示:
self . swim . onPingRequest (
target : target ,
pingRequestOrigin : pingRequestOrigin ,
payload : payload ,
sequenceNumber : sequenceNumber
) . forEach { directive in
switch directive {
case . gossipProcessed ( let gossipDirective ) :
self . handleGossipPayloadProcessedDirective ( gossipDirective )
case . sendPing ( let target , let payload , let pingRequestOriginPeer , let pingRequestSequenceNumber , let timeout , let sequenceNumber ) :
self . sendPing (
to : target ,
payload : payload ,
pingRequestOrigin : pingRequestOriginPeer ,
pingRequestSequenceNumber : pingRequestSequenceNumber ,
timeout : timeout ,
sequenceNumber : sequenceNumber
)
}
}
一般来说,这允许将所有棘手的“何时做什么”封装在协议实例中,并且 Shell 只需遵循实现它们的指令。实际的实现通常需要执行一些更复杂的并发和网络任务,例如等待一系列响应并以特定方式处理它们等,但是协议的总体轮廓是由实例的指令精心安排的。
有关每个回调、何时调用它们以及所有这些如何组合在一起的详细文档,请参阅API 文档。
该存储库包含一个端到端示例和一个名为 SWIMNIOExample 的示例实现,它利用SWIM.Instance
来启用基于简单 UDP 的对等监控系统。这允许对等方使用 SWIM 协议通过发送 SwiftNIO 驱动的数据报来八卦并相互通知节点故障。
SWIMNIOExample
实现仅作为示例提供,并未考虑到生产用途而实现,但是通过一些努力,它肯定可以在某些用例中表现良好。如果您有兴趣了解有关集群成员算法、可扩展性基准测试和使用 SwiftNIO 本身的更多信息,这是一个很好的入门模块,也许一旦该模块足够成熟,我们可以考虑将其不仅作为一个示例,而且还作为一个示例。基于 Swift NIO 的集群应用程序的可重用组件。
在最简单的形式中,结合提供的 SWIM 实例和 NIO shell 来构建一个简单的服务器,可以将提供的处理程序嵌入到典型的 NIO 通道管道中,如下所示:
let bootstrap = DatagramBootstrap ( group : group )
. channelOption ( ChannelOptions . socketOption ( . so_reuseaddr ) , value : 1 )
. channelInitializer { channel in
channel . pipeline
// first install the SWIM handler, which contains the SWIMNIOShell:
. addHandler ( SWIMNIOHandler ( settings : settings ) ) . flatMap {
// then install some user handler, it will receive SWIM events:
channel . pipeline . addHandler ( SWIMNIOExampleHandler ( ) )
}
}
bootstrap . bind ( host : host , port : port )
然后,示例处理程序可以接收并处理 SWIM 集群成员资格更改事件:
final class SWIMNIOExampleHandler : ChannelInboundHandler {
public typealias InboundIn = SWIM . MemberStatusChangedEvent
let log = Logger ( label : " SWIMNIOExampleHandler " )
public func channelRead ( context : ChannelHandlerContext , data : NIOAny ) {
let change : SWIM . MemberStatusChangedEvent = self . unwrapInboundIn ( data )
self . log . info ( " Membership status changed: [ ( change . member . node ) ] is now [ ( change . status ) ] " , metadata : [
" swim/member " : " ( change . member . node ) " ,
" swim/member/status " : " ( change . status ) " ,
] )
}
}
如果您有兴趣贡献和完善 SWIMNIO 实施,请前往问题并自行承担任务或提出改进!
我们通常有兴趣使用类似的“实例”风格促进讨论和实施其他成员资格实现。
如果您对此类算法感兴趣,并且希望实现最喜欢的协议,请随时通过问题或 Swift 论坛与他联系。
非常鼓励经验报告、反馈、改进想法和贡献!我们期待您的来信。
请参阅贡献指南以了解提交拉取请求的过程,并参阅手册以了解使用此库的术语和其他有用技巧。