Revision 54836

Propagation of country from institutional repositories, and of result-project relations through semantic links

View differences:

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/PropagationConstants.java
 package eu.dnetlib.data.mapreduce.hbase.propagation;
 
+import com.google.common.collect.Sets;
+import eu.dnetlib.data.proto.RelTypeProtos;
 import eu.dnetlib.data.proto.TypeProtos;
 
+import java.util.Set;
+
 public class PropagationConstants {
 
+    public enum Type {
+        valid,
+        notvalid,
+        fromsemrel,
+        fromresult;
+    }
 
     public static final String ZERO = "0";
-
     public static final String ONE = "1";
 
+    public static final int FROM_SEM_REL = 10; // for the relations to projects propagated thanks to a semantic relation
+    public static final int FROM_RESULT = 20;  // for the relations to projects already owned by the result
+
     public static final String COUNTER_PROPAGATION = "Propagation";
 
-    public final static int PROJECT = TypeProtos.Type.project.getNumber();
-    public final static int DATASOURCE = TypeProtos.Type.datasource.getNumber();
-    public final static int ORGANIZATION = TypeProtos.Type.organization.getNumber();
-    public final static int PUBLICATION = TypeProtos.Type.result.getNumber();
 
+    public final static String DATA_INFO_TYPE = "propagation";
+    public final static String SCHEMA_NAME = "dnet:provenanceActions";
+    public final static String SCHEMA_ID = "dnet:provenanceActions";
+
+    public final static String DNET_COUNTRY_SCHEMA = "dnet:countries";
+    public final static String DNET_RELATION_SCHEMA = "dnet:result_project_relations";
+    public final static String CLASS_RELATION_ID = "propagation::project::semrel";
+    public final static String CLASS_COUNTRY_ID = "propagation::country::instrepos";
+
+    public final static int PROJECT = TypeProtos.Type.project.getNumber();           // 40
+    public final static int DATASOURCE = TypeProtos.Type.datasource.getNumber();     // 10
+    public final static int ORGANIZATION = TypeProtos.Type.organization.getNumber(); // 20
+    public final static int PUBLICATION = TypeProtos.Type.result.getNumber();        // 50
+
+    public final static RelTypeProtos.RelType REL_TYPE = RelTypeProtos.RelType.resultProject;
+    public final static RelTypeProtos.SubRelType SUBREL_TYPE = RelTypeProtos.SubRelType.outcome;
+    public static final String REL_PROJECT_RESULT = "produces";
+    public static final String REL_RESULT_PROJECT = "isProducedBy";
+
+    public static final String RELATION = REL_TYPE + "_" + SUBREL_TYPE + "_";
+    public static final String OUTCOME_PRODUCEDBY = RELATION + REL_RESULT_PROJECT;
+
+    public static final String[] DEFAULT_RELATION_SET = new String[]{"resultResult_supplement_isSupplementedBy"};
+    public static final Set<String> DEFAULT_ALLOWED_DATASOURCES = Sets.newHashSet("pubsrepository::institutional");
+
+    private Set<String> whiteList = Sets.newHashSet("10|opendoar____::300891a62162b960cf02ce3827bb363c");
+    private Set<String> blackList = Sets.newHashSet("");
+
 }
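
For orientation, a small sketch (not part of the revision; the demo class name is made up) showing how these constants compose the HBase column-family name used by the project-propagation code further down in this changeset:

package eu.dnetlib.data.mapreduce.hbase.propagation;

// Sketch: RELATION concatenates REL_TYPE ("resultProject"), SUBREL_TYPE ("outcome") and a trailing "_",
// so OUTCOME_PRODUCEDBY resolves to "resultProject_outcome_isProducedBy", the column family
// scanned by SemanticRelationCounter and ProjectToResultMapper below.
public class PropagationConstantsDemo {
    public static void main(String[] args) {
        System.out.println(PropagationConstants.RELATION);            // resultProject_outcome_
        System.out.println(PropagationConstants.OUTCOME_PRODUCEDBY);  // resultProject_outcome_isProducedBy
    }
}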
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/Utils.java
(new file)

package eu.dnetlib.data.mapreduce.hbase.propagation;

import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.data.proto.*;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.Map;

public final class Utils {

    public static OafProtos.OafEntity getEntity(Result value, TypeProtos.Type type) throws InvalidProtocolBufferException {
        final Map<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(type.toString()));

        final byte[] body = map.get(Bytes.toBytes("body"));
        if (body != null) {
            OafProtos.Oaf oaf = OafProtos.Oaf.parseFrom(body);
            if (oaf.getDataInfo().getDeletedbyinference())
                return null;
            return oaf.getEntity();
        }
        return null;
    }

    public static FieldTypeProtos.DataInfo.Builder getDataInfo(String trust, String class_id, String schema_id, String schema_name, String data_info_type, String class_name) {
        FieldTypeProtos.DataInfo.Builder builder = FieldTypeProtos.DataInfo.newBuilder()
                .setInferred(true)
                .setProvenanceaction(getQualifier(class_id, schema_id, schema_name, class_name))
                .setInferenceprovenance(data_info_type)
                .setTrust(trust);
        return builder;
    }

    public static FieldTypeProtos.Qualifier.Builder getQualifier(String class_id, String schema_id, String schema_name, String class_name) {
        return FieldTypeProtos.Qualifier.newBuilder()
                .setClassid(class_id)
                .setClassname(class_name)
                .setSchemeid(schema_id)
                .setSchemename(schema_name);
    }

    public static FieldTypeProtos.Qualifier.Builder getCountry(String countryValue, String trust, String dnet_schema, String class_id, String schema_id, String schema_name, String data_info_type, String class_name) {
        final FieldTypeProtos.Qualifier.Builder country = FieldTypeProtos.Qualifier.newBuilder()
                .setClassid(countryValue)
                .setClassname(countryValue)
                .setSchemeid(dnet_schema)
                .setSchemename(dnet_schema);
        country.setDataInfo(getDataInfo(trust, class_id, schema_id, schema_name, data_info_type, class_name)); // e.g. "Propagation of country information from datasources belonging to institutional repositories"
        return country;
    }
}
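
As a usage note, a minimal sketch (not part of the revision; the class name, result id, country code and trust value are made-up examples) of how the country-propagation code below combines getCountry with the constants in PropagationConstants to build the update emitted for a result, mirroring what ResultCountryIterator.next() does:

package eu.dnetlib.data.mapreduce.hbase.propagation;

import eu.dnetlib.data.proto.FieldTypeProtos;
import eu.dnetlib.data.proto.KindProtos;
import eu.dnetlib.data.proto.OafProtos;
import eu.dnetlib.data.proto.ResultProtos;
import eu.dnetlib.data.proto.TypeProtos;

import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getCountry;

// Sketch only: builds the country update for a single result.
public class CountryUpdateSketch {
    public static void main(String[] args) {
        // Hypothetical values: a two-letter country code and a trust score carried by the datasource row.
        final String countryCode = "IT";
        final String trust = "0.85";
        // Made-up result id, just to have a "50|" prefixed identifier.
        final String resultId = "50|somerepo_____::0000000000000000000000000000000a";

        final FieldTypeProtos.Qualifier.Builder country = getCountry(countryCode, trust,
                DNET_COUNTRY_SCHEMA, CLASS_COUNTRY_ID, SCHEMA_ID, SCHEMA_NAME, DATA_INFO_TYPE,
                "Propagation of country information from datasources belonging to institutional repositories");

        final OafProtos.Oaf update = OafProtos.Oaf.newBuilder()
                .setKind(KindProtos.Kind.entity)
                .setEntity(OafProtos.OafEntity.newBuilder()
                        .setType(TypeProtos.Type.result)
                        .setId(resultId)
                        .setResult(ResultProtos.Result.newBuilder()
                                .setMetadata(ResultProtos.Result.Metadata.newBuilder().addCountry(country))))
                .build();

        System.out.println(update); // the entity update the reducers write back to HBase or to text files
    }
}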
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/country/institutionalrepositories/PropagationCountryFromDsOrgResultFileReducer.java
 package eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories;
 
 import com.googlecode.protobuf.format.JsonFormat;
-
-import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
-import eu.dnetlib.data.proto.*;
+import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException;
+import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator;
+import eu.dnetlib.data.proto.OafProtos;
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Reducer;
 
 import java.io.IOException;
-import java.util.Iterator;
+import java.util.List;
 
-import org.apache.commons.logging.LogFactory;
+import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.COUNTER_PROPAGATION;
 
-import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
-
 public class PropagationCountryFromDsOrgResultFileReducer extends Reducer<InstOrgKey, Text, Text, Text> {
 
     private static final Log log = LogFactory.getLog(PropagationCountryFromDsOrgResultFileReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
......
 
     private Text outValue;
 
-    final static String DNETCOUNTRYSCHEMA = "dnet:countries";
-    private final static String DATA_INFO_TYPE = "propagation";
-    private final static String SCHEMA_NAME = "dnet:provenanceActions";
-    private final static String CLASS_ID = "propagation::country::instrepos";
-    private final static String SCHEMA_ID = "dnet:provenanceActions";
-
     @Override
     protected void setup(final Context context) throws IOException, InterruptedException {
         super.setup(context);
......
 
         outValue.set(data.getBytes());
         try {
+            keyOut.set(key);
             context.write(keyOut, outValue);
         } catch (Exception e) {
             e.printStackTrace();
......
     @Override
     protected void reduce(InstOrgKey key, Iterable<Text> values, Context context) {
 
-        final Iterator<Text> it = values.iterator();
-        if(!it.hasNext()){
-            context.getCounter(COUNTER_PROPAGATION,"empty information for key").increment(1);
+        ResultIterator rh = null;
+        try {
+            rh = new ResultCountryIterator(values, key.getKeyType().get());
+        } catch (NotValidResultSequenceException e) {
+            context.getCounter(COUNTER_PROPAGATION, e.getMessage()).increment(1);
             return;
         }
+        context.getCounter(COUNTER_PROPAGATION, "institutional datasource").increment(1);
 
-        final Value first = Value.fromJson(it.next().toString());
-        if (!(first.getValue().equals(ZERO) || first.getValue().equals(ONE))) {
-            if (key.getKeyType().get() == TypeProtos.Type.organization.getNumber())
-                context.getCounter(COUNTER_PROPAGATION,"First Element in reducer is not type of datasource,  but the organization exists").increment(1);
-            else {
-               // context.getCounter(COUNTER_PROPAGATION,String.format("WARNINGS: First element value is  %s " , first));
-                while (it.hasNext()) {
-                    String resultId = Value.fromJson(it.next().toString()).getValue();
-                    if (!resultId.startsWith("50|"))
-                        context.getCounter(COUNTER_PROPAGATION,"ERROR ORDERING CHECK").increment(1);
-                }
-            }
+        while (rh.hasNext()) {
+            OafProtos.Oaf oafUpdate = rh.next().get(0);
+            emit(context, oafUpdate.getEntity().getId(), JsonFormat.printToString(oafUpdate));
+            context.getCounter(COUNTER_PROPAGATION, " added country to product ").increment(1);
         }
 
-        //ensure we are dealing with an institutional repository
-        if (first.getValue().equals(ONE)) {
-            context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1);
-            if(!it.hasNext()){
-                context.getCounter(COUNTER_PROPAGATION,"no information apart of type of datasource").increment(1);
-                return;
-            }
-
-            final String country = Value.fromJson(it.next().toString()).getValue(); // need to update the information for the country to each element in the iterator
-            if(country.trim().length() != 2){
-                context.getCounter(COUNTER_PROPAGATION,"second element in reducer is not country").increment(1);
-                return;
-            }
-
-            boolean propagate = true;
-            while(it.hasNext()) {
-                String resultId = Value.fromJson(it.next().toString()).getValue();
-
-                if (!resultId.startsWith(("50|"))) {
-                    if(!resultId.equalsIgnoreCase(country)) {
-                        context.getCounter(COUNTER_PROPAGATION, "resultId expected in ordering was not found" ).increment(1);
-                        propagate = false;
-                    }
-                }else{
-                    if (propagate){
-                        OafProtos.Oaf oafUpdate = getOafCountry(resultId, country,first.getTrust());
-                        emit(context, resultId, JsonFormat.printToString(oafUpdate));
-                        context.getCounter(COUNTER_PROPAGATION, " added country to product ").increment(1);
-                    }
-                }
-            }
-        } else {
-            context.getCounter(COUNTER_PROPAGATION,"not allowed dsType institutional datasource").increment(1);
+        if (!rh.getPropagate()) {
+            context.getCounter(COUNTER_PROPAGATION, "resultId expected in ordering was not found").increment(1);
         }
-    }
 
-    private FieldTypeProtos.DataInfo.Builder getDataInfo(String trust) {
-        FieldTypeProtos.DataInfo.Builder builder = FieldTypeProtos.DataInfo.newBuilder()
-                .setInferred(true)
-                .setProvenanceaction(
-                        FieldTypeProtos.Qualifier.newBuilder()
-                                .setClassid(CLASS_ID)
-                                .setClassname("Propagation of country information from datasources belonging to institutional repositories")
-                                .setSchemeid(SCHEMA_ID)
-                                .setSchemename(SCHEMA_NAME))
-                .setInferenceprovenance(DATA_INFO_TYPE)
-                .setTrust(trust);
-        return builder;
     }
 
-    private OafProtos.Oaf getOafCountry(String resultId, String countryValue, String trust) {
-
-        final FieldTypeProtos.Qualifier.Builder country = FieldTypeProtos.Qualifier.newBuilder()
-                .setClassid(countryValue)
-                .setClassname(countryValue)
-                .setSchemeid(DNETCOUNTRYSCHEMA)
-                .setSchemename(DNETCOUNTRYSCHEMA);
-        country.setDataInfo(getDataInfo(trust));
-
-        final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder().addCountry(country);
-        final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata);
-        final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder()
-                .setType(TypeProtos.Type.result)
-                .setId(resultId)
-                .setResult(result);
-
-        return OafProtos.Oaf.newBuilder()
-                .setKind(KindProtos.Kind.entity)
-                .setEntity(entity)
-                .build();
-    }
-
 }
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/country/institutionalrepositories/PropagationCountryFromDsOrgResultReducer.java
 package eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories;
 
 import java.io.IOException;
-import java.util.Iterator;
+import java.util.List;
 
 
-import com.google.gson.Gson;
+import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException;
+import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator;
 import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
-import eu.dnetlib.data.proto.*;
-import org.apache.commons.lang3.StringUtils;
+import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
+import eu.dnetlib.data.proto.OafProtos;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableReducer;
 import org.apache.hadoop.hbase.util.Bytes;
......
 
     private static final Log log = LogFactory.getLog(PropagationCountryFromDsOrgResultReducer.class);
 
-    final static String DNETCOUNTRYSCHEMA = "dnet:countries";
-    private final static String DATA_INFO_TYPE = "propagation";
-    private final static String SCHEMA_NAME = "dnet:provenanceActions";
-    private final static String CLASS_ID = "propagation::country::instrepos";
-    private final static String SCHEMA_ID = "dnet:provenanceActions";
 
     private ImmutableBytesWritable keyOut;
 
......
     @Override
     protected void reduce(final InstOrgKey key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
 
-        final Iterator<Text> it = values.iterator();
-        if(!it.hasNext()){
-            context.getCounter(COUNTER_PROPAGATION,"empty information for key").increment(1);
+        ResultIterator rh = null;
+        try {
+            rh = new ResultCountryIterator(values, key.getKeyType().get());
+        } catch (NotValidResultSequenceException e) {
+            context.getCounter(COUNTER_PROPAGATION, e.getMessage()).increment(1);
             return;
         }
-        final Value first = Value.fromJson(it.next().toString());
 
-        if (!(first.getValue().equals(ZERO) || first.getValue().equals(ONE))) {
-            if (key.getKeyType().get() == TypeProtos.Type.organization.getNumber())
-                context.getCounter(COUNTER_PROPAGATION,"First Element in reducer is not type of datasource,  but the organization exists").increment(1);
-            else {
-                context.getCounter(COUNTER_PROPAGATION, "WARNING: unexpected first element").increment(1);
-                while (it.hasNext()) {
-                    String resultId = Value.fromJson(it.next().toString()).getValue();
-                    if (!resultId.startsWith("50|")) {
-                        context.getCounter(COUNTER_PROPAGATION,"ERROR ORDERING CHECK").increment(1);
-                    }
-                }
-            }
-        }
+        context.getCounter(COUNTER_PROPAGATION, "institutional datasource").increment(1);
 
-        //ensure we are dealing with an institutional repository
-        if (first.getValue().equals(ONE)) {
-            context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1);
-            if(!it.hasNext()) {
-                context.getCounter(COUNTER_PROPAGATION,"no information apart of type of datasource").increment(1);
-                return;
-            }
-
-            final String country = Value.fromJson(it.next().toString()).getValue(); // need to update the information for the country to each element in the iterator
-            if(country.trim().length() != 2) {
-                try {
-                    Integer.parseInt(country.trim());
-                } catch(Exception e) { }
-                context.getCounter(COUNTER_PROPAGATION,"second element in reducer is not country").increment(1);
-                return;
-            }
-
-            boolean propagate = true;
-            while(it.hasNext()) {
-                final String resultId = Value.fromJson(it.next().toString()).getValue();
-
-                if (!resultId.startsWith(("50|"))) {
-                    if(!resultId.equalsIgnoreCase(country)) {
-                        context.getCounter(COUNTER_PROPAGATION, "resultId expected in ordering was not found").increment(1);
-                        //throw new RuntimeException("Problem in country values of institutional repositories" + targetRowKey);
-                        propagate = false;
-                    }
-                } else {
-                    if (propagate) {
-                        byte[] oafUpdate = getOafCountry(resultId, country, first.getTrust());
-                        byte[] targetRowKey = Bytes.toBytes(resultId);
-                        final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_" + System.nanoTime()), oafUpdate);
-                        keyOut.set(targetRowKey);
-                        context.write(keyOut, put);
-                        context.getCounter(COUNTER_PROPAGATION, "added country to product").increment(1);
-                    }
-                }
-            }
-        } else {
-            context.getCounter(COUNTER_PROPAGATION,"not allowed dsType institutional datasource").increment(1);
+        while (rh.hasNext()) {
+            try {
+                List<OafProtos.Oaf> oap = rh.next();
+                byte[] oafUpdate = oap.get(0).toByteArray();
+
+                byte[] targetRowKey = Bytes.toBytes(oap.get(0).getEntity().getId());
+                OafRowKeyDecoder.decode(targetRowKey);
+                final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_" + System.nanoTime()), oafUpdate);
+                keyOut.set(targetRowKey);
+                context.write(keyOut, put);
+                context.getCounter(COUNTER_PROPAGATION, "added country to product").increment(1);
+            } catch (IllegalArgumentException e) {
+                context.getCounter(COUNTER_PROPAGATION, "not valid result id in result list for country propagation").increment(1);
+            }
+        }
+
+        if (!rh.getPropagate()) {
+            context.getCounter(COUNTER_PROPAGATION, "resultId expected in ordering was not found").increment(1);
         }
 
     }
 
-    private FieldTypeProtos.DataInfo.Builder getDataInfo(String trust) {
-        FieldTypeProtos.DataInfo.Builder builder = FieldTypeProtos.DataInfo.newBuilder()
-                .setInferred(true)
-                .setProvenanceaction(
-                        FieldTypeProtos.Qualifier.newBuilder()
-                                .setClassid(CLASS_ID)
-                                .setClassname("Propagation of country information from datasources belonging to institutional repositories")
-                                .setSchemeid(SCHEMA_ID)
-                                .setSchemename(SCHEMA_NAME))
-                .setInferenceprovenance(DATA_INFO_TYPE)
-                .setTrust(trust);
-        return builder;
-    }
 
-    private byte[] getOafCountry(String resultId, String countryValue, String trust) {
-
-        final FieldTypeProtos.Qualifier.Builder country = FieldTypeProtos.Qualifier.newBuilder()
-                .setClassid(countryValue)
-                .setClassname(countryValue)
-                .setSchemeid(DNETCOUNTRYSCHEMA)
-                .setSchemename(DNETCOUNTRYSCHEMA);
-        country.setDataInfo(getDataInfo(trust));
-
-        final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder().addCountry(country);
-        final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata);
-        final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder()
-                .setType(TypeProtos.Type.result)
-                .setId(resultId)
-                .setResult(result);
-
-        return OafProtos.Oaf.newBuilder()
-                .setKind(KindProtos.Kind.entity)
-                .setEntity(entity)
-                .build()
-                .toByteArray();
-    }
-
 }
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/country/institutionalrepositories/ResultCountryIterator.java
(new file)

package eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories;

import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException;
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator;
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
import eu.dnetlib.data.proto.KindProtos;
import eu.dnetlib.data.proto.OafProtos;
import eu.dnetlib.data.proto.ResultProtos;
import eu.dnetlib.data.proto.TypeProtos;
import org.apache.hadoop.io.Text;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getCountry;

public class ResultCountryIterator extends ResultIterator {
    private String country;

    public ResultCountryIterator(final Iterable<Text> values, int key) throws NotValidResultSequenceException {
        super(values, key);
    }

    @Override
    protected void checkSequence() throws NotValidResultSequenceException {
        if (!it.hasNext()) {
            throw new NotValidResultSequenceException("Empty information for key");
        }
        final Value first = Value.fromJson(it.next().toString());
        trust = first.getTrust();
        if (!(first.getValue().equals(ZERO) || first.getValue().equals(ONE))) {
            if (key != TypeProtos.Type.datasource.getNumber()) {
                throw new NotValidResultSequenceException("First Element in reducer is not type of datasource,  but the organization exists");
            } else {
                while (it.hasNext()) {
                    resultId = Value.fromJson(it.next().toString()).getValue();
                    if (!resultId.startsWith("50|")) {
                        throw new NotValidResultSequenceException("ERROR ORDERING CHECK");
                    }
                }
                throw new NotValidResultSequenceException("WARNING: unexpected first element");
            }
        }

        //ensure we are dealing with an institutional repository
        if (first.getValue().equals(ONE)) {
            //context.getCounter(COUNTER_PROPAGATION, "institutional datasource").increment(1);
            if (!it.hasNext()) {
                throw new NotValidResultSequenceException("No information apart of type of datasource");
            }

            country = Value.fromJson(it.next().toString()).getValue(); // need to update the information for the country to each element in the iterator
            if (country.trim().length() != 2) {
                try {
                    Integer.parseInt(country.trim());
                } catch (Exception e) {
                }
                throw new NotValidResultSequenceException("Second element in reducer is not country");
            }
            boolean iterate = true;
            resultId = TERMINATOR;
            while (it.hasNext() && iterate) {
                resultId = Value.fromJson(it.next().toString()).getValue();
                if (!resultId.startsWith("50|")) {
                    if (!resultId.equalsIgnoreCase(country)) {
                        propagate = false;
                        iterate = false;
                    }
                } else {
                    propagate = true;
                    iterate = false;
                }
            }
            if (!resultId.equals(TERMINATOR)) {
                try {
                    OafRowKeyDecoder.decode(resultId);
                } catch (IllegalArgumentException e) {
                    throw new NotValidResultSequenceException("No result in sequence");
                }
            }
        } else
            throw new NotValidResultSequenceException("Not allowed dsType institutional datasource");
    }

    @Override
    public List<OafProtos.Oaf> next() {
        final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder().addCountry(
                getCountry(country, trust, DNET_COUNTRY_SCHEMA, CLASS_COUNTRY_ID, SCHEMA_ID, SCHEMA_NAME, DATA_INFO_TYPE,
                        "Propagation of country information from datasources belonging to institutional repositories"));
        final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata);
        final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder()
                .setType(TypeProtos.Type.result)
                .setId(resultId)
                .setResult(result);
        if (it.hasNext())
            resultId = Value.fromJson(it.next().toString()).getValue();
        else
            resultId = TERMINATOR;

        return new ArrayList<OafProtos.Oaf>(Arrays.asList(OafProtos.Oaf.newBuilder()
                .setKind(KindProtos.Kind.entity)
                .setEntity(entity)
                .build()));
    }
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/country/institutionalrepositories/PropagationCountryFromDsOrgResultMapper.java
 import java.util.stream.Collectors;
 
 import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
-
+import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.*;
 /**
  * Created by miriam on 17/08/2018.
  */
......
                         String hostedBy = instance.getHostedby().getKey();
                         valueOut.set(Value.newInstance(entity.getId()).toJson());
                         context.write(InstOrgKey.publication(hostedBy),valueOut);
-                        context.getCounter(COUNTER_PROPAGATION, "emit publication ").increment(1);
+                        context.getCounter(COUNTER_PROPAGATION, "emit hostedby | collectedfrom for publication ").increment(1);
                         String collectedFrom = instance.getCollectedfrom().getKey();
                         if (!hostedBy.equals(collectedFrom)) {
                             context.write(InstOrgKey.publication(collectedFrom), valueOut);
-                            context.getCounter(COUNTER_PROPAGATION, "emit publication ").increment(1);
+                            context.getCounter(COUNTER_PROPAGATION, "emit hostedby | collectedfrom for publication ").increment(1);
                         }
                     }
                     break;
......
         context.getCounter(COUNTER_PROPAGATION, String.format("%s in propagation allowed list", dsType)).increment(1);
     }
 
-    private OafProtos.OafEntity getEntity(Result value, TypeProtos.Type type) throws InvalidProtocolBufferException {
-        final Map<byte[],byte[]> map = value.getFamilyMap(Bytes.toBytes(type.toString()));
-
-        final byte[] body = map.get(Bytes.toBytes("body"));
-        if (body != null){
-            OafProtos.Oaf oaf = OafProtos.Oaf.parseFrom(body);
-            if(oaf.getDataInfo().getDeletedbyinference())
-                return null;
-            return oaf.getEntity();
-        }
-        return null;
-    }
 
     private String getTrust(Result value) throws InvalidProtocolBufferException {
         final Map<byte[],byte[]> map = value.getFamilyMap(Bytes.toBytes("datasource"));
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/NotValidResultSequenceException.java
(new file)

package eu.dnetlib.data.mapreduce.hbase.propagation;

public class NotValidResultSequenceException extends Exception {

    private String message;

    public NotValidResultSequenceException(String message) {
        this.message = message;
    }

    @Override
    public String getMessage() {
        return message;
    }
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/projecttoresult/ResultProjectIterator.java
(new file)

package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult;

import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException;
import eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants;
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator;
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
import eu.dnetlib.data.proto.KindProtos;
import eu.dnetlib.data.proto.OafProtos;
import eu.dnetlib.data.proto.RelMetadataProtos;
import eu.dnetlib.data.proto.ResultProjectProtos;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;

import java.util.*;
import java.util.stream.Collectors;

import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getDataInfo;
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getQualifier;

public class ResultProjectIterator extends ResultIterator {

    private Set<String> projectList;
    private Iterator<String> pli;

    private final static String MSG = "Propagation of relation to project through semantic relation among results";

    public ResultProjectIterator(final Iterable<Text> values, final String key) throws NotValidResultSequenceException {
        super(values, key);
    }

    @Override
    protected void checkSequence() throws NotValidResultSequenceException {
        if (!it.hasNext()) {
            throw new NotValidResultSequenceException("Empty information for key");
        }
        projectList = new HashSet<>();
        loadProjectList();
        pli = projectList.iterator();
        getNext();
    }

    private void getNext() {
        if (pli.hasNext())
            resultId = pli.next();
        else
            resultId = TERMINATOR;
    }

    private void loadProjectList() {
        final Set<String> fromResult = new HashSet<>();
        final Set<String> fromSemRel = new HashSet<>();
        while (it.hasNext()) {
            Value v = Value.fromJson(it.next().toString());
            if (trust == null) trust = v.getTrust();
            if (v.getType() == PropagationConstants.Type.fromresult) {
                if (StringUtils.isNotBlank(v.getValue()))
                    fromResult.addAll(Arrays.asList(StringUtils.split(v.getValue(), ",")));
            } else {
                fromSemRel.addAll(Arrays.asList(StringUtils.split(v.getValue(), ",")));
            }
        }
        // takes the projects to be associated to the result that have not already been associated to it
        projectList.addAll(fromSemRel.stream()
                .filter(it -> !fromResult.contains(it))
                .collect(Collectors.toCollection(HashSet::new)));
    }

    @Override
    public List<OafProtos.Oaf> next() {
        ArrayList<OafProtos.Oaf> ret = new ArrayList<OafProtos.Oaf>(Arrays.asList(
                getOafRel(keyb, resultId, REL_RESULT_PROJECT),
                getOafRel(resultId, keyb, REL_PROJECT_RESULT)
        ));

        getNext();

        return ret;
    }

    private OafProtos.Oaf getOafRel(String source, String target, String semantics) {
        final ResultProjectProtos.ResultProject.Builder rpb = ResultProjectProtos.ResultProject.newBuilder()
                .setOutcome(
                        ResultProjectProtos.ResultProject.Outcome.newBuilder()
                                .setRelMetadata(
                                        RelMetadataProtos.RelMetadata.newBuilder()
                                                .setSemantics(
                                                        getQualifier(semantics, semantics, DNET_RELATION_SCHEMA, DNET_RELATION_SCHEMA)
                                                )
                                )
                );

        final OafProtos.OafRel.Builder relation = OafProtos.OafRel.newBuilder()
                .setChild(false)
                .setSubRelType(SUBREL_TYPE)
                .setRelType(REL_TYPE)
                .setRelClass(semantics)
                .setTarget(target)
                .setSource(source)
                .setResultProject(rpb);

        return OafProtos.Oaf.newBuilder()
                .setKind(KindProtos.Kind.relation)
                .setRel(relation)
                .setDataInfo(getDataInfo(trust, CLASS_RELATION_ID, SCHEMA_ID, SCHEMA_NAME, DATA_INFO_TYPE, MSG))
                .build();
    }
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/projecttoresult/SemanticRelationCounter.java
(new file)

package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult;

import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;

import java.io.IOException;
import java.util.Map;

public class SemanticRelationCounter extends TableMapper<ImmutableBytesWritable, Writable> {

    private String[] sem_rels;

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {
        super.setup(context);

        String[] default_set = {"resultResult_supplement_isSupplementedBy"};

        sem_rels = context.getConfiguration().getStrings("propagatetoproject.semanticrelations", default_set);
        if (sem_rels.length == 0)
            throw new InterruptedException("No semantic relation over which propagate is specified");
    }

    @Override
    protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {

        final Map<byte[], byte[]> resultMap = value.getFamilyMap(Bytes.toBytes("result"));

        final byte[] body = resultMap.get(Bytes.toBytes("body"));

        if (body != null) {
            final Oaf res_oaf = Oaf.parseFrom(body);
            if (res_oaf.getDataInfo().getDeletedbyinference()) {
                context.getCounter("Relation counter", "del by inference result").increment(1);
                return;
            }
            context.getCounter("Relation counter", "not null body ").increment(1);
            boolean foundRelation = false;
            for (String sem_rel : sem_rels)
                foundRelation = foundRelation || countRelation(sem_rel, context, value);
            if (countRelation("resultProject_outcome_isProducedBy", context, value)) {
                if (foundRelation)
                    context.getCounter("Relation counter", "Association to project in result with semantic relation").increment(1);
            }
        }
    }

    private boolean countRelation(String rel_to_find, Context context, Result value) throws InvalidProtocolBufferException {
        final Map<byte[], byte[]> relationMap = value.getFamilyMap(Bytes.toBytes(rel_to_find));
        boolean ret = false;
        for (byte[] relation : relationMap.values()) {
            final Oaf rel_oaf = Oaf.parseFrom(relation);
            if (!isValid(rel_oaf)) {
                context.getCounter("Relation counter", rel_to_find + " not present in relation set").increment(1);
                return ret;
            }
            if (rel_oaf.getDataInfo().getDeletedbyinference()) {
                context.getCounter("Relation counter ", rel_to_find + " del by inference").increment(1);
            } else {
                context.getCounter("Relation counter ", rel_to_find).increment(1);
                ret = true;
            }
        }
        return ret;
    }

    private boolean isValid(final Oaf oaf) {
        return (oaf != null) && oaf.isInitialized();
    }
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/projecttoresult/ProjectToResultKey.java
(new file)

package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult;

import com.google.common.collect.ComparisonChain;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.FROM_SEM_REL;
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.FROM_RESULT;

public class ProjectToResultKey implements WritableComparable<ProjectToResultKey> {

    private IntWritable keyType;

    private Text id;

    public ProjectToResultKey() {
    }

    public static ProjectToResultKey create(final int keyType, final String id) {
        return new ProjectToResultKey(keyType, id);
    }

    public static ProjectToResultKey existSemRel(final String id) {
        return new ProjectToResultKey(FROM_SEM_REL, id);
    }

    public static ProjectToResultKey doesNotExistSemRel(final String id) {
        return new ProjectToResultKey(FROM_RESULT, id);
    }

    public ProjectToResultKey(final int keyType, final String id) {
        this.id = new Text(id);
        this.keyType = new IntWritable(keyType);
    }

    public void setKeyType(final IntWritable keyType) {
        this.keyType = keyType;
    }

    public void setId(final Text id) {
        this.id = id;
    }

    public Text getId() {
        return id;
    }

    public IntWritable getKeyType() {
        return keyType;
    }

    @Override
    public int compareTo(final ProjectToResultKey o) {
        final int res = ComparisonChain.start()
                .compare(getId(), o.getId())
                .compare(getKeyType(), o.getKeyType())
                .result();

        //System.out.println(String.format("%s.compareTo(%s) = %s", toString(), o.toString(), res));
        return res;
    }

    @Override
    public void write(final DataOutput out) throws IOException {
        keyType.write(out);
        id.write(out);
    }

    @Override
    public void readFields(final DataInput in) throws IOException {
        keyType = new IntWritable();
        keyType.readFields(in);
        id = new Text();
        id.readFields(in);
    }

    @Override
    public String toString() {
        return (new StringBuilder())
                .append('{')
                .append(getKeyType().get())
                .append(',')
                .append(getId())
                .append('}')
                .toString();
    }
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/projecttoresult/PropagationProjectToResultReducer.java
(new file)

package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult;

import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException;
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator;
import eu.dnetlib.data.proto.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import java.io.IOException;

import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;

public class PropagationProjectToResultReducer extends TableReducer<ImmutableBytesWritable, Text, ImmutableBytesWritable> {
    private static final Log log = LogFactory.getLog(PropagationProjectToResultReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
    private ImmutableBytesWritable keyOut;

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {
        super.setup(context);
        keyOut = new ImmutableBytesWritable();
    }

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        ResultIterator rh = null;
        try {
            rh = new ResultProjectIterator(values, Bytes.toString(key.copyBytes()));
        } catch (NotValidResultSequenceException e) {
            context.getCounter(COUNTER_PROPAGATION, e.getMessage()).increment(1);
            return;
        }
        while (rh.hasNext()) {
            for (OafProtos.Oaf oaf : rh.next()) {
                final String source = oaf.getRel().getSource();
                final Put put = new Put(Bytes.toBytes(source)).add(Bytes.toBytes(RELATION + oaf.getRel().getRelClass()), Bytes.toBytes(oaf.getRel().getTarget()), oaf.toByteArray());
                keyOut.set(Bytes.toBytes(source));
                context.write(keyOut, put);
            }
            context.getCounter(COUNTER_PROPAGATION, "Added relation to project").increment(1);
        }
    }
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/projecttoresult/PropagationProjectToResultFileReducer.java
(new file)

package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult;

import com.googlecode.protobuf.format.JsonFormat;
import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException;
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator;
import eu.dnetlib.data.proto.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;

public class PropagationProjectToResultFileReducer extends Reducer<ImmutableBytesWritable, Text, Text, Text> {
    private static final Log log = LogFactory.getLog(PropagationProjectToResultReducer.class); // NOPMD by marko on 11/24/08 5:02 PM

    private Text keyOut;
    private Text outValue;

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {
        super.setup(context);
        keyOut = new Text("");
        outValue = new Text();
    }

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        ResultIterator rh = null;
        try {
            rh = new ResultProjectIterator(values, Bytes.toString(key.copyBytes()));
        } catch (NotValidResultSequenceException e) {
            context.getCounter(COUNTER_PROPAGATION, e.getMessage()).increment(1);
            return;
        }
        while (rh.hasNext()) {
            for (OafProtos.Oaf oaf : rh.next()) {
                keyOut.set(oaf.getRel().getTarget());
                outValue.set(JsonFormat.printToString(oaf).getBytes());
                context.write(keyOut, outValue);
            }
            context.getCounter(COUNTER_PROPAGATION, "Added relation to project").increment(1);
        }
    }
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/projecttoresult/NaturalProjectToResultKeyPartitioner.java
(new file)

package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult;

import eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories.InstOrgKey;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Created by miriam on 17/08/2018.
 */
public class NaturalProjectToResultKeyPartitioner extends Partitioner<ProjectToResultKey, Text> {

    @Override
    public int getPartition(ProjectToResultKey key, Text val, int numPartitions) {
        final int res = Math.abs(key.getId().hashCode() % numPartitions);

        return res;
    }
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/projecttoresult/NaturalProjectToResultKeyGroupingComparator.java
(new file)

package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult;

import eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories.InstOrgKey;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Created by miriam on 17/08/2018.
 */
public class NaturalProjectToResultKeyGroupingComparator extends WritableComparator {

    protected NaturalProjectToResultKeyGroupingComparator() {
        super(ProjectToResultKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        final InstOrgKey k1 = (InstOrgKey) w1;
        final InstOrgKey k2 = (InstOrgKey) w2;

        return k1.getId().compareTo(k2.getId());
    }
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/projecttoresult/ProjectToResultMapper.java
(new file)

package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult;

import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
import eu.dnetlib.data.proto.OafProtos;
import eu.dnetlib.data.proto.TypeProtos;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;

import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getEntity;

//public class ProjectToResultMapper extends TableMapper<ProjectToResultKey, Text> {
public class ProjectToResultMapper extends TableMapper<ImmutableBytesWritable, Text> {
    private static final String SEPARATOR = ",";
    private String[] sem_rels;
    private String trust;

    private ImmutableBytesWritable keyOut;
    private Text valueOut;

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {

        keyOut = new ImmutableBytesWritable();
        valueOut = new Text();

        sem_rels = context.getConfiguration().getStrings("propagatetoproject.semanticrelations", DEFAULT_RELATION_SET);
        trust = context.getConfiguration().get("propagatetoproject.trust", "0.85");

    }

    @Override
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
        final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType();
        //if the type is not result, there is no need to process it
        if (!type.equals(TypeProtos.Type.result)) {
            return;
        }
        //verify that the entity is valid
        final OafProtos.OafEntity entity = getEntity(value, type);
        if (entity == null) {
            context.getCounter(COUNTER_PROPAGATION, "Del by inference or null body for result").increment(1);
            return;
        }

        context.getCounter(COUNTER_PROPAGATION, "Valid result ").increment(1);

        //selection of all the projects associated to this result
        String projectIdList = getProjectIdList(value);

        //if the list of projects is not empty, verify whether some allowed semantic relation exists through which to propagate the project
        if (StringUtils.isNotBlank(projectIdList)) {
            final Set<String> toemitrelations = new HashSet<>();
            //selection of all the results bound to this result through any of the allowed semantic relations
            for (String sem_rel : sem_rels)
                toemitrelations.addAll(getRelationTarget(value, sem_rel));
            emit(context, toemitrelations, projectIdList);
            context.getCounter(COUNTER_PROPAGATION, "emit for semantic relation").increment(toemitrelations.size());
        }

        keyOut.set(entity.getId().getBytes());
        valueOut.set(Value.newInstance(projectIdList, Type.fromresult).toJson());
        //context.write(ProjectToResultKey.create(FROM_RESULT,entity.getId()), valueOut);

        context.write(keyOut, valueOut);
        context.getCounter(COUNTER_PROPAGATION, "emit for result").increment(1);

    }

    //emit, for each valid semantic relation, the id of the relation target and the list of projects associated to the source of the relation
    private void emit(Context context, Set<String> toemitrelations, String projectIdList) throws IOException, InterruptedException {
        for (String relation : toemitrelations) {
            keyOut.set(relation.getBytes());
            valueOut.set(Value.newInstance(projectIdList, trust, Type.fromsemrel).toJson());
            context.write(keyOut, valueOut);
        }
    }

    //starting from the Result, gets the list of projects it is related to and returns it as a csv
    private String getProjectIdList(Result value) throws InvalidProtocolBufferException {
        Set<String> ret = getRelationTarget(value, OUTCOME_PRODUCEDBY);
        return ret.size() == 0 ? null : String.join(SEPARATOR, ret);
    }

    private Set<String> getRelationTarget(Result value, String sem_rel) throws InvalidProtocolBufferException {

        final Map<byte[], byte[]> relationMap = value.getFamilyMap(Bytes.toBytes(sem_rel));

        /*
        we could extract the target qualifiers from the familyMap's keyset, but we also need to check the relationship is not deletedbyinference
        return relationMap.keySet().stream()
                .map(String::new)
                .collect(Collectors.toCollection(HashSet::new));
        */

        return relationMap.values().stream()
                .map(this::asOaf)
                .filter(Objects::nonNull)
                .filter(o -> isValid(o))
                .filter(o -> !o.getDataInfo().getDeletedbyinference())
                .map(o -> o.getRel().getTarget())
                .collect(Collectors.toCollection(HashSet::new));

    }

    private OafProtos.Oaf asOaf(byte[] r) {
        try {
            return OafProtos.Oaf.parseFrom(r);
        } catch (InvalidProtocolBufferException e) {
            return null;
        }
    }

    private boolean isValid(final OafProtos.Oaf oaf) {
        return (oaf != null) && oaf.isInitialized();
    }
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/ResultIterator.java
(new file)

package eu.dnetlib.data.mapreduce.hbase.propagation;

import eu.dnetlib.data.proto.OafProtos;
import org.apache.hadoop.io.Text;

import java.util.Iterator;
import java.util.List;

public abstract class ResultIterator implements Iterator<List<OafProtos.Oaf>> {

    protected Iterator<Text> it;
    protected int key;
    protected String keyb;
    protected String resultId;
    protected boolean propagate = true;
    protected final static String TERMINATOR = "FINITO";
    protected String trust = null;

    public ResultIterator(final Iterable<Text> values, int key) throws NotValidResultSequenceException {
        it = values.iterator();
        this.key = key;
        checkSequence();
    }

    public ResultIterator(final Iterable<Text> values, final String key) throws NotValidResultSequenceException {
        it = values.iterator();
        this.keyb = key;
        checkSequence();
    }

    protected abstract void checkSequence() throws NotValidResultSequenceException;

    public abstract List<OafProtos.Oaf> next();

    public boolean hasNext() {
        return (!resultId.equals(TERMINATOR) && propagate);
    }

    public boolean getPropagate() {
        return propagate;
    }

    public String getResultId() {
        return resultId;
    }
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/Value.java
 package eu.dnetlib.data.mapreduce.hbase.propagation;
 
 import com.google.gson.Gson;
+import eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.Type;
 
 public class Value {
 
     public static final String DEFAULT_TRUST = "0";
-
+    private Type type;
     private String value;
     private String trust;
 
......
         this(value, DEFAULT_TRUST);
     }
 
+    public Value(String value, Type type) {
+        this.value = value;
+        this.type = type;
+    }
+
+    public Value(String value, String trust, Type type) {
+        this.value = value;
+        this.trust = trust;
+        this.type = type;
+    }
+
     public static Value newInstance(String value, String trust) {
         return new Value(value, trust);
     }
......
         return new Value(value);
     }
 
+    public static Value newInstance(String value, Type type) {
+        return new Value(value, type);
+    }
+
+    public static Value newInstance(String value, String trust, Type type) {
+        return new Value(value, trust, type);
+    }
+
     public static Value fromJson(final String json) {
         return new Gson().fromJson(json, Value.class);
     }
......
         return value;
     }
 
-    public void setValue(String value) {
+    public Value setValue(String value) {
         this.value = value;
+        return this;
     }
 
     public String getTrust() {
         return trust;
     }
 
-    public void setTrust(String trust) {
+    public Value setTrust(String trust) {
         this.trust = trust;
+        return this;
     }
 
     public String toJson() {
......
     }
 
+    public Type getType() {
+        return type;
+    }
+
+    public Value setType(Type type) {
+        this.type = type;
+        return this;
+    }
 }
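
As a usage note, a minimal round-trip sketch (not part of the revision; the class name and project ids are made up, the trust value just mirrors the mapper's default) of the JSON payload that ProjectToResultMapper writes and ResultProjectIterator reads back through Value:

package eu.dnetlib.data.mapreduce.hbase.propagation;

// Sketch only: shows the Value JSON exchanged between the project-propagation mapper and reducers.
public class ValueRoundTripSketch {
    public static void main(String[] args) {
        // Made-up project ids, joined with the same "," separator the mapper uses.
        final String projectIdList = "40|aproject_____::0000000000000000000000000000000a,"
                + "40|aproject_____::0000000000000000000000000000000b";

        // what the mapper emits for a target result reached through an allowed semantic relation
        final String json = Value.newInstance(projectIdList, "0.85", PropagationConstants.Type.fromsemrel).toJson();

        // what ResultProjectIterator reads back on the reducer side
        final Value v = Value.fromJson(json);
        System.out.println(v.getType() + " -> " + v.getValue() + " (trust " + v.getTrust() + ")");
    }
}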
