خط أنابيب معالجة (مشابه لـ logstash) مكتوب بلغة Go
استنساخ وتثبيت التبعيات:
mkdir -p $GOPATH/src/github.com/urban-1
cd $GOPATH/src/github.com/urban-1
git clone [email protected]:urban-1/gopipe.git
dep ensure
يوجد ملف Makefile يمكنك استخدامه لبناء هذا المشروع وبدء العمل عليه بسرعة:
$ make help
Available targets:
rdkafka Build librdkafka locally
gopipe Build gopipe
tests Run tests (includes fmt and vet)
show_coverage Show coverage results
هدفنا هو تحديد مسار يتم من خلاله تلقي الأحداث ومعالجتها بطريقة معيارية. يمكن أن تحتوي الأحداث على:
Data["raw"]
)Data["message"]
)Data
map[string]interface{}
)نطلق على وحداتنا اسم "المكونات" وتنقسم إلى ثلاث فئات:
في جوهرها، جميع المكونات هي "نفس" (تنفيذ نفس الواجهة). والفرق الوحيد هو القنوات المتاحة لهم.
حسنًا... لقد فعلت شيئًا مشابهًا في لغة C++ لمعالجة حزم netflow واعتقدت أن Go سريع (حقًا) ومتزامن وهو تطابق مثالي لمثل هذا التطبيق.
لماذا لا تستخدم شيئًا موجودًا بالفعل: يمكننا توسيع إطار عمل موجود، ومع ذلك، فهذا تمرين تعلم Go لتكرار كود C++...
كيف يختلف ذلك؟ نحن نركز على منظور الأنظمة ونريد أن يكون إطار العمل هذا أكثر توجهاً نحو الشبكة/البيانات بدلاً من التوجه نحو log
:
flowreplicator.json
.ما هي الخطط المستقبلية: نخطط لصيانة هذا وتوسيع نطاقه حتى ننقل كود C++ الخاص بنا بالكامل... ستستمر الصيانة ولكننا نأمل نوعًا ما أن نقوم بتمديدها حسب الحاجة بمساعدة المجتمع.
سيتم نسخ حزم UDP وأخذ عينات منها بشكل اختياري:
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "UDPRawInput",
"listen": "0.0.0.0",
"port": 9090
},
"proc": [
{
"module": "SamplerProc",
"every": 2
},
{
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9091
}
],
"out": {
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9092
}
}
يستقبل الخطوط عبر TCP، ويوزعها في حقول البيانات، ويضيف الطابع الزمني، ويحول بعض البيانات إلى أنواع بيانات مختلفة، ويتجاهل الرسالة الأصلية، ويجزئ بعض الحقول، وما إلى ذلك
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "TCPStrInput",
"listen": "0.0.0.0",
"port": 9092,
"headers": ["hello", "test", "src"],
"separator": ",",
"convert": false
},
"proc": [
{
"module": "RegexProc",
"regexes": [
"(?mi)(?P<host>[0-9a-z]+) (?P<port>[0-9]+): (?P<hostEvent>.*)"
]
},
{
"module": "DropFieldProc",
"field_name": "message"
},
{
"module": "CastProc",
"fields": ["port"],
"types": ["int"]
},
{
"module": "InListProc",
"in_field": "port",
"out_field": "port_block",
"reload_minutes": 100000000,
"list": ["8080", "443", "23230", "14572", "17018"]
},
{"module": "if", "condition": "port_block == true "},
{
"module": "Md5Proc",
"in_fields": ["host"],
"out_fields": ["host_hash"],
"salt": "andPepper!"
},
{"module": "else"},
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{"module": "endif"},
{
"module": "LogProc",
"level": "info"
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
تلقي على مقبس TCP الاستماع لخط JSON:
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 10000000
},
"in": {
"module": "TCPJSONInput",
"listen": "0.0.0.0",
"port": 9092
},
"proc": [
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{
"module": "LPMProc",
"filepath": "/tmp/prefix-asn.txt",
"reload_minutes": 1440,
"in_fields": ["src", "dst"],
"out_fields": [
{"newkey": "_{{in_field}}_prefix", "metakey": "prefix"},
{"newkey": "_{{in_field}}_asn", "metakey": "asn"}
]
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
يحدد جزء التكوين التالي مهمة يتم تشغيلها كل 10 ثوانٍ. عادةً ما ترغب في تحديث مصادر الملفات لمكونات InListProc
و LPMProc
... في مثل هذه الحالات، تكون الفكرة هي أن يكون لديك برنامج نصي صغير في مكان ما في نظامك والذي سيقوم بتحديث ملفاتك المحلية. فأنت بحاجة إلى "استدعاء" إعادة التحميل لتحميل البيانات الجديدة في الذاكرة:
...
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
},
"tasks": [
{
"name": "LSing...",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": [
{"mod": 4, "signal": "reload"}
]
},
{
"name": "LSing...2",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": []
}
]
...
أعلاه نحدد مهمتين. الفرق بينهما هو أن الأول سيشير إلى المكون إذا تم تشغيله بنجاح. سيتم إرسال إشارة "reload"
إلى المكون 4
ويعود الأمر إلى المكون للتعامل معها.
يتم تعريف فهرس المكون على أنه ترتيب هذا المكون في التكوين بما في ذلك مكونات الإدخال . نظرًا لأننا في الوقت الحالي ندعم مدخلاً واحدًا فقط، فإن المكون 4
أعلاه هو الثالث في قسم proc
.
UseNumber()
ضروري للإخراج الصحيح، ومع ذلك، فإنه يكسر govaluate
لذا عند المقارنة عليك استخدام json_to_float64()
. انظر TestIf
على سبيل المثال... مرحبًا! الفكرة هي أنه يمكننا توفير إمكانية معالجة خطوط الأنابيب القابلة للتكوين JSON للمستخدم النهائي. ومع ذلك، نحن بحاجة إلى المزيد من المكونات لمختلف المهام وربما برامج الترميز!
يجب أن تكون المكونات سهلة التنفيذ للغاية. استخدم proc/log.go
كنقطة بداية (~60 LOC) لتنفيذ المكون الخاص بك.
برامج الترميز: قم بإلقاء نظرة سريعة على linecodecs.go
. يمكن للمرء بسهولة تنفيذ أجهزة تشفير/وحدات فك ترميز الخطوط الجديدة. ويمكن بعد ذلك توصيلها بوحدات الإدخال/الإخراج
لست متأكدا مع ما للمساعدة؟ قم بإلقاء نظرة على TODO.md كما هو الحال دائمًا، التعليقات والاقتراحات والوثائق وتقارير الأخطاء وما إلى ذلك هي موضع ترحيب كبير :)