معلومات المنتج:
الطبقة العامة SolrIndexer تنفذ المفهرس، الباحث، DisposableBean {
//~ الحقول/المُهيئات الثابتة ========================= ==
static Final Logger logger = LoggerFactory.getLogger(SolrIndexer.class);
نهائي ثابت خاص SHUTDOWN_TIMEOUT = 5 * 60 * 1000 لتر؛ // طويلة بما فيه الكفاية
int النهائي الثابت الخاص INPUT_QUEUE_LENGTH = 16384؛
// ~ حقول المثيل ======================================================= ===========
خادم CommonsHttpSolrServer الخاص؛
BlockingQueue الخاص <Operation> inputQueue؛
موضوع خاص لتحديث الموضوع؛
تشغيل منطقي متقلب = صحيح؛
الإغلاق المنطقي المتطاير = خطأ؛
// ~ البنائين ================================= =============
يطرح SolrIndexer العام (عنوان URL للسلسلة) MalformedURLException {
server = new CommonsHttpSolrServer(url);
inputQueue = new ArrayBlockingQueue<Operation>(INPUT_QUEUE_LENGTH);
updateThread = new Thread(new UpdateTask());
updateThread.setName("SolrIndexer");
updateThread.start();
}
//~ الطرق ======================================== ============================================================
مجموعة الفراغ العامة SoTimeout(int timeout) {
server.setSoTimeout(timeout);
}
مجموعة الفراغ العامConnectionTimeout(int timeout) {
server.setConnectionTimeout(timeout);
}
مجموعة الفراغ العامةAllowCompression(booleanallowCompression) {
server.setAllowCompression(allowCompression);
}
addIndex باطلة عامة (قابلة للفهرسة) تطرح IndexingException {
إذا (إيقاف التشغيل) {
طرح IllegalStateException الجديد("يتم إغلاق SolrIndexer");
}
inputQueue.offer(new Operation(indexable, OperationType.UPDATE));
}
delIndex الفراغ العام (قابل للفهرسة) يطرح IndexingException {
إذا (إيقاف التشغيل) {
طرح IllegalStateException الجديد("يتم إغلاق SolrIndexer");
}
inputQueue.offer(new Operation(indexable, OperationType.DELETE));
}
UpdateIndices باطلة خاصة (نوع السلسلة، قائمة <Indexable> الفهارس) يطرح IndexingException {
إذا (indices == null || indices.size() == 0) {
يعود؛
}
logger.debug("تحديث المؤشرات {}"، indices.size());
UpdateRequest req = new UpdateRequest("/" + type + "/update");
req.setAction(UpdateRequest.ACTION.COMMIT, false, false);
لـ (idx القابل للفهرسة: المؤشرات) {
Doc doc = idx.getDoc();
SolrInputDocument solrDoc = new SolrInputDocument();
solrDoc.setDocumentBoost(doc.getDocumentBoost());
for (Iterator<Field> i = doc.iterator(); i.hasNext();) {
حقل الحقل = i.next();
solrDoc.addField(field.getName(), field.getValue(), field.getBoost());
}
req.add(solrDoc);
}
يحاول {
req.process(server);
} قبض على (SolrServerException ه) {
logger.error("حدث SolrServerException"، e);
طرح استثناء الفهرسة الجديد (e)؛
} قبض (IOException ه) {
logger.error("حدث IOException"، e);
طرح استثناء الفهرسة الجديد (e)؛
}
}
delIndices باطلة خاصة (نوع السلسلة، قائمة <Indexable> الفهارس) يطرح IndexingException {
إذا (indices == null || indices.size() == 0) {
يعود؛
}
logger.debug("حذف المؤشرات {}"، indices.size());
UpdateRequest req = new UpdateRequest("/" + type + "/update");
req.setAction(UpdateRequest.ACTION.COMMIT, false, false);
لـ (قابل للفهرسة: المؤشرات) {
req.deleteById(indexable.getDocId());
}
يحاول {
req.process(server);
} قبض على (SolrServerException ه) {
logger.error("حدث SolrServerException"، e);
طرح استثناء الفهرسة الجديد (e)؛
} قبض (IOException ه) {
logger.error("حدث IOException"، e);
طرح استثناء الفهرسة الجديد (e)؛
}
}
بحث QueryResult العام (استعلام استعلام) يطرح IndexingException {
SolrQuery sq = new SolrQuery();
sq.setQuery(query.getQuery());
إذا (query.getFilter ()!= فارغة) {
sq.addFilterQuery(query.getFilter());
}
إذا (query.getOrderField ()!= فارغة) {
sq.addSortField(query.getOrderField(), query.getOrder() == Query.Order.DESC ? SolrQuery.ORDER.desc : SolrQuery.ORDER.asc);
}
sq.setStart(query.getOffset());
sq.setRows(query.getLimit());
QueryRequest req = new QueryRequest(sq);
req.setPath("/" + query.getType() + "/select");
يحاول {
QueryResponse rsp = req.process(server);
SolrDocumentList docs = rsp.getResults();
QueryResult result = new QueryResult();
result.setOffset(docs.getStart());
result.setTotal(docs.getNumFound());
result.setSize(sq.getRows());
List<Doc> resultDocs = new ArrayList<Doc>(result.getSize());
for (Iterator<SolrDocument> i = docs.iterator(); i.hasNext();) {
SolrDocument solrDocument = i.next();
Doc doc = new Doc();
for (Iterator<Map.Entry<String, Object>> iter = solrDocument.iterator(); iter.hasNext();) {
Map.Entry<String, Object> field = iter.next();
doc.addField(field.getKey(), field.getValue());
}
resultDocs.add(doc);
}
result.setDocs(resultDocs);
نتيجة الإرجاع؛
} قبض على (SolrServerException ه) {
logger.error("حدث SolrServerException"، e);
طرح استثناء الفهرسة الجديد (e)؛
}
}
تدمير الفراغ العام () يلقي استثناء {
Shutdown(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
}
إيقاف التشغيل المنطقي العام (مهلة طويلة، وحدة TimeUnit) {
إذا (إيقاف التشغيل) {
logger.info("منع محاولات الإغلاق المكررة");
عودة كاذبة.
}
ShuttingDown = true;
String baseName = updateThread.getName();
updateThread.setName(baseName + "- إيقاف التشغيل");
منطقية rv = خطأ؛
يحاول {
// انتظر بشكل مشروط
إذا (المهلة > 0) {
updateThread.setName(baseName + "- إيقاف التشغيل (في انتظار)");
rv = waitForQueue(timeout,unit);
}
} أخيراً {
// لكن ابدأ دائمًا تسلسل إيقاف التشغيل
تشغيل = خطأ؛
updateThread.setName(baseName + "- إيقاف التشغيل (عميل مطلع)");
}
عودة عربة سكن متنقلة.
}
/**
* @param المهلة
* @وحدة المعلمة
* @يعود
*/
انتظار منطقي خاص (مهلة طويلة، وحدة TimeUnit) {
مزلاج CountDownLatch = new CountDownLatch(1);
inputQueue.add(new StopOperation(latch));
يحاول {
return latch.await(timeout,unit);
} قبض على (InterruptedException e) {
throw new RuntimeException("توقف انتظار قوائم الانتظار"، e);
}
}
فئة UpdateTask تنفذ Runnable {
تشغيل الفراغ العام () {
أثناء (الجري) {
يحاول {
syncIndices();
} قبض (قابل للرمي) {
إذا (إيقاف التشغيل) {
logger.warn("حدث استثناء أثناء إيقاف التشغيل"، e);
} آخر {
logger.error("مشكلة في معالجة تحديث فهرسة solr"، e);
}
}
}
logger.info("إيقاف تشغيل SolrIndexer");
}
}
مزامنة باطلة () تطرح InterruptedException {
العملية op = inputQueue.poll(1000L, TimeUnit.MILLISECONDS);
إذا (المرجع == فارغ) {
يعود؛
}
إذا (مثيل المرجع StopOperation) {
((StopOperation) op).stop();
يعود؛
}
// انتظر ثانية واحدة
يحاول {
Thread.sleep(1000);
} قبض على (InterruptedException e) {
}
List<Operation> ops = new ArrayList<Operation>(inputQueue.size() + 1);
ops.add(op);
inputQueue.drainTo(ops);
Map<String, List<Indexable>>deleteMap = new HashMap<String, List<Indexable>>(4);
Map<String, List<Indexable>> updateMap = new HashMap<String, List<Indexable>>(4);
من أجل (العملية o: ops) {
إذا (س مثيل StopOperation) {
((StopOperation) o).stop();
} آخر {
قابل للفهرسة قابل للفهرسة = o.indexable;
إذا (o.type == OperationType.DELETE) {
List<Indexable> docs =deleteMap.get(indexable.getType());
إذا (مستندات == فارغة) {
docs = new LinkedList<Indexable>();
deleteMap.put(indexable.getType(), docs);
}
docs.add(indexable);
} آخر {
List<Indexable> docs = updateMap.get(indexable.getType());
إذا (مستندات == فارغة) {
docs = new LinkedList<Indexable>();
updateMap.put(indexable.getType(), docs);
}
docs.add(indexable);
}
}
}
for (Iterator<Map.Entry<String, List<Indexable>>> i =deleteMap.entrySet().iterator(); i.hasNext();) {
Map.Entry<String, List<Indexable>> input = i.next();
delIndices(entry.getKey(),entry.getValue());
}
for (Iterator<Map.Entry<String, List<Indexable>>> i = updateMap.entrySet().iterator(); i.hasNext();) {
Map.Entry<String, List<Indexable>> input = i.next();
updateIndices(entry.getKey(),entry.getValue());
}
}
نوع العملية {حذف، تحديث، إيقاف التشغيل}
عملية فئة ثابتة {
نوع العملية؛
فهرسة قابلة للفهرسة.
عملية() {}
العملية (قابلة للفهرسة، نوع OperationType) {
this.indexable = قابل للفهرسة؛
this.type = type;
}
}
تعمل StopOperation فئة ثابتة على توسيع العملية {
مزلاج CountDownLatch؛
StopOperation(مزلاج CountDownLatch) {
this.latch = latch;
this.type = OperationType.SHUTDOWN;
}
توقف الفراغ العام () {
latch.countDown();
}
}
//~ الملحقات ==============
}