저작권 Debezium 작성자. Apache 라이센스 버전 2.0에 따라 라이센스가 부여되었습니다.
Debezium은 변경 데이터 캡처(CDC)를 위한 대기 시간이 짧은 데이터 스트리밍 플랫폼을 제공하는 오픈 소스 프로젝트입니다. 이 커넥터는 Debezium에서 내보낸 변경 사항을 관계형 데이터베이스로 스트리밍하기 위한 싱크 구현을 제공합니다.
이 커넥터 구현은 Debezium 소스를 인식합니다. 이는 커넥터가 이벤트 구조를 평면화하기 위해 ExtractNewRecordState
를 사용할 필요 없이 기본 Debezium 변경 이벤트를 사용할 수 있음을 의미합니다. 이렇게 하면 JDBC 싱크 커넥터를 사용하는 데 필요한 구성이 줄어듭니다. 또한 이는 파이프라인의 싱크 측이 파이프라인의 싱크 커넥터 측에서 적절한 열 유형 확인을 원활하게 지원하기 위해 열 유형 전파와 같은 Debezium 메타데이터를 활용할 수 있음을 의미합니다.
JDBC 싱크 커넥터는 전통적인 Kafka Connect 싱크 커넥터(소비자라고도 함)입니다. 그 작업은 하나 이상의 Kafka 주제에서 레코드를 읽고 구성된 대상 데이터베이스에서 실행되는 SQL 문을 생성하는 것입니다.
SinkRecordDescriptor
는 모든 SinkRecord
에서 생성되는 개체입니다. SinkRecord
사용하는 대부분의 메서드는 대신 이 설명자 개체를 사용합니다. 설명자는 실제로 SinkRecord
의 사전 처리된 버전이므로 이 사전 처리를 한 번 수행한 다음 커넥터 전체에서 이 정보를 사용할 수 있습니다. 새로운 메서드를 추가할 때 일반적으로 SinkRecordDescriptor
를 사용하는 것이 좋습니다.
각 싱크 데이터베이스에는 일반적으로 GeneralDatabaseDialect
확장해야 하는 자체 DatabaseDialect
구현이 있습니다. 방언은 커넥터가 사용된 이벤트를 기록할 데이터베이스에 대한 SQL 문 및 기타 데이터베이스 특성을 확인하기 위해 JDBC 싱크 커넥터에서 사용하는 핵심 메커니즘 중 하나입니다. JDBC 싱크 커넥터는 커넥터가 사용하는 방언 래퍼를 구동하기 위해 Hibernate의 방언 분석에 의존합니다.
사용 중인 싱크 데이터베이스에 대해 언어 매핑이 감지되지 않으면 JDBC 싱크 커넥터는 기본적으로 GeneralDatabaseDialect
구현을 사용합니다. 이 일반화된 구현은 커넥터의 모든 측면을 지원하지 않습니다. 예를 들어 UPSERT 문은 일반적으로 사용 중인 데이터베이스에 고유하므로 이 방언을 선택할 때 UPSERT 삽입 모드는 지원되지 않습니다. 새 싱크 데이터베이스가 JDBC 싱크 커넥터의 광범위한 동작과 완전히 호환되도록 하려면 일반적으로 새 방언 구현을 추가하는 것이 좋습니다.
Kafka 메시지의 모든 필드는 스키마 유형과 연관되어 있지만 이 유형 정보는 소스 커넥터에서 제공한 이름이나 매개변수와 같은 다른 메타데이터도 전달할 수 있습니다. JDBC 싱크 커넥터는 값 바인딩, 기본값 확인 및 유형별 특성을 처리하기 위해 io.debezium.connector.jdbc.type.Type
계약을 기반으로 하는 유형 시스템을 활용합니다.
Type
구현에는 사실상 세 가지 유형이 있습니다.
io.debezium.connector.jdbc.type.connect
에 있는 Kafka Connect의 스키마 유형을 지원하는 것입니다.io.debezium.connector.jdbc.type.debezium
에 있는 Debezium 관련 명명된 스키마 유형을 지원하는 것입니다.io.debezium.connector.jdbc.dialect
계층 구조에 있는 방언 관련 유형입니다.유형은 Kafka Connect 유형, Debezium 유형, 마지막으로 방언별 유형으로 시작하여 계층적 패턴으로 등록됩니다. 이를 통해 필요한 경우 Debezium 유형이 Kafka Connect 유형을 재정의하고 마지막으로 방언이 기여된 다른 유형을 재정의할 수 있습니다.
유형은 먼저 Kafka 스키마 이름을 확인하고 이를 유형 등록에 매핑하여 해결됩니다. 스키마에 이름이 없으면 스키마 유형을 사용하여 유형을 확인합니다. 이를 통해 기본 Kafka Connect 유형은 필드에 대해 다른 유형 구현이 감지되지 않는 경우 데이터가 해석되는 방식에 대한 최종 결정권을 가질 수 있습니다.
JDBC 싱크 커넥터에는 두 가지 명명 전략이 사용됩니다.
TableNamingStrategy
ColumnNamingStrategy
JDBC 싱크 커넥터는 io.debezium.connector.jdbc.naming
패키지에 있는 두 가지 모두의 기본 구현과 함께 제공됩니다. 이 두 가지 전략의 기본 동작은 다음과 같습니다.
.
_
사용하고 구성된 table.name.format
값을 사용하여 테이블의 최종 이름을 확인합니다. 따라서 이벤트의 주제 이름이 server.schema.table
이고 기본 table.name.format=dbo.${topic}
이라고 가정하면 대상 테이블은 dbo.server_schema_table
로 생성됩니다.커넥터 구성에서 정규화된 클래스 이름 참조를 지정하여 이러한 두 가지 전략을 재정의할 수 있습니다. 예시 구성:
table.naming.strategy=io.debezium.connector.jdbc.naming.DefaultTableNamingStrategy
column.naming.strategy=io.debezium.connector.jdbc.naming.DefaultColumnNamingStrategy
JDBC 싱크 커넥터는 Debezium 소스 커넥터와 유사한 메모리 내 관계형 모델을 유지합니다. 이러한 관계형 모델 클래스는 io.debezium.connector.jdbc.relational
패키지에서 찾을 수 있습니다.
Debezium JDBC 싱크 커넥터 코드 기반으로 작업하고 로컬로 빌드하려면 다음이 필요합니다.
.mvnw
사용하여 래퍼 호출) 테스트 스위트는 TestContainer 사용법을 기반으로 하며 다양한 소스 및 싱크 데이터베이스를 자동으로 시작합니다. Docker 호환 환경이 없으면 통합 테스트가 실행되지 않습니다. Docker 환경이 없는 경우 아래와 같이 -DskipITs
명령줄 인수를 사용하여 통합 테스트를 건너뛸 수 있습니다.
$ ./mvnw clean verify -DskipITs
테스트 스위트에는 세 가지 유형이 있습니다.
기본적으로 모든 단위 테스트는 빌드의 일부로 실행됩니다. 싱크 기반 통합 테스트는 기본적으로 MySQL, PostgreSQL 및 SQL Server에 대해서만 실행되며, 엔드투엔드 매트릭스 기반 테스트는 실행되지 않습니다.
Oracle 및 DB2에 대한 싱크 기반 통합 테스트를 실행하려면 이를 빌드에 포함하도록 -Dtest.tags
인수를 제공해야 합니다. 이를 수행하려면 모든 데이터베이스에 대해 아래 표시된 대로 실행할 모든 통합 테스트를 추가하십시오.
$ ./mvnw clean install -Dtest.tags=it-mysql,it-postgresql,it-sqlserver,it-oracle,it-db2
모든 데이터베이스에 대해 모든 싱크 기반 통합 테스트를 실행하기 위해 바로가기 태그가 제공됩니다.
$ ./mvnw clean install -Dtest.tags=it
마찬가지로 특정 종단 간 테스트를 활성화하기 위해 -Dtest.tags
인수에 각 싱크 데이터베이스 유형에 필요한 태그를 제공할 수도 있습니다.
$ ./mvnw clean install -Dtest.tags=e2e-mysql,e2e-postgresql,e2e-sqlserver,e2e-oracle,e2e-db2
모든 엔드투엔드 통합 테스트를 실행하기 위해 바로가기 태그도 제공됩니다.
$ ./mvnw clean install -Dtest.tags=e2e
모든 소스/싱크 조합에 대해 모든 테스트를 실행하려면 다음을 수행하세요.
$ ./mvnw clean install -Dtest.tags=all
Debezium 커뮤니티는 문제 보고, 문서화 지원, 버그 수정, 테스트 추가, 새로운 기능 구현을 위한 코드 변경 기여 등 어떤 방식으로든 도움을 주고 싶은 모든 사람을 환영합니다. 자세한 내용은 이 문서를 참조하세요.
모든 Debezium JDBC 싱크 기여자에게 큰 감사를 드립니다!
이 프로젝트는 Apache 라이선스 버전 2에 따라 라이선스가 부여됩니다.