Jkes は、Java、Kafka、および ElasticSearch に基づく検索フレームワークです。 Jkes は、ドキュメント検索に REST API を使用して、アノテーション駆動型の JPA スタイルのオブジェクト/ドキュメント マッピングを提供します。
jkes-integration-test
プロジェクトを参照すると、jkes フレームワークの使用方法をすぐに学ぶことができます。 jkes-integration-test
機能の整合性をテストするために使用する Spring Boot アプリケーションです。
jkes-index-connector
とjkes-delete-connector
を Kafka Connect クラスパスにインストールしますsudo bin/elasticsearch-plugin install analysis-smartcn
@ EnableAspectJAutoProxy
@ EnableJkes
@ Configuration
public class JkesConfig {
@ Bean
public PlatformTransactionManager transactionManager ( EntityManagerFactory factory , EventSupport eventSupport ) {
return new SearchPlatformTransactionManager ( new JpaTransactionManager ( factory ), eventSupport );
}
}
@ Component
@ Configuration
public class JkesConf extends DefaultJkesPropertiesImpl {
@ PostConstruct
public void setUp () {
Config . setJkesProperties ( this );
}
@ Override
public String getKafkaBootstrapServers () {
return "k1-test.com:9292,k2-test.com:9292,k3-test.com:9292" ;
}
@ Override
public String getKafkaConnectServers () {
return "http://k1-test.com:8084,http://k2-test.com:8084,http://k3-test.com:8084" ;
}
@ Override
public String getEsBootstrapServers () {
return "http://es1-test.com:9200,http://es2-test.com:9200,http://es3-test.com:9200" ;
}
@ Override
public String getDocumentBasePackage () {
return "com.timeyang.jkes.integration_test.domain" ;
}
@ Override
public String getClientId () {
return "integration_test" ;
}
}
Spring Boot を使用する場合は、 @ConfigurationProperties
使用して構成を提供できます。
Spring MVC
では、次のようにインデックス エンドポイントを追加できます。 @ RestController
@ RequestMapping ( "/api/search" )
public class SearchEndpoint {
private Indexer indexer ;
@ Autowired
public SearchEndpoint ( Indexer indexer ) {
this . indexer = indexer ;
}
@ RequestMapping ( value = "/start_all" , method = RequestMethod . POST )
public void startAll () {
indexer . startAll ();
}
@ RequestMapping ( value = "/start/{entityClassName:.+}" , method = RequestMethod . POST )
public void start ( @ PathVariable ( "entityClassName" ) String entityClassName ) {
indexer . start ( entityClassName );
}
@ RequestMapping ( value = "/stop_all" , method = RequestMethod . PUT )
public Map < String , Boolean > stopAll () {
return indexer . stopAll ();
}
@ RequestMapping ( value = "/stop/{entityClassName:.+}" , method = RequestMethod . PUT )
public Boolean stop ( @ PathVariable ( "entityClassName" ) String entityClassName ) {
return indexer . stop ( entityClassName );
}
@ RequestMapping ( value = "/progress" , method = RequestMethod . GET )
public Map < String , IndexProgress > getProgress () {
return indexer . getProgress ();
}
}
エンティティをマークするには、 com.timeyang.jkes.core.annotation
パッケージの関連するアノテーションを使用します。
@ lombok . Data
@ Entity
@ Document
public class Person extends AuditedEntity {
// @Id will be identified automatically
// @Field(type = FieldType.Long)
@ Id
@ GeneratedValue ( strategy = GenerationType . IDENTITY )
private Long id ;
@ MultiFields (
mainField = @ Field ( type = FieldType . Text ),
otherFields = {
@ InnerField ( suffix = "raw" , type = FieldType . Keyword ),
@ InnerField ( suffix = "english" , type = FieldType . Text , analyzer = "english" )
}
)
private String name ;
@ Field ( type = FieldType . Keyword )
private String gender ;
@ Field ( type = FieldType . Integer )
private Integer age ;
// don't add @Field to test whether ignored
// @Field(type = FieldType.Text)
private String description ;
@ Field ( type = FieldType . Object )
@ ManyToOne ( fetch = FetchType . EAGER )
@ JoinColumn ( name = "group_id" )
private PersonGroup personGroup ;
}
@ lombok . Data
@ Entity
@ Document ( type = "person_group" , alias = "person_group_alias" )
public class PersonGroup extends AuditedEntity {
@ Id
@ GeneratedValue ( strategy = GenerationType . IDENTITY )
private Long id ;
private String name ;
private String interests ;
@ OneToMany ( fetch = FetchType . EAGER , cascade = CascadeType . ALL , mappedBy = "personGroup" , orphanRemoval = true )
private List < Person > persons ;
private String description ;
@ DocumentId
@ Field ( type = FieldType . Long )
public Long getId () {
return id ;
}
@ MultiFields (
mainField = @ Field ( type = FieldType . Text ),
otherFields = {
@ InnerField ( suffix = "raw" , type = FieldType . Keyword ),
@ InnerField ( suffix = "english" , type = FieldType . Text , analyzer = "english" )
}
)
public String getName () {
return name ;
}
@ Field ( type = FieldType . Text )
public String getInterests () {
return interests ;
}
@ Field ( type = FieldType . Nested )
public List < Person > getPersons () {
return persons ;
}
/**
* 不加Field注解,测试序列化时是否忽略
*/
public String getDescription () {
return description ;
}
}
エンティティが更新されると、ドキュメントは自動的に ElasticSearch にインデックス付けされ、エンティティが削除されると、ドキュメントは ElasticSearch から自動的に削除されます。
検索サービス jkes-search-service を開始します。検索サービスは、残りの検索 API を提供する Spring Boot アプリケーションであり、デフォルトでポート 9000 で実行されます。
curl -XPOST localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10
integration_test_person_group/person_group/_search?from=0&size=10
{
"query": {
"nested": {
"path": "persons",
"score_mode": "avg",
"query": {
"bool": {
"must": [
{
"range": {
"persons.age": {
"gt": 5
}
}
}
]
}
}
}
}
}
integration_test_person_group/person_group/_search?from=0&size=10
{
"query": {
"match": {
"interests": "Hadoop"
}
}
}
{
"query": {
"bool" : {
"must" : {
"match" : { "interests" : "Hadoop" }
},
"filter": {
"term" : { "name.raw" : "name0" }
},
"should" : [
{ "match" : { "interests" : "Flink" } },
{
"nested" : {
"path" : "persons",
"score_mode" : "avg",
"query" : {
"bool" : {
"must" : [
{ "match" : {"persons.name" : "name40"} },
{ "match" : {"persons.interests" : "interests"} }
],
"must_not" : {
"range" : {
"age" : { "gte" : 50, "lte" : 60 }
}
}
}
}
}
}
],
"minimum_should_match" : 1,
"boost" : 1.0
}
}
}
integration_test_person_group/person_group/_search
{
"_source": false,
"query" : {
"match" : { "name" : "name17" }
}
}
integration_test_person_group/person_group/_search
{
"_source": {
"includes": [ "name", "persons.*" ],
"excludes": [ "date*", "version", "persons.age" ]
},
"query" : {
"match" : { "name" : "name17" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"prefix" : { "name" : "name" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"wildcard" : { "name" : "name*" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"regexp":{
"name": "na.*17"
}
}
}
インデックス作成の仕組み:
@Document
で注釈が付けられたすべてのエンティティをスキャンし、それらのメタデータを構築します。index
とmapping
構成を作成し、 ElasticSearch Java Rest Client
を通じてindex
構成を作成/更新します。Kafka ElasticSearch Connector
作成/更新しますJkes Deleter Connector
開始/更新します。* save(*)
メソッドによって返されたデータをSaveEvent
としてパックし、 EventContainer
に保存します。 (* delete*(..)
メソッドのパラメーターを使用してDeleteEvent/DeleteAllEvent
生成し、 EventContainer
に保存します。JkesKafkaProducer
を使用してSaveEvent
内のエンティティを Kafka に送信します。Kafka は、指定されたデータをシリアル化するために提供されたJkesJsonSerializer
を使用して、それを Kafka に送信します。SaveEvent
とは異なり、 DeleteEvent
データのコピーを送信するだけではなく、直接シリアル化されてから Kafka に送信されます。SaveEvent
やDeleteEvent
とは異なり、 DeleteAllEvent
データを Kafka に送信しませんが、 ElasticSearch Java Rest Client
を通じて対応するindex
を直接削除し、インデックスを再構築してKafka ElasticSearch Connector
再起動します。クエリの仕組み:
json
リクエストを解析し、いくつかの前処理を実行して、 ElasticSearch Java Rest Client
使用して ElasticSearch に転送し、応答を解析し、さらに処理した後にクライアントに返します。jkes-core
jkes
全体のコア部分です。主に以下の機能が含まれます。
annotation
パッケージは、jkes のコア アノテーションを提供します。elasticsearch
パッケージは、すべてのドキュメントのインデックスの作成/更新やマッピングの更新など、 elasticsearch
関連の操作をカプセル化します。kafka
パッケージは、Kafka プロデューサー、Kafka Json シリアライザー、Kafka Connect クライアントを提供しますmetadata
パッケージは、コア アノテーション メタデータ構築と構造化モデルを提供します。event
パッケージはイベント モデルとコンテナを提供しますexception
パッケージは一般的な Jkes 例外を提供しますhttp
パッケージは、 Apache Http Client
に基づいて一般的な http json リクエストをカプセル化します。support
パッケージは Jkes コア構成サポートを公開しますutil
パッケージは、開発を容易にするいくつかのツール クラスを提供します。例: Asserts、ClassUtils、DocumentUtils、IOUtils、JsonUtils、ReflectionUtils、StringUtilsjkes-boot
、一部のサードパーティのオープンソース フレームワークと統合するために使用されます。
現在、 jkes-spring-data-jpa
を通じてspring data jpa
との統合を提供しています。 Spring の AOP メカニズムを使用することにより、 Repository
メソッドがインターセプトされ、 SaveEvent/DeleteEvent/DeleteAllEvent
が生成され、 EventContainer
に保存されます。弊社が提供するSearchPlatformTransactionManager
使用することで、一般的に使用されるトランザクション マネージャー ( JpaTransactionManager
など) をラップし、トランザクション インターセプト機能を提供します。
後続のバージョンでは、より多くのフレームワークとの統合が提供される予定です。
jkes-spring-data-jpa
の説明:
ContextSupport
クラスは、Bean ファクトリからRepository Bean
取得するために使用されます。@EnableJkes
使用すると、クライアントは Jkes 機能を簡単に有効にし、Spring と一貫した構成モデルを提供できます。EventSupport
イベントの詳細を処理し、データの保存および削除時に対応するイベントを生成してEventContainer
に保存し、トランザクションのコミットおよびロールバック時に対応するイベントを処理します。SearchPlatformTransactionManager
クライアントのトランザクション マネージャーをラップし、トランザクションがコミットおよびロールバックされるときに回调hook
を追加します。audit
パッケージは、監査機能の追加を容易にする単純なAuditedEntity
親クラスを提供し、バージョン情報をElasticSearch
のバージョン メカニズムと組み合わせて使用して、期限切れのドキュメント データがインデックス付けされないようにすることができます。exception
パッケージは一般的な例外をカプセル化します。intercept
パッケージは、AOP ポイントカットとアスペクトを提供しますindex
パッケージは、全量索引
作成機能を提供します。現在、线程池
ベースのインデックス作成メカニズムとForkJoin
ベースのインデックス作成メカニズムを提供しています。後続のバージョンでは、コードをリファクタリングし、阻塞队列
に基づいた生产者-消费者
モデルを追加して、同時実行パフォーマンスを提供します。jkes-services
主にいくつかのサービスを提供するために使用されます。 現在、 jkes-services
次のサービスを提供しています。
jkes-delete-connector
jkes-delete-connector
kafka クラスターからインデックス削除イベント ( DeleteEvent
) を取得し、 Jest Client
使用して ElasticSearch 内の対応するドキュメントを削除するために使用されるKafka Connector
です。
Kafka Connect の REST 管理 API を利用すると、マルチテナント プラットフォーム上でドキュメント削除機能を簡単に実装できます。各プロジェクトに対してjkes-delete-connector
開始するだけで、そのプロジェクトのドキュメントの削除が自動的に処理されます。これにより、新しいプロジェクトを開始するたびに、プロジェクトのドキュメントの削除を処理するために Kafka Consumer を手動で開始する必要がなくなりました。この種の作業は定期購読によって削減できますが、それでも柔軟性が非常に低いです。
jkes-search-service
jkes-search-service
複数のバージョンの REST Query API を提供する RESTful 検索サービスです。クエリ サービスは、API の進化と互換性のためにマルチバージョン API を提供しますjkes-search-service
現在、URI スタイルの検索と JSON リクエスト本文のスタイルの検索をサポートしています。json
リクエストを解析し、いくつかの前処理を実行して、 ElasticSearch Java Rest Client
使用して ElasticSearch に転送し、応答を解析し、さらに処理した後にクライアントに返します。将来的には、 zookeeper
ベースとしたインデックス クラスターを構築し、クラスター インデックス管理機能を提供する予定です。
jkes-integration-test
功能测试
用の Spring Boot 統合テスト プロジェクトです。いくつかの一般的な操作の吞吐率
も測定します
開発バージョンをビルドするには、Kafka の最新バージョンが必要です。標準のライフサイクル フェーズを使用して Maven で JKES をビルドできます。
このプロジェクトは、Apache License 2.0 に基づいてライセンスされています。