Jkes — это платформа поиска, основанная на Java, Kafka и ElasticSearch. Jkes обеспечивает сопоставление объектов и документов в стиле JPA на основе аннотаций, используя REST API для поиска документов.
Вы можете обратиться к проекту jkes-integration-test
чтобы быстро научиться использовать среду jkes. jkes-integration-test
— это приложение Spring Boot, которое мы используем для проверки функциональной целостности.
jkes-index-connector
и jkes-delete-connector
в путь к классам Kafka Connect.sudo bin/elasticsearch-plugin install analysis-smartcn
@ EnableAspectJAutoProxy
@ EnableJkes
@ Configuration
public class JkesConfig {
@ Bean
public PlatformTransactionManager transactionManager ( EntityManagerFactory factory , EventSupport eventSupport ) {
return new SearchPlatformTransactionManager ( new JpaTransactionManager ( factory ), eventSupport );
}
}
@ Component
@ Configuration
public class JkesConf extends DefaultJkesPropertiesImpl {
@ PostConstruct
public void setUp () {
Config . setJkesProperties ( this );
}
@ Override
public String getKafkaBootstrapServers () {
return "k1-test.com:9292,k2-test.com:9292,k3-test.com:9292" ;
}
@ Override
public String getKafkaConnectServers () {
return "http://k1-test.com:8084,http://k2-test.com:8084,http://k3-test.com:8084" ;
}
@ Override
public String getEsBootstrapServers () {
return "http://es1-test.com:9200,http://es2-test.com:9200,http://es3-test.com:9200" ;
}
@ Override
public String getDocumentBasePackage () {
return "com.timeyang.jkes.integration_test.domain" ;
}
@ Override
public String getClientId () {
return "integration_test" ;
}
}
Это может быть очень гибко. Если вы используете Spring Boot, вы можете использовать @ConfigurationProperties
для предоставления конфигурации.
Spring MVC
вы можете добавить конечную точку индекса следующим образом: @ RestController
@ RequestMapping ( "/api/search" )
public class SearchEndpoint {
private Indexer indexer ;
@ Autowired
public SearchEndpoint ( Indexer indexer ) {
this . indexer = indexer ;
}
@ RequestMapping ( value = "/start_all" , method = RequestMethod . POST )
public void startAll () {
indexer . startAll ();
}
@ RequestMapping ( value = "/start/{entityClassName:.+}" , method = RequestMethod . POST )
public void start ( @ PathVariable ( "entityClassName" ) String entityClassName ) {
indexer . start ( entityClassName );
}
@ RequestMapping ( value = "/stop_all" , method = RequestMethod . PUT )
public Map < String , Boolean > stopAll () {
return indexer . stopAll ();
}
@ RequestMapping ( value = "/stop/{entityClassName:.+}" , method = RequestMethod . PUT )
public Boolean stop ( @ PathVariable ( "entityClassName" ) String entityClassName ) {
return indexer . stop ( entityClassName );
}
@ RequestMapping ( value = "/progress" , method = RequestMethod . GET )
public Map < String , IndexProgress > getProgress () {
return indexer . getProgress ();
}
}
Используйте соответствующие аннотации в пакете com.timeyang.jkes.core.annotation
, чтобы отмечать объекты.
@ lombok . Data
@ Entity
@ Document
public class Person extends AuditedEntity {
// @Id will be identified automatically
// @Field(type = FieldType.Long)
@ Id
@ GeneratedValue ( strategy = GenerationType . IDENTITY )
private Long id ;
@ MultiFields (
mainField = @ Field ( type = FieldType . Text ),
otherFields = {
@ InnerField ( suffix = "raw" , type = FieldType . Keyword ),
@ InnerField ( suffix = "english" , type = FieldType . Text , analyzer = "english" )
}
)
private String name ;
@ Field ( type = FieldType . Keyword )
private String gender ;
@ Field ( type = FieldType . Integer )
private Integer age ;
// don't add @Field to test whether ignored
// @Field(type = FieldType.Text)
private String description ;
@ Field ( type = FieldType . Object )
@ ManyToOne ( fetch = FetchType . EAGER )
@ JoinColumn ( name = "group_id" )
private PersonGroup personGroup ;
}
@ lombok . Data
@ Entity
@ Document ( type = "person_group" , alias = "person_group_alias" )
public class PersonGroup extends AuditedEntity {
@ Id
@ GeneratedValue ( strategy = GenerationType . IDENTITY )
private Long id ;
private String name ;
private String interests ;
@ OneToMany ( fetch = FetchType . EAGER , cascade = CascadeType . ALL , mappedBy = "personGroup" , orphanRemoval = true )
private List < Person > persons ;
private String description ;
@ DocumentId
@ Field ( type = FieldType . Long )
public Long getId () {
return id ;
}
@ MultiFields (
mainField = @ Field ( type = FieldType . Text ),
otherFields = {
@ InnerField ( suffix = "raw" , type = FieldType . Keyword ),
@ InnerField ( suffix = "english" , type = FieldType . Text , analyzer = "english" )
}
)
public String getName () {
return name ;
}
@ Field ( type = FieldType . Text )
public String getInterests () {
return interests ;
}
@ Field ( type = FieldType . Nested )
public List < Person > getPersons () {
return persons ;
}
/**
* 不加Field注解,测试序列化时是否忽略
*/
public String getDescription () {
return description ;
}
}
Когда сущность обновляется, документ автоматически индексируется в ElasticSearch. При удалении сущности документ автоматически удаляется из ElasticSearch.
Запустите службу поиска jkes-search-service. Служба поиска представляет собой загрузочное приложение Spring, которое предоставляет API поиска для отдыха и по умолчанию работает на порту 9000.
curl -XPOST localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10
integration_test_person_group/person_group/_search?from=0&size=10
{
"query": {
"nested": {
"path": "persons",
"score_mode": "avg",
"query": {
"bool": {
"must": [
{
"range": {
"persons.age": {
"gt": 5
}
}
}
]
}
}
}
}
}
integration_test_person_group/person_group/_search?from=0&size=10
{
"query": {
"match": {
"interests": "Hadoop"
}
}
}
{
"query": {
"bool" : {
"must" : {
"match" : { "interests" : "Hadoop" }
},
"filter": {
"term" : { "name.raw" : "name0" }
},
"should" : [
{ "match" : { "interests" : "Flink" } },
{
"nested" : {
"path" : "persons",
"score_mode" : "avg",
"query" : {
"bool" : {
"must" : [
{ "match" : {"persons.name" : "name40"} },
{ "match" : {"persons.interests" : "interests"} }
],
"must_not" : {
"range" : {
"age" : { "gte" : 50, "lte" : 60 }
}
}
}
}
}
}
],
"minimum_should_match" : 1,
"boost" : 1.0
}
}
}
integration_test_person_group/person_group/_search
{
"_source": false,
"query" : {
"match" : { "name" : "name17" }
}
}
integration_test_person_group/person_group/_search
{
"_source": {
"includes": [ "name", "persons.*" ],
"excludes": [ "date*", "version", "persons.age" ]
},
"query" : {
"match" : { "name" : "name17" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"prefix" : { "name" : "name" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"wildcard" : { "name" : "name*" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"regexp":{
"name": "na.*17"
}
}
}
Как работает индексация:
@Document
, и создает для них метаданные.index
и mapping
в формате Json, а затем создайте/обновите конфигурацию index
с помощью ElasticSearch Java Rest Client
.Kafka ElasticSearch Connector
для каждого документа, используемый для создания/обновления документов.Jkes Deleter Connector
для всего проекта для удаления документов.* save(*)
как SaveEvent
, и сохраните их в EventContainer
; используйте параметры метода (* delete*(..)
для создания DeleteEvent/DeleteAllEvent
и сохраните их в EventContainer
.JkesKafkaProducer
для отправки объектов в SaveEvent
в Kafka. Kafka будет использовать предоставленный нами JkesJsonSerializer
для сериализации указанных данных, а затем отправит их в Kafka.SaveEvent
, DeleteEvent
будет сериализован напрямую, а затем отправлен в Kafka, а не просто отправит копию данных.SaveEvent
и DeleteEvent
, DeleteAllEvent
не отправляет данные в Kafka, а напрямую удаляет соответствующий index
через ElasticSearch Java Rest Client
, затем перестраивает индекс и перезапускает Kafka ElasticSearch Connector
Как работает запрос:
json
, выполняет некоторую предварительную обработку и пересылает его в ElasticSearch с помощью ElasticSearch Java Rest Client
, анализирует ответ и возвращает его клиенту после дальнейшей обработки. jkes-core
— это основная часть всего jkes
. В основном включает в себя следующие функции:
annotation
предоставляет основные аннотации jkes.elasticsearch
инкапсулирует операции, связанные с elasticsearch
, такие как создание/обновление индексов для всех документов и обновление сопоставления.kafka
включает Kafka Producer, сериализатор Kafka Json, клиент Kafka Connect.metadata
обеспечивает построение основных метаданных аннотаций и структурированные модели.event
предоставляет модели событий и контейнеры.exception
предоставляет распространенные исключения Jkes.http
инкапсулирует распространенные запросы http json на основе Apache Http Client
support
предоставляет поддержку базовой конфигурации Jkes.util
предоставляет несколько классов инструментов для облегчения разработки. Например: Asserts, ClassUtils, DocumentUtils, IOUtils, JsonUtils, ReflectionUtils, StringUtils. jkes-boot
используется для интеграции с некоторыми сторонними платформами с открытым исходным кодом.
В настоящее время мы обеспечиваем интеграцию с spring data jpa
через jkes-spring-data-jpa
. Используя механизм AOP Spring, метод Repository
перехватывается и генерируется SaveEvent/DeleteEvent/DeleteAllEvent
и сохраняется в EventContainer
. Используя предоставляемый нами SearchPlatformTransactionManager
, мы оборачиваем часто используемые менеджеры транзакций (такие как JpaTransactionManager
) для обеспечения функций перехвата транзакций.
В последующих версиях мы обеспечим интеграцию с большим количеством фреймворков.
Описание jkes-spring-data-jpa
:
ContextSupport
используется для получения Repository Bean
из фабрики компонентов.@EnableJkes
позволяет клиентам легко включать функциональность Jkes и предоставляет модель конфигурации, соответствующую Spring.EventSupport
обрабатывает детали событий, генерирует соответствующие события и сохраняет их в EventContainer
при сохранении и удалении данных, а также обрабатывает соответствующие события при фиксации и откате транзакций.SearchPlatformTransactionManager
оборачивает диспетчер транзакций клиента и добавляет回调hook
при фиксации и откате транзакций.audit
предоставляет простой родительский класс AuditedEntity
для облегчения добавления функций аудита. Информацию о версии можно использовать вместе с механизмом версий ElasticSearch
чтобы гарантировать, что данные документа с истекшим сроком действия не будут индексироваться.exception
инкапсулирует распространенные исключения.intercept
предоставляет точки и аспекты AOP.index
обеспечивает全量索引
. В настоящее время мы предоставляем механизм индексации на основе线程池
и механизм индексации на основе ForkJoin
. В последующих версиях мы проведем рефакторинг кода и добавим модель生产者-消费者
основанную на阻塞队列
для обеспечения производительности параллелизма. jkes-services
в основном используется для предоставления некоторых услуг. На данный момент jkes-services
предоставляет следующие услуги:
jkes-delete-connector
jkes-delete-connector
— это Kafka Connector
используемый для получения событий удаления индекса ( DeleteEvent
) из кластера Kafka, а затем использования Jest Client
для удаления соответствующих документов в ElasticSearch.
С помощью API-интерфейса администратора Kafka Connect мы легко реализуем функцию удаления документов на мультитенантной платформе. Просто запустите jkes-delete-connector
для каждого проекта, и удаление документов для этого проекта будет выполнено автоматически. Это позволяет избежать необходимости вручную запускать Kafka Consumer каждый раз, когда мы начинаем новый проект, для обработки удаления документа проекта. Хотя такого рода работу можно сократить за счет обычной подписки, она все равно очень негибкая.
jkes-search-service
jkes-search-service
— это служба поиска restful, предоставляющая несколько версий API запросов rest. Служба запросов предоставляет многоверсионный API для развития и совместимости API.jkes-search-service
в настоящее время поддерживает поиск по стилю URI и поиск по стилю тела запроса JSON.json
, выполняет некоторую предварительную обработку и пересылает его в ElasticSearch с помощью ElasticSearch Java Rest Client
, анализирует ответ и возвращает его клиенту после дальнейшей обработки. В будущем мы создадим кластер индексов на базе zookeeper
чтобы обеспечить функции управления индексами кластера.
jkes-integration-test
— это проект интеграционного тестирования Spring Boot для功能测试
. Также измерьте吞吐率
некоторых распространенных операций.
Для создания версии для разработки вам понадобится последняя версия Kafka. Вы можете создавать jke с помощью Maven, используя стандартные этапы жизненного цикла.
Этот проект распространяется по лицензии Apache License 2.0.