Jkes เป็นเฟรมเวิร์กการค้นหาที่ใช้ Java, Kafka และ ElasticSearch Jkes จัดเตรียมการแมปวัตถุ/เอกสารสไตล์ JPA ที่ขับเคลื่อนด้วยคำอธิบายประกอบ โดยใช้ REST API สำหรับการค้นหาเอกสาร
คุณสามารถอ้างถึงโปรเจ็กต์ jkes-integration-test
เพื่อเรียนรู้วิธีใช้เฟรมเวิร์ก jkes ได้อย่างรวดเร็ว jkes-integration-test
เป็น Spring Boot Application ที่เราใช้เพื่อทดสอบความสมบูรณ์ของฟังก์ชัน
jkes-index-connector
และ jkes-delete-connector
ให้กับคลาสพาธของ Kafka Connectsudo bin/elasticsearch-plugin install analysis-smartcn
@ EnableAspectJAutoProxy
@ EnableJkes
@ Configuration
public class JkesConfig {
@ Bean
public PlatformTransactionManager transactionManager ( EntityManagerFactory factory , EventSupport eventSupport ) {
return new SearchPlatformTransactionManager ( new JpaTransactionManager ( factory ), eventSupport );
}
}
@ Component
@ Configuration
public class JkesConf extends DefaultJkesPropertiesImpl {
@ PostConstruct
public void setUp () {
Config . setJkesProperties ( this );
}
@ Override
public String getKafkaBootstrapServers () {
return "k1-test.com:9292,k2-test.com:9292,k3-test.com:9292" ;
}
@ Override
public String getKafkaConnectServers () {
return "http://k1-test.com:8084,http://k2-test.com:8084,http://k3-test.com:8084" ;
}
@ Override
public String getEsBootstrapServers () {
return "http://es1-test.com:9200,http://es2-test.com:9200,http://es3-test.com:9200" ;
}
@ Override
public String getDocumentBasePackage () {
return "com.timeyang.jkes.integration_test.domain" ;
}
@ Override
public String getClientId () {
return "integration_test" ;
}
}
สิ่งนี้สามารถยืดหยุ่นได้มาก หากคุณใช้ Spring Boot คุณสามารถใช้ @ConfigurationProperties
เพื่อจัดเตรียมการกำหนดค่า
Spring MVC
คุณสามารถเพิ่มจุดสิ้นสุดดัชนีได้ดังนี้ @ RestController
@ RequestMapping ( "/api/search" )
public class SearchEndpoint {
private Indexer indexer ;
@ Autowired
public SearchEndpoint ( Indexer indexer ) {
this . indexer = indexer ;
}
@ RequestMapping ( value = "/start_all" , method = RequestMethod . POST )
public void startAll () {
indexer . startAll ();
}
@ RequestMapping ( value = "/start/{entityClassName:.+}" , method = RequestMethod . POST )
public void start ( @ PathVariable ( "entityClassName" ) String entityClassName ) {
indexer . start ( entityClassName );
}
@ RequestMapping ( value = "/stop_all" , method = RequestMethod . PUT )
public Map < String , Boolean > stopAll () {
return indexer . stopAll ();
}
@ RequestMapping ( value = "/stop/{entityClassName:.+}" , method = RequestMethod . PUT )
public Boolean stop ( @ PathVariable ( "entityClassName" ) String entityClassName ) {
return indexer . stop ( entityClassName );
}
@ RequestMapping ( value = "/progress" , method = RequestMethod . GET )
public Map < String , IndexProgress > getProgress () {
return indexer . getProgress ();
}
}
ใช้คำอธิบายประกอบที่เกี่ยวข้องภายใต้แพ็คเกจ com.timeyang.jkes.core.annotation
เพื่อทำเครื่องหมายเอนทิตี
@ lombok . Data
@ Entity
@ Document
public class Person extends AuditedEntity {
// @Id will be identified automatically
// @Field(type = FieldType.Long)
@ Id
@ GeneratedValue ( strategy = GenerationType . IDENTITY )
private Long id ;
@ MultiFields (
mainField = @ Field ( type = FieldType . Text ),
otherFields = {
@ InnerField ( suffix = "raw" , type = FieldType . Keyword ),
@ InnerField ( suffix = "english" , type = FieldType . Text , analyzer = "english" )
}
)
private String name ;
@ Field ( type = FieldType . Keyword )
private String gender ;
@ Field ( type = FieldType . Integer )
private Integer age ;
// don't add @Field to test whether ignored
// @Field(type = FieldType.Text)
private String description ;
@ Field ( type = FieldType . Object )
@ ManyToOne ( fetch = FetchType . EAGER )
@ JoinColumn ( name = "group_id" )
private PersonGroup personGroup ;
}
@ lombok . Data
@ Entity
@ Document ( type = "person_group" , alias = "person_group_alias" )
public class PersonGroup extends AuditedEntity {
@ Id
@ GeneratedValue ( strategy = GenerationType . IDENTITY )
private Long id ;
private String name ;
private String interests ;
@ OneToMany ( fetch = FetchType . EAGER , cascade = CascadeType . ALL , mappedBy = "personGroup" , orphanRemoval = true )
private List < Person > persons ;
private String description ;
@ DocumentId
@ Field ( type = FieldType . Long )
public Long getId () {
return id ;
}
@ MultiFields (
mainField = @ Field ( type = FieldType . Text ),
otherFields = {
@ InnerField ( suffix = "raw" , type = FieldType . Keyword ),
@ InnerField ( suffix = "english" , type = FieldType . Text , analyzer = "english" )
}
)
public String getName () {
return name ;
}
@ Field ( type = FieldType . Text )
public String getInterests () {
return interests ;
}
@ Field ( type = FieldType . Nested )
public List < Person > getPersons () {
return persons ;
}
/**
* 不加Field注解,测试序列化时是否忽略
*/
public String getDescription () {
return description ;
}
}
เมื่ออัปเดตเอนทิตี เอกสารจะถูกสร้างดัชนีใน ElasticSearch โดยอัตโนมัติ เมื่อลบเอนทิตี เอกสารจะถูกลบออกจาก ElasticSearch โดยอัตโนมัติ
เริ่มบริการค้นหา jkes-search-service บริการค้นหาเป็น Spring Boot Application ที่ให้ API การค้นหาที่เหลือและทำงานบนพอร์ต 9000 ตามค่าเริ่มต้น
curl -XPOST localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10
integration_test_person_group/person_group/_search?from=0&size=10
{
"query": {
"nested": {
"path": "persons",
"score_mode": "avg",
"query": {
"bool": {
"must": [
{
"range": {
"persons.age": {
"gt": 5
}
}
}
]
}
}
}
}
}
integration_test_person_group/person_group/_search?from=0&size=10
{
"query": {
"match": {
"interests": "Hadoop"
}
}
}
{
"query": {
"bool" : {
"must" : {
"match" : { "interests" : "Hadoop" }
},
"filter": {
"term" : { "name.raw" : "name0" }
},
"should" : [
{ "match" : { "interests" : "Flink" } },
{
"nested" : {
"path" : "persons",
"score_mode" : "avg",
"query" : {
"bool" : {
"must" : [
{ "match" : {"persons.name" : "name40"} },
{ "match" : {"persons.interests" : "interests"} }
],
"must_not" : {
"range" : {
"age" : { "gte" : 50, "lte" : 60 }
}
}
}
}
}
}
],
"minimum_should_match" : 1,
"boost" : 1.0
}
}
}
integration_test_person_group/person_group/_search
{
"_source": false,
"query" : {
"match" : { "name" : "name17" }
}
}
integration_test_person_group/person_group/_search
{
"_source": {
"includes": [ "name", "persons.*" ],
"excludes": [ "date*", "version", "persons.age" ]
},
"query" : {
"match" : { "name" : "name17" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"prefix" : { "name" : "name" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"wildcard" : { "name" : "name*" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"regexp":{
"name": "na.*17"
}
}
}
การจัดทำดัชนีทำงานอย่างไร:
@Document
และสร้างเมตาดาต้าสำหรับเอนทิตีเหล่านั้นindex
และ mapping
ในรูปแบบ Json จากนั้นสร้าง/อัปเดตการกำหนด index
ผ่าน ElasticSearch Java Rest Client
Kafka ElasticSearch Connector
สำหรับแต่ละเอกสาร ซึ่งใช้ในการสร้าง/อัปเดตเอกสารJkes Deleter Connector
สำหรับทั้งโปรเจ็กต์ สำหรับการลบเอกสาร* save(*)
เป็น SaveEvent
และบันทึกลงใน EventContainer
ใช้พารามิเตอร์ของเมธอด (* delete*(..)
เพื่อสร้าง DeleteEvent/DeleteAllEvent
และบันทึกลงใน EventContainer
JkesKafkaProducer
เพื่อส่งเอนทิตีใน SaveEvent
ไปยัง Kafka จะใช้ JkesJsonSerializer
ที่เราจัดเตรียมไว้ให้เพื่อทำให้ข้อมูลที่ระบุเป็นอนุกรม จากนั้นจึงส่งไปที่ KafkaSaveEvent
ตรงที่ DeleteEvent
จะถูกทำให้เป็นอนุกรมโดยตรง จากนั้นจึงส่งไปยัง Kafka แทนที่จะส่งสำเนาข้อมูลเพียงอย่างเดียวSaveEvent
และ DeleteEvent
ตรงที่ DeleteAllEvent
จะไม่ส่งข้อมูลไปยัง Kafka แต่จะลบ index
ที่เกี่ยวข้องโดยตรงผ่าน ElasticSearch Java Rest Client
จากนั้นสร้างดัชนีใหม่และรีสตาร์ท Kafka ElasticSearch Connector
แบบสอบถามทำงานอย่างไร:
json
ดำเนินการประมวลผลล่วงหน้าบางส่วน และส่งต่อไปยัง ElasticSearch โดยใช้ ElasticSearch Java Rest Client
แยกวิเคราะห์การตอบสนอง และส่งกลับไปยังไคลเอ็นต์หลังจากประมวลผลเพิ่มเติม jkes-core
เป็นส่วนหลักของ jkes
ทั้งหมด ส่วนใหญ่ประกอบด้วยฟังก์ชันต่อไปนี้:
annotation
จัดเตรียมคำอธิบายประกอบหลักของ jkeselasticsearch
สรุปการดำเนินการที่เกี่ยวข้องกับ elasticsearch
เช่น การสร้าง/การอัปเดตดัชนีสำหรับเอกสารทั้งหมด และการอัปเดตการแมปkafka
มี Kafka Producer, Kafka Json Serializer, Kafka Connect Clientmetadata
มีการสร้างข้อมูลเมตาคำอธิบายประกอบหลักและแบบจำลองที่มีโครงสร้างevent
มีโมเดลเหตุการณ์และคอนเทนเนอร์exception
จัดให้มีข้อยกเว้น Jkes ทั่วไปhttp
สรุปคำขอ http json ทั่วไปตาม Apache Http Client
support
เปิดเผยการสนับสนุนการกำหนดค่าหลักของ Jkesutil
มีคลาสเครื่องมือบางอย่างเพื่ออำนวยความสะดวกในการพัฒนา เช่น: Asserts, ClassUtils, DocumentUtils, IOUtils, JsonUtils, ReflectionUtils, StringUtils jkes-boot
ใช้เพื่อรวมเข้ากับเฟรมเวิร์กโอเพ่นซอร์สของบุคคลที่สาม
ปัจจุบัน เรามีการบูรณาการกับ spring data jpa
ผ่าน jkes-spring-data-jpa
ด้วยการใช้กลไก AOP ของ Spring วิธี Repository
จะถูกดักและ SaveEvent/DeleteEvent/DeleteAllEvent
จะถูกสร้างและบันทึกลงใน EventContainer
ด้วยการใช้ SearchPlatformTransactionManager
ที่เรามีให้ เราจะรวมตัวจัดการธุรกรรมที่ใช้กันทั่วไป (เช่น JpaTransactionManager
) เพื่อจัดเตรียมฟังก์ชันการสกัดกั้นธุรกรรม
ในเวอร์ชันต่อๆ ไป เราจะจัดให้มีการบูรณาการกับเฟรมเวิร์กเพิ่มเติม
คำอธิบาย jkes-spring-data-jpa
:
ContextSupport
ใช้เพื่อรับ Repository Bean
จากโรงงาน bean@EnableJkes
ช่วยให้ไคลเอนต์เปิดใช้งานฟังก์ชัน Jkes ได้อย่างง่ายดาย และจัดเตรียมโมเดลการกำหนดค่าที่สอดคล้องกับ SpringEventSupport
จัดการรายละเอียดของเหตุการณ์ สร้างเหตุการณ์ที่เกี่ยวข้องและเก็บไว้ใน EventContainer
เมื่อบันทึกและลบข้อมูล และประมวลผลเหตุการณ์ที่เกี่ยวข้องเมื่อมีการคอมมิตและย้อนกลับธุรกรรมSearchPlatformTransactionManager
ล้อมตัวจัดการธุรกรรมของลูกค้าและเพิ่ม回调hook
เมื่อมีการทำธุรกรรมและย้อนกลับaudit
มีคลาสพาเรนต์ AuditedEntity
แบบง่ายเพื่ออำนวยความสะดวกในการเพิ่มฟังก์ชันการตรวจสอบ ข้อมูลเวอร์ชันสามารถใช้ร่วมกับกลไกเวอร์ชันของ ElasticSearch
เพื่อให้แน่ใจว่าข้อมูลเอกสารที่หมดอายุจะไม่ถูกจัดทำดัชนีexception
สรุปข้อยกเว้นทั่วไปintercept
มีจุดตัดและแง่มุมของ AOPindex
มีฟังก์ชัน全量索引
ในปัจจุบัน เรามีกลไกการจัดทำดัชนีตาม线程池
และกลไกการจัดทำดัชนีตาม ForkJoin
ในเวอร์ชันต่อๆ ไป เราจะปรับโครงสร้างโค้ดใหม่และเพิ่มแบบจำลอง生产者-消费者
โดยยึดตาม阻塞队列
เพื่อให้เกิดประสิทธิภาพการทำงานพร้อมกัน jkes-services
ใช้เพื่อให้บริการบางอย่างเป็นหลัก ปัจจุบัน jkes-services
ให้บริการดังต่อไปนี้:
jkes-delete-connector
jkes-delete-connector
เป็น Kafka Connector
ที่ใช้เพื่อรับเหตุการณ์การลบดัชนี ( DeleteEvent
) จากคลัสเตอร์ kafka จากนั้นใช้ Jest Client
เพื่อลบเอกสารที่เกี่ยวข้องใน ElasticSearch
ด้วยความช่วยเหลือของ API ผู้ดูแลระบบส่วนที่เหลือของ Kafka Connect เราจึงใช้ฟังก์ชันการลบเอกสารบนแพลตฟอร์มที่มีผู้เช่าหลายรายได้อย่างง่ายดาย เพียงเริ่มต้น jkes-delete-connector
สำหรับแต่ละโปรเจ็กต์ และการลบเอกสารสำหรับโปรเจ็กต์นั้นจะได้รับการจัดการโดยอัตโนมัติ วิธีนี้จะหลีกเลี่ยงว่าทุกครั้งที่เราเริ่มโปรเจ็กต์ใหม่ เราจะต้องเริ่ม Kafka Consumer ด้วยตนเองเพื่อจัดการการลบเอกสารของโปรเจ็กต์ แม้ว่างานประเภทนี้จะลดลงผ่านการสมัครสมาชิกปกติ แต่ก็ยังไม่ยืดหยุ่นมาก
jkes-search-service
jkes-search-service
เป็นบริการค้นหาแบบพักผ่อนซึ่งมี API การสืบค้นส่วนที่เหลือหลายเวอร์ชัน บริการสืบค้นมี API หลายเวอร์ชันสำหรับการวิวัฒนาการและความเข้ากันได้ของ APIjkes-search-service
รองรับการค้นหาสไตล์ URI และการค้นหาสไตล์เนื้อหาคำขอ JSONjson
ดำเนินการประมวลผลล่วงหน้าบางส่วน และส่งต่อไปยัง ElasticSearch โดยใช้ ElasticSearch Java Rest Client
แยกวิเคราะห์การตอบสนอง และส่งกลับไปยังไคลเอ็นต์หลังจากประมวลผลเพิ่มเติม ในอนาคต เราจะสร้างคลัสเตอร์ดัชนีตาม zookeeper
เพื่อมอบฟังก์ชันการจัดการดัชนีคลัสเตอร์
jkes-integration-test
เป็นโครงการทดสอบการรวม Spring Boot สำหรับ功能测试
วัด吞吐率
ของการดำเนินการทั่วไปบางอย่างด้วย
หากต้องการสร้างเวอร์ชันสำหรับการพัฒนา คุณจะต้องมี Kafka เวอร์ชันล่าสุด คุณสามารถสร้าง jkes ด้วย Maven ได้โดยใช้เฟสวงจรการใช้งานมาตรฐาน
โครงการนี้ได้รับอนุญาตภายใต้ Apache License 2.0