Go で書かれた処理パイプライン (logstash に類似)
依存関係のクローンを作成してインストールします。
mkdir -p $GOPATH/src/github.com/urban-1
cd $GOPATH/src/github.com/urban-1
git clone [email protected]:urban-1/gopipe.git
dep ensure
このプロジェクトをすばやくビルドして作業を開始するために使用できる Makefile があります。
$ make help
Available targets:
rdkafka Build librdkafka locally
gopipe Build gopipe
tests Run tests (includes fmt and vet)
show_coverage Show coverage results
私たちの目標は、モジュール方式でイベントを受信して処理するパイプラインを定義することです。イベントには以下を含めることができます。
Data["raw"]
)Data["message"]
)map[string]interface{}
としてのData
)私たちはモジュールを「コンポーネント」と呼び、これらは 3 つのカテゴリに分類されます。
本質的に、すべてのコンポーネントは「同じ」です (同じインターフェイスを実装します)。唯一の違いは、どのチャネルが利用可能になるかということです。
そうですね...ネットフローパケットを処理するためにC++で同様のことを行ったことがありますが、Goは(非常に)高速であり、同時実行がそのようなアプリケーションに最適であるため、考えました。
すでに存在しているものを使用してみてはいかがでしょうか。既存のフレームワークを拡張することもできますが、これは C++ コードを複製するための Go 学習演習です...
それはどう違うのでしょうか?私たちはシステムの観点に焦点を当てており、このフレームワークをlog
指向ではなくネットワーク/データ指向にしたいと考えています。
flowreplicator.json
構成を参照してください。将来の計画は何ですか: C++ コードを完全に移植するまで、これを維持および拡張する予定です... メンテナンスは継続しますが、コミュニティの助けを借りて必要に応じて拡張できることを願っています。
UDP パケットを複製し、オプションでサンプリングします。
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "UDPRawInput",
"listen": "0.0.0.0",
"port": 9090
},
"proc": [
{
"module": "SamplerProc",
"every": 2
},
{
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9091
}
],
"out": {
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9092
}
}
TCP 経由で行を受信し、それらをデータ フィールドに解析し、タイムスタンプを追加し、一部のデータを別のデータ型に変換し、元のメッセージを破棄し、いくつかのフィールドをハッシュします。
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "TCPStrInput",
"listen": "0.0.0.0",
"port": 9092,
"headers": ["hello", "test", "src"],
"separator": ",",
"convert": false
},
"proc": [
{
"module": "RegexProc",
"regexes": [
"(?mi)(?P<host>[0-9a-z]+) (?P<port>[0-9]+): (?P<hostEvent>.*)"
]
},
{
"module": "DropFieldProc",
"field_name": "message"
},
{
"module": "CastProc",
"fields": ["port"],
"types": ["int"]
},
{
"module": "InListProc",
"in_field": "port",
"out_field": "port_block",
"reload_minutes": 100000000,
"list": ["8080", "443", "23230", "14572", "17018"]
},
{"module": "if", "condition": "port_block == true "},
{
"module": "Md5Proc",
"in_fields": ["host"],
"out_fields": ["host_hash"],
"salt": "andPepper!"
},
{"module": "else"},
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{"module": "endif"},
{
"module": "LogProc",
"level": "info"
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
JSON 行をリッスンする TCP ソケットで受信します。
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 10000000
},
"in": {
"module": "TCPJSONInput",
"listen": "0.0.0.0",
"port": 9092
},
"proc": [
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{
"module": "LPMProc",
"filepath": "/tmp/prefix-asn.txt",
"reload_minutes": 1440,
"in_fields": ["src", "dst"],
"out_fields": [
{"newkey": "_{{in_field}}_prefix", "metakey": "prefix"},
{"newkey": "_{{in_field}}_asn", "metakey": "asn"}
]
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
次の構成部分は、10 秒ごとに実行されるタスクを定義します。通常、 InListProc
およびLPMProc
コンポーネントのファイル ソースを更新したいと考えます。そのような場合、システムのどこかにローカル ファイルを更新する小さなシェル スクリプトを用意するという考えになります。次に、リロードを「呼び出し」て、新しいデータをメモリにロードする必要があります。
...
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
},
"tasks": [
{
"name": "LSing...",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": [
{"mod": 4, "signal": "reload"}
]
},
{
"name": "LSing...2",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": []
}
]
...
上記で 2 つのタスクを定義します。それらの違いは、最初のものが正常に実行された場合にコンポーネントに信号を送信することです。信号"reload"
はコンポーネント4
に送信され、それを処理するのはコンポーネント次第です。
コンポーネント インデックスは、入力コンポーネントを含む構成内のこのコンポーネントの順序として定義されます。現時点では 1 つの入力のみをサポートしていることを考えると、上記のコンポーネント4
proc
セクションの 3 番目になります。
UseNumber()
によるデコードが必要ですが、 govaluate
が壊れるので、比較するときはjson_to_float64()
使用する必要があります。たとえば、 TestIf
参照してください... こんにちは!アイデアは、JSON で構成可能なパイプライン処理機能をエンド ユーザーに提供できるということです。ただし、さまざまなジョブや場合によってはコーデック用にさらに多くのコンポーネントが必要です。
コンポーネントは実装が非常に簡単である必要があります。コンポーネントを実装するには、開始点 (~60 LOC) としてproc/log.go
使用します。
コーデック: linecodecs.go
を簡単に調べてください。新しいライン エンコーダ/デコーダを簡単に実装できます。これらは入出力モジュールに接続できます。
何を手助けすればよいかわからないですか? TODO.md を見てください。いつものように、コメント、提案、ドキュメント、バグレポートなどは大歓迎です :)