Jkes هو إطار بحث يعتمد على Java وKafka وElasticSearch. يوفر Jkes تعيين كائن/مستند بنمط JPA يعتمد على التعليقات التوضيحية، باستخدام REST API للبحث عن المستندات.
يمكنك الرجوع إلى مشروع jkes-integration-test
لتتعلم بسرعة كيفية استخدام إطار عمل jkes. jkes-integration-test
هو تطبيق Spring Boot نستخدمه لاختبار التكامل الوظيفي.
jkes-index-connector
و jkes-delete-connector
في مسار فئة Kafka Connectsudo bin/elasticsearch-plugin install analysis-smartcn
@ EnableAspectJAutoProxy
@ EnableJkes
@ Configuration
public class JkesConfig {
@ Bean
public PlatformTransactionManager transactionManager ( EntityManagerFactory factory , EventSupport eventSupport ) {
return new SearchPlatformTransactionManager ( new JpaTransactionManager ( factory ), eventSupport );
}
}
@ Component
@ Configuration
public class JkesConf extends DefaultJkesPropertiesImpl {
@ PostConstruct
public void setUp () {
Config . setJkesProperties ( this );
}
@ Override
public String getKafkaBootstrapServers () {
return "k1-test.com:9292,k2-test.com:9292,k3-test.com:9292" ;
}
@ Override
public String getKafkaConnectServers () {
return "http://k1-test.com:8084,http://k2-test.com:8084,http://k3-test.com:8084" ;
}
@ Override
public String getEsBootstrapServers () {
return "http://es1-test.com:9200,http://es2-test.com:9200,http://es3-test.com:9200" ;
}
@ Override
public String getDocumentBasePackage () {
return "com.timeyang.jkes.integration_test.domain" ;
}
@ Override
public String getClientId () {
return "integration_test" ;
}
}
يمكن أن يكون هذا مرنًا جدًا إذا كنت تستخدم Spring Boot، فيمكنك استخدام @ConfigurationProperties
لتوفير التكوين
Spring MVC
، يمكنك إضافة نقطة نهاية الفهرس على النحو التالي @ RestController
@ RequestMapping ( "/api/search" )
public class SearchEndpoint {
private Indexer indexer ;
@ Autowired
public SearchEndpoint ( Indexer indexer ) {
this . indexer = indexer ;
}
@ RequestMapping ( value = "/start_all" , method = RequestMethod . POST )
public void startAll () {
indexer . startAll ();
}
@ RequestMapping ( value = "/start/{entityClassName:.+}" , method = RequestMethod . POST )
public void start ( @ PathVariable ( "entityClassName" ) String entityClassName ) {
indexer . start ( entityClassName );
}
@ RequestMapping ( value = "/stop_all" , method = RequestMethod . PUT )
public Map < String , Boolean > stopAll () {
return indexer . stopAll ();
}
@ RequestMapping ( value = "/stop/{entityClassName:.+}" , method = RequestMethod . PUT )
public Boolean stop ( @ PathVariable ( "entityClassName" ) String entityClassName ) {
return indexer . stop ( entityClassName );
}
@ RequestMapping ( value = "/progress" , method = RequestMethod . GET )
public Map < String , IndexProgress > getProgress () {
return indexer . getProgress ();
}
}
استخدم التعليقات التوضيحية ذات الصلة ضمن حزمة com.timeyang.jkes.core.annotation
لوضع علامة على الكيانات
@ lombok . Data
@ Entity
@ Document
public class Person extends AuditedEntity {
// @Id will be identified automatically
// @Field(type = FieldType.Long)
@ Id
@ GeneratedValue ( strategy = GenerationType . IDENTITY )
private Long id ;
@ MultiFields (
mainField = @ Field ( type = FieldType . Text ),
otherFields = {
@ InnerField ( suffix = "raw" , type = FieldType . Keyword ),
@ InnerField ( suffix = "english" , type = FieldType . Text , analyzer = "english" )
}
)
private String name ;
@ Field ( type = FieldType . Keyword )
private String gender ;
@ Field ( type = FieldType . Integer )
private Integer age ;
// don't add @Field to test whether ignored
// @Field(type = FieldType.Text)
private String description ;
@ Field ( type = FieldType . Object )
@ ManyToOne ( fetch = FetchType . EAGER )
@ JoinColumn ( name = "group_id" )
private PersonGroup personGroup ;
}
@ lombok . Data
@ Entity
@ Document ( type = "person_group" , alias = "person_group_alias" )
public class PersonGroup extends AuditedEntity {
@ Id
@ GeneratedValue ( strategy = GenerationType . IDENTITY )
private Long id ;
private String name ;
private String interests ;
@ OneToMany ( fetch = FetchType . EAGER , cascade = CascadeType . ALL , mappedBy = "personGroup" , orphanRemoval = true )
private List < Person > persons ;
private String description ;
@ DocumentId
@ Field ( type = FieldType . Long )
public Long getId () {
return id ;
}
@ MultiFields (
mainField = @ Field ( type = FieldType . Text ),
otherFields = {
@ InnerField ( suffix = "raw" , type = FieldType . Keyword ),
@ InnerField ( suffix = "english" , type = FieldType . Text , analyzer = "english" )
}
)
public String getName () {
return name ;
}
@ Field ( type = FieldType . Text )
public String getInterests () {
return interests ;
}
@ Field ( type = FieldType . Nested )
public List < Person > getPersons () {
return persons ;
}
/**
* 不加Field注解,测试序列化时是否忽略
*/
public String getDescription () {
return description ;
}
}
عندما يتم تحديث كيان، تتم فهرسة المستند تلقائيًا في ElasticSearch؛ وعندما يتم حذف كيان، يتم حذف المستند تلقائيًا من ElasticSearch.
بدء تشغيل خدمة البحث jkes-search-service خدمة البحث هي تطبيق Spring Boot الذي يوفر واجهة برمجة تطبيقات البحث المريحة ويعمل على المنفذ 9000 بشكل افتراضي.
curl -XPOST localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10
integration_test_person_group/person_group/_search?from=0&size=10
{
"query": {
"nested": {
"path": "persons",
"score_mode": "avg",
"query": {
"bool": {
"must": [
{
"range": {
"persons.age": {
"gt": 5
}
}
}
]
}
}
}
}
}
integration_test_person_group/person_group/_search?from=0&size=10
{
"query": {
"match": {
"interests": "Hadoop"
}
}
}
{
"query": {
"bool" : {
"must" : {
"match" : { "interests" : "Hadoop" }
},
"filter": {
"term" : { "name.raw" : "name0" }
},
"should" : [
{ "match" : { "interests" : "Flink" } },
{
"nested" : {
"path" : "persons",
"score_mode" : "avg",
"query" : {
"bool" : {
"must" : [
{ "match" : {"persons.name" : "name40"} },
{ "match" : {"persons.interests" : "interests"} }
],
"must_not" : {
"range" : {
"age" : { "gte" : 50, "lte" : 60 }
}
}
}
}
}
}
],
"minimum_should_match" : 1,
"boost" : 1.0
}
}
}
integration_test_person_group/person_group/_search
{
"_source": false,
"query" : {
"match" : { "name" : "name17" }
}
}
integration_test_person_group/person_group/_search
{
"_source": {
"includes": [ "name", "persons.*" ],
"excludes": [ "date*", "version", "persons.age" ]
},
"query" : {
"match" : { "name" : "name17" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"prefix" : { "name" : "name" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"wildcard" : { "name" : "name*" }
}
}
integration_test_person_group/person_group/_search
{
"query": {
"regexp":{
"name": "na.*17"
}
}
}
آلية عمل الفهرسة:
@Document
وينشئ بيانات تعريفية لها.index
mapping
بتنسيق Json، ثم قم بإنشاء/تحديث تكوين index
من خلال ElasticSearch Java Rest Client
.Kafka ElasticSearch Connector
لكل مستند، يُستخدم لإنشاء/تحديث المستنداتJkes Deleter Connector
للمشروع بأكمله، لحذف المستندات* save(*)
كـ SaveEvent
واحفظها في EventContainer
؛ استخدم معلمات الطريقة (* delete*(..)
لإنشاء DeleteEvent/DeleteAllEvent
وحفظها في EventContainer
.JkesKafkaProducer
لإرسال الكيانات في SaveEvent
إلى Kafka وسيستخدم JkesJsonSerializer
الذي قدمناه لتسلسل البيانات المحددة ثم إرسالها إلى Kafka.SaveEvent
، سيتم إجراء تسلسل DeleteEvent
مباشرةً ثم إرساله إلى Kafka بدلاً من مجرد إرسال نسخة من البيانات.SaveEvent
و DeleteEvent
، لا يرسل DeleteAllEvent
البيانات إلى Kafka، ولكنه يحذف index
المقابل مباشرة من خلال ElasticSearch Java Rest Client
، ثم يعيد بناء الفهرس ويعيد تشغيل Kafka ElasticSearch Connector
كيف يعمل الاستعلام:
json
، وإجراء بعض المعالجة المسبقة، وإعادة توجيهه إلى ElasticSearch باستخدام ElasticSearch Java Rest Client
، وتوزيع الاستجابة، وإعادتها إلى العميل بعد مزيد من المعالجة. jkes-core
هو الجزء الأساسي من jkes
بأكمله. يتضمن بشكل رئيسي الوظائف التالية:
annotation
التعليقات التوضيحية الأساسية لـ jkeselasticsearch
العمليات المتعلقة بـ elasticsearch
، مثل إنشاء/تحديث الفهارس لجميع المستندات وتحديث الخرائط.kafka
منتج Kafka، وKafka Json Serializer، وKafka Connect Clientmetadata
إنشاء البيانات الوصفية الأساسية والنماذج المنظمة.event
نماذج وحاويات للحدثexception
استثناءات Jkes الشائعةhttp
طلبات http json الشائعة بناءً على Apache Http Client
support
عن دعم التكوين الأساسي لـ Jkesutil
بعض فئات الأدوات لتسهيل التطوير. مثل: التأكيدات، ClassUtils، DocumentUtils، IOUtils، JsonUtils، ReflectionUtils، StringUtils يتم استخدام jkes-boot
للتكامل مع بعض أطر العمل مفتوحة المصدر التابعة لجهات خارجية.
حاليًا، نحن نقدم التكامل مع spring data jpa
من خلال jkes-spring-data-jpa
. باستخدام آلية Spring's AOP، يتم اعتراض طريقة Repository
وإنشاء SaveEvent/DeleteEvent/DeleteAllEvent
وحفظه في EventContainer
. باستخدام SearchPlatformTransactionManager
الذي نقدمه، نقوم بتغليف مديري المعاملات شائعة الاستخدام (مثل JpaTransactionManager
) لتوفير وظائف اعتراض المعاملات.
في الإصدارات اللاحقة، سنوفر التكامل مع المزيد من الأطر.
وصف jkes-spring-data-jpa
:
ContextSupport
للحصول على Repository Bean
من مصنع الفول@EnableJkes
للعملاء تمكين وظائف Jkes بسهولة ويوفر نموذج تكوين متوافقًا مع SpringEventSupport
مع تفاصيل الأحداث، وينشئ الأحداث المقابلة ويخزنها في EventContainer
عند حفظ البيانات وحذفها، ويعالج الأحداث المقابلة عند الالتزام بالمعاملات وإرجاعها.SearchPlatformTransactionManager
مدير معاملات العميل ويضيف回调hook
عند تنفيذ المعاملات وإرجاعها.audit
فئة أصلية بسيطة AuditedEntity
لتسهيل إضافة وظائف التدقيق. يمكن استخدام معلومات الإصدار جنبًا إلى جنب مع آلية إصدار ElasticSearch
لضمان عدم فهرسة بيانات المستند منتهية الصلاحية.exception
الاستثناءات الشائعةintercept
نقاط وجوانب AOPindex
وظيفة全量索引
. حاليًا، نقدم آلية فهرسة تعتمد على线程池
وآلية فهرسة تعتمد على ForkJoin
. في الإصدارات اللاحقة، سنقوم بإعادة بناء الكود وإضافة نموذج生产者-消费者
استنادًا إلى阻塞队列
لتوفير أداء متزامن. تُستخدم jkes-services
بشكل أساسي لتوفير بعض الخدمات. حاليًا، توفر jkes-services
الخدمات التالية:
jkes-delete-connector
jkes-delete-connector
هو Kafka Connector
يُستخدم للحصول على أحداث حذف الفهرس ( DeleteEvent
) من مجموعة kafka، ثم استخدم Jest Client
لحذف المستندات المقابلة في ElasticSearch.
بمساعدة واجهة برمجة التطبيقات الإدارية الخاصة بـ Kafka Connect، يمكننا بسهولة تنفيذ وظيفة حذف المستندات على نظام أساسي متعدد المستأجرين. ما عليك سوى تشغيل jkes-delete-connector
لكل مشروع وسيتم التعامل مع حذف المستندات لهذا المشروع تلقائيًا. يؤدي هذا إلى تجنب الاضطرار إلى بدء تشغيل عميل Kafka يدويًا للتعامل مع حذف المستندات للمشروع في كل مرة نبدأ فيها مشروعًا جديدًا. على الرغم من أنه يمكن تقليل هذا النوع من العمل من خلال الاشتراك العادي، إلا أنه لا يزال غير مرن إلى حد كبير.
jkes-search-service
jkes-search-service
هي خدمة بحث مريحة توفر إصدارات متعددة من واجهة برمجة التطبيقات الخاصة بالاستعلام. توفر خدمة الاستعلام واجهة برمجة تطبيقات متعددة الإصدارات لتطوير واجهة برمجة التطبيقات وتوافقهاjkes-search-service
حاليًا البحث عن نمط URI والبحث عن نمط الجسم لطلب JSON.json
، وإجراء بعض المعالجة المسبقة، وإعادة توجيهه إلى ElasticSearch باستخدام ElasticSearch Java Rest Client
، وتوزيع الاستجابة، وإعادتها إلى العميل بعد مزيد من المعالجة. في المستقبل، سنقوم ببناء مجموعة فهرس تعتمد على zookeeper
لتوفير وظائف إدارة فهرس المجموعة.
jkes-integration-test
هو مشروع اختبار تكامل Spring Boot功能测试
. قم أيضًا بقياس吞吐率
بعض العمليات الشائعة
لإنشاء نسخة تطويرية، ستحتاج إلى إصدار حديث من Kafka. يمكنك إنشاء jkes باستخدام Maven باستخدام مراحل دورة الحياة القياسية.
تم ترخيص هذا المشروع بموجب ترخيص Apache 2.0.