Потоковое протокол буферизирует сообщения по TCP в Golang
Buffstreams - это набор абстракции по TCPConns для потоковых соединений, которые записывают данные в формате, включающем длину сообщения + сама полезная нагрузка сообщения (например, буферы протокола, отсюда и имя).
Buffstreams дает вам простой интерфейс, чтобы запустить (блокировку или не) слушатель на данном порту, который будет транслировать массивы сырых байтов в обратный вызов, который вы его предоставите. Таким образом, Buffstreams - это не столько демон, а библиотека для создания сетевых сервисов, которые могут передавать TCP с помощью сообщений буфера протокола.
Я писал несколько разных проектов для развлечения в Голанге и продолжал писать код чего -то вроде того, что находится в библиотеке, но менее организован. Я решил сосредоточиться на сетевом коде, вытащить его и улучшить его, чтобы я знал, что ему можно доверять надежно выполнять в разных проектах.
Нет ничего особенного или волшебного в Buffstreams, или код здесь. Идея не в том, что это лучшая, более быстрая абстракция сокетов - это сделать как можно большую часть шаблона для обработки потоковых данных, таких как сообщения Protobuff, с максимально небольшим влиянием на производительность. В настоящее время Buffstreams может делать более 1,1 миллиона беспорядков в секунду, на 110 байт на сообщение на одном сокете прослушивания, который насыщает 1GIG NIC.
Идея Buffstreams состоит в том, чтобы делать скучные детали и обрабатывать общие ошибки, позволяя вам писать системы сверху, выполняя с максимально возможным количеством накладных расходов.
Поскольку у сообщений Protobuff отсутствует какой -либо естественный делиметр, Buffstreams использует метод добавления фиксированного заголовка байтов (который настраивается), который описывает размер фактической полезной нагрузки. Это обрабатывается для вас, по призыву написать. Вам никогда не нужно собирать размер самостоятельно.
На стороне сервера он прослушивает эти полезные нагрузки, прочитал фиксированный заголовок, а затем последующее сообщение. Сервер должен иметь такой же максимальный размер, что и клиент для работы. Buffstreams затем передаст массив байтов к обратном вызове, который вы предоставили для обработки сообщений, полученных в этом порту. Освободивание сообщений и интерпретация их ценности зависит от вас.
Одним из важных примечаний является то, что внутри Buffstreams фактически не используется и не полагается на саму библиотеку буферов протокола. Вся сериализация / десериализация обрабатывается клиентом до / после взаимодействия с Buffstreams. Таким образом, вы можете теоретически использовать эту библиотеку для потоковой передачи любых данных по TCP, которая использует ту же стратегию фиксированного заголовка байтов + последующего корпуса сообщений.
В настоящее время я использовал его только для сообщений Protocolbuffers.
При желании вы можете включить регистрацию ошибок, хотя это естественно поставляется с штрафом на производительности под экстремальной нагрузкой.
Я очень старался оптимизировать Buffstreams как можно лучше, стремясь сохранить их средние значения выше 1 млн. Сообщений в секунду, без ошибок во время транзита.
Смотрите скамейку
Скачать библиотеку
go get "github.com/StabbyCutyou/buffstreams"
Импортировать библиотеку
import "github.com/StabbyCutyou/buffstreams"
Для быстрого примера полного клиента и сервера конечного к конечному клиенту, ознакомьтесь с примерами в тесте/каталоге, а именно Test/Client/Test_client.go и Test/Server/test_server.go. Эти два файла предназначены для совместной работы, чтобы продемонстрировать конечную интеграцию Buffstreams самым простым возможным способом.
Одним из основных объектов в Buffstreams является tcplistener. Эта структура позволяет вам открывать гнездо на локальном порту и начать ждать, пока клиенты подключаются. После того, как соединение будет сделано, каждое полное сообщение, написанное клиентом, будет получено слушателем, и определенный вызыв, который вы определяете, будет вызван содержимым сообщением (массив байтов).
Чтобы начать слушать, сначала создайте объект tcplistenerconfig, чтобы определить, как должен вести себя слушатель. Образец tcplistenerconfig может выглядеть так:
cfg := TCPListenerConfig {
EnableLogging : false , // true will have log messages printed to stdout/stderr, via log
MaxMessageSize : 4096 ,
Callback : func ( byte []) error { return nil } // Any function type that adheres to this signature, you'll need to deserialize in here if need be
Address : FormatAddress ( "" , strconv . Itoa ( 5031 )) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience. For listening, you normally don't want to provide an ip unless you have a reason.
}
btl , err := buffstreams . ListenTCP ( cfg )
После того, как вы открыли слушателя таким образом, розетка теперь используется, но сам слушатель еще не начал принимать соединения.
Для этого у вас есть два варианта. По умолчанию эта операция заблокирует текущий поток. Если вы хотите избежать этого, и использовать подход к огню и забыть, вы можете позвонить
err := btl . StartListeningAsync ()
Если есть ошибка во время запуска, она будет возвращена этим методом. В качестве альтернативы, если вы хотите справиться с заполнением вызова самостоятельно, или не волнует, что он блокирует, вы можете позвонить
err := btl . StartListening ()
То, как Buffstreams обрабатывает, действуя над входящими сообщениями, - это позволить вам предоставить обратный вызов для работы на байтах. Listencallback берет массив/ломтик байтов и возвращает ошибку.
type ListenCallback func ([] byte ) error
Обратный вызов получит необработанные байты для данного сообщения Protobuff. Заголовок, содержащий размер, будет удален. Ответственность за обратные вызовы обязаны покинуть и действовать в соответствии с сообщением.
Слушатель получает сообщение, ваш обратный вызов выполняет работу.
Образец обратного вызова может начаться так:
func ListenCallbackExample ([] byte data ) error {
msg := & message. ImportantProtoBuffStreamingMessage {}
err := proto . Unmarshal ( data , msg )
// Now you do some stuff with msg
...
}
Обратный вызов в настоящее время запускается в собственном goroutine, который также обрабатывает чтение из соединения до тех пор, пока читатель не отключается, или произойдет ошибка. Любые ошибки, чтение с входящего подключения, будут соответствовать клиенту для обработки.
Чтобы начать запись сообщений в новое соединение, вам нужно набрать TcpConnconfig
cfg := TCPConnConfig {
EnableLogging : false , // true will have log messages printed to stdout/stderr, via log
MaxMessageSize : 4096 , // You want this to match the MaxMessageSize the server expects for messages on that socket
Address : FormatAddress ( "127.0.0.1" , strconv . Itoa ( 5031 )) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience.
}
После того, как у вас есть объект конфигурации, вы можете набрать.
btc , err := buffstreams . DialTCP ( cfg )
Это откроет соединение с конечной точкой в указанном месте. Кроме того, TCPConn, который возвращает tcplistener, также позволит вам записывать данные, используя те же методы, что и ниже.
Оттуда вы можете написать свои данные
bytesWritten , err := btc . Write ( msgBytes , true )
Если в письменной форме есть ошибка, это соединение будет закрыто и будет повторно открыто при следующей записи. Нет никакой гарантии, если таковое значение, написанное на байса, будет> 0 или нет в случае ошибки, которая приводит к повторному подключению.
Есть третий вариант, предоставленный класс менеджера. Этот класс даст вам простую, но эффективную абстракцию менеджера по поводу набора и прослушивания портов, управляя подключениями для вас. Вы предоставляете обычную конфигурацию для набора или прослушивания входящих соединений и позволяете менеджеру удерживать ссылки. Менеджер считается Threadsafe, так как он внутренне использует замки для обеспечения согласованности и координации между параллельным доступом к соединениям, проводимым.
Менеджер на самом деле не «пул», в котором он не открывается и не подключает x подключения для повторного использования. Тем не менее, он поддерживает многие из тех же поведений, что и пул, включая кэширование и повторное использование соединений, и, как упоминалось, является Threadsafe.
Создание менеджера
bm := buffstreams . NewManager ()
Слушать на порту. Менеджер всегда делает это асинхронным и не блокирующим
// Assuming you've got a configuration object cfg, see above
err := bm . StartListening ( cfg )
Набрать в удаленную конечную точку
// Assuming you've got a configuration object cfg, see above
err := bm . Dial ( cfg )
Открывая связь, писая на эту связь постоянным образом
bytesWritten , err := bm . Write ( "127.0.0.1:5031" , dataBytes )
Менеджер будет продолжать слушать и набрать соединения, кэшированные внутри. Как только вы откроете один, он будет открыт. Автор будет соответствовать вашему входящему пункту назначения записи, так что в любое время, когда вы пишете по тому же адресу, правильный писатель будет повторно использован. Подключение к прослушиванию просто останется открытым, ожидая получения запросов.
Вы можете насильственно закрыть эти соединения, позвонив либо
err := bm . CloseListener ( "127.0.0.1:5031" )
или
err := bm . CloseWriter ( "127.0.0.1:5031" )
Особая благодарность тем, кто сообщил об ошибках или помог мне улучшить Buffstreams
Apache V2 - см. Лицензию