chronon est une plate-forme qui élimine la complexité du calcul des données et sert aux applications IA/ML. Les utilisateurs définissent les fonctionnalités comme la transformation des données brutes, puis chronon peut effectuer des calculs par lots et en streaming, des remplissages évolutifs, un service à faible latence, une exactitude et une cohérence garanties, ainsi qu'une multitude d'outils d'observabilité et de surveillance.
Il vous permet d'utiliser toutes les données de votre organisation, depuis les tables de lots, les flux d'événements ou les services, pour alimenter vos projets IA/ML, sans avoir à vous soucier de toute l'orchestration complexe que cela impliquerait habituellement.
Plus d’informations sur chronon peuvent être trouvées sur chronon .ai.
chronon propose une API pour la récupération en temps réel qui renvoie des valeurs à jour pour vos fonctionnalités. Il prend en charge :
Les praticiens du ML ont souvent besoin de vues historiques des valeurs des fonctionnalités pour la formation et l’évaluation des modèles. Les remplissages de chronon sont :
chronon offre une visibilité sur :
chronon prend en charge une gamme de types d’agrégation. Pour une liste complète, consultez la documentation ici.
Ces agrégations peuvent toutes être configurées pour être calculées sur des tailles de fenêtre arbitraires.
Cette section vous guide à travers les étapes de création d'un ensemble de données d'entraînement avec chronon , à l'aide d'un ensemble de données brutes sous-jacentes fabriquées.
Comprend :
GroupBy
et Join
.Ne comprend pas :
Pour démarrer avec le chronon , il vous suffit de télécharger le fichier docker-compose.yml et de l'exécuter localement :
curl -o docker-compose.yml https://chronon.ai/docker-compose.yml
docker-compose up
Une fois que vous voyez certaines données imprimées avec un avis only showing top 20 rows
, vous êtes prêt à poursuivre le didacticiel.
Dans cet exemple, supposons que nous soyons un grand détaillant en ligne et que nous ayons détecté un vecteur de fraude basé sur le fait que les utilisateurs effectuent des achats et retournent ensuite des articles. Nous souhaitons former un modèle qui sera appelé au début du flux de paiement et prédit si cette transaction est susceptible d'entraîner un retour frauduleux.
Les données brutes fabriquées sont incluses dans le répertoire de données. Il comprend quatre tableaux :
Dans une nouvelle fenêtre de terminal, exécutez :
docker-compose exec main bash
Cela ouvrira un shell dans le conteneur Docker chronon .
Maintenant que les étapes de configuration sont terminées, nous pouvons commencer à créer et tester divers objets chronon pour définir la transformation et les agrégations, et générer des données.
Commençons par trois ensembles de fonctionnalités, construits sur nos sources d'entrée brutes.
Remarque : Ces définitions python sont déjà dans votre image chronon
. Vous n'avez rien à exécuter avant l'étape 3 : Remplissage des données, lorsque vous exécuterez le calcul pour ces définitions.
Ensemble de fonctionnalités 1 : achète des fonctionnalités de données
Nous pouvons regrouper les données du journal d'achats au niveau de l'utilisateur, pour nous donner un aperçu de l'activité précédente de cet utilisateur sur notre plateforme. Plus précisément, nous pouvons calculer les SUM
COUNT
et AVERAGE
de leurs montants d'achat précédents sur différentes fenêtres.
Étant donné que cette fonctionnalité repose sur une source qui comprend à la fois un tableau et une rubrique, ses fonctionnalités peuvent être calculées à la fois par lots et en streaming.
source = Source (
events = EventSource (
table = "data.purchases" , # This points to the log table with historical purchase events
topic = None , # Streaming is not currently part of quickstart, but this would be where you define the topic for realtime events
query = Query (
selects = select ( "user_id" , "purchase_price" ), # Select the fields we care about
time_column = "ts" ) # The event time
))
window_sizes = [ Window ( length = day , timeUnit = TimeUnit . DAYS ) for day in [ 3 , 14 , 30 ]] # Define some window sizes to use below
v1 = GroupBy (
sources = [ source ],
keys = [ "user_id" ], # We are aggregating by user
aggregations = [ Aggregation (
input_column = "purchase_price" ,
operation = Operation . SUM ,
windows = window_sizes
), # The sum of purchases prices in various windows
Aggregation (
input_column = "purchase_price" ,
operation = Operation . COUNT ,
windows = window_sizes
), # The count of purchases in various windows
Aggregation (
input_column = "purchase_price" ,
operation = Operation . AVERAGE ,
windows = window_sizes
) # The average purchases by user in various windows
],
)
Voir le fichier de code complet ici : achats GroupBy. C'est également dans votre image Docker. Nous exécuterons des calculs pour lui et les autres GroupBys à l'étape 3 - Remplissage des données.
Ensemble de fonctionnalités 2 : renvoie les fonctionnalités de données
Nous effectuons un ensemble similaire d'agrégations sur les données de retours dans les retours GroupBy. Le code n'est pas inclus ici car il ressemble à l'exemple ci-dessus.
Ensemble de fonctionnalités 3 : fonctionnalités de données utilisateur
Transformer les données utilisateur en fonctionnalités est un peu plus simple, principalement parce qu'il n'y a aucune agrégation à inclure. Dans ce cas, la clé primaire des données source est la même que la clé primaire de la fonctionnalité, nous extrayons donc simplement les valeurs des colonnes plutôt que d'effectuer des agrégations sur les lignes :
source = Source (
entities = EntitySource (
snapshotTable = "data.users" , # This points to a table that contains daily snapshots of the entire product catalog
query = Query (
selects = select ( "user_id" , "account_created_ds" , "email_verified" ), # Select the fields we care about
)
))
v1 = GroupBy (
sources = [ source ],
keys = [ "user_id" ], # Primary key is the same as the primary key for the source table
aggregations = None # In this case, there are no aggregations or windows to define
)
Tiré des utilisateurs GroupBy.
Ensuite, nous avons besoin que les fonctionnalités que nous avons définies précédemment soient remplies dans une seule table pour la formation du modèle. Ceci peut être réalisé en utilisant l'API Join
.
Pour notre cas d’utilisation, il est très important que les fonctionnalités soient calculées à partir de l’horodatage correct. Étant donné que notre modèle s'exécute lorsque le flux de paiement commence, nous voulons être sûrs d'utiliser l'horodatage correspondant dans notre remplissage, de sorte que les valeurs des fonctionnalités pour la formation du modèle correspondent logiquement à ce que le modèle verra dans l'inférence en ligne.
Join
est l'API qui gère les remplissages de fonctionnalités pour les données d'entraînement. Il remplit principalement les fonctions suivantes :
Join
).Voici à quoi ressemble notre jointure :
source = Source (
events = EventSource (
table = "data.checkouts" ,
query = Query (
selects = select ( "user_id" ), # The primary key used to join various GroupBys together
time_column = "ts" ,
) # The event time used to compute feature values as-of
))
v1 = Join (
left = source ,
right_parts = [ JoinPart ( group_by = group_by ) for group_by in [ purchases_v1 , refunds_v1 , users ]] # Include the three GroupBys
)
Tiré du training_set Join.
Le côté left
de la jointure définit les horodatages et les clés primaires pour le remplissage (notez qu'il est construit au-dessus de l'événement checkout
, comme dicté par notre cas d'utilisation).
Notez que ce Join
combine les trois GroupBy
ci-dessus en une seule définition de données. À l'étape suivante, nous exécuterons la commande pour exécuter le calcul pour l'ensemble de ce pipeline.
Une fois la jointure définie, nous la compilons à l'aide de cette commande :
compile.py --conf=joins/quickstart/training_set.py
Cela le convertit en une définition d'épargne que nous pouvons soumettre à Spark avec la commande suivante :
run.py --conf production/joins/quickstart/training_set.v1
La sortie du remplissage contiendrait les colonnes user_id et ts de la source de gauche, ainsi que les 11 colonnes de fonctionnalités des trois GroupBys que nous avons créés.
Les valeurs des fonctionnalités seraient calculées pour chaque user_id et ts sur le côté gauche, avec une précision temporelle garantie. Ainsi, par exemple, si l'une des lignes de gauche était pour user_id = 123
et ts = 2023-10-01 10:11:23.195
, alors la fonctionnalité purchase_price_avg_30d
serait calculée pour cet utilisateur avec une fenêtre précise de 30 jours se terminant le cet horodatage.
Vous pouvez maintenant interroger les données renseignées à l'aide du shell Spark SQL :
spark-sql
Et puis:
spark - sql > SELECT user_id, quickstart_returns_v1_refund_amt_sum_30d, quickstart_purchases_v1_purchase_price_sum_14d, quickstart_users_v1_email_verified from default . quickstart_training_set_v1 limit 100 ;
Notez que cela ne sélectionne que quelques colonnes. Vous pouvez également exécuter un select * from default.quickstart_training_set_v1 limit 100
pour voir toutes les colonnes, cependant, notez que le tableau est assez large et que les résultats peuvent ne pas être très lisibles sur votre écran.
Pour quitter le shell SQL, vous pouvez exécuter :
spark-sql > quit ;
Maintenant que nous avons créé une jointure et rempli les données, l'étape suivante consisterait à entraîner un modèle. Cela ne fait pas partie de ce didacticiel, mais en supposant qu'il soit terminé, la prochaine étape serait de produire le modèle en ligne. Pour ce faire, nous devons être capables de récupérer des vecteurs de caractéristiques pour l'inférence de modèle. C'est ce que couvre cette section suivante.
Afin de servir les flux en ligne, nous avons d'abord besoin des données téléchargées sur la boutique en ligne KV. Ceci diffère du remplissage que nous avons effectué à l'étape précédente de deux manières :
Téléchargez les achats GroupBy :
run.py --mode upload --conf production/group_bys/quickstart/purchases.v1 --ds 2023-12-01
spark-submit --class ai. chronon .quickstart.online.Spark2MongoLoader --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_purchases_v1_upload mongodb://admin:admin@mongodb:27017/ ? authSource=admin
Téléchargez les retours GroupBy :
run.py --mode upload --conf production/group_bys/quickstart/returns.v1 --ds 2023-12-01
spark-submit --class ai. chronon .quickstart.online.Spark2MongoLoader --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_returns_v1_upload mongodb://admin:admin@mongodb:27017/ ? authSource=admin
Si nous voulons utiliser l'API FetchJoin
plutôt que FetchGroupby
, nous devons également télécharger les métadonnées de jointure :
run.py --mode metadata-upload --conf production/joins/quickstart/training_set.v2
Cela permet au récupérateur en ligne de savoir comment prendre une demande pour cette jointure et la diviser en requêtes GroupBy individuelles, renvoyant le vecteur unifié, de la même manière que le remplissage de jointure produit la table à vue large avec toutes les fonctionnalités.
Une fois les entités ci-dessus définies, vous pouvez désormais facilement récupérer des vecteurs de fonctionnalités avec un simple appel API.
Récupérer une jointure :
run.py --mode fetch --type join --name quickstart/training_set.v2 -k ' {"user_id":"5"} '
Vous pouvez également récupérer un seul GroupBy (cela ne nécessiterait pas l’étape de téléchargement des métadonnées Join effectuée précédemment) :
run.py --mode fetch --type group-by --name quickstart/purchases.v1 -k ' {"user_id":"5"} '
Pour la production, le client Java est généralement intégré directement dans les services.
Map < String , String > keyMap = new HashMap <>();
keyMap . put ( "user_id" , "123" );
Fetcher . fetch_join ( new Request ( "quickstart/training_set_v1" , keyMap ))
exemple de réponse
> '{"purchase_price_avg_3d":14.3241, "purchase_price_avg_14d":11.89352, ...}'
Remarque : Ce code Java n'est pas exécutable dans l'environnement Docker, il s'agit simplement d'un exemple illustratif.
Comme indiqué dans les sections d'introduction de ce README, l'une des principales garanties de chronon est la cohérence en ligne/hors ligne. Cela signifie que les données que vous utilisez pour entraîner votre modèle (hors ligne) correspondent aux données que le modèle voit pour l'inférence de production (en ligne).
Un élément clé de ceci est la précision temporelle. Cela peut être formulé ainsi : lors du remplissage de fonctionnalités, la valeur produite pour un timestamp
donné fourni par le côté gauche de la jointure doit être la même que celle qui aurait été renvoyée en ligne si cette fonctionnalité avait été récupérée à cet timestamp
particulier .
chronon garantit non seulement cette précision temporelle, mais offre également un moyen de la mesurer.
Le pipeline de mesure commence par les journaux des demandes de récupération en ligne. Ces journaux incluent les clés primaires et l'horodatage de la demande, ainsi que les valeurs des fonctionnalités récupérées. chronon transmet ensuite les clés et les horodatages à un remplissage Join sur le côté gauche, demandant au moteur de calcul de remplir les valeurs des fonctionnalités. Il compare ensuite les valeurs renseignées aux valeurs réellement récupérées pour mesurer la cohérence.
Étape 1 : récupérations de journaux
Tout d’abord, assurez-vous d’avoir exécuté quelques requêtes de récupération. Courir:
run.py --mode fetch --type join --name quickstart/training_set.v2 -k '{"user_id":"5"}'
Quelques fois pour générer des récupérations.
Une fois cela terminé, vous pouvez exécuter ceci pour créer une table de journalisation utilisable (ces commandes produisent une table de ruche de journalisation avec le schéma correct) :
spark-submit --class ai. chronon .quickstart.online.MongoLoggingDumper --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default. chronon _log_table mongodb://admin:admin@mongodb:27017/ ? authSource=admin
compile.py --conf group_bys/quickstart/schema.py
run.py --mode backfill --conf production/group_bys/quickstart/schema.v1
run.py --mode log-flattener --conf production/joins/quickstart/training_set.v2 --log-table default. chronon _log_table --schema-table default.quickstart_schema_v1
Cela crée une table default.quickstart_training_set_v2_logged
qui contient les résultats de chacune des demandes de récupération que vous avez effectuées précédemment, ainsi que l'horodatage auquel vous les avez effectuées et l' user
que vous avez demandé.
Remarque : Une fois que vous avez exécuté la commande ci-dessus, elle créera et "fermera" les partitions de journal, ce qui signifie que si vous effectuez des récupérations supplémentaires le même jour (heure UTC), elles ne seront pas ajoutées. Si vous souhaitez revenir en arrière et générer plus de requêtes pour la cohérence en ligne/hors ligne, vous pouvez supprimer la table (exécuter DROP TABLE default.quickstart_training_set_v2_logged
dans un shell spark-sql
) avant de réexécuter la commande ci-dessus.
Vous pouvez désormais calculer des métriques de cohérence avec cette commande :
run.py --mode consistency-metrics-compute --conf production/joins/quickstart/training_set.v2
Ce travail prendra la ou les clés primaires et les horodatages de la table de journal ( default.quickstart_training_set_v2_logged
dans ce cas) et les utilisera pour créer et exécuter un remplissage de jointure. Il compare ensuite les résultats renseignés aux valeurs réellement enregistrées qui ont été récupérées en ligne.
Il produit deux tables de sortie :
default.quickstart_training_set_v2_consistency
: Un tableau lisible par l'homme que vous pouvez interroger pour voir les résultats des contrôles de cohérence.spark-sql
à partir de votre session docker bash, puis interroger la table.DESC default.quickstart_training_set_v2_consistency
, puis sélectionner quelques colonnes que vous souhaitez interroger.default.quickstart_training_set_v2_consistency_upload
: liste d'octets KV téléchargée vers le magasin KV en ligne, qui peut être utilisée pour alimenter les flux de surveillance de la qualité des données en ligne. Pas censé être lisible par l’homme. L'utilisation chronon pour votre travail d'ingénierie de fonctionnalités simplifie et améliore votre flux de travail ML de plusieurs manières :
Pour une vue plus détaillée des avantages de l'utilisation chronon , consultez la documentation Avantages de chronon .
chronon offre le plus de valeur aux praticiens de l'IA/ML qui tentent de créer des modèles « en ligne » qui répondent aux demandes en temps réel, par opposition aux flux de travail par lots.
Sans chronon , les ingénieurs travaillant sur ces projets doivent trouver comment transférer les données vers leurs modèles à des fins de formation/évaluation ainsi que d'inférence de production. À mesure que la complexité des données entrant dans ces modèles augmente (sources multiples, transformations complexes telles que des agrégations fenêtrées, etc.), le défi infrastructurel lié à la prise en charge de cette plomberie de données augmente également.
En général, nous avons observé que les praticiens du ML adoptaient l’une des deux approches suivantes :
Avec cette approche, les utilisateurs commencent avec les données disponibles dans l'environnement de service en ligne à partir duquel l'inférence du modèle sera exécutée. Enregistrez les fonctionnalités pertinentes dans l’entrepôt de données. Une fois que suffisamment de données se sont accumulées, entraînez le modèle sur les journaux et servez-le avec les mêmes données.
Avantages :
Inconvénients :
Avec cette approche, les utilisateurs entraînent le modèle avec les données de l'entrepôt de données, puis trouvent des moyens de répliquer ces fonctionnalités dans l'environnement en ligne.
Avantages :
Inconvénients :
L'approche chronon
Avec chronon vous pouvez utiliser toutes les données disponibles dans votre organisation, y compris tout ce qui se trouve dans l'entrepôt de données, toute source de streaming, les appels de service, etc., avec une cohérence garantie entre les environnements en ligne et hors ligne. Il élimine la complexité de l'infrastructure liée à l'orchestration et à la maintenance de cette plomberie de données, afin que les utilisateurs puissent simplement définir des fonctionnalités dans une simple API et faire confiance à chronon pour gérer le reste.
Nous apprécions les contributions au projet chronon ! Veuillez lire CONTRIBUER pour plus de détails.
Utilisez le suivi des problèmes GitHub pour signaler des bogues ou des demandes de fonctionnalités. Rejoignez notre espace de travail communautaire Slack pour des discussions, des conseils et de l'assistance.