Autores do Debezium com direitos autorais. Licenciado sob a Licença Apache, Versão 2.0.
Debezium é um projeto de código aberto que fornece uma plataforma de streaming de dados de baixa latência para captura de dados alterados (CDC). Este conector fornece uma implementação de coletor para streaming de alterações emitidas pelo Debezium em um banco de dados relacional.
Esta implementação do conector reconhece a fonte Debezium. Isso significa que o conector pode consumir eventos de mudança nativos do Debezium sem precisar usar ExtractNewRecordState
para nivelar a estrutura do evento. Isso reduz a configuração necessária para usar um conector de coletor JDBC. Além disso, isso também significa que o lado do coletor do pipeline pode aproveitar as vantagens dos metadados do Debezium, como a propagação do tipo de coluna, para suportar perfeitamente a resolução adequada do tipo de coluna no lado do conector do coletor do pipeline.
O conector de coletor JDBC é um conector de coletor tradicional do Kafka Connect (também conhecido como consumidor). Sua função é ler registros de um ou mais tópicos Kafka e produzir instruções SQL que são executadas no banco de dados de destino configurado.
Um SinkRecordDescriptor
é um objeto construído a partir de cada SinkRecord
. A maioria dos métodos que, de outra forma, usariam um SinkRecord
usam esse objeto descritor. O descritor é, na verdade, uma versão pré-processada do SinkRecord
, o que nos permite realizar esse pré-processamento uma vez e depois utilizar essas informações no conector. Ao adicionar novos métodos, geralmente você desejará usar SinkRecordDescriptor
.
Cada banco de dados coletor normalmente terá sua própria implementação DatabaseDialect
que deve estender GeneralDatabaseDialect
. O dialeto é um dos principais mecanismos usados pelo conector coletor JDBC para resolver instruções SQL e outras características do banco de dados no qual o conector gravará eventos consumidos. O conector coletor JDBC depende da resolução de dialeto do Hibernate para acionar o wrapper de dialeto usado pelo conector.
Se nenhum mapeamento de dialeto for detectado para o banco de dados do coletor que está sendo usado, o conector do coletor JDBC usará como padrão a implementação GeneralDatabaseDialect
. Esta implementação generalizada não suporta todos os aspectos do conector, por exemplo, o modo de inserção UPSERT não é suportado quando este dialeto é escolhido, pois a instrução UPSERT geralmente é exclusiva do banco de dados que está sendo usado. Geralmente, é uma boa ideia adicionar uma nova implementação de dialeto se um novo banco de dados de coletor tiver compatibilidade total com o vasto comportamento do conector de coletor JDBC.
Cada campo em uma mensagem Kafka está associado a um tipo de esquema, mas essas informações de tipo também podem conter outros metadados, como um nome ou até mesmo parâmetros que foram fornecidos pelo conector de origem. O conector coletor JDBC utiliza um sistema de tipos, que é baseado no contrato io.debezium.connector.jdbc.type.Type
, para lidar com vinculação de valor, resolução de valor padrão e outras características que podem ser específicas do tipo.
Existem efetivamente três tipos diferentes de implementações Type
:
io.debezium.connector.jdbc.type.connect
.io.debezium.connector.jdbc.type.debezium
.io.debezium.connector.jdbc.dialect
.Os tipos são registrados em um padrão hierárquico, começando com os tipos Kafka Connect, depois os tipos Debezium e, por último, os tipos específicos do dialeto. Isso permite que os tipos Debezium substituam os tipos Kafka Connect, se necessário, e, finalmente, o dialeto substitua qualquer outro tipo contribuído.
Os tipos são resolvidos observando primeiro o nome do esquema Kafka e mapeando-o para um registro de tipo. Se o esquema não tiver um nome, o tipo do esquema será usado para resolver um tipo. Isso permite que os tipos básicos do Kafka Connect tenham a palavra final sobre como os dados são interpretados se nenhuma outra implementação de tipo for detectada para o campo.
Existem duas estratégias de nomenclatura usadas pelo conector coletor JDBC:
TableNamingStrategy
ColumnNamingStrategy
O conector coletor JDBC é fornecido com implementações padrão de ambos, encontradas no pacote io.debezium.connector.jdbc.naming
. O comportamento padrão dessas duas estratégias é o seguinte:
.
com _
e usa o valor configurado table.name.format
para resolver o nome final da tabela. Portanto, supondo que o nome do tópico do evento seja server.schema.table
com o padrão table.name.format=dbo.${topic}
, a tabela de destino será criada como dbo.server_schema_table
.Essas duas estratégias podem ser substituídas especificando referências de nome de classe totalmente qualificadas na configuração do conector. Um exemplo de configuração:
table.naming.strategy=io.debezium.connector.jdbc.naming.DefaultTableNamingStrategy
column.naming.strategy=io.debezium.connector.jdbc.naming.DefaultColumnNamingStrategy
O conector coletor JDBC mantém um modelo relacional na memória, semelhante aos conectores de origem Debezium. Essas classes de modelo relacional podem ser encontradas no pacote io.debezium.connector.jdbc.relational
.
O seguinte é necessário para trabalhar com o código base do conector de coletor JDBC do Debezium e para construí-lo localmente:
.mvnw
para comandos Maven) O conjunto de testes é fortemente baseado no uso do TestContainer e inicia automaticamente uma variedade de bancos de dados de origem e coletores. Sem um ambiente compatível com Docker, os testes de integração não serão executados. Se você não tiver um ambiente Docker, poderá pular os testes de integração usando o argumento de linha de comando -DskipITs
, mostrado abaixo:
$ ./mvnw clean verify -DskipITs
Existem três tipos de tipos no conjunto de testes:
Por padrão, todos os testes unitários são executados como parte da construção. Os testes de integração baseados em coletor são executados apenas para MySQL, PostgreSQL e SQL Server por padrão, enquanto nenhum dos testes baseados em matriz de ponta a ponta é executado.
Para executar os testes de integração baseados em sink para Oracle e DB2, o argumento -Dtest.tags
deve ser fornecido para incluí-los na construção. Para isso, adicione todos os testes de integração a serem executados, conforme mostrado abaixo para todos os bancos de dados:
$ ./mvnw clean install -Dtest.tags=it-mysql,it-postgresql,it-sqlserver,it-oracle,it-db2
Para executar todos os testes de integração baseados em sink para todos os bancos de dados, é fornecida uma tag de atalho:
$ ./mvnw clean install -Dtest.tags=it
Da mesma forma, para permitir testes específicos de ponta a ponta, o argumento -Dtest.tags
também pode ser fornecido com as tags necessárias para cada tipo de banco de dados coletor:
$ ./mvnw clean install -Dtest.tags=e2e-mysql,e2e-postgresql,e2e-sqlserver,e2e-oracle,e2e-db2
Para executar todos os testes de integração ponta a ponta, uma tag de atalho também é fornecida:
$ ./mvnw clean install -Dtest.tags=e2e
Para executar todos os testes para todas as combinações de fonte/coletor:
$ ./mvnw clean install -Dtest.tags=all
A comunidade Debezium dá as boas-vindas a qualquer pessoa que queira ajudar de alguma forma, seja isso incluindo relatar problemas, ajudar com documentação ou contribuir com alterações de código para corrigir bugs, adicionar testes ou implementar novos recursos. Consulte este documento para obter detalhes.
Um grande obrigado a todos os contribuidores do coletor Debezium JDBC!
Este projeto está licenciado sob a Licença Apache, versão 2.