用Go编写的处理管道(类似于logstash)
克隆并安装依赖项:
mkdir -p $GOPATH/src/github.com/urban-1
cd $GOPATH/src/github.com/urban-1
git clone [email protected]:urban-1/gopipe.git
dep ensure
您可以使用一个 Makefile 来快速构建并开始处理该项目:
$ make help
Available targets:
rdkafka Build librdkafka locally
gopipe Build gopipe
tests Run tests (includes fmt and vet)
show_coverage Show coverage results
我们的目标是定义一个以模块化方式接收和处理事件的管道。事件可以包含:
Data["raw"]
)Data["message"]
)Data
为map[string]interface{}
)我们将我们的模块称为“组件”,这些模块分为三类:
本质上,所有组件都是“相同的”(实现相同的接口)。唯一的区别在于他们可以使用哪些渠道。
嗯...我在 C++ 中做了类似的事情来处理网络流数据包,并且认为 Go(真的)很快并且并发是此类应用程序的完美匹配。
为什么不使用已有的东西:我们可以扩展现有框架,但是,这是一个复制 C++ 代码的 Go 学习练习......
那有什么不同呢?我们关注系统的角度,我们希望这个框架更加面向网络/数据而不是面向log
:
flowreplicator.json
配置。未来的计划是什么:我们计划维护和扩展它,直到我们完全移植我们的 C++ 代码...维护将继续,但我们希望在社区的帮助下根据需要进行扩展。
将复制并可选择采样 UDP 数据包:
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "UDPRawInput",
"listen": "0.0.0.0",
"port": 9090
},
"proc": [
{
"module": "SamplerProc",
"every": 2
},
{
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9091
}
],
"out": {
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9092
}
}
通过 TCP 接收行,将其解析为数据字段,添加时间戳,将某些数据转换为不同的数据类型,丢弃原始消息,散列某些字段等
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "TCPStrInput",
"listen": "0.0.0.0",
"port": 9092,
"headers": ["hello", "test", "src"],
"separator": ",",
"convert": false
},
"proc": [
{
"module": "RegexProc",
"regexes": [
"(?mi)(?P<host>[0-9a-z]+) (?P<port>[0-9]+): (?P<hostEvent>.*)"
]
},
{
"module": "DropFieldProc",
"field_name": "message"
},
{
"module": "CastProc",
"fields": ["port"],
"types": ["int"]
},
{
"module": "InListProc",
"in_field": "port",
"out_field": "port_block",
"reload_minutes": 100000000,
"list": ["8080", "443", "23230", "14572", "17018"]
},
{"module": "if", "condition": "port_block == true "},
{
"module": "Md5Proc",
"in_fields": ["host"],
"out_fields": ["host_hash"],
"salt": "andPepper!"
},
{"module": "else"},
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{"module": "endif"},
{
"module": "LogProc",
"level": "info"
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
在侦听 JSON 行的 TCP 套接字上接收:
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 10000000
},
"in": {
"module": "TCPJSONInput",
"listen": "0.0.0.0",
"port": 9092
},
"proc": [
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{
"module": "LPMProc",
"filepath": "/tmp/prefix-asn.txt",
"reload_minutes": 1440,
"in_fields": ["src", "dst"],
"out_fields": [
{"newkey": "_{{in_field}}_prefix", "metakey": "prefix"},
{"newkey": "_{{in_field}}_asn", "metakey": "asn"}
]
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
以下配置部分定义了每 10 秒运行一次的任务。通常您希望更新InListProc
和LPMProc
组件的文件源...在这种情况下,您的想法是您系统中的某个位置有一个小的 shell 脚本来更新本地文件。然后您需要“调用”重新加载以将新数据加载到内存中:
...
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
},
"tasks": [
{
"name": "LSing...",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": [
{"mod": 4, "signal": "reload"}
]
},
{
"name": "LSing...2",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": []
}
]
...
上面我们定义了两个任务。它们之间的区别在于,如果组件运行成功,第一个组件将向其发出信号。信号"reload"
将被发送到组件4
,并由该组件来处理它。
组件索引定义为该组件在 config 中的顺序,包括输入组件。鉴于目前我们只支持一个输入,上面的组件4
是proc
部分的第三个。
UseNumber()
进行解码才能正确输出,但是,它会破坏govaluate
,因此在比较时必须使用json_to_float64()
。例如,请参见TestIf
... 你好!我们的想法是,我们可以为最终用户提供 JSON 可配置的管道处理能力。然而,我们确实需要更多组件来完成各种工作,也许还需要编解码器!
组件应该非常容易实现。使用proc/log.go
作为起点(~60 LOC)来实现您的组件。
编解码器:快速浏览一下linecodecs.go
。人们可以轻松实现新的线路编码器/解码器。然后可以将它们插入输入/输出模块
不确定需要什么帮助?看看 TODO.md 一如既往,我们非常欢迎评论、建议、文档、错误报告等:)