chronon es una plataforma que abstrae la complejidad del cálculo de datos y el servicio para aplicaciones de IA/ML. Los usuarios definen las características como la transformación de datos sin procesar, luego chronon puede realizar cálculos por lotes y de transmisión, reabastecimientos escalables, servicio de baja latencia, corrección y coherencia garantizadas, así como una gran cantidad de herramientas de observabilidad y monitoreo.
Le permite utilizar todos los datos dentro de su organización, desde tablas por lotes, flujos de eventos o servicios para impulsar sus proyectos de IA/ML, sin necesidad de preocuparse por toda la compleja orquestación que esto normalmente implicaría.
Puede encontrar más información sobre chronon en chronon .ai.
chronon ofrece una API para la recuperación en tiempo real que devuelve valores actualizados para sus funciones. Soporta:
Los profesionales del aprendizaje automático a menudo necesitan vistas históricas de los valores de las características para el entrenamiento y la evaluación del modelo. Los repuestos de chronon son:
chronon ofrece visibilidad sobre:
chronon admite una variedad de tipos de agregación. Para obtener una lista completa, consulte la documentación aquí.
Todas estas agregaciones se pueden configurar para que se calculen en tamaños de ventana arbitrarios.
Esta sección lo guía a través de los pasos para crear un conjunto de datos de entrenamiento con chronon , utilizando un conjunto de datos sin procesar subyacente fabricado.
Incluye:
GroupBy
y Join
.No incluye:
Para comenzar con chronon , todo lo que necesita hacer es descargar el archivo docker-compose.yml y ejecutarlo localmente:
curl -o docker-compose.yml https://chronon.ai/docker-compose.yml
docker-compose up
Una vez que vea algunos datos impresos con un aviso only showing top 20 rows
, estará listo para continuar con el tutorial.
En este ejemplo, supongamos que somos un gran minorista en línea y hemos detectado un vector de fraude basado en usuarios que realizan compras y luego devuelven artículos. Queremos entrenar un modelo que se llamará cuando comience el flujo de pago y prediga si es probable que esta transacción resulte en una devolución fraudulenta.
Los datos sin procesar fabricados se incluyen en el directorio de datos. Incluye cuatro tablas:
En una nueva ventana de terminal, ejecute:
docker-compose exec main bash
Esto abrirá un shell dentro del contenedor acoplable chronon .
Ahora que se completaron los pasos de configuración, podemos comenzar a crear y probar varios objetos chronon para definir transformaciones y agregaciones, y generar datos.
Comencemos con tres conjuntos de funciones, creadas sobre nuestras fuentes de entrada sin procesar.
Nota: Estas definiciones de Python ya están en su imagen chronon
. No hay nada que pueda ejecutar hasta el Paso 3: Rellenar datos, cuando ejecutará el cálculo para estas definiciones.
Conjunto de funciones 1: funciones de datos de compras
Podemos agregar los datos del registro de compras al nivel de usuario, para darnos una vista de la actividad anterior de este usuario en nuestra plataforma. Específicamente, podemos calcular SUM
COUNT
y AVERAGE
de sus montos de compras anteriores en varias ventanas.
Debido a que esta característica se basa en una fuente que incluye tanto una tabla como un tema, sus características se pueden calcular tanto en lotes como en streaming.
source = Source (
events = EventSource (
table = "data.purchases" , # This points to the log table with historical purchase events
topic = None , # Streaming is not currently part of quickstart, but this would be where you define the topic for realtime events
query = Query (
selects = select ( "user_id" , "purchase_price" ), # Select the fields we care about
time_column = "ts" ) # The event time
))
window_sizes = [ Window ( length = day , timeUnit = TimeUnit . DAYS ) for day in [ 3 , 14 , 30 ]] # Define some window sizes to use below
v1 = GroupBy (
sources = [ source ],
keys = [ "user_id" ], # We are aggregating by user
aggregations = [ Aggregation (
input_column = "purchase_price" ,
operation = Operation . SUM ,
windows = window_sizes
), # The sum of purchases prices in various windows
Aggregation (
input_column = "purchase_price" ,
operation = Operation . COUNT ,
windows = window_sizes
), # The count of purchases in various windows
Aggregation (
input_column = "purchase_price" ,
operation = Operation . AVERAGE ,
windows = window_sizes
) # The average purchases by user in various windows
],
)
Vea el archivo de código completo aquí: compras GroupBy. Esto también está en la imagen de la ventana acoplable. Realizaremos cálculos para este y los otros GroupBys en el Paso 3: Relleno de datos.
Conjunto de funciones 2: devuelve funciones de datos
Realizamos un conjunto similar de agregaciones de datos de devoluciones en devoluciones GroupBy. El código no se incluye aquí porque es similar al ejemplo anterior.
Conjunto de funciones 3: funciones de datos de usuario
Convertir los datos del usuario en funciones es un poco más sencillo, principalmente porque no hay agregaciones que incluir. En este caso, la clave principal de los datos de origen es la misma que la clave principal de la característica, por lo que simplemente extraemos valores de columna en lugar de realizar agregaciones en filas:
source = Source (
entities = EntitySource (
snapshotTable = "data.users" , # This points to a table that contains daily snapshots of the entire product catalog
query = Query (
selects = select ( "user_id" , "account_created_ds" , "email_verified" ), # Select the fields we care about
)
))
v1 = GroupBy (
sources = [ source ],
keys = [ "user_id" ], # Primary key is the same as the primary key for the source table
aggregations = None # In this case, there are no aggregations or windows to define
)
Tomado de los usuarios GroupBy.
A continuación, necesitamos que las características que definimos previamente se rellenen en una sola tabla para el entrenamiento del modelo. Esto se puede lograr utilizando la API Join
.
Para nuestro caso de uso, es muy importante que las características se calculen a partir de la marca de tiempo correcta. Debido a que nuestro modelo se ejecuta cuando comienza el flujo de pago, queremos asegurarnos de usar la marca de tiempo correspondiente en nuestro relleno, de modo que los valores de las características para el entrenamiento del modelo coincidan lógicamente con lo que el modelo verá en la inferencia en línea.
Join
es la API que impulsa el reabastecimiento de funciones para los datos de entrenamiento. Realiza principalmente las siguientes funciones:
Join
).Así es como se ve nuestra unión:
source = Source (
events = EventSource (
table = "data.checkouts" ,
query = Query (
selects = select ( "user_id" ), # The primary key used to join various GroupBys together
time_column = "ts" ,
) # The event time used to compute feature values as-of
))
v1 = Join (
left = source ,
right_parts = [ JoinPart ( group_by = group_by ) for group_by in [ purchases_v1 , refunds_v1 , users ]] # Include the three GroupBys
)
Tomado de Training_set Unirse.
El lado left
de la unión es lo que define las marcas de tiempo y las claves principales para el reabastecimiento (tenga en cuenta que se construye sobre el evento checkout
, según lo dictado por nuestro caso de uso).
Tenga en cuenta que este Join
combina los tres GroupBy
anteriores en una definición de datos. En el siguiente paso, ejecutaremos el comando para ejecutar el cálculo de todo este proceso.
Una vez definida la unión, la compilamos usando este comando:
compile.py --conf=joins/quickstart/training_set.py
Esto lo convierte en una definición de ahorro que podemos enviar a Spark con el siguiente comando:
run.py --conf production/joins/quickstart/training_set.v1
El resultado del relleno contendría las columnas user_id y ts de la fuente izquierda, así como las 11 columnas de características de los tres GroupBys que creamos.
Los valores de las características se calcularían para cada user_id y ts en el lado izquierdo, con precisión temporal garantizada. Entonces, por ejemplo, si una de las filas de la izquierda fuera para user_id = 123
y ts = 2023-10-01 10:11:23.195
, entonces la característica de purchase_price_avg_30d
se calcularía para ese usuario con una ventana precisa de 30 días que finaliza en esa marca de tiempo.
Ahora puede consultar los datos rellenados utilizando el shell Spark SQL:
spark-sql
Y luego:
spark - sql > SELECT user_id, quickstart_returns_v1_refund_amt_sum_30d, quickstart_purchases_v1_purchase_price_sum_14d, quickstart_users_v1_email_verified from default . quickstart_training_set_v1 limit 100 ;
Tenga en cuenta que esto solo selecciona unas pocas columnas. También puede ejecutar un select * from default.quickstart_training_set_v1 limit 100
para ver todas las columnas; sin embargo, tenga en cuenta que la tabla es bastante amplia y es posible que los resultados no sean muy legibles en su pantalla.
Para salir del shell SQL puedes ejecutar:
spark-sql > quit ;
Ahora que hemos creado una unión y hemos completado los datos, el siguiente paso sería entrenar un modelo. Eso no es parte de este tutorial, pero suponiendo que esté completo, el siguiente paso sería producir el modelo en línea. Para hacer esto, debemos poder recuperar vectores de características para la inferencia del modelo. Eso es lo que cubre la siguiente sección.
Para poder atender flujos en línea, primero necesitamos que los datos se carguen en la tienda KV en línea. Esto es diferente al reabastecimiento que ejecutamos en el paso anterior de dos maneras:
Sube las compras GroupBy:
run.py --mode upload --conf production/group_bys/quickstart/purchases.v1 --ds 2023-12-01
spark-submit --class ai. chronon .quickstart.online.Spark2MongoLoader --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_purchases_v1_upload mongodb://admin:admin@mongodb:27017/ ? authSource=admin
Sube las devoluciones GroupBy:
run.py --mode upload --conf production/group_bys/quickstart/returns.v1 --ds 2023-12-01
spark-submit --class ai. chronon .quickstart.online.Spark2MongoLoader --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_returns_v1_upload mongodb://admin:admin@mongodb:27017/ ? authSource=admin
Si queremos utilizar la API FetchJoin
en lugar de FetchGroupby
, también debemos cargar los metadatos de unión:
run.py --mode metadata-upload --conf production/joins/quickstart/training_set.v2
Esto hace que el buscador en línea sepa cómo tomar una solicitud para esta unión y dividirla en solicitudes GroupBy individuales, devolviendo el vector unificado, similar a cómo el relleno de unión produce la tabla de vista amplia con todas las características.
Con las entidades anteriores definidas, ahora puede recuperar fácilmente vectores de características con una simple llamada API.
Obteniendo una unión:
run.py --mode fetch --type join --name quickstart/training_set.v2 -k ' {"user_id":"5"} '
También puede recuperar un único GroupBy (esto no requeriría el paso de carga de metadatos Unirse realizado anteriormente):
run.py --mode fetch --type group-by --name quickstart/purchases.v1 -k ' {"user_id":"5"} '
Para producción, el cliente Java suele estar integrado directamente en los servicios.
Map < String , String > keyMap = new HashMap <>();
keyMap . put ( "user_id" , "123" );
Fetcher . fetch_join ( new Request ( "quickstart/training_set_v1" , keyMap ))
respuesta de muestra
> '{"purchase_price_avg_3d":14.3241, "purchase_price_avg_14d":11.89352, ...}'
Nota: Este código Java no se puede ejecutar en Docker Env, es solo un ejemplo ilustrativo.
Como se analizó en las secciones introductorias de este README, una de las principales garantías de chronon es la coherencia en línea/fuera de línea. Esto significa que los datos que utiliza para entrenar su modelo (fuera de línea) coinciden con los datos que el modelo ve para la inferencia de producción (en línea).
Un elemento clave de esto es la precisión temporal. Esto se puede expresar de la siguiente manera: al reponer funciones, el valor que se produce para cualquier timestamp
dada proporcionada por el lado izquierdo de la combinación debe ser el mismo que se habría devuelto en línea si esa característica se hubiera obtenido en esa timestamp
en particular .
chronon no sólo garantiza esta precisión temporal, sino que también ofrece una forma de medirla.
El proceso de medición comienza con los registros de las solicitudes de recuperación en línea. Estos registros incluyen las claves principales y la marca de tiempo de la solicitud, junto con los valores de las funciones obtenidas. Luego, chronon pasa las claves y las marcas de tiempo a un relleno de unión en el lado izquierdo, solicitando al motor informático que rellene los valores de las características. Luego compara los valores rellenados con los valores reales obtenidos para medir la coherencia.
Paso 1: búsquedas de registros
Primero, asegúrese de haber realizado algunas solicitudes de recuperación. Correr:
run.py --mode fetch --type join --name quickstart/training_set.v2 -k '{"user_id":"5"}'
Algunas veces para generar algunas recuperaciones.
Una vez completado, puede ejecutar esto para crear una tabla de registro utilizable (estos comandos producen una tabla de registro de subárbol con el esquema correcto):
spark-submit --class ai. chronon .quickstart.online.MongoLoggingDumper --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default. chronon _log_table mongodb://admin:admin@mongodb:27017/ ? authSource=admin
compile.py --conf group_bys/quickstart/schema.py
run.py --mode backfill --conf production/group_bys/quickstart/schema.v1
run.py --mode log-flattener --conf production/joins/quickstart/training_set.v2 --log-table default. chronon _log_table --schema-table default.quickstart_schema_v1
Esto crea una tabla default.quickstart_training_set_v2_logged
que contiene los resultados de cada una de las solicitudes de recuperación que realizó anteriormente, junto con la marca de tiempo en la que las realizó y el user
que solicitó.
Nota: Una vez que ejecute el comando anterior, creará y "cerrará" las particiones de registro, lo que significa que si realiza búsquedas adicionales el mismo día (hora UTC), no se agregará. Si desea regresar y generar más solicitudes de coherencia en línea/fuera de línea, puede eliminar la tabla (ejecute DROP TABLE default.quickstart_training_set_v2_logged
en un shell spark-sql
) antes de volver a ejecutar el comando anterior.
Ahora puedes calcular métricas de coherencia con este comando:
run.py --mode consistency-metrics-compute --conf production/joins/quickstart/training_set.v2
Este trabajo tomará las claves principales y las marcas de tiempo de la tabla de registro ( default.quickstart_training_set_v2_logged
en este caso) y las utilizará para crear y ejecutar un reabastecimiento de unión. Luego compara los resultados rellenados con los valores registrados reales que se obtuvieron en línea.
Produce dos tablas de salida:
default.quickstart_training_set_v2_consistency
: una tabla legible por humanos que puede consultar para ver los resultados de las comprobaciones de coherencia.spark-sql
desde su sesión de Docker Bash y luego consultar la tabla.DESC default.quickstart_training_set_v2_consistency
primero y luego seleccionar algunas columnas que le interesen consultar.default.quickstart_training_set_v2_consistency_upload
: una lista de bytes de KV que se carga en la tienda de KV en línea y que se puede utilizar para impulsar los flujos de monitoreo de la calidad de los datos en línea. No está destinado a ser legible por humanos. El uso chronon para su trabajo de ingeniería de funciones simplifica y mejora su flujo de trabajo de ML de varias maneras:
Para obtener una vista más detallada de los beneficios de usar chronon , consulte la documentación sobre Beneficios de chronon .
chronon ofrece el mayor valor a los profesionales de IA/ML que intentan crear modelos "en línea" que atiendan solicitudes en tiempo real en lugar de flujos de trabajo por lotes.
Sin chronon , los ingenieros que trabajan en estos proyectos necesitan descubrir cómo llevar datos a sus modelos para entrenamiento/evaluación, así como para inferencia de producción. A medida que aumenta la complejidad de los datos que entran en estos modelos (múltiples fuentes, transformaciones complejas como agregaciones en ventanas, etc.), también aumenta el desafío de infraestructura para respaldar esta tubería de datos.
En general, observamos que los profesionales del ML adoptaban uno de dos enfoques:
Con este enfoque, los usuarios comienzan con los datos que están disponibles en el entorno de servicio en línea desde el cual se ejecutará la inferencia del modelo. Registre características relevantes en el almacén de datos. Una vez que se hayan acumulado suficientes datos, entrene el modelo en los registros y sirva con los mismos datos.
Ventajas:
Contras:
Con este enfoque, los usuarios entrenan el modelo con datos del almacén de datos y luego descubren formas de replicar esas características en el entorno en línea.
Ventajas:
Contras:
El enfoque chronon
Con chronon puede utilizar cualquier dato disponible en su organización, incluido todo el contenido del almacén de datos, cualquier fuente de transmisión, llamadas de servicio, etc., con coherencia garantizada entre entornos en línea y fuera de línea. Abstrae la complejidad de la infraestructura de orquestar y mantener esta tubería de datos, de modo que los usuarios puedan simplemente definir características en una API simple y confiar en chronon para manejar el resto.
¡Agradecemos las contribuciones al proyecto chronon ! Lea CONTRIBUCIÓN para obtener más detalles.
Utilice el rastreador de problemas de GitHub para informar errores o solicitudes de funciones. Únase a nuestro espacio de trabajo comunitario de Slack para obtener debates, consejos y asistencia.