chronon — это платформа, которая абстрагирует сложность вычислений данных и обслуживания приложений AI/ML. Пользователи определяют функции как преобразование необработанных данных, после чего chronon может выполнять пакетные и потоковые вычисления, масштабируемое заполнение, обслуживание с малой задержкой, гарантированную корректность и согласованность, а также множество инструментов наблюдения и мониторинга.
Это позволяет вам использовать все данные внутри вашей организации, из пакетных таблиц, потоков событий или сервисов, для реализации ваших проектов AI/ML, не беспокоясь о всей сложной оркестрации, которую это обычно влечет за собой.
Более подробную информацию о chronon можно найти на сайте chronon .
chronon предлагает API для получения данных в реальном времени, который возвращает актуальные значения для ваших функций. Он поддерживает:
Специалистам по машинному обучению часто требуются исторические представления значений признаков для обучения и оценки моделей. Засыпки chronon :
chronon обеспечивает видимость:
chronon поддерживает ряд типов агрегирования. Полный список смотрите в документации здесь.
Все эти агрегаты можно настроить для вычисления в окнах произвольных размеров.
В этом разделе описаны шаги по созданию набора обучающих данных с помощью chronon с использованием сфабрикованного базового набора необработанных данных.
Включает:
GroupBy
и Join
.Не включает:
Чтобы начать работу с chronon , все, что вам нужно сделать, это загрузить файл docker-compose.yml и запустить его локально:
curl -o docker-compose.yml https://chronon.ai/docker-compose.yml
docker-compose up
Как только вы увидите некоторые данные, напечатанные с уведомлением о only showing top 20 rows
, вы готовы продолжить обучение.
В этом примере предположим, что мы являемся крупным интернет-магазином и обнаружили вектор мошенничества, основанный на том, что пользователи совершают покупки, а затем возвращают товары. Мы хотим обучить модель, которая будет вызываться при начале процесса оформления заказа и прогнозировать, приведет ли эта транзакция к мошенническому возврату.
Изготовленные необработанные данные включены в каталог данных. Он включает в себя четыре таблицы:
В новом окне терминала запустите:
docker-compose exec main bash
Это откроет оболочку в контейнере докера chronon .
Теперь, когда этапы настройки завершены, мы можем приступить к созданию и тестированию различных объектов chronon для определения преобразований и агрегаций, а также генерации данных.
Начнем с трех наборов функций, созданных на основе наших исходных источников ввода.
Примечание. Эти определения Python уже находятся в вашем образе chronon
. Вам не нужно ничего выполнять до шага 3 — Обратное заполнение данных, когда вы запустите вычисления для этих определений.
Набор функций 1: функции данных о покупках
Мы можем агрегировать данные журнала покупок на уровне пользователя, чтобы получить представление о предыдущей активности этого пользователя на нашей платформе. В частности, мы можем вычислить SUM
COUNT
и AVERAGE
сумм их предыдущих покупок в различных окнах.
Поскольку эта функция основана на источнике, который включает в себя как таблицу, так и тему, ее функции можно вычислять как в пакетном, так и в потоковом режиме.
source = Source (
events = EventSource (
table = "data.purchases" , # This points to the log table with historical purchase events
topic = None , # Streaming is not currently part of quickstart, but this would be where you define the topic for realtime events
query = Query (
selects = select ( "user_id" , "purchase_price" ), # Select the fields we care about
time_column = "ts" ) # The event time
))
window_sizes = [ Window ( length = day , timeUnit = TimeUnit . DAYS ) for day in [ 3 , 14 , 30 ]] # Define some window sizes to use below
v1 = GroupBy (
sources = [ source ],
keys = [ "user_id" ], # We are aggregating by user
aggregations = [ Aggregation (
input_column = "purchase_price" ,
operation = Operation . SUM ,
windows = window_sizes
), # The sum of purchases prices in various windows
Aggregation (
input_column = "purchase_price" ,
operation = Operation . COUNT ,
windows = window_sizes
), # The count of purchases in various windows
Aggregation (
input_column = "purchase_price" ,
operation = Operation . AVERAGE ,
windows = window_sizes
) # The average purchases by user in various windows
],
)
Полный файл кода см. здесь: Purchases GroupBy. Это также есть в вашем образе докера. Мы будем выполнять вычисления для него и других GroupBy на этапе 3 — обратное заполнение данных.
Набор функций 2. Возвращает функции данных.
Мы выполняем аналогичный набор агрегирования данных о возвратах в отчетах GroupBy. Код сюда не включен, поскольку он похож на приведенный выше пример.
Набор функций 3: Функции пользовательских данных
Преобразование пользовательских данных в функции немного проще, в первую очередь потому, что нет необходимости включать агрегаты. В этом случае первичный ключ исходных данных совпадает с первичным ключом функции, поэтому мы просто извлекаем значения столбца, а не выполняем агрегацию по строкам:
source = Source (
entities = EntitySource (
snapshotTable = "data.users" , # This points to a table that contains daily snapshots of the entire product catalog
query = Query (
selects = select ( "user_id" , "account_created_ds" , "email_verified" ), # Select the fields we care about
)
))
v1 = GroupBy (
sources = [ source ],
keys = [ "user_id" ], # Primary key is the same as the primary key for the source table
aggregations = None # In this case, there are no aggregations or windows to define
)
Взято у пользователей GroupBy.
Далее нам нужно, чтобы функции, которые мы ранее определили, были заполнены в одной таблице для обучения модели. Этого можно добиться с помощью API Join
.
Для нашего варианта использования очень важно, чтобы функции вычислялись по правильной временной метке. Поскольку наша модель запускается, когда начинается поток оформления заказа, мы обязательно должны использовать соответствующую временную метку в нашем заполнении, чтобы значения функций для обучения модели логически соответствовали тому, что модель увидит в онлайн-выводе.
Join
— это API, который управляет заполнением функций для обучающих данных. В первую очередь он выполняет следующие функции:
Join
).Вот как выглядит наше объединение:
source = Source (
events = EventSource (
table = "data.checkouts" ,
query = Query (
selects = select ( "user_id" ), # The primary key used to join various GroupBys together
time_column = "ts" ,
) # The event time used to compute feature values as-of
))
v1 = Join (
left = source ,
right_parts = [ JoinPart ( group_by = group_by ) for group_by in [ purchases_v1 , refunds_v1 , users ]] # Include the three GroupBys
)
Взято из обучающего_сета. Присоединяйтесь.
left
часть соединения определяет временные метки и первичные ключи для обратного заполнения (обратите внимание, что оно создается поверх события checkout
, как того требует наш вариант использования).
Обратите внимание, что это Join
объединяет три вышеупомянутых GroupBy
в одно определение данных. На следующем шаге мы запустим команду для выполнения вычислений для всего этого конвейера.
Как только объединение определено, мы скомпилируем его с помощью этой команды:
compile.py --conf=joins/quickstart/training_set.py
Это преобразует его в определение экономии, которое мы можем отправить в Spark с помощью следующей команды:
run.py --conf production/joins/quickstart/training_set.v1
Выходные данные обратной засыпки будут содержать столбцы user_id и ts из левого источника, а также 11 столбцов функций из трех созданных нами объектов GroupBy.
Значения функций будут вычисляться для каждого user_id и ts слева с гарантированной временной точностью. Так, например, если одна из строк слева была для user_id = 123
и ts = 2023-10-01 10:11:23.195
, то функция purchase_price_avg_30d
будет рассчитана для этого пользователя с точным 30-дневным окном, заканчивающимся эта временная метка.
Теперь вы можете запросить заполненные данные с помощью оболочки Spark sql:
spark-sql
А потом:
spark - sql > SELECT user_id, quickstart_returns_v1_refund_amt_sum_30d, quickstart_purchases_v1_purchase_price_sum_14d, quickstart_users_v1_email_verified from default . quickstart_training_set_v1 limit 100 ;
Обратите внимание, что при этом выбирается только несколько столбцов. Вы также можете запустить select * from default.quickstart_training_set_v1 limit 100
, чтобы просмотреть все столбцы, однако учтите, что таблица довольно широкая, и результаты могут быть не очень читабельны на вашем экране.
Чтобы выйти из оболочки sql, вы можете запустить:
spark-sql > quit ;
Теперь, когда мы создали объединение и заполнили данные, следующим шагом будет обучение модели. Это не входит в данное руководство, но, если оно завершено, следующим шагом будет создание модели в Интернете. Для этого нам нужно иметь возможность получать векторы признаков для вывода модели. Это то, что рассматривается в следующем разделе.
Для обслуживания онлайн-потоков нам сначала нужны данные, загруженные в интернет-магазин KV. Это отличается от обратной засыпки, которую мы запускали на предыдущем шаге, двумя способами:
Загрузите покупки GroupBy:
run.py --mode upload --conf production/group_bys/quickstart/purchases.v1 --ds 2023-12-01
spark-submit --class ai. chronon .quickstart.online.Spark2MongoLoader --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_purchases_v1_upload mongodb://admin:admin@mongodb:27017/ ? authSource=admin
Загрузите результаты GroupBy:
run.py --mode upload --conf production/group_bys/quickstart/returns.v1 --ds 2023-12-01
spark-submit --class ai. chronon .quickstart.online.Spark2MongoLoader --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_returns_v1_upload mongodb://admin:admin@mongodb:27017/ ? authSource=admin
Если мы хотим использовать API FetchJoin
вместо FetchGroupby
, нам также необходимо загрузить метаданные соединения:
run.py --mode metadata-upload --conf production/joins/quickstart/training_set.v2
Благодаря этому онлайн-выборщик знает, как принять запрос на это соединение и разбить его на отдельные запросы GroupBy, возвращая единый вектор, аналогично тому, как обратное заполнение соединения создает таблицу широкого представления со всеми функциями.
Определив вышеуказанные объекты, вы теперь можете легко получать векторы объектов с помощью простого вызова API.
Получение соединения:
run.py --mode fetch --type join --name quickstart/training_set.v2 -k ' {"user_id":"5"} '
Вы также можете получить один GroupBy (для этого не потребуется шаг загрузки метаданных «Присоединиться», выполненный ранее):
run.py --mode fetch --type group-by --name quickstart/purchases.v1 -k ' {"user_id":"5"} '
В производственной среде клиент Java обычно встраивается непосредственно в сервисы.
Map < String , String > keyMap = new HashMap <>();
keyMap . put ( "user_id" , "123" );
Fetcher . fetch_join ( new Request ( "quickstart/training_set_v1" , keyMap ))
образец ответа
> '{"purchase_price_avg_3d":14.3241, "purchase_price_avg_14d":11.89352, ...}'
Примечание. Этот Java-код нельзя запустить в среде Docker, это всего лишь иллюстративный пример.
Как обсуждалось во вводных разделах этого файла README, одной из основных гарантий chronon является согласованность онлайн/оффлайн. Это означает, что данные, которые вы используете для обучения модели (офлайн), совпадают с данными, которые модель видит для производственного вывода (онлайн).
Ключевым элементом этого является временная точность. Это можно сформулировать так: при обратном заполнении функций значение, создаваемое для любой заданной timestamp
предоставленной левой частью соединения, должно быть таким же, как то, которое было бы возвращено онлайн, если бы эта функция была получена в эту конкретную timestamp
.
chronon не только гарантирует эту временную точность, но и предлагает способ ее измерения.
Конвейер измерения начинается с журналов запросов онлайн-выборки. Эти журналы включают первичные ключи и метку времени запроса, а также полученные значения функций. Затем chronon передает ключи и временные метки в резервное заполнение соединения в качестве левой стороны, прося вычислительный механизм заполнить значения функций. Затем он сравнивает заполненные значения с фактически полученными значениями для измерения согласованности.
Шаг 1. Получение журналов
Сначала убедитесь, что вы выполнили несколько запросов на выборку. Бегать:
run.py --mode fetch --type join --name quickstart/training_set.v2 -k '{"user_id":"5"}'
Несколько раз, чтобы сгенерировать несколько выборок.
После этого вы можете запустить это, чтобы создать полезную таблицу журнала (эти команды создают таблицу куста журналирования с правильной схемой):
spark-submit --class ai. chronon .quickstart.online.MongoLoggingDumper --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default. chronon _log_table mongodb://admin:admin@mongodb:27017/ ? authSource=admin
compile.py --conf group_bys/quickstart/schema.py
run.py --mode backfill --conf production/group_bys/quickstart/schema.v1
run.py --mode log-flattener --conf production/joins/quickstart/training_set.v2 --log-table default. chronon _log_table --schema-table default.quickstart_schema_v1
При этом создается таблица default.quickstart_training_set_v2_logged
, содержащая результаты каждого из ранее сделанных вами запросов на выборку, а также отметку времени, когда вы их сделали, и запрошенного user
.
Примечание. После запуска указанной выше команды она создаст и «закроет» разделы журнала. Это означает, что если вы сделаете дополнительные выборки в тот же день (время UTC), они не будут добавлены. Если вы хотите вернуться и сгенерировать больше запросов для согласованности онлайн/оффлайн, вы можете удалить таблицу (запустите DROP TABLE default.quickstart_training_set_v2_logged
в оболочке spark-sql
) перед повторным запуском указанной выше команды.
Теперь вы можете вычислить показатели согласованности с помощью этой команды:
run.py --mode consistency-metrics-compute --conf production/joins/quickstart/training_set.v2
Это задание возьмет первичный ключ(и) и временные метки из таблицы журнала (в данном случае default.quickstart_training_set_v2_logged
) и использует их для создания и запуска обратного заполнения соединения. Затем он сравнивает заполненные результаты с фактическими зарегистрированными значениями, полученными онлайн.
Он создает две выходные таблицы:
default.quickstart_training_set_v2_consistency
: удобочитаемая таблица, к которой вы можете запросить результаты проверок согласованности.spark-sql
из сеанса docker bash, а затем запросив таблицу.DESC default.quickstart_training_set_v2_consistency
, а затем выбрать несколько столбцов, которые вам нужны для запроса.default.quickstart_training_set_v2_consistency_upload
: список байтов KV, который загружается в интернет-магазин KV и может использоваться для обеспечения потоков онлайн-мониторинга качества данных. Не предназначено для чтения человеком. Использование chronon для разработки функций упрощает и улучшает рабочий процесс машинного обучения несколькими способами:
Более подробное описание преимуществ использования chronon см. в разделе «Преимущества документации chronon ».
chronon предлагает наибольшую пользу специалистам по искусственному интеллекту и машинному обучению, которые пытаются создавать «онлайн» модели, обслуживающие запросы в режиме реального времени, а не в пакетных рабочих процессах.
Без chronon инженерам, работающим над этими проектами, необходимо выяснить, как получить данные для своих моделей для обучения/оценки, а также для вывода производственных данных. По мере того как сложность данных, входящих в эти модели, возрастает (множественные источники, сложные преобразования, такие как оконные агрегации и т. д.), растет и инфраструктурная проблема поддержки этой системы передачи данных.
Как правило, мы наблюдали, как специалисты по ОД применяли один из двух подходов:
При таком подходе пользователи начинают с данных, доступных в онлайн-среде обслуживания, из которой будет выполняться вывод модели. Зарегистрируйте соответствующие функции в хранилище данных. Как только накопится достаточно данных, обучите модель на журналах и обслуживайте ее с теми же данными.
Плюсы:
Минусы:
При таком подходе пользователи обучают модель на данных из хранилища данных, а затем находят способы воспроизвести эти функции в онлайн-среде.
Плюсы:
Минусы:
chronon подход
С помощью chronon вы можете использовать любые данные, доступные в вашей организации, включая все, что находится в хранилище данных, любой источник потоковой передачи, сервисные вызовы и т. д., с гарантированной согласованностью между онлайн- и оффлайн-средами. Он абстрагирует сложность инфраструктуры, связанную с организацией и обслуживанием этой передачи данных, так что пользователи могут просто определять функции в простом API и доверять chronon обработку всего остального.
Мы приветствуем вклад в проект chronon ! Пожалуйста, прочитайте ВКЛАД для получения подробной информации.
Используйте систему отслеживания проблем GitHub, чтобы сообщать об ошибках или запрашивать новые функции. Присоединяйтесь к нашему сообществу, рабочему пространству Slack, чтобы получать обсуждения, советы и поддержку.