去直播
1.0.0
該庫是流處理分析的框架。它旨在用作需要對大量資料進行流處理的 go 程式的庫。
它由一個圖組成,該圖將來源連接到 1 個或多個運算符,並終止於接收器。操作員透過 go 通道將資料從一個通道傳遞到另一個通道。將物件編碼為 snappy 的範例圖是:
var from *util.MemoryBuffer
// fill up from
var to *util.MemoryBuffer
ch := stream.NewOrderedChain()
ch.Add(source.NewNextReaderSource(from))
timingOp, _, dur := timing.NewTimingOp()
ch.Add(timingOp)
ch.Add(compress.NewSnappyEncodeOp())
ch.Add(sink.NewWriterSink(to))
ch.Start()
log.Printf("RES: Compress Snappy.ttRatio %v", float64(to.ByteSize())/float64(from.ByteSize()))
log.Printf("RES: Compress Snappy.ttBuffered Items: %dtItem: %vttook: %vtrate: %dtsize: %Etsize/item: %E", to.Len(), *counter, *dur, int( float64(*counter)/(*dur).Seconds()), float64(to.ByteSize()), float64(to.ByteSize())/float64(*counter))
運營商是鏈條的主要組成部分。他們處理元組以產生結果。來源是沒有輸出的運算子。接收器是沒有輸入的運算子。運算子實作stream.Operator。如果需要輸入則實現stream.In;如果它產生輸出則實現stream.Out。
映射器提供了一種實現運算子的簡單方法。 mapper.NewOp() 採用 func(input stream.Object, out Outputer) 形式的函數,該函數處理輸入並將其輸出到 Outputer 物件。映射器會自動並行化。生成器提供了一種透過閉包為映射器提供線程本地存儲的方法。您也可以在映射器完成最後一個元組的處理後為其提供特殊功能。
您也可以將一條鏈的資料拆分到其他鏈中。 Stream.Fanout 取得輸入並將其複製到其他 N 個鏈。分配器取得輸入並根據映射函數將其放入 N 個鏈中的 1 個上。
鏈可以是有序的,也可以是無序的。有序鏈保留了從輸入到輸出的元組順序(儘管運算子仍然使用並行性)。
編譯:去構建
測試:去測試