chronon是一个抽象了数据计算复杂性并为 AI/ML 应用程序提供服务的平台。用户将功能定义为原始数据的转换,然后chronon可以执行批处理和流计算、可扩展的回填、低延迟服务、保证正确性和一致性,以及一系列可观察性和监控工具。
它允许您利用组织内的所有数据(从批处理表、事件流或服务)来为您的 AI/ML 项目提供支持,而无需担心这通常需要的所有复杂编排。
有关chronon的更多信息,请访问chronon .ai。
chronon提供了一个用于实时获取的 API,它可以返回您的特征的最新值。它支持:
机器学习从业者通常需要特征值的历史视图来进行模型训练和评估。 chronon的回填是:
chronon提供以下方面的可见性:
chronon支持一系列聚合类型。有关完整列表,请参阅此处的文档。
这些聚合都可以配置为在任意窗口大小上进行计算。
本部分将引导您完成使用chronon创建训练数据集(使用构建的底层原始数据集)的步骤。
包括:
GroupBy
和Join
。不包括:
要开始使用chronon ,您所需要做的就是下载 docker-compose.yml 文件并在本地运行它:
curl -o docker-compose.yml https://chronon.ai/docker-compose.yml
docker-compose up
一旦您看到一些打印的数据only showing top 20 rows
通知,您就可以继续学习本教程了。
在此示例中,假设我们是一家大型在线零售商,并且我们检测到基于用户购买并随后退货的欺诈向量。我们想要训练一个模型,该模型将在结账流程开始时调用,并预测该交易是否可能导致欺诈性退货。
制作的原始数据包含在数据目录中。它包括四个表:
在新的终端窗口中,运行:
docker-compose exec main bash
这将在chronon docker 容器内打开一个 shell。
现在设置步骤已完成,我们可以开始创建和测试各种chronon对象来定义转换和聚合,并生成数据。
让我们从构建在原始输入源之上的三个功能集开始。
注意:这些 python 定义已经在您的chronon
镜像中。在步骤 3 - 回填数据之前,您无需运行任何内容,此时您将为这些定义运行计算。
功能集1:购买数据功能
我们可以将购买日志数据聚合到用户级别,以便我们了解该用户之前在我们平台上的活动。具体来说,我们可以计算他们之前在各个窗口中购买金额的SUM
COUNT
和AVERAGE
。
由于此功能建立在包含表和主题的源之上,因此可以批量和流式计算其功能。
source = Source (
events = EventSource (
table = "data.purchases" , # This points to the log table with historical purchase events
topic = None , # Streaming is not currently part of quickstart, but this would be where you define the topic for realtime events
query = Query (
selects = select ( "user_id" , "purchase_price" ), # Select the fields we care about
time_column = "ts" ) # The event time
))
window_sizes = [ Window ( length = day , timeUnit = TimeUnit . DAYS ) for day in [ 3 , 14 , 30 ]] # Define some window sizes to use below
v1 = GroupBy (
sources = [ source ],
keys = [ "user_id" ], # We are aggregating by user
aggregations = [ Aggregation (
input_column = "purchase_price" ,
operation = Operation . SUM ,
windows = window_sizes
), # The sum of purchases prices in various windows
Aggregation (
input_column = "purchase_price" ,
operation = Operation . COUNT ,
windows = window_sizes
), # The count of purchases in various windows
Aggregation (
input_column = "purchase_price" ,
operation = Operation . AVERAGE ,
windows = window_sizes
) # The average purchases by user in various windows
],
)
请在此处查看整个代码文件:购买 GroupBy。这也在您的 docker 镜像中。我们将在步骤 3 - 回填数据中为其和其他 GroupBy 运行计算。
特征集2:返回数据特征
我们对返回 GroupBy 中的返回数据执行一组类似的聚合。此处未包含该代码,因为它看起来与上面的示例类似。
特征集3:用户数据特征
将用户数据转换为特征要简单一些,主要是因为不需要包含聚合。在这种情况下,源数据的主键与特征的主键相同,因此我们只是提取列值而不是对行执行聚合:
source = Source (
entities = EntitySource (
snapshotTable = "data.users" , # This points to a table that contains daily snapshots of the entire product catalog
query = Query (
selects = select ( "user_id" , "account_created_ds" , "email_verified" ), # Select the fields we care about
)
))
v1 = GroupBy (
sources = [ source ],
keys = [ "user_id" ], # Primary key is the same as the primary key for the source table
aggregations = None # In this case, there are no aggregations or windows to define
)
取自用户 GroupBy。
接下来,我们需要将之前定义的特征回填到单个表中以进行模型训练。这可以使用Join
API 来实现。
对于我们的用例,从正确的时间戳开始计算特征非常重要。由于我们的模型在结账流程开始时运行,因此我们希望确保在回填中使用相应的时间戳,以便模型训练的特征值在逻辑上与模型在在线推理中看到的特征值相匹配。
Join
是驱动训练数据功能回填的 API。它主要执行以下功能:
Join
)。这是我们的连接的样子:
source = Source (
events = EventSource (
table = "data.checkouts" ,
query = Query (
selects = select ( "user_id" ), # The primary key used to join various GroupBys together
time_column = "ts" ,
) # The event time used to compute feature values as-of
))
v1 = Join (
left = source ,
right_parts = [ JoinPart ( group_by = group_by ) for group_by in [ purchases_v1 , refunds_v1 , users ]] # Include the three GroupBys
)
取自training_set Join。
连接的left
定义了回填的时间戳和主键(请注意,它是构建在checkout
事件之上的,正如我们的用例所指示的)。
请注意,此Join
将上述三个GroupBy
合并为一个数据定义。在下一步中,我们将运行命令来执行整个管道的计算。
定义连接后,我们使用以下命令对其进行编译:
compile.py --conf=joins/quickstart/training_set.py
这会将其转换为一个 thrift 定义,我们可以使用以下命令将其提交给 Spark:
run.py --conf production/joins/quickstart/training_set.v1
回填的输出将包含左侧源中的 user_id 和 ts 列,以及我们创建的三个 GroupBy 中的 11 个特征列。
将为左侧的每个 user_id 和 ts 计算特征值,并保证时间准确性。因此,例如,如果左侧的一行是user_id = 123
且ts = 2023-10-01 10:11:23.195
,则将为该用户计算purchase_price_avg_30d
特征,并以精确的 30 天窗口结束那个时间戳。
您现在可以使用 Spark sql shell 查询回填的数据:
spark-sql
进而:
spark - sql > SELECT user_id, quickstart_returns_v1_refund_amt_sum_30d, quickstart_purchases_v1_purchase_price_sum_14d, quickstart_users_v1_email_verified from default . quickstart_training_set_v1 limit 100 ;
请注意,这仅选择几列。您还可以运行select * from default.quickstart_training_set_v1 limit 100
来查看所有列,但是请注意,该表相当宽,结果在屏幕上可能不太可读。
要退出 sql shell,您可以运行:
spark-sql > quit ;
现在我们已经创建了连接并回填数据,下一步是训练模型。这不是本教程的一部分,但假设它已完成,下一步将是在线生产模型。为此,我们需要能够获取特征向量以进行模型推理。这就是下一节的内容。
为了服务在线流量,我们首先需要将数据上传到在线KV存储。这与我们在上一步中运行的回填有两个不同之处:
上传购买分组依据:
run.py --mode upload --conf production/group_bys/quickstart/purchases.v1 --ds 2023-12-01
spark-submit --class ai. chronon .quickstart.online.Spark2MongoLoader --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_purchases_v1_upload mongodb://admin:admin@mongodb:27017/ ? authSource=admin
上传退货分组依据:
run.py --mode upload --conf production/group_bys/quickstart/returns.v1 --ds 2023-12-01
spark-submit --class ai. chronon .quickstart.online.Spark2MongoLoader --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_returns_v1_upload mongodb://admin:admin@mongodb:27017/ ? authSource=admin
如果我们想使用FetchJoin
api 而不是FetchGroupby
,那么我们还需要上传连接元数据:
run.py --mode metadata-upload --conf production/joins/quickstart/training_set.v2
这使得在线获取器知道如何获取此连接的请求并将其分解为单独的 GroupBy 请求,返回统一向量,类似于连接回填如何生成具有所有功能的宽视图表。
定义上述实体后,您现在可以通过简单的 API 调用轻松获取特征向量。
获取连接:
run.py --mode fetch --type join --name quickstart/training_set.v2 -k ' {"user_id":"5"} '
您还可以获取单个 GroupBy(这不需要之前执行的加入元数据上传步骤):
run.py --mode fetch --type group-by --name quickstart/purchases.v1 -k ' {"user_id":"5"} '
对于生产,Java 客户端通常直接嵌入到服务中。
Map < String , String > keyMap = new HashMap <>();
keyMap . put ( "user_id" , "123" );
Fetcher . fetch_join ( new Request ( "quickstart/training_set_v1" , keyMap ))
响应样本
> '{"purchase_price_avg_3d":14.3241, "purchase_price_avg_14d":11.89352, ...}'
注意:此java代码无法在docker环境中运行,它只是一个说明性示例。
正如本自述文件的介绍部分中所讨论的, chronon的核心保证之一是在线/离线一致性。这意味着您用于训练模型(离线)的数据与模型在生产推理(在线)中看到的数据相匹配。
其中一个关键要素是时间准确性。这可以表述为:回填特征时,为连接左侧提供的任何给定timestamp
生成的值应该与在该特定timestamp
获取该特征时在线返回的值相同。
chronon不仅保证了这种时间准确性,而且还提供了一种测量它的方法。
测量管道从在线获取请求的日志开始。这些日志包括请求的主键和时间戳,以及获取的特征值。然后chronon将键和时间戳传递到左侧的 Join 回填,要求计算引擎回填特征值。然后,它将回填的值与实际获取的值进行比较,以衡量一致性。
步骤一:日志获取
首先,确保您已经运行了一些获取请求。跑步:
run.py --mode fetch --type join --name quickstart/training_set.v2 -k '{"user_id":"5"}'
几次生成一些提取。
完成后,您可以运行它来创建可用的日志表(这些命令生成具有正确架构的日志记录配置单元表):
spark-submit --class ai. chronon .quickstart.online.MongoLoggingDumper --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default. chronon _log_table mongodb://admin:admin@mongodb:27017/ ? authSource=admin
compile.py --conf group_bys/quickstart/schema.py
run.py --mode backfill --conf production/group_bys/quickstart/schema.v1
run.py --mode log-flattener --conf production/joins/quickstart/training_set.v2 --log-table default. chronon _log_table --schema-table default.quickstart_schema_v1
这将创建一个default.quickstart_training_set_v2_logged
表,其中包含您之前发出的每个提取请求的结果,以及您发出请求的时间戳和您请求的user
。
注意:运行上述命令后,它将创建并“关闭”日志分区,这意味着如果您在同一天(UTC 时间)进行其他提取,它将不会追加。如果您想返回并生成更多在线/离线一致性请求,可以在重新运行上述命令之前删除该表(在spark-sql
shell 中运行DROP TABLE default.quickstart_training_set_v2_logged
)。
现在您可以使用以下命令计算一致性指标:
run.py --mode consistency-metrics-compute --conf production/joins/quickstart/training_set.v2
该作业将从日志表中获取主键和时间戳(在本例中为default.quickstart_training_set_v2_logged
),并使用它们来创建和运行连接回填。然后,它将回填结果与在线获取的实际记录值进行比较
它产生两个输出表:
default.quickstart_training_set_v2_consistency
:一个人类可读的表,您可以查询该表以查看一致性检查的结果。spark-sql
来进入 sql shell,然后查询表。DESC default.quickstart_training_set_v2_consistency
,然后选择您关心查询的一些列。default.quickstart_training_set_v2_consistency_upload
:上传到在线 KV 存储的 KV 字节列表,可用于支持在线数据质量监控流。不意味着人类可读。 使用chronon进行特征工程工作可以通过多种方式简化和改进您的 ML 工作流程:
有关使用chronon的好处的更详细信息,请参阅chronon文档的好处。
chronon为试图构建实时服务请求(而不是批处理工作流程)的“在线”模型的 AI/ML 从业者提供了最大的价值。
如果没有chronon ,从事这些项目的工程师需要弄清楚如何将数据获取到模型中以进行训练/评估以及生产推理。随着进入这些模型的数据的复杂性增加(多个来源、复杂的转换,例如窗口聚合等),支持这种数据管道的基础设施挑战也随之增加。
一般来说,我们观察到 ML 从业者采用以下两种方法之一:
通过这种方法,用户可以从运行模型推理的在线服务环境中可用的数据开始。将相关特征记录到数据仓库。一旦积累了足够的数据,就在日志上训练模型,并使用相同的数据提供服务。
优点:
缺点:
通过这种方法,用户使用数据仓库中的数据训练模型,然后找到在在线环境中复制这些功能的方法。
优点:
缺点:
chronon方法
借助chronon您可以使用组织中可用的任何数据,包括数据仓库中的所有数据、任何流媒体源、服务调用等,并保证在线和离线环境之间的一致性。它抽象了编排和维护此数据管道的基础设施复杂性,以便用户可以在简单的 API 中简单地定义功能,并信任chronon来处理其余的事情。
我们欢迎对chronon项目做出贡献!请阅读贡献了解详细信息。
使用 GitHub 问题跟踪器报告错误或功能请求。加入我们的社区 Slack 工作区以获取讨论、提示和支持。