Go 中的 RFC6455 WebSocket 實作。
wsutil 套件提供圍繞 API 的高階包裝器與輔助函式,讓您無需深入研究協定內部細節即可快速上手。詳細說明請參閱 GoDoc 文件。
現有的 WebSocket 實作不允許使用者以明確的方式重複使用連線之間的 I/O 緩衝區。該程式庫旨在導出高效的低階介面來使用協議,而不強制只使用一種方式。
順帶一提,如果您需要更高階的工具,可以使用 wsutil 套件。
本程式庫已標記為 v1*,因此在改進或重構的過程中不得破壞其 API。
RFC6455 的此實作通過了 Autobahn 測試套件,目前覆蓋率約為 78%。
使用 ws 的範例應用程式在獨立的儲存庫 ws-examples 中開發。
WebSocket echo 伺服器的高階範例:
package main
import (
"net/http"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
)
// main starts an HTTP server on :8080 that upgrades every incoming request to
// a WebSocket connection and echoes each client message back to its sender.
func main() {
	// ListenAndServe blocks for the lifetime of the server; its error is not
	// inspected in this minimal example.
	http.ListenAndServe(":8080", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		conn, _, _, err := ws.UpgradeHTTP(r, w)
		if err != nil {
			// The handshake failed, so there is no WebSocket connection to
			// serve. Stop here instead of falling through to the echo loop.
			return
		}
		go func() {
			defer conn.Close()

			for {
				msg, op, err := wsutil.ReadClientData(conn)
				if err != nil {
					// Leave the loop on a read error so the deferred Close
					// runs; otherwise the loop would spin on a dead
					// connection forever.
					return
				}
				// Echo the payload back using the same operation code.
				if err := wsutil.WriteServerMessage(conn, op, msg); err != nil {
					return
				}
			}
		}()
	}))
}
層級較低、但仍屬高階的範例:
import (
"net/http"
"io"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
)
// main starts an HTTP server on :8080 that upgrades every incoming request to
// a WebSocket connection and echoes frames back by streaming them from a
// wsutil.Reader into a wsutil.Writer.
func main() {
	// ListenAndServe blocks for the lifetime of the server; its error is not
	// inspected in this minimal example.
	http.ListenAndServe(":8080", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		conn, _, _, err := ws.UpgradeHTTP(r, w)
		if err != nil {
			// The handshake failed; do not start the echo loop.
			return
		}
		go func() {
			defer conn.Close()

			var (
				state  = ws.StateServerSide
				reader = wsutil.NewReader(conn, state)
				writer = wsutil.NewWriter(conn, state, ws.OpText)
			)
			for {
				header, err := reader.NextFrame()
				if err != nil {
					// Leave the loop so the deferred Close runs instead of
					// retrying on a broken connection.
					return
				}

				// Reset writer to write frame with right operation code.
				writer.Reset(conn, state, header.OpCode)

				if _, err := io.Copy(writer, reader); err != nil {
					return
				}
				// Copy only fills the writer; Flush sends the data out.
				if err := writer.Flush(); err != nil {
					return
				}
			}
		}()
	}))
}
我們可以應用相同的模式透過 JSON 編碼器和解碼器讀取和寫入結構化回應:
...
// Fragment: it reads JSON requests from WebSocket frames and writes JSON
// responses back. Request and Response are not declared in this snippet and
// are presumably application types (assumption; confirm against the
// surrounding code).
var (
	r       = wsutil.NewReader(conn, ws.StateServerSide)
	w       = wsutil.NewWriter(conn, ws.StateServerSide, ws.OpText)
	decoder = json.NewDecoder(r)
	encoder = json.NewEncoder(w)
)
for {
	// hdr and err are declared here with := because nothing in this snippet
	// declares them beforehand.
	hdr, err := r.NextFrame()
	if err != nil {
		return err
	}
	// A close frame ends the exchange; it is reported to the caller as EOF.
	if hdr.OpCode == ws.OpClose {
		return io.EOF
	}

	var req Request
	if err := decoder.Decode(&req); err != nil {
		return err
	}

	// The response is left at its zero value here; a real handler would
	// fill it in from req before encoding.
	var resp Response
	if err := encoder.Encode(&resp); err != nil {
		return err
	}
	// Encode only fills the writer; Flush sends the message out.
	if err := w.Flush(); err != nil {
		return err
	}
}
...
不使用 wsutil 的較低階範例:
package main
import (
	"io"
	"log"
	"net"

	"github.com/gobwas/ws"
)
// main listens on localhost:8080, upgrades each accepted TCP connection to
// WebSocket using the low-level ws API, and echoes every frame back to the
// client until a close frame is seen or an I/O error occurs.
func main() {
	ln, err := net.Listen("tcp", "localhost:8080")
	if err != nil {
		log.Fatal(err)
	}

	for {
		conn, err := ln.Accept()
		if err != nil {
			// Without a connection there is nothing to upgrade.
			log.Fatal(err)
		}
		if _, err := ws.Upgrade(conn); err != nil {
			// The handshake failed: release the socket and keep serving
			// other clients instead of echoing on a non-WebSocket conn.
			log.Printf("upgrade error: %s", err)
			conn.Close()
			continue
		}

		go func() {
			defer conn.Close()

			for {
				header, err := ws.ReadHeader(conn)
				if err != nil {
					return
				}

				// NOTE(review): header.Length comes from the client; real
				// code should cap it before allocating.
				payload := make([]byte, header.Length)
				if _, err := io.ReadFull(conn, payload); err != nil {
					return
				}
				if header.Masked {
					ws.Cipher(payload, header.Mask, 0)
				}

				// Reset the Masked flag, server frames must not be masked as
				// RFC6455 says.
				header.Masked = false

				if err := ws.WriteHeader(conn, header); err != nil {
					return
				}
				if _, err := conn.Write(payload); err != nil {
					return
				}

				// The close frame has been echoed; end the session.
				if header.OpCode == ws.OpClose {
					return
				}
			}
		}()
	}
}
零拷貝升級有助於在處理 HTTP 升級請求時避免不必要的分配和複製。
所有非 WebSocket 標頭都交由使用者註冊的回呼函式處理,其參數僅在回呼返回之前有效。
簡單的例子如下:
package main
import (
"net"
"log"
"github.com/gobwas/ws"
)
// main listens on localhost:8080 and upgrades each accepted TCP connection
// with a ws.Upgrader whose OnHeader callback logs every header it is given.
func main() {
	ln, err := net.Listen("tcp", "localhost:8080")
	if err != nil {
		log.Fatal(err)
	}

	u := ws.Upgrader{
		// OnHeader logs the header and returns nil so it never rejects the
		// handshake.
		OnHeader: func(key, value []byte) error {
			log.Printf("non-websocket header: %q=%q", key, value)
			return nil
		},
	}

	for {
		conn, err := ln.Accept()
		if err != nil {
			// Without a connection there is nothing to upgrade.
			log.Fatal(err)
		}
		if _, err := u.Upgrade(conn); err != nil {
			// Report the failed handshake and release the socket.
			log.Printf("upgrade error: %s", err)
			conn.Close()
		}
	}
}
此處使用 ws.Upgrader 可以在 TCP 層級控制傳入的連線,並依據自訂邏輯直接拒絕接受它們。
零拷貝升級適用於必須控制連接緩衝區等許多資源的高負載服務。
現實生活中的例子可能是這樣的:
package main
import (
"fmt"
"io"
"log"
"net"
"net/http"
"runtime"
"github.com/gobwas/httphead"
"github.com/gobwas/ws"
)
// main listens on localhost:8080 and upgrades accepted TCP connections with a
// ws.Upgrader that checks the Host header, scans cookies, and adds a custom
// header to the handshake response.
func main() {
	ln, err := net.Listen("tcp", "localhost:8080")
	if err != nil {
		log.Fatal(err)
	}

	// Prepare handshake header writer from http.Header mapping.
	header := ws.HandshakeHeaderHTTP(http.Header{
		"X-Go-Version": []string{runtime.Version()},
	})

	u := ws.Upgrader{
		// OnHost accepts only github.com; any other host is rejected with
		// status 403 and a header naming the expected host.
		OnHost: func(host []byte) error {
			if string(host) == "github.com" {
				return nil
			}
			return ws.RejectConnectionError(
				ws.RejectionStatus(403),
				ws.RejectionHeader(ws.HandshakeHeaderString(
					// A raw header line must end with CRLF.
					"X-Want-Host: github.com\r\n",
				)),
			)
		},
		// OnHeader ignores every header except Cookie and rejects the
		// handshake with status 400 when ScanCookie reports failure.
		OnHeader: func(key, value []byte) error {
			if string(key) != "Cookie" {
				return nil
			}
			ok := httphead.ScanCookie(value, func(key, value []byte) bool {
				// Check session here or do some other stuff with cookies.
				// Maybe copy some values for future use.
				return true
			})
			if ok {
				return nil
			}
			return ws.RejectConnectionError(
				ws.RejectionReason("bad cookie"),
				ws.RejectionStatus(400),
			)
		},
		// OnBeforeUpgrade supplies the header prepared above.
		OnBeforeUpgrade: func() (ws.HandshakeHeader, error) {
			return header, nil
		},
	}

	for {
		conn, err := ln.Accept()
		if err != nil {
			log.Fatal(err)
		}
		if _, err := u.Upgrade(conn); err != nil {
			log.Printf("upgrade error: %s", err)
			// Release the socket of a connection that failed the handshake.
			conn.Close()
		}
	}
}
ws/wsflate 套件支援 Permessage-Deflate 壓縮擴充。
它提供了可與任何 deflate 實作(例如標準庫的 compress/flate)結合使用的簡約 I/O 包裝器。
它還提供 wsflate.MessageState 類型,該類型實作了 wsutil.SendExtension 和 wsutil.RecvExtension 介面,因此能與 wsutil 的讀取器和寫入器相容。
package main
import (
"bytes"
"log"
"net"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsflate"
)
// main listens on localhost:8080, negotiates the permessage-deflate extension
// during the upgrade, and answers every received frame with a compressed text
// acknowledgement.
func main() {
	ln, err := net.Listen("tcp", "localhost:8080")
	if err != nil {
		log.Fatal(err)
	}

	e := wsflate.Extension{
		// We are using default parameters here since we use
		// wsflate.{Compress,Decompress}Frame helpers below in the code.
		// This assumes that we use standard compress/flate package as flate
		// implementation.
		Parameters: wsflate.DefaultParameters,
	}
	u := ws.Upgrader{
		Negotiate: e.Negotiate,
	}

	for {
		conn, err := ln.Accept()
		if err != nil {
			log.Fatal(err)
		}

		// Reset extension after previous upgrades.
		e.Reset()

		if _, err := u.Upgrade(conn); err != nil {
			log.Printf("upgrade error: %s", err)
			// Release the socket before moving on to the next client.
			conn.Close()
			continue
		}
		if _, ok := e.Accepted(); !ok {
			log.Printf("didn't negotiate compression for %s", conn.RemoteAddr())
			conn.Close()
			continue
		}

		go func() {
			defer conn.Close()

			for {
				frame, err := ws.ReadFrame(conn)
				if err != nil {
					// Handle error.
					return
				}

				frame = ws.UnmaskFrameInPlace(frame)

				if wsflate.IsCompressed(frame.Header) {
					// Note that even after successful negotiation of
					// compression extension, both sides are able to send
					// non-compressed messages.
					frame, err = wsflate.DecompressFrame(frame)
					if err != nil {
						// Handle error.
						return
					}
				}

				// Do something with frame...

				ack := ws.NewTextFrame([]byte("this is an acknowledgement"))

				// Compress response unconditionally.
				ack, err = wsflate.CompressFrame(ack)
				if err != nil {
					// Handle error.
					return
				}
				if err = ws.WriteFrame(conn, ack); err != nil {
					// Handle error.
					return
				}
			}
		}()
	}
}
您可以透過以下方式將壓縮與 wsutil 套件搭配使用:
// Upgrade somehow and negotiate compression to get the conn...

// Initialize flate reader. We are using nil as a source io.Reader because
// we will Reset() it in the message i/o loop below.
fr := wsflate.NewReader(nil, func(r io.Reader) wsflate.Decompressor {
	return flate.NewReader(r)
})

// Initialize flate writer. We are using nil as a destination io.Writer
// because we will Reset() it in the message i/o loop below.
fw := wsflate.NewWriter(nil, func(w io.Writer) wsflate.Compressor {
	// flate.NewWriter only fails for a level outside [-2, 9]; 9 is valid,
	// so the error is safely ignored.
	f, _ := flate.NewWriter(w, 9)
	return f
})

// Declare compression message state variable.
//
// It has two goals:
//   - Allow users to check whether received message is compressed or not.
//   - Help wsutil.Reader and wsutil.Writer to set/unset appropriate
//     WebSocket header bits while writing next frame to the wire (it
//     implements wsutil.RecvExtension and wsutil.SendExtension).
var msg wsflate.MessageState

// Initialize WebSocket reader as previously.
// Please note the use of Reader.Extensions field as well as
// of ws.StateExtended flag.
rd := &wsutil.Reader{
	Source: conn,
	State:  ws.StateServerSide | ws.StateExtended,
	Extensions: []wsutil.RecvExtension{
		&msg,
	},
}

// Initialize WebSocket writer with ws.StateExtended flag as well.
wr := wsutil.NewWriter(conn, ws.StateServerSide|ws.StateExtended, 0)
// Use the message state as wsutil.SendExtension.
wr.SetExtensions(&msg)

for {
	h, err := rd.NextFrame()
	if err != nil {
		// handle error (the loop must not continue with an invalid header).
	}
	if h.OpCode.IsControl() {
		// handle control frame.
	}
	if !msg.IsCompressed() {
		// handle uncompressed frame (skipped for the sake of example
		// simplicity).
	}

	// Reset the writer to echo same op code. ResetOp changes only the op
	// code; Reset takes (dest, state, op) and cannot be called with one
	// argument.
	wr.ResetOp(h.OpCode)

	// Reset both flate reader and writer to start the new round of i/o.
	fr.Reset(rd)
	fw.Reset(wr)

	// Copy whole message from reader to writer decompressing it and
	// compressing again.
	if _, err := io.Copy(fw, fr); err != nil {
		// handle error.
	}
	// Flush any remaining buffers from flate writer to WebSocket writer.
	if err := fw.Close(); err != nil {
		// handle error.
	}
	// Flush the whole WebSocket message to the wire.
	if err := wr.Flush(); err != nil {
		// handle error.
	}
}