chronon é uma plataforma que abstrai a complexidade da computação de dados e serve para aplicações de IA/ML. Os usuários definem recursos como transformação de dados brutos, então chronon pode realizar computação em lote e streaming, preenchimentos escalonáveis, serviço de baixa latência, correção e consistência garantidas, bem como uma série de ferramentas de observabilidade e monitoramento.
Ele permite que você utilize todos os dados da sua organização, desde tabelas em lote, fluxos de eventos ou serviços para potencializar seus projetos de IA/ML, sem precisar se preocupar com toda a orquestração complexa que isso normalmente implicaria.
Mais informações sobre chronon podem ser encontradas em chronon .ai.
chronon oferece uma API para busca em tempo real que retorna valores atualizados para seus recursos. Suporta:
Os profissionais de ML geralmente precisam de visualizações históricas dos valores dos recursos para treinamento e avaliação de modelos. Os preenchimentos de chronon são:
chronon oferece visibilidade sobre:
chronon suporta uma variedade de tipos de agregação. Para obter uma lista completa, consulte a documentação aqui.
Todas essas agregações podem ser configuradas para serem calculadas em tamanhos de janela arbitrários.
Esta seção orienta você nas etapas para criar um conjunto de dados de treinamento com chronon , usando um conjunto de dados brutos subjacente fabricado.
Inclui:
GroupBy
e Join
.Não inclui:
Para começar a usar o chronon , tudo que você precisa fazer é baixar o arquivo docker-compose.yml e executá-lo localmente:
curl -o docker-compose.yml https://chronon.ai/docker-compose.yml
docker-compose up
Depois de ver alguns dados impressos com um aviso only showing top 20 rows
, você estará pronto para prosseguir com o tutorial.
Neste exemplo, vamos supor que somos um grande varejista on-line e detectamos um vetor de fraude baseado em usuários que fazem compras e posteriormente devolvem itens. Queremos treinar um modelo que será chamado quando o fluxo de checkout começar e prever se essa transação provavelmente resultará em uma devolução fraudulenta.
Os dados brutos fabricados são incluídos no diretório de dados. Inclui quatro tabelas:
Em uma nova janela de terminal, execute:
docker-compose exec main bash
Isso abrirá um shell dentro do contêiner do docker chronon .
Agora que as etapas de configuração foram concluídas, podemos começar a criar e testar vários objetos chronon para definir transformações e agregações e gerar dados.
Vamos começar com três conjuntos de recursos, construídos com base em nossas fontes de entrada brutas.
Nota: Essas definições de python já estão em sua imagem chronon
. Não há nada para você executar até a Etapa 3 - Preencher dados, quando você executará o cálculo para essas definições.
Conjunto de recursos 1: recursos de dados de compras
Podemos agregar os dados do registo de compras ao nível do utilizador, para nos dar uma visão da atividade anterior deste utilizador na nossa plataforma. Especificamente, podemos calcular SUM
s COUNT
s e AVERAGE
s de seus valores de compras anteriores em várias janelas.
Como esse recurso é construído sobre uma fonte que inclui uma tabela e um tópico, seus recursos podem ser computados em lote e em streaming.
source = Source (
events = EventSource (
table = "data.purchases" , # This points to the log table with historical purchase events
topic = None , # Streaming is not currently part of quickstart, but this would be where you define the topic for realtime events
query = Query (
selects = select ( "user_id" , "purchase_price" ), # Select the fields we care about
time_column = "ts" ) # The event time
))
window_sizes = [ Window ( length = day , timeUnit = TimeUnit . DAYS ) for day in [ 3 , 14 , 30 ]] # Define some window sizes to use below
v1 = GroupBy (
sources = [ source ],
keys = [ "user_id" ], # We are aggregating by user
aggregations = [ Aggregation (
input_column = "purchase_price" ,
operation = Operation . SUM ,
windows = window_sizes
), # The sum of purchases prices in various windows
Aggregation (
input_column = "purchase_price" ,
operation = Operation . COUNT ,
windows = window_sizes
), # The count of purchases in various windows
Aggregation (
input_column = "purchase_price" ,
operation = Operation . AVERAGE ,
windows = window_sizes
) # The average purchases by user in various windows
],
)
Veja o arquivo de código completo aqui: compras GroupBy. Isso também está na sua imagem do Docker. Estaremos executando cálculos para ele e os outros GroupBys na Etapa 3 - Preenchimento de dados.
Conjunto de recursos 2: retorna recursos de dados
Realizamos um conjunto semelhante de agregações em dados de retornos no GroupBy de retornos. O código não está incluído aqui porque é semelhante ao exemplo acima.
Conjunto de recursos 3: recursos de dados do usuário
Transformar dados do usuário em recursos é um pouco mais simples, principalmente porque não há agregações a serem incluídas. Nesse caso, a chave primária dos dados de origem é igual à chave primária do recurso, portanto, estamos simplesmente extraindo valores de coluna em vez de realizar agregações em linhas:
source = Source (
entities = EntitySource (
snapshotTable = "data.users" , # This points to a table that contains daily snapshots of the entire product catalog
query = Query (
selects = select ( "user_id" , "account_created_ds" , "email_verified" ), # Select the fields we care about
)
))
v1 = GroupBy (
sources = [ source ],
keys = [ "user_id" ], # Primary key is the same as the primary key for the source table
aggregations = None # In this case, there are no aggregations or windows to define
)
Retirado dos usuários GroupBy.
Em seguida, precisamos que os recursos que definimos anteriormente sejam preenchidos em uma única tabela para treinamento do modelo. Isso pode ser conseguido usando a API Join
.
Para nosso caso de uso, é muito importante que os recursos sejam calculados a partir do carimbo de data/hora correto. Como nosso modelo é executado quando o fluxo de checkout começa, queremos ter certeza de usar o carimbo de data/hora correspondente em nosso preenchimento, de modo que os valores dos recursos para treinamento do modelo correspondam logicamente ao que o modelo verá na inferência online.
Join
é a API que impulsiona preenchimentos de recursos para dados de treinamento. Desempenha principalmente as seguintes funções:
Join
).Esta é a aparência da nossa junção:
source = Source (
events = EventSource (
table = "data.checkouts" ,
query = Query (
selects = select ( "user_id" ), # The primary key used to join various GroupBys together
time_column = "ts" ,
) # The event time used to compute feature values as-of
))
v1 = Join (
left = source ,
right_parts = [ JoinPart ( group_by = group_by ) for group_by in [ purchases_v1 , refunds_v1 , users ]] # Include the three GroupBys
)
Retirado do training_set Join.
O lado left
da junção é o que define os carimbos de data/hora e as chaves primárias para o preenchimento (observe que ele é construído sobre o evento checkout
, conforme determinado pelo nosso caso de uso).
Observe que este Join
combina os três GroupBy
s acima em uma definição de dados. Na próxima etapa, executaremos o comando para executar a computação para todo esse pipeline.
Uma vez definida a junção, nós a compilamos usando este comando:
compile.py --conf=joins/quickstart/training_set.py
Isso o converte em uma definição de economia que podemos enviar ao spark com o seguinte comando:
run.py --conf production/joins/quickstart/training_set.v1
A saída do preenchimento conteria as colunas user_id e ts da fonte esquerda, bem como as 11 colunas de recursos dos três GroupBys que criamos.
Os valores dos recursos seriam calculados para cada user_id e ts no lado esquerdo, com precisão temporal garantida. Assim, por exemplo, se uma das linhas à esquerda fosse para user_id = 123
e ts = 2023-10-01 10:11:23.195
, então o recurso purchase_price_avg_30d
seria calculado para esse usuário com uma janela precisa de 30 dias terminando em esse carimbo de data/hora.
Agora você pode consultar os dados preenchidos usando o shell spark sql:
spark-sql
E então:
spark - sql > SELECT user_id, quickstart_returns_v1_refund_amt_sum_30d, quickstart_purchases_v1_purchase_price_sum_14d, quickstart_users_v1_email_verified from default . quickstart_training_set_v1 limit 100 ;
Observe que isso seleciona apenas algumas colunas. Você também pode executar um select * from default.quickstart_training_set_v1 limit 100
para ver todas as colunas, no entanto, observe que a tabela é bastante ampla e os resultados podem não ser muito legíveis na tela.
Para sair do shell sql você pode executar:
spark-sql > quit ;
Agora que criamos uma junção e preenchemos os dados, a próxima etapa seria treinar um modelo. Isso não faz parte deste tutorial, mas supondo que esteja completo, o próximo passo seria produzir o modelo online. Para fazer isso, precisamos ser capazes de buscar vetores de características para inferência de modelo. É isso que a próxima seção cobre.
Para atender fluxos online, primeiro precisamos que os dados sejam carregados na loja KV online. Isso é diferente do preenchimento que executamos na etapa anterior de duas maneiras:
Faça upload das compras GroupBy:
run.py --mode upload --conf production/group_bys/quickstart/purchases.v1 --ds 2023-12-01
spark-submit --class ai. chronon .quickstart.online.Spark2MongoLoader --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_purchases_v1_upload mongodb://admin:admin@mongodb:27017/ ? authSource=admin
Faça upload dos retornos GroupBy:
run.py --mode upload --conf production/group_bys/quickstart/returns.v1 --ds 2023-12-01
spark-submit --class ai. chronon .quickstart.online.Spark2MongoLoader --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_returns_v1_upload mongodb://admin:admin@mongodb:27017/ ? authSource=admin
Se quisermos usar a API FetchJoin
em vez de FetchGroupby
, também precisaremos fazer upload dos metadados de junção:
run.py --mode metadata-upload --conf production/joins/quickstart/training_set.v2
Isso faz com que o buscador on-line saiba como receber uma solicitação para essa junção e dividi-la em solicitações GroupBy individuais, retornando o vetor unificado, semelhante a como o preenchimento da junção produz a tabela de visualização ampla com todos os recursos.
Com as entidades acima definidas, agora você pode buscar facilmente vetores de recursos com uma simples chamada de API.
Buscando uma junção:
run.py --mode fetch --type join --name quickstart/training_set.v2 -k ' {"user_id":"5"} '
Você também pode buscar um único GroupBy (isso não exigiria a etapa de upload de metadados Join realizada anteriormente):
run.py --mode fetch --type group-by --name quickstart/purchases.v1 -k ' {"user_id":"5"} '
Para produção, o cliente Java geralmente é incorporado diretamente nos serviços.
Map < String , String > keyMap = new HashMap <>();
keyMap . put ( "user_id" , "123" );
Fetcher . fetch_join ( new Request ( "quickstart/training_set_v1" , keyMap ))
exemplo de resposta
> '{"purchase_price_avg_3d":14.3241, "purchase_price_avg_14d":11.89352, ...}'
Nota: Este código java não pode ser executado no ambiente docker, é apenas um exemplo ilustrativo.
Conforme discutido nas seções introdutórias deste README, uma das principais garantias do chronon é a consistência online/offline. Isso significa que os dados que você usa para treinar seu modelo (offline) correspondem aos dados que o modelo vê para inferência de produção (online).
Um elemento-chave disso é a precisão temporal. Isso pode ser expresso da seguinte forma: ao preencher recursos, o valor produzido para qualquer carimbo de data timestamp
fornecido pelo lado esquerdo da junção deve ser o mesmo que teria sido retornado on-line se esse recurso fosse buscado naquele timestamp
específico .
chronon não apenas garante essa precisão temporal, mas também oferece uma forma de medi-la.
O pipeline de medição começa com os logs das solicitações de busca online. Esses logs incluem as chaves primárias e o carimbo de data/hora da solicitação, juntamente com os valores dos recursos buscados. chronon então passa as chaves e os carimbos de data e hora para um preenchimento de junção no lado esquerdo, solicitando ao mecanismo de computação que preencha os valores do recurso. Em seguida, ele compara os valores preenchidos com os valores reais buscados para medir a consistência.
Etapa 1: buscas de log
Primeiro, certifique-se de ter executado algumas solicitações de busca. Correr:
run.py --mode fetch --type join --name quickstart/training_set.v2 -k '{"user_id":"5"}'
Algumas vezes para gerar algumas buscas.
Com isso concluído, você pode executar isso para criar uma tabela de log utilizável (esses comandos produzem uma tabela de hive de log com o esquema correto):
spark-submit --class ai. chronon .quickstart.online.MongoLoggingDumper --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default. chronon _log_table mongodb://admin:admin@mongodb:27017/ ? authSource=admin
compile.py --conf group_bys/quickstart/schema.py
run.py --mode backfill --conf production/group_bys/quickstart/schema.v1
run.py --mode log-flattener --conf production/joins/quickstart/training_set.v2 --log-table default. chronon _log_table --schema-table default.quickstart_schema_v1
Isso cria uma tabela default.quickstart_training_set_v2_logged
que contém os resultados de cada uma das solicitações de busca feitas anteriormente, juntamente com o carimbo de data e hora em que você as fez e o user
solicitado.
Nota: Depois de executar o comando acima, ele criará e "fechará" as partições de log, o que significa que se você fizer buscas adicionais no mesmo dia (horário UTC), elas não serão anexadas. Se quiser voltar e gerar mais solicitações de consistência online/offline, você pode eliminar a tabela (executar DROP TABLE default.quickstart_training_set_v2_logged
em um shell spark-sql
) antes de executar novamente o comando acima.
Agora você pode calcular métricas de consistência com este comando:
run.py --mode consistency-metrics-compute --conf production/joins/quickstart/training_set.v2
Este trabalho pegará as chaves primárias e os carimbos de data/hora da tabela de log ( neste caso, default.quickstart_training_set_v2_logged
) e os usará para criar e executar um preenchimento de junção. Em seguida, ele compara os resultados preenchidos com os valores reais registrados que foram obtidos on-line
Produz duas tabelas de saída:
default.quickstart_training_set_v2_consistency
: uma tabela legível por humanos que você pode consultar para ver os resultados das verificações de consistência.spark-sql
em sua sessão docker bash e, em seguida, consultar a tabela.DESC default.quickstart_training_set_v2_consistency
primeiro e, em seguida, selecionar algumas colunas que deseja consultar.default.quickstart_training_set_v2_consistency_upload
: uma lista de bytes KV que são carregados no armazenamento KV online, que pode ser usada para alimentar fluxos de monitoramento de qualidade de dados online. Não foi feito para ser legível por humanos. Usar chronon para seu trabalho de engenharia de recursos simplifica e melhora seu fluxo de trabalho de ML de várias maneiras:
Para uma visão mais detalhada dos benefícios do uso chronon , consulte Benefícios da documentação chronon .
chronon oferece o maior valor para profissionais de IA/ML que estão tentando construir modelos "online" que atendem solicitações em tempo real, em oposição a fluxos de trabalho em lote.
Sem chronon , os engenheiros que trabalham nesses projetos precisam descobrir como enviar dados para seus modelos para treinamento/avaliação, bem como para inferência de produção. À medida que aumenta a complexidade dos dados que entram nestes modelos (múltiplas fontes, transformações complexas, como agregações em janelas, etc.), aumenta também o desafio da infraestrutura de suporte a esta canalização de dados.
Geralmente, observamos profissionais de ML adotando uma de duas abordagens:
Com esta abordagem, os usuários começam com os dados que estão disponíveis no ambiente de serviço on-line a partir do qual a inferência do modelo será executada. Registre recursos relevantes no data warehouse. Depois que dados suficientes forem acumulados, treine o modelo nos logs e sirva com os mesmos dados.
Prós:
Contras:
Com essa abordagem, os usuários treinam o modelo com dados do data warehouse e, em seguida, descobrem maneiras de replicar esses recursos no ambiente online.
Prós:
Contras:
A abordagem chronon
Com chronon você pode utilizar qualquer dado disponível em sua organização, incluindo tudo no data warehouse, qualquer fonte de streaming, chamadas de serviço, etc, com consistência garantida entre ambientes online e offline. Ele abstrai a complexidade da infraestrutura de orquestração e manutenção desse encanamento de dados, para que os usuários possam simplesmente definir recursos em uma API simples e confiar no chronon para lidar com o resto.
Aceitamos contribuições para o projeto chronon ! Por favor, leia CONTRIBUINDO para obter detalhes.
Use o rastreador de problemas do GitHub para relatar bugs ou solicitações de recursos. Participe do nosso espaço de trabalho comunitário no Slack para discussões, dicas e suporte.