Jkes is a search framework based on Java, Kafka, and ElasticSearch. It provides annotation-driven, JPA-style object/document mapping and a REST API for document search.
You can refer to the jkes-integration-test project to quickly learn how to use the Jkes framework. jkes-integration-test is the Spring Boot application we use to test functional integrity.
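Add jkes-index-connector and jkes-delete-connector to the Kafka Connect classpath, then install the ElasticSearch Smart Chinese Analysis plugin (analysis-smartcn):

sudo bin/elasticsearch-plugin install analysis-smartcn

Enable Jkes and wrap your transaction manager in SearchPlatformTransactionManager: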
@EnableAspectJAutoProxy
@EnableJkes
@Configuration
public class JkesConfig {

    // Wrap the JPA transaction manager so Jkes can hook into commit and rollback
    @Bean
    public PlatformTransactionManager transactionManager(EntityManagerFactory factory, EventSupport eventSupport) {
        return new SearchPlatformTransactionManager(new JpaTransactionManager(factory), eventSupport);
    }
}
Then provide the Jkes properties by extending DefaultJkesPropertiesImpl and registering the instance with Config.setJkesProperties:

@Component
@Configuration
public class JkesConf extends DefaultJkesPropertiesImpl {

    @PostConstruct
    public void setUp() {
        Config.setJkesProperties(this);
    }

    @Override
    public String getKafkaBootstrapServers() {
        return "k1-test.com:9292,k2-test.com:9292,k3-test.com:9292";
    }

    @Override
    public String getKafkaConnectServers() {
        return "http://k1-test.com:8084,http://k2-test.com:8084,http://k3-test.com:8084";
    }

    @Override
    public String getEsBootstrapServers() {
        return "http://es1-test.com:9200,http://es2-test.com:9200,http://es3-test.com:9200";
    }

    @Override
    public String getDocumentBasePackage() {
        return "com.timeyang.jkes.integration_test.domain";
    }

    @Override
    public String getClientId() {
        return "integration_test";
    }
}
This is flexible: if you use Spring Boot, you can supply these values through @ConfigurationProperties instead of hard-coding them.
In Spring MVC, for example, you can add the index endpoint as follows:

@RestController
@RequestMapping("/api/search")
public class SearchEndpoint {

    private Indexer indexer;

    @Autowired
    public SearchEndpoint(Indexer indexer) {
        this.indexer = indexer;
    }

    @RequestMapping(value = "/start_all", method = RequestMethod.POST)
    public void startAll() {
        indexer.startAll();
    }

    @RequestMapping(value = "/start/{entityClassName:.+}", method = RequestMethod.POST)
    public void start(@PathVariable("entityClassName") String entityClassName) {
        indexer.start(entityClassName);
    }

    @RequestMapping(value = "/stop_all", method = RequestMethod.PUT)
    public Map<String, Boolean> stopAll() {
        return indexer.stopAll();
    }

    @RequestMapping(value = "/stop/{entityClassName:.+}", method = RequestMethod.PUT)
    public Boolean stop(@PathVariable("entityClassName") String entityClassName) {
        return indexer.stop(entityClassName);
    }

    @RequestMapping(value = "/progress", method = RequestMethod.GET)
    public Map<String, IndexProgress> getProgress() {
        return indexer.getProgress();
    }
}
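The endpoints above only delegate to Indexer. According to the jkes-spring-data-jpa notes, full indexing is implemented with a thread pool (or with ForkJoin). The following self-contained sketch shows the thread-pool approach with per-entity progress, roughly what startAll and getProgress expose. The FullIndexer class, its methods, and the paging scheme are hypothetical and are not the Jkes Indexer API.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical sketch of thread-pool-based full indexing; this is not the Jkes Indexer API.
class FullIndexer {

    // Per-entity progress: how many rows have been indexed out of the total.
    static final class Progress {
        final long total;
        final AtomicLong indexed = new AtomicLong();

        Progress(long total) { this.total = total; }

        double percent() { return total == 0 ? 100.0 : 100.0 * indexed.get() / total; }
    }

    private final ExecutorService pool;
    private final Map<String, Progress> progress = new ConcurrentHashMap<>();

    FullIndexer(int threads) {
        this.pool = Executors.newFixedThreadPool(threads);
    }

    // Splits the ids into pages and indexes each page on the pool.
    // indexPage stands in for "load these entities and send them to Kafka".
    // The caller must not modify ids while pages are being processed.
    void start(String entity, List<Long> ids, int pageSize, Consumer<List<Long>> indexPage) {
        Progress p = new Progress(ids.size());
        progress.put(entity, p);
        for (int from = 0; from < ids.size(); from += pageSize) {
            List<Long> page = ids.subList(from, Math.min(from + pageSize, ids.size()));
            pool.submit(() -> {
                indexPage.accept(page);
                p.indexed.addAndGet(page.size());
            });
        }
    }

    Map<String, Progress> getProgress() {
        return progress;
    }

    // Stops accepting work and waits for submitted pages; returns false on timeout or interrupt.
    boolean shutdownAndAwait(long seconds) {
        pool.shutdown();
        try {
            return pool.awaitTermination(seconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Paging by id keeps each task small. A real implementation would also need to handle failed pages and support stopping. In this sketch, an exception thrown by indexPage stays in the discarded Future, so that page is never counted as progress.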
Mark entities with the annotations in the com.timeyang.jkes.core.annotation package:
@lombok.Data
@Entity
@Document
public class Person extends AuditedEntity {

    // @Id will be identified automatically
    // @Field(type = FieldType.Long)
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @MultiFields(
            mainField = @Field(type = FieldType.Text),
            otherFields = {
                    @InnerField(suffix = "raw", type = FieldType.Keyword),
                    @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")
            }
    )
    private String name;

    @Field(type = FieldType.Keyword)
    private String gender;

    @Field(type = FieldType.Integer)
    private Integer age;

    // No @Field annotation: used to test that the field is ignored
    // @Field(type = FieldType.Text)
    private String description;

    @Field(type = FieldType.Object)
    @ManyToOne(fetch = FetchType.EAGER)
    @JoinColumn(name = "group_id")
    private PersonGroup personGroup;
}
@lombok.Data
@Entity
@Document(type = "person_group", alias = "person_group_alias")
public class PersonGroup extends AuditedEntity {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String name;

    private String interests;

    @OneToMany(fetch = FetchType.EAGER, cascade = CascadeType.ALL, mappedBy = "personGroup", orphanRemoval = true)
    private List<Person> persons;

    private String description;

    @DocumentId
    @Field(type = FieldType.Long)
    public Long getId() {
        return id;
    }

    @MultiFields(
            mainField = @Field(type = FieldType.Text),
            otherFields = {
                    @InnerField(suffix = "raw", type = FieldType.Keyword),
                    @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")
            }
    )
    public String getName() {
        return name;
    }

    @Field(type = FieldType.Text)
    public String getInterests() {
        return interests;
    }

    @Field(type = FieldType.Nested)
    public List<Person> getPersons() {
        return persons;
    }

    /**
     * No @Field annotation: used to test that this property is ignored during serialization.
     */
    public String getDescription() {
        return description;
    }
}
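The following reflection-based sketch illustrates annotation-driven mapping, including why description (which has no @Field) is left out. It defines its own hypothetical @SearchField annotation instead of using the Jkes @Field. It only handles flat, annotated fields. @MultiFields, nested and object types, and annotated getters, all of which the entities above use, are out of scope.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a field-mapping annotation (not com.timeyang.jkes.core.annotation.Field).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface SearchField {
    String type();
}

class MappingSketch {

    // Collects annotated fields as name -> ElasticSearch type; fields without @SearchField are skipped.
    static Map<String, String> properties(Class<?> documentClass) {
        Map<String, String> props = new TreeMap<>(); // sorted, so the JSON output is deterministic
        for (Field f : documentClass.getDeclaredFields()) {
            SearchField ann = f.getAnnotation(SearchField.class);
            if (ann != null) {
                props.put(f.getName(), ann.type());
            }
        }
        return props;
    }

    // Renders {"properties":{"field":{"type":"..."}}}, the shape of an ElasticSearch mapping body.
    static String toMappingJson(Map<String, String> props) {
        StringBuilder sb = new StringBuilder("{\"properties\":{");
        String sep = "";
        for (Map.Entry<String, String> e : props.entrySet()) {
            sb.append(sep).append('"').append(e.getKey()).append("\":{\"type\":\"")
              .append(e.getValue()).append("\"}");
            sep = ",";
        }
        return sb.append("}}").toString();
    }
}

// A trimmed-down document mirroring part of Person above.
class PersonDoc {
    @SearchField(type = "keyword") String gender;
    @SearchField(type = "integer") Integer age;
    String description; // no annotation, so it is left out of the mapping
}
```

For PersonDoc, the mapping contains only age and gender.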
When an entity is updated, the document is automatically indexed into ElasticSearch; when an entity is deleted, the document is automatically deleted from ElasticSearch.
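Updates reach ElasticSearch asynchronously through Kafka, so indexing must not let old data overwrite new data. The jkes-spring-data-jpa notes say that the version in AuditedEntity can be combined with ElasticSearch's version mechanism for this purpose. The sketch below models only the rule ElasticSearch applies with external versioning (version_type=external): a write succeeds only if its version is strictly greater than the stored one. It is an in-memory illustration; exactly how Jkes passes the version is an assumption here.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory model of ElasticSearch external versioning: a write is accepted only
// if there is no stored document or the new version is strictly greater.
class VersionedStore {

    private final Map<String, Long> versions = new HashMap<>();
    private final Map<String, String> docs = new HashMap<>();

    // Returns true if the document was written, false if it was stale and rejected.
    synchronized boolean index(String id, long version, String source) {
        Long current = versions.get(id);
        if (current != null && version <= current) {
            return false; // an older (or duplicate) update arrived late: ignore it
        }
        versions.put(id, version);
        docs.put(id, source);
        return true;
    }

    synchronized String get(String id) {
        return docs.get(id);
    }
}
```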
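Start the search service, jkes-search-service. It is a Spring Boot application that provides a REST search API and listens on port 9000 by default.

URI-style search:

curl -XPOST "localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10"

JSON request body search: each example below gives the request path, relative to localhost:9000/api/v1/, followed by the JSON request body.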
Nested query:

integration_test_person_group/person_group/_search?from=0&size=10
{
  "query": {
    "nested": {
      "path": "persons",
      "score_mode": "avg",
      "query": {
        "bool": {
          "must": [
            {
              "range": {
                "persons.age": {
                  "gt": 5
                }
              }
            }
          ]
        }
      }
    }
  }
}
Match query:

integration_test_person_group/person_group/_search?from=0&size=10
{
  "query": {
    "match": {
      "interests": "Hadoop"
    }
  }
}
Bool query:

{
  "query": {
    "bool": {
      "must": {
        "match": { "interests": "Hadoop" }
      },
      "filter": {
        "term": { "name.raw": "name0" }
      },
      "should": [
        { "match": { "interests": "Flink" } },
        {
          "nested": {
            "path": "persons",
            "score_mode": "avg",
            "query": {
              "bool": {
                "must": [
                  { "match": { "persons.name": "name40" } },
                  { "match": { "persons.interests": "interests" } }
                ],
                "must_not": {
                  "range": {
                    "persons.age": { "gte": 50, "lte": 60 }
                  }
                }
              }
            }
          }
        }
      ],
      "minimum_should_match": 1,
      "boost": 1.0
    }
  }
}
Disabling _source retrieval:

integration_test_person_group/person_group/_search
{
  "_source": false,
  "query": {
    "match": { "name": "name17" }
  }
}
Source filtering:

integration_test_person_group/person_group/_search
{
  "_source": {
    "includes": [ "name", "persons.*" ],
    "excludes": [ "date*", "version", "persons.age" ]
  },
  "query": {
    "match": { "name": "name17" }
  }
}
Prefix query:

integration_test_person_group/person_group/_search
{
  "query": {
    "prefix": { "name": "name" }
  }
}
Wildcard query:

integration_test_person_group/person_group/_search
{
  "query": {
    "wildcard": { "name": "name*" }
  }
}
Regexp query:

integration_test_person_group/person_group/_search
{
  "query": {
    "regexp": {
      "name": "na.*17"
    }
  }
}
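The same requests can be sent from Java. This sketch uses the JDK 11+ java.net.http client to build a search request for the path layout shown above: base URL, then index, type, _search, and the from/size parameters. The SearchRequests class, the base URL, and the Content-Type header are assumptions, not part of Jkes.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client helper for jkes-search-service (JDK 11+ java.net.http).
class SearchRequests {

    private final String baseUrl; // e.g. "http://localhost:9000/api/v1" (assumed host)

    SearchRequests(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    // POST {baseUrl}/{index}/{type}/_search?from=..&size=.. with a JSON query body.
    HttpRequest search(String index, String type, int from, int size, String jsonBody) {
        URI uri = URI.create(baseUrl + "/" + index + "/" + type
                + "/_search?from=" + from + "&size=" + size);
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json") // assumed, not documented by the service
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    // Sends the request and returns the raw JSON response body.
    static String send(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

For example, SearchRequests.send(new SearchRequests("http://localhost:9000/api/v1").search("integration_test_person_group", "person_group", 0, 10, body)) returns the raw JSON response as a string.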
How indexing works:

1. Jkes scans the entities annotated with @Document and builds metadata for them.
2. From that metadata, it builds the index and mapping configurations in JSON format, then creates or updates the index configuration through the ElasticSearch Java Rest Client.
3. A Kafka ElasticSearch Connector is created or updated for each document type, used to create/update documents.
4. A Jkes Deleter Connector is started or updated for the entire project, used to delete documents.
5. Data operations are intercepted: the data passed to * save(*) methods is wrapped in a SaveEvent and stored in EventContainer; the parameters of * delete*(..) methods are used to generate a DeleteEvent/DeleteAllEvent, which is also stored in EventContainer.
6. When the transaction commits, JkesKafkaProducer sends the entities in each SaveEvent to Kafka. The producer uses the JkesJsonSerializer we provide to serialize the specified data before sending it.
7. Unlike SaveEvent, DeleteEvent is serialized directly and sent to Kafka as a whole, rather than sending only a copy of the data.
8. Unlike SaveEvent and DeleteEvent, DeleteAllEvent sends nothing to Kafka. Instead, Jkes deletes the corresponding index directly through the ElasticSearch Java Rest Client, rebuilds the index, and restarts the Kafka ElasticSearch Connector.
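The event flow above can be summarized in a small, self-contained model. Data operations only buffer events, and the buffer is dispatched after commit or discarded after rollback. The class names mirror the text, but the code is a simplified illustration, not the Jkes EventContainer or EventSupport. The per-thread buffer in particular is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of the Jkes event flow; illustrative classes, not Jkes's real types.
class EventFlowSketch {

    interface Event {}

    static final class SaveEvent implements Event {
        final Object entity;
        SaveEvent(Object entity) { this.entity = entity; }
    }

    static final class DeleteEvent implements Event {
        final Object id;
        DeleteEvent(Object id) { this.id = id; }
    }

    static final class DeleteAllEvent implements Event {
        final String documentType;
        DeleteAllEvent(String documentType) { this.documentType = documentType; }
    }

    // Stand-in for EventContainer: one buffer per thread, i.e. per running transaction (assumption).
    private static final ThreadLocal<List<Event>> CONTAINER = ThreadLocal.withInitial(ArrayList::new);

    private final Consumer<Object> kafkaSender;    // stands in for JkesKafkaProducer
    private final Consumer<String> indexRebuilder; // delete index, rebuild it, restart the connector

    EventFlowSketch(Consumer<Object> kafkaSender, Consumer<String> indexRebuilder) {
        this.kafkaSender = kafkaSender;
        this.indexRebuilder = indexRebuilder;
    }

    // Interception points: buffer events instead of acting immediately.
    void onSave(Object entity) { CONTAINER.get().add(new SaveEvent(entity)); }
    void onDelete(Object id) { CONTAINER.get().add(new DeleteEvent(id)); }
    void onDeleteAll(String documentType) { CONTAINER.get().add(new DeleteAllEvent(documentType)); }

    // After commit: dispatch each buffered event according to its kind, then clear the buffer.
    void afterCommit() {
        List<Event> events = CONTAINER.get();
        try {
            for (Event e : events) {
                if (e instanceof SaveEvent) {
                    kafkaSender.accept(((SaveEvent) e).entity);              // only the entity data is sent
                } else if (e instanceof DeleteEvent) {
                    kafkaSender.accept(e);                                   // the event itself is sent
                } else if (e instanceof DeleteAllEvent) {
                    indexRebuilder.accept(((DeleteAllEvent) e).documentType); // no Kafka message
                }
            }
        } finally {
            events.clear();
        }
    }

    // After rollback: drop everything, so nothing reaches Kafka or ElasticSearch.
    void afterRollback() { CONTAINER.get().clear(); }
}
```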
How query works:

The search service receives a JSON request, performs some preprocessing, and forwards it to ElasticSearch using the ElasticSearch Java Rest Client. It then parses the response and returns it to the client after further processing.

jkes-core is the core part of Jkes. It mainly provides the following:
- annotation package: the core Jkes annotations.
- elasticsearch package: encapsulates ElasticSearch operations, such as creating/updating indexes for all documents and updating mappings.
- kafka package: Kafka Producer, Kafka JSON Serializer, and Kafka Connect Client.
- metadata package: construction of core annotation metadata, and the structured models.
- event package: event models and containers.
- exception package: common Jkes exceptions.
- http package: common HTTP JSON requests, built on Apache HttpClient.
- support package: exposes Jkes core configuration support.
- util package: utility classes that facilitate development, such as Asserts, ClassUtils, DocumentUtils, IOUtils, JsonUtils, ReflectionUtils, and StringUtils.

jkes-boot integrates Jkes with third-party open-source frameworks.
Currently, we provide integration with Spring Data JPA through jkes-spring-data-jpa. Using Spring's AOP mechanism, it intercepts Repository methods, generates SaveEvent/DeleteEvent/DeleteAllEvent, and stores them in EventContainer. The SearchPlatformTransactionManager we provide wraps commonly used transaction managers (such as JpaTransactionManager) to add transaction interception.
In subsequent versions, we will provide integration with more frameworks.
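Wrapping an existing transaction manager is a plain decorator. The sketch below uses a hypothetical two-method TxManager interface instead of Spring's PlatformTransactionManager (whose commit and rollback take a TransactionStatus), so that it runs without Spring. The hook behavior described in the comments is assumed, not taken from the Jkes source.

```java
// Hypothetical minimal transaction interface, standing in for Spring's PlatformTransactionManager.
interface TxManager {
    void commit();
    void rollback();
}

// Decorator in the spirit of SearchPlatformTransactionManager: delegate first, then run a hook.
class HookingTxManager implements TxManager {

    private final TxManager delegate;
    private final Runnable afterCommit;
    private final Runnable afterRollback;

    HookingTxManager(TxManager delegate, Runnable afterCommit, Runnable afterRollback) {
        this.delegate = delegate;
        this.afterCommit = afterCommit;
        this.afterRollback = afterRollback;
    }

    @Override
    public void commit() {
        delegate.commit();  // if the real commit throws, the hook below is skipped
        afterCommit.run();  // e.g. send buffered SaveEvent/DeleteEvent data to Kafka
    }

    @Override
    public void rollback() {
        try {
            delegate.rollback();
        } finally {
            afterRollback.run(); // e.g. discard buffered events
        }
    }
}
```

Because the commit hook runs only after the delegate commits successfully, events are never published for data that failed to commit. The finally block ensures buffered events are discarded even if the rollback itself throws.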
jkes-spring-data-jpa description:

- ContextSupport: obtains Repository beans from the bean factory.
- @EnableJkes: lets clients easily enable Jkes functionality, with a configuration model consistent with Spring's.
- EventSupport: handles the details of events. When data is saved or deleted, it generates the corresponding events and stores them in EventContainer; when a transaction is committed or rolled back, it processes those events.
- SearchPlatformTransactionManager: wraps the client's transaction manager and adds callback hooks for transaction commit and rollback.
- audit package: provides a simple AuditedEntity parent class that makes it easy to add auditing. Its version information can be used together with ElasticSearch's version mechanism to ensure that expired document data is not indexed.
- exception package: common exceptions.
- intercept package: AOP pointcuts and aspects.
- index package: full indexing. Currently, we provide a thread-pool-based indexing mechanism and a ForkJoin-based indexing mechanism. In subsequent versions, we will refactor the code and add a producer-consumer model based on a blocking queue to improve concurrency performance.

jkes-services provides a set of services. Currently, it includes the following:
jkes-delete-connector
jkes-delete-connector is a Kafka Connector that consumes index deletion events (DeleteEvent) from the Kafka cluster and uses Jest Client to delete the corresponding documents in ElasticSearch.
With the help of Kafka Connect's REST admin API, we can easily implement document deletion on a multi-tenant platform: simply start a jkes-delete-connector for each project, and document deletion for that project is handled automatically. This avoids manually starting a Kafka Consumer to handle document deletion every time we start a new project. Subscribing to topics by regular-expression pattern can reduce this work, but it is still very inflexible.
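Registering a connector per project is a single call to the Kafka Connect REST API: POST /connectors with a name and a config map. The sketch below only builds that request. The connector name, connector class, and topic naming are hypothetical placeholders, because the real jkes-delete-connector settings are not shown here.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Builds a Kafka Connect REST request (POST /connectors) registering one delete connector per project.
// The connector name, connector class and topic naming are hypothetical placeholders.
class DeleteConnectorRequests {

    static String body(String clientId) {
        return "{"
                + "\"name\":\"" + clientId + "_jkes_delete_connector\","
                + "\"config\":{"
                + "\"connector.class\":\"com.example.HypotheticalDeleteConnector\","
                + "\"tasks.max\":\"1\","
                + "\"topics\":\"" + clientId + "_delete\""
                + "}}";
    }

    static HttpRequest create(String connectUrl, String clientId) {
        return HttpRequest.newBuilder(URI.create(connectUrl + "/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body(clientId)))
                .build();
    }
}
```

Kafka Connect also offers PUT /connectors/{name}/config, which creates the connector or updates its configuration. This may be more convenient when re-running setup for an existing project.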
jkes-search-service
jkes-search-service is a RESTful search service that offers multiple versions of its REST query API, to support API evolution and compatibility. It currently supports URI-style search and JSON request body search. It receives a JSON request, performs some preprocessing, and forwards it to ElasticSearch using the ElasticSearch Java Rest Client. It then parses the response and returns it to the client after further processing. In the future, we will build an index cluster based on ZooKeeper to provide cluster index management.
jkes-integration-test is a Spring Boot integration test project for functional testing. It also measures the throughput of some common operations.
To build a development version you'll need a recent version of Kafka. You can build jkes with Maven using the standard lifecycle phases.
This project is licensed under Apache License 2.0.