A processing pipeline (similar to logstash) written in Go
Clone and install dependencies:
mkdir -p $GOPATH/src/github.com/urban-1
cd $GOPATH/src/github.com/urban-1
git clone [email protected]:urban-1/gopipe.git
dep ensure
There is a Makefile you can use to quickly build and start working on this project:
$ make help
Available targets:
rdkafka Build librdkafka locally
gopipe Build gopipe
tests Run tests (includes fmt and vet)
show_coverage Show coverage results
Our goal is to define a pipeline where events are received and processed in a modular way. Events can contain:
Data["raw"]
)Data["message"]
)Data
as map[string]interface{}
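As a rough illustration (a sketch only, not necessarily the exact struct in the gopipe sources), an event can be pictured as:

// Sketch: an event carries its payload in Data. Raw bytes live under
// "raw", plain strings under "message", and decoded JSON is spread
// across the map's keys.
type Event struct {
    Data map[string]interface{}
}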
We call our modules "Components" and these fall into three categories: input, processing and output components (the "in", "proc" and "out" sections of the configuration).
In essence, all components are the "same" (they implement the same interface). The only difference is which channels are made available to them.
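A minimal sketch of that idea, with hypothetical names (gopipe's actual interface lives in the sources):

// Every component runs the same way; only the wiring differs. Input
// components receive a nil in channel, output components a nil out
// channel, and processing components get both.
type Component interface {
    Run(in <-chan *Event, out chan<- *Event)
}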
Why? Well... I have done something similar in C++ for processing netflow packets and thought that, since Go is (really) fast and concurrent, it is a perfect match for such an application.
Why not use something already out there? We could extend an existing framework; however, this is a Go learning exercise to replicate the C++ code...
How is that different? We focus on a systems perspective and we want this
framework to be more network/data oriented rather than logs oriented: see for
example the flowreplicator.json configuration.
What are the future plans? We plan to maintain and extend this until we fully
port our C++ code... Maintenance will continue, but we hope to extend it as
needed with the help of the community.
Will replicate and optionally sample UDP packets:
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "UDPRawInput",
"listen": "0.0.0.0",
"port": 9090
},
"proc": [
{
"module": "SamplerProc",
"every": 2
},
{
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9091
}
],
"out": {
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9092
}
}
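Reading the chain above: packets received on UDP port 9090 pass through SamplerProc (assuming "every": 2 keeps one event in every two), are copied to 127.0.0.1:9091 by the in-chain UDPRawOutput, and finally delivered to 127.0.0.1:9092 by the output stage.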
Receives lines over TCP, parses them into data fields, adds a timestamp, converts some fields to different data types, discards the original message, hashes some fields, etc.:
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "TCPStrInput",
"listen": "0.0.0.0",
"port": 9092,
"headers": ["hello", "test", "src"],
"separator": ",",
"convert": false
},
"proc": [
{
"module": "RegexProc",
"regexes": [
"(?mi)(?P<host>[0-9a-z]+) (?P<port>[0-9]+): (?P<hostEvent>.*)"
]
},
{
"module": "DropFieldProc",
"field_name": "message"
},
{
"module": "CastProc",
"fields": ["port"],
"types": ["int"]
},
{
"module": "InListProc",
"in_field": "port",
"out_field": "port_block",
"reload_minutes": 100000000,
"list": ["8080", "443", "23230", "14572", "17018"]
},
{"module": "if", "condition": "port_block == true "},
{
"module": "Md5Proc",
"in_fields": ["host"],
"out_fields": ["host_hash"],
"salt": "andPepper!"
},
{"module": "else"},
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{"module": "endif"},
{
"module": "LogProc",
"level": "info"
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
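To make the RegexProc step concrete, this is roughly what extracting named groups looks like in Go (a sketch, not gopipe's actual code). A line such as "web1 8080: GET /index.html" yields host, port and hostEvent fields; CastProc then turns port into an int, and InListProc flags port_block=true since "8080" is in the list:

package main

import (
    "fmt"
    "regexp"
)

func main() {
    // The same pattern as in the RegexProc configuration above.
    re := regexp.MustCompile(`(?mi)(?P<host>[0-9a-z]+) (?P<port>[0-9]+): (?P<hostEvent>.*)`)
    fields := map[string]interface{}{}
    if m := re.FindStringSubmatch("web1 8080: GET /index.html"); m != nil {
        for i, name := range re.SubexpNames() {
            if i > 0 && name != "" {
                fields[name] = m[i]
            }
        }
    }
    fmt.Println(fields) // map[host:web1 hostEvent:GET /index.html port:8080]
}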
Receives JSON lines over a TCP socket:
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 10000000
},
"in": {
"module": "TCPJSONInput",
"listen": "0.0.0.0",
"port": 9092
},
"proc": [
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{
"module": "LPMProc",
"filepath": "/tmp/prefix-asn.txt",
"reload_minutes": 1440,
"in_fields": ["src", "dst"],
"out_fields": [
{"newkey": "_{{in_field}}_prefix", "metakey": "prefix"},
{"newkey": "_{{in_field}}_asn", "metakey": "asn"}
]
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
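Assuming the {{in_field}} placeholder is substituted with each entry of in_fields, the out_fields template above would add _src_prefix, _src_asn, _dst_prefix and _dst_asn to the event, populated from the matched prefix's metadata.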
The following config part defines a task that runs every 10 seconds. Usually
you would use it to update the file sources of InListProc and LPMProc
components... In such cases the idea is that you have a small shell script
somewhere on your system that updates your local files. Then you need to
"invoke" a reload to load the new data into memory:
...
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
},
"tasks": [
{
"name": "LSing...",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": [
{"mod": 4, "signal": "reload"}
]
},
{
"name": "LSing...2",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": []
}
]
...
Above we define two tasks. The difference between them is that the first one
will signal a component if it runs successfully. The signal "reload" is going
to be sent to component 4, and it is up to the component to handle it. The
component index is defined by the order of the component in the config,
including input components. Given that at the moment we only support one
input, component 4 above is the 3rd in the proc section (indices start at 1,
so the input is component 1 and the processors start at 2).
UseNumber() is needed for correct output; however, it breaks govaluate, so
when comparing numbers you have to use json_to_float64(). The sketch below
shows why.
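A minimal standard-library illustration of the issue (json_to_float64() itself is part of gopipe; everything below is just encoding/json):

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "strings"
)

func main() {
    // With UseNumber(), encoding/json decodes numbers as json.Number instead
    // of float64; they re-encode exactly as received, but anything expecting
    // a float64 (such as govaluate) no longer sees a numeric type.
    dec := json.NewDecoder(strings.NewReader(`{"port": 9090}`))
    dec.UseNumber()
    var data map[string]interface{}
    if err := dec.Decode(&data); err != nil {
        log.Fatal(err)
    }
    n := data["port"].(json.Number) // json.Number, not float64
    f, _ := n.Float64()             // convert explicitly before comparing
    fmt.Println(f > 443)            // true
}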
See TestIf for an example...

Hello! The idea is that we can provide JSON-configurable pipeline processing capability to the end user. However, we do need more components for various jobs and maybe codecs!
Components should be extremely easy to implement. Use proc/log.go as a
starting point (~60 LOC) to implement your component.
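As a very rough skeleton (hypothetical names, reusing the Component sketch from earlier; proc/log.go shows the real interface and how components register themselves):

import "strings"

// UpperProc is a toy processing component: it upper-cases the "message"
// field of every event and passes the event on.
type UpperProc struct{}

func (p *UpperProc) Run(in <-chan *Event, out chan<- *Event) {
    for e := range in {
        if msg, ok := e.Data["message"].(string); ok {
            e.Data["message"] = strings.ToUpper(msg)
        }
        out <- e
    }
}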
Codecs: Have a quick look into linecodecs.go. One can easily implement new
line encoders/decoders. These can then be plugged into input/output modules.
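The shape is roughly this (hypothetical names; linecodecs.go has the actual definitions):

// A line codec converts between wire lines and event fields.
type LineCodec interface {
    // FromBytes parses one received line into event fields.
    FromBytes(line []byte) (map[string]interface{}, error)
    // ToBytes serializes event fields back into a single line.
    ToBytes(data map[string]interface{}) ([]byte, error)
}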
Not sure what to help with? Have a look at TODO.md. As always, comments, suggestions, documentation, bug reports, etc. are more than welcome :)