Revision 47517
Added by Alessia Bardi over 7 years ago
modules/cnr-mongo-mdstore/trunk/src/test/java/eu/dnetlib/data/mdstore/modular/mongodb/IndexFieldParserTest.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mdstore.modular.mongodb; |
2 | 2 |
|
3 |
import eu.dnetlib.data.mdstore.modular.MDFormatDescription; |
|
4 |
import eu.dnetlib.data.mdstore.modular.mongodb.utils.IndexFieldRecordParser; |
|
5 |
import org.apache.commons.io.IOUtils; |
|
6 |
import org.junit.Test; |
|
7 |
|
|
8 | 3 |
import java.io.IOException; |
9 | 4 |
import java.io.InputStream; |
10 | 5 |
import java.util.ArrayList; |
11 | 6 |
import java.util.List; |
12 | 7 |
import java.util.Map; |
13 | 8 |
|
9 |
import eu.dnetlib.data.mdstore.MDStoreServiceException; |
|
10 |
import eu.dnetlib.data.mdstore.modular.MDFormatDescription; |
|
11 |
import eu.dnetlib.data.mdstore.modular.mongodb.utils.IndexFieldRecordParser; |
|
12 |
import eu.dnetlib.data.mdstore.modular.mongodb.utils.IndexFieldRecordParserException; |
|
13 |
import org.apache.commons.io.IOUtils; |
|
14 |
import org.junit.Test; |
|
15 |
|
|
14 | 16 |
/** |
15 | 17 |
* Created by sandro on 11/29/16. |
16 | 18 |
*/ |
... | ... | |
18 | 20 |
|
19 | 21 |
|
20 | 22 |
@Test |
21 |
public void parserTest() throws IOException { |
|
23 |
public void parserTest() throws IOException, MDStoreServiceException, IndexFieldRecordParserException {
|
|
22 | 24 |
InputStream inputStream = this.getClass().getResourceAsStream("/eu/dnetlib/data/mdstore/modular/mongodb/inputRecord.xml"); |
23 | 25 |
|
24 | 26 |
final String inputRecord = IOUtils.toString(inputStream); |
modules/cnr-mongo-mdstore/trunk/src/main/java/eu/dnetlib/data/mdstore/modular/mongodb/utils/IndexFieldRecordParserException.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mdstore.modular.mongodb.utils; |
|
2 |
|
|
3 |
/** |
|
4 |
* Created by Alessia Bardi on 14/06/2017. |
|
5 |
* |
|
6 |
* @author Alessia Bardi |
|
7 |
*/ |
|
8 |
public class IndexFieldRecordParserException extends Exception { |
|
9 |
|
|
10 |
public IndexFieldRecordParserException() { |
|
11 |
super(); |
|
12 |
} |
|
13 |
|
|
14 |
public IndexFieldRecordParserException(final String message) { |
|
15 |
super(message); |
|
16 |
} |
|
17 |
|
|
18 |
public IndexFieldRecordParserException(final String message, final Throwable cause) { |
|
19 |
super(message, cause); |
|
20 |
} |
|
21 |
|
|
22 |
public IndexFieldRecordParserException(final Throwable cause) { |
|
23 |
super(cause); |
|
24 |
} |
|
25 |
} |
modules/cnr-mongo-mdstore/trunk/src/main/java/eu/dnetlib/data/mdstore/modular/mongodb/utils/IndexFieldRecordParser.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mdstore.modular.mongodb.utils; |
2 | 2 |
|
3 |
import java.util.ArrayList; |
|
4 |
import java.util.HashMap; |
|
5 |
import java.util.List; |
|
6 |
import java.util.Map; |
|
7 |
|
|
3 | 8 |
import com.ximpleware.AutoPilot; |
4 | 9 |
import com.ximpleware.VTDGen; |
5 | 10 |
import com.ximpleware.VTDNav; |
... | ... | |
7 | 12 |
import org.apache.commons.logging.Log; |
8 | 13 |
import org.apache.commons.logging.LogFactory; |
9 | 14 |
|
10 |
import java.util.ArrayList; |
|
11 |
import java.util.HashMap; |
|
12 |
import java.util.List; |
|
13 |
import java.util.Map; |
|
14 |
|
|
15 | 15 |
/** |
16 | 16 |
* Created by sandro on 11/29/16. |
17 | 17 |
*/ |
... | ... | |
19 | 19 |
|
20 | 20 |
private static final Log log = LogFactory.getLog(IndexFieldRecordParser.class); |
21 | 21 |
|
22 |
public static List<String> getTextValue(final AutoPilot ap, final VTDNav vn, final String xpath) throws Exception {
|
|
22 |
private static List<String> getTextValue(final AutoPilot ap, final VTDNav vn, final String xpath) throws Exception {
|
|
23 | 23 |
List<String> results = new ArrayList<>(); |
24 | 24 |
ap.selectXPath(xpath); |
25 | 25 |
while (ap.evalXPath() != -1) { |
... | ... | |
29 | 29 |
return results; |
30 | 30 |
} |
31 | 31 |
|
32 |
public Map<String, List<String>> parseRecord(final String record, final List<MDFormatDescription> mdformats) { |
|
32 |
public Map<String, List<String>> parseRecord(final String record, final List<MDFormatDescription> mdformats) throws IndexFieldRecordParserException {
|
|
33 | 33 |
if (mdformats == null || mdformats.size() == 0) |
34 | 34 |
return null; |
35 | 35 |
final Map<String, List<String>> result = new HashMap<>(); |
... | ... | |
46 | 46 |
result.put(description.getName(), xpathResult); |
47 | 47 |
} |
48 | 48 |
return result; |
49 |
|
|
50 | 49 |
} catch (Throwable e) { |
51 |
log.error("Error on parsing record " + record, e);
|
|
50 |
throw new IndexFieldRecordParserException("Cannot index record", e);
|
|
52 | 51 |
} |
53 |
return result; |
|
54 | 52 |
} |
55 | 53 |
|
56 | 54 |
|
modules/cnr-mongo-mdstore/trunk/src/main/java/eu/dnetlib/data/mdstore/modular/mongodb/MongoBulkWritesManager.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mdstore.modular.mongodb; |
2 | 2 |
|
3 |
import java.util.List; |
|
4 |
import java.util.Map; |
|
5 |
|
|
3 | 6 |
import com.google.common.collect.Lists; |
4 | 7 |
import com.mongodb.BasicDBObject; |
5 | 8 |
import com.mongodb.DBObject; |
6 | 9 |
import com.mongodb.WriteConcern; |
7 | 10 |
import com.mongodb.client.MongoCollection; |
8 | 11 |
import com.mongodb.client.model.*; |
12 |
import eu.dnetlib.data.mdstore.MDStoreServiceException; |
|
9 | 13 |
import eu.dnetlib.data.mdstore.modular.MDFormatDescription; |
10 | 14 |
import eu.dnetlib.data.mdstore.modular.RecordParser; |
11 | 15 |
import eu.dnetlib.data.mdstore.modular.mongodb.utils.IndexFieldRecordParser; |
16 |
import eu.dnetlib.data.mdstore.modular.mongodb.utils.IndexFieldRecordParserException; |
|
12 | 17 |
import org.apache.commons.logging.Log; |
13 | 18 |
import org.apache.commons.logging.LogFactory; |
14 | 19 |
|
15 |
import java.util.List; |
|
16 |
import java.util.Map; |
|
17 |
|
|
18 | 20 |
public class MongoBulkWritesManager { |
19 | 21 |
|
20 | 22 |
private static final Log log = LogFactory.getLog(MongoBulkWritesManager.class); |
21 | 23 |
private final boolean discardRecords; |
22 |
private final boolean indexRecords;
|
|
23 |
private final IndexFieldRecordParser indexFieldRecordParser = new IndexFieldRecordParser();
|
|
24 |
private final List<MDFormatDescription> mdref;
|
|
25 |
private RecordParser recordParser;
|
|
24 |
private final boolean indexRecords;
|
|
25 |
private final IndexFieldRecordParser indexFieldRecordParser = new IndexFieldRecordParser();
|
|
26 |
private final List<MDFormatDescription> mdref;
|
|
27 |
private RecordParser recordParser;
|
|
26 | 28 |
private MongoCollection<DBObject> validCollection; |
27 | 29 |
private List<WriteModel<DBObject>> validBulkOperationList; |
28 | 30 |
private BulkWriteOptions writeOptions; |
... | ... | |
31 | 33 |
private int bulkSize; |
32 | 34 |
|
33 | 35 |
public MongoBulkWritesManager(final MongoCollection<DBObject> collection, |
34 |
final MongoCollection<DBObject> discardedCollection,
|
|
35 |
final List<MDFormatDescription> mdref,
|
|
36 |
final int bulkSize,
|
|
37 |
final RecordParser parser,
|
|
38 |
final boolean discardRecords) {
|
|
36 |
final MongoCollection<DBObject> discardedCollection,
|
|
37 |
final List<MDFormatDescription> mdref,
|
|
38 |
final int bulkSize,
|
|
39 |
final RecordParser parser,
|
|
40 |
final boolean discardRecords) {
|
|
39 | 41 |
this.validCollection = collection.withWriteConcern(WriteConcern.ACKNOWLEDGED); |
40 | 42 |
this.validBulkOperationList = Lists.newArrayList(); |
41 | 43 |
|
... | ... | |
45 | 47 |
this.bulkSize = bulkSize; |
46 | 48 |
this.recordParser = parser; |
47 | 49 |
this.discardRecords = discardRecords; |
48 |
this.mdref = mdref;
|
|
50 |
this.mdref = mdref;
|
|
49 | 51 |
|
50 |
this.indexRecords = this.mdref != null;
|
|
51 |
this.writeOptions = new BulkWriteOptions().ordered(false);
|
|
52 |
this.indexRecords = this.mdref != null;
|
|
53 |
this.writeOptions = new BulkWriteOptions().ordered(false);
|
|
52 | 54 |
} |
53 | 55 |
|
54 |
public void insert(final String record) { |
|
56 |
public void insert(final String record) throws MDStoreServiceException { |
|
57 |
Map<String, String> recordProperties = null; |
|
55 | 58 |
try { |
56 |
final Map<String, String> recordProperties = recordParser.parseRecord(record); |
|
57 |
Map<String, List<String>> indexRecordField = null; |
|
58 |
if (indexRecords) { |
|
59 |
indexRecordField = indexFieldRecordParser.parseRecord(record, mdref); |
|
60 |
} |
|
61 |
|
|
62 |
log.debug("found props: " + recordProperties); |
|
63 |
if (recordProperties.containsKey("id")) { |
|
64 |
final DBObject obj = buildDBObject(record, recordProperties, indexRecordField); |
|
65 |
if (log.isDebugEnabled()) { |
|
66 |
log.debug("Saving object" + obj); |
|
67 |
} |
|
68 |
validBulkOperationList.add(new ReplaceOneModel(new BasicDBObject("id", obj.get("id")), obj, new UpdateOptions().upsert(true))); |
|
69 |
if (((validBulkOperationList.size() % bulkSize) == 0) && !validBulkOperationList.isEmpty()) { |
|
70 |
validCollection.bulkWrite(validBulkOperationList, writeOptions); |
|
71 |
validBulkOperationList.clear(); |
|
72 |
} |
|
73 |
} else { |
|
74 |
if (discardRecords) { |
|
75 |
log.debug("parsed record seems invalid"); |
|
76 |
discardRecord(record); |
|
77 |
} |
|
78 |
} |
|
59 |
recordProperties = recordParser.parseRecord(record); |
|
79 | 60 |
} catch (Throwable e) { |
80 | 61 |
if (discardRecords) { |
81 | 62 |
log.debug("unhandled exception: " + e.getMessage()); |
82 | 63 |
discardRecord(record); |
83 | 64 |
} |
84 | 65 |
} |
66 |
Map<String, List<String>> indexRecordField = null; |
|
67 |
try { |
|
68 |
if (indexRecords) { |
|
69 |
indexRecordField = indexFieldRecordParser.parseRecord(record, mdref); |
|
70 |
} |
|
71 |
} catch (IndexFieldRecordParserException e) { |
|
72 |
// could not index record fields |
|
73 |
throw new MDStoreServiceException("Are you using the correct type of store / index definition for the records in " + validCollection.getNamespace() + " ?", e); |
|
74 |
} |
|
75 |
|
|
76 |
log.debug("found props: " + recordProperties); |
|
77 |
if (recordProperties.containsKey("id")) { |
|
78 |
final DBObject obj = buildDBObject(record, recordProperties, indexRecordField); |
|
79 |
if (log.isDebugEnabled()) { |
|
80 |
log.debug("Saving object" + obj); |
|
81 |
} |
|
82 |
validBulkOperationList.add(new ReplaceOneModel(new BasicDBObject("id", obj.get("id")), obj, new UpdateOptions().upsert(true))); |
|
83 |
if (((validBulkOperationList.size() % bulkSize) == 0) && !validBulkOperationList.isEmpty()) { |
|
84 |
validCollection.bulkWrite(validBulkOperationList, writeOptions); |
|
85 |
validBulkOperationList.clear(); |
|
86 |
} |
|
87 |
} else { |
|
88 |
if (discardRecords) { |
|
89 |
log.debug("parsed record seems invalid"); |
|
90 |
discardRecord(record); |
|
91 |
} |
|
92 |
} |
|
85 | 93 |
} |
86 | 94 |
|
87 | 95 |
private void discardRecord(final String record) { |
... | ... | |
109 | 117 |
discardedCollection = getCollectionWithWriteConcern(discardedCollection, WriteConcern.ACKNOWLEDGED); |
110 | 118 |
} |
111 | 119 |
|
112 |
private DBObject buildDBObject(final String record, final Map<String, String> recordProperties, final Map<String, List<String>> indexFieldProperties) {
|
|
113 |
final DBObject obj = new BasicDBObject();
|
|
120 |
private DBObject buildDBObject(final String record, final Map<String, String> recordProperties, final Map<String, List<String>> indexFieldProperties) {
|
|
121 |
final DBObject obj = new BasicDBObject();
|
|
114 | 122 |
obj.put("id", recordProperties.get("id")); |
115 | 123 |
obj.put("originalId", recordProperties.get("originalId")); |
116 | 124 |
obj.put("body", record); |
117 | 125 |
obj.put("timestamp", System.currentTimeMillis()); |
118 |
if (indexFieldProperties != null)
|
|
119 |
obj.putAll(indexFieldProperties);
|
|
120 |
return obj;
|
|
126 |
if (indexFieldProperties != null)
|
|
127 |
obj.putAll(indexFieldProperties);
|
|
128 |
return obj;
|
|
121 | 129 |
} |
122 | 130 |
|
123 | 131 |
private MongoCollection<DBObject> getCollectionWithWriteConcern(MongoCollection<DBObject> collection, WriteConcern writeConcern) { |
Also available in: Unified diff
Stroing in mdstore fails if the VTD Parser cannot properly apply the xpaths on records to index