Hak Cipta Debezium Penulis. Berlisensi di bawah Lisensi Apache, Versi 2.0.
Debezium adalah proyek sumber terbuka yang menyediakan platform streaming data latensi rendah untuk pengambilan data perubahan (CDC). Konektor ini menyediakan implementasi sink untuk mengalirkan perubahan yang dikeluarkan oleh Debezium ke dalam database relasional.
Implementasi konektor ini sadar akan sumber Debezium. Artinya konektor dapat menggunakan peristiwa perubahan Debezium asli tanpa perlu menggunakan ExtractNewRecordState
untuk meratakan struktur peristiwa. Hal ini mengurangi konfigurasi yang diperlukan untuk menggunakan konektor sink JDBC. Selain itu, hal ini juga berarti bahwa sisi sink dari pipeline dapat memanfaatkan metadata Debezium, seperti propagasi tipe kolom untuk secara lancar mendukung resolusi tipe kolom yang tepat di sisi konektor sink dari pipeline.
Konektor wastafel JDBC adalah konektor wastafel Kafka Connect tradisional (alias konsumen). Tugasnya adalah membaca catatan dari satu atau beberapa topik Kafka dan menghasilkan pernyataan SQL yang dieksekusi pada database tujuan yang dikonfigurasi.
SinkRecordDescriptor
adalah objek yang dibuat dari setiap SinkRecord
. Kebanyakan metode yang seharusnya menggunakan SinkRecord
menggunakan objek deskriptor ini sebagai gantinya. Deskriptor sebenarnya adalah versi SinkRecord
yang telah diproses sebelumnya, yang memungkinkan kita melakukan pra-pemrosesan ini satu kali dan kemudian menggunakan informasi ini di seluruh konektor. Saat menambahkan metode baru, biasanya Anda ingin menggunakan SinkRecordDescriptor
.
Setiap database sink biasanya memiliki implementasi DatabaseDialect
sendiri yang harus memperluas GeneralDatabaseDialect
. Dialek adalah salah satu mekanisme inti yang digunakan oleh konektor sink JDBC untuk menyelesaikan pernyataan SQL dan karakteristik database lainnya untuk database tempat konektor akan menulis peristiwa yang digunakan. Konektor sink JDBC mengandalkan resolusi dialek Hibernate untuk menggerakkan pembungkus dialek yang digunakan oleh konektor.
Jika tidak ada pemetaan dialek yang terdeteksi untuk database sink yang digunakan, konektor sink JDBC akan menggunakan implementasi GeneralDatabaseDialect
secara default. Implementasi umum ini tidak mendukung setiap aspek konektor, misalnya mode penyisipan UPSERT tidak didukung ketika dialek ini dipilih karena pernyataan UPSERT umumnya unik untuk database yang digunakan. Biasanya merupakan ide bagus untuk menambahkan implementasi dialek baru jika database sink baru ingin memiliki kompatibilitas penuh dengan perilaku luas konektor sink JDBC.
Setiap bidang dalam pesan Kafka dikaitkan dengan tipe skema, namun informasi tipe ini juga dapat membawa metadata lain seperti nama atau bahkan parameter yang telah disediakan oleh konektor sumber. Konektor sink JDBC menggunakan sistem tipe, yang didasarkan pada kontrak io.debezium.connector.jdbc.type.Type
, untuk menangani pengikatan nilai, resolusi nilai default, dan karakteristik lain yang mungkin spesifik tipe.
Ada tiga jenis implementasi Type
yang berbeda:
io.debezium.connector.jdbc.type.connect
.io.debezium.connector.jdbc.type.debezium
.io.debezium.connector.jdbc.dialect
.Tipe didaftarkan dalam pola hierarki, dimulai dengan tipe Kafka Connect, lalu tipe Debezium, dan terakhir tipe khusus dialek. Hal ini memungkinkan tipe Debezium untuk mengganti tipe Kafka Connect jika diperlukan dan akhirnya dialek untuk mengganti tipe kontribusi lainnya.
Tipe diselesaikan dengan terlebih dahulu melihat nama skema Kafka dan memetakannya ke registrasi tipe. Jika skema tidak memiliki nama, jenis skema tersebut kemudian digunakan untuk menyelesaikan suatu jenis. Hal ini memungkinkan tipe dasar Kafka Connect untuk mengambil keputusan akhir tentang bagaimana data diinterpretasikan jika tidak ada implementasi tipe lain yang terdeteksi untuk bidang tersebut.
Ada dua strategi penamaan yang digunakan oleh konektor sink JDBC:
TableNamingStrategy
ColumnNamingStrategy
Konektor sink JDBC dikirimkan dengan implementasi default keduanya, yang ditemukan dalam paket io.debezium.connector.jdbc.naming
. Perilaku default kedua strategi ini adalah sebagai berikut:
.
dengan _
dan menggunakan nilai table.name.format
yang dikonfigurasi untuk menentukan nama akhir tabel. Jadi dengan asumsi nama topik acara adalah server.schema.table
dengan table.name.format=dbo.${topic}
default, tabel tujuan akan dibuat sebagai dbo.server_schema_table
.Kedua strategi ini dapat diatasi dengan menentukan referensi nama kelas yang sepenuhnya memenuhi syarat dalam konfigurasi konektor. Contoh konfigurasi:
table.naming.strategy=io.debezium.connector.jdbc.naming.DefaultTableNamingStrategy
column.naming.strategy=io.debezium.connector.jdbc.naming.DefaultColumnNamingStrategy
Konektor sink JDBC mempertahankan model relasional dalam memori, mirip dengan konektor sumber Debezium. Kelas model relasional ini dapat ditemukan di paket io.debezium.connector.jdbc.relational
.
Hal berikut ini diperlukan agar dapat bekerja dengan basis kode konektor sink Debezium JDBC, dan untuk membangunnya secara lokal:
.mvnw
untuk perintah Maven) Rangkaian pengujian sangat didasarkan pada penggunaan TestContainer, dan secara otomatis memulai berbagai database sumber dan sink secara otomatis. Tanpa lingkungan yang kompatibel dengan Docker, pengujian integrasi tidak akan berjalan. Jika Anda tidak memiliki lingkungan Docker, Anda dapat melewati pengujian integrasi dengan menggunakan argumen baris perintah -DskipITs
, yang ditunjukkan di bawah ini:
$ ./mvnw clean verify -DskipITs
Ada tiga jenis tipe dalam rangkaian pengujian:
Secara default, semua pengujian unit dijalankan sebagai bagian dari build. Pengujian integrasi berbasis sink hanya dijalankan untuk MySQL, PostgreSQL, dan SQL Server secara default, sementara tidak ada pengujian berbasis matriks end-to-end yang dijalankan.
Untuk menjalankan pengujian integrasi berbasis sink untuk Oracle dan DB2, argumen -Dtest.tags
harus disediakan untuk menyertakannya dalam build. Untuk melakukan ini, tambahkan semua pengujian integrasi yang akan dijalankan, seperti yang ditunjukkan di bawah ini untuk semua database:
$ ./mvnw clean install -Dtest.tags=it-mysql,it-postgresql,it-sqlserver,it-oracle,it-db2
Untuk menjalankan semua pengujian integrasi berbasis sink untuk semua database, tag pintasan disediakan:
$ ./mvnw clean install -Dtest.tags=it
Demikian pula, untuk mengaktifkan pengujian ujung ke ujung yang spesifik, argumen -Dtest.tags
juga dapat diberikan dengan tag yang diperlukan untuk setiap jenis database sink:
$ ./mvnw clean install -Dtest.tags=e2e-mysql,e2e-postgresql,e2e-sqlserver,e2e-oracle,e2e-db2
Untuk menjalankan semua pengujian integrasi ujung ke ujung, tag pintasan juga disediakan:
$ ./mvnw clean install -Dtest.tags=e2e
Untuk menjalankan semua pengujian untuk semua kombinasi sumber/sink:
$ ./mvnw clean install -Dtest.tags=all
Komunitas Debezium menyambut siapa saja yang ingin membantu dengan cara apa pun, baik itu termasuk melaporkan masalah, membantu dokumentasi, atau berkontribusi dalam perubahan kode untuk memperbaiki bug, menambahkan pengujian, atau mengimplementasikan fitur baru. Lihat dokumen ini untuk detailnya.
Terima kasih sebesar-besarnya kepada semua kontributor sink Debezium JDBC!
Proyek ini dilisensikan di bawah Lisensi Apache, versi 2.