chronon 은 데이터 계산의 복잡성을 추상화하고 AI/ML 애플리케이션을 제공하는 플랫폼입니다. 사용자는 기능을 원시 데이터의 변환으로 정의한 다음 chronon 배치 및 스트리밍 계산, 확장 가능한 백필, 짧은 지연 시간 제공, 정확성 및 일관성 보장, 다양한 관찰 가능성 및 모니터링 도구를 수행할 수 있습니다.
이를 통해 일반적으로 수반되는 모든 복잡한 오케스트레이션에 대해 걱정할 필요 없이 배치 테이블, 이벤트 스트림 또는 서비스에서 AI/ML 프로젝트를 지원하는 등 조직 내 모든 데이터를 활용할 수 있습니다.
chronon 에 대한 자세한 내용은 chronon .ai에서 확인할 수 있습니다.
chronon 기능에 대한 최신 값을 반환하는 실시간 가져오기용 API를 제공합니다. 다음을 지원합니다:
ML 실무자는 모델 학습 및 평가를 위해 특성 값에 대한 기록 보기가 필요한 경우가 많습니다. chronon 의 백필은 다음과 같습니다.
chronon 다음에 대한 가시성을 제공합니다.
chronon 다양한 집계 유형을 지원합니다. 전체 목록을 보려면 여기에서 설명서를 참조하세요.
이러한 집계는 모두 임의의 창 크기에 대해 계산되도록 구성할 수 있습니다.
이 섹션에서는 조작된 기본 원시 데이터 세트를 사용하여 chronon 으로 훈련 데이터 세트를 생성하는 단계를 안내합니다.
포함:
GroupBy
및 Join
.다음은 포함되지 않습니다:
chronon 을 시작하려면 docker-compose.yml 파일을 다운로드하여 로컬에서 실행하기만 하면 됩니다.
curl -o docker-compose.yml https://chronon.ai/docker-compose.yml
docker-compose up
only showing top 20 rows
알림과 함께 일부 데이터가 인쇄되면 튜토리얼을 진행할 준비가 된 것입니다.
이 예에서는 우리가 대규모 온라인 소매업체이고 사용자가 상품을 구매하고 나중에 반품하는 것을 기반으로 사기 벡터를 감지했다고 가정해 보겠습니다. 우리는 결제 흐름이 시작될 때 호출될 모델을 훈련하고 이 거래가 사기성 반품으로 이어질지 여부를 예측하려고 합니다.
조작된 원시 데이터는 데이터 디렉터리에 포함됩니다. 여기에는 4개의 테이블이 포함됩니다.
새 터미널 창에서 다음을 실행합니다.
docker-compose exec main bash
그러면 chronon docker 컨테이너 내의 셸이 열립니다.
이제 설정 단계가 완료되었으므로 다양한 chronon 개체를 생성 및 테스트하여 변환 및 집계를 정의하고 데이터를 생성할 수 있습니다.
원시 입력 소스를 기반으로 구축된 세 가지 기능 세트부터 시작해 보겠습니다.
참고: 이러한 Python 정의는 이미 chronon
이미지에 있습니다. 이러한 정의에 대한 계산을 실행할 때 3단계 - 데이터 채우기까지는 실행할 작업이 없습니다.
기능 세트 1: 구매 데이터 기능
구매 로그 데이터를 사용자 수준으로 집계하여 플랫폼에서 이 사용자의 이전 활동을 확인할 수 있습니다. 특히 다양한 기간에 걸쳐 이전 구매 금액의 SUM
COUNT
및 AVERAGE
를 계산할 수 있습니다.
이 기능은 테이블과 주제를 모두 포함하는 소스를 기반으로 구축되었으므로 해당 기능은 일괄 처리와 스트리밍 모두에서 계산될 수 있습니다.
source = Source (
events = EventSource (
table = "data.purchases" , # This points to the log table with historical purchase events
topic = None , # Streaming is not currently part of quickstart, but this would be where you define the topic for realtime events
query = Query (
selects = select ( "user_id" , "purchase_price" ), # Select the fields we care about
time_column = "ts" ) # The event time
))
window_sizes = [ Window ( length = day , timeUnit = TimeUnit . DAYS ) for day in [ 3 , 14 , 30 ]] # Define some window sizes to use below
v1 = GroupBy (
sources = [ source ],
keys = [ "user_id" ], # We are aggregating by user
aggregations = [ Aggregation (
input_column = "purchase_price" ,
operation = Operation . SUM ,
windows = window_sizes
), # The sum of purchases prices in various windows
Aggregation (
input_column = "purchase_price" ,
operation = Operation . COUNT ,
windows = window_sizes
), # The count of purchases in various windows
Aggregation (
input_column = "purchase_price" ,
operation = Operation . AVERAGE ,
windows = window_sizes
) # The average purchases by user in various windows
],
)
여기에서 전체 코드 파일을 확인하세요. GroupBy 구매. 이는 도커 이미지에도 있습니다. 3단계 - 데이터 채우기에서 해당 그룹과 다른 GroupBy에 대한 계산을 실행하겠습니다.
기능 세트 2: 데이터 기능 반환
반품 GroupBy의 반품 데이터에 대해 유사한 집계 세트를 수행합니다. 코드는 위의 예와 유사하므로 여기에 포함되지 않습니다.
기능 세트 3: 사용자 데이터 기능
사용자 데이터를 기능으로 전환하는 것은 주로 포함할 집계가 없기 때문에 좀 더 간단합니다. 이 경우 소스 데이터의 기본 키는 기능의 기본 키와 동일하므로 행에 대해 집계를 수행하는 대신 단순히 열 값을 추출합니다.
source = Source (
entities = EntitySource (
snapshotTable = "data.users" , # This points to a table that contains daily snapshots of the entire product catalog
query = Query (
selects = select ( "user_id" , "account_created_ds" , "email_verified" ), # Select the fields we care about
)
))
v1 = GroupBy (
sources = [ source ],
keys = [ "user_id" ], # Primary key is the same as the primary key for the source table
aggregations = None # In this case, there are no aggregations or windows to define
)
사용자 GroupBy에서 가져옵니다.
다음으로, 모델 학습을 위해 이전에 단일 테이블에 백필을 정의한 기능이 필요합니다. 이는 Join
API를 사용하여 달성할 수 있습니다.
우리의 사용 사례에서는 올바른 타임스탬프를 기준으로 기능을 계산하는 것이 매우 중요합니다. 결제 흐름이 시작될 때 모델이 실행되므로 백필에서 해당 타임스탬프를 사용해야 합니다. 이렇게 하면 모델 학습을 위한 특성 값이 모델이 온라인 추론에서 볼 수 있는 값과 논리적으로 일치합니다.
Join
학습 데이터에 대한 기능 채우기를 구동하는 API입니다. 주로 다음 기능을 수행합니다.
Join
이름이 붙었습니다).조인은 다음과 같습니다.
source = Source (
events = EventSource (
table = "data.checkouts" ,
query = Query (
selects = select ( "user_id" ), # The primary key used to join various GroupBys together
time_column = "ts" ,
) # The event time used to compute feature values as-of
))
v1 = Join (
left = source ,
right_parts = [ JoinPart ( group_by = group_by ) for group_by in [ purchases_v1 , refunds_v1 , users ]] # Include the three GroupBys
)
training_set Join에서 가져옵니다.
조인의 left
은 백필의 타임스탬프와 기본 키를 정의하는 것입니다(사용 사례에 따라 checkout
이벤트 위에 구축된다는 점에 유의하세요).
이 Join
은 위의 세 GroupBy
를 하나의 데이터 정의로 결합합니다. 다음 단계에서는 이 전체 파이프라인에 대한 계산을 실행하는 명령을 실행합니다.
조인이 정의되면 다음 명령을 사용하여 컴파일합니다.
compile.py --conf=joins/quickstart/training_set.py
그러면 다음 명령을 사용하여 Spark에 제출할 수 있는 중고품 정의로 변환됩니다.
run.py --conf production/joins/quickstart/training_set.v1
백필의 출력에는 왼쪽 소스의 user_id 및 ts 열과 우리가 만든 3개의 GroupBys의 11개 기능 열이 포함됩니다.
기능 값은 시간적 정확성을 보장하면서 왼쪽의 각 user_id 및 ts에 대해 계산됩니다. 예를 들어 왼쪽 행 중 하나가 user_id = 123
이고 ts = 2023-10-01 10:11:23.195
인 경우 해당 사용자에 대해 purchase_price_avg_30d
기능은 정확히 30일 기간이 끝나는 날짜로 계산됩니다. 그 타임스탬프.
이제 Spark SQL 셸을 사용하여 백필된 데이터를 쿼리할 수 있습니다.
spark-sql
그런 다음:
spark - sql > SELECT user_id, quickstart_returns_v1_refund_amt_sum_30d, quickstart_purchases_v1_purchase_price_sum_14d, quickstart_users_v1_email_verified from default . quickstart_training_set_v1 limit 100 ;
이는 몇 개의 열만 선택한다는 점에 유의하세요. select * from default.quickstart_training_set_v1 limit 100
실행하여 모든 열을 볼 수도 있습니다. 그러나 테이블이 꽤 넓어 화면에서 결과를 읽기가 쉽지 않을 수도 있습니다.
SQL 쉘을 종료하려면 다음을 실행할 수 있습니다.
spark-sql > quit ;
이제 조인 및 백필 데이터를 만들었으므로 다음 단계는 모델을 교육하는 것입니다. 이는 이 튜토리얼의 일부는 아니지만 완료되었다고 가정하면 그 이후의 다음 단계는 모델을 온라인으로 제작하는 것입니다. 이를 위해서는 모델 추론을 위한 특징 벡터를 가져올 수 있어야 합니다. 이것이 다음 섹션에서 다루는 내용입니다.
온라인 흐름을 제공하려면 먼저 온라인 KV 스토어에 업로드된 데이터가 필요합니다. 이는 다음 두 가지 면에서 이전 단계에서 실행한 백필과 다릅니다.
구매 GroupBy를 업로드합니다.
run.py --mode upload --conf production/group_bys/quickstart/purchases.v1 --ds 2023-12-01
spark-submit --class ai. chronon .quickstart.online.Spark2MongoLoader --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_purchases_v1_upload mongodb://admin:admin@mongodb:27017/ ? authSource=admin
반품 GroupBy를 업로드합니다.
run.py --mode upload --conf production/group_bys/quickstart/returns.v1 --ds 2023-12-01
spark-submit --class ai. chronon .quickstart.online.Spark2MongoLoader --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_returns_v1_upload mongodb://admin:admin@mongodb:27017/ ? authSource=admin
FetchGroupby
대신 FetchJoin
API를 사용하려면 조인 메타데이터도 업로드해야 합니다.
run.py --mode metadata-upload --conf production/joins/quickstart/training_set.v2
이를 통해 온라인 가져오기 프로그램은 이 조인에 대한 요청을 받아 개별 GroupBy 요청으로 분할하고 통합 벡터를 반환하는 방법을 알 수 있습니다. 이는 조인 백필이 모든 기능이 포함된 와이드 뷰 테이블을 생성하는 방법과 유사합니다.
위의 엔터티를 정의하면 이제 간단한 API 호출로 특징 벡터를 쉽게 가져올 수 있습니다.
조인을 가져오는 중:
run.py --mode fetch --type join --name quickstart/training_set.v2 -k ' {"user_id":"5"} '
단일 GroupBy를 가져올 수도 있습니다(이전에 수행한 조인 메타데이터 업로드 단계가 필요하지 않음).
run.py --mode fetch --type group-by --name quickstart/purchases.v1 -k ' {"user_id":"5"} '
프로덕션의 경우 Java 클라이언트는 일반적으로 서비스에 직접 포함됩니다.
Map < String , String > keyMap = new HashMap <>();
keyMap . put ( "user_id" , "123" );
Fetcher . fetch_join ( new Request ( "quickstart/training_set_v1" , keyMap ))
샘플 응답
> '{"purchase_price_avg_3d":14.3241, "purchase_price_avg_14d":11.89352, ...}'
참고: 이 Java 코드는 docker env에서 실행할 수 없으며 단지 예시일 뿐입니다.
이 README의 소개 섹션에서 설명한 것처럼 chronon 의 핵심 보장 중 하나는 온라인/오프라인 일관성입니다. 이는 모델을 훈련하는 데 사용하는 데이터(오프라인)가 모델이 프로덕션 추론(온라인)을 위해 보는 데이터와 일치함을 의미합니다.
이것의 핵심 요소는 시간적 정확성입니다. 이는 다음과 같이 표현될 수 있습니다. 기능을 채울 때 조인의 왼쪽에서 제공되는 특정 timestamp
에 대해 생성되는 값은 해당 기능이 해당 특정 timestamp
에서 가져온 경우 온라인으로 반환되는 값과 동일해야 합니다 .
chronon 이러한 시간적 정확성을 보장할 뿐만 아니라 이를 측정하는 방법도 제공합니다.
측정 파이프라인은 온라인 가져오기 요청 로그로 시작됩니다. 이러한 로그에는 가져온 기능 값과 함께 요청의 기본 키와 타임스탬프가 포함됩니다. 그런 다음 chronon 키와 타임스탬프를 왼쪽의 Join 백필에 전달하여 컴퓨팅 엔진에 특성 값을 백필하도록 요청합니다. 그런 다음 백필된 값을 실제 가져온 값과 비교하여 일관성을 측정합니다.
1단계: 로그 가져오기
먼저 몇 가지 가져오기 요청을 실행했는지 확인하세요. 달리다:
run.py --mode fetch --type join --name quickstart/training_set.v2 -k '{"user_id":"5"}'
일부 가져오기를 생성하기 위해 몇 번.
완료되면 이를 실행하여 사용 가능한 로그 테이블을 생성할 수 있습니다(이 명령은 올바른 스키마를 사용하여 로깅 하이브 테이블을 생성합니다).
spark-submit --class ai. chronon .quickstart.online.MongoLoggingDumper --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default. chronon _log_table mongodb://admin:admin@mongodb:27017/ ? authSource=admin
compile.py --conf group_bys/quickstart/schema.py
run.py --mode backfill --conf production/group_bys/quickstart/schema.v1
run.py --mode log-flattener --conf production/joins/quickstart/training_set.v2 --log-table default. chronon _log_table --schema-table default.quickstart_schema_v1
이렇게 하면 이전에 수행한 각 가져오기 요청의 결과와 요청한 타임스탬프 및 요청한 user
포함된 default.quickstart_training_set_v2_logged
테이블이 생성됩니다.
참고: 위 명령을 실행하면 로그 파티션이 생성되고 "닫힙니다". 즉, 같은 날(UTC 시간)에 추가 가져오기를 수행하면 추가되지 않습니다. 돌아가서 온라인/오프라인 일관성에 대한 추가 요청을 생성하려면 위 명령을 다시 실행하기 전에 테이블을 삭제하면 됩니다( spark-sql
셸에서 DROP TABLE default.quickstart_training_set_v2_logged
실행).
이제 다음 명령을 사용하여 일관성 지표를 계산할 수 있습니다.
run.py --mode consistency-metrics-compute --conf production/joins/quickstart/training_set.v2
이 작업은 로그 테이블(이 경우 default.quickstart_training_set_v2_logged
)에서 기본 키와 타임스탬프를 가져와 이를 사용하여 조인 백필을 생성하고 실행합니다. 그런 다음 백필된 결과를 온라인에서 가져온 실제 기록된 값과 비교합니다.
두 개의 출력 테이블을 생성합니다.
default.quickstart_training_set_v2_consistency
: 일관성 검사 결과를 보기 위해 쿼리할 수 있는 사람이 읽을 수 있는 테이블입니다.spark-sql
실행하여 SQL 셸을 입력한 다음 테이블을 쿼리할 수 있습니다.DESC default.quickstart_training_set_v2_consistency
실행한 다음 쿼리할 몇 가지 열을 선택하는 것이 좋습니다.default.quickstart_training_set_v2_consistency_upload
: 온라인 KV 스토어에 업로드되는 KV 바이트 목록으로, 온라인 데이터 품질 모니터링 흐름을 강화하는 데 사용할 수 있습니다. 사람이 읽을 수 있도록 의도된 것은 아닙니다. 기능 엔지니어링 작업에 chronon 사용하면 다음과 같은 여러 가지 방법으로 ML 워크플로가 단순화되고 개선됩니다.
chronon 사용의 이점에 대한 자세한 내용은 chronon 설명서의 이점을 참조하세요.
chronon 일괄 워크플로가 아닌 실시간으로 요청을 처리하는 "온라인" 모델을 구축하려는 AI/ML 실무자에게 가장 큰 가치를 제공합니다.
chronon 없으면 이러한 프로젝트에 참여하는 엔지니어는 교육/평가 및 생산 추론을 위해 모델에 데이터를 가져오는 방법을 파악해야 합니다. 이러한 모델에 들어가는 데이터의 복잡성이 증가함에 따라(다중 소스, 창 집계와 같은 복잡한 변환 등) 이 데이터 연결을 지원하는 인프라 문제도 증가합니다.
일반적으로 우리는 ML 실무자가 다음 두 가지 접근 방식 중 하나를 취하는 것을 관찰했습니다.
이 접근 방식을 사용하면 사용자는 모델 추론이 실행되는 온라인 제공 환경에서 사용 가능한 데이터로 시작합니다. 관련 기능을 데이터 웨어하우스에 기록합니다. 충분한 데이터가 축적되면 로그에서 모델을 훈련하고 동일한 데이터를 제공합니다.
장점:
단점:
이 접근 방식을 사용하면 사용자는 데이터 웨어하우스의 데이터로 모델을 교육한 다음 온라인 환경에서 해당 기능을 복제하는 방법을 찾아냅니다.
장점:
단점:
chronon 접근 방식
chronon 사용하면 온라인과 오프라인 환경 간의 일관성을 보장하면서 데이터 웨어하우스의 모든 것, 스트리밍 소스, 서비스 호출 등을 포함하여 조직에서 사용 가능한 모든 데이터를 사용할 수 있습니다. 이 데이터 배관을 조정하고 유지 관리하는 인프라 복잡성을 추상화하므로 사용자는 간단한 API에서 기능을 간단히 정의하고 나머지는 chronon 통해 처리할 수 있습니다.
chronon 프로젝트에 대한 기여를 환영합니다! 자세한 내용은 CONTRIBUTING을 읽어보세요.
버그를 보고하거나 기능을 요청하려면 GitHub 문제 추적기를 사용하세요. 커뮤니티 Slack 작업 공간에 참여하여 토론, 팁 및 지원을 받으세요.