chronon adalah platform yang menghilangkan kompleksitas komputasi data dan melayani aplikasi AI/ML. Pengguna mendefinisikan fitur sebagai transformasi data mentah, lalu chronon dapat melakukan komputasi batch dan streaming, pengisian ulang yang dapat diskalakan, penyajian latensi rendah, jaminan kebenaran dan konsistensi, serta sejumlah alat observasi dan pemantauan.
Hal ini memungkinkan Anda untuk memanfaatkan semua data dalam organisasi Anda, mulai dari tabel batch, aliran peristiwa, atau layanan untuk mendukung proyek AI/ML Anda, tanpa perlu khawatir tentang semua orkestrasi rumit yang biasanya diperlukan.
Informasi lebih lanjut tentang chronon dapat ditemukan di chronon .ai.
chronon menawarkan API untuk pengambilan waktu nyata yang mengembalikan nilai terkini untuk fitur Anda. Ini mendukung:
Praktisi ML sering kali memerlukan tampilan historis nilai fitur untuk pelatihan dan evaluasi model. isi ulang chronon adalah:
chronon menawarkan visibilitas ke:
chronon mendukung berbagai jenis agregasi. Untuk daftar lengkap lihat dokumentasi di sini.
Semua agregasi ini dapat dikonfigurasi untuk dihitung pada ukuran jendela yang berubah-ubah.
Bagian ini memandu Anda melalui langkah-langkah untuk membuat kumpulan data pelatihan dengan chronon , menggunakan kumpulan data mentah dasar buatan.
Termasuk:
GroupBy
dan Join
.Tidak termasuk:
Untuk memulai chronon , yang perlu Anda lakukan hanyalah mengunduh file docker-compose.yml dan menjalankannya secara lokal:
curl -o docker-compose.yml https://chronon.ai/docker-compose.yml
docker-compose up
Setelah Anda melihat beberapa data dicetak dengan only showing top 20 rows
, Anda siap untuk melanjutkan tutorial.
Dalam contoh ini, anggap saja kita adalah pengecer online besar, dan kita telah mendeteksi vektor penipuan berdasarkan pengguna yang melakukan pembelian dan kemudian mengembalikan barang. Kami ingin melatih model yang akan dipanggil saat alur pembayaran dimulai dan memprediksi apakah transaksi ini kemungkinan besar akan menghasilkan pengembalian yang menipu.
Data mentah buatan disertakan dalam direktori data. Ini mencakup empat tabel:
Di jendela terminal baru, jalankan:
docker-compose exec main bash
Ini akan membuka shell di dalam wadah buruh pelabuhan chronon .
Sekarang setelah langkah penyiapan selesai, kita dapat mulai membuat dan menguji berbagai objek chronon untuk menentukan transformasi dan agregasi, serta menghasilkan data.
Mari kita mulai dengan tiga rangkaian fitur, yang dibuat berdasarkan sumber masukan mentah kita.
Catatan: Definisi python ini sudah ada di gambar chronon
Anda. Tidak ada yang dapat Anda jalankan hingga Langkah 3 - Mengisi Ulang Data ketika Anda akan menjalankan komputasi untuk definisi ini.
Kumpulan fitur 1: Fitur data pembelian
Kami dapat menggabungkan data log pembelian ke tingkat pengguna, untuk memberi kami gambaran tentang aktivitas pengguna sebelumnya di platform kami. Secara khusus, kita dapat menghitung SUM
s COUNT
dan AVERAGE
dari jumlah pembelian sebelumnya di berbagai jendela.
Karena fitur ini dibangun berdasarkan sumber yang menyertakan tabel dan topik, fitur-fiturnya dapat dihitung dalam batch dan streaming.
source = Source (
events = EventSource (
table = "data.purchases" , # This points to the log table with historical purchase events
topic = None , # Streaming is not currently part of quickstart, but this would be where you define the topic for realtime events
query = Query (
selects = select ( "user_id" , "purchase_price" ), # Select the fields we care about
time_column = "ts" ) # The event time
))
window_sizes = [ Window ( length = day , timeUnit = TimeUnit . DAYS ) for day in [ 3 , 14 , 30 ]] # Define some window sizes to use below
v1 = GroupBy (
sources = [ source ],
keys = [ "user_id" ], # We are aggregating by user
aggregations = [ Aggregation (
input_column = "purchase_price" ,
operation = Operation . SUM ,
windows = window_sizes
), # The sum of purchases prices in various windows
Aggregation (
input_column = "purchase_price" ,
operation = Operation . COUNT ,
windows = window_sizes
), # The count of purchases in various windows
Aggregation (
input_column = "purchase_price" ,
operation = Operation . AVERAGE ,
windows = window_sizes
) # The average purchases by user in various windows
],
)
Lihat seluruh file kode di sini: pembelian GroupBy. Ini juga ada di gambar buruh pelabuhan Anda. Kami akan menjalankan komputasi untuk itu dan GroupBy lainnya di Langkah 3 - Mengisi Ulang Data.
Kumpulan fitur 2: Mengembalikan fitur data
Kami melakukan serangkaian agregasi serupa pada data pengembalian di GroupBy pengembalian. Kode tidak disertakan di sini karena tampilannya mirip dengan contoh di atas.
Kumpulan fitur 3: Fitur data pengguna
Mengubah data Pengguna menjadi fitur sedikit lebih sederhana, terutama karena tidak ada agregasi untuk disertakan. Dalam hal ini, kunci utama data sumber sama dengan kunci utama fitur, jadi kami hanya mengekstraksi nilai kolom daripada melakukan agregasi pada baris:
source = Source (
entities = EntitySource (
snapshotTable = "data.users" , # This points to a table that contains daily snapshots of the entire product catalog
query = Query (
selects = select ( "user_id" , "account_created_ds" , "email_verified" ), # Select the fields we care about
)
))
v1 = GroupBy (
sources = [ source ],
keys = [ "user_id" ], # Primary key is the same as the primary key for the source table
aggregations = None # In this case, there are no aggregations or windows to define
)
Diambil dari pengguna GroupBy.
Selanjutnya, kita memerlukan fitur-fitur yang sebelumnya kita definisikan untuk diisi ulang dalam satu tabel untuk pelatihan model. Hal ini dapat dicapai dengan menggunakan Join
API.
Untuk kasus penggunaan kami, sangat penting agar fitur dihitung berdasarkan stempel waktu yang benar. Karena model kita berjalan saat alur pembayaran dimulai, pastikan untuk menggunakan stempel waktu yang sesuai dalam pengisian ulang, sehingga nilai fitur untuk pelatihan model secara logis cocok dengan apa yang akan dilihat model dalam inferensi online.
Join
adalah API yang mendorong pengisian ulang fitur untuk data pelatihan. Ini terutama melakukan fungsi-fungsi berikut:
Join
).Berikut tampilan gabungan kami:
source = Source (
events = EventSource (
table = "data.checkouts" ,
query = Query (
selects = select ( "user_id" ), # The primary key used to join various GroupBys together
time_column = "ts" ,
) # The event time used to compute feature values as-of
))
v1 = Join (
left = source ,
right_parts = [ JoinPart ( group_by = group_by ) for group_by in [ purchases_v1 , refunds_v1 , users ]] # Include the three GroupBys
)
Diambil dari training_set Gabung.
Sisi left
gabungan adalah yang menentukan stempel waktu dan kunci utama untuk pengisian ulang (perhatikan bahwa ini dibuat di atas peristiwa checkout
, seperti yang ditentukan oleh kasus penggunaan kita).
Perhatikan bahwa Join
ini menggabungkan ketiga GroupBy
di atas menjadi satu definisi data. Pada langkah berikutnya, kita akan menjalankan perintah untuk mengeksekusi komputasi untuk keseluruhan pipeline ini.
Setelah gabungan ditentukan, kami mengkompilasinya menggunakan perintah ini:
compile.py --conf=joins/quickstart/training_set.py
Ini mengubahnya menjadi definisi penghematan yang dapat kita kirimkan ke spark dengan perintah berikut:
run.py --conf production/joins/quickstart/training_set.v1
Output dari pengisian ulang akan berisi kolom user_id dan ts dari sumber kiri, serta 11 kolom fitur dari tiga GroupBy yang kita buat.
Nilai fitur akan dihitung untuk setiap user_id dan ts di sisi kiri, dengan akurasi temporal terjamin. Jadi, misalnya, jika salah satu baris di sebelah kiri adalah untuk user_id = 123
dan ts = 2023-10-01 10:11:23.195
, maka fitur purchase_price_avg_30d
akan dihitung untuk pengguna tersebut dengan jendela tepat 30 hari yang berakhir pada stempel waktu itu.
Anda sekarang dapat menanyakan data yang diisi ulang menggunakan shell sql spark:
spark-sql
Kemudian:
spark - sql > SELECT user_id, quickstart_returns_v1_refund_amt_sum_30d, quickstart_purchases_v1_purchase_price_sum_14d, quickstart_users_v1_email_verified from default . quickstart_training_set_v1 limit 100 ;
Perhatikan bahwa ini hanya memilih beberapa kolom. Anda juga dapat menjalankan select * from default.quickstart_training_set_v1 limit 100
untuk melihat semua kolom, namun perhatikan bahwa tabelnya cukup lebar dan hasilnya mungkin tidak terlalu terbaca di layar Anda.
Untuk keluar dari shell sql Anda dapat menjalankan:
spark-sql > quit ;
Sekarang kita telah membuat data gabungan dan mengisi ulang, langkah berikutnya adalah melatih model. Itu bukan bagian dari tutorial ini, tapi dengan asumsi sudah selesai, langkah selanjutnya adalah memproduksi model secara online. Untuk melakukan ini, kita harus dapat mengambil vektor fitur untuk inferensi model. Itulah yang dibahas pada bagian selanjutnya.
Untuk melayani arus online, pertama-tama kita memerlukan data yang diupload ke toko KV online. Ini berbeda dengan pengisian ulang yang kita jalankan pada langkah sebelumnya dalam dua cara:
Unggah pembelian GroupBy:
run.py --mode upload --conf production/group_bys/quickstart/purchases.v1 --ds 2023-12-01
spark-submit --class ai. chronon .quickstart.online.Spark2MongoLoader --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_purchases_v1_upload mongodb://admin:admin@mongodb:27017/ ? authSource=admin
Unggah pengembalian GroupBy:
run.py --mode upload --conf production/group_bys/quickstart/returns.v1 --ds 2023-12-01
spark-submit --class ai. chronon .quickstart.online.Spark2MongoLoader --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_returns_v1_upload mongodb://admin:admin@mongodb:27017/ ? authSource=admin
Jika kita ingin menggunakan api FetchJoin
daripada FetchGroupby
, kita juga perlu mengunggah metadata gabungan:
run.py --mode metadata-upload --conf production/joins/quickstart/training_set.v2
Hal ini membuat pengambil online mengetahui cara mengambil permintaan untuk gabungan ini dan memecahnya menjadi permintaan GroupBy individual, mengembalikan vektor terpadu, serupa dengan cara pengisian ulang Gabung menghasilkan tabel tampilan lebar dengan semua fitur.
Dengan mendefinisikan entitas di atas, Anda kini dapat dengan mudah mengambil vektor fitur dengan panggilan API sederhana.
Mengambil bergabung:
run.py --mode fetch --type join --name quickstart/training_set.v2 -k ' {"user_id":"5"} '
Anda juga dapat mengambil satu GroupBy (ini tidak memerlukan langkah unggah metadata Gabung yang dilakukan sebelumnya):
run.py --mode fetch --type group-by --name quickstart/purchases.v1 -k ' {"user_id":"5"} '
Untuk produksi, klien Java biasanya tertanam langsung ke dalam layanan.
Map < String , String > keyMap = new HashMap <>();
keyMap . put ( "user_id" , "123" );
Fetcher . fetch_join ( new Request ( "quickstart/training_set_v1" , keyMap ))
respons sampel
> '{"purchase_price_avg_3d":14.3241, "purchase_price_avg_14d":11.89352, ...}'
Catatan: Kode Java ini tidak dapat dijalankan di docker env, ini hanya contoh ilustrasi.
Seperti yang dibahas di bagian pendahuluan README ini, salah satu jaminan inti chronon adalah konsistensi online/offline. Artinya, data yang Anda gunakan untuk melatih model (offline) cocok dengan data yang dilihat model untuk inferensi produksi (online).
Elemen kuncinya adalah akurasi temporal. Hal ini dapat diutarakan sebagai berikut: saat mengisi ulang fitur, nilai yang dihasilkan untuk timestamp
tertentu yang disediakan di sisi kiri gabungan harus sama dengan nilai yang akan dikembalikan secara online jika fitur tersebut diambil pada timestamp
tersebut .
chronon tidak hanya menjamin akurasi temporal ini, namun juga menawarkan cara untuk mengukurnya.
Alur pengukuran dimulai dengan log permintaan pengambilan online. Log ini mencakup kunci utama dan stempel waktu permintaan, serta nilai fitur yang diambil. chronon kemudian meneruskan kunci dan stempel waktu ke pengisian ulang Gabung di sisi kiri, meminta mesin komputasi untuk mengisi ulang nilai fitur. Kemudian membandingkan nilai yang diisi ulang dengan nilai aktual yang diambil untuk mengukur konsistensi.
Langkah 1: pengambilan log
Pertama, pastikan Anda telah menjalankan beberapa permintaan pengambilan. Berlari:
run.py --mode fetch --type join --name quickstart/training_set.v2 -k '{"user_id":"5"}'
Beberapa kali untuk menghasilkan beberapa pengambilan.
Setelah selesai, Anda dapat menjalankan ini untuk membuat tabel log yang dapat digunakan (perintah ini menghasilkan tabel kumpulan logging dengan skema yang benar):
spark-submit --class ai. chronon .quickstart.online.MongoLoggingDumper --master local[ * ] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default. chronon _log_table mongodb://admin:admin@mongodb:27017/ ? authSource=admin
compile.py --conf group_bys/quickstart/schema.py
run.py --mode backfill --conf production/group_bys/quickstart/schema.v1
run.py --mode log-flattener --conf production/joins/quickstart/training_set.v2 --log-table default. chronon _log_table --schema-table default.quickstart_schema_v1
Ini membuat tabel default.quickstart_training_set_v2_logged
yang berisi hasil setiap permintaan pengambilan yang Anda buat sebelumnya, bersama dengan stempel waktu saat Anda membuatnya dan user
yang Anda minta.
Catatan: Setelah Anda menjalankan perintah di atas, perintah di atas akan membuat dan "menutup" partisi log, artinya jika Anda melakukan pengambilan tambahan pada hari yang sama (waktu UTC), partisi tersebut tidak akan ditambahkan. Jika Anda ingin kembali dan menghasilkan lebih banyak permintaan untuk konsistensi online/offline, Anda dapat menghapus tabel (jalankan DROP TABLE default.quickstart_training_set_v2_logged
di shell spark-sql
) sebelum menjalankan kembali perintah di atas.
Sekarang Anda dapat menghitung metrik konsistensi dengan perintah ini:
run.py --mode consistency-metrics-compute --conf production/joins/quickstart/training_set.v2
Pekerjaan ini akan mengambil kunci utama dan stempel waktu dari tabel log ( default.quickstart_training_set_v2_logged
dalam kasus ini), dan menggunakannya untuk membuat dan menjalankan pengisian ulang gabungan. Kemudian membandingkan hasil pengisian ulang dengan nilai log aktual yang diambil secara online
Ini menghasilkan dua tabel keluaran:
default.quickstart_training_set_v2_consistency
: Tabel yang dapat dibaca manusia yang dapat Anda kueri untuk melihat hasil pemeriksaan konsistensi.spark-sql
dari sesi docker bash Anda, lalu menanyakan tabelnya.DESC default.quickstart_training_set_v2_consistency
terlebih dahulu, lalu pilih beberapa kolom yang ingin Anda kueri.default.quickstart_training_set_v2_consistency_upload
: Daftar byte KV yang diunggah ke penyimpanan KV online, yang dapat digunakan untuk mendukung aliran pemantauan kualitas data online. Tidak dimaksudkan agar dapat dibaca manusia. Menggunakan chronon untuk pekerjaan rekayasa fitur Anda menyederhanakan dan meningkatkan Alur Kerja ML Anda dalam beberapa cara:
Untuk melihat lebih detail manfaat penggunaan chronon , lihat dokumentasi Manfaat chronon .
chronon menawarkan nilai terbaik bagi praktisi AI/ML yang mencoba membangun model "online" yang melayani permintaan secara real-time dibandingkan dengan alur kerja batch.
Tanpa chronon , para insinyur yang mengerjakan proyek ini perlu mencari cara untuk memasukkan data ke model mereka untuk pelatihan/evaluasi serta inferensi produksi. Ketika kompleksitas data yang dimasukkan ke dalam model ini meningkat (berbagai sumber, transformasi kompleks seperti agregasi berjendela, dll), tantangan infrastruktur untuk mendukung penyaluran data ini juga meningkat.
Secara umum, kami mengamati praktisi ML mengambil salah satu dari dua pendekatan:
Dengan pendekatan ini, pengguna memulai dengan data yang tersedia di lingkungan penyajian online tempat inferensi model akan dijalankan. Catat fitur yang relevan ke gudang data. Setelah cukup data terakumulasi, latih model pada log, dan sajikan dengan data yang sama.
Kelebihan:
Kontra:
Dengan pendekatan ini, pengguna melatih model dengan data dari gudang data, lalu mencari cara untuk mereplikasi fitur tersebut di lingkungan online.
Kelebihan:
Kontra:
Pendekatan chronon
Dengan chronon Anda dapat menggunakan data apa pun yang tersedia di organisasi Anda, termasuk semua yang ada di gudang data, sumber streaming apa pun, panggilan layanan, dll, dengan jaminan konsistensi antara lingkungan online dan offline. Ini menghilangkan kompleksitas infrastruktur dalam mengatur dan memelihara saluran data ini, sehingga pengguna dapat dengan mudah menentukan fitur dalam API sederhana, dan mempercayai chronon untuk menangani sisanya.
Kami menyambut baik kontribusi pada proyek chronon ! Silakan baca KONTRIBUSI untuk detailnya.
Gunakan pelacak masalah GitHub untuk melaporkan bug atau permintaan fitur. Bergabunglah dengan komunitas kami Ruang kerja Slack untuk berdiskusi, tips, dan dukungan.