Jkes est un framework de recherche basé sur Java, Kafka et ElasticSearch. Jkes fournit un mappage objet/document de style JPA basé sur des annotations, en utilisant l'API REST pour la recherche de documents.
Vous pouvez vous référer au projet jkes-integration-test
pour apprendre rapidement à utiliser le framework jkes. jkes-integration-test
est une application Spring Boot que nous utilisons pour tester l'intégrité fonctionnelle.
jkes-index-connector
et jkes-delete-connector
sur le chemin de classe Kafka Connectsudo bin/elasticsearch-plugin install analysis-smartcn
@ EnableAspectJAutoProxy
@ EnableJkes
@ Configuration
public class JkesConfig {
@ Bean
public PlatformTransactionManager transactionManager ( EntityManagerFactory factory , EventSupport eventSupport ) {
return new SearchPlatformTransactionManager ( new JpaTransactionManager ( factory ), eventSupport );
}
}
@ Component
@ Configuration
public class JkesConf extends DefaultJkesPropertiesImpl {
@ PostConstruct
public void setUp () {
Config . setJkesProperties ( this );
}
@ Override
public String getKafkaBootstrapServers () {
return "k1-test.com:9292,k2-test.com:9292,k3-test.com:9292" ;
}
@ Override
public String getKafkaConnectServers () {
return "http://k1-test.com:8084,http://k2-test.com:8084,http://k3-test.com:8084" ;
}
@ Override
public String getEsBootstrapServers () {
return "http://es1-test.com:9200,http://es2-test.com:9200,http://es3-test.com:9200" ;
}
@ Override
public String getDocumentBasePackage () {
return "com.timeyang.jkes.integration_test.domain" ;
}
@ Override
public String getClientId () {
return "integration_test" ;
}
}
Cela peut être très flexible si vous utilisez Spring Boot, vous pouvez utiliser @ConfigurationProperties
pour fournir la configuration.
Spring MVC
, vous pouvez ajouter le point de terminaison d'index comme suit @ RestController
@ RequestMapping ( "/api/search" )
public class SearchEndpoint {
private Indexer indexer ;
@ Autowired
public SearchEndpoint ( Indexer indexer ) {
this . indexer = indexer ;
}
@ RequestMapping ( value = "/start_all" , method = RequestMethod . POST )
public void startAll () {
indexer . startAll ();
}
@ RequestMapping ( value = "/start/{entityClassName:.+}" , method = RequestMethod . POST )
public void start ( @ PathVariable ( "entityClassName" ) String entityClassName ) {
indexer . start ( entityClassName );
}
@ RequestMapping ( value = "/stop_all" , method = RequestMethod . PUT )
public Map < String , Boolean > stopAll () {
return indexer . stopAll ();
}
@ RequestMapping ( value = "/stop/{entityClassName:.+}" , method = RequestMethod . PUT )
public Boolean stop ( @ PathVariable ( "entityClassName" ) String entityClassName ) {
return indexer . stop ( entityClassName );
}
@ RequestMapping ( value = "/progress" , method = RequestMethod . GET )
public Map < String , IndexProgress > getProgress () {
return indexer . getProgress ();
}
}
Utilisez les annotations pertinentes sous le package com.timeyang.jkes.core.annotation
pour marquer les entités
@ lombok . Data
@ Entity
@ Document
public class Person extends AuditedEntity {
// @Id will be identified automatically
// @Field(type = FieldType.Long)
@ Id
@ GeneratedValue ( strategy = GenerationType . IDENTITY )
private Long id ;
@ MultiFields (
mainField = @ Field ( type = FieldType . Text ),
otherFields = {
@ InnerField ( suffix = "raw" , type = FieldType . Keyword ),
@ InnerField ( suffix = "english" , type = FieldType . Text , analyzer = "english" )
}
)
private String name ;
@ Field ( type = FieldType . Keyword )
private String gender ;
@ Field ( type = FieldType . Integer )
private Integer age ;
// don't add @Field to test whether ignored
// @Field(type = FieldType.Text)
private String description ;
@ Field ( type = FieldType . Object )
@ ManyToOne ( fetch = FetchType . EAGER )
@ JoinColumn ( name = "group_id" )
private PersonGroup personGroup ;
}
@ lombok . Data
@ Entity
@ Document ( type = "person_group" , alias = "person_group_alias" )
public class PersonGroup extends AuditedEntity {
@ Id
@ GeneratedValue ( strategy = GenerationType . IDENTITY )
private Long id ;
private String name ;
private String interests ;
@ OneToMany ( fetch = FetchType . EAGER , cascade = CascadeType . ALL , mappedBy = "personGroup" , orphanRemoval = true )
private List < Person > persons ;
private String description ;
@ DocumentId
@ Field ( type = FieldType . Long )
public Long getId () {
return id ;
}
@ MultiFields (
mainField = @ Field ( type = FieldType . Text ),
otherFields = {
@ InnerField ( suffix = "raw" , type = FieldType . Keyword ),
@ InnerField ( suffix = "english" , type = FieldType . Text , analyzer = "english" )
}
)
public String getName () {
return name ;
}
@ Field ( type = FieldType . Text )
public String getInterests () {
return interests ;
}
@ Field ( type = FieldType . Nested )
public List < Person > getPersons () {
return persons ;
}
/**
* 不加Field注解,测试序列化时是否忽略
*/
public String getDescription () {
return description ;
}
}
Lorsqu'une entité est mise à jour, le document est automatiquement indexé dans ElasticSearch ; lorsqu'une entité est supprimée, le document est automatiquement supprimé d'ElasticSearch.
Démarrez le service de recherche jkes-search-service. Le service de recherche est une application Spring Boot qui fournit une API de recherche de repos et s'exécute par défaut sur le port 9000.
curl -XPOST localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10
integration_test_person_group/person_group/_search?from=0&size=10
{
"query": {
"nested": {
"path": "persons",
"score_mode": "avg",
"query": {
"bool": {
"must": [
{
"range": {
"persons.age": {
"gt": 5
}
}
}
]
}
}
}
}
}
integration_test_person_group/person_group/_search?from=0&size=10
{
"query": {
"match": {
"interests": "Hadoop"
}
}
}
{
"query": {
"bool" : {
"must" : {
"match" : { "interests" : "Hadoop" }
},
"filter": {
"term" : { "name.raw" : "name0" }
},
"should" : [
{ "match" : { "interests" : "Flink" } },
{
"nested" : {
"path" : "persons",
"score_mode" : "avg",
"query" : {
"bool" : {
"must" : [
{ "match" : {"persons.name" : "name40"} },
{ "match" : {"persons.interests" : "interests"} }
],
"must_not" : {
"range" : {
"age" : { "gte" : 50, "lte" : 60 }
}
}
}
}
}
}
],
"minimum_should_match" : 1,
"boost" : 1.0
}
}
}
integration_test_person_group/person_group/_search
{
"_source": false,
"query" : {
"match" : { "name" : "name17" }
}
}
integration_test_person_group/person_group/_search
{
"_source": {
"includes": [ "name", "persons.*" ],
"excludes": [ "date*", "version", "persons.age" ]
},
"query" : {
"match" : { "name" : "name17" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"prefix" : { "name" : "name" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"wildcard" : { "name" : "name*" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"regexp":{
"name": "na.*17"
}
}
}
Comment fonctionne l'indexation :
@Document
et crée des métadonnées pour elles.index
et mapping
au format Json, puis créez/mettez à jour la configuration index
via ElasticSearch Java Rest Client
.Kafka ElasticSearch Connector
pour chaque document, utilisé pour créer/mettre à jour des documentsJkes Deleter Connector
pour l'ensemble du projet, pour supprimer des documents* save(*)
en tant que SaveEvent
et enregistrez-les dans EventContainer
; utilisez les paramètres de la méthode (* delete*(..)
pour générer un DeleteEvent/DeleteAllEvent
et enregistrez-le dans EventContainer
.JkesKafkaProducer
pour envoyer les entités SaveEvent
à Kafka. Kafka utilisera JkesJsonSerializer
que nous avons fourni pour sérialiser les données spécifiées, puis les enverra à Kafka.SaveEvent
, DeleteEvent
sera sérialisé directement puis envoyé à Kafka au lieu de simplement envoyer une copie des données.SaveEvent
et DeleteEvent
, DeleteAllEvent
n'envoie pas de données à Kafka, mais supprime directement l' index
correspondant via ElasticSearch Java Rest Client
, puis reconstruit l'index et redémarre Kafka ElasticSearch Connector
Comment fonctionne la requête :
json
, effectue un prétraitement et la transmet à ElasticSearch à l'aide ElasticSearch Java Rest Client
, analyse la réponse et la renvoie au client après un traitement ultérieur. jkes-core
est la partie centrale de l'ensemble jkes
. Comprend principalement les fonctions suivantes :
annotation
fournit les annotations de base de jkeselasticsearch
encapsule les opérations liées à elasticsearch
, telles que la création/mise à jour d'index pour tous les documents et la mise à jour du mappage.kafka
fournit Kafka Producer, Kafka Json Serializer et Kafka Connect Client.metadata
fournit la construction de métadonnées d'annotation de base et des modèles structurés.event
fournit des modèles d'événements et des conteneursexception
fournit des exceptions Jkes couranteshttp
encapsule les requêtes http json courantes basées sur Apache Http Client
support
expose la prise en charge de la configuration de base de Jkesutil
fournit quelques classes d’outils pour faciliter le développement. Tels que : Asserts, ClassUtils, DocumentUtils, IOUtils, JsonUtils, ReflectionUtils, StringUtils jkes-boot
est utilisé pour s'intégrer à certains frameworks open source tiers.
Actuellement, nous proposons l'intégration avec spring data jpa
via jkes-spring-data-jpa
. En utilisant le mécanisme AOP de Spring, la méthode Repository
est interceptée et SaveEvent/DeleteEvent/DeleteAllEvent
est générée et enregistrée dans EventContainer
. En utilisant SearchPlatformTransactionManager
que nous fournissons, nous encapsulons les gestionnaires de transactions couramment utilisés (tels que JpaTransactionManager
) pour fournir des fonctions d'interception de transactions.
Dans les versions ultérieures, nous proposerons l’intégration avec davantage de frameworks.
Description jkes-spring-data-jpa
:
ContextSupport
est utilisée pour obtenir Repository Bean
de l'usine de bean@EnableJkes
permet aux clients d'activer facilement la fonctionnalité Jkes et fournit un modèle de configuration cohérent avec SpringEventSupport
gère les détails des événements, génère les événements correspondants et les stocke dans EventContainer
lors de l'enregistrement et de la suppression de données, et traite les événements correspondants lorsque les transactions sont validées et annulées.SearchPlatformTransactionManager
enveloppe le gestionnaire de transactions du client et ajoute回调hook
lorsque les transactions sont validées et annulées.audit
fournit une classe parent AuditedEntity
simple pour faciliter l'ajout de fonctions d'audit. Les informations de version peuvent être utilisées conjointement avec le mécanisme de version d' ElasticSearch
pour garantir que les données de documents expirées ne seront pas indexées.exception
encapsule les exceptions courantesintercept
fournit des points de coupe et des aspects AOPindex
fournit une fonctionnalité全量索引
. Actuellement, nous fournissons un mécanisme d'indexation basé sur线程池
et un mécanisme d'indexation basé sur ForkJoin
. Dans les versions ultérieures, nous refactoriserons le code et ajouterons un modèle生产者-消费者
basé sur阻塞队列
pour fournir des performances de concurrence. jkes-services
est principalement utilisé pour fournir certains services. Actuellement, jkes-services
fournit les services suivants :
jkes-delete-connector
jkes-delete-connector
est un Kafka Connector
utilisé pour obtenir des événements de suppression d'index ( DeleteEvent
) du cluster kafka, puis utiliser Jest Client
pour supprimer les documents correspondants dans ElasticSearch.
Avec l'aide de l'API rest admin de Kafka Connect, nous implémentons facilement la fonction de suppression de documents sur une plateforme multi-tenant. Démarrez simplement un jkes-delete-connector
pour chaque projet et la suppression des documents pour ce projet sera automatiquement gérée. Cela évite d'avoir à démarrer manuellement un consommateur Kafka pour gérer la suppression des documents du projet à chaque fois que nous démarrons un nouveau projet. Bien que ce type de travail puisse être réduit grâce à un abonnement régulier, il reste très peu flexible.
jkes-search-service
jkes-search-service
est un service de recherche reposant qui fournit plusieurs versions de l'API de requête repos. Le service de requête fournit une API multiversion pour l'évolution et la compatibilité des APIjkes-search-service
prend actuellement en charge la recherche de style URI et la recherche de style de corps de requête JSON.json
, effectue un prétraitement et la transmet à ElasticSearch à l'aide ElasticSearch Java Rest Client
, analyse la réponse et la renvoie au client après un traitement ultérieur. À l'avenir, nous créerons un cluster d'index basé sur zookeeper
pour fournir des fonctions de gestion d'index de cluster.
jkes-integration-test
est un projet de test d'intégration Spring Boot pour功能测试
. Mesurez également吞吐率
de certaines opérations courantes
Pour créer une version de développement, vous aurez besoin d'une version récente de Kafka. Vous pouvez créer des jkes avec Maven en utilisant les phases de cycle de vie standard.
Ce projet est sous licence Apache License 2.0.