ストリーミングプロトコルは、GolangのTCPを介してメッセージをバッファリングします
Buffstreamsは、メッセージの長さ +メッセージ自体(プロトコルバッファーなど)を含む形式でデータを書き込むストリーミング接続のTCPConns上の抽象化のセットです。
Buffstreamsは、特定のポートで(ブロッキングまたは非非)リスナーを起動するためのシンプルなインターフェイスを提供します。これにより、生のバイトの配列があなたがそれを提供するコールバックにストリーミングします。このようにして、バフストリームはそれほどデーモンではなく、プロトコルバッファーメッセージを使用してTCPを介して通信できるネットワーク化されたサービスを構築するライブラリです。
私はGolangでFunのためにいくつかの異なるプロジェクトを書いていて、図書館にあるもののようなコードを書き続けましたが、あまり組織化されていません。ネットワークコードに焦点を当て、それを引き出して改善することにしました。
ここのバフストリームやコードについて特別なものや魔法のようなものはありません。アイデアは、それがより良い、より高速なソケットの抽象化であるということではありません - プロトバフメッセージのようなストリーミングデータを処理するときに、パフォーマンスへの影響がほとんどなく、あなたのために多くのボイラープレートを行うことです。現在、バフストリームは1秒あたり110万件以上の混乱を行うことができ、1Gig NICを飽和させる単一のリスニングソケットのメッセージごとに110バイトで行うことができます。
バフストリームのアイデアは、退屈な部分を実行し、一般的なエラーを処理し、できるだけ少ないオーバーヘッドでパフォーマンスを発揮しながら、その上にシステムを作成できるようにすることです。
Protobuffメッセージにはあらゆる種類の自然区分がないため、Buffstreamsは、実際のペイロードのサイズを記述するバイトの固定ヘッダー(構成可能)を追加する方法を使用します。これは、書く呼び出しによって、あなたのために処理されます。自分でサイズを詰める必要はありません。
サーバー側では、これらのペイロードを聞き、固定ヘッダーを読んでから、後続のメッセージを読み取ります。サーバーは、これが機能するためにクライアントと同じ最大サイズを持っている必要があります。バフストリームは、そのポートで受信したメッセージを処理するために提供されたコールバックにバイト配列を渡します。メッセージをゆったりとし、その価値を解釈することはあなた次第です。
重要な注意点の1つは、内部的には、バフストリームが実際にプロトコルバッファリングライブラリ自体を使用したり依存したりしないことです。すべてのシリアル化 /脱介入は、バフストリームとの相互作用の前後にクライアントによって処理されます。このようにして、理論的には、このライブラリを使用して、バイト +後続のメッセージ本文の固定ヘッダーと同じ戦略を使用するTCPを介してデータをストリーミングできます。
現在、プロトコルバファーメッセージにのみ使用しています。
オプションでエラーのロギングを有効にすることができますが、これには自然に極端な負荷の下でパフォーマンスペナルティが付属しています。
私はバフストリームを可能な限り最適化するために非常に懸命に努力しました。平均は1秒あたり1Mを超えるメッセージを維持しようと努力しています。
ベンチを参照してください
ライブラリをダウンロードします
go get "github.com/StabbyCutyou/buffstreams"
ライブラリをインポートします
import "github.com/StabbyCutyou/buffstreams"
完全なエンドツーエンドクライアントとサーバーの簡単な例については、test/directoryの例、つまりtest/client/test_client.goとtest/server/test_server.goをご覧ください。これらの2つのファイルは、可能な限り単純な方法で、バフストリームのエンドツーエンド統合を実証するために協力するように設計されています。
バフストリームのコアオブジェクトの1つは、Tcplistenerです。この構造体を使用すると、ローカルポートでソケットを開き、クライアントが接続するのを待ち始めます。接続が行われると、クライアントが書いた各メッセージはリスナーによって受信され、定義するコールバックはメッセージコンテンツ(バイトの配列)で呼び出されます。
リスニングを開始するには、最初にtcplistenerconfigオブジェクトを作成して、リスナーがどのように動作するかを定義します。サンプルtcplistenerconfigは次のようになるかもしれません:
cfg := TCPListenerConfig {
EnableLogging : false , // true will have log messages printed to stdout/stderr, via log
MaxMessageSize : 4096 ,
Callback : func ( byte []) error { return nil } // Any function type that adheres to this signature, you'll need to deserialize in here if need be
Address : FormatAddress ( "" , strconv . Itoa ( 5031 )) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience. For listening, you normally don't want to provide an ip unless you have a reason.
}
btl , err := buffstreams . ListenTCP ( cfg )
この方法でリスナーを開くと、ソケットは現在使用されていますが、リスナー自体はまだ接続を受け入れ始めていません。
そのためには、2つの選択肢があります。デフォルトでは、この操作は現在のスレッドをブロックします。あなたがそれを避け、火を使ってアプローチを忘れるなら、あなたは電話することができます
err := btl . StartListeningAsync ()
起動中にエラーが発生した場合、この方法によって返されます。または、自分でコールの実行を処理したい場合、またはブロックすることを気にしない場合は、電話をかけることができます
err := btl . StartListening ()
バフストリームの処理方法は、着信メッセージの上に作用することです。バイトで動作するためのコールバックを提供できるようにします。 ListEncallbackは、バイトの配列/スライスを取り入れ、エラーを返します。
type ListenCallback func ([] byte ) error
コールバックは、特定のプロトバフメッセージの生のバイトを受信します。サイズを含むヘッダーは削除されます。メッセージにdaserializeを脱直し、行動することはコールバックの責任です。
リスナーがメッセージを受け取り、コールバックが作業を行います。
サンプルコールバックはそうするように始まるかもしれません:
func ListenCallbackExample ([] byte data ) error {
msg := & message. ImportantProtoBuffStreamingMessage {}
err := proto . Unmarshal ( data , msg )
// Now you do some stuff with msg
...
}
現在、コールバックは独自のGoroutineで実行されています。これは、読者が切断されるまで接続から読み取りを処理するか、エラーが発生します。接続から読み取られるエラーは、処理するクライアント次第です。
新しい接続へのメッセージの書き込みを開始するには、tcpconnconfigを使用してダイヤルする必要があります
cfg := TCPConnConfig {
EnableLogging : false , // true will have log messages printed to stdout/stderr, via log
MaxMessageSize : 4096 , // You want this to match the MaxMessageSize the server expects for messages on that socket
Address : FormatAddress ( "127.0.0.1" , strconv . Itoa ( 5031 )) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience.
}
構成オブジェクトができたら、ダイヤルアウトできます。
btc , err := buffstreams . DialTCP ( cfg )
これにより、指定された場所のエンドポイントへの接続が開きます。さらに、TCPLISTENERが返すTCPConnは、以下と同じ方法を使用してデータを記述することもできます。
そこから、データを書くことができます
bytesWritten , err := btc . Write ( msgBytes , true )
書面にエラーがある場合、その接続は閉じられ、次の書き込みで再開されます。再接続が発生するエラーが発生した場合に、byteswritten値が> 0になるかどうかは保証はありません。
3番目のオプション、提供されたマネージャークラスがあります。このクラスは、ポートを介したダイヤルとリスニングをめぐるシンプルで効果的なマネージャーの抽象化を提供し、あなたのための接続を管理します。着信接続をダイヤルアウトまたはリスニングするための通常の構成を提供し、マネージャーに参照を保持させます。マネージャーは、ロックを内部的に使用して、保持されている接続への同時アクセス間の一貫性と調整を確保するため、ThreadSafeと見なされます。
マネージャーは実際には「プール」ではありません。なぜなら、再利用するためのX接続を開いて保持しないからです。ただし、接続のキャッシングや再利用など、プールと同じ動作の多くを維持しており、前述のようにThreadSafeです。
マネージャーの作成
bm := buffstreams . NewManager ()
ポートで聴く。マネージャーは常にこの非同期および非ブロッキングを行います
// Assuming you've got a configuration object cfg, see above
err := bm . StartListening ( cfg )
リモートエンドポイントにダイヤルアウトします
// Assuming you've got a configuration object cfg, see above
err := bm . Dial ( cfg )
接続を開いて、その接続に一定の方法で書き込み
bytesWritten , err := bm . Write ( "127.0.0.1:5031" , dataBytes )
マネージャーは聴き続け、内部的にキャッシュされた接続をダイヤルします。開くと、開いたままになります。作家は、あなたの同じ住所に手紙を書くたびに、正しい作家が再利用されるように、あなたの着信の書き込み宛先と一致します。リスニング接続は、単に公開されたままで、リクエストを受信するのを待っています。
どちらかを呼び出すことで、これらの接続を強制的に閉じることができます
err := bm . CloseListener ( "127.0.0.1:5031" )
または
err := bm . CloseWriter ( "127.0.0.1:5031" )
バグを報告した人やバフストリームの改善を助けてくれた人に感謝します
Apache V2-ライセンスを参照してください