Copyright Debezium Auteurs. Sous licence Apache, version 2.0.
Debezium est un projet open source qui fournit une plateforme de streaming de données à faible latence pour la capture de données modifiées (CDC). Ce connecteur fournit une implémentation de récepteur pour diffuser les modifications émises par Debezium dans une base de données relationnelle.
Cette implémentation de connecteur prend en charge la source Debezium. Cela signifie que le connecteur peut consommer les événements de modification natifs de Debezium sans avoir besoin d'utiliser ExtractNewRecordState
pour aplatir la structure des événements. Cela réduit la configuration nécessaire pour utiliser un connecteur récepteur JDBC. En outre, cela signifie également que le côté récepteur du pipeline peut tirer parti des métadonnées Debezium, telles que la propagation du type de colonne, pour prendre en charge de manière transparente une résolution appropriée du type de colonne du côté du connecteur récepteur du pipeline.
Le connecteur récepteur JDBC est un connecteur récepteur Kafka Connect traditionnel (alias consommateur). Son travail consiste à lire les enregistrements d'un ou plusieurs sujets Kafka et à produire des instructions SQL qui sont exécutées sur la base de données de destination configurée.
Un SinkRecordDescriptor
est un objet construit à partir de chaque SinkRecord
. La plupart des méthodes qui autrement prendraient un SinkRecord
prendraient cet objet descripteur à la place. Le descripteur est en fait une version prétraitée du SinkRecord
, ce qui nous permet d'effectuer ce prétraitement une fois et d'utiliser ensuite ces informations à travers le connecteur. Lors de l’ajout de nouvelles méthodes, vous souhaiterez généralement utiliser un SinkRecordDescriptor
.
Chaque base de données récepteur aura généralement sa propre implémentation DatabaseDialect
qui devrait étendre GeneralDatabaseDialect
. Le dialecte est l'un des principaux mécanismes utilisés par le connecteur récepteur JDBC afin de résoudre les instructions SQL et d'autres caractéristiques de base de données pour la base de données dans laquelle le connecteur écrira les événements consommés. Le connecteur récepteur JDBC s'appuie sur la résolution dialectale d'Hibernate pour piloter le wrapper dialecte utilisé par le connecteur.
Si aucun mappage de dialecte n'est détecté pour la base de données récepteur utilisée, le connecteur récepteur JDBC utilisera par défaut l'implémentation GeneralDatabaseDialect
. Cette implémentation généralisée ne prend pas en charge tous les aspects du connecteur, par exemple le mode d'insertion UPSERT n'est pas pris en charge lorsque ce dialecte est choisi car l'instruction UPSERT est généralement unique à la base de données utilisée. C'est généralement une bonne idée d'ajouter une nouvelle implémentation de dialecte si une nouvelle base de données de récepteur doit avoir une compatibilité totale avec le vaste comportement du connecteur récepteur JDBC.
Chaque champ d'un message Kafka est associé à un type de schéma, mais ces informations de type peuvent également contenir d'autres métadonnées telles qu'un nom ou même des paramètres fournis par le connecteur source. Le connecteur récepteur JDBC utilise un système de types, basé sur le contrat io.debezium.connector.jdbc.type.Type
, afin de gérer la liaison de valeurs, la résolution de valeurs par défaut et d'autres caractéristiques qui pourraient être spécifiques au type.
Il existe en fait trois types différents d’implémentations Type
:
io.debezium.connector.jdbc.type.connect
.io.debezium.connector.jdbc.type.debezium
.io.debezium.connector.jdbc.dialect
.Les types sont enregistrés selon un modèle hiérarchique, en commençant par les types Kafka Connect, puis les types Debezium et enfin les types spécifiques au dialecte. Cela permet aux types Debezium de remplacer les types Kafka Connect si nécessaire et enfin au dialecte de remplacer tout autre type contribué.
Les types sont résolus en examinant d'abord le nom du schéma Kafka et en le mappant à un enregistrement de type. Si le schéma n’a pas de nom, le type du schéma est ensuite utilisé pour résoudre un type. Cela permet aux types de base Kafka Connect d'avoir le dernier mot sur la façon dont les données sont interprétées si aucune autre implémentation de type n'est détectée pour le champ.
Il existe deux stratégies de dénomination utilisées par le connecteur récepteur JDBC :
TableNamingStrategy
ColumnNamingStrategy
Le connecteur récepteur JDBC est livré avec les implémentations par défaut des deux, trouvées dans le package io.debezium.connector.jdbc.naming
. Le comportement par défaut de ces deux stratégies est le suivant :
.
avec _
et utilise la valeur table.name.format
configurée pour résoudre le nom final de la table. Donc, en supposant que le nom du sujet de l'événement est server.schema.table
avec la valeur par défaut table.name.format=dbo.${topic}
, la table de destination sera créée sous le nom dbo.server_schema_table
.Ces deux stratégies peuvent être remplacées en spécifiant des références de nom de classe complètes dans la configuration du connecteur. Un exemple de configuration :
table.naming.strategy=io.debezium.connector.jdbc.naming.DefaultTableNamingStrategy
column.naming.strategy=io.debezium.connector.jdbc.naming.DefaultColumnNamingStrategy
Le connecteur récepteur JDBC maintient un modèle relationnel en mémoire, similaire aux connecteurs source Debezium. Ces classes de modèles relationnels se trouvent dans le package io.debezium.connector.jdbc.relational
.
Les éléments suivants sont requis pour travailler avec la base de code du connecteur récepteur Debezium JDBC et pour la construire localement :
.mvnw
pour les commandes Maven) La suite de tests est fortement basée sur l'utilisation de TestContainer et démarre automatiquement diverses bases de données sources et récepteurs. Sans un environnement compatible Docker, les tests d'intégration ne s'exécuteront pas. Si vous ne disposez pas d'un environnement Docker, vous pouvez ignorer les tests d'intégration en utilisant l'argument de ligne de commande -DskipITs
, illustré ci-dessous :
$ ./mvnw clean verify -DskipITs
Il existe trois types de types dans la suite de tests :
Par défaut, tous les tests unitaires sont exécutés dans le cadre de la build. Les tests d'intégration basés sur le récepteur ne sont exécutés par défaut que pour MySQL, PostgreSQL et SQL Server, alors qu'aucun des tests matriciels de bout en bout n'est exécuté.
Afin d'exécuter les tests d'intégration basés sur les récepteurs pour Oracle et DB2, l'argument -Dtest.tags
doit être fourni pour les inclure dans la build. Pour ce faire, ajoutez tous les tests d'intégration à exécuter, comme indiqué ci-dessous pour toutes les bases de données :
$ ./mvnw clean install -Dtest.tags=it-mysql,it-postgresql,it-sqlserver,it-oracle,it-db2
Afin d'exécuter tous les tests d'intégration basés sur les récepteurs pour toutes les bases de données, une balise de raccourci est fournie :
$ ./mvnw clean install -Dtest.tags=it
De même, afin de permettre des tests spécifiques de bout en bout, l'argument -Dtest.tags
peut également être fourni avec les balises nécessaires pour chaque type de base de données récepteur :
$ ./mvnw clean install -Dtest.tags=e2e-mysql,e2e-postgresql,e2e-sqlserver,e2e-oracle,e2e-db2
Afin d'exécuter tous les tests d'intégration de bout en bout, une balise de raccourci est également fournie :
$ ./mvnw clean install -Dtest.tags=e2e
Afin d'exécuter tous les tests pour toutes les combinaisons source/puits :
$ ./mvnw clean install -Dtest.tags=all
La communauté Debezium accueille toute personne souhaitant aider de quelque manière que ce soit, que ce soit en signalant des problèmes, en aidant avec la documentation ou en contribuant à des modifications de code pour corriger des bogues, ajouter des tests ou implémenter de nouvelles fonctionnalités. Consultez ce document pour plus de détails.
Un grand merci à tous les contributeurs du récepteur Debezium JDBC !
Ce projet est sous licence Apache, version 2.