复制代码代码如下:
公共类 SolrIndexer 实现 Indexer、Searcher、DisposableBean {
//~ 静态字段/初始化器 ============================================= ==
静态最终 Logger logger = LoggerFactory.getLogger(SolrIndexer.class);
私有静态最终长SHUTDOWN_TIMEOUT = 5 * 60 * 1000L; // 足够长
私有静态最终 int INPUT_QUEUE_LENGTH = 16384;
//~ 实例字段 =============================================== ===========
私有 CommonsHttpSolrServer 服务器;
私有BlockingQueue<操作> inputQueue;
私有线程更新线程;
易失性布尔运行 = true;
易失性布尔关闭= false;
//~ 构造函数 ================================================ =============
公共 SolrIndexer(String url) 抛出 MalformedURLException {
服务器=新的CommonsHttpSolrServer(url);
inputQueue = new ArrayBlockingQueue<操作>(INPUT_QUEUE_LENGTH);
updateThread = 新线程(新 UpdateTask());
updateThread.setName("SolrIndexer");
updateThread.start();
}
//~ 方法================================================ =================
公共无效setSoTimeout(int超时){
server.setSoTimeout(超时);
}
公共无效setConnectionTimeout(int超时){
server.setConnectionTimeout(超时);
}
公共无效setAllowCompression(布尔允许压缩){
server.setAllowCompression(allowCompression);
}
公共无效addIndex(可索引可索引)抛出IndexingException {
如果(关闭){
throw new IllegalStateException("SolrIndexer 正在关闭");
}
inputQueue.offer(新操作(可索引,OperationType.UPDATE));
}
公共无效delIndex(可索引可索引)抛出IndexingException {
如果(关闭){
throw new IllegalStateException("SolrIndexer 正在关闭");
}
inputQueue.offer(新操作(可索引,OperationType.DELETE));
}
private void updateIndices(String type, List<Indexable>索引) 抛出 IndexingException {
if (indices == null ||indexes.size() == 0) {
返回;
}
logger.debug("正在更新 {} 索引",indices.size());
UpdateRequest req = new UpdateRequest("/" + type + "/update");
req.setAction(UpdateRequest.ACTION.COMMIT, false, false);
for (可索引 idx : 索引) {
文档 doc = idx.getDoc();
SolrInputDocument solrDoc = new SolrInputDocument();
solrDoc.setDocumentBoost(doc.getDocumentBoost());
for (Iterator<Field> i = doc.iterator(); i.hasNext();) {
字段字段 = i.next();
solrDoc.addField(field.getName(), field.getValue(), field.getBoost());
}
请求.add(solrDoc);
}
尝试 {
请求过程(服务器);
} catch (SolrServerException e) {
logger.error("发生 SolrServerException", e);
抛出新的 IndexingException(e);
} catch (IOException e) {
logger.error("发生 IOException", e);
抛出新的 IndexingException(e);
}
}
private void delIndices(String type, List<Indexable>索引) 抛出 IndexingException {
if (indices == null ||indexes.size() == 0) {
返回;
}
logger.debug("删除 {} 索引",indices.size());
UpdateRequest req = new UpdateRequest("/" + type + "/update");
req.setAction(UpdateRequest.ACTION.COMMIT, false, false);
for (可索引可索引:索引) {
req.deleteById(indexable.getDocId());
}
尝试 {
请求过程(服务器);
} catch (SolrServerException e) {
logger.error("发生 SolrServerException", e);
抛出新的 IndexingException(e);
} catch (IOException e) {
logger.error("发生 IOException", e);
抛出新的 IndexingException(e);
}
}
公共 QueryResult 搜索(查询查询)抛出 IndexingException {
SolrQuery sq = new SolrQuery();
sq.setQuery(query.getQuery());
if (query.getFilter() != null) {
sq.addFilterQuery(query.getFilter());
}
if (query.getOrderField() != null) {
sq.addSortField(query.getOrderField(), query.getOrder() == Query.Order.DESC ? SolrQuery.ORDER.desc : SolrQuery.ORDER.asc);
}
sq.setStart(query.getOffset());
sq.setRows(query.getLimit());
QueryRequest req = new QueryRequest(sq);
req.setPath("/" + query.getType() + "/select");
尝试 {
QueryResponse rsp = req.process(server);
SolrDocumentList 文档 = rsp.getResults();
QueryResult 结果 = new QueryResult();
result.setOffset(docs.getStart());
result.setTotal(docs.getNumFound());
result.setSize(sq.getRows());
List<Doc> resultDocs = new ArrayList<Doc>(result.getSize());
for (Iterator<SolrDocument> i = docs.iterator(); i.hasNext();) {
SolrDocument solrDocument = i.next();
文档文档=新文档();
for (Iterator<Map.Entry<String, Object>> iter = solrDocument.iterator(); iter.hasNext();) {
Map.Entry<String, Object> field = iter.next();
doc.addField(field.getKey(), field.getValue());
}
resultDocs.add(doc);
}
result.setDocs(resultDocs);
返回结果;
} catch (SolrServerException e) {
logger.error("发生 SolrServerException", e);
抛出新的 IndexingException(e);
}
}
公共无效销毁()抛出异常{
关闭(SHUTDOWN_TIMEOUT,TimeUnit.MILLISECONDS);
}
公共布尔关闭(长超时,TimeUnit单位){
如果(关闭){
logger.info("禁止重复尝试关闭");
返回假;
}
关闭=真;
String baseName = updateThread.getName();
updateThread.setName(baseName + " - 关闭");
布尔值 rv = false;
尝试 {
// 有条件等待
如果(超时> 0){
updateThread.setName(baseName + " - 关闭(等待)");
rv = waitForQueue(超时,单位);
}
} 最后 {
// 但总是开始关闭序列
运行=假;
updateThread.setName(baseName + " - 关闭(通知客户端)");
}
返回房车;
}
/**
* @参数超时
* @参数单位
* @返回
*/
private boolean waitForQueue(长超时, TimeUnit 单位) {
CountDownLatch 锁存器 = new CountDownLatch(1);
inputQueue.add(new StopOperation(latch));
尝试 {
返回latch.await(超时,单位);
} catch (InterruptedException e) {
throw new RuntimeException("队列等待中断", e);
}
}
类 UpdateTask 实现 Runnable {
公共无效运行(){
当(运行){
尝试 {
同步索引();
} catch (Throwable e) {
如果(关闭){
logger.warn("关机时发生异常", e);
} 别的 {
logger.error("处理 solr 索引更新问题", e);
}
}
}
logger.info("关闭 SolrIndexer");
}
}
voidsyncIndices() 抛出 InterruptedException {
操作 op = inputQueue.poll(1000L, TimeUnit.MILLISECONDS);
如果(操作==空){
返回;
}
if (op 实例停止操作) {
((StopOperation) op).stop();
返回;
}
// 等待1秒
尝试 {
线程睡眠(1000);
} catch (InterruptedException e) {
}
List<操作> ops = new ArrayList<操作>(inputQueue.size() + 1);
ops.add(op);
inputQueue.drainTo(ops);
Map<String, List<Indexable>> deleteMap = new HashMap<String, List<Indexable>>(4);
Map<String, List<Indexable>> updateMap = new HashMap<String, List<Indexable>>(4);
for (操作 o : ops) {
if (o 停止操作实例) {
((StopOperation) o).stop();
} 别的 {
可转位可转位 = o.indexable;
if (o.type == OperationType.DELETE) {
List<Indexable> docs = deleteMap.get(indexable.getType());
如果(文档==空){
docs = new LinkedList<Indexable>();
deleteMap.put(indexable.getType(), docs);
}
docs.add(可索引);
} 别的 {
List<Indexable> docs = updateMap.get(indexable.getType());
如果(文档==空){
docs = new LinkedList<Indexable>();
updateMap.put(indexable.getType(), docs);
}
docs.add(可索引);
}
}
}
for (Iterator<Map.Entry<String, List<Indexable>>> i = deleteMap.entrySet().iterator(); i.hasNext();) {
Map.Entry<String, List<Indexable>>entry = i.next();
delIndices(entry.getKey(),entry.getValue());
}
for (Iterator<Map.Entry<String, List<Indexable>>> i = updateMap.entrySet().iterator(); i.hasNext();) {
Map.Entry<String, List<Indexable>>entry = i.next();
updateIndices(entry.getKey(),entry.getValue());
}
}
枚举操作类型 { 删除、更新、关闭 }
静态类操作{
操作类型类型;
可转位 可转位;
手术() {}
操作(可索引可索引,OperationType 类型){
this.indexable = 可索引;
this.type = 类型;
}
}
静态类 StopOperation 扩展操作 {
CountDownLatch锁存器;
StopOperation(CountDownLatch 锁存器) {
this.latch = 锁存器;
this.type = OperationType.SHUTDOWN;
}
公共无效停止(){
闩锁.countDown();
}
}
//~ 访问器 ===============
}