Copyright Debezium-Autoren. Lizenziert unter der Apache-Lizenz, Version 2.0.
Debezium ist ein Open-Source-Projekt, das eine Daten-Streaming-Plattform mit geringer Latenz für die Change Data Capture (CDC) bereitstellt. Dieser Connector bietet eine Senkenimplementierung für das Streaming von von Debezium ausgegebenen Änderungen in eine relationale Datenbank.
Diese Connector-Implementierung ist Debezium-Quellen-fähig. Das bedeutet, dass der Connector native Debezium-Änderungsereignisse nutzen kann, ohne dass ExtractNewRecordState
zum Glätten der Ereignisstruktur verwendet werden muss. Dadurch wird die erforderliche Konfiguration zur Verwendung eines JDBC-Sink-Connectors reduziert. Darüber hinaus bedeutet dies auch, dass die Senkenseite der Pipeline Debezium-Metadaten nutzen kann, wie z. B. die Weitergabe von Spaltentypen, um nahtlos die richtige Spaltentypauflösung auf der Senken-Connector-Seite der Pipeline zu unterstützen.
Der JDBC-Sink-Connector ist ein traditioneller Kafka Connect-Sink-Connector (auch Consumer genannt). Seine Aufgabe besteht darin, Datensätze aus einem oder mehreren Kafka-Themen zu lesen und SQL-Anweisungen zu erstellen, die in der konfigurierten Zieldatenbank ausgeführt werden.
Ein SinkRecordDescriptor
ist ein Objekt, das aus jedem SinkRecord
erstellt wird. Die meisten Methoden, die andernfalls einen SinkRecord
verwenden würden, verwenden stattdessen dieses Deskriptorobjekt. Der Deskriptor ist praktisch eine vorverarbeitete Version von SinkRecord
, die es uns ermöglicht, diese Vorverarbeitung einmal durchzuführen und diese Informationen dann im gesamten Connector zu nutzen. Wenn Sie neue Methoden hinzufügen, sollten Sie im Allgemeinen einen SinkRecordDescriptor
verwenden.
Jede Senkendatenbank verfügt normalerweise über eine eigene DatabaseDialect
-Implementierung, die GeneralDatabaseDialect
erweitern sollte. Der Dialekt ist einer der Kernmechanismen, die der JDBC-Sink-Connector verwendet, um SQL-Anweisungen und andere Datenbankmerkmale für die Datenbank aufzulösen, in die der Connector konsumierte Ereignisse schreibt. Der JDBC-Sink-Connector nutzt die Dialektauflösung von Hibernate, um den vom Connector verwendeten Dialekt-Wrapper zu steuern.
Wenn für die verwendete Senkendatenbank keine Dialektzuordnung erkannt wird, verwendet der JDBC-Senken-Connector standardmäßig die GeneralDatabaseDialect
Implementierung. Diese verallgemeinerte Implementierung unterstützt nicht jeden Aspekt des Connectors. Beispielsweise wird der UPSERT-Einfügemodus nicht unterstützt, wenn dieser Dialekt ausgewählt wird, da die UPSERT-Anweisung im Allgemeinen nur für die verwendete Datenbank gilt. Im Allgemeinen ist es eine gute Idee, eine neue Dialektimplementierung hinzuzufügen, wenn eine neue Senkendatenbank vollständige Kompatibilität mit dem umfangreichen Verhalten des JDBC-Senken-Connectors aufweisen soll.
Jedes Feld in einer Kafka-Nachricht ist einem Schematyp zugeordnet, diese Typinformationen können jedoch auch andere Metadaten wie einen Namen oder sogar Parameter enthalten, die vom Quellkonnektor bereitgestellt wurden. Der JDBC-Sink-Connector verwendet ein Typsystem, das auf dem Vertrag io.debezium.connector.jdbc.type.Type
basiert, um die Wertbindung, die Standardwertauflösung und andere Merkmale zu handhaben, die typspezifisch sein könnten.
Es gibt effektiv drei verschiedene Arten von Type
:
io.debezium.connector.jdbc.type.connect
.io.debezium.connector.jdbc.type.debezium
.io.debezium.connector.jdbc.dialect
.Typen werden in einem hierarchischen Muster registriert, beginnend mit den Kafka Connect-Typen, dann den Debezium-Typen und schließlich den dialektspezifischen Typen. Dies ermöglicht es den Debezium-Typen, bei Bedarf Kafka Connect-Typen zu überschreiben, und schließlich dem Dialekt, jeden anderen beigetragenen Typ zu überschreiben.
Typen werden aufgelöst, indem man sich zunächst den Namen des Kafka-Schemas ansieht und diesen einer Typregistrierung zuordnet. Wenn das Schema keinen Namen hat, wird der Typ des Schemas zur Auflösung in einen Typ verwendet. Dadurch können die Kafka Connect-Basistypen das letzte Wort bei der Interpretation von Daten haben, wenn keine andere Typimplementierung für das Feld erkannt wird.
Der JDBC-Sink-Connector verwendet zwei Benennungsstrategien:
TableNamingStrategy
ColumnNamingStrategy
Der JDBC-Sink-Connector wird mit Standardimplementierungen beider ausgeliefert, die im Paket io.debezium.connector.jdbc.naming
zu finden sind. Das Standardverhalten dieser beiden Strategien ist wie folgt:
.
mit _
und verwendet den konfigurierten table.name.format
-Wert, um den endgültigen Namen der Tabelle aufzulösen. Unter der Annahme, dass der Themenname des Ereignisses server.schema.table
mit dem Standardwert table.name.format=dbo.${topic}
, wird die Zieltabelle als dbo.server_schema_table
erstellt.Diese beiden Strategien können überschrieben werden, indem in der Connector-Konfiguration vollständig qualifizierte Klassennamenverweise angegeben werden. Eine Beispielkonfiguration:
table.naming.strategy=io.debezium.connector.jdbc.naming.DefaultTableNamingStrategy
column.naming.strategy=io.debezium.connector.jdbc.naming.DefaultColumnNamingStrategy
Der JDBC-Sink-Connector verwaltet ein relationales In-Memory-Modell, ähnlich den Debezium-Source-Connectors. Diese relationalen Modellklassen finden Sie im Paket io.debezium.connector.jdbc.relational
.
Folgendes ist erforderlich, um mit der Codebasis des Debezium JDBC Sink Connectors zu arbeiten und sie lokal zu erstellen:
.mvnw
für Maven-Befehle auf) Die Testsuite basiert stark auf der TestContainer-Nutzung und startet automatisch eine Vielzahl von Quell- und Senkendatenbanken. Ohne eine Docker-kompatible Umgebung können die Integrationstests nicht ausgeführt werden. Wenn Sie keine Docker-Umgebung haben, können Sie die Integrationstests überspringen, indem Sie das unten gezeigte Befehlszeilenargument -DskipITs
verwenden:
$ ./mvnw clean verify -DskipITs
Es gibt drei Arten von Typen in der Testsuite:
Standardmäßig werden alle Komponententests als Teil des Builds ausgeführt. Die sinkenbasierten Integrationstests werden standardmäßig nur für MySQL, PostgreSQL und SQL Server ausgeführt, während keiner der End-to-End-Matrix-basierten Tests ausgeführt wird.
Um die sinkenbasierten Integrationstests für Oracle und DB2 auszuführen, muss das Argument -Dtest.tags
bereitgestellt werden, um diese in den Build einzubeziehen. Fügen Sie dazu alle auszuführenden Integrationstests hinzu, wie unten für alle Datenbanken dargestellt:
$ ./mvnw clean install -Dtest.tags=it-mysql,it-postgresql,it-sqlserver,it-oracle,it-db2
Um alle sinkenbasierten Integrationstests für alle Datenbanken auszuführen, wird ein Shortcut-Tag bereitgestellt:
$ ./mvnw clean install -Dtest.tags=it
Um bestimmte End-to-End-Tests zu ermöglichen, kann das Argument -Dtest.tags
auch mit den erforderlichen Tags für jeden Senkendatenbanktyp bereitgestellt werden:
$ ./mvnw clean install -Dtest.tags=e2e-mysql,e2e-postgresql,e2e-sqlserver,e2e-oracle,e2e-db2
Um alle End-to-End-Integrationstests auszuführen, wird außerdem ein Shortcut-Tag bereitgestellt:
$ ./mvnw clean install -Dtest.tags=e2e
Um alle Tests für alle Quellen-/Senkenkombinationen auszuführen:
$ ./mvnw clean install -Dtest.tags=all
Die Debezium-Community heißt jeden willkommen, der in irgendeiner Weise helfen möchte, sei es beim Melden von Problemen, beim Helfen bei der Dokumentation oder beim Beisteuern von Codeänderungen, um Fehler zu beheben, Tests hinzuzufügen oder neue Funktionen zu implementieren. Einzelheiten finden Sie in diesem Dokument.
Ein großes Dankeschön an alle Debezium JDBC Sink-Mitwirkenden!
Dieses Projekt ist unter der Apache-Lizenz, Version 2, lizenziert.