Pesan buffer protokol streaming melalui TCP di Golang
Buffstreams adalah satu set abstraksi di atas tcpconns untuk koneksi streaming yang menulis data dalam format yang melibatkan panjang pesan + pesan muatan itu sendiri (seperti buffer protokol, maka namanya).
Buffstreams memberi Anda antarmuka sederhana untuk memulai (memblokir atau tidak) pendengar pada port tertentu, yang akan mengalirkan array byte mentah ke callback yang Anda berikan. Dengan cara ini, Buffstreams bukanlah daemon, tetapi perpustakaan untuk membangun layanan jaringan yang dapat berkomunikasi melalui TCP menggunakan pesan buffer protokol.
Saya sedang menulis beberapa proyek berbeda untuk bersenang -senang di Golang, dan terus menulis kode sesuatu seperti apa yang ada di perpustakaan, tetapi kurang terorganisir. Saya memutuskan untuk fokus pada kode jaringan, menariknya keluar dan memperbaikinya sehingga saya tahu itu bisa dipercaya untuk melakukan dengan andal di seluruh proyek.
Tidak ada yang istimewa atau ajaib tentang Buffstreams, atau kode di sini. Idenya bukanlah abstraksi soket yang lebih baik, lebih cepat - itu untuk melakukan sebanyak mungkin boilerplate untuk Anda saat menangani data streaming seperti pesan protobuff, dengan dampak yang sesedikit mungkin terhadap kinerja. Saat ini, Buffstreams mampu melakukan lebih dari 1,1 juta messsages per detik, pada 110 byte per pesan pada soket mendengarkan tunggal yang menjenuhkan 1gig NIC.
Gagasan Buffstreams adalah melakukan bagian yang membosankan dan menangani kesalahan umum, memungkinkan Anda untuk menulis sistem di atasnya, sambil melakukan dengan overhead sesedikit mungkin.
Karena pesan protobuff tidak memiliki jenis perampasan alami apa pun, Buffstreams menggunakan metode menambahkan header byte tetap (yang dapat dikonfigurasi) yang menggambarkan ukuran muatan yang sebenarnya. Ini ditangani untuk Anda, melalui panggilan untuk menulis. Anda tidak perlu mengemas sendiri ukurannya.
Di sisi server, itu akan mendengarkan muatan ini, membaca header tetap, dan kemudian pesan berikutnya. Server harus memiliki ukuran maksimum yang sama dengan klien agar ini berfungsi. Buffstreams kemudian akan meneruskan array byte ke panggilan balik yang Anda berikan untuk menangani pesan yang diterima di port itu. Deserializing pesan dan menafsirkan nilainya terserah Anda.
Salah satu catatan penting adalah bahwa secara internal, Buffstreams tidak benar -benar menggunakan atau mengandalkan perpustakaan buffer protokol itu sendiri dengan cara apa pun. Semua serialisasi / deserialisasi ditangani oleh klien sebelum / setelah interaksi dengan Buffstreams. Dengan cara ini, Anda secara teoritis dapat menggunakan pustaka ini untuk mengalirkan data apa pun melalui TCP yang menggunakan strategi yang sama dari header byte tetap + badan pesan berikutnya.
Saat ini, saya hanya menggunakannya untuk pesan ProtocolBuffer.
Anda secara opsional dapat mengaktifkan pencatatan kesalahan, meskipun ini secara alami dilengkapi dengan penalti kinerja di bawah beban ekstrem.
Saya sudah berusaha sangat keras untuk mengoptimalkan buffstreams sebaik mungkin, berusaha untuk menjaga rata -rata di atas 1 juta pesan per detik, tanpa kesalahan selama transit.
Lihat Bench
Unduh Perpustakaan
go get "github.com/StabbyCutyou/buffstreams"
Impor Perpustakaan
import "github.com/StabbyCutyou/buffstreams"
Untuk contoh cepat dari klien dan server ujung ke ujung lengkap, periksa contoh -contoh dalam tes/direktori, yaitu tes/klien/test_client.go dan test/server/test_server.go. Kedua file ini dirancang untuk bekerja bersama untuk menunjukkan integrasi ujung ke ujung buffstreams, dengan cara yang paling sederhana.
Salah satu objek inti di Buffstreams adalah TcPlistener. Struct ini memungkinkan Anda untuk membuka soket di port lokal, dan mulai menunggu klien untuk terhubung. Setelah koneksi dibuat, setiap pesan lengkap yang ditulis oleh klien akan diterima oleh pendengar, dan panggilan balik yang Anda definisikan akan dipanggil dengan konten pesan (array byte).
Untuk mulai mendengarkan, pertama -tama buat objek TcPlistenerConfig untuk menentukan bagaimana pendengar harus berperilaku. Sampel tcplistenerconfig mungkin terlihat seperti ini:
cfg := TCPListenerConfig {
EnableLogging : false , // true will have log messages printed to stdout/stderr, via log
MaxMessageSize : 4096 ,
Callback : func ( byte []) error { return nil } // Any function type that adheres to this signature, you'll need to deserialize in here if need be
Address : FormatAddress ( "" , strconv . Itoa ( 5031 )) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience. For listening, you normally don't want to provide an ip unless you have a reason.
}
btl , err := buffstreams . ListenTCP ( cfg )
Setelah Anda membuka pendengar dengan cara ini, soket sekarang sedang digunakan, tetapi pendengar itu sendiri belum mulai menerima koneksi.
Untuk melakukannya, Anda memiliki dua pilihan. Secara default, operasi ini akan memblokir utas saat ini. Jika Anda ingin menghindarinya, dan menggunakan pendekatan api dan lupa, Anda dapat menelepon
err := btl . StartListeningAsync ()
Jika ada kesalahan saat memulai, itu akan dikembalikan dengan metode ini. Atau, jika Anda ingin menangani menjalankan panggilan sendiri, atau tidak peduli bahwa itu memblokir, Anda dapat menelepon
err := btl . StartListening ()
Cara Buffstreams menangani yang bertindak atas pesan yang masuk adalah dengan membiarkan Anda memberikan panggilan balik untuk beroperasi pada byte. ListEnCallback mengambil array/irisan byte, dan mengembalikan kesalahan.
type ListenCallback func ([] byte ) error
Panggilan balik akan menerima byte mentah untuk pesan protobuff yang diberikan. Header yang berisi ukuran akan dilepas. Adalah tanggung jawab callback untuk deserialize dan bertindak berdasarkan pesan.
Pendengar mendapat pesan, panggilan balik Anda melakukan pekerjaan.
Sampel panggilan balik mungkin dimulai seperti itu:
func ListenCallbackExample ([] byte data ) error {
msg := & message. ImportantProtoBuffStreamingMessage {}
err := proto . Unmarshal ( data , msg )
// Now you do some stuff with msg
...
}
Callback saat ini dijalankan di goroutine sendiri, yang juga menangani pembacaan dari koneksi sampai pembaca memutuskan hubungan, atau ada kesalahan. Setiap kesalahan yang dibaca dari koneksi yang masuk akan tergantung pada klien untuk ditangani.
Untuk mulai menulis pesan ke koneksi baru, Anda harus menghubungi TCPConnConfig
cfg := TCPConnConfig {
EnableLogging : false , // true will have log messages printed to stdout/stderr, via log
MaxMessageSize : 4096 , // You want this to match the MaxMessageSize the server expects for messages on that socket
Address : FormatAddress ( "127.0.0.1" , strconv . Itoa ( 5031 )) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience.
}
Setelah Anda memiliki objek konfigurasi, Anda dapat menghubungi.
btc , err := buffstreams . DialTCP ( cfg )
Ini akan membuka koneksi ke titik akhir di lokasi yang ditentukan. Selain itu, TCPConn yang dikembalikan TCPlistener juga akan memungkinkan Anda untuk menulis data, menggunakan metode yang sama seperti di bawah ini.
Dari sana, Anda dapat menulis data Anda
bytesWritten , err := btc . Write ( msgBytes , true )
Jika ada kesalahan secara tertulis, koneksi itu akan ditutup dan dibuka kembali pada penulisan berikutnya. Tidak ada jaminan jika ada nilai byteswritten akan> 0 atau tidak jika terjadi kesalahan yang menghasilkan koneksi kembali.
Ada opsi ketiga, kelas manajer yang disediakan. Kelas ini akan memberi Anda abstraksi manajer yang sederhana namun efektif daripada panggilan dan mendengarkan port, mengelola koneksi untuk Anda. Anda memberikan konfigurasi normal untuk memutar atau mendengarkan koneksi yang masuk, dan membiarkan manajer memegang referensi. Manajer dianggap ThreadSafe, karena menggunakan kunci secara internal untuk memastikan konsistensi dan koordinasi antara akses bersamaan ke koneksi yang diadakan.
Manajer tidak benar-benar "kumpulan", karena tidak membuka dan menahan koneksi X untuk Anda gunakan kembali. Namun, ia mempertahankan banyak perilaku yang sama dengan kumpulan, termasuk caching dan menggunakan kembali koneksi, dan seperti yang disebutkan adalah ThreadSafe.
Membuat manajer
bm := buffstreams . NewManager ()
Mendengarkan di port. Manajer selalu membuat ini tidak sinkron dan tidak memblokir
// Assuming you've got a configuration object cfg, see above
err := bm . StartListening ( cfg )
Memanggil ke titik akhir jarak jauh
// Assuming you've got a configuration object cfg, see above
err := bm . Dial ( cfg )
Setelah membuka koneksi, menulis ke koneksi itu secara konstan
bytesWritten , err := bm . Write ( "127.0.0.1:5031" , dataBytes )
Manajer akan terus mendengarkan dan memutar koneksi yang di -cache secara internal. Setelah Anda membukanya, itu akan tetap terbuka. Penulis akan mencocokkan tujuan penulisan Anda yang masuk, sehingga setiap kali Anda menulis ke alamat yang sama, penulis yang benar akan digunakan kembali. Koneksi mendengarkan akan tetap terbuka, menunggu untuk menerima permintaan.
Anda dapat menutup koneksi ini secara paksa, dengan menelepon keduanya
err := bm . CloseListener ( "127.0.0.1:5031" )
atau
err := bm . CloseWriter ( "127.0.0.1:5031" )
Terima kasih khusus kepada mereka yang telah melaporkan bug atau membantu saya meningkatkan Buffstreams
Apache V2 - Lihat Lisensi