該程式庫旨在幫助 Swift 在新領域取得進展:叢集多節點分散式系統。
透過這個庫,我們提供了可重複使用的運行時無關的成員協議實現,可以在各種叢集用例中採用。
叢集成員協定是分散式系統的關鍵構建塊,例如計算密集型叢集、調度程序、資料庫、鍵值儲存等。隨著此軟體包的發布,我們的目標是使建立此類系統變得更簡單,因為它們不再需要依賴外部服務來處理服務成員資格。我們也想邀請社區合作並制定其他會員協議。
從本質上講,成員協議需要為「誰是我的(即時)同伴?」這個問題提供答案。這個看似簡單的任務在分散式系統中事實證明根本不那麼簡單,其中延遲或丟失的訊息、網路分區以及無響應但仍然「活動」的節點是日常的麵包和黃油。集群成員協議的作用就是為這個問題提供可預測的、可靠的答案。
在實施會員協議時可以採取各種權衡,並且它仍然是一個有趣的研究和持續改進領域。因此,叢集成員包的目的不是專注於單一實現,而是作為該領域中各種分散式演算法的協作空間。
要更深入地討論協議和此實施中的修改,我們建議閱讀 SWIM API 文件以及下面連結的相關論文。
可擴展的弱一致感染式流程組成員演算法(也稱為「SWIM」),以及 2018 年《救生員:本地健康意識以實現更準確的故障檢測》論文中記錄的一些值得注意的協議擴展。
SWIM 是一種八卦協議,其中對等節點定期交換有關其對其他節點狀態的觀察的資訊位,最終將資訊傳播到叢集中的所有其他成員。這類分散式演算法對於任意訊息遺失、網路分區和類似問題具有很強的彈性。
從高層次來看,SWIM 的工作原理如下:
.ack
來實現此目的。請參考下圖A
最初如何探測B
payload
,它是有關訊息發送者所知道的其他對等點的(部分)信息,以及它們的成員資格狀態( .alive
、 .suspect
等).ack
,則對等方被視為仍然.alive
。否則,目標對等點可能已終止/崩潰或因其他原因而無回應。.pingRequest
訊息來詢問其他一些對等點有關無回應對等點的狀態,然後這些對等點向該對等點發出直接ping(探測對等點)下圖的E)。.suspect
,.nack
(「否定確認」)訊息來通知 ping 請求來源:中介確實收到了這些.pingRequest
訊息,但目標似乎沒有回應。我們使用此資訊來調整本地運行狀況乘數,這會影響逾時的計算方式。要了解更多信息,請參閱 API 文件和 Lifeguard 論文。上述機制不僅充當故障檢測機制,而且充當八卦機制,其攜帶有關集群的已知成員的資訊。透過這種方式,成員最終可以了解同事的狀態,即使沒有全部列出他們。然而,值得指出的是,這種成員資格觀點是弱一致的,這意味著如果所有成員在任何給定時間點對成員資格具有相同的確切觀點,則無法保證(或在沒有附加資訊的情況下知道)。然而,它是高級工具和系統的絕佳構建塊,可以在此基礎上建立更強大的保證。
一旦故障檢測機制檢測到無響應的節點,它最終會被標記為 .dead,從而導致其不可撤銷地從叢集中刪除。我們的實作提供了一個可選的擴展,將 .unreachable 狀態新增到可能的狀態中,但是大多數使用者不會發現它是必要的,並且預設情況下它是停用的。有關合法狀態轉換的詳細資訊和規則,請參閱 SWIM.Status 或下圖:
Swift Cluster Membership 實作協定的方式是提供它們的「 Instances
」。例如,SWIM 實作封裝在與執行時間無關的SWIM.Instance
中,需要透過網路執行時間和實例本身之間的某些黏合程式碼來「驅動」或「解釋」。我們將實現的這些黏合部分稱為“ Shell
”,並且該程式庫附帶了使用 SwiftNIO 的DatagramChannel
實現的SWIMNIOShell
,該 SWIMNIOShell 透過 UDP 非同步執行所有訊息傳遞。替代實作可以使用完全不同的傳輸,或在其他一些現有的八卦系統上搭載 SWIM 訊息等。
SWIM 實例還內建支援發出指標(使用 swift-metrics),並且可以配置為透過傳遞 swift-log Logger
來記錄有關內部詳細資訊的詳細資訊。
該程式庫的主要目的是在需要某種形式的進程內成員資格服務的各種實作之間共享SWIM.Instance
實作。專案的自述文件(https://github.com/apple/swift-cluster-membership/) 中詳細記錄了實作自訂執行時間的情況,因此,如果您有興趣透過某些不同的傳輸方式實現SWIM,請查看此處。
實施新的交通方式可以歸結為「填空」練習:
首先,必須使用目標傳輸來實現對等協議(https://github.com/apple/swift-cluster-membership/blob/main/Sources/SWIM/Peer.swift):
/// SWIM peer which can be initiated contact with, by sending ping or ping request messages.
public protocol SWIMPeer : SWIMAddressablePeer {
/// Perform a probe of this peer by sending a `ping` message.
///
/// <... more docs here - please refer to the API docs for the latest version ...>
func ping (
payload : SWIM . GossipPayload ,
from origin : SWIMPingOriginPeer ,
timeout : DispatchTimeInterval ,
sequenceNumber : SWIM . SequenceNumber
) async throws -> SWIM . PingResponse
// ...
}
這通常意味著包裝一些連接、通道或其他身份,能夠發送訊息並在適用時調用適當的回調。
然後,在對等方的接收端,必須實作接收這些訊息並呼叫在SWIM.Instance
(分組在 SWIMProtocol 下)上定義的所有對應on<SomeMessage>(...)
回呼。
下面列出了 SWIMProtocol 的一部分,供您了解:
public protocol SWIMProtocol {
/// MUST be invoked periodically, in intervals of `self.swim.dynamicLHMProtocolInterval`.
///
/// MUST NOT be scheduled using a "repeated" task/timer", as the interval is dynamic and may change as the algorithm proceeds.
/// Implementations should schedule each next tick by handling the returned directive's `scheduleNextTick` case,
/// which includes the appropriate delay to use for the next protocol tick.
///
/// This is the heart of the protocol, as each tick corresponds to a "protocol period" in which:
/// - suspect members are checked if they're overdue and should become `.unreachable` or `.dead`,
/// - decisions are made to `.ping` a random peer for fault detection,
/// - and some internal house keeping is performed.
///
/// Note: This means that effectively all decisions are made in interval sof protocol periods.
/// It would be possible to have a secondary periodic or more ad-hoc interval to speed up
/// some operations, however this is currently not implemented and the protocol follows the fairly
/// standard mode of simply carrying payloads in periodic ping messages.
///
/// - Returns: `SWIM.Instance.PeriodicPingTickDirective` which must be interpreted by a shell implementation
mutating func onPeriodicPingTick ( ) -> [ SWIM . Instance . PeriodicPingTickDirective ]
mutating func onPing ( ... ) -> [ SWIM . Instance . PingDirective ]
mutating func onPingRequest ( ... ) -> [ SWIM . Instance . PingRequestDirective ]
mutating func onPingResponse ( ... ) -> [ SWIM . Instance . PingResponseDirective ]
// ...
}
這些呼叫在內部執行所有 SWIM 協定特定任務,並傳回指令,這些指令很容易將「命令」解釋為關於如何對訊息做出反應的實作。例如,在接收.pingRequest
訊息後,傳回的指令可能指示 shell 向某些節點發送 ping。該指令準備了所有適當的目標、超時和附加信息,使簡單地遵循其指令並正確實現調用變得更加簡單,例如如下所示:
self . swim . onPingRequest (
target : target ,
pingRequestOrigin : pingRequestOrigin ,
payload : payload ,
sequenceNumber : sequenceNumber
) . forEach { directive in
switch directive {
case . gossipProcessed ( let gossipDirective ) :
self . handleGossipPayloadProcessedDirective ( gossipDirective )
case . sendPing ( let target , let payload , let pingRequestOriginPeer , let pingRequestSequenceNumber , let timeout , let sequenceNumber ) :
self . sendPing (
to : target ,
payload : payload ,
pingRequestOrigin : pingRequestOriginPeer ,
pingRequestSequenceNumber : pingRequestSequenceNumber ,
timeout : timeout ,
sequenceNumber : sequenceNumber
)
}
}
一般來說,這允許將所有棘手的「何時做什麼」封裝在協議實例中,並且 Shell 只需遵循實現它們的指令。實際的實作通常需要執行一些更複雜的並發和網路任務,例如等待一系列回應並以特定方式處理它們等,但是協議的總體輪廓是由實例的指令精心安排的。
有關每個回調、何時調用它們以及所有這些如何組合在一起的詳細文檔,請參閱API 文檔。
該儲存庫包含一個端到端範例和一個名為 SWIMNIOExample 的範例實現,它利用SWIM.Instance
來啟用基於簡單 UDP 的對等監控系統。這允許對等方使用 SWIM 協定透過發送 SwiftNIO 驅動的資料報來八卦並相互通知節點故障。
SWIMNIOExample
實現僅作為示例提供,並未考慮到生產用途而實現,但是通過一些努力,它肯定可以在某些用例中表現良好。如果您有興趣了解有關叢集成員演算法、可擴展性基準測試和使用SwiftNIO 本身的更多信息,這是一個很好的入門模組,也許一旦該模組足夠成熟,我們可以考慮將其不僅作為一個示例,而且也作為一個範例。
在最簡單的形式中,結合提供的 SWIM 實例和 NIO shell 來建立一個簡單的伺服器,可以將提供的處理程序嵌入到典型的 NIO 通道管道中,如下所示:
let bootstrap = DatagramBootstrap ( group : group )
. channelOption ( ChannelOptions . socketOption ( . so_reuseaddr ) , value : 1 )
. channelInitializer { channel in
channel . pipeline
// first install the SWIM handler, which contains the SWIMNIOShell:
. addHandler ( SWIMNIOHandler ( settings : settings ) ) . flatMap {
// then install some user handler, it will receive SWIM events:
channel . pipeline . addHandler ( SWIMNIOExampleHandler ( ) )
}
}
bootstrap . bind ( host : host , port : port )
然後,範例處理程序可以接收並處理 SWIM 叢集成員資格變更事件:
final class SWIMNIOExampleHandler : ChannelInboundHandler {
public typealias InboundIn = SWIM . MemberStatusChangedEvent
let log = Logger ( label : " SWIMNIOExampleHandler " )
public func channelRead ( context : ChannelHandlerContext , data : NIOAny ) {
let change : SWIM . MemberStatusChangedEvent = self . unwrapInboundIn ( data )
self . log . info ( " Membership status changed: [ ( change . member . node ) ] is now [ ( change . status ) ] " , metadata : [
" swim/member " : " ( change . member . node ) " ,
" swim/member/status " : " ( change . status ) " ,
] )
}
}
如果您有興趣貢獻和完善 SWIMNIO 實施,請前往問題並自行承擔任務或提出改進!
我們通常有興趣使用類似的“實例”風格來促進討論和實施其他成員資格實現。
如果您對此類演算法感興趣,並且希望實現最喜歡的協議,請隨時透過問題或 Swift 論壇與他聯繫。
非常鼓勵經驗報告、回饋、改進想法和貢獻!我們期待您的來信。
請參閱貢獻指南以了解提交拉取請求的流程,並參閱手冊以了解使用此程式庫的術語和其他有用技巧。