用Go編寫的處理管道(類似logstash)
克隆並安裝相依性:
mkdir -p $GOPATH/src/github.com/urban-1
cd $GOPATH/src/github.com/urban-1
git clone [email protected]:urban-1/gopipe.git
dep ensure
您可以使用一個 Makefile 來快速建置並開始處理該專案:
$ make help
Available targets:
rdkafka Build librdkafka locally
gopipe Build gopipe
tests Run tests (includes fmt and vet)
show_coverage Show coverage results
我們的目標是定義一個以模組化方式接收和處理事件的管道。事件可以包含:
Data["raw"]
)Data["message"]
)Data
為map[string]interface{}
)我們將我們的模組稱為“組件”,這些模組分為三類:
本質上,所有元件都是「相同的」(實現相同的介面)。唯一的區別在於他們可以使用哪些管道。
嗯...我在 C++ 中做了類似的事情來處理網路串流資料包,並且認為 Go(真的)很快並且並發是此類應用程式的完美匹配。
為什麼不使用現有的東西:我們可以擴展現有框架,但是,這是一個複製 C++ 程式碼的 Go 學習練習...
那有什麼不同呢?我們關注系統的角度,我們希望這個框架更加面向網路/資料而不是面向log
:
flowreplicator.json
配置。未來的計劃是什麼:我們計劃維護和擴展它,直到我們完全移植我們的 C++ 程式碼...維護將繼續,但我們希望在社區的幫助下根據需要進行擴展。
將複製並可選擇採樣 UDP 封包:
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "UDPRawInput",
"listen": "0.0.0.0",
"port": 9090
},
"proc": [
{
"module": "SamplerProc",
"every": 2
},
{
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9091
}
],
"out": {
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9092
}
}
透過 TCP 接收行,將其解析為資料字段,添加時間戳,將某些資料轉換為不同的資料類型,丟棄原始訊息,散列某些字段等
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "TCPStrInput",
"listen": "0.0.0.0",
"port": 9092,
"headers": ["hello", "test", "src"],
"separator": ",",
"convert": false
},
"proc": [
{
"module": "RegexProc",
"regexes": [
"(?mi)(?P<host>[0-9a-z]+) (?P<port>[0-9]+): (?P<hostEvent>.*)"
]
},
{
"module": "DropFieldProc",
"field_name": "message"
},
{
"module": "CastProc",
"fields": ["port"],
"types": ["int"]
},
{
"module": "InListProc",
"in_field": "port",
"out_field": "port_block",
"reload_minutes": 100000000,
"list": ["8080", "443", "23230", "14572", "17018"]
},
{"module": "if", "condition": "port_block == true "},
{
"module": "Md5Proc",
"in_fields": ["host"],
"out_fields": ["host_hash"],
"salt": "andPepper!"
},
{"module": "else"},
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{"module": "endif"},
{
"module": "LogProc",
"level": "info"
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
在偵聽 JSON 行的 TCP 套接字上接收:
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 10000000
},
"in": {
"module": "TCPJSONInput",
"listen": "0.0.0.0",
"port": 9092
},
"proc": [
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{
"module": "LPMProc",
"filepath": "/tmp/prefix-asn.txt",
"reload_minutes": 1440,
"in_fields": ["src", "dst"],
"out_fields": [
{"newkey": "_{{in_field}}_prefix", "metakey": "prefix"},
{"newkey": "_{{in_field}}_asn", "metakey": "asn"}
]
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
以下配置部分定義了每 10 秒運行一次的任務。通常您希望更新InListProc
和LPMProc
元件的檔案來源...在這種情況下,您的想法是您系統中的某個位置有一個小的 shell 腳本來更新本機檔案。然後您需要“調用”重新加載以將新數據加載到內存中:
...
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
},
"tasks": [
{
"name": "LSing...",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": [
{"mod": 4, "signal": "reload"}
]
},
{
"name": "LSing...2",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": []
}
]
...
上面我們定義了兩個任務。它們之間的區別在於,如果組件運行成功,第一個組件將向其發出信號。訊號"reload"
將被傳送到元件4
,並由該元件來處理它。
元件索引定義為此元件在 config 中的順序,包括輸入元件。鑑於目前我們只支援一個輸入,上面的組件4
是proc
部分的第三個。
UseNumber()
解碼才能正確輸出,但是,它會破壞govaluate
,因此在比較時必須使用json_to_float64()
。例如,請參見TestIf
... 你好!我們的想法是,我們可以為最終用戶提供 JSON 可設定的管道處理能力。然而,我們確實需要更多組件來完成各種工作,也許還需要編解碼器!
組件應該非常容易實現。使用proc/log.go
作為起點(~60 LOC)來實現您的元件。
編解碼器:快速瀏覽一下linecodecs.go
。人們可以輕鬆實現新的線路編碼器/解碼器。然後可以將它們插入輸入/輸出模組
不確定需要什麼幫助?看看 TODO.md 一如既往,我們非常歡迎評論、建議、文件、錯誤報告等:)