Revision 45479
Added by Alessia Bardi almost 8 years ago
modules/dnet-data-provision-services/trunk/src/main/java/eu/dnetlib/oai/mongo/MongoPublisherStore.java | ||
---|---|---|
32 | 32 |
import com.mongodb.client.result.DeleteResult; |
33 | 33 |
import com.mongodb.client.result.UpdateResult; |
34 | 34 |
import eu.dnetlib.cql.CqlTranslator; |
35 |
import eu.dnetlib.cql.mongo.MongoCqlTranslator; |
|
36 | 35 |
import eu.dnetlib.oai.PublisherField; |
37 | 36 |
import eu.dnetlib.oai.PublisherStore; |
38 | 37 |
import eu.dnetlib.oai.RecordChangeDetector; |
... | ... | |
246 | 245 |
final Object sentinel = new Object(); |
247 | 246 |
this.dropDiscarded(source); |
248 | 247 |
final Date feedDate = new Date(); |
249 |
final Thread background = new Thread(new Runnable() { |
|
250 |
|
|
251 |
@Override |
|
252 |
public void run() { |
|
253 |
// For fast feeding we want to use a collection with unack write concern |
|
254 |
final MongoCollection<DBObject> unackCollection = MongoPublisherStore.this.collection.withWriteConcern(WriteConcern.UNACKNOWLEDGED); |
|
255 |
while (true) { |
|
256 |
try { |
|
257 |
final Object record = queue.take(); |
|
258 |
if (record == sentinel) { |
|
259 |
break; |
|
260 |
} |
|
261 |
safeFeedRecord((String) record, source, feedDate, unackCollection); |
|
262 |
} catch (final InterruptedException e) { |
|
263 |
log.fatal("got exception in background thread", e); |
|
264 |
throw new IllegalStateException(e); |
|
248 |
final Thread background = new Thread(() -> { |
|
249 |
// For fast feeding we want to use a collection with unack write concern |
|
250 |
final MongoCollection<DBObject> unackCollection = MongoPublisherStore.this.collection.withWriteConcern(WriteConcern.UNACKNOWLEDGED); |
|
251 |
while (true) { |
|
252 |
try { |
|
253 |
final Object record = queue.take(); |
|
254 |
if (record == sentinel) { |
|
255 |
break; |
|
265 | 256 |
} |
257 |
safeFeedRecord((String) record, source, feedDate, unackCollection); |
|
258 |
} catch (final InterruptedException e) { |
|
259 |
log.fatal("got exception in background thread", e); |
|
260 |
throw new IllegalStateException(e); |
|
266 | 261 |
} |
267 | 262 |
} |
268 | 263 |
}); |
... | ... | |
380 | 375 |
*/ |
381 | 376 |
private boolean feedRecord(final String record, final String source, final Date feedDate, final MongoCollection<DBObject> unackCollection) { |
382 | 377 |
final PublisherRecordParser parser = new PublisherRecordParser(this.mongoFields); |
383 |
final Multimap<String, String> recordProperties = parser.parseRecord(record); |
|
378 |
log.debug("configured parser for fields: "+this.mongoFields); |
|
379 |
final Multimap<String, String> recordProperties = parser.parseRecord(record, source); |
|
384 | 380 |
String id = ""; |
385 | 381 |
String oaiID = ""; |
386 | 382 |
if (recordProperties.containsKey(OAIConfigurationReader.ID_FIELD)) { |
... | ... | |
399 | 395 |
} |
400 | 396 |
} else { |
401 | 397 |
log.error("parsed record seems invalid -- no identifier property with name: " + OAIConfigurationReader.ID_FIELD); |
398 |
log.debug("Extracted property map: \n"+recordProperties); |
|
399 |
log.debug("from: \n"+record); |
|
402 | 400 |
this.discardedCollection |
403 | 401 |
.insertOne(new BasicDBObject(OAIConfigurationReader.SET_FIELD, source).append(OAIConfigurationReader.BODY_FIELD, record).append( |
404 | 402 |
OAIConfigurationReader.DATESTAMP_FIELD, feedDate)); |
modules/dnet-data-provision-services/trunk/src/main/java/eu/dnetlib/oai/mongo/MongoPublisherStoreDAO.java | ||
---|---|---|
188 | 188 |
|
189 | 189 |
final Element elem = this.mongoOaistoreCache.get(k); |
190 | 190 |
if (elem != null) { |
191 |
log.debug("Store retreived from cache and alwaysNewRecord is" + this.alwaysNewRecord);
|
|
191 |
log.debug("Store retrieved from cache and alwaysNewRecord is" + this.alwaysNewRecord);
|
|
192 | 192 |
final MongoPublisherStore store = (MongoPublisherStore) elem.getObjectValue(); |
193 | 193 |
store.setAlwaysNewRecord(this.alwaysNewRecord); |
194 |
log.debug(store); |
|
194 | 195 |
return store; |
195 | 196 |
} else { |
196 |
log.debug("Store retreived, cache miss, alwaysNewRecord is" + this.alwaysNewRecord);
|
|
197 |
log.debug("Store retrieved, cache miss, alwaysNewRecord is" + this.alwaysNewRecord);
|
|
197 | 198 |
log.fatal("Not using cache to create oaistore from dbObject: " + k); |
198 | 199 |
final MongoPublisherStore store = new MongoPublisherStore(storeId, mdFormat, mdInterpreation, mdLayout, db.getCollection(storeId, DBObject.class), |
199 | 200 |
this.configuration.getFields(mdFormat, mdInterpreation, mdLayout), this.cqlTranslator, this.recordInfoGenerator, |
modules/dnet-data-provision-services/trunk/src/main/java/eu/dnetlib/oai/parser/PublisherRecordParser.java | ||
---|---|---|
8 | 8 |
import com.google.common.collect.Iterables; |
9 | 9 |
import com.google.common.collect.Multimap; |
10 | 10 |
import eu.dnetlib.oai.PublisherField; |
11 |
import eu.dnetlib.oai.conf.OAIConfigurationExistReader; |
|
12 |
import org.apache.commons.lang3.StringUtils; |
|
11 | 13 |
import org.apache.commons.logging.Log; |
12 | 14 |
import org.apache.commons.logging.LogFactory; |
13 | 15 |
import org.dom4j.Document; |
... | ... | |
37 | 39 |
* |
38 | 40 |
* @param record |
39 | 41 |
* the XML string to parse. |
42 |
* @param source |
|
43 |
* String identifying the source of the record. Can be null. |
|
40 | 44 |
* @return a Multimap describing the values to be indexed for this record. |
41 | 45 |
*/ |
42 | 46 |
@SuppressWarnings({ "unchecked", "rawtypes" }) |
43 |
public Multimap<String, String> parseRecord(final String record) { |
|
47 |
public Multimap<String, String> parseRecord(final String record, final String source) {
|
|
44 | 48 |
Multimap<String, String> recordProps = ArrayListMultimap.create(); |
45 | 49 |
try { |
46 | 50 |
Document doc = this.saxReader.read(new StringReader(record)); |
51 |
if(StringUtils.isNotBlank(source)) recordProps.put(OAIConfigurationExistReader.SET_FIELD, source); |
|
47 | 52 |
for (PublisherField field : this.storeIndices) { |
48 | 53 |
for (Entry<String, String> indexEntry : field.getSources().entries()) { |
49 | 54 |
// each xpath can return a list of nodes or strings, depending on the xpath |
... | ... | |
53 | 58 |
recordProps.putAll(field.getFieldName(), xPathResult); |
54 | 59 |
} else { |
55 | 60 |
if (containsNodes(xPathResult)) { |
61 |
|
|
56 | 62 |
recordProps.putAll(field.getFieldName(), Iterables.transform(xPathResult, obj -> { |
57 | 63 |
if (obj == null) return ""; |
58 | 64 |
Node node = (Node) obj; |
... | ... | |
69 | 75 |
recordProps = null; |
70 | 76 |
} |
71 | 77 |
return recordProps; |
72 |
|
|
73 | 78 |
} |
74 | 79 |
|
75 | 80 |
@SuppressWarnings("rawtypes") |
... | ... | |
97 | 102 |
} |
98 | 103 |
|
99 | 104 |
public PublisherRecordParser(final List<PublisherField> storeIndices) { |
100 |
super(); |
|
101 | 105 |
this.storeIndices = storeIndices; |
102 | 106 |
} |
103 | 107 |
|
modules/dnet-data-provision-services/trunk/src/main/java/eu/dnetlib/oai/sync/OAIStoreSynchronizer.java | ||
---|---|---|
35 | 35 |
MongoPublisherStore store = this.getStore(sourceMetadataFormat, dbName, alwaysNewRecord); |
36 | 36 |
int count = store.feed(records, recordSource); |
37 | 37 |
|
38 |
|
|
39 | 38 |
log.info("Content synchronized: store " + sourceMetadataFormat + " fed with " + count + " records"); |
40 | 39 |
executeCallback(callback); |
41 | 40 |
} catch (Exception e) { |
Also available in: Unified diff
Added more debug logging