可靠是 Go 中 UDP 連接的可靠性層。
最多只有 9 個位元組的封包開銷, reliable對基於 UDP 的應用程式的作用是:
** 這個項目仍然是一個 WIP!仔細檢查原始程式碼,編寫一些單元測試,幫助編寫文檔,或者如果您想提供幫助或有任何疑問,可以打開 Github 問題!
可靠使用networkprotocol/reliable.io
中所述的相同封包標頭佈局。
所有資料包均以代表 8 個不同標誌的單一位元組(8 位元)開始。資料包是連續的,並且使用資料包標頭中包含的無符號 16 位元整數進行編號,除非資料包被標記為不可靠。
封包確認(ACK) 冗餘地包含在每個傳送的封包中,總共使用5 個位元組:兩個位元組表示無符號16 位元封包序號(ack),三個位元組表示32 位元位元字段(ackBits)。
封包標頭佈局與networkprotocol/reliable.io
非常相似,採用 delta 編碼和 RLE 編碼,以減少每個封包的大小開銷。
給定一個我們剛從對等方收到的資料包,對於位元欄位(ackBits) 中的每個設定位元(i),如果其序號為(ack - i),我們將標記已傳送的要確認的數據包。
在對等點A 向B 發送資料包的情況下,B 完全不向A 發送任何資料包,B 將每從A 收到32 個資料包就發送一個空資料包,以便A 知道B 已確認其數據包。
更明確地說,維護一個計數器(lui),代表我們收到的最後一個連續資料包序號,我們已將其確認告知我們的對等方。
例如,如果 (lui) 為 0,並且我們已發送了序號為 2、3、4 和 6 的資料包的確認,並且我們隨後確認了資料包序號 1,則 lui 將為 4。
更新(lui) 後,如果接下來的32 個連續序號是我們之前收到的資料包的序號,我們會將(lui) 增加32 並發送包含以下資料包確認的單一空資料包: (ack =lui+31, ackBits=[lui,lui+31])。
為我們已傳送的資料包 (wq) 和我們已接收的資料包 (rq) 維護兩個固定大小的序列緩衝區。這些緩衝區的固定大小必須整除為無符號 16 位元整數的最大值 (65536)。 Glenn Fiedler 的這篇部落格文章描述了這個資料結構。
我們追蹤一個計數器(oui),代表我們發送的資料包的最後一個連續序號,該資料包已被我們的對等方確認。例如,如果我們發送了序號在 [0, 256] 範圍內的資料包,並且收到了資料包 (0, 1, 2, 3, 4, 8, 9, 10, 11, 12) 的確認,那麼(oui) 就是4。
令 cap(q) 為序列緩衝區 q 的固定大小或容量。
在發送資料包時,如果我們認為發送更多資料包會溢出收件人的讀取緩衝區,我們會間歇性地停止並緩衝資料包的發送。更明確地說,如果我們發送的下一個資料包分配的資料包編號大於(oui + cap(rq)),我們將停止所有發送,直到(oui) 透過來自對等方的資料包的接收者遞增。
重新傳輸過時的、未確認的已發送資料包和維護確認的邏輯歸功於 Glenn Fiedler 的這篇部落格文章。
如果 100 毫秒後接收方未確認封包,則懷疑封包遺失。一旦懷疑資料包遺失,就重新發送。截至目前,資料包最多重發 10 次。
明智的做法是不允許資料包重新發送的次數達到上限,並將其留給開發人員。不過,這是可以公開討論的,我很高興在我的 Discord 伺服器上或透過 Github 問題進行討論。
在尋找針對 TCP 隊頭阻塞的可行解決方案時,我查看了 Go 中的許多可靠的 UDP 庫,其中大多數主要適用於文件傳輸或遊戲:
經歷了所有這些,我覺得他們為我做了太多了。在我的工作和業餘專案中,我一直致力於去中心化 p2p 網路協議的研究。這些協定的本質是,它們在高延遲/高丟包環境中運行時會嚴重受到 TCP 隊頭阻塞的影響。
在許多情況下,這些庫提供的許可功能要么是不需要的,要么是誠實地認為它們最好由使用這些庫的開發人員來處理和考慮。例如:
因此,我開始研究模組化方法,並決定將我內建的協定的可靠性部分抽象化到一個單獨的庫中。
我認為與 QUIC 或 SCTP 等流行的替代方案相比,這種方法是最好的,根據您的具體情況,這些替代方案可能對您來說有點太多了。畢竟,僅使基於 UDP 的協議的可靠性位正確並經過充分測試就已經足夠困難了。
net.PacketConn
相關位元以獲得更精細的抽象。net.UDPAddr
的字串表示形式的快取。可靠使用 Go 模組。若要將其包含在您的專案中,請執行以下命令:
$ go get github.com/lithdew/reliable
如果您只是希望快速啟動並執行專案或演示,請使用Endpoint
。如果您需要更大的靈活性,請考慮直接與Conn
合作。
請注意,需要在頂部引導某種保活機製或心跳系統,否則資料包可能會無限期地重新發送,因為它們將無法被確認。
WithReadBufferSize
配置讀取緩衝區大小。預設讀取緩衝區大小為 256。WithWriteBufferSize
配置寫入緩衝區大小。預設寫入緩衝區大小為 256。WithResendTimeout
配置重新傳輸尚未確認的資料包之前的最短時間。預設重發超時為 100 毫秒。WithEndpointPacketHandler
或WithProtocolPacketHandler
配置接收資料包時回呼的資料包處理程序。預設情況下,提供了一個 nil 處理程序,它會忽略所有傳入的資料包。WithEndpointErrorHandler
或WithProtocolErrorHandler
配置。預設情況下,提供了一個 nil 處理程序來忽略所有錯誤。WithBufferPool
傳入位元組緩衝池。預設情況下,會實例化一個新的位元組緩衝池。 使用來自日本的cmd/benchmark
對 DigitalOcean 2GB / 60 GB 磁碟 / NYC3 伺服器進行了基準測試。
基準測試任務是將 1400 位元組的資料包從日本發送到紐約。如果 ping 延遲約為 220 毫秒,則吞吐量約為 1.2 MiB/秒。
也執行了單元測試基準測試,如下所示。
$ cat /proc/cpuinfo | grep 'model name' | uniq
model name : Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz
$ go test -bench=. -benchtime=10s
goos: linux
goarch: amd64
pkg: github.com/lithdew/reliable
BenchmarkEndpointWriteReliablePacket-8 2053717 5941 ns/op 183 B/op 9 allocs/op
BenchmarkEndpointWriteUnreliablePacket-8 2472392 4866 ns/op 176 B/op 8 allocs/op
BenchmarkMarshalPacketHeader-8 749060137 15.7 ns/op 0 B/op 0 allocs/op
BenchmarkUnmarshalPacketHeader-8 835547473 14.6 ns/op 0 B/op 0 allocs/op
您可以透過執行以下命令來運行下面的範例:
$ go run github.com/lithdew/reliable/examples/basic
此範例示範:
package main
import (
"bytes"
"errors"
"github.com/davecgh/go-spew/spew"
"github.com/lithdew/reliable"
"io"
"log"
"net"
"os"
"os/signal"
"sync"
"sync/atomic"
"time"
)
var (
PacketData = bytes . Repeat ([] byte ( "x" ), 1400 )
NumPackets = uint64 ( 0 )
)
func check ( err error ) {
if err != nil && ! errors . Is ( err , io . EOF ) {
log . Panic ( err )
}
}
func listen ( addr string ) net. PacketConn {
conn , err := net . ListenPacket ( "udp" , addr )
check ( err )
return conn
}
func handler ( buf [] byte , _ net. Addr ) {
if bytes . Equal ( buf , PacketData ) || len ( buf ) == 0 {
return
}
spew . Dump ( buf )
os . Exit ( 1 )
}
func main () {
exit := make ( chan struct {})
var wg sync. WaitGroup
wg . Add ( 2 )
ca := listen ( "127.0.0.1:44444" )
cb := listen ( "127.0.0.1:55555" )
a := reliable . NewEndpoint ( ca , reliable . WithEndpointPacketHandler ( handler ))
b := reliable . NewEndpoint ( cb , reliable . WithEndpointPacketHandler ( handler ))
defer func () {
check ( ca . SetDeadline ( time . Now (). Add ( 1 * time . Millisecond )))
check ( cb . SetDeadline ( time . Now (). Add ( 1 * time . Millisecond )))
close ( exit )
check ( a . Close ())
check ( b . Close ())
check ( ca . Close ())
check ( cb . Close ())
wg . Wait ()
}()
go a . Listen ()
go b . Listen ()
// The two goroutines below have endpoint A spam endpoint B, and print out how
// many packets of data are being sent per second.
go func () {
defer wg . Done ()
for {
select {
case <- exit :
return
default :
}
check ( a . WriteReliablePacket ( PacketData , b . Addr ()))
atomic . AddUint64 ( & NumPackets , 1 )
}
}()
go func () {
defer wg . Done ()
ticker := time . NewTicker ( 1 * time . Second )
defer ticker . Stop ()
for {
select {
case <- exit :
return
case <- ticker . C :
numPackets := atomic . SwapUint64 ( & NumPackets , 0 )
numBytes := float64 ( numPackets ) * 1400.0 / 1024.0 / 1024.0
log . Printf (
"Sent %d packet(s) comprised of %.2f MiB worth of data." ,
numPackets ,
numBytes ,
)
}
}
}()
ch := make ( chan os. Signal , 1 )
signal . Notify ( ch , os . Interrupt )
<- ch
}