Jalur pipa pemrosesan (mirip dengan logstash) yang ditulis di Go
Mengkloning dan menginstal dependensi:
mkdir -p $GOPATH/src/github.com/urban-1
cd $GOPATH/src/github.com/urban-1
git clone [email protected]:urban-1/gopipe.git
dep ensure
Ada Makefile yang dapat Anda gunakan untuk membangun dan mulai mengerjakan proyek ini dengan cepat:
$ make help
Available targets:
rdkafka Build librdkafka locally
gopipe Build gopipe
tests Run tests (includes fmt and vet)
show_coverage Show coverage results
Tujuan kami adalah menentukan saluran tempat peristiwa diterima dan diproses secara modular. Acara dapat berisi:
Data["raw"]
)Data["message"]
)Data
sebagai map[string]interface{}
)Kami menyebut modul kami "Komponen" dan ini terbagi dalam tiga kategori:
Intinya, semua komponen adalah “sama” (mengimplementasikan antarmuka yang sama). Satu-satunya perbedaan adalah saluran mana yang tersedia bagi mereka.
Baiklah... Saya telah melakukan hal serupa di C++ untuk memproses paket netflow dan berpikir karena Go (sangat) cepat dan bersamaan sangat cocok untuk aplikasi semacam itu.
Mengapa tidak menggunakan sesuatu yang sudah ada: Kita bisa memperluas kerangka kerja yang sudah ada, namun ini adalah latihan pembelajaran Go untuk mereplikasi kode C++...
Apa bedanya? Kami fokus pada perspektif sistem dan kami ingin kerangka kerja ini lebih berorientasi pada jaringan/data daripada berorientasi log
:
flowreplicator.json
.Apa rencana masa depan: Kami berencana untuk mempertahankan dan memperluas ini hingga kami sepenuhnya mem-porting kode C++ kami... Pemeliharaan akan terus berlanjut tetapi kami berharap kami akan memperluasnya sesuai kebutuhan dengan bantuan komunitas.
Akan mereplikasi dan secara opsional mengambil sampel paket UDP:
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "UDPRawInput",
"listen": "0.0.0.0",
"port": 9090
},
"proc": [
{
"module": "SamplerProc",
"every": 2
},
{
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9091
}
],
"out": {
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9092
}
}
Menerima baris melalui TCP, menguraikannya menjadi bidang data, menambahkan stempel waktu, mengubah beberapa data menjadi tipe data yang berbeda, membuang pesan asli, melakukan hash pada beberapa bidang, dll.
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "TCPStrInput",
"listen": "0.0.0.0",
"port": 9092,
"headers": ["hello", "test", "src"],
"separator": ",",
"convert": false
},
"proc": [
{
"module": "RegexProc",
"regexes": [
"(?mi)(?P<host>[0-9a-z]+) (?P<port>[0-9]+): (?P<hostEvent>.*)"
]
},
{
"module": "DropFieldProc",
"field_name": "message"
},
{
"module": "CastProc",
"fields": ["port"],
"types": ["int"]
},
{
"module": "InListProc",
"in_field": "port",
"out_field": "port_block",
"reload_minutes": 100000000,
"list": ["8080", "443", "23230", "14572", "17018"]
},
{"module": "if", "condition": "port_block == true "},
{
"module": "Md5Proc",
"in_fields": ["host"],
"out_fields": ["host_hash"],
"salt": "andPepper!"
},
{"module": "else"},
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{"module": "endif"},
{
"module": "LogProc",
"level": "info"
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
Terima di soket TCP yang mendengarkan baris JSON:
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 10000000
},
"in": {
"module": "TCPJSONInput",
"listen": "0.0.0.0",
"port": 9092
},
"proc": [
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{
"module": "LPMProc",
"filepath": "/tmp/prefix-asn.txt",
"reload_minutes": 1440,
"in_fields": ["src", "dst"],
"out_fields": [
{"newkey": "_{{in_field}}_prefix", "metakey": "prefix"},
{"newkey": "_{{in_field}}_asn", "metakey": "asn"}
]
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
Bagian konfigurasi berikut mendefinisikan tugas yang dijalankan setiap 10 detik. Biasanya Anda ingin memperbarui sumber file untuk komponen InListProc
dan LPMProc
... Dalam kasus seperti itu, idenya adalah Anda memiliki skrip shell kecil di suatu tempat di sistem Anda yang akan memperbarui file lokal Anda. Maka Anda perlu "meminta" memuat ulang untuk memuat data baru ke dalam memori:
...
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
},
"tasks": [
{
"name": "LSing...",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": [
{"mod": 4, "signal": "reload"}
]
},
{
"name": "LSing...2",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": []
}
]
...
Di atas kami mendefinisikan dua tugas. Perbedaan di antara keduanya adalah yang pertama akan memberi sinyal pada komponen jika berhasil dijalankan. Sinyal "reload"
akan dikirim ke komponen 4
dan terserah komponen untuk menanganinya.
Indeks komponen didefinisikan sebagai urutan komponen ini dalam konfigurasi termasuk komponen masukan . Mengingat saat ini kami hanya mendukung satu input, komponen 4
di atas adalah yang ke-3 di bagian proc
.
UseNumber()
diperlukan untuk keluaran yang benar, namun govaluate
rusak sehingga saat membandingkan Anda harus menggunakan json_to_float64()
. Lihat TestIf
misalnya... Halo! Idenya adalah kami dapat menyediakan kemampuan pemrosesan pipeline yang dapat dikonfigurasi JSON untuk pengguna akhir. Namun, kami memerlukan lebih banyak komponen untuk berbagai pekerjaan dan mungkin codec!
Komponen harus sangat mudah diimplementasikan. Gunakan proc/log.go
sebagai titik awal (~60 LOC) untuk mengimplementasikan komponen Anda.
Codec: Lihat sekilas linecodecs.go
. Seseorang dapat dengan mudah mengimplementasikan encoder/decoder baris baru. Ini kemudian dapat dihubungkan ke modul input/output
Tidak yakin dengan apa yang harus membantu? lihat TODO.md Seperti biasa, komentar, saran, dokumentasi, laporan bug, dll sangat kami harapkan :)