Una tubería de procesamiento (similar a logstash) escrita en Go
Clonar e instalar dependencias:
mkdir -p $GOPATH/src/github.com/urban-1
cd $GOPATH/src/github.com/urban-1
git clone [email protected]:urban-1/gopipe.git
dep ensure
Hay un Makefile que puedes usar para construir y comenzar a trabajar rápidamente en este proyecto:
$ make help
Available targets:
rdkafka Build librdkafka locally
gopipe Build gopipe
tests Run tests (includes fmt and vet)
show_coverage Show coverage results
Nuestro objetivo es definir un canal donde los eventos se reciben y procesan de forma modular. Los eventos pueden contener:
Data["raw"]
)Data["message"]
)Data
como map[string]interface{}
)Llamamos a nuestros módulos "Componentes" y se dividen en tres categorías:
En esencia, todos los componentes son "iguales" (implementan la misma interfaz). La única diferencia es qué canales están disponibles para ellos.
Bueno... Hice algo similar en C++ para procesar paquetes de netflow y pensé que, dado que Go es (realmente) rápido y concurrente, es una combinación perfecta para dicha aplicación.
¿Por qué no utilizar algo que ya existe? Podríamos ampliar un marco existente; sin embargo, este es un ejercicio de aprendizaje de Go para replicar el código C++...
¿En qué se diferencia eso? Nos centramos en una perspectiva de sistemas y queremos que este marco esté más orientado a redes/datos que log
:
flowreplicator.json
.¿Cuáles son los planes futuros? Planeamos mantener y ampliar esto hasta que portemos completamente nuestro código C++... El mantenimiento continuará, pero esperamos ampliarlo según sea necesario con la ayuda de la comunidad.
Replicará y opcionalmente tomará muestras de paquetes UDP:
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "UDPRawInput",
"listen": "0.0.0.0",
"port": 9090
},
"proc": [
{
"module": "SamplerProc",
"every": 2
},
{
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9091
}
],
"out": {
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9092
}
}
Recibe líneas a través de TCP, las analiza en campos de datos, agrega marcas de tiempo, convierte algunos datos a diferentes tipos de datos, descarta el mensaje original, aplica hash en algunos campos, etc.
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "TCPStrInput",
"listen": "0.0.0.0",
"port": 9092,
"headers": ["hello", "test", "src"],
"separator": ",",
"convert": false
},
"proc": [
{
"module": "RegexProc",
"regexes": [
"(?mi)(?P<host>[0-9a-z]+) (?P<port>[0-9]+): (?P<hostEvent>.*)"
]
},
{
"module": "DropFieldProc",
"field_name": "message"
},
{
"module": "CastProc",
"fields": ["port"],
"types": ["int"]
},
{
"module": "InListProc",
"in_field": "port",
"out_field": "port_block",
"reload_minutes": 100000000,
"list": ["8080", "443", "23230", "14572", "17018"]
},
{"module": "if", "condition": "port_block == true "},
{
"module": "Md5Proc",
"in_fields": ["host"],
"out_fields": ["host_hash"],
"salt": "andPepper!"
},
{"module": "else"},
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{"module": "endif"},
{
"module": "LogProc",
"level": "info"
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
Reciba en un socket TCP escuchando la línea JSON:
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 10000000
},
"in": {
"module": "TCPJSONInput",
"listen": "0.0.0.0",
"port": 9092
},
"proc": [
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{
"module": "LPMProc",
"filepath": "/tmp/prefix-asn.txt",
"reload_minutes": 1440,
"in_fields": ["src", "dst"],
"out_fields": [
{"newkey": "_{{in_field}}_prefix", "metakey": "prefix"},
{"newkey": "_{{in_field}}_asn", "metakey": "asn"}
]
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
La siguiente parte de configuración define una tarea que se ejecuta cada 10 segundos. Generalmente le gustaría actualizar las fuentes de los archivos para los componentes InListProc
y LPMProc
... En tales casos, la idea es que tenga un pequeño script de shell en algún lugar de su sistema que actualizará sus archivos locales. Luego necesitas "invocar" una recarga para cargar los nuevos datos en la memoria:
...
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
},
"tasks": [
{
"name": "LSing...",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": [
{"mod": 4, "signal": "reload"}
]
},
{
"name": "LSing...2",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": []
}
]
...
Arriba definimos dos tareas. La diferencia entre ellos es que el primero señalará a un componente si se ejecuta correctamente. La señal "reload"
se enviará al componente 4
y depende del componente manejarla.
El índice del componente se define como el orden de este componente en la configuración, incluidos los componentes de entrada . Dado que por el momento solo admitimos una entrada, el componente 4
anterior es el tercero en la sección proc
.
UseNumber()
para obtener una salida correcta; sin embargo, rompe govaluate
, por lo que al comparar debe usar json_to_float64()
. Vea TestIf
por ejemplo... ¡Hola! La idea es que podamos proporcionar capacidad de procesamiento de canalización configurable en JSON para el usuario final. Sin embargo, necesitamos más componentes para varios trabajos y tal vez códecs.
Los componentes deben ser extremadamente fáciles de implementar. Utilice proc/log.go
como punto de partida (~60 LOC) para implementar su componente.
Códecs: eche un vistazo rápido a linecodecs.go
. Se pueden implementar fácilmente codificadores/decodificadores de nueva línea. Luego se pueden conectar a módulos de entrada/salida.
¿No estás seguro de en qué ayudar? eche un vistazo a TODO.md. Como siempre, comentarios, sugerencias, documentación, informes de errores, etc. son más que bienvenidos :)