Un pipeline de traitement (similaire à logstash) écrit en Go
Cloner et installer les dépendances :
mkdir -p $GOPATH/src/github.com/urban-1
cd $GOPATH/src/github.com/urban-1
git clone [email protected]:urban-1/gopipe.git
dep ensure
Il existe un Makefile que vous pouvez utiliser pour créer et commencer rapidement à travailler sur ce projet :
$ make help
Available targets:
rdkafka Build librdkafka locally
gopipe Build gopipe
tests Run tests (includes fmt and vet)
show_coverage Show coverage results
Notre objectif est de définir un pipeline où les événements sont reçus et traités de manière modulaire. Les événements peuvent contenir :
Data["raw"]
)Data["message"]
)Data
sous forme map[string]interface{}
)Nous appelons nos modules « Composants » et ceux-ci se répartissent en trois catégories :
Essentiellement, tous les composants sont « identiques » (implémentent la même interface). La seule différence réside dans les chaînes mises à leur disposition.
Eh bien... J'ai fait quelque chose de similaire en C++ pour le traitement des paquets netflow et j'ai pensé que puisque Go est (vraiment) rapide et simultané, il correspond parfaitement à une telle application.
Pourquoi ne pas utiliser quelque chose déjà disponible : nous pourrions étendre un framework existant, cependant, il s'agit d'un exercice d'apprentissage Go pour répliquer le code C++...
En quoi est-ce différent ? Nous nous concentrons sur une perspective systémique et nous souhaitons que ce cadre soit davantage orienté réseau/données plutôt que log
:
flowreplicator.json
.Quels sont les projets futurs : Nous prévoyons de maintenir et d'étendre cela jusqu'à ce que nous portions complètement notre code C++... La maintenance se poursuivra mais nous espérons que nous l'étendrons si nécessaire avec l'aide de la communauté.
Répliquera et éventuellement échantillonnera les paquets UDP :
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "UDPRawInput",
"listen": "0.0.0.0",
"port": 9090
},
"proc": [
{
"module": "SamplerProc",
"every": 2
},
{
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9091
}
],
"out": {
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9092
}
}
Reçoit les lignes via TCP, les analyse en champs de données, ajoute un horodatage, convertit certaines données en différents types de données, supprime le message d'origine, hache certains champs, etc.
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "TCPStrInput",
"listen": "0.0.0.0",
"port": 9092,
"headers": ["hello", "test", "src"],
"separator": ",",
"convert": false
},
"proc": [
{
"module": "RegexProc",
"regexes": [
"(?mi)(?P<host>[0-9a-z]+) (?P<port>[0-9]+): (?P<hostEvent>.*)"
]
},
{
"module": "DropFieldProc",
"field_name": "message"
},
{
"module": "CastProc",
"fields": ["port"],
"types": ["int"]
},
{
"module": "InListProc",
"in_field": "port",
"out_field": "port_block",
"reload_minutes": 100000000,
"list": ["8080", "443", "23230", "14572", "17018"]
},
{"module": "if", "condition": "port_block == true "},
{
"module": "Md5Proc",
"in_fields": ["host"],
"out_fields": ["host_hash"],
"salt": "andPepper!"
},
{"module": "else"},
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{"module": "endif"},
{
"module": "LogProc",
"level": "info"
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
Recevoir sur un socket TCP à l'écoute de la ligne JSON :
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 10000000
},
"in": {
"module": "TCPJSONInput",
"listen": "0.0.0.0",
"port": 9092
},
"proc": [
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{
"module": "LPMProc",
"filepath": "/tmp/prefix-asn.txt",
"reload_minutes": 1440,
"in_fields": ["src", "dst"],
"out_fields": [
{"newkey": "_{{in_field}}_prefix", "metakey": "prefix"},
{"newkey": "_{{in_field}}_asn", "metakey": "asn"}
]
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
La partie de configuration suivante définit une tâche qui s'exécute toutes les 10 secondes. Habituellement, vous souhaitez mettre à jour les sources de fichiers pour les composants InListProc
et LPMProc
... Dans de tels cas, l'idée est que vous ayez un petit script shell quelque part dans votre système qui mettra à jour vos fichiers locaux. Ensuite, vous devez "invoquer" un rechargement pour charger les nouvelles données en mémoire :
...
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
},
"tasks": [
{
"name": "LSing...",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": [
{"mod": 4, "signal": "reload"}
]
},
{
"name": "LSing...2",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": []
}
]
...
Ci-dessus, nous définissons deux tâches. La différence entre eux est que le premier signalera un composant s'il fonctionne correctement. Le signal "reload"
va être envoyé au composant 4
et c'est au composant de le gérer.
L'index du composant est défini comme l'ordre de ce composant dans la configuration, y compris les composants d'entrée . Étant donné que pour le moment nous ne prenons en charge qu'une seule entrée, le composant 4
ci-dessus est le 3ème de la section proc
.
UseNumber()
est nécessaire pour une sortie correcte, cependant, cela interrompt govaluate
, donc lors de la comparaison, vous devez utiliser json_to_float64()
. Voir TestIf
par exemple... Bonjour! L’idée est que nous pouvons fournir à l’utilisateur final une capacité de traitement de pipeline configurable en JSON. Cependant, nous avons besoin de plus de composants pour diverses tâches et peut-être de codecs !
Les composants doivent être extrêmement faciles à mettre en œuvre. Utilisez proc/log.go
comme point de départ (~ 60 LOC) pour implémenter votre composant.
Codecs : jetez un coup d’œil rapide à linecodecs.go
. On peut facilement implémenter de nouveaux encodeurs/décodeurs de ligne. Ceux-ci peuvent ensuite être branchés sur des modules d'entrée/sortie
Vous ne savez pas quoi aider ? jetez un oeil à TODO.md Comme toujours, les commentaires, suggestions, documentation, rapports de bogues, etc. sont plus que bienvenus :)