Авторские права Дебезиум Авторы. Лицензия Apache, версия 2.0.
Debezium — это проект с открытым исходным кодом, который предоставляет платформу потоковой передачи данных с низкой задержкой для сбора измененных данных (CDC). Этот соединитель обеспечивает реализацию приемника для потоковой передачи изменений, отправленных Debezium, в реляционную базу данных.
Эта реализация соединителя поддерживает исходный код Debezium. Это означает, что соединитель может использовать собственные события изменений Debezium без необходимости использования ExtractNewRecordState
для выравнивания структуры событий. Это уменьшает необходимость настройки для использования соединителя приемника JDBC. Кроме того, это также означает, что сторона приемника конвейера может использовать преимущества метаданных Debezium, такие как распространение типа столбца, для беспрепятственной поддержки правильного разрешения типа столбца на стороне соединителя приемника конвейера.
Соединитель приемника JDBC — это традиционный соединитель приемника Kafka Connect (также известный как потребитель). Его задача — читать записи из одной или нескольких тем Kafka и создавать операторы SQL, которые выполняются в настроенной целевой базе данных.
SinkRecordDescriptor
— это объект, который создается из каждого SinkRecord
. Большинство методов, которые в противном случае использовали бы SinkRecord
вместо этого используют этот объект дескриптора. Дескриптор по сути является предварительно обработанной версией SinkRecord
, что позволяет нам выполнить эту предварительную обработку один раз, а затем использовать эту информацию через соединитель. При добавлении новых методов вы обычно захотите использовать SinkRecordDescriptor
.
Каждая база данных-приемника обычно имеет собственную реализацию DatabaseDialect
, которая должна расширять GeneralDatabaseDialect
. Диалект — это один из основных механизмов, используемых соединителем приемника JDBC для разрешения операторов SQL и других характеристик базы данных для базы данных, в которую соединитель будет записывать потребляемые события. Соединитель приемника JDBC использует разрешение диалекта Hibernate для управления оболочкой диалекта, используемой соединителем.
Если для используемой базы данных приемника не обнаружено сопоставление диалектов, соединитель приемника JDBC по умолчанию будет использовать реализацию GeneralDatabaseDialect
. Эта обобщенная реализация не поддерживает все аспекты соединителя, например, режим вставки UPSERT не поддерживается при выборе этого диалекта, поскольку оператор UPSERT обычно уникален для используемой базы данных. Как правило, рекомендуется добавить новую реализацию диалекта, если новая база данных приемника должна иметь полную совместимость с обширным поведением соединителя приемника JDBC.
Каждое поле в сообщении Kafka связано с типом схемы, но эта информация о типе может также содержать другие метаданные, такие как имя или даже параметры, предоставленные исходным соединителем. Соединитель приемника JDBC использует систему типов, основанную на контракте io.debezium.connector.jdbc.type.Type
, для обработки привязки значений, разрешения значений по умолчанию и других характеристик, которые могут зависеть от типа.
Фактически существует три различных типа реализации Type
:
io.debezium.connector.jdbc.type.connect
.io.debezium.connector.jdbc.type.debezium
.io.debezium.connector.jdbc.dialect
.Типы регистрируются по иерархической схеме, начиная с типов Kafka Connect, затем типов Debezium и, наконец, типов, специфичных для диалекта. Это позволяет типам Debezium при необходимости переопределять типы Kafka Connect и, наконец, диалекту переопределять любой другой добавленный тип.
Типы определяются путем просмотра имени схемы Kafka и сопоставления его с регистрацией типа. Если у схемы нет имени, тип схемы затем используется для преобразования в тип. Это позволяет базовым типам Kafka Connect иметь последнее слово в том, как интерпретируются данные, если для поля не обнаружена другая реализация типа.
Соединитель приемника JDBC использует две стратегии именования:
TableNamingStrategy
ColumnNamingStrategy
Соединитель приемника JDBC поставляется с реализациями обоих по умолчанию, которые находятся в пакете io.debezium.connector.jdbc.naming
. Поведение по умолчанию этих двух стратегий следующее:
.
с _
и использует настроенное значение table.name.format
для разрешения окончательного имени таблицы. Итак, если предположить, что имя темы события — server.schema.table
со значением по умолчанию table.name.format=dbo.${topic}
, таблица назначения будет создана как dbo.server_schema_table
.Эти две стратегии можно переопределить, указав полные ссылки на имя класса в конфигурации соединителя. Пример конфигурации:
table.naming.strategy=io.debezium.connector.jdbc.naming.DefaultTableNamingStrategy
column.naming.strategy=io.debezium.connector.jdbc.naming.DefaultColumnNamingStrategy
Соединитель приемника JDBC поддерживает реляционную модель в памяти, аналогичную соединителям источника Debezium. Эти классы реляционных моделей можно найти в пакете io.debezium.connector.jdbc.relational
.
Для работы с базой кода соединителя приемника Debezium JDBC и его локальной сборки требуется следующее:
.mvnw
для команд Maven) Набор тестов в значительной степени основан на использовании TestContainer и автоматически запускает различные базы данных источника и приемника. Без среды, совместимой с Docker, интеграционные тесты не будут выполняться. Если у вас нет среды Docker, вы можете пропустить интеграционные тесты, используя аргумент командной строки -DskipITs
, как показано ниже:
$ ./mvnw clean verify -DskipITs
В наборе тестов есть три типа типов:
По умолчанию все модульные тесты выполняются как часть сборки. По умолчанию интеграционные тесты на основе приемника выполняются только для MySQL, PostgreSQL и SQL Server, тогда как ни один из сквозных матричных тестов не выполняется.
Чтобы выполнить интеграционные тесты на основе приемника для Oracle и DB2, необходимо указать аргумент -Dtest.tags
, чтобы включить их в сборку. Для этого добавьте все интеграционные тесты, которые необходимо выполнить, как показано ниже для всех баз данных:
$ ./mvnw clean install -Dtest.tags=it-mysql,it-postgresql,it-sqlserver,it-oracle,it-db2
Чтобы запустить все интеграционные тесты на основе приемников для всех баз данных, предоставляется сокращенный тег:
$ ./mvnw clean install -Dtest.tags=it
Аналогичным образом, чтобы включить определенные сквозные тесты, аргумент -Dtest.tags
также может быть снабжен необходимыми тегами для каждого типа базы данных-приемника:
$ ./mvnw clean install -Dtest.tags=e2e-mysql,e2e-postgresql,e2e-sqlserver,e2e-oracle,e2e-db2
Для запуска всех комплексных интеграционных тестов также предоставляется сокращенный тег:
$ ./mvnw clean install -Dtest.tags=e2e
Чтобы запустить все тесты для всех комбинаций источника/приемника:
$ ./mvnw clean install -Dtest.tags=all
Сообщество Debezium приветствует всех, кто хочет помочь любым способом, включая сообщение о проблемах, помощь с документацией или внесение изменений в код для исправления ошибок, добавления тестов или реализации новых функций. Подробности смотрите в этом документе.
Большое спасибо всем участникам приемника Debezium JDBC!
Этот проект распространяется по лицензии Apache версии 2.