Revision 45806
Added by Eri Katsari almost 8 years ago
modules/dnet-openaire-lodinterlinking/branches/cacheOptimized/src/main/java/eu/dnetlib/data/mapreduce/hbase/lodExport/linkage/LinkCustomReducer.java (new file)
```java
package eu.dnetlib.data.mapreduce.hbase.lodExport.linkage;

import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.MyComparator;
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.RedisUtils;
import org.aksw.limes.core.io.cache.MemoryCache;
import org.aksw.limes.core.io.config.KBInfo; // assumed import for the reconstructed fields below
import org.aksw.limes.core.io.config.reader.xml.XMLConfigurationReader;
import org.aksw.limes.core.io.preprocessing.Preprocessor;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis; // assumed import for the reconstructed 'connection' field below

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class LinkCustomReducer extends Reducer<Text, Text, Text, Text> {

    private Logger log = Logger.getLogger(LinkCustomReducer.class);

    private static final String OPTIMAL_BLOCKS = "optimalBlockSize";
    private long optimalBlockSize;
    private RedisUtils redisUtils;

    // NOTE: the delimiters and the LIMES/Redis objects below are referenced by
    // this class but not declared anywhere in the committed file; these
    // declarations are reconstructed assumptions that mirror MyComparator and
    // LimesReducer from the same commit.
    private static final String SEPERATOR = ",";
    private static final String LINE_SEPERATOR = "\t.\t";
    private static final String FIELD_DELIM = "\t";
    private KBInfo sourceKb;
    private KBInfo targetKb;
    private MemoryCache sourceCache = new MemoryCache();
    private MemoryCache targetCache = new MemoryCache();
    private Jedis connection;

    public static enum LINK_REDUCER_COUNTERS {
        TARGET_TRIPLES,
        SOURCE_TRIPLES,
        WRITTEN_OUT_ENTITIES
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        try {
            redisUtils = new RedisUtils(context);
            optimalBlockSize = Integer.valueOf(redisUtils.getValue(OPTIMAL_BLOCKS));
            log.info("OPTIMAL BLOCK SIZE " + optimalBlockSize);
        } catch (Exception e) {
            log.error("Error setting up reducer stats: " + e.toString());
            throw new RuntimeException("Error setting up reducer stats", e);
        }
    }

    @Override
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
        // each item in the list is a block with the given key;
        // fillLimesCache and compareRecords are not wired in yet in this revision
        Map<String, String> sourceRecords = new HashMap<>();
        Map<String, String> targetRecords = new HashMap<>();

        for (Text block : values) {
            try {
                context.getCounter(LimesReducer.LIMES_COUNTERS.LINKED_BLOCKS).increment(1);
                sourceRecords.clear();
                targetRecords.clear();
            } catch (Exception e) {
                log.error(e.toString(), e);
                throw new IOException(e);
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
    }

    private void fillLimesCache(Text block, Context context) throws Exception {
        // a block is a list of record ids; fetch each record from Redis
        String[] split = block.toString().split(SEPERATOR);
        for (String recordId : split) {
            String record = redisUtils.getValue(recordId);
            if (record == null) {
                log.error("Record " + recordId + " not found!");
                throw new Exception("Record " + recordId + " not found!");
            }

            // strip the leading record id, then split the rest into triples
            record = record.substring(record.indexOf(FIELD_DELIM) + 1).trim();
            String[] triples = record.split(LINE_SEPERATOR);
            for (String triple : triples) {
                String[] fields = triple.split(FIELD_DELIM);
                String subject = fields[0];
                // props in sourceKb do not contain <>, so we need to strip them
                String property = fields[1].replaceAll("[<>]", "");
                String value = fields[2];
                if (sourceKb.getProperties().contains(property) || targetKb.getProperties().contains(property)) {
                    if (recordId.contains("source_")) {
                        for (String propertyDub : sourceKb.getFunctions().get(property).keySet()) {
                            String processedValue = Preprocessor.process(value, sourceKb.getFunctions().get(property).get(propertyDub));
                            sourceCache.addTriple(subject, propertyDub, processedValue);
                        }
                        context.getCounter(LimesReducer.LIMES_COUNTERS.SOURCE_TRIPLES).increment(1);
                    } else {
                        for (String propertyDub : targetKb.getFunctions().get(property).keySet()) {
                            String processedValue = Preprocessor.process(value, targetKb.getFunctions().get(property).get(propertyDub));
                            targetCache.addTriple(subject, propertyDub, processedValue);
                        }
                        context.getCounter(LimesReducer.LIMES_COUNTERS.TARGET_TRIPLES).increment(1);
                    }
                }
            }
        }
    }

    private void compareRecords(Map<String, String> sourceRecords, Map<String, String> targetRecords, Context context) throws IOException, InterruptedException {
        for (String sourceId : sourceRecords.keySet()) {
            // TODO modify Redis to use hmset/hmget in order to save records as a map
            // and retrieve fields individually as needed (only those needed for comparison)
            String sourceRecord = connection.get(sourceId);
            for (String targetId : targetRecords.keySet()) {
                String targetRecord = connection.get(targetId);
                double similarity = MyComparator.findMatchingPair(sourceRecord, targetRecord);
                if (similarity > 0) {
                    context.write(new Text(sourceId), new Text(targetId + "," + similarity));
                    // WRITTEN_OUT_ENTITIES lives in this class's own counter enum
                    context.getCounter(LINK_REDUCER_COUNTERS.WRITTEN_OUT_ENTITIES).increment(1);
                }
            }
        }
    }
}
```
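The TODO in compareRecords points at a possible optimization: storing each record as a Redis hash so that only the fields needed for comparison travel over the wire, instead of GET-ting and re-parsing the whole serialized record. A minimal sketch of that idea with the Jedis client (the host/port, key, and field names here are illustrative assumptions, not part of this revision):

```java
import redis.clients.jedis.Jedis;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: store a record as a Redis hash (HMSET) and fetch just the
// fields used for comparison (HMGET). Key and field names are hypothetical.
public class RecordHashSketch {

    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) { // assumed host/port
            // one hash per record: property -> value
            Map<String, String> record = new HashMap<>();
            record.put("<http://www.eurocris.org/ontologies/cerif/1.3#name>", "Some publication title");
            record.put("<http://lod.openaire.eu/vocab/year>", "2013");
            jedis.hmset("source_1", record);

            // retrieve only the two mapped fields that findMatchingPair compares
            List<String> values = jedis.hmget("source_1",
                    "<http://www.eurocris.org/ontologies/cerif/1.3#name>",
                    "<http://lod.openaire.eu/vocab/year>");
            System.out.println(values);
        }
    }
}
```

This would also make the string re-parsing in MyComparator.getRecordsFiledMap unnecessary on the comparison path, since the field map comes back from Redis directly.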
modules/dnet-openaire-lodinterlinking/branches/cacheOptimized/src/test/java/Test.java
```diff
@@ -41,18 +41,7 @@
         System.out.println(Arrays.toString(myLine.split(" ")));
     }
 
-    @org.junit.Test
-    public void testType()
 
-    {
-        String id = "<http://dblp.l3s.de/d2r/resource/publications/journals/ijdsn/DanZYY13>";
-
-        String s = id.substring(0, id.indexOf('D'));
-
-        System.out.println(s);
-    }
-
-
     @org.junit.Test
     public void writeToRedis() throws Exception {
 
```
modules/dnet-openaire-lodinterlinking/branches/cacheOptimized/src/main/java/eu/dnetlib/data/mapreduce/hbase/lodExport/utils/MyComparator.java
```diff
@@ -6,39 +6,29 @@
 public class MyComparator {
     private static double FIELDS_SIMILARITY_THRESHOLD = 0.7;
     private static double RECORD_SIMILARITY_THRESHOLD = 0.8;
-    private static String SEPARATOR = ",";
+    private static final String LINE_SEPERATOR = "\t.\t";
+    private static final String FIELD_DELIM = "\t";
     private static Map<String, String> sourceRecordMappings = new HashMap<>();
 
     static {
         //TODO remove later!!! make it configurable
-        sourceRecordMappings.put("http://www.eurocris.org/ontologies/cerif/1.3#name",
-                "http://www.w3.org/2000/01/rdf-schema#label");
-        sourceRecordMappings.put("http://lod.openaire.eu/vocab/year", "http://purl.org/dc/terms/issued");
+        sourceRecordMappings.put("<http://www.eurocris.org/ontologies/cerif/1.3#name>",
+                "<http://www.w3.org/2000/01/rdf-schema#label>");
+        sourceRecordMappings.put("<http://lod.openaire.eu/vocab/year>", "<http://purl.org/dc/terms/issued>");
     }
 
     public static double findMatchingPair(String source, String target) {
+        Map<String, String> sourceRecordMap = getRecordsFiledMap(source);
+        Map<String, String> targetRecordMap = getRecordsFiledMap(target);
 
-        String[] sourceFields = source.split(SEPARATOR);
-        Map<String, String> sourceFieldsMap = new HashMap<>();
-        for (int j = 0; j < sourceFields.length; j++) {
-            String[] split = sourceFields[j].split("\t");
-            sourceFieldsMap.put(split[0], split[1]);
-        }
-        //get target fields
-        String[] targetFields = target.split(",");
-        Map<String, String> targetFieldsMap = new HashMap<>();
-        for (int j = 0; j < targetFields.length; j++) {
-            String[] split = targetFields[j].split("\t");
-            targetFieldsMap.put(split[0], split[1]);
-        }
         //similarity counters
         int matchedFields = 0;
-        double totalFields = (double) targetFields.length;
+        double totalFields = (double) sourceRecordMappings.size();
         double recordSimilarity = 0.0;
 
-        for (Map.Entry<String, String> sourceField : sourceFieldsMap.entrySet()) {
+        for (Map.Entry<String, String> sourceField : sourceRecordMap.entrySet()) {
             String correspondingTargetField = sourceRecordMappings.get(sourceField.getKey());
-            String targetFieldValue = targetFieldsMap.get(correspondingTargetField);
+            String targetFieldValue = targetRecordMap.get(correspondingTargetField);
             double fieldsSimilarity = compare(sourceField.getValue(), targetFieldValue);
             System.out.println(sourceField + "\n" + targetFieldValue + "\n : field similarity: " + fieldsSimilarity + "\n-----------------------------------------");
             recordSimilarity += fieldsSimilarity;
@@ -66,8 +56,17 @@
             }
         }
         // System.out.println("Similar chars " + similarChars);
-        return (sourceValue.length() >= targetValue.length() ? similarChars / (double) sourceValue.length() : similarChars / (double)targetValue.length());
+        return (sourceValue.length() >= targetValue.length() ? similarChars / (double) sourceValue.length() : similarChars / (double) targetValue.length());
     }
 
+    private static Map<String, String> getRecordsFiledMap(String source) {
+        String sourceRecord = source.substring(source.indexOf(FIELD_DELIM) + 1).trim();
+        String[] sourceTriples = sourceRecord.split(LINE_SEPERATOR);
+        Map<String, String> sourceFieldsMap = new HashMap<>();
+        for (String sourceTriple : sourceTriples) {
+            String[] split = sourceTriple.split(FIELD_DELIM);
+            sourceFieldsMap.put(split[0], split[1]);
+        }
+        return sourceFieldsMap;
+    }
 }
```
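For reference, a small usage sketch of the reworked findMatchingPair. The record layout is inferred from getRecordsFiledMap (a record id, a tab, then property-TAB-value pairs joined by the "\t.\t" line separator); the record ids and field values below are made up:

```java
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.MyComparator;

// Sketch only: record strings follow the layout that getRecordsFiledMap
// expects, and the property keys match the hard-coded sourceRecordMappings.
public class MyComparatorExample {

    public static void main(String[] args) {
        String source = "source_1\t"
                + "<http://www.eurocris.org/ontologies/cerif/1.3#name>\tEnergy efficient routing"
                + "\t.\t"
                + "<http://lod.openaire.eu/vocab/year>\t2013";

        String target = "target_1\t"
                + "<http://www.w3.org/2000/01/rdf-schema#label>\tEnergy efficient routing"
                + "\t.\t"
                + "<http://purl.org/dc/terms/issued>\t2013";

        // each source property is translated through sourceRecordMappings to its
        // target counterpart and the two field values are compared character-wise
        double similarity = MyComparator.findMatchingPair(source, target);
        System.out.println("record similarity: " + similarity);
    }
}
```

Note that totalFields is now the size of sourceRecordMappings rather than the number of fields in the target record, so unmapped target properties no longer dilute the score.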
modules/dnet-openaire-lodinterlinking/branches/cacheOptimized/src/main/java/eu/dnetlib/data/mapreduce/hbase/lodExport/linkage/LimesReducer.java
```diff
@@ -96,7 +96,6 @@
             throw new Exception("Record " + recordId + " not found! ");
 
         }
-
         record = record.substring(record.indexOf(FIELD_DELIM) + 1).trim();
         String[] Triples = record.split(LINE_SEPERATOR);
         for (String triple : Triples) {
```