可靠是 Go 中 UDP 连接的可靠性层。
由于最多只有 9 个字节的数据包开销, reliable对基于 UDP 的应用程序的作用是:
** 这个项目仍然是一个 WIP!仔细检查源代码,编写一些单元测试,帮助提供文档,或者如果您想提供帮助或有任何问题,可以打开 Github 问题!
可靠使用networkprotocol/reliable.io
中描述的相同数据包标头布局。
所有数据包均以代表 8 个不同标志的单个字节(8 位)开始。数据包是连续的,并且使用数据包标头中包含的无符号 16 位整数进行编号,除非数据包被标记为不可靠。
数据包确认 (ACK) 冗余地包含在每个发送的数据包中,总共使用 5 个字节:两个字节表示无符号 16 位数据包序列号 (ack),三个字节表示 32 位位字段 (ackBits)。
数据包标头布局与networkprotocol/reliable.io
非常相似,采用 delta 编码和 RLE 编码,以减少每个数据包的大小开销。
给定一个我们刚刚从对等方收到的数据包,对于位字段 (ackBits) 中的每个设置位 (i),如果其序列号为 (ack - i),我们将标记已发送的要确认的数据包。
在对等点 A 向 B 发送数据包的情况下,B 根本不向 A 发送任何数据包,则 B 将从 A 接收到的每 32 个数据包发送一个空数据包,以便 A 知道 B 已确认其数据包。
更明确地说,维护一个计数器(lui),代表我们收到的最后一个连续数据包序列号,我们已将其确认告知我们的对等方。
例如,如果 (lui) 为 0,并且我们已发送了序列号为 2、3、4 和 6 的数据包的确认,并且我们随后确认了数据包序列号 1,则 lui 将为 4。
更新 (lui) 后,如果接下来的 32 个连续序列号是我们之前收到的数据包的序列号,我们会将 (lui) 增加 32 并发送包含以下数据包确认的单个空数据包: (ack=lui+31, ackBits=[lui,lui+31])。
为我们已发送的数据包 (wq) 和我们已接收的数据包 (rq) 维护两个固定大小的序列缓冲区。这些缓冲区的固定大小必须整除为无符号 16 位整数的最大值 (65536)。 Glenn Fiedler 的这篇博客文章描述了该数据结构。
我们跟踪一个计数器(oui),代表我们发送的数据包的最后一个连续序列号,该数据包已被我们的对等方确认。例如,如果我们发送了序列号在 [0, 256] 范围内的数据包,并且收到了数据包 (0, 1, 2, 3, 4, 8, 9, 10, 11, 12) 的确认,那么 (oui) 就是 4。
令 cap(q) 为序列缓冲区 q 的固定大小或容量。
在发送数据包时,如果我们认为发送更多数据包会溢出收件人的读取缓冲区,我们会间歇性地停止并缓冲数据包的发送。更明确地说,如果我们发送的下一个数据包分配的数据包编号大于 (oui + cap(rq)),我们将停止所有发送,直到 (oui) 通过来自对等方的数据包的接收者递增。
重新传输过时的、未确认的已发送数据包和维护确认的逻辑归功于 Glenn Fiedler 的这篇博客文章。
如果 100 毫秒后接收方未确认数据包,则怀疑数据包丢失。一旦怀疑数据包丢失,就重新发送。截至目前,数据包最多重发 10 次。
明智的做法是不允许数据包重新发送的次数达到上限,并将其留给开发人员。不过,这是可以公开讨论的,我很高兴在我的 Discord 服务器上或通过 Github 问题进行讨论。
在寻找针对 TCP 队头阻塞的可行解决方案时,我查看了 Go 中的许多可靠的 UDP 库,其中大多数主要适用于文件传输或游戏:
经历了所有这些,我觉得他们为我做了太多了。在我的工作和业余项目中,我一直致力于去中心化 p2p 网络协议的研究。这些协议的本质是它们在高延迟/高数据包丢失环境中运行时会严重受到 TCP 队头阻塞的影响。
在许多情况下,这些库提供的许多功能要么是不需要的,要么是诚实地认为它们最好由使用这些库的开发人员来处理和考虑。例如:
因此,我开始研究模块化方法,并决定将我构建到单独库中的协议的可靠性部分抽象出来。
我认为与 QUIC 或 SCTP 等流行的替代方案相比,这种方法是最好的,根据您的具体情况,这些替代方案可能对您来说有点太多了。毕竟,仅使基于 UDP 的协议的可靠性位正确并经过充分测试就已经足够困难了。
net.PacketConn
相关位以获得更精细的抽象。net.UDPAddr
的字符串表示形式的缓存。可靠使用 Go 模块。要将其包含在您的项目中,请运行以下命令:
$ go get github.com/lithdew/reliable
如果您只是希望快速启动并运行项目或演示,请使用Endpoint
。如果您需要更大的灵活性,请考虑直接与Conn
合作。
请注意,需要在顶部引导某种保活机制或心跳系统,否则数据包可能会无限期地重新发送,因为它们将无法被确认。
WithReadBufferSize
配置读取缓冲区大小。默认读取缓冲区大小为 256。WithWriteBufferSize
配置写入缓冲区大小。默认写入缓冲区大小为 256。WithResendTimeout
配置重新传输尚未确认的数据包之前的最短时间。默认重发超时为 100 毫秒。WithEndpointPacketHandler
或WithProtocolPacketHandler
配置接收数据包时回调的数据包处理程序。默认情况下,提供了一个 nil 处理程序,它会忽略所有传入的数据包。WithEndpointErrorHandler
或WithProtocolErrorHandler
配置。默认情况下,提供了一个 nil 处理程序来忽略所有错误。WithBufferPool
传入字节缓冲池。默认情况下,会实例化一个新的字节缓冲池。 使用来自日本的cmd/benchmark
对 DigitalOcean 2GB / 60 GB 磁盘 / NYC3 服务器进行了基准测试。
基准测试任务是将 1400 字节的数据包从日本发送到纽约。如果 ping 延迟约为 220 毫秒,则吞吐量约为 1.2 MiB/秒。
还执行了单元测试基准测试,如下所示。
$ cat /proc/cpuinfo | grep 'model name' | uniq
model name : Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz
$ go test -bench=. -benchtime=10s
goos: linux
goarch: amd64
pkg: github.com/lithdew/reliable
BenchmarkEndpointWriteReliablePacket-8 2053717 5941 ns/op 183 B/op 9 allocs/op
BenchmarkEndpointWriteUnreliablePacket-8 2472392 4866 ns/op 176 B/op 8 allocs/op
BenchmarkMarshalPacketHeader-8 749060137 15.7 ns/op 0 B/op 0 allocs/op
BenchmarkUnmarshalPacketHeader-8 835547473 14.6 ns/op 0 B/op 0 allocs/op
您可以通过执行以下命令来运行下面的示例:
$ go run github.com/lithdew/reliable/examples/basic
此示例演示:
package main
import (
"bytes"
"errors"
"github.com/davecgh/go-spew/spew"
"github.com/lithdew/reliable"
"io"
"log"
"net"
"os"
"os/signal"
"sync"
"sync/atomic"
"time"
)
var (
PacketData = bytes . Repeat ([] byte ( "x" ), 1400 )
NumPackets = uint64 ( 0 )
)
func check ( err error ) {
if err != nil && ! errors . Is ( err , io . EOF ) {
log . Panic ( err )
}
}
func listen ( addr string ) net. PacketConn {
conn , err := net . ListenPacket ( "udp" , addr )
check ( err )
return conn
}
func handler ( buf [] byte , _ net. Addr ) {
if bytes . Equal ( buf , PacketData ) || len ( buf ) == 0 {
return
}
spew . Dump ( buf )
os . Exit ( 1 )
}
func main () {
exit := make ( chan struct {})
var wg sync. WaitGroup
wg . Add ( 2 )
ca := listen ( "127.0.0.1:44444" )
cb := listen ( "127.0.0.1:55555" )
a := reliable . NewEndpoint ( ca , reliable . WithEndpointPacketHandler ( handler ))
b := reliable . NewEndpoint ( cb , reliable . WithEndpointPacketHandler ( handler ))
defer func () {
check ( ca . SetDeadline ( time . Now (). Add ( 1 * time . Millisecond )))
check ( cb . SetDeadline ( time . Now (). Add ( 1 * time . Millisecond )))
close ( exit )
check ( a . Close ())
check ( b . Close ())
check ( ca . Close ())
check ( cb . Close ())
wg . Wait ()
}()
go a . Listen ()
go b . Listen ()
// The two goroutines below have endpoint A spam endpoint B, and print out how
// many packets of data are being sent per second.
go func () {
defer wg . Done ()
for {
select {
case <- exit :
return
default :
}
check ( a . WriteReliablePacket ( PacketData , b . Addr ()))
atomic . AddUint64 ( & NumPackets , 1 )
}
}()
go func () {
defer wg . Done ()
ticker := time . NewTicker ( 1 * time . Second )
defer ticker . Stop ()
for {
select {
case <- exit :
return
case <- ticker . C :
numPackets := atomic . SwapUint64 ( & NumPackets , 0 )
numBytes := float64 ( numPackets ) * 1400.0 / 1024.0 / 1024.0
log . Printf (
"Sent %d packet(s) comprised of %.2f MiB worth of data." ,
numPackets ,
numBytes ,
)
}
}
}()
ch := make ( chan os. Signal , 1 )
signal . Notify ( ch , os . Interrupt )
<- ch
}