Eine in Go geschriebene Verarbeitungspipeline (ähnlich Logstash).
Abhängigkeiten klonen und installieren:
mkdir -p $GOPATH/src/github.com/urban-1
cd $GOPATH/src/github.com/urban-1
git clone [email protected]:urban-1/gopipe.git
dep ensure
Es gibt ein Makefile, mit dem Sie dieses Projekt schnell erstellen und mit der Arbeit beginnen können:
$ make help
Available targets:
rdkafka Build librdkafka locally
gopipe Build gopipe
tests Run tests (includes fmt and vet)
show_coverage Show coverage results
Unser Ziel ist es, eine Pipeline zu definieren, in der Ereignisse modular empfangen und verarbeitet werden. Ereignisse können Folgendes enthalten:
Data["raw"]
)Data["message"]
)Data
als map[string]interface{}
)Wir nennen unsere Module „Komponenten“ und diese lassen sich in drei Kategorien einteilen:
Im Wesentlichen sind alle Komponenten „gleich“ (implementieren dieselbe Schnittstelle). Der einzige Unterschied besteht darin, welche Kanäle ihnen zur Verfügung gestellt werden.
Nun ... ich habe in C++ etwas Ähnliches für die Verarbeitung von Netflow-Paketen gemacht und dachte, da Go (wirklich) schnell ist und Concurrent perfekt zu einer solchen Anwendung passt.
Warum nicht etwas verwenden, das es bereits gibt? Wir könnten ein vorhandenes Framework erweitern. Dies ist jedoch eine Go-Lernübung, um den C++-Code zu replizieren ...
Wie ist das anders? Wir konzentrieren uns auf eine Systemperspektive und möchten, dass dieses Framework eher netzwerk-/datenorientiert als log
ist:
flowreplicator.json
.Was sind die Zukunftspläne: Wir planen, dies beizubehalten und zu erweitern, bis wir unseren C++-Code vollständig portiert haben ... Die Wartung wird fortgesetzt, aber wir hoffen, dass wir sie bei Bedarf mit Hilfe der Community erweitern können.
Repliziert und testet optional UDP-Pakete:
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "UDPRawInput",
"listen": "0.0.0.0",
"port": 9090
},
"proc": [
{
"module": "SamplerProc",
"every": 2
},
{
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9091
}
],
"out": {
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9092
}
}
Empfängt Zeilen über TCP, analysiert sie in Datenfelder, fügt Zeitstempel hinzu, konvertiert einige Daten in andere Datentypen, verwirft die ursprüngliche Nachricht, hasht einige Felder usw
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "TCPStrInput",
"listen": "0.0.0.0",
"port": 9092,
"headers": ["hello", "test", "src"],
"separator": ",",
"convert": false
},
"proc": [
{
"module": "RegexProc",
"regexes": [
"(?mi)(?P<host>[0-9a-z]+) (?P<port>[0-9]+): (?P<hostEvent>.*)"
]
},
{
"module": "DropFieldProc",
"field_name": "message"
},
{
"module": "CastProc",
"fields": ["port"],
"types": ["int"]
},
{
"module": "InListProc",
"in_field": "port",
"out_field": "port_block",
"reload_minutes": 100000000,
"list": ["8080", "443", "23230", "14572", "17018"]
},
{"module": "if", "condition": "port_block == true "},
{
"module": "Md5Proc",
"in_fields": ["host"],
"out_fields": ["host_hash"],
"salt": "andPepper!"
},
{"module": "else"},
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{"module": "endif"},
{
"module": "LogProc",
"level": "info"
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
Empfangen Sie auf einem TCP-Socket, der auf die JSON-Zeile wartet:
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 10000000
},
"in": {
"module": "TCPJSONInput",
"listen": "0.0.0.0",
"port": 9092
},
"proc": [
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{
"module": "LPMProc",
"filepath": "/tmp/prefix-asn.txt",
"reload_minutes": 1440,
"in_fields": ["src", "dst"],
"out_fields": [
{"newkey": "_{{in_field}}_prefix", "metakey": "prefix"},
{"newkey": "_{{in_field}}_asn", "metakey": "asn"}
]
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
Der folgende Konfigurationsteil definiert eine Aufgabe, die alle 10 Sekunden ausgeführt wird. Normalerweise möchten Sie Dateiquellen für InListProc
und LPMProc
Komponenten aktualisieren. In solchen Fällen besteht die Idee darin, dass Sie irgendwo in Ihrem System ein kleines Shell-Skript haben, das Ihre lokalen Dateien aktualisiert. Dann müssen Sie ein Neuladen „aufrufen“, um die neuen Daten in den Speicher zu laden:
...
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
},
"tasks": [
{
"name": "LSing...",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": [
{"mod": 4, "signal": "reload"}
]
},
{
"name": "LSing...2",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": []
}
]
...
Oben definieren wir zwei Aufgaben. Der Unterschied zwischen ihnen besteht darin, dass die erste einer Komponente signalisiert, ob sie erfolgreich ausgeführt wird. Das Signal "reload"
wird an Komponente 4
gesendet und ist Sache der Komponente, damit umzugehen.
Der Komponentenindex ist als die Reihenfolge dieser Komponente in der Konfiguration einschließlich der Eingabekomponenten definiert. Da wir derzeit nur eine Eingabe unterstützen, ist Komponente 4
oben die dritte im proc
Abschnitt.
UseNumber()
erforderlich. govaluate
wird dadurch jedoch beschädigt, sodass Sie beim Vergleich json_to_float64()
verwenden müssen. Siehe zum Beispiel TestIf
... Hallo! Die Idee ist, dass wir dem Endbenutzer JSON-konfigurierbare Pipeline-Verarbeitungsfunktionen bereitstellen können. Wir benötigen jedoch für verschiedene Aufgaben weitere Komponenten und möglicherweise Codecs!
Komponenten sollten äußerst einfach zu implementieren sein. Verwenden Sie proc/log.go
als Ausgangspunkt (~60 LOC), um Ihre Komponente zu implementieren.
Codecs: Werfen Sie einen kurzen Blick auf linecodecs.go
. Man kann problemlos neue Leitungsencoder/Decoder implementieren. Diese können dann in Ein-/Ausgabemodule gesteckt werden
Sie sind sich nicht sicher, was Sie helfen können? Schauen Sie sich TODO.md an. Wie immer sind Kommentare, Vorschläge, Dokumentationen, Fehlerberichte usw. mehr als willkommen :)