Jkes es un marco de búsqueda basado en Java, Kafka y ElasticSearch. Jkes proporciona mapeo de objetos/documentos estilo JPA basado en anotaciones, utilizando la API REST para la búsqueda de documentos.
Puede consultar el proyecto jkes-integration-test
para aprender rápidamente cómo utilizar el marco jkes. jkes-integration-test
es una aplicación Spring Boot que utilizamos para probar la integridad funcional.
jkes-index-connector
y jkes-delete-connector
en la ruta de clase de Kafka Connectsudo bin/elasticsearch-plugin install analysis-smartcn
@ EnableAspectJAutoProxy
@ EnableJkes
@ Configuration
public class JkesConfig {
@ Bean
public PlatformTransactionManager transactionManager ( EntityManagerFactory factory , EventSupport eventSupport ) {
return new SearchPlatformTransactionManager ( new JpaTransactionManager ( factory ), eventSupport );
}
}
@ Component
@ Configuration
public class JkesConf extends DefaultJkesPropertiesImpl {
@ PostConstruct
public void setUp () {
Config . setJkesProperties ( this );
}
@ Override
public String getKafkaBootstrapServers () {
return "k1-test.com:9292,k2-test.com:9292,k3-test.com:9292" ;
}
@ Override
public String getKafkaConnectServers () {
return "http://k1-test.com:8084,http://k2-test.com:8084,http://k3-test.com:8084" ;
}
@ Override
public String getEsBootstrapServers () {
return "http://es1-test.com:9200,http://es2-test.com:9200,http://es3-test.com:9200" ;
}
@ Override
public String getDocumentBasePackage () {
return "com.timeyang.jkes.integration_test.domain" ;
}
@ Override
public String getClientId () {
return "integration_test" ;
}
}
Esto puede ser muy flexible. Si usa Spring Boot, puede usar @ConfigurationProperties
para proporcionar configuración.
Spring MVC
, puede agregar el punto final del índice de la siguiente manera @ RestController
@ RequestMapping ( "/api/search" )
public class SearchEndpoint {
private Indexer indexer ;
@ Autowired
public SearchEndpoint ( Indexer indexer ) {
this . indexer = indexer ;
}
@ RequestMapping ( value = "/start_all" , method = RequestMethod . POST )
public void startAll () {
indexer . startAll ();
}
@ RequestMapping ( value = "/start/{entityClassName:.+}" , method = RequestMethod . POST )
public void start ( @ PathVariable ( "entityClassName" ) String entityClassName ) {
indexer . start ( entityClassName );
}
@ RequestMapping ( value = "/stop_all" , method = RequestMethod . PUT )
public Map < String , Boolean > stopAll () {
return indexer . stopAll ();
}
@ RequestMapping ( value = "/stop/{entityClassName:.+}" , method = RequestMethod . PUT )
public Boolean stop ( @ PathVariable ( "entityClassName" ) String entityClassName ) {
return indexer . stop ( entityClassName );
}
@ RequestMapping ( value = "/progress" , method = RequestMethod . GET )
public Map < String , IndexProgress > getProgress () {
return indexer . getProgress ();
}
}
Utilice las anotaciones relevantes en el paquete com.timeyang.jkes.core.annotation
para marcar entidades
@ lombok . Data
@ Entity
@ Document
public class Person extends AuditedEntity {
// @Id will be identified automatically
// @Field(type = FieldType.Long)
@ Id
@ GeneratedValue ( strategy = GenerationType . IDENTITY )
private Long id ;
@ MultiFields (
mainField = @ Field ( type = FieldType . Text ),
otherFields = {
@ InnerField ( suffix = "raw" , type = FieldType . Keyword ),
@ InnerField ( suffix = "english" , type = FieldType . Text , analyzer = "english" )
}
)
private String name ;
@ Field ( type = FieldType . Keyword )
private String gender ;
@ Field ( type = FieldType . Integer )
private Integer age ;
// don't add @Field to test whether ignored
// @Field(type = FieldType.Text)
private String description ;
@ Field ( type = FieldType . Object )
@ ManyToOne ( fetch = FetchType . EAGER )
@ JoinColumn ( name = "group_id" )
private PersonGroup personGroup ;
}
@ lombok . Data
@ Entity
@ Document ( type = "person_group" , alias = "person_group_alias" )
public class PersonGroup extends AuditedEntity {
@ Id
@ GeneratedValue ( strategy = GenerationType . IDENTITY )
private Long id ;
private String name ;
private String interests ;
@ OneToMany ( fetch = FetchType . EAGER , cascade = CascadeType . ALL , mappedBy = "personGroup" , orphanRemoval = true )
private List < Person > persons ;
private String description ;
@ DocumentId
@ Field ( type = FieldType . Long )
public Long getId () {
return id ;
}
@ MultiFields (
mainField = @ Field ( type = FieldType . Text ),
otherFields = {
@ InnerField ( suffix = "raw" , type = FieldType . Keyword ),
@ InnerField ( suffix = "english" , type = FieldType . Text , analyzer = "english" )
}
)
public String getName () {
return name ;
}
@ Field ( type = FieldType . Text )
public String getInterests () {
return interests ;
}
@ Field ( type = FieldType . Nested )
public List < Person > getPersons () {
return persons ;
}
/**
* 不加Field注解,测试序列化时是否忽略
*/
public String getDescription () {
return description ;
}
}
Cuando se actualiza una entidad, el documento se indexa automáticamente en ElasticSearch; cuando se elimina una entidad, el documento se elimina automáticamente de ElasticSearch.
Inicie el servicio de búsqueda jkes-search-service. El servicio de búsqueda es una aplicación Spring Boot que proporciona API de búsqueda en reposo y se ejecuta en el puerto 9000 de forma predeterminada.
curl -XPOST localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10
integration_test_person_group/person_group/_search?from=0&size=10
{
"query": {
"nested": {
"path": "persons",
"score_mode": "avg",
"query": {
"bool": {
"must": [
{
"range": {
"persons.age": {
"gt": 5
}
}
}
]
}
}
}
}
}
integration_test_person_group/person_group/_search?from=0&size=10
{
"query": {
"match": {
"interests": "Hadoop"
}
}
}
{
"query": {
"bool" : {
"must" : {
"match" : { "interests" : "Hadoop" }
},
"filter": {
"term" : { "name.raw" : "name0" }
},
"should" : [
{ "match" : { "interests" : "Flink" } },
{
"nested" : {
"path" : "persons",
"score_mode" : "avg",
"query" : {
"bool" : {
"must" : [
{ "match" : {"persons.name" : "name40"} },
{ "match" : {"persons.interests" : "interests"} }
],
"must_not" : {
"range" : {
"age" : { "gte" : 50, "lte" : 60 }
}
}
}
}
}
}
],
"minimum_should_match" : 1,
"boost" : 1.0
}
}
}
integration_test_person_group/person_group/_search
{
"_source": false,
"query" : {
"match" : { "name" : "name17" }
}
}
integration_test_person_group/person_group/_search
{
"_source": {
"includes": [ "name", "persons.*" ],
"excludes": [ "date*", "version", "persons.age" ]
},
"query" : {
"match" : { "name" : "name17" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"prefix" : { "name" : "name" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"wildcard" : { "name" : "name*" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"regexp":{
"name": "na.*17"
}
}
}
Cómo funciona la indexación:
@Document
y crea metadatos para ellas.index
y mapping
en formato Json y luego cree/actualice la configuración index
a través de ElasticSearch Java Rest Client
.Kafka ElasticSearch Connector
para cada documento, utilizado para crear/actualizar documentosJkes Deleter Connector
para todo el proyecto, para eliminar documentos* save(*)
como SaveEvent
y guárdelos en EventContainer
; use los parámetros del método (* delete*(..)
para generar un DeleteEvent/DeleteAllEvent
y guárdelo en EventContainer
.JkesKafkaProducer
para enviar las entidades en SaveEvent
a Kafka. Kafka usará JkesJsonSerializer
que proporcionamos para serializar los datos especificados y luego enviarlos a Kafka.SaveEvent
, DeleteEvent
se serializará directamente y luego se enviará a Kafka en lugar de simplemente enviar una copia de los datos.SaveEvent
y DeleteEvent
, DeleteAllEvent
no envía datos a Kafka, sino que elimina directamente el index
correspondiente a través ElasticSearch Java Rest Client
, luego reconstruye el índice y reinicia Kafka ElasticSearch Connector
Cómo funciona la consulta:
json
, realiza algún preprocesamiento y la reenvía a ElasticSearch utilizando ElasticSearch Java Rest Client
, analiza la respuesta y la devuelve al cliente después de un procesamiento adicional. jkes-core
es la parte central de todo jkes
. Incluye principalmente las siguientes funciones:
annotation
proporciona las anotaciones principales de jkes.elasticsearch
encapsula operaciones relacionadas con elasticsearch
, como crear/actualizar índices para todos los documentos y actualizar mapeos.kafka
proporciona Kafka Producer, Kafka Json Serializer y Kafka Connect Client.metadata
proporciona construcción de metadatos de anotaciones centrales y modelos estructurados.event
proporciona modelos y contenedores de eventos.exception
proporciona excepciones Jkes comunes.http
encapsula solicitudes http json comunes basadas en Apache Http Client
support
expone el soporte de configuración principal de Jkes.util
proporciona algunas clases de herramientas para facilitar el desarrollo. Tales como: Asserts, ClassUtils, DocumentUtils, IOUtils, JsonUtils, ReflectionUtils, StringUtils jkes-boot
se utiliza para integrarse con algunos marcos de código abierto de terceros.
Actualmente, proporcionamos integración con spring data jpa
a través de jkes-spring-data-jpa
. Al utilizar el mecanismo AOP de Spring, el método Repository
se intercepta y SaveEvent/DeleteEvent/DeleteAllEvent
se genera y guarda en EventContainer
. Al utilizar SearchPlatformTransactionManager
que proporcionamos, envolvemos administradores de transacciones de uso común (como JpaTransactionManager
) para proporcionar funciones de interceptación de transacciones.
En versiones posteriores, proporcionaremos integración con más marcos.
Descripción de jkes-spring-data-jpa
:
ContextSupport
se utiliza para obtener Repository Bean
de la fábrica de beans.@EnableJkes
permite a los clientes habilitar fácilmente la funcionalidad Jkes y proporciona un modelo de configuración consistente con SpringEventSupport
maneja los detalles de los eventos, genera los eventos correspondientes y los almacena en EventContainer
al guardar y eliminar datos, y procesa los eventos correspondientes cuando las transacciones se confirman y revierten.SearchPlatformTransactionManager
envuelve el administrador de transacciones del cliente y agrega回调hook
cuando las transacciones se confirman y revierten.audit
proporciona una clase principal AuditedEntity
simple para facilitar la adición de funciones de auditoría. La información de la versión se puede utilizar junto con el mecanismo de versión de ElasticSearch
para garantizar que los datos de los documentos caducados no se indexen.exception
encapsula excepciones comunes.intercept
proporciona puntos de corte y aspectos de AOP.index
proporciona una funcionalidad全量索引
. Actualmente, proporcionamos un mecanismo de indexación basado en线程池
y un mecanismo de indexación basado en ForkJoin
. En versiones posteriores, refactorizaremos el código y agregaremos un modelo生产者-消费者
basado en阻塞队列
para proporcionar rendimiento de concurrencia. jkes-services
se utiliza principalmente para proporcionar algunos servicios. Actualmente, jkes-services
proporciona los siguientes servicios:
jkes-delete-connector
jkes-delete-connector
es un Kafka Connector
que se utiliza para obtener eventos de eliminación de índice ( DeleteEvent
) del clúster Kafka y luego usar Jest Client
para eliminar los documentos correspondientes en ElasticSearch.
Con la ayuda de la API de administración de descanso de Kafka Connect, implementamos fácilmente la función de eliminación de documentos en una plataforma multiinquilino. Simplemente inicie un jkes-delete-connector
para cada proyecto y la eliminación de documentos para ese proyecto se manejará automáticamente. Esto evita tener que iniciar manualmente un consumidor de Kafka para manejar la eliminación de documentos del proyecto cada vez que iniciamos un nuevo proyecto. Aunque este tipo de trabajo se puede reducir mediante una suscripción regular, sigue siendo muy inflexible.
jkes-search-service
jkes-search-service
es un servicio de búsqueda relajante que proporciona múltiples versiones de la API de consultas de descanso. El servicio de consulta proporciona API de múltiples versiones para la evolución y compatibilidad de la APIjkes-search-service
actualmente admite la búsqueda de estilo URI y la búsqueda de estilo de cuerpo de solicitud JSON.json
, realiza algún preprocesamiento y la reenvía a ElasticSearch utilizando ElasticSearch Java Rest Client
, analiza la respuesta y la devuelve al cliente después de un procesamiento adicional. En el futuro, crearemos un grupo de índices basado en zookeeper
para proporcionar funciones de gestión de índices de grupos.
jkes-integration-test
es un proyecto de prueba de integración Spring Boot para功能测试
. Mida también吞吐率
de algunas operaciones comunes.
Para crear una versión de desarrollo, necesitará una versión reciente de Kafka. Puede crear jkes con Maven utilizando las fases del ciclo de vida estándar.
Este proyecto tiene la licencia Apache 2.0.