chrononデータ計算の複雑さを抽象化し、AI/ML アプリケーションに提供するプラットフォームです。ユーザーは生データの変換として機能を定義すると、 chrononバッチおよびストリーミング計算、スケーラブルなバックフィル、低レイテンシーの提供、正確性と一貫性の保証、および多数の可観測性および監視ツールを実行できます。
これにより、通常必要となる複雑なオーケストレーションを気にすることなく、バッチ テーブル、イベント ストリーム、AI/ML プロジェクトを強化するサービスなど、組織内のすべてのデータを利用できるようになります。
chrononの詳細については、 chronon .ai を参照してください。
chronon 、機能の最新の値を返すリアルタイム取得用の API を提供します。以下をサポートします。
ML 実践者は、多くの場合、モデルのトレーニングと評価のために特徴値の履歴ビューを必要とします。 chrononのバックフィルは次のとおりです。
chronon以下の可視性を提供します。
chrononさまざまな集計タイプをサポートしています。完全なリストについては、こちらのドキュメントを参照してください。
これらの集計はすべて、任意のウィンドウ サイズで計算されるように構成できます。
このセクションでは、作成された基礎となる生のデータセットを使用して、 chrononでトレーニング データセットを作成する手順を説明します。
含まれるもの:
GroupBy
およびJoin
の実装例。以下は含まれません:
chrononを使い始めるには、 docker-compose.yml ファイルをダウンロードしてローカルで実行するだけです。
curl -o docker-compose.yml https://chronon.ai/docker-compose.yml
docker-compose up
only showing top 20 rows
通知とともに印刷されたデータが表示されたら、チュートリアルに進む準備は完了です。
この例では、大規模なオンライン小売業者であり、ユーザーが購入し、その後商品を返品することに基づいて詐欺ベクトルを検出したと仮定します。チェックアウトフローの開始時に呼び出され、このトランザクションが不正な返品につながる可能性があるかどうかを予測するモデルをトレーニングしたいと考えています。
加工された生データはデータディレクトリに含まれます。これには 4 つのテーブルが含まれています。
新しいターミナル ウィンドウで、次を実行します。
docker-compose exec main bash
これにより、 chronon Docker コンテナー内でシェルが開きます。
セットアップ手順が完了したので、さまざまなchrononオブジェクトの作成とテストを開始して、変換と集計を定義し、データを生成できます。
生の入力ソースに基づいて構築された 3 つの機能セットから始めましょう。
注: これらの Python 定義は既にchronon
イメージに含まれています。これらの定義の計算を実行する場合は、ステップ 3 - データのバックフィルまで実行する必要はありません。
機能セット 1: データ機能の購入
購入ログ データをユーザー レベルに集約して、プラットフォーム上でのこのユーザーの以前のアクティビティを把握することができます。具体的には、さまざまなウィンドウにわたって以前の購入金額のSUM
s COUNT
とAVERAGE
計算できます。
この機能はテーブルとトピックの両方を含むソースに基づいて構築されているため、その機能はバッチとストリーミングの両方で計算できます。
source = Source (
events = EventSource (
table = "data.purchases" , # This points to the log table with historical purchase events
topic = None , # Streaming is not currently part of quickstart, but this would be where you define the topic for realtime events
query = Query (
selects = select ( "user_id" , "purchase_price" ), # Select the fields we care about
time_column = "ts" ) # The event time
))
window_sizes = [ Window ( length = day , timeUnit = TimeUnit . DAYS ) for day in [ 3 , 14 , 30 ]] # Define some window sizes to use below
v1 = GroupBy (
sources = [ source ],
keys = [ "user_id" ], # We are aggregating by user
aggregations = [ Aggregation (
input_column = "purchase_price" ,
operation = Operation . SUM ,
windows = window_sizes
), # The sum of purchases prices in various windows
Aggregation (
input_column = "purchase_price" ,
operation = Operation . COUNT ,
windows = window_sizes
), # The count of purchases in various windows
Aggregation (
input_column = "purchase_price" ,
operation = Operation . AVERAGE ,
windows = window_sizes
) # The average purchases by user in various windows
],
)
ここでコード ファイル全体を参照してください: Purchase GroupBy。これは Docker イメージにも含まれています。ステップ 3 - データのバックフィルで、これと他の GroupBy の計算を実行します。
機能セット 2: データ機能を返します
戻り値 GroupBy 内の戻り値データに対して同様の一連の集計を実行します。コードは上の例と似ているため、ここには含まれていません。
機能セット 3: ユーザー データ機能
ユーザー データを特徴に変換することは、主に含める集計がないため、少し簡単になります。この場合、ソース データの主キーはフィーチャの主キーと同じであるため、行に対して集計を実行するのではなく、単に列の値を抽出します。
source = Source (
entities = EntitySource (
snapshotTable = "data.users" , # This points to a table that contains daily snapshots of the entire product catalog
query = Query (
selects = select ( "user_id" , "account_created_ds" , "email_verified" ), # Select the fields we care about
)
))
v1 = GroupBy (
sources = [ source ],
keys = [ "user_id" ], # Primary key is the same as the primary key for the source table
aggregations = None # In this case, there are no aggregations or windows to define
)
ユーザーの GroupBy から取得されます。
次に、モデルのトレーニングのために、以前に定義した特徴を 1 つのテーブルにバックフィルする必要があります。これは、 Join
API を使用して実現できます。
私たちのユースケースでは、特徴が正しいタイムスタンプで計算されることが非常に重要です。モデルはチェックアウト フローの開始時に実行されるため、モデル トレーニングの特徴値がオンライン推論でモデルに表示される値と論理的に一致するように、バックフィルで対応するタイムスタンプを必ず使用する必要があります。
Join
トレーニング データの機能バックフィルを駆動する API です。主に次の機能を実行します。
Join
という名前が付けられています)。結合は次のようになります。
source = Source (
events = EventSource (
table = "data.checkouts" ,
query = Query (
selects = select ( "user_id" ), # The primary key used to join various GroupBys together
time_column = "ts" ,
) # The event time used to compute feature values as-of
))
v1 = Join (
left = source ,
right_parts = [ JoinPart ( group_by = group_by ) for group_by in [ purchases_v1 , refunds_v1 , users ]] # Include the three GroupBys
)
training_set Join から取得。
結合のleft
は、バックフィルのタイムスタンプと主キーを定義するものです (ユースケースで規定されているように、 checkout
イベントの上に構築されることに注意してください)。
このJoin
、上記の 3 つのGroupBy
を 1 つのデータ定義に結合することに注意してください。次のステップでは、このパイプライン全体の計算を実行するコマンドを実行します。
結合が定義されたら、次のコマンドを使用してコンパイルします。
compile.py --conf=joins/quickstart/training_set.py
これにより、次のコマンドを使用して Spark に送信できる節約定義に変換されます。
run.py --conf production/joins/quickstart/training_set.v1
バックフィルの出力には、左側のソースの user_id 列と ts 列、および作成した 3 つの GroupBy の 11 個の機能列が含まれます。
特徴値は、時間精度が保証された状態で、左側の user_id と ts ごとに計算されます。したがって、たとえば、左側の行の 1 つがuser_id = 123
およびts = 2023-10-01 10:11:23.195
ものである場合、 purchase_price_avg_30d
特徴は、で終わる正確な 30 日のウィンドウでそのユーザーに対して計算されます。そのタイムスタンプ。
これで、Spark SQL シェルを使用してバックフィルされたデータをクエリできるようになりました。
spark-sql
その後:
spark - sql > SELECT user_id, quickstart_returns_v1_refund_amt_sum_30d, quickstart_purchases_v1_purchase_price_sum_14d, quickstart_users_v1_email_verified from default . quickstart_training_set_v1 limit 100 ;
これはいくつかの列のみを選択することに注意してください。 select * from default.quickstart_training_set_v1 limit 100
実行してすべての列を表示することもできます。ただし、テーブルの幅が非常に広いため、結果が画面上で読みにくい場合があることに注意してください。
SQL シェルを終了するには、次のコマンドを実行します。
spark-sql > quit ;
結合を作成してデータをバックフィルしたので、次のステップはモデルをトレーニングすることです。これはこのチュートリアルの一部ではありませんが、これが完了したと仮定すると、その後の次のステップはモデルをオンラインで実稼働化することになります。これを行うには、モデル推論用の特徴ベクトルをフェッチできる必要があります。それについては、次のセクションで説明します。
オンライン フローを提供するには、まずデータをオンライン KV ストアにアップロードする必要があります。これは、前のステップで実行したバックフィルとは次の 2 つの点で異なります。
購入したものを GroupBy でアップロードします。
run.py --mode upload --conf production/group_bys/quickstart/purchases.v1 --ds 2023-12-01
spark-submit --class ai. chronon .quickstart.online.Spark2MongoLoader --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_purchases_v1_upload mongodb://admin:admin@mongodb:27017/ ? authSource=admin
戻り値の GroupBy をアップロードします。
run.py --mode upload --conf production/group_bys/quickstart/returns.v1 --ds 2023-12-01
spark-submit --class ai. chronon .quickstart.online.Spark2MongoLoader --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_returns_v1_upload mongodb://admin:admin@mongodb:27017/ ? authSource=admin
FetchGroupby
ではなくFetchJoin
API を使用する場合は、結合メタデータもアップロードする必要があります。
run.py --mode metadata-upload --conf production/joins/quickstart/training_set.v2
これにより、オンライン フェッチャーは、結合バックフィルがすべての機能を備えたワイド ビュー テーブルを生成する方法と同様に、この結合のリクエストを取得して個々の GroupBy リクエストに分割し、統合ベクトルを返す方法を認識できるようになります。
上記のエンティティを定義すると、単純な API 呼び出しで特徴ベクトルを簡単にフェッチできるようになります。
結合を取得する:
run.py --mode fetch --type join --name quickstart/training_set.v2 -k ' {"user_id":"5"} '
単一の GroupBy をフェッチすることもできます (これには、前に実行した結合メタデータのアップロード手順は必要ありません)。
run.py --mode fetch --type group-by --name quickstart/purchases.v1 -k ' {"user_id":"5"} '
運用環境では、Java クライアントは通常、サービスに直接埋め込まれます。
Map < String , String > keyMap = new HashMap <>();
keyMap . put ( "user_id" , "123" );
Fetcher . fetch_join ( new Request ( "quickstart/training_set_v1" , keyMap ))
サンプル応答
> '{"purchase_price_avg_3d":14.3241, "purchase_price_avg_14d":11.89352, ...}'
注: この Java コードは docker env では実行できません。これは単なる説明用の例です。
この README の導入セクションで説明したように、 chrononの中核となる保証の 1 つはオンライン/オフラインの一貫性です。これは、モデルのトレーニング (オフライン) に使用するデータが、モデルが実稼働推論 (オンライン) で参照するデータと一致することを意味します。
この重要な要素は時間的な正確さです。これは次のように表現できます:フィーチャをバックフィルするとき、結合の左側によって提供される特定のtimestamp
に対して生成される値は、そのフィーチャがその特定のtimestamp
でフェッチされた場合にオンラインで返される値と同じである必要があります。
chrononこの時間的精度を保証するだけでなく、それを測定する方法も提供します。
測定パイプラインは、オンラインフェッチリクエストのログから始まります。これらのログには、フェッチされた特徴値とともに、リクエストの主キーとタイムスタンプが含まれます。次に、 chrononキーとタイムスタンプを左側として結合バックフィルに渡し、コンピューティング エンジンに特徴値をバックフィルするように要求します。次に、バックフィルされた値と実際にフェッチされた値を比較して、一貫性を測定します。
ステップ 1: ログの取得
まず、いくつかのフェッチ リクエストを実行したことを確認します。走る:
run.py --mode fetch --type join --name quickstart/training_set.v2 -k '{"user_id":"5"}'
いくつかのフェッチを生成するために数回。
それが完了したら、これを実行して使用可能なログ テーブルを作成できます (これらのコマンドは、正しいスキーマを持つログ ハイブ テーブルを生成します)。
spark-submit --class ai. chronon .quickstart.online.MongoLoggingDumper --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default. chronon _log_table mongodb://admin:admin@mongodb:27017/ ? authSource=admin
compile.py --conf group_bys/quickstart/schema.py
run.py --mode backfill --conf production/group_bys/quickstart/schema.v1
run.py --mode log-flattener --conf production/joins/quickstart/training_set.v2 --log-table default. chronon _log_table --schema-table default.quickstart_schema_v1
これにより、以前に行った各フェッチ リクエストの結果と、リクエストを行ったタイムスタンプおよびリクエストしたuser
を含む、 default.quickstart_training_set_v2_logged
テーブルが作成されます。
注:上記のコマンドを実行すると、ログ パーティションが作成されて「閉じ」られます。つまり、同じ日 (UTC 時間) に追加のフェッチを行っても追加されません。戻って、オンライン/オフラインの一貫性のためにさらにリクエストを生成したい場合は、上記のコマンドを再実行する前に、テーブルを削除できます ( spark-sql
シェルでDROP TABLE default.quickstart_training_set_v2_logged
を実行します)。
これで、次のコマンドを使用して整合性メトリックを計算できるようになりました。
run.py --mode consistency-metrics-compute --conf production/joins/quickstart/training_set.v2
このジョブは、ログ テーブル (この場合は、 default.quickstart_training_set_v2_logged
) から主キーとタイムスタンプを取得し、それらを使用して結合バックフィルを作成および実行します。次に、バックフィルされた結果を、オンラインで取得された実際のログ値と比較します。
次の 2 つの出力テーブルが生成されます。
default.quickstart_training_set_v2_consistency
: 整合性チェックの結果を確認するためにクエリを実行できる、人間が判読できるテーブル。spark-sql
を実行してSQLシェルに入り、テーブルにクエリを実行できます。DESC default.quickstart_training_set_v2_consistency
を実行してから、クエリしたい列をいくつか選択することをお勧めします。default.quickstart_training_set_v2_consistency_upload
: オンライン KV ストアにアップロードされる KV バイトのリスト。オンライン データ品質監視フローを強化するために使用できます。人間が読めるようにすることを意図したものではありません。 特徴エンジニアリング作業にchronon使用すると、さまざまな方法で ML ワークフローが簡素化および改善されます。
chronon使用する利点の詳細については、 chrononのドキュメントの利点を参照してください。
chronon 、バッチ ワークフローではなくリアルタイムでリクエストに対応する「オンライン」モデルを構築しようとしている AI/ML 実践者に最大の価値を提供します。
chrononがないと、これらのプロジェクトに取り組むエンジニアは、トレーニング/評価および運用推論のためにモデルにデータを取得する方法を理解する必要があります。これらのモデルに入力されるデータの複雑さが増すにつれて (複数のソース、ウィンドウ集計などの複雑な変換など)、このデータ配管をサポートするインフラストラクチャの課題も増加します。
一般に、ML 実践者が次の 2 つのアプローチのいずれかを採用していることが観察されました。
このアプローチでは、ユーザーは、モデル推論を実行するオンライン サービス環境で利用可能なデータから開始します。関連する機能をデータ ウェアハウスに記録します。十分なデータが蓄積されたら、ログでモデルをトレーニングし、同じデータを提供します。
長所:
短所:
このアプローチでは、ユーザーはデータ ウェアハウスからのデータを使用してモデルをトレーニングし、それらの機能をオンライン環境で複製する方法を見つけます。
長所:
短所:
chrononアプローチ
chrononを使用すると、オンライン環境とオフライン環境の間で一貫性が保証された状態で、データ ウェアハウス内のすべてのデータ、ストリーミング ソース、サービス呼び出しなど、組織内で利用可能なあらゆるデータを使用できます。このデータ配管の調整と保守に伴うインフラストラクチャの複雑さが抽象化されるため、ユーザーは単純な API で機能を定義するだけで、残りの処理はchrononに任せることができます。
chrononプロジェクトへの貢献を歓迎します。詳細については「貢献」をお読みください。
バグや機能リクエストを報告するには、GitHub Issue Tracker を使用してください。コミュニティの Slack ワークスペースに参加して、ディスカッション、ヒント、サポートを入手してください。