Um pipeline de processamento (semelhante ao logstash) escrito em Go
Clone e instale dependências:
mkdir -p $GOPATH/src/github.com/urban-1
cd $GOPATH/src/github.com/urban-1
git clone [email protected]:urban-1/gopipe.git
dep ensure
Existe um Makefile que você pode usar para construir rapidamente e começar a trabalhar neste projeto:
$ make help
Available targets:
rdkafka Build librdkafka locally
gopipe Build gopipe
tests Run tests (includes fmt and vet)
show_coverage Show coverage results
Nosso objetivo é definir um pipeline onde os eventos sejam recebidos e processados de forma modular. Os eventos podem conter:
Data["raw"]
)Data["message"]
)Data
como map[string]interface{}
)Chamamos nossos módulos de "Componentes" e eles se enquadram em três categorias:
Em essência, todos os componentes são "iguais" (implementam a mesma interface). A única diferença é quais canais são disponibilizados para eles.
Bem... eu fiz algo semelhante em C++ para processar pacotes netflow e pensei que Go é (realmente) rápido e simultâneo é uma combinação perfeita para tal aplicação.
Por que não usar algo que já existe: poderíamos estender uma estrutura existente, no entanto, este é um exercício de aprendizagem Go para replicar o código C++...
Como isso é diferente? Nós nos concentramos em uma perspectiva de sistemas e queremos que esta estrutura seja mais orientada para redes/dados, em vez de orientada log
:
flowreplicator.json
.Quais são os planos futuros: Planejamos manter e estender isso até portarmos totalmente nosso código C++... A manutenção continuará, mas esperamos estender conforme necessário com a ajuda da comunidade.
Irá replicar e, opcionalmente, amostrar pacotes UDP:
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "UDPRawInput",
"listen": "0.0.0.0",
"port": 9090
},
"proc": [
{
"module": "SamplerProc",
"every": 2
},
{
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9091
}
],
"out": {
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9092
}
}
Recebe linhas sobre TCP, analisa-as em campos de dados, adiciona carimbo de data/hora, converte alguns dados em diferentes tipos de dados, descarta a mensagem original, faz hash de alguns campos, etc.
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "TCPStrInput",
"listen": "0.0.0.0",
"port": 9092,
"headers": ["hello", "test", "src"],
"separator": ",",
"convert": false
},
"proc": [
{
"module": "RegexProc",
"regexes": [
"(?mi)(?P<host>[0-9a-z]+) (?P<port>[0-9]+): (?P<hostEvent>.*)"
]
},
{
"module": "DropFieldProc",
"field_name": "message"
},
{
"module": "CastProc",
"fields": ["port"],
"types": ["int"]
},
{
"module": "InListProc",
"in_field": "port",
"out_field": "port_block",
"reload_minutes": 100000000,
"list": ["8080", "443", "23230", "14572", "17018"]
},
{"module": "if", "condition": "port_block == true "},
{
"module": "Md5Proc",
"in_fields": ["host"],
"out_fields": ["host_hash"],
"salt": "andPepper!"
},
{"module": "else"},
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{"module": "endif"},
{
"module": "LogProc",
"level": "info"
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
Receba em um soquete TCP ouvindo a linha JSON:
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 10000000
},
"in": {
"module": "TCPJSONInput",
"listen": "0.0.0.0",
"port": 9092
},
"proc": [
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{
"module": "LPMProc",
"filepath": "/tmp/prefix-asn.txt",
"reload_minutes": 1440,
"in_fields": ["src", "dst"],
"out_fields": [
{"newkey": "_{{in_field}}_prefix", "metakey": "prefix"},
{"newkey": "_{{in_field}}_asn", "metakey": "asn"}
]
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
A parte de configuração a seguir define uma tarefa que é executada a cada 10 segundos. Normalmente você gostaria de atualizar fontes de arquivos para componentes InListProc
e LPMProc
... Nesses casos, a idéia é que você tenha um pequeno script de shell em algum lugar do seu sistema que atualizará seus arquivos locais. Então você precisa "invocar" um reload para carregar os novos dados na memória:
...
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
},
"tasks": [
{
"name": "LSing...",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": [
{"mod": 4, "signal": "reload"}
]
},
{
"name": "LSing...2",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": []
}
]
...
Acima definimos duas tarefas. A diferença entre eles é que o primeiro sinalizará um componente se ele for executado com sucesso. O sinal "reload"
será enviado ao componente 4
e cabe ao componente tratá-lo.
O índice do componente é definido como a ordem deste componente na configuração, incluindo componentes de entrada . Dado que no momento suportamos apenas uma entrada, o componente 4
acima é o terceiro na seção proc
.
UseNumber()
é necessária para a saída correta, no entanto, ela quebra govaluate
, portanto, ao comparar, você deve usar json_to_float64()
. Veja TestIf
por exemplo... Olá! A ideia é que possamos fornecer capacidade de processamento de pipeline configurável em JSON para o usuário final. No entanto, precisamos de mais componentes para vários trabalhos e talvez codecs!
Os componentes devem ser extremamente fáceis de implementar. Use proc/log.go
como ponto de partida (~60 LOC) para implementar seu componente.
Codecs: dê uma olhada rápida em linecodecs.go
. É possível implementar facilmente novos codificadores/decodificadores de linha. Eles podem então ser conectados a módulos de entrada/saída
Não tem certeza com o que ajudar? dê uma olhada em TODO.md Como sempre, comentários, sugestões, documentação, relatórios de bugs, etc. são mais que bem-vindos :)