ตัวเชื่อมต่อ MongoDB Kafka อย่างเป็นทางการ
เอกสารประกอบสำหรับตัวเชื่อมต่อมีอยู่ที่https://docs.mongodb.com/kafka-connector/current/
ตัวเชื่อมต่อจะถูกเผยแพร่บน Maven Central
สำหรับปัญหา คำถาม หรือข้อเสนอแนะเกี่ยวกับ MongoDB Kafka Connector โปรดดูช่องทางการสนับสนุนของเรา โปรดอย่าส่งอีเมลถึงผู้พัฒนาตัวเชื่อมต่อ Kafka โดยตรงเพื่อแจ้งปัญหาหรือคำถาม คุณมีแนวโน้มที่จะได้รับคำตอบในฟอรัมชุมชน MongoDB มากขึ้น
อย่างน้อยที่สุด โปรดระบุเวอร์ชันของไดรเวอร์ที่คุณใช้อยู่ในคำอธิบายของคุณ หากคุณมีปัญหาในการเชื่อมต่อ การวางการกำหนดค่าตัวเชื่อมต่อ Kafka ก็มักจะเป็นประโยชน์เช่นกัน คุณควรตรวจสอบบันทึกแอปพลิเคชันของคุณเพื่อดูข้อยกเว้นที่เกี่ยวข้องกับการเชื่อมต่อและโพสต์สิ่งเหล่านั้นด้วย
คิดว่าคุณได้พบข้อบกพร่องหรือไม่? ต้องการดูคุณสมบัติใหม่ในไดรเวอร์ Kafka หรือไม่? โปรดเปิดเคสในเครื่องมือการจัดการปัญหาของเรา JIRA:
สร้างบัญชีและเข้าสู่ระบบ
นำทางไปยังโครงการ KAFKA
คลิก สร้างปัญหา - โปรดระบุข้อมูลให้มากที่สุดเท่าที่เป็นไปได้เกี่ยวกับประเภทปัญหาและวิธีการทำซ้ำ
รายงานข้อผิดพลาดใน JIRA สำหรับตัวเชื่อมต่อเป็น แบบสาธารณะ
หากคุณระบุช่องโหว่ด้านความปลอดภัยในตัวเชื่อมต่อหรือโปรเจ็กต์ MongoDB อื่นๆ โปรดรายงานตามคำแนะนำที่นี่
MongoDB Kafka Connector เป็นไปตามการกำหนดเวอร์ชันเชิงความหมาย ดูบันทึกการเปลี่ยนแปลงสำหรับข้อมูลเกี่ยวกับการเปลี่ยนแปลงระหว่างรุ่นต่างๆ
ต้องใช้ Java 8+ เพื่อสร้างและคอมไพล์ซอร์ส วิธีสร้างและทดสอบไดรเวอร์:
$ git clone https://github.com/mongodb/mongo-kafka.git $ cd mongo-kafka $ ./gradlew check -Dorg.mongodb.test.uri=mongodb://localhost:27017
ชุดทดสอบต้องการให้ Mongod ทำงาน หมายเหตุ ตัวเชื่อมต่อต้นทางจำเป็นต้องมีชุดจำลอง
รอสส์ ลอว์ลีย์ [email protected]
ตัวเชื่อมต่อ Sink ดั้งเดิมทำงานโดย: Hans-Peter Grahsl : https://github.com/hpgrahsl/kafka-connect-mongodb
สามารถดูผู้มีส่วนร่วมเพิ่มเติมได้ที่นี่
./gradlew publishArchives
- เผยแพร่ไปยัง Maven
./gradlew createConfluentArchive
- สร้างไฟล์ zip ที่ไหลมารวมกัน / github release
จำเป็นต้องมีขั้นตอนการกำหนดค่าด้วยตนเองสองสามขั้นตอนเพื่อเรียกใช้โค้ดใน IntelliJ:
ข้อผิดพลาด: java: cannot find symbol. symbol: variable Versions
การแก้ไข: สิ่งใดสิ่งหนึ่งต่อไปนี้:
รันงาน compileBuildConfig
: เช่น ./gradlew compileBuildConfig
หรือผ่าน Gradle > mongo-kafka > Tasks > other > CompileBuildConfig
ตั้งค่า compileBuildConfig
ให้ดำเนินการก่อน Build ผ่าน Gradle > งาน > อื่น ๆ > คลิกขวาที่รวบรวม BuildConfig - คลิกที่ "ดำเนินการก่อนสร้าง"
มอบหมายการดำเนินการบิลด์ทั้งหมดให้กับ Gradle: การตั้งค่า > สร้าง, การดำเนินการ, การปรับใช้ > เครื่องมือสร้าง > Gradle > Runner - ทำเครื่องหมายที่ "มอบหมายการดำเนินการสร้าง/รัน IDE เพื่อไล่ระดับ"
อินเทอร์เฟซ com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider
สามารถนำไปใช้เพื่อจัดเตรียมอ็อบเจ็กต์ประเภท com.mongodb.MongoCredential
ซึ่งรวมอยู่ใน MongoClient ที่สร้างขึ้นสำหรับตัวเชื่อมต่อ sink และแหล่งที่มา จำเป็นต้องตั้งค่าคุณสมบัติต่อไปนี้ -
mongo.custom.auth.mechanism.enable - set to true. mongo.custom.auth.mechanism.providerClass - qualified class name of the implementation class
คุณสมบัติเพิ่มเติมและสามารถตั้งค่าได้ตามต้องการภายในคลาสการใช้งาน วิธีการเริ่มต้นและตรวจสอบความถูกต้องของคลาสการใช้งานจะถูกเรียกเมื่อตัวเชื่อมต่อเริ่มต้น
เมื่อใช้กลไกการตรวจสอบสิทธิ์ MONGODB-AWS สำหรับ Atlas เราสามารถระบุการกำหนดค่าต่อไปนี้ -
"connection.uri": "mongodb+srv:///?authMechanism=MONGODB-AWS" "mongo.custom.auth.mechanism.enable": true, "mongo.custom.auth.mechanism.providerClass": "sample.AwsAssumeRoleCredentialProvider" "mongodbaws.auth.mechanism.roleArn": "arn:aws:iam:: :role/ "
ที่นี่ sample.AwsAssumeRoleCredentialProvider
จะต้องพร้อมใช้งานบน classpath mongodbaws.auth.mechanism.roleArn
คือตัวอย่างของคุณสมบัติแบบกำหนดเองที่ sample.AwsAssumeRoleCredentialProvider
สามารถอ่านได้
นี่คือโค้ดตัวอย่างที่สามารถใช้งานได้
AwsAssumeRoleCredentialProvider คลาสสาธารณะใช้ CustomCredentialProvider { public AwsAssumeRoleCredentialProvider() {} @Override สาธารณะ MongoCredential getCustomCredential(แผนที่ , ?> แผนที่) { ผู้ให้บริการ AWSCredentialsProvider = ใหม่ DefaultAWSCredentialsProviderChain(); ซัพพลายเออร์awsFreshCredentialSupplier = () -> { AWSSecurityTokenService stsClient = AWSSecurityTokenServiceAsyncClientBuilder.standard() .withCredentials(ผู้ให้บริการ) .withRegion("us-east-1") .สร้าง(); AssumeRoleRequest ถือว่า AssumeRoleRequest = ใหม่ AssumeRoleRequest().withDurationSeconds(3600) .withRoleArn((สตริง)map.get("mongodbaws.auth.mechanism.roleArn")) .withRoleSessionName("Test_Session"); AssumeRoleResult assumeRoleResult = stsClient.assumeRole(assumeRoleRequest); หนังสือรับรอง =ถือว่าRoleResult.getCredentials(); // เพิ่มโค้ดของคุณเพื่อดึงข้อมูลรับรองใหม่ ส่งคืน AwsCredential ใหม่(creds.getAccessKeyId(), creds.getSecretAccessKey(), creds.getSessionToken()); - กลับ MongoCredential.createAwsCredential (null, null) .withMechanismProperty(MongoCredential.AWS_CREDENTIAL_PROVIDER_KEY, awsFreshCredentialSupplier); } @แทนที่การตรวจสอบโมฆะสาธารณะ (แผนที่ , ?> แผนที่) { String RoleArn = (สตริง) map.get("mongodbaws.auth.mechanism.roleArn"); if (StringUtils.isNullOrEmpty(roleArn)) { โยน RuntimeException ใหม่ ("ชุดค่าไม่ถูกต้องสำหรับ customProperty"); - } @แทนที่โมฆะสาธารณะ init(แผนที่, ?> แผนที่) { - -
นี่คือ pom.xml ที่สามารถสร้าง jar ที่สมบูรณ์ซึ่งมี AwsAssumeRoleCredentialProvider
<โครงการ xmlns = "http://maven.apache.org/POM/4.0.0" xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation = "http:/ /maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">4.0.0 ตัวอย่าง AwsAssumeRoleCredentialProvider 1.0-SNAPSHOT <สร้าง> <ปลั๊กอิน> <ปลั๊กอิน>org.apache.maven.plugins maven-shade-plugin <เวอร์ชั่น>3.5.3เวอร์ชั่น> <การกำหนดค่า> การกำหนดค่า> <การดำเนินการ> <การดำเนินการ> <เฟส>แพ็คเกจเฟส> <เป้าหมาย> <เป้าหมาย>สีเป้าหมาย> เป้าหมาย> ดำเนินการ> การดำเนินการ> ปลั๊กอิน> ปลั๊กอิน> สร้าง> <การพึ่งพา> <การพึ่งพา>org.mongodb mongodb-ไดร์เวอร์ซิงค์ <เวอร์ชัน>5.1.0เวอร์ชัน> การพึ่งพา> <การพึ่งพา>com.amazonaws aws-java-sdk <เวอร์ชัน>1.12.723เวอร์ชัน> การพึ่งพา> <การพึ่งพา>org.slf4j slf4j-jdk14 <เวอร์ชั่น>1.7.28เวอร์ชั่น> การพึ่งพา> <การพึ่งพา>เชื่อมต่อ kafka เชื่อมต่อ kafka <ขอบเขต>ระบบขอบเขต>1.12.1-SNAPSHOT /Users/jagadish.nallapaneni/mongo-kafka/build/libs/mongo-kafka-connect-1.12.1-SNAPSHOT-confluent.jar การพึ่งพา> การพึ่งพา> <คุณสมบัติ>17 17 UTF-8 คุณสมบัติ> โครงการ>