Go로 작성된 처리 파이프라인(logstash와 유사)
종속성을 복제하고 설치합니다.
mkdir -p $GOPATH/src/github.com/urban-1
cd $GOPATH/src/github.com/urban-1
git clone [email protected]:urban-1/gopipe.git
dep ensure
이 프로젝트를 빠르게 빌드하고 작업을 시작하는 데 사용할 수 있는 Makefile이 있습니다.
$ make help
Available targets:
rdkafka Build librdkafka locally
gopipe Build gopipe
tests Run tests (includes fmt and vet)
show_coverage Show coverage results
우리의 목표는 이벤트가 모듈 방식으로 수신되고 처리되는 파이프라인을 정의하는 것입니다. 이벤트에는 다음이 포함될 수 있습니다.
Data["raw"]
)Data["message"]
)map[string]interface{}
형식의 Data
)우리는 모듈을 "구성 요소"라고 부르며 이는 세 가지 범주로 나뉩니다.
본질적으로 모든 구성 요소는 "동일"합니다(동일한 인터페이스 구현). 유일한 차이점은 어떤 채널을 사용할 수 있는지입니다.
글쎄요... 저는 netflow 패킷을 처리하기 위해 C++에서 비슷한 작업을 수행했으며 Go가 (정말로) 빠르고 동시성이 그러한 애플리케이션에 완벽하게 일치한다고 생각했습니다.
이미 있는 것을 사용하면 어떨까요? 기존 프레임워크를 확장할 수 있지만 이는 C++ 코드를 복제하는 Go 학습 연습입니다.
그게 어떻게 다른가요? 우리는 시스템 관점에 초점을 맞추고 이 프레임워크가 log
지향보다는 네트워크/데이터 지향이 되기를 원합니다.
flowreplicator.json
구성을 참조하세요.향후 계획은 무엇입니까? C++ 코드를 완전히 이식할 때까지 이를 유지하고 확장할 계획입니다. 유지 관리는 계속되지만 커뮤니티의 도움으로 필요에 따라 확장할 수 있기를 바랍니다.
UDP 패킷을 복제하고 선택적으로 샘플링합니다.
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "UDPRawInput",
"listen": "0.0.0.0",
"port": 9090
},
"proc": [
{
"module": "SamplerProc",
"every": 2
},
{
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9091
}
],
"out": {
"module": "UDPRawOutput",
"target": "127.0.0.1",
"port": 9092
}
}
TCP를 통해 라인을 수신하고, 이를 데이터 필드로 구문 분석하고, 타임스탬프를 추가하고, 일부 데이터를 다른 데이터 유형으로 변환하고, 원본 메시지를 삭제하고, 일부 필드를 해시하는 등의 작업을 수행합니다.
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 100000
},
"in": {
"module": "TCPStrInput",
"listen": "0.0.0.0",
"port": 9092,
"headers": ["hello", "test", "src"],
"separator": ",",
"convert": false
},
"proc": [
{
"module": "RegexProc",
"regexes": [
"(?mi)(?P<host>[0-9a-z]+) (?P<port>[0-9]+): (?P<hostEvent>.*)"
]
},
{
"module": "DropFieldProc",
"field_name": "message"
},
{
"module": "CastProc",
"fields": ["port"],
"types": ["int"]
},
{
"module": "InListProc",
"in_field": "port",
"out_field": "port_block",
"reload_minutes": 100000000,
"list": ["8080", "443", "23230", "14572", "17018"]
},
{"module": "if", "condition": "port_block == true "},
{
"module": "Md5Proc",
"in_fields": ["host"],
"out_fields": ["host_hash"],
"salt": "andPepper!"
},
{"module": "else"},
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{"module": "endif"},
{
"module": "LogProc",
"level": "info"
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
JSON 라인을 수신하는 TCP 소켓에서 수신:
{
"main": {
"num_cpus": 2,
"log_level": 1,
"channel_size": 50000,
"stats_every": 10000000
},
"in": {
"module": "TCPJSONInput",
"listen": "0.0.0.0",
"port": 9092
},
"proc": [
{
"module": "AddTimeProc",
"field_name": "_timestamp"
},
{
"module": "LPMProc",
"filepath": "/tmp/prefix-asn.txt",
"reload_minutes": 1440,
"in_fields": ["src", "dst"],
"out_fields": [
{"newkey": "_{{in_field}}_prefix", "metakey": "prefix"},
{"newkey": "_{{in_field}}_asn", "metakey": "asn"}
]
}
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
}
}
다음 구성 부분은 10초마다 실행되는 작업을 정의합니다. 일반적으로 InListProc
및 LPMProc
구성 요소에 대한 파일 소스를 업데이트하려고 합니다. 이러한 경우 시스템 어딘가에 로컬 파일을 업데이트할 작은 쉘 스크립트가 있다는 아이디어가 있습니다. 그런 다음 메모리에 새 데이터를 로드하려면 다시 로드를 "호출"해야 합니다.
...
],
"out": {
"module": "FileJSONOutput",
"rotate_seconds": 60,
"folder": "/tmp",
"file_name_format": "gopipe-20060102-150405.json"
},
"tasks": [
{
"name": "LSing...",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": [
{"mod": 4, "signal": "reload"}
]
},
{
"name": "LSing...2",
"command": ["ls", "-al"],
"interval_seconds": 10,
"signals": []
}
]
...
위에서 우리는 두 가지 작업을 정의합니다. 차이점은 첫 번째 구성 요소가 성공적으로 실행되면 구성 요소에 신호를 보낸다는 것입니다. "reload"
신호는 구성요소 4
로 전송되며 이를 처리하는 것은 구성요소에 달려 있습니다.
구성요소 색인은 입력 구성요소를 포함하는 구성에서 이 구성요소의 순서로 정의됩니다. 현재 우리가 하나의 입력만 지원한다는 점을 고려하면 위의 구성 요소 4
proc
섹션에서 세 번째입니다.
UseNumber()
사용한 디코딩이 필요하지만 govaluate
중단되므로 비교할 때 json_to_float64()
사용해야 합니다. 예를 들어 TestIf
참조하세요... 안녕하세요! 최종 사용자에게 JSON으로 구성 가능한 파이프라인 처리 기능을 제공할 수 있다는 아이디어입니다. 그러나 다양한 작업을 위해서는 더 많은 구성 요소와 코덱이 필요합니다!
구성요소는 구현하기가 매우 쉬워야 합니다. proc/log.go
시작점(~60 LOC)으로 사용하여 구성 요소를 구현하세요.
코덱: linecodecs.go
를 빠르게 살펴보세요. 새로운 라인 인코더/디코더를 쉽게 구현할 수 있습니다. 그런 다음 입력/출력 모듈에 연결할 수 있습니다.
무엇을 도와야 할지 모르시나요? TODO.md를 살펴보십시오. 언제나 그렇듯이 의견, 제안, 문서, 버그 보고서 등을 환영합니다 :)