Jkes adalah kerangka pencarian berdasarkan Java, Kafka, dan ElasticSearch. Jkes menyediakan pemetaan objek/dokumen bergaya JPA berbasis anotasi, menggunakan REST API untuk pencarian dokumen.
Anda dapat merujuk ke proyek jkes-integration-test
untuk mempelajari cara menggunakan kerangka kerja jkes dengan cepat. jkes-integration-test
adalah Aplikasi Spring Boot yang kami gunakan untuk menguji integritas fungsional.
jkes-index-connector
dan jkes-delete-connector
ke classpath Kafka Connectsudo bin/elasticsearch-plugin install analysis-smartcn
@ EnableAspectJAutoProxy
@ EnableJkes
@ Configuration
public class JkesConfig {
@ Bean
public PlatformTransactionManager transactionManager ( EntityManagerFactory factory , EventSupport eventSupport ) {
return new SearchPlatformTransactionManager ( new JpaTransactionManager ( factory ), eventSupport );
}
}
@ Component
@ Configuration
public class JkesConf extends DefaultJkesPropertiesImpl {
@ PostConstruct
public void setUp () {
Config . setJkesProperties ( this );
}
@ Override
public String getKafkaBootstrapServers () {
return "k1-test.com:9292,k2-test.com:9292,k3-test.com:9292" ;
}
@ Override
public String getKafkaConnectServers () {
return "http://k1-test.com:8084,http://k2-test.com:8084,http://k3-test.com:8084" ;
}
@ Override
public String getEsBootstrapServers () {
return "http://es1-test.com:9200,http://es2-test.com:9200,http://es3-test.com:9200" ;
}
@ Override
public String getDocumentBasePackage () {
return "com.timeyang.jkes.integration_test.domain" ;
}
@ Override
public String getClientId () {
return "integration_test" ;
}
}
Ini bisa sangat fleksibel. Jika Anda menggunakan Spring Boot, Anda dapat menggunakan @ConfigurationProperties
untuk menyediakan konfigurasi
Spring MVC
, Anda dapat menambahkan titik akhir indeks sebagai berikut @ RestController
@ RequestMapping ( "/api/search" )
public class SearchEndpoint {
private Indexer indexer ;
@ Autowired
public SearchEndpoint ( Indexer indexer ) {
this . indexer = indexer ;
}
@ RequestMapping ( value = "/start_all" , method = RequestMethod . POST )
public void startAll () {
indexer . startAll ();
}
@ RequestMapping ( value = "/start/{entityClassName:.+}" , method = RequestMethod . POST )
public void start ( @ PathVariable ( "entityClassName" ) String entityClassName ) {
indexer . start ( entityClassName );
}
@ RequestMapping ( value = "/stop_all" , method = RequestMethod . PUT )
public Map < String , Boolean > stopAll () {
return indexer . stopAll ();
}
@ RequestMapping ( value = "/stop/{entityClassName:.+}" , method = RequestMethod . PUT )
public Boolean stop ( @ PathVariable ( "entityClassName" ) String entityClassName ) {
return indexer . stop ( entityClassName );
}
@ RequestMapping ( value = "/progress" , method = RequestMethod . GET )
public Map < String , IndexProgress > getProgress () {
return indexer . getProgress ();
}
}
Gunakan anotasi yang relevan di bawah paket com.timeyang.jkes.core.annotation
untuk menandai entitas
@ lombok . Data
@ Entity
@ Document
public class Person extends AuditedEntity {
// @Id will be identified automatically
// @Field(type = FieldType.Long)
@ Id
@ GeneratedValue ( strategy = GenerationType . IDENTITY )
private Long id ;
@ MultiFields (
mainField = @ Field ( type = FieldType . Text ),
otherFields = {
@ InnerField ( suffix = "raw" , type = FieldType . Keyword ),
@ InnerField ( suffix = "english" , type = FieldType . Text , analyzer = "english" )
}
)
private String name ;
@ Field ( type = FieldType . Keyword )
private String gender ;
@ Field ( type = FieldType . Integer )
private Integer age ;
// don't add @Field to test whether ignored
// @Field(type = FieldType.Text)
private String description ;
@ Field ( type = FieldType . Object )
@ ManyToOne ( fetch = FetchType . EAGER )
@ JoinColumn ( name = "group_id" )
private PersonGroup personGroup ;
}
@ lombok . Data
@ Entity
@ Document ( type = "person_group" , alias = "person_group_alias" )
public class PersonGroup extends AuditedEntity {
@ Id
@ GeneratedValue ( strategy = GenerationType . IDENTITY )
private Long id ;
private String name ;
private String interests ;
@ OneToMany ( fetch = FetchType . EAGER , cascade = CascadeType . ALL , mappedBy = "personGroup" , orphanRemoval = true )
private List < Person > persons ;
private String description ;
@ DocumentId
@ Field ( type = FieldType . Long )
public Long getId () {
return id ;
}
@ MultiFields (
mainField = @ Field ( type = FieldType . Text ),
otherFields = {
@ InnerField ( suffix = "raw" , type = FieldType . Keyword ),
@ InnerField ( suffix = "english" , type = FieldType . Text , analyzer = "english" )
}
)
public String getName () {
return name ;
}
@ Field ( type = FieldType . Text )
public String getInterests () {
return interests ;
}
@ Field ( type = FieldType . Nested )
public List < Person > getPersons () {
return persons ;
}
/**
* 不加Field注解,测试序列化时是否忽略
*/
public String getDescription () {
return description ;
}
}
Ketika suatu entitas diperbarui, dokumen secara otomatis diindeks ke ElasticSearch; ketika suatu entitas dihapus, dokumen tersebut secara otomatis dihapus dari ElasticSearch.
Mulai layanan pencarian jkes-search-service. Layanan pencarian adalah Aplikasi Spring Boot yang menyediakan API pencarian lainnya dan berjalan pada port 9000 secara default.
curl -XPOST localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10
integration_test_person_group/person_group/_search?from=0&size=10
{
"query": {
"nested": {
"path": "persons",
"score_mode": "avg",
"query": {
"bool": {
"must": [
{
"range": {
"persons.age": {
"gt": 5
}
}
}
]
}
}
}
}
}
integration_test_person_group/person_group/_search?from=0&size=10
{
"query": {
"match": {
"interests": "Hadoop"
}
}
}
{
"query": {
"bool" : {
"must" : {
"match" : { "interests" : "Hadoop" }
},
"filter": {
"term" : { "name.raw" : "name0" }
},
"should" : [
{ "match" : { "interests" : "Flink" } },
{
"nested" : {
"path" : "persons",
"score_mode" : "avg",
"query" : {
"bool" : {
"must" : [
{ "match" : {"persons.name" : "name40"} },
{ "match" : {"persons.interests" : "interests"} }
],
"must_not" : {
"range" : {
"age" : { "gte" : 50, "lte" : 60 }
}
}
}
}
}
}
],
"minimum_should_match" : 1,
"boost" : 1.0
}
}
}
integration_test_person_group/person_group/_search
{
"_source": false,
"query" : {
"match" : { "name" : "name17" }
}
}
integration_test_person_group/person_group/_search
{
"_source": {
"includes": [ "name", "persons.*" ],
"excludes": [ "date*", "version", "persons.age" ]
},
"query" : {
"match" : { "name" : "name17" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"prefix" : { "name" : "name" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"wildcard" : { "name" : "name*" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"regexp":{
"name": "na.*17"
}
}
}
Cara kerja pengindeksan:
@Document
dan membuat metadata untuk entitas tersebut.index
dan mapping
dalam format Json, lalu buat/perbarui konfigurasi index
melalui ElasticSearch Java Rest Client
.Kafka ElasticSearch Connector
untuk setiap dokumen, yang digunakan untuk membuat/memperbarui dokumenJkes Deleter Connector
untuk keseluruhan proyek, untuk menghapus dokumen* save(*)
sebagai SaveEvent
dan simpan ke EventContainer
; gunakan parameter metode (* delete*(..)
untuk menghasilkan DeleteEvent/DeleteAllEvent
dan simpan ke EventContainer
.JkesKafkaProducer
untuk mengirim entitas di SaveEvent
ke Kafka akan menggunakan JkesJsonSerializer
yang kami sediakan untuk membuat serial data yang ditentukan dan kemudian mengirimkannya ke Kafka.SaveEvent
, DeleteEvent
akan diserialkan secara langsung dan kemudian dikirim ke Kafka, bukan hanya mengirimkan salinan datanya.SaveEvent
dan DeleteEvent
, DeleteAllEvent
tidak mengirim data ke Kafka, namun langsung menghapus index
terkait melalui ElasticSearch Java Rest Client
, lalu membangun kembali indeks dan memulai ulang Kafka ElasticSearch Connector
Cara kerja kueri:
json
, melakukan beberapa pra-pemrosesan, dan meneruskannya ke ElasticSearch menggunakan ElasticSearch Java Rest Client
, mem-parsing respons, dan mengembalikannya ke klien setelah pemrosesan lebih lanjut. jkes-core
adalah bagian inti dari keseluruhan jkes
. Terutama mencakup fungsi-fungsi berikut:
annotation
menyediakan anotasi inti jkeselasticsearch
merangkum operasi terkait elasticsearch
, seperti membuat/memperbarui indeks untuk semua dokumen dan memperbarui pemetaan.kafka
menyediakan Kafka Producer, Kafka Json Serializer, Kafka Connect Clientmetadata
menyediakan konstruksi metadata anotasi inti dan model terstruktur.event
menyediakan model dan wadah acaraexception
menyediakan pengecualian umum Jkeshttp
merangkum permintaan http json umum berdasarkan Apache Http Client
support
memperlihatkan dukungan konfigurasi inti Jkesutil
menyediakan beberapa kelas alat untuk memfasilitasi pengembangan. Seperti: Menegaskan, ClassUtils, DocumentUtils, IOUtils, JsonUtils, ReflectionUtils, StringUtils jkes-boot
digunakan untuk berintegrasi dengan beberapa kerangka kerja sumber terbuka pihak ketiga.
Saat ini, kami menyediakan integrasi dengan spring data jpa
melalui jkes-spring-data-jpa
. Dengan menggunakan mekanisme AOP Spring, metode Repository
dicegat dan SaveEvent/DeleteEvent/DeleteAllEvent
dibuat dan disimpan ke EventContainer
. Dengan menggunakan SearchPlatformTransactionManager
yang kami sediakan, kami menggabungkan manajer transaksi yang umum digunakan (seperti JpaTransactionManager
) untuk menyediakan fungsi intersepsi transaksi.
Pada versi berikutnya, kami akan menyediakan integrasi dengan lebih banyak kerangka kerja.
deskripsi jkes-spring-data-jpa
:
ContextSupport
digunakan untuk mendapatkan Repository Bean
dari pabrik kacang@EnableJkes
memungkinkan klien mengaktifkan fungsionalitas Jkes dengan mudah dan menyediakan model konfigurasi yang konsisten dengan SpringEventSupport
menangani detail peristiwa, menghasilkan peristiwa terkait dan menyimpannya di EventContainer
saat menyimpan dan menghapus data, dan memproses peristiwa terkait saat transaksi dilakukan dan dibatalkan.SearchPlatformTransactionManager
membungkus manajer transaksi klien dan menambahkan回调hook
ketika transaksi dilakukan dan dibatalkan.audit
menyediakan kelas induk AuditedEntity
sederhana untuk memfasilitasi penambahan fungsi audit. Informasi versi dapat digunakan bersama dengan mekanisme versi ElasticSearch
untuk memastikan bahwa data dokumen yang kedaluwarsa tidak akan diindeks.exception
merangkum pengecualian umumintercept
menyediakan titik potong dan aspek AOPindex
menyediakan fungsionalitas全量索引
. Saat ini, kami menyediakan mekanisme pengindeksan berbasis线程池
dan mekanisme pengindeksan berbasis ForkJoin
. Pada versi berikutnya, kami akan memfaktorkan ulang kode dan menambahkan model生产者-消费者
berdasarkan阻塞队列
untuk memberikan kinerja konkurensi. jkes-services
terutama digunakan untuk menyediakan beberapa layanan. Saat ini, jkes-services
menyediakan layanan berikut:
jkes-delete-connector
jkes-delete-connector
adalah Kafka Connector
yang digunakan untuk mendapatkan peristiwa penghapusan indeks ( DeleteEvent
) dari cluster kafka, dan kemudian menggunakan Jest Client
untuk menghapus dokumen terkait di ElasticSearch.
Dengan bantuan API admin lainnya Kafka Connect, kami dengan mudah mengimplementasikan fungsi penghapusan dokumen pada platform multi-penyewa. Cukup mulai jkes-delete-connector
untuk setiap proyek dan penghapusan dokumen untuk proyek tersebut akan ditangani secara otomatis. Hal ini menghindari keharusan memulai Kafka Consumer secara manual untuk menangani penghapusan dokumen proyek setiap kali kita memulai proyek baru. Meskipun pekerjaan semacam ini dapat dikurangi melalui langganan reguler, namun tetap saja sangat tidak fleksibel.
jkes-search-service
jkes-search-service
adalah layanan pencarian tenang yang menyediakan beberapa versi api kueri lainnya. Layanan kueri menyediakan API multi-versi untuk evolusi dan kompatibilitas APIjkes-search-service
saat ini mendukung pencarian gaya URI dan pencarian gaya isi permintaan JSON.json
, melakukan beberapa pra-pemrosesan, dan meneruskannya ke ElasticSearch menggunakan ElasticSearch Java Rest Client
, mem-parsing respons, dan mengembalikannya ke klien setelah pemrosesan lebih lanjut. Di masa depan, kami akan membangun cluster indeks berdasarkan zookeeper
untuk menyediakan fungsi manajemen indeks cluster.
jkes-integration-test
adalah proyek pengujian integrasi Spring Boot untuk功能测试
. Juga ukur吞吐率
beberapa operasi umum
Untuk membuat versi pengembangan, Anda memerlukan Kafka versi terbaru. Anda dapat membuat jkes dengan Maven menggunakan fase siklus hidup standar.
Proyek ini dilisensikan di bawah Lisensi Apache 2.0.