이 라이브러리는 스트림 처리 분석을 위한 프레임워크입니다. 대량의 데이터를 스트림 처리해야 하는 Go 프로그램용 라이브러리로 사용하기 위한 것입니다.
소스를 1개 이상의 연산자에 연결하고 싱크에서 끝나는 그래프로 구성됩니다. 운영자는 go 채널을 사용하여 서로 데이터를 전달합니다. 객체를 snappy로 인코딩하는 그래프의 예는 다음과 같습니다.
var from *util.MemoryBuffer
// fill up from
var to *util.MemoryBuffer
ch := stream.NewOrderedChain()
ch.Add(source.NewNextReaderSource(from))
timingOp, _, dur := timing.NewTimingOp()
ch.Add(timingOp)
ch.Add(compress.NewSnappyEncodeOp())
ch.Add(sink.NewWriterSink(to))
ch.Start()
log.Printf("RES: Compress Snappy.ttRatio %v", float64(to.ByteSize())/float64(from.ByteSize()))
log.Printf("RES: Compress Snappy.ttBuffered Items: %dtItem: %vttook: %vtrate: %dtsize: %Etsize/item: %E", to.Len(), *counter, *dur, int( float64(*counter)/(*dur).Seconds()), float64(to.ByteSize()), float64(to.ByteSize())/float64(*counter))
연산자는 체인의 주요 구성 요소입니다. 결과를 생성하기 위해 튜플을 처리합니다. 소스는 출력이 없는 연산자입니다. 싱크는 입력이 없는 연산자입니다. 연산자는 stream.Operator를 구현합니다. 입력이 필요한 경우 stream.In을 구현합니다. 출력이 생성되면 stream.Out을 구현합니다.
매퍼는 연산자를 구현하는 간단한 방법을 제공합니다. mapper.NewOp()는 입력을 처리하고 이를 Outputer 객체에 출력하는 func(input stream.Object, out Outputer) 형식의 함수를 사용합니다. 매퍼는 자동으로 병렬화됩니다. 생성기는 클로저를 통해 매퍼에게 스레드 로컬 저장소를 제공하는 방법을 제공합니다. 마지막 튜플 처리를 마친 후 매퍼에게 특수 기능을 제공할 수도 있습니다.
체인의 데이터를 다른 체인으로 분할할 수도 있습니다. stream.Fanout은 입력을 받아 N개의 다른 체인에 복사합니다. 분배자는 입력을 받아 매핑 기능에 따라 N개 체인 중 1개에 넣습니다.
체인은 주문하거나 주문하지 않을 수 있습니다. 순서가 지정된 체인은 입력에서 출력까지 튜플의 순서를 유지합니다(연산자는 여전히 병렬성을 사용하지만).
컴파일: 빌드하기
테스트: 테스트하러 가기