官方 MongoDB Kafka 连接器。
连接器的文档可在 https://docs.mongodb.com/kafka-connector/current/ 上找到
连接器将发布在 Maven Central 上。
有关 MongoDB Kafka 连接器的问题、疑问或反馈,请查看我们的支持渠道。请不要直接向任何 Kafka 连接器开发人员发送电子邮件提出问题或疑问 - 您更有可能在 MongoDB 社区论坛上获得答案。
请至少在您的描述中包含您正在使用的驱动程序的确切版本。 如果您遇到连接问题,粘贴 Kafka 连接器配置通常也很有用。您还应该检查应用程序日志中是否有任何与连接相关的异常,并发布这些异常。
您认为您发现了错误吗?想要查看 Kafka 驱动程序中的新功能吗?请在我们的问题管理工具 JIRA 中打开案例:
创建帐户并登录。
导航到 KAFKA 项目。
单击创建问题- 请提供尽可能多的有关问题类型以及如何重现问题的信息。
JIRA 中连接器的错误报告是公开的。
如果您发现连接器或任何其他 MongoDB 项目中存在安全漏洞,请根据此处的说明进行报告。
MongoDB Kafka 连接器遵循语义版本控制。有关版本之间更改的信息,请参阅更改日志。
构建和编译源代码需要 Java 8+。构建并测试驱动程序:
$ git clone https://github.com/mongodb/mongo-kafka.git $ cd mongo-kafka $ ./gradlew check -Dorg.mongodb.test.uri=mongodb://localhost:27017
测试套件需要 mongod 运行。请注意,源连接器需要副本集。
罗斯·劳利 [email protected]
原始 Sink 连接器作者:Hans-Peter Grahsl:https://github.com/hpgrahsl/kafka-connect-mongodb
其他贡献者可以在这里找到。
./gradlew publishArchives
- 发布到 Maven
./gradlew createConfluentArchive
- 创建汇合存档/github 发布 zip 文件
在 IntelliJ 中运行代码需要几个手动配置步骤:
错误: java: cannot find symbol. symbol: variable Versions
修复:以下任意一项:
运行compileBuildConfig
任务:例如: ./gradlew compileBuildConfig
或通过Gradle>mongo-kafka>Tasks>other>compileBuildConfig
将compileBuildConfig
设置为Before Build 执行。通过 Gradle > 任务 > 其他 > 右键单击compileBuildConfig - 单击“在构建之前执行”
将所有构建操作委托给 Gradle:设置 > 构建、执行、部署 > 构建工具 > Gradle > Runner - 勾选“将 IDE 构建/运行操作委托给 gradle”
可以实现com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider
接口来提供com.mongodb.MongoCredential
类型的对象,该对象包装在为接收器和源连接器构建的 MongoClient 中。需要设置以下属性 -
mongo.custom.auth.mechanism.enable - set to true. mongo.custom.auth.mechanism.providerClass - qualified class name of the implementation class
其他属性可以根据需要在实现类中设置。连接器初始化时会调用实现类的 init 和 validate 方法。
当对图集使用 MONGODB-AWS 身份验证机制时,可以指定以下配置 -
"connection.uri": "mongodb+srv:///?authMechanism=MONGODB-AWS" "mongo.custom.auth.mechanism.enable": true, "mongo.custom.auth.mechanism.providerClass": "sample.AwsAssumeRoleCredentialProvider" "mongodbaws.auth.mechanism.roleArn": "arn:aws:iam:: :role/ "
此处, sample.AwsAssumeRoleCredentialProvider
必须在类路径上可用。 mongodbaws.auth.mechanism.roleArn
是可由sample.AwsAssumeRoleCredentialProvider
读取的自定义属性的示例。
这是可以工作的示例代码。
公共类 AwsAssumeRoleCredentialProvider 实现 CustomCredentialProvider { 公共 AwsAssumeRoleCredentialProvider() {} @Override public MongoCredential getCustomCredential(Map, ?> map) { AWSCredentialsProvider provider = new DefaultAWSCredentialsProviderChain(); } 供应商awsFreshCredentialSupplier = () -> { AWSSecurityTokenService stsClient = AWSSecurityTokenServiceAsyncClientBuilder.standard() .withCredentials(提供者) .withRegion("us-east-1") 。建造(); AssumeRoleRequest takeRoleRequest = new AssumeRoleRequest().withDurationSeconds(3600) .withRoleArn((String)map.get("mongodbaws.auth.mechanism.roleArn")) .withRoleSessionName("Test_Session"); AssumeRoleResult 假设RoleResult = stsClient.assumeRole(assumeRoleRequest); 凭证 creds = ShouldRoleResult.getCredentials(); // 添加代码以获取新凭证 return new AwsCredential(creds.getAccessKeyId(), creds.getSecretAccessKey(), creds.getSessionToken()); }; 返回 MongoCredential.createAwsCredential(null, null) .withMechanismProperty(MongoCredential.AWS_CREDENTIAL_PROVIDER_KEY, awsFreshCredentialSupplier); } @Override public void validate(Map, ?> map) { String roleArn = (String) map.get("mongodbaws.auth.mechanism.roleArn"); if (StringUtils.isNullOrEmpty(roleArn)) { throw new RuntimeException("为 customProperty 设置的值无效"); } } @Override public void init(Map, ?> map) { } }
这是可以构建包含 AwsAssumeRoleCredentialProvider 的完整 jar 的 pom.xml
<项目 xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http:// /maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <模型版本>4.0.0模型版本>示例 AwsAssumeRoleCredentialProvider <版本>1.0-SNAPSHOT版本> <构建> <插件> <插件>org.apache.maven.plugins maven-shade-plugin <版本>3.5.3版本> <配置> 配置> <处决> <执行> <阶段>封装阶段> <目标> <目标>阴影目标> 目标> 执行> 处决> 插件> 插件> 构建> <依赖关系> <依赖关系>org.mongodb mongodb-driver-sync <版本>5.1.0版本> 依赖> <依赖关系>com.amazonaws aws-java-sdk <版本>1.12.723版本> 依赖> <依赖关系>org.slf4j slf4j-jdk14 <版本>1.7.28版本> 依赖> <依赖关系>kafka-connect kafka-connect <范围>系统范围> <版本>1.12.1-SNAPSHOT版本> <系统路径>/Users/jagadish.nallapaneni/mongo-kafka/build/libs/mongo-kafka-connect-1.12.1-SNAPSHOT-confluence.jar 依赖> 依赖关系> <属性>17 17 UTF-8 属性> 项目>