去直播
1.0.0
该库是流处理分析的框架。它旨在用作需要对大量数据进行流处理的 go 程序的库。
它由一个图组成,该图将源连接到 1 个或多个运算符,并终止于接收器。操作员通过 go 通道将数据从一个通道传递到另一个通道。将对象编码为 snappy 的示例图是:
var from *util.MemoryBuffer
// fill up from
var to *util.MemoryBuffer
ch := stream.NewOrderedChain()
ch.Add(source.NewNextReaderSource(from))
timingOp, _, dur := timing.NewTimingOp()
ch.Add(timingOp)
ch.Add(compress.NewSnappyEncodeOp())
ch.Add(sink.NewWriterSink(to))
ch.Start()
log.Printf("RES: Compress Snappy.ttRatio %v", float64(to.ByteSize())/float64(from.ByteSize()))
log.Printf("RES: Compress Snappy.ttBuffered Items: %dtItem: %vttook: %vtrate: %dtsize: %Etsize/item: %E", to.Len(), *counter, *dur, int( float64(*counter)/(*dur).Seconds()), float64(to.ByteSize()), float64(to.ByteSize())/float64(*counter))
运营商是链条的主要组成部分。他们处理元组以产生结果。源是没有输出的运算符。接收器是没有输入的运算符。运算符实现stream.Operator。如果需要输入则实现stream.In;如果它产生输出则实现stream.Out。
映射器提供了一种实现运算符的简单方法。 mapper.NewOp() 采用 func(input stream.Object, out Outputer) 形式的函数,该函数处理输入并将其输出到 Outputer 对象。映射器会自动并行化。生成器提供了一种通过闭包为映射器提供线程本地存储的方法。您还可以在映射器完成最后一个元组的处理后为其提供特殊功能。
您还可以将一条链的数据拆分到其他链中。 Stream.Fanout 获取输入并将其复制到其他 N 个链。分配器获取输入并根据映射函数将其放入 N 个链中的 1 个上。
链可以是有序的,也可以是无序的。有序链保留了从输入到输出的元组顺序(尽管运算符仍然使用并行性)。
编译:去构建
测试:去测试