O conector oficial do MongoDB Kafka.
A documentação do conector está disponível em https://docs.mongodb.com/kafka-connector/current/
O conector será publicado no maven central.
Para problemas, dúvidas ou comentários sobre o MongoDB Kafka Connector, consulte nossos canais de suporte. Não envie e-mail diretamente a nenhum dos desenvolvedores do conector Kafka com problemas ou perguntas - é mais provável que você obtenha uma resposta nos Fóruns da Comunidade MongoDB.
No mínimo, inclua na sua descrição a versão exata do driver que você está usando. Se você estiver tendo problemas de conectividade, muitas vezes também é útil colar a configuração do conector Kafka. Você também deve verificar os logs do seu aplicativo em busca de exceções relacionadas à conectividade e publicá-las também.
Acha que encontrou um bug? Quer ver um novo recurso no driver Kafka? Abra um caso em nossa ferramenta de gerenciamento de problemas, JIRA:
Crie uma conta e faça login.
Navegue até o projeto KAFKA.
Clique em Criar problema - Forneça o máximo de informações possível sobre o tipo de problema e como reproduzi-lo.
Os relatórios de bugs no JIRA para o conector são públicos .
Se você identificou uma vulnerabilidade de segurança em um conector ou qualquer outro projeto do MongoDB, relate-a de acordo com as instruções aqui.
O MongoDB Kafka Connector segue o versionamento semântico. Consulte o changelog para obter informações sobre mudanças entre versões.
Java 8+ é necessário para construir e compilar o código-fonte. Para construir e testar o driver:
$ git clone https://github.com/mongodb/mongo-kafka.git $ cd mongo-kafka $ ./gradlew check -Dorg.mongodb.test.uri=mongodb://localhost:27017
O conjunto de testes requer que o mongod esteja em execução. Observe que o conector de origem requer um replicaSet.
Ross Lawley [email protected]
Conector Sink original trabalhado por: Hans-Peter Grahsl: https://github.com/hpgrahsl/kafka-connect-mongodb
Colaboradores adicionais podem ser encontrados aqui.
./gradlew publishArchives
- publica no Maven
./gradlew createConfluentArchive
- cria o arquivo confluente / arquivo zip de lançamento do github
Algumas etapas de configuração manual são necessárias para executar o código no IntelliJ:
Erro: java: cannot find symbol. symbol: variable Versions
Correções: Qualquer um dos seguintes:
Execute a tarefa compileBuildConfig
: por exemplo: ./gradlew compileBuildConfig
ou via Gradle > mongo-kafka > Tarefas > outro > compileBuildConfig
Defina compileBuildConfig
para executar antes da compilação. via Gradle> Tarefas> outro> clique com o botão direito em compileBuildConfig - clique em "Executar antes de construir"
Delegue todas as ações de compilação ao Gradle: Configurações > Compilação, Execução, Implantação > Ferramentas de compilação > Gradle > Runner - marque "Delegar ações de compilação/execução do IDE para gradle"
A interface com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider
pode ser implementada para fornecer um objeto do tipo com.mongodb.MongoCredential
que é agrupado no MongoClient que é construído para o coletor e o conector de origem. As seguintes propriedades precisam ser definidas -
mongo.custom.auth.mechanism.enable - set to true. mongo.custom.auth.mechanism.providerClass - qualified class name of the implementation class
Propriedades adicionais podem ser definidas conforme necessário na classe de implementação. Os métodos init e validar da classe de implementação são chamados quando o conector é inicializado.
Ao usar o mecanismo de autenticação MONGODB-AWS para atlas, pode-se especificar a seguinte configuração -
"connection.uri": "mongodb+srv:///?authMechanism=MONGODB-AWS" "mongo.custom.auth.mechanism.enable": true, "mongo.custom.auth.mechanism.providerClass": "sample.AwsAssumeRoleCredentialProvider" "mongodbaws.auth.mechanism.roleArn": "arn:aws:iam:: :role/ "
Aqui, sample.AwsAssumeRoleCredentialProvider
deve estar disponível no caminho de classe. mongodbaws.auth.mechanism.roleArn
é um exemplo de propriedades customizadas que podem ser lidas por sample.AwsAssumeRoleCredentialProvider
.
Aqui está um exemplo de código que pode funcionar.
classe pública AwsAssumeRoleCredentialProvider implementa CustomCredentialProvider { public AwsAssumeRoleCredentialProvider() {} @Override public MongoCredential getCustomCredential(Map, ?> map) { AWSCredentialsProvider Provider = new DefaultAWSCredentialsProviderChain(); FornecedorawsFreshCredentialSupplier = () -> { AWSSecurityTokenService stsClient = AWSSecurityTokenServiceAsyncClientBuilder.standard() .withCredentials(provedor) .withRegion("us-east-1") .construir(); AssumeRoleRequest assumeRoleRequest = new AssumeRoleRequest().withDurationSeconds(3600) .withRoleArn((String)map.get("mongodbaws.auth.mechanism.roleArn")) .withRoleSessionName("Test_Session"); AssumeRoleResult assumeRoleResult = stsClient.assumeRole(assumeRoleRequest); Credenciais cred = assumeRoleResult.getCredentials(); // Adicione seu código para buscar novas credenciais return new AwsCredential(creds.getAccessKeyId(), creds.getSecretAccessKey(), creds.getSessionToken()); }; retornar MongoCredential.createAwsCredential (nulo, nulo) .withMechanismProperty(MongoCredential.AWS_CREDENTIAL_PROVIDER_KEY, awsFreshCredentialSupplier); } @Override public void validar(Map, ?> map) { String roleArn = (String) map.get("mongodbaws.auth.mechanism.roleArn"); if (StringUtils.isNullOrEmpty(roleArn)) { throw new RuntimeException("Valor inválido definido para customProperty"); } } @Override public void init(Map, ?> map) { } }
Aqui está o pom.xml que pode construir o jar completo contendo o AwsAssumeRoleCredentialProvider
4.0.0 amostra AwsAssumeRoleCredentialProvider 1.0-SNAPSHOT org.apache.maven.plugins plugin maven-shade 3.5.3 pacote sombra org.mongodb mongodb-driver-sync 5.1.0 com.amazonaws aws-java-sdk 1.12.723 org.slf4j slf4j-jdk14 1.7.28 kafka-connect kafka-connect sistema 1.12.1-SNAPSHOT /Users/jagadish.nallapaneni/mongo-kafka/build/libs/mongo-kafka-connect-1.12.1-SNAPSHOT-confluent.jar 17 17 UTF-8