著作権は Debezium の作者にあります。 Apache License、バージョン 2.0 に基づいてライセンスされています。
Debezium は、変更データ キャプチャ (CDC) 用の低遅延データ ストリーミング プラットフォームを提供するオープン ソース プロジェクトです。このコネクタは、Debezium によってリレーショナル データベースに送信された変更をストリーミングするためのシンク実装を提供します。
このコネクタ実装は Debezium ソース対応です。これは、コネクタがExtractNewRecordState
を使用してイベント構造をフラット化する必要がなく、ネイティブの Debezium 変更イベントを使用できることを意味します。これにより、JDBC シンク コネクタを使用するために必要な構成が軽減されます。さらに、これは、パイプラインのシンク側で列型の伝播などの Debezium メタデータを利用して、パイプラインのシンク コネクタ側で適切な列型の解決をシームレスにサポートできることも意味します。
JDBC シンク コネクタは、従来の Kafka Connect シンク コネクタ (別名コンシューマ) です。そのジョブは、1 つ以上の Kafka トピックからレコードを読み取り、構成された宛先データベースで実行される SQL ステートメントを生成することです。
SinkRecordDescriptor
は、すべてのSinkRecord
から構築されるオブジェクトです。 SinkRecord
を受け取るほとんどのメソッドは、代わりにこの記述子オブジェクトを受け取ります。記述子は事実上、 SinkRecord
の前処理されたバージョンであり、この前処理を 1 回実行するだけで、コネクタ全体でこの情報を利用できるようになります。新しいメソッドを追加するときは、通常、 SinkRecordDescriptor
使用します。
通常、各シンク データベースには、 GeneralDatabaseDialect
を拡張する独自のDatabaseDialect
実装があります。ダイアレクトは、JDBC シンク コネクタが使用するイベントを書き込むデータベースの SQL ステートメントやその他のデータベース特性を解決するために、JDBC シンク コネクタによって使用されるコア メカニズムの 1 つです。 JDBC シンク コネクタは、Hibernate の方言解決に依存して、コネクタで使用される方言ラッパーを駆動します。
使用されているシンク データベースで方言マッピングが検出されない場合、JDBC シンク コネクタはデフォルトでGeneralDatabaseDialect
実装を使用します。この一般化された実装は、コネクタのすべての側面をサポートしているわけではありません。たとえば、UPSERT ステートメントは通常、使用されているデータベースに固有であるため、この方言が選択された場合、UPSERT 挿入モードはサポートされません。新しいシンク データベースに JDBC シンク コネクタの広範な動作との完全な互換性を持たせるには、新しい方言実装を追加することをお勧めします。
Kafka メッセージのすべてのフィールドはスキーマ タイプに関連付けられますが、このタイプ情報には、名前やソース コネクタによって提供されたパラメータなどの他のメタデータも含まれる場合があります。 JDBC シンク コネクタは、値のバインディング、デフォルト値の解決、および型固有のその他の特性を処理するために、 io.debezium.connector.jdbc.type.Type
コントラクトに基づく型システムを利用します。
Type
実装には実質的に 3 つの異なるタイプがあります。
io.debezium.connector.jdbc.type.connect
にあります。io.debezium.connector.jdbc.type.debezium
にあります。io.debezium.connector.jdbc.dialect
階層にあります。型は、Kafka Connect 型から始まり、次に Debezium 型、最後に方言固有の型という階層パターンで登録されます。これにより、必要に応じて Debezium 型が Kafka Connect 型をオーバーライドできるようになり、最終的に方言が他の提供された型をオーバーライドできるようになります。
型は、最初に Kafka スキーマ名を確認し、これを型登録にマッピングすることによって解決されます。スキーマに名前がない場合は、スキーマの型を使用して型が解決されます。これにより、フィールドに他の型の実装が検出されない場合、基本の Kafka Connect 型がデータの解釈方法について最終的な決定権を持つことができます。
JDBC シンク コネクタで使用される命名戦略は 2 つあります。
TableNamingStrategy
ColumnNamingStrategy
JDBC シンク コネクタは、 io.debezium.connector.jdbc.naming
パッケージに含まれる両方のデフォルト実装とともに出荷されます。これら 2 つの戦略のデフォルトの動作は次のとおりです。
.
_
を使用し、構成されたtable.name.format
値を使用してテーブルの最終的な名前を解決します。したがって、イベントのトピック名がserver.schema.table
で、デフォルトのtable.name.format=dbo.${topic}
であると仮定すると、宛先テーブルはdbo.server_schema_table
として作成されます。これら 2 つの戦略は、コネクタ構成で完全修飾クラス名参照を指定することでオーバーライドできます。構成例:
table.naming.strategy=io.debezium.connector.jdbc.naming.DefaultTableNamingStrategy
column.naming.strategy=io.debezium.connector.jdbc.naming.DefaultColumnNamingStrategy
JDBC シンク コネクタは、Debezium ソース コネクタと同様に、メモリ内リレーショナル モデルを維持します。これらのリレーショナル モデル クラスは、 io.debezium.connector.jdbc.relational
パッケージにあります。
Debezium JDBC シンク コネクタ コード ベースを操作し、ローカルでビルドするには、次のものが必要です。
.mvnw
を使用してラッパーを呼び出します)テスト スイートは TestContainer の使用に大きく基づいており、さまざまなソース データベースとシンク データベースを自動的に起動します。 Docker 互換環境がないと、統合テストは実行されません。 Docker 環境がない場合は、次に示す-DskipITs
コマンド ライン引数を使用して統合テストをスキップできます。
$ ./mvnw clean verify -DskipITs
テスト スイートには 3 つのタイプがあります。
デフォルトでは、すべての単体テストはビルドの一部として実行されます。デフォルトでは、シンクベースの統合テストは MySQL、PostgreSQL、SQL Server に対してのみ実行されますが、エンドツーエンドのマトリックスベースのテストは実行されません。
Oracle と DB2 のシンクベースの統合テストを実行するには、 -Dtest.tags
引数を指定して、これらをビルドに含める必要があります。これを行うには、以下に示すように、すべてのデータベースに対して実行するすべての統合テストを追加します。
$ ./mvnw clean install -Dtest.tags=it-mysql,it-postgresql,it-sqlserver,it-oracle,it-db2
すべてのデータベースに対してすべてのシンクベースの統合テストを実行するために、ショートカット タグが提供されています。
$ ./mvnw clean install -Dtest.tags=it
同様に、特定のエンドツーエンド テストを有効にするために、シンク データベースの種類ごとに必要なタグを-Dtest.tags
引数に指定することもできます。
$ ./mvnw clean install -Dtest.tags=e2e-mysql,e2e-postgresql,e2e-sqlserver,e2e-oracle,e2e-db2
すべてのエンドツーエンドの統合テストを実行するために、ショートカット タグも提供されています。
$ ./mvnw clean install -Dtest.tags=e2e
すべてのソース/シンクの組み合わせに対してすべてのテストを実行するには、次の手順を実行します。
$ ./mvnw clean install -Dtest.tags=all
Debezium コミュニティは、問題の報告、ドキュメントの作成、バグ修正、テストの追加、新機能の実装のためのコード変更の貢献など、あらゆる方法で支援したい人を歓迎します。詳細については、このドキュメントを参照してください。
Debezium JDBC シンクの貢献者の皆様に心より感謝いたします。
このプロジェクトは、Apache License バージョン 2 に基づいてライセンスされています。