„Reliable“ ist eine Zuverlässigkeitsschicht für UDP-Verbindungen in Go.
Mit höchstens 9 Byte Paket-Overhead leistet Reliable für Ihre UDP-basierte Anwendung Folgendes:
** Dieses Projekt ist noch ein WIP! Durchsuchen Sie den Quellcode, schreiben Sie einige Unit-Tests, helfen Sie bei der Dokumentation oder öffnen Sie ein Github-Problem, wenn Sie helfen möchten oder Fragen haben!
„Reliable“ verwendet das gleiche Paket-Header-Layout wie in networkprotocol/reliable.io
beschrieben.
Alle Pakete beginnen mit einem einzelnen Byte (8 Bits), das 8 verschiedene Flags darstellt. Pakete sind sequentiell und werden mit einer vorzeichenlosen 16-Bit-Ganzzahl im Paket-Header nummeriert, es sei denn, das Paket ist als unzuverlässig markiert.
Paketbestätigungen (ACKs) sind redundant in jedem gesendeten Paket enthalten und umfassen insgesamt 5 Bytes: zwei Bytes repräsentieren eine vorzeichenlose 16-Bit-Paketsequenznummer (ACK) und drei Bytes repräsentieren ein 32-Bit-Bitfeld (ackBits).
Das Paket-Header-Layout ist, ähnlich wie networkprotocol/reliable.io
, delta-kodiert und RLE-kodiert, um den Größen-Overhead pro Paket zu reduzieren.
Bei einem Paket, das wir gerade von unserem Peer erhalten haben, markieren wir für jedes gesetzte Bit (i) im Bitfeld (ackBits) ein von uns gesendetes Paket zur Bestätigung, wenn seine Sequenznummer (ack – i) ist.
Wenn Peer A Pakete an B sendet, während B überhaupt keine Pakete an A sendet, sendet B für alle 32 von A empfangenen Pakete ein leeres Paket, sodass A weiß, dass B seine Pakete bestätigt hat.
Genauer gesagt wird ein Zähler (lui) verwaltet, der die letzte aufeinanderfolgende Paketsequenznummer darstellt, die wir erhalten haben und deren Bestätigung wir unserem Peer mitgeteilt haben.
Wenn (lui) beispielsweise 0 ist und wir Bestätigungen für Pakete gesendet haben, deren Sequenznummern 2, 3, 4 und 6 sind, und wir dann die Paketsequenznummer 1 bestätigt haben, wäre lui 4.
Wenn bei der Aktualisierung (lui) die nächsten 32 aufeinanderfolgenden Sequenznummern Sequenznummern von Paketen sind, die wir zuvor empfangen haben, erhöhen wir (lui) um 32 und senden ein einzelnes leeres Paket mit den folgenden Paketbestätigungen: (ack=lui+31, ackBits=[lui,lui+31]).
Für Pakete, die wir gesendet haben (wq), und Pakete, die wir empfangen haben (rq), werden zwei Sequenzpuffer fester Größe verwaltet. Die für diese Puffer festgelegte Größe muss gleichmäßig durch den Maximalwert einer vorzeichenlosen 16-Bit-Ganzzahl (65536) geteilt werden. Die Datenstruktur wird in diesem Blogbeitrag von Glenn Fiedler beschrieben.
Wir verfolgen einen Zähler (oui), der die letzte fortlaufende Sequenznummer eines Pakets darstellt, das wir gesendet haben und das von unserem Peer bestätigt wurde. Wenn wir beispielsweise Pakete gesendet haben, deren Sequenznummern im Bereich [0, 256] liegen, und wir Bestätigungen für Pakete erhalten haben (0, 1, 2, 3, 4, 8, 9, 10, 11, 12), dann wäre (oui) 4.
Cap(q) sei die feste Größe oder Kapazität des Sequenzpuffers q.
Beim Senden von Paketen stoppen und puffern wir zeitweise das Senden von Paketen, wenn wir glauben, dass das Senden weiterer Pakete den Lesepuffer unseres Empfängers überlaufen würde. Genauer gesagt: Wenn dem nächsten von uns gesendeten Paket eine Paketnummer größer als (oui + cap(rq)) zugewiesen wird, stoppen wir alle Sendungen, bis (oui) durch den Empfänger eines Pakets von unserem Peer inkrementiert wurde.
Die Logik zur erneuten Übertragung veralteter, unbestätigter gesendeter Pakete und zur Aufrechterhaltung von Bestätigungen wurde in diesem Blogbeitrag von Glenn Fiedler übernommen.
Es besteht der Verdacht, dass Pakete verloren gehen, wenn sie vom Empfänger nicht nach 100 ms bestätigt werden. Sobald der Verdacht besteht, dass ein Paket verloren gegangen ist, wird es erneut gesendet. Derzeit werden Pakete maximal 10 Mal erneut gesendet.
Es könnte sinnvoll sein, nicht zuzulassen, dass Pakete eine begrenzte Anzahl von Malen erneut gesendet werden, und dies dem Entwickler zu überlassen. Dies ist jedoch offen für Diskussionen, die ich gerne auf meinem Discord-Server oder über ein Github-Problem führen kann.
Auf meiner Suche nach einer praktikablen Lösung gegen TCP-Head-of-Line-Blockierung habe ich viele zuverlässige UDP-Bibliotheken in Go durchgesehen, von denen sich die meisten hauptsächlich für Dateiübertragungen oder Spiele eignen:
Als ich sie alle durchging, hatte ich das Gefühl, dass sie etwas zu viel für mich getan hatten. Bei meiner Arbeit und meinen Nebenprojekten habe ich mich intensiv mit dezentralen P2P-Netzwerkprotokollen beschäftigt. Es liegt in der Natur dieser Protokolle, dass sie in Umgebungen mit hoher Latenz und hohem Paketverlust stark unter der TCP-Head-of-Line-Blockierung leiden.
In vielen Fällen wurden viele der von diesen Bibliotheken bereitgestellten Funktionen entweder nicht benötigt oder man hatte ehrlich gesagt den Eindruck, dass sie vom Entwickler, der diese Bibliotheken verwendet, am besten gehandhabt und durchdacht werden sollten. Zum Beispiel:
Also begann ich mit der Arbeit an einem modularen Ansatz und beschloss, den Zuverlässigkeitsanteil der von mir integrierten Protokolle in einer separaten Bibliothek zu abstrahieren.
Meiner Meinung nach ist dieser Ansatz am besten im Vergleich zu den beliebten Alternativen wie QUIC oder SCTP, die je nach Ihren Umständen möglicherweise etwas zu viel für Sie tun. Schließlich ist es schon schwer genug, nur die Zuverlässigkeitsbits eines UDP-basierten Protokolls korrekt und gut getestet zu bekommen.
net.PacketConn
-bezogene Bits für eine feinere Abstraktion weg.net.UDPAddr
.Zuverlässig verwendet Go-Module. Um es in Ihr Projekt einzubinden, führen Sie den folgenden Befehl aus:
$ go get github.com/lithdew/reliable
Wenn Sie nur schnell ein Projekt oder eine Demo zum Laufen bringen möchten, verwenden Sie Endpoint
. Wenn Sie mehr Flexibilität benötigen, sollten Sie eine direkte Zusammenarbeit mit Conn
in Betracht ziehen.
Beachten Sie, dass eine Art Keep-Alive-Mechanismus oder ein Heartbeat-System zusätzlich gebootet werden muss, andernfalls werden Pakete möglicherweise auf unbestimmte Zeit erneut gesendet, da sie nicht bestätigt wurden.
WithReadBufferSize
konfiguriert werden. Die Standardgröße des Lesepuffers beträgt 256.WithWriteBufferSize
konfiguriert werden. Die Standardgröße des Schreibpuffers beträgt 256.WithResendTimeout
konfiguriert werden. Das Standard-Timeout für erneutes Senden beträgt 100 Millisekunden.WithEndpointPacketHandler
oder WithProtocolPacketHandler
konfiguriert werden. Standardmäßig wird ein Null-Handler bereitgestellt, der alle eingehenden Pakete ignoriert.WithEndpointErrorHandler
oder WithProtocolErrorHandler
konfiguriert werden können. Standardmäßig wird ein Null-Handler bereitgestellt, der alle Fehler ignoriert.WithBufferPool
kann ein Byte-Pufferpool übergeben werden. Standardmäßig wird ein neuer Bytepufferpool instanziiert. Ein Benchmark wurde mit cmd/benchmark
aus Japan auf einem DigitalOcean 2GB/60 GB Disk/NYC3-Server durchgeführt.
Die Benchmark-Aufgabe bestand darin, 1400-Byte-Pakete von Japan nach New York zu spammen. Bei einer Ping-Latenz von etwa 220 Millisekunden betrug der Durchsatz etwa 1,2 MiB/Sek.
Es wurden auch Unit-Test-Benchmarks durchgeführt, wie unten gezeigt.
$ cat /proc/cpuinfo | grep 'model name' | uniq
model name : Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz
$ go test -bench=. -benchtime=10s
goos: linux
goarch: amd64
pkg: github.com/lithdew/reliable
BenchmarkEndpointWriteReliablePacket-8 2053717 5941 ns/op 183 B/op 9 allocs/op
BenchmarkEndpointWriteUnreliablePacket-8 2472392 4866 ns/op 176 B/op 8 allocs/op
BenchmarkMarshalPacketHeader-8 749060137 15.7 ns/op 0 B/op 0 allocs/op
BenchmarkUnmarshalPacketHeader-8 835547473 14.6 ns/op 0 B/op 0 allocs/op
Sie können das folgende Beispiel ausführen, indem Sie den folgenden Befehl ausführen:
$ go run github.com/lithdew/reliable/examples/basic
Dieses Beispiel zeigt:
package main
import (
"bytes"
"errors"
"github.com/davecgh/go-spew/spew"
"github.com/lithdew/reliable"
"io"
"log"
"net"
"os"
"os/signal"
"sync"
"sync/atomic"
"time"
)
var (
PacketData = bytes . Repeat ([] byte ( "x" ), 1400 )
NumPackets = uint64 ( 0 )
)
func check ( err error ) {
if err != nil && ! errors . Is ( err , io . EOF ) {
log . Panic ( err )
}
}
func listen ( addr string ) net. PacketConn {
conn , err := net . ListenPacket ( "udp" , addr )
check ( err )
return conn
}
func handler ( buf [] byte , _ net. Addr ) {
if bytes . Equal ( buf , PacketData ) || len ( buf ) == 0 {
return
}
spew . Dump ( buf )
os . Exit ( 1 )
}
func main () {
exit := make ( chan struct {})
var wg sync. WaitGroup
wg . Add ( 2 )
ca := listen ( "127.0.0.1:44444" )
cb := listen ( "127.0.0.1:55555" )
a := reliable . NewEndpoint ( ca , reliable . WithEndpointPacketHandler ( handler ))
b := reliable . NewEndpoint ( cb , reliable . WithEndpointPacketHandler ( handler ))
defer func () {
check ( ca . SetDeadline ( time . Now (). Add ( 1 * time . Millisecond )))
check ( cb . SetDeadline ( time . Now (). Add ( 1 * time . Millisecond )))
close ( exit )
check ( a . Close ())
check ( b . Close ())
check ( ca . Close ())
check ( cb . Close ())
wg . Wait ()
}()
go a . Listen ()
go b . Listen ()
// The two goroutines below have endpoint A spam endpoint B, and print out how
// many packets of data are being sent per second.
go func () {
defer wg . Done ()
for {
select {
case <- exit :
return
default :
}
check ( a . WriteReliablePacket ( PacketData , b . Addr ()))
atomic . AddUint64 ( & NumPackets , 1 )
}
}()
go func () {
defer wg . Done ()
ticker := time . NewTicker ( 1 * time . Second )
defer ticker . Stop ()
for {
select {
case <- exit :
return
case <- ticker . C :
numPackets := atomic . SwapUint64 ( & NumPackets , 0 )
numBytes := float64 ( numPackets ) * 1400.0 / 1024.0 / 1024.0
log . Printf (
"Sent %d packet(s) comprised of %.2f MiB worth of data." ,
numPackets ,
numBytes ,
)
}
}
}()
ch := make ( chan os. Signal , 1 )
signal . Notify ( ch , os . Interrupt )
<- ch
}