Go 中的 RFC6455 WebSocket 实现。
wsutil
包中 API 的高级包装器和帮助器,允许快速启动,而无需挖掘协议内部结构转到文档。
现有的 WebSocket 实现不允许用户以明确的方式重用连接之间的 I/O 缓冲区。该库旨在导出高效的低级接口来使用协议,而不强制只使用一种方式。
顺便说一句,如果您想获得更高级别的工具,可以使用wsutil
包。
库被标记为v1*
因此在某些改进或重构过程中不得破坏其 API。
RFC6455 的此实现通过了 Autobahn 测试套件,目前覆盖率约为 78%。
使用ws
的示例应用程序是在单独的存储库 ws-examples 中开发的。
WebSocket echo 服务器的高级示例:
package main
import (
"net/http"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
)
func main () {
http . ListenAndServe ( ":8080" , http . HandlerFunc ( func ( w http. ResponseWriter , r * http. Request ) {
conn , _ , _ , err := ws . UpgradeHTTP ( r , w )
if err != nil {
// handle error
}
go func () {
defer conn . Close ()
for {
msg , op , err := wsutil . ReadClientData ( conn )
if err != nil {
// handle error
}
err = wsutil . WriteServerMessage ( conn , op , msg )
if err != nil {
// handle error
}
}
}()
}))
}
较低级别但仍然较高级别的示例:
import (
"net/http"
"io"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
)
func main () {
http . ListenAndServe ( ":8080" , http . HandlerFunc ( func ( w http. ResponseWriter , r * http. Request ) {
conn , _ , _ , err := ws . UpgradeHTTP ( r , w )
if err != nil {
// handle error
}
go func () {
defer conn . Close ()
var (
state = ws . StateServerSide
reader = wsutil . NewReader ( conn , state )
writer = wsutil . NewWriter ( conn , state , ws . OpText )
)
for {
header , err := reader . NextFrame ()
if err != nil {
// handle error
}
// Reset writer to write frame with right operation code.
writer . Reset ( conn , state , header . OpCode )
if _ , err = io . Copy ( writer , reader ); err != nil {
// handle error
}
if err = writer . Flush (); err != nil {
// handle error
}
}
}()
}))
}
我们可以应用相同的模式通过 JSON 编码器和解码器读取和写入结构化响应:
...
var (
r = wsutil . NewReader ( conn , ws . StateServerSide )
w = wsutil . NewWriter ( conn , ws . StateServerSide , ws . OpText )
decoder = json . NewDecoder ( r )
encoder = json . NewEncoder ( w )
)
for {
hdr , err = r . NextFrame ()
if err != nil {
return err
}
if hdr . OpCode == ws . OpClose {
return io . EOF
}
var req Request
if err := decoder . Decode ( & req ); err != nil {
return err
}
var resp Response
if err := encoder . Encode ( & resp ); err != nil {
return err
}
if err = w . Flush (); err != nil {
return err
}
}
...
没有wsutil
的较低级别示例:
package main
import (
"net"
"io"
"github.com/gobwas/ws"
)
func main () {
ln , err := net . Listen ( "tcp" , "localhost:8080" )
if err != nil {
log . Fatal ( err )
}
for {
conn , err := ln . Accept ()
if err != nil {
// handle error
}
_ , err = ws . Upgrade ( conn )
if err != nil {
// handle error
}
go func () {
defer conn . Close ()
for {
header , err := ws . ReadHeader ( conn )
if err != nil {
// handle error
}
payload := make ([] byte , header . Length )
_ , err = io . ReadFull ( conn , payload )
if err != nil {
// handle error
}
if header . Masked {
ws . Cipher ( payload , header . Mask , 0 )
}
// Reset the Masked flag, server frames must not be masked as
// RFC6455 says.
header . Masked = false
if err := ws . WriteHeader ( conn , header ); err != nil {
// handle error
}
if _ , err := conn . Write ( payload ); err != nil {
// handle error
}
if header . OpCode == ws . OpClose {
return
}
}
}()
}
}
零拷贝升级有助于在处理 HTTP 升级请求时避免不必要的分配和复制。
所有非 Websocket 标头的处理都是通过使用注册用户回调来完成的,其参数仅在回调返回之前有效。
简单的例子如下所示:
package main
import (
"net"
"log"
"github.com/gobwas/ws"
)
func main () {
ln , err := net . Listen ( "tcp" , "localhost:8080" )
if err != nil {
log . Fatal ( err )
}
u := ws. Upgrader {
OnHeader : func ( key , value [] byte ) ( err error ) {
log . Printf ( "non-websocket header: %q=%q" , key , value )
return
},
}
for {
conn , err := ln . Accept ()
if err != nil {
// handle error
}
_ , err = u . Upgrade ( conn )
if err != nil {
// handle error
}
}
}
此处使用ws.Upgrader
可以在 tcp 级别上控制传入连接,并且通过某种逻辑不接受它们。
零拷贝升级适用于必须控制连接缓冲区等许多资源的高负载服务。
现实生活中的例子可能是这样的:
package main
import (
"fmt"
"io"
"log"
"net"
"net/http"
"runtime"
"github.com/gobwas/httphead"
"github.com/gobwas/ws"
)
func main () {
ln , err := net . Listen ( "tcp" , "localhost:8080" )
if err != nil {
// handle error
}
// Prepare handshake header writer from http.Header mapping.
header := ws . HandshakeHeaderHTTP (http. Header {
"X-Go-Version" : [] string { runtime . Version ()},
})
u := ws. Upgrader {
OnHost : func ( host [] byte ) error {
if string ( host ) == "github.com" {
return nil
}
return ws . RejectConnectionError (
ws . RejectionStatus ( 403 ),
ws . RejectionHeader ( ws . HandshakeHeaderString (
"X-Want-Host: github.com r n " ,
)),
)
},
OnHeader : func ( key , value [] byte ) error {
if string ( key ) != "Cookie" {
return nil
}
ok := httphead . ScanCookie ( value , func ( key , value [] byte ) bool {
// Check session here or do some other stuff with cookies.
// Maybe copy some values for future use.
return true
})
if ok {
return nil
}
return ws . RejectConnectionError (
ws . RejectionReason ( "bad cookie" ),
ws . RejectionStatus ( 400 ),
)
},
OnBeforeUpgrade : func () (ws. HandshakeHeader , error ) {
return header , nil
},
}
for {
conn , err := ln . Accept ()
if err != nil {
log . Fatal ( err )
}
_ , err = u . Upgrade ( conn )
if err != nil {
log . Printf ( "upgrade error: %s" , err )
}
}
}
有一个ws/wsflate
包支持 Permessage-Deflate 压缩扩展。
它提供了可与任何 deflate 实现(例如标准库的 compress/flate)结合使用的简约 I/O 包装器。
它还通过提供wsflate.MessageState
类型来与wsutil
的读取器和写入器兼容,该类型实现wsutil.SendExtension
和wsutil.RecvExtension
接口。
package main
import (
"bytes"
"log"
"net"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsflate"
)
func main () {
ln , err := net . Listen ( "tcp" , "localhost:8080" )
if err != nil {
// handle error
}
e := wsflate. Extension {
// We are using default parameters here since we use
// wsflate.{Compress,Decompress}Frame helpers below in the code.
// This assumes that we use standard compress/flate package as flate
// implementation.
Parameters : wsflate . DefaultParameters ,
}
u := ws. Upgrader {
Negotiate : e . Negotiate ,
}
for {
conn , err := ln . Accept ()
if err != nil {
log . Fatal ( err )
}
// Reset extension after previous upgrades.
e . Reset ()
_ , err = u . Upgrade ( conn )
if err != nil {
log . Printf ( "upgrade error: %s" , err )
continue
}
if _ , ok := e . Accepted (); ! ok {
log . Printf ( "didn't negotiate compression for %s" , conn . RemoteAddr ())
conn . Close ()
continue
}
go func () {
defer conn . Close ()
for {
frame , err := ws . ReadFrame ( conn )
if err != nil {
// Handle error.
return
}
frame = ws . UnmaskFrameInPlace ( frame )
if wsflate . IsCompressed ( frame . Header ) {
// Note that even after successful negotiation of
// compression extension, both sides are able to send
// non-compressed messages.
frame , err = wsflate . DecompressFrame ( frame )
if err != nil {
// Handle error.
return
}
}
// Do something with frame...
ack := ws . NewTextFrame ([] byte ( "this is an acknowledgement" ))
// Compress response unconditionally.
ack , err = wsflate . CompressFrame ( ack )
if err != nil {
// Handle error.
return
}
if err = ws . WriteFrame ( conn , ack ); err != nil {
// Handle error.
return
}
}
}()
}
}
您可以通过以下方式将压缩与wsutil
包一起使用:
// Upgrade somehow and negotiate compression to get the conn...
// Initialize flate reader. We are using nil as a source io.Reader because
// we will Reset() it in the message i/o loop below.
fr := wsflate . NewReader ( nil , func ( r io. Reader ) wsflate. Decompressor {
return flate . NewReader ( r )
})
// Initialize flate writer. We are using nil as a destination io.Writer
// because we will Reset() it in the message i/o loop below.
fw := wsflate . NewWriter ( nil , func ( w io. Writer ) wsflate. Compressor {
f , _ := flate . NewWriter ( w , 9 )
return f
})
// Declare compression message state variable.
//
// It has two goals:
// - Allow users to check whether received message is compressed or not.
// - Help wsutil.Reader and wsutil.Writer to set/unset appropriate
// WebSocket header bits while writing next frame to the wire (it
// implements wsutil.RecvExtension and wsutil.SendExtension).
var msg wsflate. MessageState
// Initialize WebSocket reader as previously.
// Please note the use of Reader.Extensions field as well as
// of ws.StateExtended flag.
rd := & wsutil. Reader {
Source : conn ,
State : ws . StateServerSide | ws . StateExtended ,
Extensions : []wsutil. RecvExtension {
& msg ,
},
}
// Initialize WebSocket writer with ws.StateExtended flag as well.
wr := wsutil . NewWriter ( conn , ws . StateServerSide | ws . StateExtended , 0 )
// Use the message state as wsutil.SendExtension.
wr . SetExtensions ( & msg )
for {
h , err := rd . NextFrame ()
if err != nil {
// handle error.
}
if h . OpCode . IsControl () {
// handle control frame.
}
if ! msg . IsCompressed () {
// handle uncompressed frame (skipped for the sake of example
// simplicity).
}
// Reset the writer to echo same op code.
wr . Reset ( h . OpCode )
// Reset both flate reader and writer to start the new round of i/o.
fr . Reset ( rd )
fw . Reset ( wr )
// Copy whole message from reader to writer decompressing it and
// compressing again.
if _ , err := io . Copy ( fw , fr ); err != nil {
// handle error.
}
// Flush any remaining buffers from flate writer to WebSocket writer.
if err := fw . Close (); err != nil {
// handle error.
}
// Flush the whole WebSocket message to the wire.
if err := wr . Flush (); err != nil {
// handle error.
}
}