ไปป์ไลน์การประมวลผล (คล้ายกับ logstash) ที่เขียนด้วยภาษา Go
โคลนและติดตั้งการอ้างอิง:
mkdir -p $GOPATH/src/github.com/urban-1
cd $GOPATH/src/github.com/urban-1
git clone [email protected]:urban-1/gopipe.git
dep ensure
มี Makefile ที่คุณสามารถใช้เพื่อสร้างและเริ่มทำงานในโครงการนี้ได้อย่างรวดเร็ว:
$ make help
Available targets:
rdkafka Build librdkafka locally
gopipe Build gopipe
tests Run tests (includes fmt and vet)
show_coverage Show coverage results
เป้าหมายของเราคือการกำหนดไปป์ไลน์ที่รับและประมวลผลเหตุการณ์ในลักษณะโมดูลาร์ กิจกรรมสามารถประกอบด้วย:
Data["raw"]
)Data["message"]
)Data
as map[string]interface{}
)เราเรียกโมดูลของเราว่า "ส่วนประกอบ" และแบ่งออกเป็นสามประเภท:
โดยพื้นฐานแล้วส่วนประกอบทั้งหมดจะ "เหมือนกัน" (ใช้อินเทอร์เฟซเดียวกัน) ข้อแตกต่างเพียงอย่างเดียวคือช่องทางใดที่พวกเขาสามารถใช้ได้
ฉันได้ทำสิ่งที่คล้ายกันใน C ++ เพื่อประมวลผลแพ็กเก็ต netflow และคิดว่าเนื่องจาก Go นั้นเร็ว (จริงๆ) และทำงานพร้อมกันจึงเป็นคู่ที่สมบูรณ์แบบสำหรับแอปพลิเคชันดังกล่าว
ทำไมไม่ใช้บางอย่างที่มีอยู่แล้ว: เราสามารถขยายเฟรมเวิร์กที่มีอยู่ได้ แต่นี่คือแบบฝึกหัด Go Learning เพื่อจำลองโค้ด C++...
มันแตกต่างกันอย่างไร? เรามุ่งเน้นไปที่มุมมองของระบบและเราต้องการให้เฟรมเวิร์กนี้เน้นไปที่เครือข่าย/ข้อมูลมากกว่าที่จะเน้นไป log
:
flowreplicator.json
แผนในอนาคตคืออะไร: เราวางแผนที่จะรักษาและขยายเวลานี้จนกว่าเราจะพอร์ตโค้ด C++ ของเราอย่างสมบูรณ์... การบำรุงรักษาจะดำเนินต่อไป แต่เราหวังว่าจะขยายเวลาตามความจำเป็นด้วยความช่วยเหลือจากชุมชน
จะทำซ้ำและสุ่มตัวอย่างแพ็ก UDP:
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "UDPRawInput",
"listen": "0.0.0.0",
"port": 9090
},
"proc": [
{
"module": "SamplerProc",
"every": 2
},
{
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9091
}
],
"out": {
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9092
}
}
รับบรรทัดบน TCP, แยกวิเคราะห์ลงในช่องข้อมูล, เพิ่มการประทับเวลา, แปลงข้อมูลบางส่วนเป็นประเภทข้อมูลที่แตกต่างกัน, ละทิ้งข้อความต้นฉบับ, แฮชบางช่อง ฯลฯ
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "TCPStrInput",
"listen": "0.0.0.0",
"port": 9092,
"headers": ["hello", "test", "src"],
"separator": ",",
"convert": false
},
"proc": [
{
"module": "RegexProc",
"regexes": [
"(?mi)(?P<host>[0-9a-z]+) (?P<port>[0-9]+): (?P<hostEvent>.*)"
]
},
{
"module": "DropFieldProc",
"field_name": "message"
},
{
"module": "CastProc",
"fields": ["port"],
"types": ["int"]
},
{
"module": "InListProc",
"in_field": "port",
"out_field": "port_block",
"reload_minutes": 100000000,
"list": ["8080", "443", "23230", "14572", "17018"]
},
{"module": "if", "condition": "port_block == true "},
{
"module": "Md5Proc",
"in_fields": ["host"],
"out_fields": ["host_hash"],
"salt": "andPepper!"
},
{"module": "else"},
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{"module": "endif"},
{
"module": "LogProc",
"level": "info"
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
รับบนซ็อกเก็ต TCP ที่ฟังสำหรับบรรทัด JSON:
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 10000000
},
"in": {
"module": "TCPJSONInput",
"listen": "0.0.0.0",
"port": 9092
},
"proc": [
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{
"module": "LPMProc",
"filepath": "/tmp/prefix-asn.txt",
"reload_minutes": 1440,
"in_fields": ["src", "dst"],
"out_fields": [
{"newkey": "_{{in_field}}_prefix", "metakey": "prefix"},
{"newkey": "_{{in_field}}_asn", "metakey": "asn"}
]
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
ส่วนการกำหนดค่าต่อไปนี้จะกำหนดงานที่รันทุกๆ 10 วินาที โดยปกติแล้ว คุณต้องการอัปเดตแหล่งไฟล์สำหรับส่วนประกอบ InListProc
และ LPMProc
... ในกรณีเช่นนี้ แนวคิดก็คือคุณมีเชลล์สคริปต์เล็กๆ อยู่ที่ไหนสักแห่งในระบบของคุณที่จะอัปเดตไฟล์ในเครื่องของคุณ จากนั้นคุณจะต้อง "เรียกใช้" การโหลดซ้ำเพื่อโหลดข้อมูลใหม่ในหน่วยความจำ:
...
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
},
"tasks": [
{
"name": "LSing...",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": [
{"mod": 4, "signal": "reload"}
]
},
{
"name": "LSing...2",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": []
}
]
...
ข้างต้นเรากำหนดสองงาน ความแตกต่างระหว่างสิ่งเหล่านั้นคืออันแรกจะส่งสัญญาณส่วนประกอบหากทำงานได้สำเร็จ สัญญาณ "reload"
จะถูกส่งไปยังส่วนประกอบ 4
และขึ้นอยู่กับส่วนประกอบที่จะจัดการ
ดัชนีส่วนประกอบถูกกำหนดให้เป็นลำดับของส่วนประกอบนี้ในการกำหนดค่า รวมถึงส่วนประกอบอินพุต เนื่องจากในขณะนี้เรารองรับอินพุตเดียวเท่านั้น คอมโพเนนต์ 4
ด้านบนคือองค์ประกอบที่ 3 ในส่วน proc
UseNumber()
เพื่อเอาต์พุตที่ถูกต้อง อย่างไรก็ตาม มันทำให้ govaluate
พัง ดังนั้นเมื่อเปรียบเทียบคุณต้องใช้ json_to_float64()
ดูตัวอย่าง TestIf
... สวัสดี! แนวคิดก็คือเราสามารถจัดเตรียมความสามารถในการประมวลผลไปป์ไลน์ที่กำหนดค่า JSON ให้กับผู้ใช้ได้ อย่างไรก็ตาม เราต้องการส่วนประกอบเพิ่มเติมสำหรับงานต่างๆ และอาจรวมถึงตัวแปลงสัญญาณด้วย!
ส่วนประกอบควรใช้งานง่ายมาก ใช้ proc/log.go
เป็นจุดเริ่มต้น (~60 LOC) เพื่อใช้งานส่วนประกอบของคุณ
Codecs: ดูที่ linecodecs.go
อย่างรวดเร็ว เราสามารถใช้ตัวเข้ารหัส/ตัวถอดรหัสบรรทัดใหม่ได้อย่างง่ายดาย จากนั้นสามารถเสียบเข้ากับโมดูลอินพุต/เอาท์พุตได้
ไม่แน่ใจว่าจะช่วยอะไรได้บ้าง? ดูที่ TODO.md เช่นเคย ยินดีต้อนรับความคิดเห็น คำแนะนำ เอกสาร รายงานข้อผิดพลาด ฯลฯ :)