chronon是一個抽象化了資料計算複雜性並為 AI/ML 應用程式提供服務的平台。使用者將功能定義為原始資料的轉換,然後chronon可以執行批次和流計算、可擴展的回填、低延遲服務、保證正確性和一致性,以及一系列可觀察性和監控工具。
它允許您利用組織內的所有資料(從批次表、事件流或服務)來為您的 AI/ML 專案提供支持,而無需擔心這通常需要的所有複雜編排。
有關chronon的更多信息,請訪問chronon .ai。
chronon提供了一個用於即時取得的 API,它可以傳回您的特徵的最新值。它支援:
機器學習從業者通常需要特徵值的歷史視圖來進行模型訓練和評估。 chronon的回填是:
chronon提供以下方面的可見性:
chronon支援一系列聚合類型。有關完整列表,請參閱此處的文件。
這些聚合都可以配置為在任意視窗大小上進行計算。
本部分將引導您完成使用chronon建立訓練資料集(使用建置的底層原始資料集)的步驟。
包括:
GroupBy
和Join
。不包括:
要開始使用chronon ,您所需要做的就是下載 docker-compose.yml 檔案並在本地運行它:
curl -o docker-compose.yml https://chronon.ai/docker-compose.yml
docker-compose up
一旦您看到一些列印的資料only showing top 20 rows
通知,您就可以繼續學習本教學了。
在此範例中,假設我們是一家大型線上零售商,並且我們偵測到基於用戶購買並隨後退貨的詐騙向量。我們想要訓練一個模型,該模型將在結帳流程開始時調用,並預測該交易是否可能導致欺詐性退貨。
製作的原始資料包含在資料目錄中。它包括四個表:
在新的終端機視窗中,運行:
docker-compose exec main bash
這將在chronon docker 容器內開啟一個 shell。
現在設定步驟已完成,我們可以開始建立和測試各種chronon物件來定義轉換和聚合,並產生資料。
讓我們從建立在原始輸入來源之上的三個功能集開始。
注意:這些 python 定義已經在您的chronon
映像中。在步驟 3 - 回填資料之前,您無需執行任何內容,此時您將為這些定義執行計算。
功能集1:購買數據功能
我們可以將購買日誌資料聚合到使用者級別,以便我們了解該用戶先前在我們平台上的活動。具體來說,我們可以計算他們之前在各個視窗中購買金額的SUM
COUNT
和AVERAGE
。
由於此功能建立在包含表格和主題的來源之上,因此可以批次和串流計算其功能。
source = Source (
events = EventSource (
table = "data.purchases" , # This points to the log table with historical purchase events
topic = None , # Streaming is not currently part of quickstart, but this would be where you define the topic for realtime events
query = Query (
selects = select ( "user_id" , "purchase_price" ), # Select the fields we care about
time_column = "ts" ) # The event time
))
window_sizes = [ Window ( length = day , timeUnit = TimeUnit . DAYS ) for day in [ 3 , 14 , 30 ]] # Define some window sizes to use below
v1 = GroupBy (
sources = [ source ],
keys = [ "user_id" ], # We are aggregating by user
aggregations = [ Aggregation (
input_column = "purchase_price" ,
operation = Operation . SUM ,
windows = window_sizes
), # The sum of purchases prices in various windows
Aggregation (
input_column = "purchase_price" ,
operation = Operation . COUNT ,
windows = window_sizes
), # The count of purchases in various windows
Aggregation (
input_column = "purchase_price" ,
operation = Operation . AVERAGE ,
windows = window_sizes
) # The average purchases by user in various windows
],
)
請在此處查看整個程式碼檔案:購買 GroupBy。這也在您的 docker 映像中。我們將在步驟 3 - 回填資料中為其和其他 GroupBy 運行計算。
特徵集2:傳回資料特徵
我們對返回 GroupBy 中的返回資料執行一組類似的聚合。此處未包含該程式碼,因為它看起來與上面的範例類似。
特徵集3:使用者資料特徵
將使用者資料轉換為特徵要簡單一些,主要是因為不需要包含聚合。在這種情況下,來源資料的主鍵與特徵的主鍵相同,因此我們只是提取列值而不是對行執行聚合:
source = Source (
entities = EntitySource (
snapshotTable = "data.users" , # This points to a table that contains daily snapshots of the entire product catalog
query = Query (
selects = select ( "user_id" , "account_created_ds" , "email_verified" ), # Select the fields we care about
)
))
v1 = GroupBy (
sources = [ source ],
keys = [ "user_id" ], # Primary key is the same as the primary key for the source table
aggregations = None # In this case, there are no aggregations or windows to define
)
取自用戶 GroupBy。
接下來,我們需要將先前定義的特徵回填到單一表中以進行模型訓練。這可以使用Join
API 來實作。
對於我們的用例,從正確的時間戳開始計算特徵非常重要。由於我們的模型在結帳流程開始時運行,因此我們希望確保在回填中使用相應的時間戳,以便模型訓練的特徵值在邏輯上與模型在線上推理中看到的特徵值相符。
Join
是驅動訓練資料功能回填的 API。它主要執行以下功能:
Join
)。這是我們的連結的樣子:
source = Source (
events = EventSource (
table = "data.checkouts" ,
query = Query (
selects = select ( "user_id" ), # The primary key used to join various GroupBys together
time_column = "ts" ,
) # The event time used to compute feature values as-of
))
v1 = Join (
left = source ,
right_parts = [ JoinPart ( group_by = group_by ) for group_by in [ purchases_v1 , refunds_v1 , users ]] # Include the three GroupBys
)
取自training_set Join。
連接的left
定義了回填的時間戳記和主鍵(請注意,它是建構在checkout
事件之上的,正如我們的用例所指示的)。
請注意,此Join
將上述三個GroupBy
合併為一個資料定義。在下一步中,我們將運行命令來執行整個管道的計算。
定義連接後,我們使用以下命令對其進行編譯:
compile.py --conf=joins/quickstart/training_set.py
這會將其轉換為一個 thrift 定義,我們可以使用以下命令將其提交給 Spark:
run.py --conf production/joins/quickstart/training_set.v1
回填的輸出將包含左側來源中的 user_id 和 ts 列,以及我們建立的三個 GroupBy 中的 11 個特徵列。
將為左側的每個 user_id 和 ts 計算特徵值,並確保時間準確性。因此,例如,如果左側的一行是user_id = 123
且ts = 2023-10-01 10:11:23.195
,則將為該使用者計算purchase_price_avg_30d
特徵,並以精確的 30 天視窗結束那個時間戳。
現在您可以使用 Spark sql shell 查詢回填的資料:
spark-sql
進而:
spark - sql > SELECT user_id, quickstart_returns_v1_refund_amt_sum_30d, quickstart_purchases_v1_purchase_price_sum_14d, quickstart_users_v1_email_verified from default . quickstart_training_set_v1 limit 100 ;
請注意,這僅選擇幾列。您也可以執行select * from default.quickstart_training_set_v1 limit 100
來查看所有列,但是請注意,該表相當寬,結果在螢幕上可能不太可讀。
要退出 sql shell,您可以執行:
spark-sql > quit ;
現在我們已經創建了連接並回填數據,下一步是訓練模型。這不是本教程的一部分,但假設它已完成,下一步將是在線生產模型。為此,我們需要能夠獲取特徵向量以進行模型推理。這就是下一節的內容。
為了服務線上流量,我們首先需要將資料上傳到線上KV儲存。這與我們在上一步中運行的回填有兩個不同之處:
上傳購買分組依據:
run.py --mode upload --conf production/group_bys/quickstart/purchases.v1 --ds 2023-12-01
spark-submit --class ai. chronon .quickstart.online.Spark2MongoLoader --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_purchases_v1_upload mongodb://admin:admin@mongodb:27017/ ? authSource=admin
上傳退貨分組依據:
run.py --mode upload --conf production/group_bys/quickstart/returns.v1 --ds 2023-12-01
spark-submit --class ai. chronon .quickstart.online.Spark2MongoLoader --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_returns_v1_upload mongodb://admin:admin@mongodb:27017/ ? authSource=admin
如果我們想要使用FetchJoin
api 而不是FetchGroupby
,那麼我們還需要上傳連結元資料:
run.py --mode metadata-upload --conf production/joins/quickstart/training_set.v2
這使得線上獲取器知道如何獲取此連接的請求並將其分解為單獨的 GroupBy 請求,返回統一向量,類似於連接回填如何產生具有所有功能的寬視圖表。
定義上述實體後,現在可以透過簡單的 API 呼叫輕鬆取得特徵向量。
取得連線:
run.py --mode fetch --type join --name quickstart/training_set.v2 -k ' {"user_id":"5"} '
您也可以獲得單一 GroupBy(這不需要先前執行的加入元資料上傳步驟):
run.py --mode fetch --type group-by --name quickstart/purchases.v1 -k ' {"user_id":"5"} '
對於生產,Java 用戶端通常直接嵌入到服務中。
Map < String , String > keyMap = new HashMap <>();
keyMap . put ( "user_id" , "123" );
Fetcher . fetch_join ( new Request ( "quickstart/training_set_v1" , keyMap ))
回應樣本
> '{"purchase_price_avg_3d":14.3241, "purchase_price_avg_14d":11.89352, ...}'
注意:此java程式碼無法在docker環境中運行,它只是一個說明性範例。
如本自述文件的介紹部分所討論的, chronon的核心保證之一是線上/線下一致性。這意味著您用於訓練模型(離線)的資料與模型在生產推理(線上)中看到的資料相符。
其中一個關鍵要素是時間準確度。這可以表述為:回填特徵時,為連接左側提供的任何給定timestamp
生成的值應該與在該特定timestamp
獲取該特徵時在線返回的值相同。
chronon不僅保證了這種時間準確性,而且還提供了一種測量它的方法。
測量管道從線上取得請求的日誌開始。這些日誌包括請求的主鍵和時間戳,以及所取得的特徵值。然後chronon將鍵和時間戳記傳遞到左側的 Join 回填,要求計算引擎回填特徵值。然後,它將回填的值與實際取得的值進行比較,以衡量一致性。
步驟一:日誌取得
首先,請確保您已經執行了一些獲取請求。跑步:
run.py --mode fetch --type join --name quickstart/training_set.v2 -k '{"user_id":"5"}'
幾次生成一些提取。
完成後,您可以運行它來建立可用的日誌表(這些命令產生具有正確架構的日誌記錄配置單元表):
spark-submit --class ai. chronon .quickstart.online.MongoLoggingDumper --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default. chronon _log_table mongodb://admin:admin@mongodb:27017/ ? authSource=admin
compile.py --conf group_bys/quickstart/schema.py
run.py --mode backfill --conf production/group_bys/quickstart/schema.v1
run.py --mode log-flattener --conf production/joins/quickstart/training_set.v2 --log-table default. chronon _log_table --schema-table default.quickstart_schema_v1
這將建立一個default.quickstart_training_set_v2_logged
表,其中包含您之前發出的每個提取請求的結果,以及您發出請求的時間戳記和您請求的user
。
注意:在執行上述命令後,它將建立並「關閉」日誌分區,這表示如果您在同一天(UTC 時間)進行其他提取,它將不會追加。如果您想要返回並產生更多線上/離線一致性請求,可以在重新執行上述命令之前刪除該表(在spark-sql
shell 中執行DROP TABLE default.quickstart_training_set_v2_logged
)。
現在您可以使用以下命令計算一致性指標:
run.py --mode consistency-metrics-compute --conf production/joins/quickstart/training_set.v2
作業將從日誌表中取得主鍵和時間戳記(在本例中為default.quickstart_training_set_v2_logged
),並使用它們來建立和執行連接回填。然後,它將回填結果與線上取得的實際記錄值進行比較
它產生兩個輸出表:
default.quickstart_training_set_v2_consistency
:一個人類可讀的表,您可以查詢該表以查看一致性檢查的結果。spark-sql
來進入 sql shell,然後查詢表。DESC default.quickstart_training_set_v2_consistency
,然後選擇您關心查詢的一些欄位。default.quickstart_training_set_v2_consistency_upload
:上傳到線上 KV 儲存的 KV 位元組列表,可用於支援線上資料品質監控流。不意味著人類可讀。 使用chronon進行特徵工程工作可以透過多種方式簡化和改進您的 ML 工作流程:
有關使用chronon的好處的更詳細信息,請參閱chronon文件的好處。
chronon為試圖建立即時服務請求(而不是批次工作流程)的「線上」模型的 AI/ML 從業者提供了最大的價值。
如果沒有chronon ,從事這些專案的工程師需要弄清楚如何將資料獲取到模型中以進行訓練/評估以及生產推理。隨著進入這些模型的資料的複雜性增加(多個來源、複雜的轉換,例如視窗聚合等),支援這種資料管道的基礎設施挑戰也隨之增加。
一般來說,我們觀察到 ML 從業者採用以下兩種方法之一:
透過這種方法,使用者可以從運行模型推理的線上服務環境中可用的資料開始。將相關特徵記錄到資料倉儲。一旦累積了足夠的數據,就在日誌上訓練模型,並使用相同的數據提供服務。
優點:
缺點:
透過這種方法,使用者使用資料倉儲中的資料訓練模型,然後找到在線上環境中複製這些功能的方法。
優點:
缺點:
chronon法
借助chronon您可以使用組織中可用的任何數據,包括資料倉儲中的所有資料、任何串流媒體來源、服務呼叫等,並保證在線和離線環境之間的一致性。它抽象化了編排和維護此資料管道的基礎設施複雜性,以便使用者可以在簡單的 API 中簡單地定義功能,並信任chronon來處理其餘的事情。
我們歡迎對chronon專案做出貢獻!請閱讀貢獻以了解詳細資訊。
使用 GitHub 問題追蹤器報告錯誤或功能請求。加入我們的社群 Slack 工作區以獲取討論、提示和支持。