Конвейер обработки (похожий на logstash), написанный на Go.
Клонируйте и установите зависимости:
mkdir -p $GOPATH/src/github.com/urban-1
cd $GOPATH/src/github.com/urban-1
git clone [email protected]:urban-1/gopipe.git
dep ensure
Существует Makefile, который вы можете использовать для быстрой сборки и начала работы над этим проектом:
$ make help
Available targets:
rdkafka Build librdkafka locally
gopipe Build gopipe
tests Run tests (includes fmt and vet)
show_coverage Show coverage results
Наша цель — определить конвейер, в котором события принимаются и обрабатываются модульным способом. События могут содержать:
Data["raw"]
)Data["message"]
)Data
в виде map[string]interface{}
)Мы называем наши модули «Компоненты», и они делятся на три категории:
По сути, все компоненты «одинаковы» (реализуют один и тот же интерфейс). Разница лишь в том, какие каналы им доступны.
Что ж... Я сделал что-то подобное на C++ для обработки сетевых пакетов и подумал, что, поскольку Go (действительно) быстрый и параллельный, он идеально подходит для такого приложения.
Почему бы не использовать что-то уже существующее: мы могли бы расширить существующую структуру, однако это упражнение по изучению Go для репликации кода C++...
Чем это отличается? Мы фокусируемся на системной перспективе и хотим, чтобы эта структура была больше ориентирована на сеть/данные, а не на log
:
flowreplicator.json
.Каковы планы на будущее: Мы планируем поддерживать и расширять это до тех пор, пока мы полностью не перенесем наш код C++... Техническое обслуживание будет продолжаться, но мы надеемся, что будем расширять его по мере необходимости с помощью сообщества.
Будет реплицировать и, при необходимости, пробовать UDP-пакеты:
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "UDPRawInput",
"listen": "0.0.0.0",
"port": 9090
},
"proc": [
{
"module": "SamplerProc",
"every": 2
},
{
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9091
}
],
"out": {
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9092
}
}
Получает строки по TCP, анализирует их в поля данных, добавляет метку времени, преобразует некоторые данные в другие типы данных, отбрасывает исходное сообщение, хэширует некоторые поля и т. д.
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "TCPStrInput",
"listen": "0.0.0.0",
"port": 9092,
"headers": ["hello", "test", "src"],
"separator": ",",
"convert": false
},
"proc": [
{
"module": "RegexProc",
"regexes": [
"(?mi)(?P<host>[0-9a-z]+) (?P<port>[0-9]+): (?P<hostEvent>.*)"
]
},
{
"module": "DropFieldProc",
"field_name": "message"
},
{
"module": "CastProc",
"fields": ["port"],
"types": ["int"]
},
{
"module": "InListProc",
"in_field": "port",
"out_field": "port_block",
"reload_minutes": 100000000,
"list": ["8080", "443", "23230", "14572", "17018"]
},
{"module": "if", "condition": "port_block == true "},
{
"module": "Md5Proc",
"in_fields": ["host"],
"out_fields": ["host_hash"],
"salt": "andPepper!"
},
{"module": "else"},
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{"module": "endif"},
{
"module": "LogProc",
"level": "info"
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
Получите сокет TCP, прослушивающий строку JSON:
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 10000000
},
"in": {
"module": "TCPJSONInput",
"listen": "0.0.0.0",
"port": 9092
},
"proc": [
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{
"module": "LPMProc",
"filepath": "/tmp/prefix-asn.txt",
"reload_minutes": 1440,
"in_fields": ["src", "dst"],
"out_fields": [
{"newkey": "_{{in_field}}_prefix", "metakey": "prefix"},
{"newkey": "_{{in_field}}_asn", "metakey": "asn"}
]
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
Следующая часть конфигурации определяет задачу, которая запускается каждые 10 секунд. Обычно вы хотите обновить источники файлов для компонентов InListProc
и LPMProc
... В таких случаях идея состоит в том, что где-то в вашей системе есть небольшой сценарий оболочки, который будет обновлять ваши локальные файлы. Затем вам нужно «вызвать» перезагрузку, чтобы загрузить новые данные в память:
...
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
},
"tasks": [
{
"name": "LSing...",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": [
{"mod": 4, "signal": "reload"}
]
},
{
"name": "LSing...2",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": []
}
]
...
Выше мы определили две задачи. Разница между ними в том, что первый сигнализирует компоненту об успешном запуске. Сигнал "reload"
будет отправлен компоненту 4
, и компонент должен его обработать.
Индекс компонента определяется как порядок этого компонента в конфигурации, включая входные компоненты . Учитывая, что на данный момент мы поддерживаем только один вход, компонент 4
выше является третьим в разделе proc
.
UseNumber()
необходимо для правильного вывода, однако оно нарушает govaluate
, поэтому при сравнении вам придется использовать json_to_float64()
. См., например, TestIf
... Привет! Идея состоит в том, что мы можем предоставить конечному пользователю возможность настраиваемой конвейерной обработки JSON. Однако нам нужно больше компонентов для различных задач и, возможно, кодеков!
Компоненты должны быть чрезвычайно простыми в реализации. Используйте proc/log.go
в качестве отправной точки (~60 LOC) для реализации вашего компонента.
Кодеки: быстро просмотрите linecodecs.go
. Можно легко реализовать новые кодеры/декодеры строк. Затем их можно подключить к модулям ввода/вывода.
Не знаете, чем помочь? загляните на TODO.md. Как всегда, комментарии, предложения, документация, отчеты об ошибках и т. д. более чем приветствуются :)