Jkes ist ein Suchframework, das auf Java, Kafka und ElasticSearch basiert. Jkes bietet eine annotationsgesteuerte Objekt-/Dokumentzuordnung im JPA-Stil und verwendet die REST-API für die Dokumentensuche.
Sie können sich auf das Projekt jkes-integration-test
beziehen, um schnell zu lernen, wie Sie das jkes-Framework verwenden. jkes-integration-test
ist eine Spring Boot-Anwendung, die wir zum Testen der Funktionsintegrität verwenden.
jkes-index-connector
und jkes-delete-connector
im Kafka Connect-Klassenpfadsudo bin/elasticsearch-plugin install analysis-smartcn
@ EnableAspectJAutoProxy
@ EnableJkes
@ Configuration
public class JkesConfig {
@ Bean
public PlatformTransactionManager transactionManager ( EntityManagerFactory factory , EventSupport eventSupport ) {
return new SearchPlatformTransactionManager ( new JpaTransactionManager ( factory ), eventSupport );
}
}
@ Component
@ Configuration
public class JkesConf extends DefaultJkesPropertiesImpl {
@ PostConstruct
public void setUp () {
Config . setJkesProperties ( this );
}
@ Override
public String getKafkaBootstrapServers () {
return "k1-test.com:9292,k2-test.com:9292,k3-test.com:9292" ;
}
@ Override
public String getKafkaConnectServers () {
return "http://k1-test.com:8084,http://k2-test.com:8084,http://k3-test.com:8084" ;
}
@ Override
public String getEsBootstrapServers () {
return "http://es1-test.com:9200,http://es2-test.com:9200,http://es3-test.com:9200" ;
}
@ Override
public String getDocumentBasePackage () {
return "com.timeyang.jkes.integration_test.domain" ;
}
@ Override
public String getClientId () {
return "integration_test" ;
}
}
Dies kann sehr flexibel sein, wenn Sie Spring Boot verwenden, können Sie @ConfigurationProperties
zur Konfiguration verwenden
Spring MVC
können Sie beispielsweise den Indexendpunkt wie folgt hinzufügen @ RestController
@ RequestMapping ( "/api/search" )
public class SearchEndpoint {
private Indexer indexer ;
@ Autowired
public SearchEndpoint ( Indexer indexer ) {
this . indexer = indexer ;
}
@ RequestMapping ( value = "/start_all" , method = RequestMethod . POST )
public void startAll () {
indexer . startAll ();
}
@ RequestMapping ( value = "/start/{entityClassName:.+}" , method = RequestMethod . POST )
public void start ( @ PathVariable ( "entityClassName" ) String entityClassName ) {
indexer . start ( entityClassName );
}
@ RequestMapping ( value = "/stop_all" , method = RequestMethod . PUT )
public Map < String , Boolean > stopAll () {
return indexer . stopAll ();
}
@ RequestMapping ( value = "/stop/{entityClassName:.+}" , method = RequestMethod . PUT )
public Boolean stop ( @ PathVariable ( "entityClassName" ) String entityClassName ) {
return indexer . stop ( entityClassName );
}
@ RequestMapping ( value = "/progress" , method = RequestMethod . GET )
public Map < String , IndexProgress > getProgress () {
return indexer . getProgress ();
}
}
Verwenden Sie die relevanten Anmerkungen im Paket com.timeyang.jkes.core.annotation
, um Entitäten zu markieren
@ lombok . Data
@ Entity
@ Document
public class Person extends AuditedEntity {
// @Id will be identified automatically
// @Field(type = FieldType.Long)
@ Id
@ GeneratedValue ( strategy = GenerationType . IDENTITY )
private Long id ;
@ MultiFields (
mainField = @ Field ( type = FieldType . Text ),
otherFields = {
@ InnerField ( suffix = "raw" , type = FieldType . Keyword ),
@ InnerField ( suffix = "english" , type = FieldType . Text , analyzer = "english" )
}
)
private String name ;
@ Field ( type = FieldType . Keyword )
private String gender ;
@ Field ( type = FieldType . Integer )
private Integer age ;
// don't add @Field to test whether ignored
// @Field(type = FieldType.Text)
private String description ;
@ Field ( type = FieldType . Object )
@ ManyToOne ( fetch = FetchType . EAGER )
@ JoinColumn ( name = "group_id" )
private PersonGroup personGroup ;
}
@ lombok . Data
@ Entity
@ Document ( type = "person_group" , alias = "person_group_alias" )
public class PersonGroup extends AuditedEntity {
@ Id
@ GeneratedValue ( strategy = GenerationType . IDENTITY )
private Long id ;
private String name ;
private String interests ;
@ OneToMany ( fetch = FetchType . EAGER , cascade = CascadeType . ALL , mappedBy = "personGroup" , orphanRemoval = true )
private List < Person > persons ;
private String description ;
@ DocumentId
@ Field ( type = FieldType . Long )
public Long getId () {
return id ;
}
@ MultiFields (
mainField = @ Field ( type = FieldType . Text ),
otherFields = {
@ InnerField ( suffix = "raw" , type = FieldType . Keyword ),
@ InnerField ( suffix = "english" , type = FieldType . Text , analyzer = "english" )
}
)
public String getName () {
return name ;
}
@ Field ( type = FieldType . Text )
public String getInterests () {
return interests ;
}
@ Field ( type = FieldType . Nested )
public List < Person > getPersons () {
return persons ;
}
/**
* 不加Field注解,测试序列化时是否忽略
*/
public String getDescription () {
return description ;
}
}
Wenn eine Entität aktualisiert wird, wird das Dokument automatisch in ElasticSearch indiziert. Wenn eine Entität gelöscht wird, wird das Dokument automatisch aus ElasticSearch gelöscht.
Starten Sie den Suchdienst jkes-search-service. Der Suchdienst ist eine Spring Boot-Anwendung, die eine Restsuch-API bereitstellt und standardmäßig auf Port 9000 ausgeführt wird.
curl -XPOST localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10
integration_test_person_group/person_group/_search?from=0&size=10
{
"query": {
"nested": {
"path": "persons",
"score_mode": "avg",
"query": {
"bool": {
"must": [
{
"range": {
"persons.age": {
"gt": 5
}
}
}
]
}
}
}
}
}
integration_test_person_group/person_group/_search?from=0&size=10
{
"query": {
"match": {
"interests": "Hadoop"
}
}
}
{
"query": {
"bool" : {
"must" : {
"match" : { "interests" : "Hadoop" }
},
"filter": {
"term" : { "name.raw" : "name0" }
},
"should" : [
{ "match" : { "interests" : "Flink" } },
{
"nested" : {
"path" : "persons",
"score_mode" : "avg",
"query" : {
"bool" : {
"must" : [
{ "match" : {"persons.name" : "name40"} },
{ "match" : {"persons.interests" : "interests"} }
],
"must_not" : {
"range" : {
"age" : { "gte" : 50, "lte" : 60 }
}
}
}
}
}
}
],
"minimum_should_match" : 1,
"boost" : 1.0
}
}
}
integration_test_person_group/person_group/_search
{
"_source": false,
"query" : {
"match" : { "name" : "name17" }
}
}
integration_test_person_group/person_group/_search
{
"_source": {
"includes": [ "name", "persons.*" ],
"excludes": [ "date*", "version", "persons.age" ]
},
"query" : {
"match" : { "name" : "name17" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"prefix" : { "name" : "name" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"wildcard" : { "name" : "name*" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"regexp":{
"name": "na.*17"
}
}
}
So funktioniert die Indizierung:
@Document
annotierten Entitäten und erstellt Metadaten für sie.index
und mapping
im Json-Format und erstellen/aktualisieren Sie dann die index
über den ElasticSearch Java Rest Client
.Kafka ElasticSearch Connector
für jedes Dokument, der zum Erstellen/Aktualisieren von Dokumenten verwendet wirdJkes Deleter Connector
für das gesamte Projekt, um Dokumente zu löschen* save(*)
zurückgegebenen Daten als SaveEvent
und speichern Sie sie im EventContainer
. Verwenden Sie die Parameter der Methode (* delete*(..)
um ein DeleteEvent/DeleteAllEvent
zu generieren und im EventContainer
zu speichern.JkesKafkaProducer
um die Entitäten im SaveEvent
an Kafka zu senden. Kafka verwendet JkesJsonSerializer
um die angegebenen Daten zu serialisieren und sie dann an Kafka zu senden.SaveEvent
wird DeleteEvent
direkt serialisiert und dann an Kafka gesendet, anstatt nur eine Kopie der Daten zu senden.SaveEvent
und DeleteEvent
sendet DeleteAllEvent
keine Daten an Kafka, sondern löscht den entsprechenden index
direkt über ElasticSearch Java Rest Client
, erstellt dann den Index neu und startet Kafka ElasticSearch Connector
neuSo funktioniert die Abfrage:
json
-Anfrage, führt eine Vorverarbeitung durch und leitet sie mithilfe ElasticSearch Java Rest Client
an ElasticSearch weiter, analysiert die Antwort und gibt sie nach der weiteren Verarbeitung an den Client zurück. jkes-core
ist der Kernbestandteil des gesamten jkes
. Enthält hauptsächlich die folgenden Funktionen:
annotation
stellt die Kernannotationen von jkes bereitelasticsearch
Paket kapselt elasticsearch
bezogene Vorgänge, wie das Erstellen/Aktualisieren von Indizes für alle Dokumente und das Aktualisieren der Zuordnung.kafka
Paket bietet Kafka Producer, Kafka Json Serializer und Kafka Connect Clientmetadata
bietet die Erstellung zentraler Annotationsmetadaten und strukturierte Modelle.event
Paket stellt Event-Modelle und Container bereitexception
stellt allgemeine Jkes-Ausnahmen bereithttp
Paket kapselt gängige http-JSON-Anfragen basierend auf Apache Http Client
support
stellt die Jkes-Kernkonfigurationsunterstützung bereitutil
-Paket stellt einige Toolklassen zur Erleichterung der Entwicklung bereit. Zum Beispiel: Asserts, ClassUtils, DocumentUtils, IOUtils, JsonUtils, ReflectionUtils, StringUtils jkes-boot
wird zur Integration mit einigen Open-Source-Frameworks von Drittanbietern verwendet.
Derzeit bieten wir die Integration mit spring data jpa
über jkes-spring-data-jpa
an. Mithilfe des AOP-Mechanismus von Spring wird die Repository
-Methode abgefangen und SaveEvent/DeleteEvent/DeleteAllEvent
generiert und im EventContainer
gespeichert. Durch die Verwendung SearchPlatformTransactionManager
verpacken wir häufig verwendete Transaktionsmanager (z. B. JpaTransactionManager
), um Funktionen zum Abfangen von Transaktionen bereitzustellen.
In nachfolgenden Versionen werden wir die Integration mit weiteren Frameworks bereitstellen.
jkes-spring-data-jpa
Beschreibung:
ContextSupport
Klasse wird verwendet, um Repository Bean
von der Bean-Factory abzurufen@EnableJkes
ermöglicht Clients die einfache Aktivierung der Jkes-Funktionalität und bietet ein mit Spring konsistentes KonfigurationsmodellEventSupport
verarbeitet die Details von Ereignissen, generiert entsprechende Ereignisse und speichert sie beim Speichern und Löschen von Daten im EventContainer
und verarbeitet entsprechende Ereignisse, wenn Transaktionen festgeschrieben und zurückgesetzt werden.SearchPlatformTransactionManager
umschließt den Transaktionsmanager des Clients und fügt回调hook
hinzu, wenn Transaktionen festgeschrieben und zurückgesetzt werden.audit
Prüfpaket stellt eine einfache übergeordnete Klasse „ AuditedEntity
bereit, um das Hinzufügen von Prüffunktionen zu erleichtern. Die Versionsinformationen können in Verbindung mit dem Versionsmechanismus von ElasticSearch
verwendet werden, um sicherzustellen, dass abgelaufene Dokumentdaten nicht indiziert werden.exception
kapselt häufige Ausnahmenintercept
Paket stellt AOP-Pointcuts und -Aspekte bereitindex
bietet全量索引
. Derzeit bieten wir einen线程池
-basierten Indizierungsmechanismus und einen ForkJoin
-basierten Indizierungsmechanismus an. In nachfolgenden Versionen werden wir den Code umgestalten und ein生产者-消费者
-Modell hinzufügen, das auf阻塞队列
basiert, um Parallelitätsleistung bereitzustellen. jkes-services
wird hauptsächlich zur Bereitstellung einiger Dienste verwendet. Derzeit bietet jkes-services
die folgenden Dienste an:
jkes-delete-connector
jkes-delete-connector
ist ein Kafka Connector
der verwendet wird, um Indexlöschereignisse ( DeleteEvent
) aus dem Kafka-Cluster abzurufen und dann Jest Client
zum Löschen der entsprechenden Dokumente in ElasticSearch zu verwenden.
Mit Hilfe der Rest-Administrator-API von Kafka Connect implementieren wir die Funktion zum Löschen von Dokumenten problemlos auf einer mandantenfähigen Plattform. Starten Sie einfach einen jkes-delete-connector
für jedes Projekt und das Löschen von Dokumenten für dieses Projekt wird automatisch durchgeführt. Dadurch wird vermieden, dass jedes Mal, wenn wir ein neues Projekt starten, manuell ein Kafka-Consumer gestartet werden muss, um das Löschen von Dokumenten für das Projekt durchzuführen. Obwohl sich dieser Arbeitsaufwand durch ein reguläres Abonnement reduzieren lässt, ist er dennoch sehr unflexibel.
jkes-search-service
jkes-search-service
ist ein Restful-Suchdienst, der mehrere Versionen der Rest-Query-API bereitstellt. Der Abfragedienst bietet eine API mit mehreren Versionen für API-Entwicklung und -Kompatibilitätjkes-search-service
unterstützt derzeit die Suche im URI-Stil und die Suche im JSON-Anforderungskörperstil.json
-Anfrage, führt eine Vorverarbeitung durch und leitet sie mithilfe ElasticSearch Java Rest Client
an ElasticSearch weiter, analysiert die Antwort und gibt sie nach der weiteren Verarbeitung an den Client zurück. In Zukunft werden wir einen Indexcluster basierend auf zookeeper
erstellen, um Cluster-Indexverwaltungsfunktionen bereitzustellen.
jkes-integration-test
ist ein Spring Boot-Integrationstestprojekt für功能测试
. Messen Sie auch吞吐率
einiger gängiger Vorgänge
Um eine Entwicklungsversion zu erstellen, benötigen Sie eine aktuelle Version von Kafka. Sie können Jkes mit Maven unter Verwendung der Standardlebenszyklusphasen erstellen.
Dieses Projekt ist unter der Apache-Lizenz 2.0 lizenziert.