Jkes é uma estrutura de pesquisa baseada em Java, Kafka e ElasticSearch. Jkes fornece mapeamento de objeto/documento no estilo JPA baseado em anotações, usando a API REST para pesquisa de documentos.
Você pode consultar o projeto jkes-integration-test
para aprender rapidamente como usar a estrutura jkes. jkes-integration-test
é um aplicativo Spring Boot que usamos para testar a integridade funcional.
jkes-index-connector
e jkes-delete-connector
no classpath do Kafka Connectsudo bin/elasticsearch-plugin install analysis-smartcn
@ EnableAspectJAutoProxy
@ EnableJkes
@ Configuration
public class JkesConfig {
@ Bean
public PlatformTransactionManager transactionManager ( EntityManagerFactory factory , EventSupport eventSupport ) {
return new SearchPlatformTransactionManager ( new JpaTransactionManager ( factory ), eventSupport );
}
}
@ Component
@ Configuration
public class JkesConf extends DefaultJkesPropertiesImpl {
@ PostConstruct
public void setUp () {
Config . setJkesProperties ( this );
}
@ Override
public String getKafkaBootstrapServers () {
return "k1-test.com:9292,k2-test.com:9292,k3-test.com:9292" ;
}
@ Override
public String getKafkaConnectServers () {
return "http://k1-test.com:8084,http://k2-test.com:8084,http://k3-test.com:8084" ;
}
@ Override
public String getEsBootstrapServers () {
return "http://es1-test.com:9200,http://es2-test.com:9200,http://es3-test.com:9200" ;
}
@ Override
public String getDocumentBasePackage () {
return "com.timeyang.jkes.integration_test.domain" ;
}
@ Override
public String getClientId () {
return "integration_test" ;
}
}
Isso pode ser muito flexível. Se você usar Spring Boot, poderá usar @ConfigurationProperties
para fornecer configuração.
Spring MVC
, você pode adicionar o endpoint do índice da seguinte maneira @ RestController
@ RequestMapping ( "/api/search" )
public class SearchEndpoint {
private Indexer indexer ;
@ Autowired
public SearchEndpoint ( Indexer indexer ) {
this . indexer = indexer ;
}
@ RequestMapping ( value = "/start_all" , method = RequestMethod . POST )
public void startAll () {
indexer . startAll ();
}
@ RequestMapping ( value = "/start/{entityClassName:.+}" , method = RequestMethod . POST )
public void start ( @ PathVariable ( "entityClassName" ) String entityClassName ) {
indexer . start ( entityClassName );
}
@ RequestMapping ( value = "/stop_all" , method = RequestMethod . PUT )
public Map < String , Boolean > stopAll () {
return indexer . stopAll ();
}
@ RequestMapping ( value = "/stop/{entityClassName:.+}" , method = RequestMethod . PUT )
public Boolean stop ( @ PathVariable ( "entityClassName" ) String entityClassName ) {
return indexer . stop ( entityClassName );
}
@ RequestMapping ( value = "/progress" , method = RequestMethod . GET )
public Map < String , IndexProgress > getProgress () {
return indexer . getProgress ();
}
}
Use as anotações relevantes no pacote com.timeyang.jkes.core.annotation
para marcar entidades
@ lombok . Data
@ Entity
@ Document
public class Person extends AuditedEntity {
// @Id will be identified automatically
// @Field(type = FieldType.Long)
@ Id
@ GeneratedValue ( strategy = GenerationType . IDENTITY )
private Long id ;
@ MultiFields (
mainField = @ Field ( type = FieldType . Text ),
otherFields = {
@ InnerField ( suffix = "raw" , type = FieldType . Keyword ),
@ InnerField ( suffix = "english" , type = FieldType . Text , analyzer = "english" )
}
)
private String name ;
@ Field ( type = FieldType . Keyword )
private String gender ;
@ Field ( type = FieldType . Integer )
private Integer age ;
// don't add @Field to test whether ignored
// @Field(type = FieldType.Text)
private String description ;
@ Field ( type = FieldType . Object )
@ ManyToOne ( fetch = FetchType . EAGER )
@ JoinColumn ( name = "group_id" )
private PersonGroup personGroup ;
}
@ lombok . Data
@ Entity
@ Document ( type = "person_group" , alias = "person_group_alias" )
public class PersonGroup extends AuditedEntity {
@ Id
@ GeneratedValue ( strategy = GenerationType . IDENTITY )
private Long id ;
private String name ;
private String interests ;
@ OneToMany ( fetch = FetchType . EAGER , cascade = CascadeType . ALL , mappedBy = "personGroup" , orphanRemoval = true )
private List < Person > persons ;
private String description ;
@ DocumentId
@ Field ( type = FieldType . Long )
public Long getId () {
return id ;
}
@ MultiFields (
mainField = @ Field ( type = FieldType . Text ),
otherFields = {
@ InnerField ( suffix = "raw" , type = FieldType . Keyword ),
@ InnerField ( suffix = "english" , type = FieldType . Text , analyzer = "english" )
}
)
public String getName () {
return name ;
}
@ Field ( type = FieldType . Text )
public String getInterests () {
return interests ;
}
@ Field ( type = FieldType . Nested )
public List < Person > getPersons () {
return persons ;
}
/**
* 不加Field注解,测试序列化时是否忽略
*/
public String getDescription () {
return description ;
}
}
Quando uma entidade é atualizada, o documento é automaticamente indexado no ElasticSearch; quando uma entidade é excluída, o documento é automaticamente excluído do ElasticSearch;
Inicie o serviço de pesquisa jkes-search-service O serviço de pesquisa é um aplicativo Spring Boot que fornece API de pesquisa restante e é executado na porta 9000 por padrão.
curl -XPOST localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10
integration_test_person_group/person_group/_search?from=0&size=10
{
"query": {
"nested": {
"path": "persons",
"score_mode": "avg",
"query": {
"bool": {
"must": [
{
"range": {
"persons.age": {
"gt": 5
}
}
}
]
}
}
}
}
}
integration_test_person_group/person_group/_search?from=0&size=10
{
"query": {
"match": {
"interests": "Hadoop"
}
}
}
{
"query": {
"bool" : {
"must" : {
"match" : { "interests" : "Hadoop" }
},
"filter": {
"term" : { "name.raw" : "name0" }
},
"should" : [
{ "match" : { "interests" : "Flink" } },
{
"nested" : {
"path" : "persons",
"score_mode" : "avg",
"query" : {
"bool" : {
"must" : [
{ "match" : {"persons.name" : "name40"} },
{ "match" : {"persons.interests" : "interests"} }
],
"must_not" : {
"range" : {
"age" : { "gte" : 50, "lte" : 60 }
}
}
}
}
}
}
],
"minimum_should_match" : 1,
"boost" : 1.0
}
}
}
integration_test_person_group/person_group/_search
{
"_source": false,
"query" : {
"match" : { "name" : "name17" }
}
}
integration_test_person_group/person_group/_search
{
"_source": {
"includes": [ "name", "persons.*" ],
"excludes": [ "date*", "version", "persons.age" ]
},
"query" : {
"match" : { "name" : "name17" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"prefix" : { "name" : "name" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"wildcard" : { "name" : "name*" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"regexp":{
"name": "na.*17"
}
}
}
Como funciona a indexação:
@Document
e cria metadados para elas.index
e mapping
no formato Json e, em seguida, crie/atualize a configuração index
por meio do ElasticSearch Java Rest Client
.Kafka ElasticSearch Connector
para cada documento, usado para criar/atualizar documentosJkes Deleter Connector
para todo o projeto, para exclusão de documentos* save(*)
como SaveEvent
e salve-os no EventContainer
; use os parâmetros do método (* delete*(..)
para gerar um DeleteEvent/DeleteAllEvent
e salve-os no EventContainer
.JkesKafkaProducer
para enviar as entidades SaveEvent
para Kafka, usará JkesJsonSerializer
que fornecemos para serializar os dados especificados e, em seguida, enviá-los para Kafka.SaveEvent
, DeleteEvent
será serializado diretamente e enviado ao Kafka em vez de apenas enviar uma cópia dos dados.SaveEvent
e DeleteEvent
, DeleteAllEvent
não envia dados para Kafka, mas exclui diretamente o index
correspondente por meio ElasticSearch Java Rest Client
, depois reconstrói o índice e reinicia Kafka ElasticSearch Connector
Como funciona a consulta:
json
, executa algum pré-processamento e a encaminha para ElasticSearch usando ElasticSearch Java Rest Client
, analisa a resposta e a retorna ao cliente após processamento adicional. jkes-core
é a parte central de todo o jkes
. Inclui principalmente as seguintes funções:
annotation
fornece as anotações principais do jkeselasticsearch
encapsula operações relacionadas elasticsearch
, como criação/atualização de índices para todos os documentos e atualização de mapeamentokafka
fornece Kafka Producer, Kafka Json Serializer, Kafka Connect Clientmetadata
fornece construção de metadados de anotação básica e modelos estruturados.event
fornece modelos de eventos e contêineresexception
fornece exceções Jkes comunshttp
encapsula solicitações http json comuns baseadas no Apache Http Client
support
expõe o suporte à configuração principal do Jkesutil
fornece algumas classes de ferramentas para facilitar o desenvolvimento. Tais como: Afirmações, ClassUtils, DocumentUtils, IOUtils, JsonUtils, ReflectionUtils, StringUtils jkes-boot
é usado para integração com algumas estruturas de código aberto de terceiros.
Atualmente, fornecemos integração com spring data jpa
por meio de jkes-spring-data-jpa
. Usando o mecanismo AOP do Spring, o método Repository
é interceptado e SaveEvent/DeleteEvent/DeleteAllEvent
é gerado e salvo em EventContainer
. Ao usar SearchPlatformTransactionManager
que fornecemos, agrupamos gerenciadores de transações comumente usados (como JpaTransactionManager
) para fornecer funções de interceptação de transações.
Nas versões subsequentes, forneceremos integração com mais frameworks.
Descrição de jkes-spring-data-jpa
:
ContextSupport
é usada para obter Repository Bean
da fábrica de beans@EnableJkes
permite que os clientes habilitem facilmente a funcionalidade Jkes e fornece um modelo de configuração consistente com SpringEventSupport
lida com os detalhes dos eventos, gera eventos correspondentes e os armazena em EventContainer
ao salvar e excluir dados e processa eventos correspondentes quando as transações são confirmadas e revertidas.SearchPlatformTransactionManager
envolve o gerenciador de transações do cliente e adiciona回调hook
quando as transações são confirmadas e revertidas.audit
fornece uma classe pai AuditedEntity
simples para facilitar a adição de funções de auditoria. As informações de versão podem ser usadas em conjunto com o mecanismo de versão do ElasticSearch
para garantir que os dados do documento expirado não serão indexados.exception
encapsula exceções comunsintercept
fornece pontos e aspectos AOPindex
fornece funcionalidade全量索引
. Atualmente, fornecemos um mecanismo de indexação baseado em线程池
e um mecanismo de indexação baseado em ForkJoin
. Nas versões subsequentes, refatoraremos o código e adicionaremos um modelo生产者-消费者
baseado no阻塞队列
para fornecer desempenho de simultaneidade. jkes-services
é usado principalmente para fornecer alguns serviços. Atualmente, jkes-services
fornece os seguintes serviços:
jkes-delete-connector
jkes-delete-connector
é um Kafka Connector
usado para obter eventos de exclusão de índice ( DeleteEvent
) do cluster kafka e, em seguida, usar Jest Client
para excluir os documentos correspondentes no ElasticSearch.
Com a ajuda da API rest admin do Kafka Connect, implementamos facilmente a função de exclusão de documentos em uma plataforma multilocatário. Basta iniciar um jkes-delete-connector
para cada projeto e a exclusão de documentos desse projeto será tratada automaticamente. Isso evita que toda vez que iniciarmos um novo projeto, tenhamos que iniciar manualmente um Kafka Consumer para tratar da exclusão do documento do projeto. Embora este tipo de trabalho possa ser reduzido através de assinaturas regulares, ainda é muito inflexível.
jkes-search-service
jkes-search-service
é um serviço de pesquisa tranquilo que fornece várias versões da API de consulta restante. O serviço de consulta fornece API multiversão para evolução e compatibilidade de APIjkes-search-service
atualmente suporta pesquisa de estilo URI e pesquisa de estilo de corpo de solicitação JSON.json
, executa algum pré-processamento e a encaminha para ElasticSearch usando ElasticSearch Java Rest Client
, analisa a resposta e a retorna ao cliente após processamento adicional. No futuro, construiremos um cluster de índice baseado no zookeeper
para fornecer funções de gerenciamento de índice de cluster.
jkes-integration-test
é um projeto de teste de integração do Spring Boot para功能测试
. Meça também吞吐率
de algumas operações comuns
Para construir uma versão de desenvolvimento, você precisará de uma versão recente do Kafka. Você pode construir jkes com Maven usando as fases padrão do ciclo de vida.
Este projeto está licenciado sob a Licença Apache 2.0.