复制番号代番号次のように:
public class SolrIndexer は Indexer、Searcher、DisposableBean を実装します {
//~ 静的フィールド/イニシャライザ =========================================== ==
静的最終ロガー logger = LoggerFactory.getLogger(SolrIndexer.class);
プライベート静的最終ロングSHUTDOWN_TIMEOUT = 5 * 60 * 1000L; // 十分な長さ
プライベート静的最終整数INPUT_QUEUE_LENGTH = 16384;
//~ インスタンスフィールド ============================================= ===========
プライベート CommonsHttpSolrServer サーバー。
プライベート BlockingQueue<Operation> inputQueue;
プライベートスレッド updateThread;
揮発性ブール値実行 = true;
揮発性ブール値 shuttingDown = false;
//~ コンストラクター ============================================== =============
public SolrIndexer(String url) throws MalformedURLException {
サーバー = 新しい CommonsHttpSolrServer(url);
inputQueue = new ArrayBlockingQueue<操作>(INPUT_QUEUE_LENGTH);
updateThread = 新しいスレッド(新しい UpdateTask());
updateThread.setName("SolrIndexer");
updateThread.start();
}
//~ メソッド ============================================== ==================
public void setSoTimeout(int timeout) {
サーバー.setSoTimeout(タイムアウト);
}
public void setConnectionTimeout(int timeout) {
サーバー.setConnectionTimeout(タイムアウト);
}
public void setAllowCompression(boolean allowedCompression) {
server.setAllowCompression(allowCompression);
}
public void addIndex(Indexable Indexable) throws IndexingException {
if (シャットダウン中) {
throw new IllegalStateException("SolrIndexer がシャットダウンしています");
}
inputQueue.offer(new Operation(indexable, OperationType.UPDATE));
}
public void delIndex(Indexable Indexable) throws IndexingException {
if (シャットダウン中) {
throw new IllegalStateException("SolrIndexer がシャットダウンしています");
}
inputQueue.offer(new Operation(indexable, OperationType.DELETE));
}
private void updateIndices(String type, List<Indexable> インデックス) throws IndexingException {
if (indices == null || indices.size() == 0) {
戻る;
}
logger.debug("{} インデックスを更新しています", indices.size());
UpdateRequest req = new UpdateRequest("/" + type + "/update");
req.setAction(UpdateRequest.ACTION.COMMIT, false, false);
for (インデックス可能な idx : インデックス) {
ドキュメント doc = idx.getDoc();
SolrInputDocument solrDoc = 新しい SolrInputDocument();
solrDoc.setDocumentBoost(doc.getDocumentBoost());
for (Iterator<Field> i = doc.iterator(); i.hasNext();) {
フィールド field = i.next();
solrDoc.addField(field.getName(), field.getValue(), field.getBoost());
}
req.add(solrDoc);
}
試す {
req.プロセス(サーバー);
} catch (SolrServerException e) {
logger.error("SolrServerException が発生しました", e);
新しい IndexingException(e) をスローします。
} キャッチ (IOException e) {
logger.error("IOException が発生しました", e);
新しい IndexingException(e) をスローします。
}
}
private void delIndices(String type, List<Indexable> インデックス) throws IndexingException {
if (indices == null || indices.size() == 0) {
戻る;
}
logger.debug("{} インデックスを削除しています", indices.size());
UpdateRequest req = new UpdateRequest("/" + type + "/update");
req.setAction(UpdateRequest.ACTION.COMMIT, false, false);
for (インデックス可能 インデックス可能 : インデックス) {
req.deleteById(indexable.getDocId());
}
試す {
req.プロセス(サーバー);
} catch (SolrServerException e) {
logger.error("SolrServerException が発生しました", e);
新しい IndexingException(e) をスローします。
} キャッチ (IOException e) {
logger.error("IOException が発生しました", e);
新しい IndexingException(e) をスローします。
}
}
public QueryResult search(クエリクエリ) throws IndexingException {
SolrQuery sq = 新しい SolrQuery();
sq.setQuery(query.getQuery());
if (query.getFilter() != null) {
sq.addFilterQuery(query.getFilter());
}
if (query.getOrderField() != null) {
sq.addSortField(query.getOrderField(), query.getOrder() == Query.Order.DESC ? SolrQuery.ORDER.desc : SolrQuery.ORDER.asc);
}
sq.setStart(query.getOffset());
sq.setRows(query.getLimit());
QueryRequest 要求 = 新しい QueryRequest(sq);
req.setPath("/" + query.getType() + "/select");
試す {
QueryResponse rsp = req.process(server);
SolrDocumentList docs = rsp.getResults();
QueryResult 結果 = new QueryResult();
result.setOffset(docs.getStart());
result.setTotal(docs.getNumFound());
result.setSize(sq.getRows());
List<Doc> resultDocs = new ArrayList<Doc>(result.getSize());
for (Iterator<SolrDocument> i = docs.iterator(); i.hasNext();) {
SolrDocument solrDocument = i.next();
ドキュメント ドキュメント = 新しいドキュメント();
for (Iterator<Map.Entry<String, Object>> iter = solrDocument.iterator(); iter.hasNext();) {
Map.Entry<String, Object> フィールド = iter.next();
doc.addField(field.getKey(), field.getValue());
}
resultDocs.add(doc);
}
result.setDocs(resultDocs);
結果を返します。
} catch (SolrServerException e) {
logger.error("SolrServerException が発生しました", e);
新しい IndexingException(e) をスローします。
}
}
public void destroy() は例外をスローします {
シャットダウン(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
}
public boolean shutdown(長いタイムアウト、TimeUnit単位) {
if (シャットダウン中) {
logger.info("シャットダウンの重複試行を抑制しています");
false を返します。
}
シャットダウン = true;
文字列baseName = updateThread.getName();
updateThread.setName(baseName + " - シャットダウン");
ブール値 rv = false;
試す {
// 条件付きで待機
if (タイムアウト > 0) {
updateThread.setName(baseName + " - シャットダウン (待機中)");
rv = waitForQueue(タイムアウト, ユニット);
}
} ついに {
// ただし、常にシャットダウン シーケンスを開始します
実行中 = false;
updateThread.setName(baseName + " - シャットダウン中 (クライアントに通知)");
}
RVを返します。
}
/**
* @param タイムアウト
* @パラメータユニット
* @戻る
*/
private boolean waitForQueue(長いタイムアウト、TimeUnit単位) {
CountDownLatch ラッチ = 新しい CountDownLatch(1);
inputQueue.add(新しい StopOperation(ラッチ));
試す {
return latch.await(タイムアウト, ユニット);
} catch (InterruptedException e) {
throw new RuntimeException("キューの待機が中断されました", e);
}
}
クラス UpdateTask は Runnable {を実装します
public void run() {
(実行中) {
試す {
syncIndices();
} catch (Throwable e) {
if (シャットダウン中) {
logger.warn("シャットダウン中に例外が発生しました", e);
} それ以外 {
logger.error("solr インデックス更新処理の問題", e);
}
}
}
logger.info("SolrIndexer をシャットダウンします");
}
}
void syncIndices() は InterruptedException をスローします {
操作 op = inputQueue.poll(1000L, TimeUnit.MILLISECONDS);
if (op == null) {
戻る;
}
if (op インスタンスの StopOperation) {
((StopOperation) op).stop();
戻る;
}
// 1秒待ちます
試す {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
List<オペレーション> ops = new ArrayList<オペレーション>(inputQueue.size() + 1);
ops.add(op);
inputQueue.drainTo(ops);
Map<String, List<Indexable>> deleteMap = new HashMap<String, List<Indexable>>(4);
Map<String, List<Indexable>> updateMap = new HashMap<String, List<Indexable>>(4);
for (オペレーション o : ops) {
if (o stopOperation のインスタンス) {
((StopOperation) o).stop();
} それ以外 {
インデックス可能 Indexable = o.indexable;
if (o.type == OperationType.DELETE) {
List<Indexable> docs = deleteMap.get(indexable.getType());
if (docs == null) {
docs = new LinkedList<Indexable>();
deleteMap.put(indexable.getType()、ドキュメント);
}
docs.add(インデックス可能);
} それ以外 {
List<Indexable> docs = updateMap.get(indexable.getType());
if (docs == null) {
docs = new LinkedList<Indexable>();
updateMap.put(indexable.getType(), ドキュメント);
}
docs.add(インデックス可能);
}
}
}
for (Iterator<Map.Entry<String, List<Indexable>>> i = deleteMap.entrySet().iterator(); i.hasNext();) {
Map.Entry<String, List<Indexable>> エントリ = i.next();
delIndices(entry.getKey()、entry.getValue());
}
for (Iterator<Map.Entry<String, List<Indexable>>> i = updateMap.entrySet().iterator(); i.hasNext();) {
Map.Entry<String, List<Indexable>> エントリ = i.next();
updateIndices(entry.getKey()、entry.getValue());
}
}
enum OperationType { DELETE、UPDATE、SHUTDOWN }
静的クラスの操作 {
OperationType タイプ。
インデックス可能インデックス可能。
手術() {}
Operation(インデックス可能インデックス可能、OperationType タイプ) {
this.indexable = インデックス可能;
this.type = タイプ;
}
}
静的クラス StopOperation extends Operation {
CountDownLatch ラッチ。
StopOperation(CountDownLatch ラッチ) {
this.latch = ラッチ;
this.type = 操作タイプ.SHUTDOWN;
}
public void stop() {
latch.countDown();
}
}
//~ アクセサ ===============
}