chronon ist eine Plattform, die die Komplexität der Datenberechnung und -bereitstellung für KI/ML-Anwendungen abstrahiert. Benutzer definieren Funktionen wie die Transformation von Rohdaten, dann kann chronon Batch- und Streaming-Berechnungen, skalierbare Backfills, Bereitstellung mit geringer Latenz, garantierte Korrektheit und Konsistenz sowie eine Vielzahl von Beobachtbarkeits- und Überwachungstools durchführen.
Es ermöglicht Ihnen, alle Daten in Ihrem Unternehmen zu nutzen, von Batch-Tabellen über Ereignisströme bis hin zu Diensten, um Ihre KI-/ML-Projekte voranzutreiben, ohne sich um die komplexe Orchestrierung kümmern zu müssen, die dies normalerweise mit sich bringt.
Weitere Informationen zu chronon finden Sie unter chronon .
chronon bietet eine API zum Echtzeitabruf, die aktuelle Werte für Ihre Features zurückgibt. Es unterstützt:
ML-Praktiker benötigen häufig historische Ansichten von Merkmalswerten für das Modelltraining und die Modellbewertung. Die Backfills von chronon sind:
chronon bietet Einblick in:
chronon unterstützt eine Reihe von Aggregationstypen. Eine vollständige Liste finden Sie in der Dokumentation hier.
Diese Aggregationen können alle so konfiguriert werden, dass sie über beliebige Fenstergrößen berechnet werden.
Dieser Abschnitt führt Sie durch die Schritte zum Erstellen eines Trainingsdatensatzes mit chronon unter Verwendung eines fabrizierten zugrunde liegenden Rohdatensatzes.
Beinhaltet:
GroupBy
und Join
.Nicht enthalten:
Um mit dem chronon zu beginnen, müssen Sie lediglich die Datei docker-compose.yml herunterladen und lokal ausführen:
curl -o docker-compose.yml https://chronon.ai/docker-compose.yml
docker-compose up
Sobald Sie sehen, dass einige Daten gedruckt werden und only showing top 20 rows
angezeigt werden, können Sie mit dem Tutorial fortfahren.
Nehmen wir in diesem Beispiel an, dass wir ein großer Online-Händler sind und einen Betrugsvektor entdeckt haben, der darauf basiert, dass Benutzer Einkäufe tätigen und später Artikel zurücksenden. Wir möchten ein Modell trainieren, das aufgerufen wird, wenn der Checkout- Ablauf beginnt, und vorhersagt, ob diese Transaktion wahrscheinlich zu einer betrügerischen Rückgabe führt.
Die erstellten Rohdaten sind im Datenverzeichnis enthalten. Es enthält vier Tabellen:
Führen Sie in einem neuen Terminalfenster Folgendes aus:
docker-compose exec main bash
Dadurch wird eine Shell im chronon Docker-Container geöffnet.
Nachdem die Einrichtungsschritte nun abgeschlossen sind, können wir mit dem Erstellen und Testen verschiedener chronon -Objekte beginnen, um Transformationen und Aggregationen zu definieren und Daten zu generieren.
Beginnen wir mit drei Funktionssätzen, die auf unseren Roheingabequellen aufbauen.
Hinweis: Diese Python-Definitionen befinden sich bereits in Ihrem chronon
-Image. Bis Schritt 3 – Daten auffüllen, müssen Sie nichts ausführen, wenn Sie die Berechnung für diese Definitionen ausführen.
Funktionssatz 1: Kaufdatenfunktionen
Wir können die Kaufprotokolldaten auf Benutzerebene zusammenfassen, um einen Einblick in die früheren Aktivitäten dieses Benutzers auf unserer Plattform zu erhalten. Insbesondere können wir SUM
s COUNT
s und AVERAGE
s ihrer vorherigen Kaufbeträge über verschiedene Zeiträume hinweg berechnen.
Da diese Funktion auf einer Quelle aufbaut, die sowohl eine Tabelle als auch ein Thema enthält, können ihre Funktionen sowohl im Batch als auch im Streaming berechnet werden.
source = Source (
events = EventSource (
table = "data.purchases" , # This points to the log table with historical purchase events
topic = None , # Streaming is not currently part of quickstart, but this would be where you define the topic for realtime events
query = Query (
selects = select ( "user_id" , "purchase_price" ), # Select the fields we care about
time_column = "ts" ) # The event time
))
window_sizes = [ Window ( length = day , timeUnit = TimeUnit . DAYS ) for day in [ 3 , 14 , 30 ]] # Define some window sizes to use below
v1 = GroupBy (
sources = [ source ],
keys = [ "user_id" ], # We are aggregating by user
aggregations = [ Aggregation (
input_column = "purchase_price" ,
operation = Operation . SUM ,
windows = window_sizes
), # The sum of purchases prices in various windows
Aggregation (
input_column = "purchase_price" ,
operation = Operation . COUNT ,
windows = window_sizes
), # The count of purchases in various windows
Aggregation (
input_column = "purchase_price" ,
operation = Operation . AVERAGE ,
windows = window_sizes
) # The average purchases by user in various windows
],
)
Die gesamte Codedatei finden Sie hier: kauft GroupBy. Dies ist auch in Ihrem Docker-Image enthalten. Wir werden die Berechnung dafür und für die anderen GroupBys in Schritt 3 – Daten auffüllen – durchführen.
Funktionssatz 2: Gibt Datenfunktionen zurück
Wir führen einen ähnlichen Satz von Aggregationen für Rückgabedaten im Rückgabe-GroupBy durch. Der Code ist hier nicht enthalten, da er dem obigen Beispiel ähnelt.
Funktionssatz 3: Benutzerdatenfunktionen
Das Umwandeln von Benutzerdaten in Features ist etwas einfacher, vor allem weil keine Aggregationen einbezogen werden müssen. In diesem Fall ist der Primärschlüssel der Quelldaten derselbe wie der Primärschlüssel des Features, daher extrahieren wir einfach Spaltenwerte, anstatt Aggregationen über Zeilen durchzuführen:
source = Source (
entities = EntitySource (
snapshotTable = "data.users" , # This points to a table that contains daily snapshots of the entire product catalog
query = Query (
selects = select ( "user_id" , "account_created_ds" , "email_verified" ), # Select the fields we care about
)
))
v1 = GroupBy (
sources = [ source ],
keys = [ "user_id" ], # Primary key is the same as the primary key for the source table
aggregations = None # In this case, there are no aggregations or windows to define
)
Entnommen aus dem Benutzer GroupBy.
Als nächstes müssen wir die Funktionen, die wir zuvor definiert haben, in einer einzigen Tabelle für das Modelltraining auffüllen. Dies kann mithilfe der Join
-API erreicht werden.
Für unseren Anwendungsfall ist es sehr wichtig, dass Features mit dem richtigen Zeitstempel berechnet werden. Da unser Modell ausgeführt wird, wenn der Checkout-Ablauf beginnt, möchten wir sicherstellen, dass wir den entsprechenden Zeitstempel in unserem Backfill verwenden, damit die Funktionswerte für das Modelltraining logisch mit dem übereinstimmen, was das Modell in der Online-Inferenz sieht.
Join
ist die API, die Feature-Backfills für Trainingsdaten steuert. Es erfüllt im Wesentlichen folgende Funktionen:
Join
).So sieht unser Beitritt aus:
source = Source (
events = EventSource (
table = "data.checkouts" ,
query = Query (
selects = select ( "user_id" ), # The primary key used to join various GroupBys together
time_column = "ts" ,
) # The event time used to compute feature values as-of
))
v1 = Join (
left = source ,
right_parts = [ JoinPart ( group_by = group_by ) for group_by in [ purchases_v1 , refunds_v1 , users ]] # Include the three GroupBys
)
Entnommen aus dem training_set Join.
Die left
Seite des Joins definiert die Zeitstempel und Primärschlüssel für den Backfill (beachten Sie, dass er auf dem checkout
-Ereignis aufbaut, wie es unser Anwendungsfall vorschreibt).
Beachten Sie, dass dieser Join
die oben genannten drei GroupBy
s in einer Datendefinition kombiniert. Im nächsten Schritt führen wir den Befehl aus, um die Berechnung für diese gesamte Pipeline auszuführen.
Sobald der Join definiert ist, kompilieren wir ihn mit diesem Befehl:
compile.py --conf=joins/quickstart/training_set.py
Dies wandelt es in eine Sparsamkeitsdefinition um, die wir mit dem folgenden Befehl an Spark senden können:
run.py --conf production/joins/quickstart/training_set.v1
Die Ausgabe des Backfills würde die Spalten „user_id“ und „ts“ aus der linken Quelle sowie die 11 Feature-Spalten aus den drei von uns erstellten GroupBys enthalten.
Merkmalswerte würden für jede Benutzer-ID und jeden Benutzer auf der linken Seite mit garantierter zeitlicher Genauigkeit berechnet. Wenn also beispielsweise eine der Zeilen auf der linken Seite für user_id = 123
und ts = 2023-10-01 10:11:23.195
wäre, würde die Funktion purchase_price_avg_30d
für diesen Benutzer mit einem genauen 30-Tage-Fenster berechnet, das am endet dieser Zeitstempel.
Sie können die aufgefüllten Daten jetzt mit der Spark-SQL-Shell abfragen:
spark-sql
Und dann:
spark - sql > SELECT user_id, quickstart_returns_v1_refund_amt_sum_30d, quickstart_purchases_v1_purchase_price_sum_14d, quickstart_users_v1_email_verified from default . quickstart_training_set_v1 limit 100 ;
Beachten Sie, dass dadurch nur einige Spalten ausgewählt werden. Sie können auch „ select * from default.quickstart_training_set_v1 limit 100
ausführen, um alle Spalten anzuzeigen. Beachten Sie jedoch, dass die Tabelle recht breit ist und die Ergebnisse auf Ihrem Bildschirm möglicherweise nicht gut lesbar sind.
Um die SQL-Shell zu verlassen, können Sie Folgendes ausführen:
spark-sql > quit ;
Nachdem wir nun einen Join erstellt und Daten aufgefüllt haben, besteht der nächste Schritt darin, ein Modell zu trainieren. Das ist nicht Teil dieses Tutorials, aber wenn es vollständig ist, besteht der nächste Schritt darin, das Modell online zu produzieren. Dazu müssen wir in der Lage sein, Merkmalsvektoren für die Modellinferenz abzurufen. Darum geht es im nächsten Abschnitt.
Um Online-Flows bedienen zu können, benötigen wir zunächst die im Online-KV-Shop hochgeladenen Daten. Dies unterscheidet sich in zweierlei Hinsicht vom Backfill, den wir im vorherigen Schritt ausgeführt haben:
Laden Sie die Einkäufe GroupBy hoch:
run.py --mode upload --conf production/group_bys/quickstart/purchases.v1 --ds 2023-12-01
spark-submit --class ai. chronon .quickstart.online.Spark2MongoLoader --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_purchases_v1_upload mongodb://admin:admin@mongodb:27017/ ? authSource=admin
Laden Sie die Retouren GroupBy hoch:
run.py --mode upload --conf production/group_bys/quickstart/returns.v1 --ds 2023-12-01
spark-submit --class ai. chronon .quickstart.online.Spark2MongoLoader --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_returns_v1_upload mongodb://admin:admin@mongodb:27017/ ? authSource=admin
Wenn wir die FetchJoin
API anstelle von FetchGroupby
verwenden möchten, müssen wir auch die Join-Metadaten hochladen:
run.py --mode metadata-upload --conf production/joins/quickstart/training_set.v2
Dadurch weiß der Online-Abrufer, wie er eine Anfrage für diesen Join annehmen und in einzelne GroupBy-Anfragen aufteilen und den einheitlichen Vektor zurückgeben muss, ähnlich wie der Join-Backfill die Wide-View-Tabelle mit allen Features erzeugt.
Wenn die oben genannten Entitäten definiert sind, können Sie Feature-Vektoren jetzt problemlos mit einem einfachen API-Aufruf abrufen.
Einen Join abrufen:
run.py --mode fetch --type join --name quickstart/training_set.v2 -k ' {"user_id":"5"} '
Sie können auch ein einzelnes GroupBy abrufen (dafür wäre der zuvor durchgeführte Schritt zum Hochladen von Metadaten beitreten nicht erforderlich):
run.py --mode fetch --type group-by --name quickstart/purchases.v1 -k ' {"user_id":"5"} '
Für die Produktion wird der Java-Client üblicherweise direkt in Dienste eingebettet.
Map < String , String > keyMap = new HashMap <>();
keyMap . put ( "user_id" , "123" );
Fetcher . fetch_join ( new Request ( "quickstart/training_set_v1" , keyMap ))
Beispielantwort
> '{"purchase_price_avg_3d":14.3241, "purchase_price_avg_14d":11.89352, ...}'
Hinweis: Dieser Java-Code kann nicht in der Docker-Umgebung ausgeführt werden, es handelt sich lediglich um ein veranschaulichendes Beispiel.
Wie in den einleitenden Abschnitten dieser README-Datei erläutert, ist eine der Kerngarantien von chronon die Online-/Offline-Konsistenz. Das bedeutet, dass die Daten, die Sie zum Trainieren Ihres Modells (offline) verwenden, mit den Daten übereinstimmen, die das Modell für die Produktionsinferenz (online) sieht.
Ein Schlüsselelement dabei ist die zeitliche Genauigkeit. Dies lässt sich folgendermaßen formulieren: Beim Backfilling von Features sollte der Wert, der für einen bestimmten timestamp
auf der linken Seite des Joins erzeugt wird, derselbe sein wie der Wert, der online zurückgegeben worden wäre, wenn das Feature zu diesem bestimmten timestamp
abgerufen worden wäre .
chronon garantiert nicht nur diese zeitliche Genauigkeit, sondern bietet auch eine Möglichkeit, sie zu messen.
Die Messpipeline beginnt mit den Protokollen der Online-Abrufanforderungen. Diese Protokolle enthalten die Primärschlüssel und den Zeitstempel der Anfrage sowie die abgerufenen Funktionswerte. chronon übergibt dann die Schlüssel und Zeitstempel als linke Seite an einen Join-Backfill und fordert die Rechenmaschine auf, die Feature-Werte per Backfill aufzufüllen. Anschließend werden die aufgefüllten Werte mit den tatsächlich abgerufenen Werten verglichen, um die Konsistenz zu messen.
Schritt 1: Protokollabrufe
Stellen Sie zunächst sicher, dass Sie einige Abrufanfragen ausgeführt haben. Laufen:
run.py --mode fetch --type join --name quickstart/training_set.v2 -k '{"user_id":"5"}'
Ein paar Mal, um einige Abrufe zu generieren.
Wenn dies abgeschlossen ist, können Sie Folgendes ausführen, um eine verwendbare Protokolltabelle zu erstellen (diese Befehle erzeugen eine Protokollierungs-Hive-Tabelle mit dem richtigen Schema):
spark-submit --class ai. chronon .quickstart.online.MongoLoggingDumper --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default. chronon _log_table mongodb://admin:admin@mongodb:27017/ ? authSource=admin
compile.py --conf group_bys/quickstart/schema.py
run.py --mode backfill --conf production/group_bys/quickstart/schema.v1
run.py --mode log-flattener --conf production/joins/quickstart/training_set.v2 --log-table default. chronon _log_table --schema-table default.quickstart_schema_v1
Dadurch wird eine Tabelle default.quickstart_training_set_v2_logged
erstellt, die die Ergebnisse aller zuvor von Ihnen gestellten Abrufanforderungen zusammen mit dem Zeitstempel, zu dem Sie sie gestellt haben, und dem von Ihnen angeforderten user
enthält.
Hinweis: Sobald Sie den obigen Befehl ausführen, werden die Protokollpartitionen erstellt und „geschlossen“, was bedeutet, dass bei weiteren Abrufen am selben Tag (UTC-Zeit) keine Anhänge erfolgen. Wenn Sie zurückgehen und weitere Anforderungen für die Online-/Offline-Konsistenz generieren möchten, können Sie die Tabelle löschen (führen Sie DROP TABLE default.quickstart_training_set_v2_logged
in einer spark-sql
Shell aus), bevor Sie den obigen Befehl erneut ausführen.
Jetzt können Sie mit diesem Befehl Konsistenzmetriken berechnen:
run.py --mode consistency-metrics-compute --conf production/joins/quickstart/training_set.v2
Dieser Job übernimmt den/die Primärschlüssel und Zeitstempel aus der Protokolltabelle (in diesem Fall default.quickstart_training_set_v2_logged
) und verwendet diese, um einen Join-Backfill zu erstellen und auszuführen. Anschließend werden die Backfill-Ergebnisse mit den tatsächlich protokollierten Werten verglichen, die online abgerufen wurden
Es werden zwei Ausgabetabellen erstellt:
default.quickstart_training_set_v2_consistency
: Eine für Menschen lesbare Tabelle, die Sie abfragen können, um die Ergebnisse der Konsistenzprüfungen anzuzeigen.spark-sql
in Ihrer Docker-Bash-Sitzung ausführen und dann die Tabelle abfragen.DESC default.quickstart_training_set_v2_consistency
ausführen und dann einige Spalten auswählen, die Sie abfragen möchten.default.quickstart_training_set_v2_consistency_upload
: Eine Liste von KV-Bytes, die in den Online-KV-Speicher hochgeladen wird und zur Unterstützung von Online-Datenqualitätsüberwachungsflüssen verwendet werden kann. Nicht dazu gedacht, für Menschen lesbar zu sein. Die Verwendung chronon für Ihre Feature-Engineering-Arbeit vereinfacht und verbessert Ihren ML-Workflow auf verschiedene Weise:
Einen detaillierteren Einblick in die Vorteile der Verwendung von chronon finden Sie in der Dokumentation zu den Vorteilen von chronon .
chronon bietet den größten Nutzen für KI/ML-Praktiker, die versuchen, „Online“-Modelle zu erstellen, die Anfragen in Echtzeit und nicht in Batch-Workflows bearbeiten.
Ohne chronon müssen Ingenieure, die an diesen Projekten arbeiten, herausfinden, wie sie Daten für Training/Bewertung sowie Produktionsrückschlüsse in ihre Modelle übertragen können. Da die Komplexität der in diese Modelle einfließenden Daten zunimmt (mehrere Quellen, komplexe Transformationen wie fensterbasierte Aggregationen usw.), steigt auch die Infrastrukturherausforderung bei der Unterstützung dieser Datenverarbeitung.
Im Allgemeinen haben wir beobachtet, dass ML-Praktiker einen von zwei Ansätzen verfolgen:
Bei diesem Ansatz beginnen Benutzer mit den Daten, die in der Online-Bereitstellungsumgebung verfügbar sind, von der aus die Modellinferenz ausgeführt wird. Protokollieren Sie relevante Funktionen im Data Warehouse. Sobald genügend Daten gesammelt wurden, trainieren Sie das Modell anhand der Protokolle und stellen Sie es mit denselben Daten bereit.
Vorteile:
Nachteile:
Bei diesem Ansatz trainieren Benutzer das Modell mit Daten aus dem Data Warehouse und finden dann Möglichkeiten, diese Funktionen in der Online-Umgebung zu replizieren.
Vorteile:
Nachteile:
Der chronon -Ansatz
Mit chronon können Sie alle in Ihrem Unternehmen verfügbaren Daten nutzen, einschließlich aller Daten im Data Warehouse, aller Streaming-Quellen, Serviceanrufe usw., mit garantierter Konsistenz zwischen Online- und Offline-Umgebungen. Es abstrahiert die Infrastrukturkomplexität der Orchestrierung und Wartung dieser Dateninstallation, sodass Benutzer einfach Funktionen in einer einfachen API definieren und chronon den Rest anvertrauen können.
Wir freuen uns über Beiträge zum chronon -Projekt! Weitere Informationen finden Sie unter BEITRAG.
Verwenden Sie den GitHub-Issue-Tracker, um Fehler oder Funktionsanfragen zu melden. Treten Sie unserem Community-Slack-Workspace bei, um Diskussionen, Tipps und Unterstützung zu erhalten.