复制代码代码如下:
Die öffentliche Klasse SolrIndexer implementiert Indexer, Searcher und DisposableBean {
//~ Statische Felder/Initialisierer ========================================= ==
static final Logger logger = LoggerFactory.getLogger(SolrIndexer.class);
private static final long SHUTDOWN_TIMEOUT = 5 * 60 * 1000L; // lang genug
privates statisches final int INPUT_QUEUE_LENGTH = 16384;
//~ Instanzfelder =========================================== ===========
privater CommonsHttpSolrServer-Server;
private BlockingQueue<Operation> inputQueue;
privater Thread updateThread;
volatile boolean running = true;
volatile boolean ShuttingDown = false;
//~ Konstruktoren ============================================ =============
public SolrIndexer(String url) löst MalformedURLException {
server = new CommonsHttpSolrServer(url);
inputQueue = new ArrayBlockingQueue<Operation>(INPUT_QUEUE_LENGTH);
updateThread = neuer Thread(new UpdateTask());
updateThread.setName("SolrIndexer");
updateThread.start();
}
//~ Methoden ============================================ ==================
public void setSoTimeout(int timeout) {
server.setSoTimeout(timeout);
}
public void setConnectionTimeout(int timeout) {
server.setConnectionTimeout(timeout);
}
public void setAllowCompression(booleanallowCompression) {
server.setAllowCompression(allowCompression);
}
public void addIndex(Indexable indexable) löst IndexingException { aus
if (shuttingDown) {
throw new IllegalStateException("SolrIndexer wird heruntergefahren");
}
inputQueue.offer(new Operation(indexable, OperationType.UPDATE));
}
public void delIndex(Indexable indexable) wirft IndexingException {
if (shuttingDown) {
throw new IllegalStateException("SolrIndexer wird heruntergefahren");
}
inputQueue.offer(new Operation(indexable, OperationType.DELETE));
}
private void updateIndices(String type, List<Indexable> indices) löst IndexingException {
if (indices == null || indices.size() == 0) {
zurückkehren;
}
logger.debug("Updating {} indices", indices.size());
UpdateRequest req = new UpdateRequest("/" + type + "/update");
req.setAction(UpdateRequest.ACTION.COMMIT, false, false);
for (Indexierbare idx: Indizes) {
Doc doc = idx.getDoc();
SolrInputDocument solrDoc = new SolrInputDocument();
solrDoc.setDocumentBoost(doc.getDocumentBoost());
for (Iterator<Field> i = doc.iterator(); i.hasNext();) {
Feld field = i.next();
solrDoc.addField(field.getName(), field.getValue(), field.getBoost());
}
req.add(solrDoc);
}
versuchen {
req.process(server);
} Catch (SolrServerException e) {
logger.error("SolrServerException aufgetreten", e);
throw new IndexingException(e);
} Catch (IOException e) {
logger.error("IOException aufgetreten", e);
throw new IndexingException(e);
}
}
private void delIndices(String type, List<Indexable> indices) löst IndexingException {
if (indices == null || indices.size() == 0) {
zurückkehren;
}
logger.debug("Löschen von {}-Indizes", indices.size());
UpdateRequest req = new UpdateRequest("/" + type + "/update");
req.setAction(UpdateRequest.ACTION.COMMIT, false, false);
for (Indexierbar indexierbar: Indizes) {
req.deleteById(indexable.getDocId());
}
versuchen {
req.process(server);
} Catch (SolrServerException e) {
logger.error("SolrServerException aufgetreten", e);
throw new IndexingException(e);
} Catch (IOException e) {
logger.error("IOException aufgetreten", e);
throw new IndexingException(e);
}
}
öffentliche QueryResult-Suche (Abfrageabfrage) löst IndexingException {
SolrQuery sq = new SolrQuery();
sq.setQuery(query.getQuery());
if (query.getFilter() != null) {
sq.addFilterQuery(query.getFilter());
}
if (query.getOrderField() != null) {
sq.addSortField(query.getOrderField(), query.getOrder() == Query.Order.DESC ? SolrQuery.ORDER.desc : SolrQuery.ORDER.asc);
}
sq.setStart(query.getOffset());
sq.setRows(query.getLimit());
QueryRequest req = new QueryRequest(sq);
req.setPath("/" + query.getType() + "/select");
versuchen {
QueryResponse rsp = req.process(server);
SolrDocumentList docs = rsp.getResults();
QueryResult result = new QueryResult();
result.setOffset(docs.getStart());
result.setTotal(docs.getNumFound());
result.setSize(sq.getRows());
List<Doc> resultDocs = new ArrayList<Doc>(result.getSize());
for (Iterator<SolrDocument> i = docs.iterator(); i.hasNext();) {
SolrDocument solrDocument = i.next();
Doc doc = new Doc();
for (Iterator<Map.Entry<String, Object>> iter = solrDocument.iterator(); iter.hasNext();) {
Map.Entry<String, Object> field = iter.next();
doc.addField(field.getKey(), field.getValue());
}
resultDocs.add(doc);
}
result.setDocs(resultDocs);
Ergebnis zurückgeben;
} Catch (SolrServerException e) {
logger.error("SolrServerException aufgetreten", e);
throw new IndexingException(e);
}
}
public void destroy() löst eine Ausnahme aus {
Shutdown(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
}
öffentliches boolesches Herunterfahren (lange Zeitüberschreitung, TimeUnit-Einheit) {
if (shuttingDown) {
logger.info("Unterdrückung doppelter Versuche zum Herunterfahren");
return false;
}
ShuttingDown = true;
String baseName = updateThread.getName();
updateThread.setName(baseName + " - HERUNTERFAHREN");
boolean rv = false;
versuchen {
// Bedingt warten
if (timeout > 0) {
updateThread.setName(baseName + " - HERUNTERFAHREN (wartet)");
rv = waitForQueue(timeout, unit);
}
} Endlich {
// Aber beginnen Sie immer mit der Abschaltsequenz
läuft = false;
updateThread.setName(baseName + " - HERUNTERFAHREN (informierter Client)");
}
Rückkehr RV;
}
/**
* @param-Zeitüberschreitung
* @param-Einheit
* @zurückkehren
*/
privater boolescher Wert waitForQueue(long timeout, TimeUnit unit) {
CountDownLatch-Latch = new CountDownLatch(1);
inputQueue.add(new StopOperation(latch));
versuchen {
return Latch.await(timeout, unit);
} Catch (InterruptedException e) {
throw new RuntimeException("Unterbrochenes Warten auf Warteschlangen", e);
}
}
Klasse UpdateTask implementiert Runnable {
public void run() {
während (laufend) {
versuchen {
syncIndices();
} Catch (Throwable e) {
if (shuttingDown) {
logger.warn("Ausnahme beim Herunterfahren aufgetreten", e);
} anders {
logger.error("Problem bei der Aktualisierung der Solr-Indizierung", e);
}
}
}
logger.info("SolrIndexer herunterfahren");
}
}
void syncIndices() wirft InterruptedException {
Operation op = inputQueue.poll(1000L, TimeUnit.MILLISECONDS);
if (op == null) {
zurückkehren;
}
if (op-Instanz von StopOperation) {
((StopOperation) op).stop();
zurückkehren;
}
// 1 Sekunde warten
versuchen {
Thread.sleep(1000);
} Catch (InterruptedException e) {
}
List<Operation> ops = new ArrayList<Operation>(inputQueue.size() + 1);
ops.add(op);
inputQueue.drainTo(ops);
Map<String, List<Indexable>> deleteMap = new HashMap<String, List<Indexable>>(4);
Map<String, List<Indexable>> updateMap = new HashMap<String, List<Indexable>>(4);
for (Operation o : ops) {
if (o Instanz von StopOperation) {
((StopOperation) o).stop();
} anders {
Indexierbar indexable = o.indexable;
if (o.type == OperationType.DELETE) {
List<Indexable> docs = deleteMap.get(indexable.getType());
if (docs == null) {
docs = new LinkedList<Indexable>();
deleteMap.put(indexable.getType(), docs);
}
docs.add(indexierbar);
} anders {
List<Indexable> docs = updateMap.get(indexable.getType());
if (docs == null) {
docs = new LinkedList<Indexable>();
updateMap.put(indexable.getType(), docs);
}
docs.add(indexierbar);
}
}
}
for (Iterator<Map.Entry<String, List<Indexable>>> i = deleteMap.entrySet().iterator(); i.hasNext();) {
Map.Entry<String, List<Indexable>> enter = i.next();
delIndices(entry.getKey(), enter.getValue());
}
for (Iterator<Map.Entry<String, List<Indexable>>> i = updateMap.entrySet().iterator(); i.hasNext();) {
Map.Entry<String, List<Indexable>> enter = i.next();
updateIndices(entry.getKey(), enter.getValue());
}
}
enum OperationType { DELETE, UPDATE, SHUTDOWN }
statische Klassenoperation {
OperationType-Typ;
Indexierbar indexierbar;
Operation() {}
Operation(Indexable indexable, OperationType type) {
this.indexable = indexierbar;
this.type = Typ;
}
}
Die statische Klasse StopOperation erweitert Operation {
CountDownLatch-Latch;
StopOperation(CountDownLatch-Latch) {
this.latch = Latch;
this.type = OperationType.SHUTDOWN;
}
public void stop() {
Latch.countDown();
}
}
//~ Accessoren ===============
}