
Revision 45806

Added by Eri Katsari almost 8 years ago

'updates'

View differences:

modules/dnet-openaire-lodinterlinking/branches/cacheOptimized/src/main/java/eu/dnetlib/data/mapreduce/hbase/lodExport/linkage/LinkCustomReducer.java
package eu.dnetlib.data.mapreduce.hbase.lodExport.linkage;

import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.MyComparator;
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.RedisUtils;
import org.aksw.limes.core.io.cache.MemoryCache;
import org.aksw.limes.core.io.config.KBInfo; // assumed: type of sourceKb/targetKb below
import org.aksw.limes.core.io.config.reader.xml.XMLConfigurationReader;
import org.aksw.limes.core.io.preprocessing.Preprocessor;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;

import redis.clients.jedis.Jedis; // assumed: Redis client used by "connection" below

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class LinkCustomReducer extends Reducer<Text, Text, Text, Text> {

    private Logger log = Logger.getLogger(LinkCustomReducer.class);

    private static final String OPTIMAL_BLOCKS = "optimalBlockSize";
    private long optimalBlockSize;
    private RedisUtils redisUtils;

    // The fields below are referenced by fillLimesCache()/compareRecords() but
    // were left undeclared in this revision; declarations assumed to mirror
    // LimesReducer. The delimiter values match MyComparator (see below).
    private static final String SEPERATOR = ",";           // record-id separator inside a block (assumed value)
    private static final String LINE_SEPERATOR = "\t.\t";  // matches MyComparator
    private static final String FIELD_DELIM = "\t";        // matches MyComparator
    private KBInfo sourceKb;      // assumed: read with XMLConfigurationReader in setup()
    private KBInfo targetKb;
    private MemoryCache sourceCache;
    private MemoryCache targetCache;
    private Jedis connection;     // assumed: raw Redis connection

    public enum LINK_REDUCER_COUNTERS {
        TARGET_TRIPLES,
        SOURCE_TRIPLES,
        WRITTEN_OUT_ENTITIES
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        try {
            redisUtils = new RedisUtils(context);
            optimalBlockSize = Long.parseLong(redisUtils.getValue(OPTIMAL_BLOCKS));
            log.info("OPTIMAL BLOCK SIZE " + optimalBlockSize);
        } catch (Exception e) {
            log.error("Error setting up reducer stats: " + e);
            throw new RuntimeException("Error setting up reducer stats", e);
        }
    }

    @Override
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
        // Each item in the iterable is a block with the given key.
        Map<String, String> sourceRecords = new HashMap<>();
        Map<String, String> targetRecords = new HashMap<>();

        for (Text block : values) {
            try {
                context.getCounter(LimesReducer.LIMES_COUNTERS.LINKED_BLOCKS).increment(1);
                // fillLimesCache(block, context) and compareRecords(...) are
                // not yet called from this loop in this revision.
                sourceRecords.clear();
                targetRecords.clear();
            } catch (Exception e) {
                log.error(e.toString(), e);
                throw new IOException(e);
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
    }

    private void fillLimesCache(Text block, Context context) throws Exception {
        // A block is a list of record ids; fetch each record from Redis.
        String[] split = block.toString().split(SEPERATOR);
        for (String recordId : split) {
            String record = redisUtils.getValue(recordId);
            if (record == null) {
                log.error("Record " + recordId + " not found!");
                throw new Exception("Record " + recordId + " not found!");
            }
            // Drop the leading record id, then split the rest into triples.
            record = record.substring(record.indexOf(FIELD_DELIM) + 1).trim();
            String[] triples = record.split(LINE_SEPERATOR);
            for (String triple : triples) {
                String[] fields = triple.split(FIELD_DELIM);
                String subject = fields[0];
                // Props in sourceKB do not contain <>, so we strip them here.
                String property = fields[1].replaceAll("[<>]", "");
                String value = fields[2];
                if (sourceKb.getProperties().contains(property) || targetKb.getProperties().contains(property)) {
                    if (recordId.contains("source_")) {
                        for (String propertyDub : sourceKb.getFunctions().get(property).keySet()) {
                            String processedValue = Preprocessor.process(value, sourceKb.getFunctions().get(property).get(propertyDub));
                            sourceCache.addTriple(subject, propertyDub, processedValue);
                        }
                        context.getCounter(LimesReducer.LIMES_COUNTERS.SOURCE_TRIPLES).increment(1);
                    } else {
                        for (String propertyDub : targetKb.getFunctions().get(property).keySet()) {
                            String processedValue = Preprocessor.process(value, targetKb.getFunctions().get(property).get(propertyDub));
                            targetCache.addTriple(subject, propertyDub, processedValue);
                        }
                        context.getCounter(LimesReducer.LIMES_COUNTERS.TARGET_TRIPLES).increment(1);
                    }
                }
            }
        }
    }

    private void compareRecords(Map<String, String> sourceRecords, Map<String, String> targetRecords, Context context) throws IOException, InterruptedException {
        for (String sourceId : sourceRecords.keySet()) {
            //TODO modify Redis to use hmset/mget in order to save records as a map and retrieve
            // fields individually as needed (only those needed for comparison)
            String sourceRecord = connection.get(sourceId);
            for (String targetId : targetRecords.keySet()) {
                String targetRecord = connection.get(targetId);
                double similarity = MyComparator.findMatchingPair(sourceRecord, targetRecord);
                if (similarity > 0) {
                    context.write(new Text(sourceId), new Text(targetId + "," + similarity));
                    context.getCounter(LINK_REDUCER_COUNTERS.WRITTEN_OUT_ENTITIES).increment(1);
                }
            }
        }
    }
}
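The TODO in compareRecords() points at storing each record as a Redis hash so that only the fields needed for a comparison are fetched. A minimal sketch of that idea with the Jedis client; the host/port, key, and field names are illustrative assumptions, not part of this commit:

import redis.clients.jedis.Jedis;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RecordHashSketch {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // Write one record as a hash instead of a single concatenated string.
            Map<String, String> fields = new HashMap<>();
            fields.put("<http://www.w3.org/2000/01/rdf-schema#label>", "Some title");
            fields.put("<http://purl.org/dc/terms/issued>", "2013");
            jedis.hmset("target_1", fields);

            // Later, fetch only the fields needed for the comparison.
            List<String> needed = jedis.hmget("target_1",
                    "<http://www.w3.org/2000/01/rdf-schema#label>",
                    "<http://purl.org/dc/terms/issued>");
            System.out.println(needed);
        }
    }
}

Storing hashes would also remove the substring/split decoding step, since Redis returns the fields already keyed by property.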

  
modules/dnet-openaire-lodinterlinking/branches/cacheOptimized/src/test/java/Test.java
         System.out.println(Arrays.toString(myLine.split(" ")));
     }

-    @org.junit.Test
-    public void testType()

-    {
-        String id = "<http://dblp.l3s.de/d2r/resource/publications/journals/ijdsn/DanZYY13>";
-
-        String s = id.substring(0, id.indexOf('D'));
-
-        System.out.println(s);
-    }
-
-
     @org.junit.Test
     public void writeToRedis() throws Exception {

modules/dnet-openaire-lodinterlinking/branches/cacheOptimized/src/main/java/eu/dnetlib/data/mapreduce/hbase/lodExport/utils/MyComparator.java
 public class MyComparator {
     private static double FIELDS_SIMILARITY_THRESHOLD = 0.7;
     private static double RECORD_SIMILARITY_THRESHOLD = 0.8;
-    private static String SEPARATOR = ",";
+    private static final String LINE_SEPERATOR = "\t.\t";
+    private static final String FIELD_DELIM = "\t";
     private static Map<String, String> sourceRecordMappings = new HashMap<>();

     static {
         //TODO remove later!!! make it configurable
-        sourceRecordMappings.put("http://www.eurocris.org/ontologies/cerif/1.3#name",
-                "http://www.w3.org/2000/01/rdf-schema#label");
-        sourceRecordMappings.put("http://lod.openaire.eu/vocab/year", "http://purl.org/dc/terms/issued");
+        sourceRecordMappings.put("<http://www.eurocris.org/ontologies/cerif/1.3#name>",
+                "<http://www.w3.org/2000/01/rdf-schema#label>");
+        sourceRecordMappings.put("<http://lod.openaire.eu/vocab/year>", "<http://purl.org/dc/terms/issued>");
     }

     public static double findMatchingPair(String source, String target) {
+        Map<String, String> sourceRecordMap = getRecordsFiledMap(source);
+        Map<String, String> targetRecordMap = getRecordsFiledMap(target);

-        String[] sourceFields = source.split(SEPARATOR);
-        Map<String, String> sourceFieldsMap = new HashMap<>();
-        for (int j = 0; j < sourceFields.length; j++) {
-            String[] split = sourceFields[j].split("\t");
-            sourceFieldsMap.put(split[0], split[1]);
-        }
-        //get target fields
-        String[] targetFields = target.split(",");
-        Map<String, String> targetFieldsMap = new HashMap<>();
-        for (int j = 0; j < targetFields.length; j++) {
-            String[] split = targetFields[j].split("\t");
-            targetFieldsMap.put(split[0], split[1]);
-        }
         //similarity counters
         int matchedFields = 0;
-        double totalFields = (double) targetFields.length;
+        double totalFields = (double) sourceRecordMappings.size();
         double recordSimilarity = 0.0;

-        for (Map.Entry<String, String> sourceField : sourceFieldsMap.entrySet()) {
+        for (Map.Entry<String, String> sourceField : sourceRecordMap.entrySet()) {
             String correspondingTargetField = sourceRecordMappings.get(sourceField.getKey());
-            String targetFieldValue = targetFieldsMap.get(correspondingTargetField);
+            String targetFieldValue = targetRecordMap.get(correspondingTargetField);
             double fieldsSimilarity = compare(sourceField.getValue(), targetFieldValue);
             System.out.println(sourceField + "\n" + targetFieldValue + "\n : field similarity: " + fieldsSimilarity + "\n-----------------------------------------");
             recordSimilarity += fieldsSimilarity;
......
             }
         }
         //  System.out.println("Similar chars " + similarChars);
-        return (sourceValue.length() >= targetValue.length() ? similarChars / (double) sourceValue.length() :  similarChars / (double)targetValue.length());
+        return (sourceValue.length() >= targetValue.length() ? similarChars / (double) sourceValue.length() : similarChars / (double) targetValue.length());
     }

-
+    private static Map<String, String> getRecordsFiledMap(String source) {
+        String sourceRecord = source.substring(source.indexOf(FIELD_DELIM) + 1).trim();
+        String[] sourceTriples = sourceRecord.split(LINE_SEPERATOR);
+        Map<String, String> sourceFieldsMap = new HashMap<>();
+        for (String sourceTriple : sourceTriples) {
+            String[] split = sourceTriple.split(FIELD_DELIM);
+            sourceFieldsMap.put(split[0], split[1]);
+        }
+        return sourceFieldsMap;
+    }
 }
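For orientation, a hedged usage sketch of the reworked findMatchingPair, assuming the two-field (property, value) record layout implied by getRecordsFiledMap: a record id, a tab, then property/value pairs joined by the "\t.\t" separator. The ids and values are made up, and the elided middle of the method is assumed to turn the accumulated per-field similarities into the returned record similarity using the thresholds above.

public class MyComparatorDemo {
    public static void main(String[] args) {
        // Two records whose properties are paired by sourceRecordMappings
        // (cerif name -> rdfs label, openaire year -> dcterms issued).
        String source = "source_1\t"
                + "<http://www.eurocris.org/ontologies/cerif/1.3#name>\tData Interlinking\t.\t"
                + "<http://lod.openaire.eu/vocab/year>\t2013";
        String target = "target_1\t"
                + "<http://www.w3.org/2000/01/rdf-schema#label>\tData Interlinking\t.\t"
                + "<http://purl.org/dc/terms/issued>\t2013";
        double similarity = MyComparator.findMatchingPair(source, target);
        System.out.println("record similarity = " + similarity);
    }
}

Note that compare() normalizes the count of matching characters by the longer of the two values, so identical strings score 1.0 and a short string matching inside a long one is penalized.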
modules/dnet-openaire-lodinterlinking/branches/cacheOptimized/src/main/java/eu/dnetlib/data/mapreduce/hbase/lodExport/linkage/LimesReducer.java
                 throw new Exception("Record " + recordId + " not found! ");

             }
-
             record = record.substring(record.indexOf(FIELD_DELIM) + 1).trim();
             String[] Triples = record.split(LINE_SEPERATOR);
             for (String triple : Triples) {
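Both reducers decode a record with the same two steps shown in the context lines above. A self-contained sketch of that parsing with made-up data (the class name and sample values are hypothetical):

public class RecordParseDemo {
    private static final String FIELD_DELIM = "\t";
    private static final String LINE_SEPERATOR = "\t.\t";

    public static void main(String[] args) {
        // A record: its id, then subject/property/value triples joined by "\t.\t".
        String record = "source_1\t"
                + "<s1>\t<http://lod.openaire.eu/vocab/year>\t2013\t.\t"
                + "<s1>\t<http://www.eurocris.org/ontologies/cerif/1.3#name>\tA title";
        // Drop the leading record id, then split into triples and fields.
        String body = record.substring(record.indexOf(FIELD_DELIM) + 1).trim();
        for (String triple : body.split(LINE_SEPERATOR)) {
            String[] fields = triple.split(FIELD_DELIM);
            System.out.println(fields[0] + " | " + fields[1] + " | " + fields[2]);
        }
    }
}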
