Jkes是一個基於Java、Kafka、ElasticSearch的搜尋框架。 Jkes提供了註解驅動的JPA風格的物件/文檔映射,使用rest api用於文檔搜尋。
可以參考jkes-integration-test
專案快速掌握jkes框架的使用方法。 jkes-integration-test
是我們用來測試功能完整性的一個Spring Boot Application。
jkes-index-connector
和jkes-delete-connector
到Kafka Connect類別路徑sudo bin/elasticsearch-plugin install analysis-smartcn
@ EnableAspectJAutoProxy
@ EnableJkes
@ Configuration
public class JkesConfig {
@ Bean
public PlatformTransactionManager transactionManager ( EntityManagerFactory factory , EventSupport eventSupport ) {
return new SearchPlatformTransactionManager ( new JpaTransactionManager ( factory ), eventSupport );
}
}
@ Component
@ Configuration
public class JkesConf extends DefaultJkesPropertiesImpl {
@ PostConstruct
public void setUp () {
Config . setJkesProperties ( this );
}
@ Override
public String getKafkaBootstrapServers () {
return "k1-test.com:9292,k2-test.com:9292,k3-test.com:9292" ;
}
@ Override
public String getKafkaConnectServers () {
return "http://k1-test.com:8084,http://k2-test.com:8084,http://k3-test.com:8084" ;
}
@ Override
public String getEsBootstrapServers () {
return "http://es1-test.com:9200,http://es2-test.com:9200,http://es3-test.com:9200" ;
}
@ Override
public String getDocumentBasePackage () {
return "com.timeyang.jkes.integration_test.domain" ;
}
@ Override
public String getClientId () {
return "integration_test" ;
}
}
這裡可以很靈活,如果使用Spring Boot,可以使用@ConfigurationProperties
提供配置
Spring MVC
中,可以如下加入索引端點 @ RestController
@ RequestMapping ( "/api/search" )
public class SearchEndpoint {
private Indexer indexer ;
@ Autowired
public SearchEndpoint ( Indexer indexer ) {
this . indexer = indexer ;
}
@ RequestMapping ( value = "/start_all" , method = RequestMethod . POST )
public void startAll () {
indexer . startAll ();
}
@ RequestMapping ( value = "/start/{entityClassName:.+}" , method = RequestMethod . POST )
public void start ( @ PathVariable ( "entityClassName" ) String entityClassName ) {
indexer . start ( entityClassName );
}
@ RequestMapping ( value = "/stop_all" , method = RequestMethod . PUT )
public Map < String , Boolean > stopAll () {
return indexer . stopAll ();
}
@ RequestMapping ( value = "/stop/{entityClassName:.+}" , method = RequestMethod . PUT )
public Boolean stop ( @ PathVariable ( "entityClassName" ) String entityClassName ) {
return indexer . stop ( entityClassName );
}
@ RequestMapping ( value = "/progress" , method = RequestMethod . GET )
public Map < String , IndexProgress > getProgress () {
return indexer . getProgress ();
}
}
使用com.timeyang.jkes.core.annotation
包下相關註解標記實體
@ lombok . Data
@ Entity
@ Document
public class Person extends AuditedEntity {
// @Id will be identified automatically
// @Field(type = FieldType.Long)
@ Id
@ GeneratedValue ( strategy = GenerationType . IDENTITY )
private Long id ;
@ MultiFields (
mainField = @ Field ( type = FieldType . Text ),
otherFields = {
@ InnerField ( suffix = "raw" , type = FieldType . Keyword ),
@ InnerField ( suffix = "english" , type = FieldType . Text , analyzer = "english" )
}
)
private String name ;
@ Field ( type = FieldType . Keyword )
private String gender ;
@ Field ( type = FieldType . Integer )
private Integer age ;
// don't add @Field to test whether ignored
// @Field(type = FieldType.Text)
private String description ;
@ Field ( type = FieldType . Object )
@ ManyToOne ( fetch = FetchType . EAGER )
@ JoinColumn ( name = "group_id" )
private PersonGroup personGroup ;
}
@ lombok . Data
@ Entity
@ Document ( type = "person_group" , alias = "person_group_alias" )
public class PersonGroup extends AuditedEntity {
@ Id
@ GeneratedValue ( strategy = GenerationType . IDENTITY )
private Long id ;
private String name ;
private String interests ;
@ OneToMany ( fetch = FetchType . EAGER , cascade = CascadeType . ALL , mappedBy = "personGroup" , orphanRemoval = true )
private List < Person > persons ;
private String description ;
@ DocumentId
@ Field ( type = FieldType . Long )
public Long getId () {
return id ;
}
@ MultiFields (
mainField = @ Field ( type = FieldType . Text ),
otherFields = {
@ InnerField ( suffix = "raw" , type = FieldType . Keyword ),
@ InnerField ( suffix = "english" , type = FieldType . Text , analyzer = "english" )
}
)
public String getName () {
return name ;
}
@ Field ( type = FieldType . Text )
public String getInterests () {
return interests ;
}
@ Field ( type = FieldType . Nested )
public List < Person > getPersons () {
return persons ;
}
/**
* 不加Field注解,测试序列化时是否忽略
*/
public String getDescription () {
return description ;
}
}
當更新實體時,文件會被自動索引到ElasticSearch;刪除實體時,文件會自動從ElasticSearch刪除。
啟動搜尋服務jkes-search-service,搜尋服務是一個Spring Boot Application,提供rest搜尋api,預設運行在9000連接埠。
curl -XPOST localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10
integration_test_person_group/person_group/_search?from=0&size=10
{
"query": {
"nested": {
"path": "persons",
"score_mode": "avg",
"query": {
"bool": {
"must": [
{
"range": {
"persons.age": {
"gt": 5
}
}
}
]
}
}
}
}
}
integration_test_person_group/person_group/_search?from=0&size=10
{
"query": {
"match": {
"interests": "Hadoop"
}
}
}
{
"query": {
"bool" : {
"must" : {
"match" : { "interests" : "Hadoop" }
},
"filter": {
"term" : { "name.raw" : "name0" }
},
"should" : [
{ "match" : { "interests" : "Flink" } },
{
"nested" : {
"path" : "persons",
"score_mode" : "avg",
"query" : {
"bool" : {
"must" : [
{ "match" : {"persons.name" : "name40"} },
{ "match" : {"persons.interests" : "interests"} }
],
"must_not" : {
"range" : {
"age" : { "gte" : 50, "lte" : 60 }
}
}
}
}
}
}
],
"minimum_should_match" : 1,
"boost" : 1.0
}
}
}
integration_test_person_group/person_group/_search
{
"_source": false,
"query" : {
"match" : { "name" : "name17" }
}
}
integration_test_person_group/person_group/_search
{
"_source": {
"includes": [ "name", "persons.*" ],
"excludes": [ "date*", "version", "persons.age" ]
},
"query" : {
"match" : { "name" : "name17" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"prefix" : { "name" : "name" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"wildcard" : { "name" : "name*" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"regexp":{
"name": "na.*17"
}
}
}
索引工作原理:
@Document
註解的實體,為它們建立元資料。index
和mapping
Json格式的配置,然後透過ElasticSearch Java Rest Client
將建立/更新index
配置。Kafka ElasticSearch Connector
,用於建立/更新文檔Jkes Deleter Connector
,用於刪除文檔* save(*)
方法傳回的資料包裝為SaveEvent
儲存到EventContainer
;使用(* delete*(..)
方法的參數,產生一個DeleteEvent/DeleteAllEvent
儲存到EventContainer
。JkesKafkaProducer
發送SaveEvent
中的實體到Kafka,Kafka會使用我們提供的JkesJsonSerializer
序列化指定的數據,然後發送到Kafka。SaveEvent
不同, DeleteEvent
會直接被序列化,然後傳送到Kafka,而不是只傳送一份數據SaveEvent
和DeleteEvent
不同, DeleteAllEvent
不會傳送資料到Kafka,而是直接透過ElasticSearch Java Rest Client
刪除對應的index
,然後重建索引,重新啟動Kafka ElasticSearch Connector
查詢工作原理:
json
請求,進行一些預處理後,使用ElasticSearch Java Rest Client
轉送至ElasticSearch,將得到的回應進行解析,進一步處理後回到客戶端。 jkes-core
是整個jkes
的核心部分。主要包括以下功能:
annotation
包提供了jkes的核心註解elasticsearch
套件封裝了elasticsearch
相關的操作,如為所有的文件建立/更新索引,更新mappingkafka
套件提供了Kafka 生產者,Kafka Json Serializer,Kafka Connect Clientmetadata
套件提供了核心的註解元資料的建構與結構化模型event
包提供了事件模型與容器exception
套件提供了常見的Jkes異常http
套件基於Apache Http Client
封裝了常見的http json請求support
包暴露了Jkes核心配置支持util
套件提供了一些工具類,方便開發。如:Asserts, ClassUtils, DocumentUtils, IOUtils, JsonUtils, ReflectionUtils, StringUtils jkes-boot
用於與一些第三方開源框架進行整合。
目前,我們透過jkes-spring-data-jpa
,提供了與spring data jpa
的整合。透過使用Spring的AOP機制,對Repository
方法進行攔截,產生SaveEvent/DeleteEvent/DeleteAllEvent
儲存到EventContainer
。透過使用我們提供的SearchPlatformTransactionManager
,對常用的事務管理器(如JpaTransactionManager
)進行包裝,提供事務攔截功能。
在後續版本,我們會提供與更多框架的整合。
jkes-spring-data-jpa
說明:
ContextSupport
類別用於從bean工廠取得Repository Bean
@EnableJkes
讓客戶端能夠輕鬆開啟Jkes的功能,提供了與Spring一致的配置模型EventSupport
處理事件的細節,在儲存和刪除資料時產生對應事件存放到EventContainer
,在交易提交和回滾時處理對應的事件SearchPlatformTransactionManager
包裝了客戶端的事務管理器,在事務提交和回滾時加入了回调hook
audit
套件提供了一個簡單的AuditedEntity
父類,方便添加審計功能,版本資訊可用於結合ElasticSearch
的版本機制保證不會索引過期文檔數據exception
封裝了常見異常intercept
包提供了AOP切點和切面index
包提供了全量索引
功能。目前,我們提供了基於线程池
的索引機制和基於ForkJoin
的索引機制。在後續版本,我們會重構程式碼,增加基於阻塞队列
的生产者-消费者
模式,提供並發效能 jkes-services
主要用來提供一些服務。 目前, jkes-services
提供了以下服務:
jkes-delete-connector
jkes-delete-connector
是一個Kafka Connector
,用於從kafka叢集取得索引刪除事件( DeleteEvent
),然後使用Jest Client
刪除ElasticSearch中對應的文件。
借助於Kafka Connect的rest admin api,我們輕鬆實現了多租戶平台上的文件刪除功能。只要為每個專案啟動一個jkes-delete-connector
,就可以自動處理該專案的文件刪除工作。避免了每啟動一個新的項目,我們都得手動啟動一個Kafka Consumer來處理該項目的文檔刪除工作。儘管可以透過正規訂閱來減少這樣的工作,但還是非常不靈活
jkes-search-service
jkes-search-service
是一個restful的搜尋服務,提供了多版本的rest query api。查詢服務提供多版本API,用於API演化和相容jkes-search-service
目前支援URI風格的搜尋和JSON請求體風格的搜尋。json
請求,進行一些預處理後,使用ElasticSearch Java Rest Client
轉送至ElasticSearch,將得到的回應進行解析,進一步處理後回到客戶端。後續,我們將會基於zookeeper
索引集群,提供集群索引管理功能
jkes-integration-test
是一個基於Spring Boot整合測試項目,用於進行功能测试
。同時測量一些常見操作的吞吐率
To build a development version you'll need a recent version of Kafka. You can build jkes with Maven using the standard lifecycle phases.
This project is licensed under Apache License 2.0.