Revision 54836
Added by Miriam Baglioni almost 6 years ago
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/PropagationConstants.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.propagation; |
2 | 2 |
|
3 |
import com.google.common.collect.Sets; |
|
4 |
import eu.dnetlib.data.proto.RelTypeProtos; |
|
3 | 5 |
import eu.dnetlib.data.proto.TypeProtos; |
4 | 6 |
|
7 |
import java.util.Set; |
|
8 |
|
|
5 | 9 |
public class PropagationConstants { |
6 | 10 |
|
11 |
public enum Type { |
|
12 |
valid, |
|
13 |
notvalid, |
|
14 |
fromsemrel, |
|
15 |
fromresult; |
|
16 |
} |
|
7 | 17 |
|
8 | 18 |
public static final String ZERO = "0"; |
9 |
|
|
10 | 19 |
public static final String ONE = "1"; |
11 | 20 |
|
21 |
public static final int FROM_SEM_REL = 10;//for the relation to projects propagated thank to a semantic relation |
|
22 |
public static final int FROM_RESULT = 20;//for the relations to projects already owned by the result |
|
23 |
|
|
12 | 24 |
public static final String COUNTER_PROPAGATION = "Propagation"; |
13 | 25 |
|
14 |
public final static int PROJECT = TypeProtos.Type.project.getNumber(); |
|
15 |
public final static int DATASOURCE = TypeProtos.Type.datasource.getNumber(); |
|
16 |
public final static int ORGANIZATION = TypeProtos.Type.organization.getNumber(); |
|
17 |
public final static int PUBLICATION = TypeProtos.Type.result.getNumber(); |
|
18 | 26 |
|
27 |
public final static String DATA_INFO_TYPE = "propagation"; |
|
28 |
public final static String SCHEMA_NAME = "dnet:provenanceActions"; |
|
29 |
public final static String SCHEMA_ID = "dnet:provenanceActions"; |
|
30 |
|
|
31 |
public final static String DNET_COUNTRY_SCHEMA = "dnet:countries"; |
|
32 |
public final static String DNET_RELATION_SCHEMA = "dnet:result_project_relations"; |
|
33 |
public final static String CLASS_RELATION_ID = "propagation::project::semrel"; |
|
34 |
public final static String CLASS_COUNTRY_ID = "propagation::country::instrepos"; |
|
35 |
|
|
36 |
public final static int PROJECT = TypeProtos.Type.project.getNumber();//40 |
|
37 |
public final static int DATASOURCE = TypeProtos.Type.datasource.getNumber();//10 |
|
38 |
public final static int ORGANIZATION = TypeProtos.Type.organization.getNumber();//20 |
|
39 |
public final static int PUBLICATION = TypeProtos.Type.result.getNumber();//50 |
|
40 |
|
|
41 |
public final static RelTypeProtos.RelType REL_TYPE = RelTypeProtos.RelType.resultProject; |
|
42 |
public final static RelTypeProtos.SubRelType SUBREL_TYPE = RelTypeProtos.SubRelType.outcome; |
|
43 |
public static final String REL_PROJECT_RESULT = "produces"; |
|
44 |
public static final String REL_RESULT_PROJECT = "isProducedBy"; |
|
45 |
|
|
46 |
public static final String RELATION = REL_TYPE + "_" + SUBREL_TYPE + "_"; |
|
47 |
public static final String OUTCOME_PRODUCEDBY = RELATION + REL_RESULT_PROJECT; |
|
48 |
|
|
49 |
|
|
50 |
public static final String[] DEFAULT_RELATION_SET = new String[]{"resultResult_supplement_isSupplementedBy"}; |
|
51 |
public static final Set<String> DEFAULT_ALLOWED_DATASOURCES = Sets.newHashSet("pubsrepository::institutional"); |
|
52 |
|
|
53 |
private Set<String> whiteList = Sets.newHashSet("10|opendoar____::300891a62162b960cf02ce3827bb363c"); |
|
54 |
private Set<String> blackList = Sets.newHashSet(""); |
|
55 |
|
|
19 | 56 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/Utils.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation; |
|
2 |
|
|
3 |
import com.google.protobuf.InvalidProtocolBufferException; |
|
4 |
import eu.dnetlib.data.proto.*; |
|
5 |
import org.apache.hadoop.hbase.client.Result; |
|
6 |
import org.apache.hadoop.hbase.util.Bytes; |
|
7 |
|
|
8 |
import java.util.Map; |
|
9 |
|
|
10 |
public final class Utils { |
|
11 |
public static OafProtos.OafEntity getEntity(Result value, TypeProtos.Type type) throws InvalidProtocolBufferException { |
|
12 |
final Map<byte[],byte[]> map = value.getFamilyMap(Bytes.toBytes(type.toString())); |
|
13 |
|
|
14 |
final byte[] body = map.get(Bytes.toBytes("body")); |
|
15 |
if (body != null){ |
|
16 |
OafProtos.Oaf oaf = OafProtos.Oaf.parseFrom(body); |
|
17 |
if(oaf.getDataInfo().getDeletedbyinference()) |
|
18 |
return null; |
|
19 |
return oaf.getEntity(); |
|
20 |
} |
|
21 |
return null; |
|
22 |
} |
|
23 |
|
|
24 |
public static FieldTypeProtos.DataInfo.Builder getDataInfo(String trust, String class_id,String schema_id,String schema_name, String data_info_type,String class_name) { |
|
25 |
FieldTypeProtos.DataInfo.Builder builder = FieldTypeProtos.DataInfo.newBuilder() |
|
26 |
.setInferred(true) |
|
27 |
.setProvenanceaction(getQualifier(class_id,schema_id,schema_name,class_name) ) |
|
28 |
.setInferenceprovenance(data_info_type) |
|
29 |
.setTrust(trust); |
|
30 |
return builder; |
|
31 |
|
|
32 |
} |
|
33 |
|
|
34 |
public static FieldTypeProtos.Qualifier.Builder getQualifier(String class_id, String schema_id, String schema_name, String class_name){ |
|
35 |
return FieldTypeProtos.Qualifier.newBuilder() |
|
36 |
.setClassid(class_id) |
|
37 |
.setClassname(class_name) |
|
38 |
.setSchemeid(schema_id) |
|
39 |
.setSchemename(schema_name); |
|
40 |
} |
|
41 |
|
|
42 |
|
|
43 |
public static FieldTypeProtos.Qualifier.Builder getCountry(String countryValue, String trust,String dnet_schema, String class_id,String schema_id,String schema_name, String data_info_type, String class_name) { |
|
44 |
|
|
45 |
final FieldTypeProtos.Qualifier.Builder country = FieldTypeProtos.Qualifier.newBuilder() |
|
46 |
.setClassid(countryValue) |
|
47 |
.setClassname(countryValue) |
|
48 |
.setSchemeid(dnet_schema) |
|
49 |
.setSchemename(dnet_schema); |
|
50 |
country.setDataInfo(getDataInfo(trust,class_id,schema_id,schema_name,data_info_type,class_name));//"Propagation of country information from datasources belonging to institutional repositories")); |
|
51 |
|
|
52 |
return country; |
|
53 |
} |
|
54 |
|
|
55 |
|
|
56 |
|
|
57 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/country/institutionalrepositories/PropagationCountryFromDsOrgResultFileReducer.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories; |
2 | 2 |
|
3 | 3 |
import com.googlecode.protobuf.format.JsonFormat; |
4 |
|
|
5 |
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
|
|
6 |
import eu.dnetlib.data.proto.*;
|
|
4 |
import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException; |
|
5 |
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator;
|
|
6 |
import eu.dnetlib.data.proto.OafProtos;
|
|
7 | 7 |
import org.apache.commons.logging.Log; |
8 |
import org.apache.commons.logging.LogFactory; |
|
8 | 9 |
import org.apache.hadoop.io.Text; |
9 | 10 |
import org.apache.hadoop.mapreduce.Reducer; |
10 | 11 |
|
11 | 12 |
import java.io.IOException; |
12 |
import java.util.Iterator;
|
|
13 |
import java.util.List;
|
|
13 | 14 |
|
14 |
import org.apache.commons.logging.LogFactory;
|
|
15 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.COUNTER_PROPAGATION;
|
|
15 | 16 |
|
16 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
|
17 |
|
|
18 | 17 |
public class PropagationCountryFromDsOrgResultFileReducer extends Reducer<InstOrgKey, Text, Text,Text> { |
19 | 18 |
|
20 | 19 |
private static final Log log = LogFactory.getLog(PropagationCountryFromDsOrgResultFileReducer.class); // NOPMD by marko on 11/24/08 5:02 PM |
... | ... | |
23 | 22 |
|
24 | 23 |
private Text outValue; |
25 | 24 |
|
26 |
final static String DNETCOUNTRYSCHEMA = "dnet:countries"; |
|
27 |
private final static String DATA_INFO_TYPE = "propagation"; |
|
28 |
private final static String SCHEMA_NAME = "dnet:provenanceActions"; |
|
29 |
private final static String CLASS_ID = "propagation::country::instrepos"; |
|
30 |
private final static String SCHEMA_ID = "dnet:provenanceActions"; |
|
31 |
|
|
32 |
|
|
33 | 25 |
@Override |
34 | 26 |
protected void setup(final Context context) throws IOException, InterruptedException { |
35 | 27 |
super.setup(context); |
... | ... | |
42 | 34 |
|
43 | 35 |
outValue.set(data.getBytes()); |
44 | 36 |
try { |
37 |
keyOut.set(key); |
|
45 | 38 |
context.write(keyOut, outValue); |
46 | 39 |
} catch (Exception e) { |
47 | 40 |
e.printStackTrace(); |
... | ... | |
52 | 45 |
@Override |
53 | 46 |
protected void reduce(InstOrgKey key, Iterable<Text> values, Context context) { |
54 | 47 |
|
55 |
final Iterator<Text> it = values.iterator(); |
|
56 |
if(!it.hasNext()){ |
|
57 |
context.getCounter(COUNTER_PROPAGATION,"empty information for key").increment(1); |
|
48 |
ResultIterator rh = null; |
|
49 |
try { |
|
50 |
rh = new ResultCountryIterator(values,key.getKeyType().get()); |
|
51 |
} catch (NotValidResultSequenceException e) { |
|
52 |
context.getCounter(COUNTER_PROPAGATION,e.getMessage()).increment(1); |
|
58 | 53 |
return; |
59 | 54 |
} |
55 |
context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1); |
|
60 | 56 |
|
61 |
final Value first = Value.fromJson(it.next().toString()); |
|
62 |
if (!(first.getValue().equals(ZERO) || first.getValue().equals(ONE))) { |
|
63 |
if (key.getKeyType().get() == TypeProtos.Type.organization.getNumber()) |
|
64 |
context.getCounter(COUNTER_PROPAGATION,"First Element in reducer is not type of datasource, but the organization exists").increment(1); |
|
65 |
else { |
|
66 |
// context.getCounter(COUNTER_PROPAGATION,String.format("WARNINGS: First element value is %s " , first)); |
|
67 |
while (it.hasNext()) { |
|
68 |
|
|
69 |
String resultId = Value.fromJson(it.next().toString()).getValue(); |
|
70 |
if (!resultId.startsWith("50|")) |
|
71 |
context.getCounter(COUNTER_PROPAGATION,"ERROR ORDERING CHECK").increment(1); |
|
72 |
} |
|
73 |
} |
|
57 |
while(rh.hasNext()){ |
|
58 |
OafProtos.Oaf oafUpdate = rh.next().get(0); |
|
59 |
emit(context, oafUpdate.getEntity().getId(), JsonFormat.printToString(oafUpdate)); |
|
60 |
context.getCounter(COUNTER_PROPAGATION, " added country to product ").increment(1); |
|
74 | 61 |
} |
75 | 62 |
|
76 |
//ensure we are dealing with an institutional repository |
|
77 |
if (first.getValue().equals(ONE)) { |
|
78 |
context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1); |
|
79 |
if(!it.hasNext()){ |
|
80 |
context.getCounter(COUNTER_PROPAGATION,"no information apart of type of datasource").increment(1); |
|
81 |
return; |
|
82 |
} |
|
83 |
|
|
84 |
final String country = Value.fromJson(it.next().toString()).getValue(); // need to update the information for the country to each element in the iterator |
|
85 |
if(country.trim().length() != 2){ |
|
86 |
context.getCounter(COUNTER_PROPAGATION,"second element in reducer is not country").increment(1); |
|
87 |
return; |
|
88 |
} |
|
89 |
|
|
90 |
boolean propagate = true; |
|
91 |
while(it.hasNext()) { |
|
92 |
String resultId = Value.fromJson(it.next().toString()).getValue(); |
|
93 |
|
|
94 |
if (!resultId.startsWith(("50|"))) { |
|
95 |
if(!resultId.equalsIgnoreCase(country)) { |
|
96 |
context.getCounter(COUNTER_PROPAGATION, "resultId expected in ordering was not found" ).increment(1); |
|
97 |
propagate = false; |
|
98 |
} |
|
99 |
|
|
100 |
}else{ |
|
101 |
if (propagate){ |
|
102 |
OafProtos.Oaf oafUpdate = getOafCountry(resultId, country,first.getTrust()); |
|
103 |
emit(context, resultId, JsonFormat.printToString(oafUpdate)); |
|
104 |
|
|
105 |
context.getCounter(COUNTER_PROPAGATION, " added country to product ").increment(1); |
|
106 |
} |
|
107 |
|
|
108 |
} |
|
109 |
|
|
110 |
} |
|
111 |
} else { |
|
112 |
context.getCounter(COUNTER_PROPAGATION,"not allowed dsType institutional datasource").increment(1); |
|
63 |
if (!rh.getPropagate()){ |
|
64 |
context.getCounter(COUNTER_PROPAGATION, "resultId expected in ordering was not found" ).increment(1); |
|
113 | 65 |
} |
114 |
} |
|
115 | 66 |
|
116 |
private FieldTypeProtos.DataInfo.Builder getDataInfo(String trust) { |
|
117 |
FieldTypeProtos.DataInfo.Builder builder = FieldTypeProtos.DataInfo.newBuilder() |
|
118 |
.setInferred(true) |
|
119 |
.setProvenanceaction( |
|
120 |
FieldTypeProtos.Qualifier.newBuilder() |
|
121 |
.setClassid(CLASS_ID) |
|
122 |
.setClassname("Propagation of country information from datasources belonging to institutional repositories") |
|
123 |
.setSchemeid(SCHEMA_ID) |
|
124 |
.setSchemename(SCHEMA_NAME)) |
|
125 |
.setInferenceprovenance(DATA_INFO_TYPE) |
|
126 |
.setTrust(trust); |
|
127 |
return builder; |
|
128 | 67 |
|
129 | 68 |
} |
130 | 69 |
|
131 |
private OafProtos.Oaf getOafCountry(String resultId, String countryValue, String trust) { |
|
132 | 70 |
|
133 |
final FieldTypeProtos.Qualifier.Builder country = FieldTypeProtos.Qualifier.newBuilder() |
|
134 |
.setClassid(countryValue) |
|
135 |
.setClassname(countryValue) |
|
136 |
.setSchemeid(DNETCOUNTRYSCHEMA) |
|
137 |
.setSchemename(DNETCOUNTRYSCHEMA); |
|
138 |
country.setDataInfo(getDataInfo(trust)); |
|
139 | 71 |
|
140 |
final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder().addCountry(country); |
|
141 |
final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata); |
|
142 |
final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder() |
|
143 |
.setType(TypeProtos.Type.result) |
|
144 |
.setId(resultId) |
|
145 |
.setResult(result); |
|
146 | 72 |
|
147 |
return OafProtos.Oaf.newBuilder() |
|
148 |
.setKind(KindProtos.Kind.entity) |
|
149 |
.setEntity(entity) |
|
150 |
.build(); |
|
151 |
} |
|
152 |
|
|
153 |
|
|
154 | 73 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/country/institutionalrepositories/PropagationCountryFromDsOrgResultReducer.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories; |
2 | 2 |
|
3 | 3 |
import java.io.IOException; |
4 |
import java.util.Iterator;
|
|
4 |
import java.util.List;
|
|
5 | 5 |
|
6 | 6 |
|
7 |
import com.google.gson.Gson; |
|
7 |
import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException; |
|
8 |
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator; |
|
8 | 9 |
import eu.dnetlib.data.mapreduce.hbase.propagation.Value; |
9 |
import eu.dnetlib.data.proto.*;
|
|
10 |
import org.apache.commons.lang3.StringUtils;
|
|
10 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
|
|
11 |
import eu.dnetlib.data.proto.OafProtos;
|
|
11 | 12 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
12 | 13 |
import org.apache.hadoop.hbase.mapreduce.TableReducer; |
13 | 14 |
import org.apache.hadoop.hbase.util.Bytes; |
... | ... | |
23 | 24 |
|
24 | 25 |
private static final Log log = LogFactory.getLog(PropagationCountryFromDsOrgResultReducer.class); |
25 | 26 |
|
26 |
final static String DNETCOUNTRYSCHEMA = "dnet:countries"; |
|
27 |
private final static String DATA_INFO_TYPE = "propagation"; |
|
28 |
private final static String SCHEMA_NAME = "dnet:provenanceActions"; |
|
29 |
private final static String CLASS_ID = "propagation::country::instrepos"; |
|
30 |
private final static String SCHEMA_ID = "dnet:provenanceActions"; |
|
31 | 27 |
|
32 | 28 |
private ImmutableBytesWritable keyOut; |
33 | 29 |
|
... | ... | |
40 | 36 |
@Override |
41 | 37 |
protected void reduce(final InstOrgKey key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException { |
42 | 38 |
|
43 |
final Iterator<Text> it = values.iterator(); |
|
44 |
if(!it.hasNext()){ |
|
45 |
context.getCounter(COUNTER_PROPAGATION,"empty information for key").increment(1); |
|
39 |
|
|
40 |
ResultIterator rh = null; |
|
41 |
try { |
|
42 |
rh = new ResultCountryIterator(values,key.getKeyType().get()); |
|
43 |
} catch (NotValidResultSequenceException e) { |
|
44 |
context.getCounter(COUNTER_PROPAGATION,e.getMessage()).increment(1); |
|
46 | 45 |
return; |
47 | 46 |
} |
48 |
final Value first = Value.fromJson(it.next().toString()); |
|
49 | 47 |
|
50 |
if (!(first.getValue().equals(ZERO) || first.getValue().equals(ONE))) { |
|
51 |
if (key.getKeyType().get() == TypeProtos.Type.organization.getNumber()) |
|
52 |
context.getCounter(COUNTER_PROPAGATION,"First Element in reducer is not type of datasource, but the organization exists").increment(1); |
|
53 |
else { |
|
54 |
context.getCounter(COUNTER_PROPAGATION, "WARNING: unexpected first element").increment(1); |
|
55 |
while (it.hasNext()) { |
|
48 |
context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1); |
|
56 | 49 |
|
57 |
String resultId = Value.fromJson(it.next().toString()).getValue(); |
|
58 |
if (!resultId.startsWith("50|")) { |
|
59 |
context.getCounter(COUNTER_PROPAGATION,"ERROR ORDERING CHECK").increment(1); |
|
60 |
} |
|
61 |
} |
|
62 |
} |
|
63 |
} |
|
50 |
while(rh.hasNext()){ |
|
51 |
try{ |
|
52 |
List<OafProtos.Oaf> oap = rh.next(); |
|
53 |
byte[] oafUpdate = oap.get(0).toByteArray(); |
|
64 | 54 |
|
65 |
//ensure we are dealing with an institutional repository |
|
66 |
if (first.getValue().equals(ONE)) { |
|
67 |
context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1); |
|
68 |
if(!it.hasNext()) { |
|
69 |
context.getCounter(COUNTER_PROPAGATION,"no information apart of type of datasource").increment(1); |
|
70 |
return; |
|
71 |
} |
|
55 |
byte[] targetRowKey = Bytes.toBytes(oap.get(0).getEntity().getId()); |
|
56 |
OafRowKeyDecoder.decode(targetRowKey); |
|
57 |
final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_" + System.nanoTime()), oafUpdate); |
|
58 |
keyOut.set(targetRowKey); |
|
59 |
context.write(keyOut, put); |
|
60 |
context.getCounter(COUNTER_PROPAGATION, "added country to product").increment(1); |
|
72 | 61 |
|
73 |
|
|
74 |
final String country = Value.fromJson(it.next().toString()).getValue(); // need to update the information for the country to each element in the iterator |
|
75 |
if(country.trim().length() != 2) { |
|
76 |
try { |
|
77 |
Integer.parseInt(country.trim()); |
|
78 |
} catch(Exception e) { } |
|
79 |
context.getCounter(COUNTER_PROPAGATION,"second element in reducer is not country").increment(1); |
|
80 |
return; |
|
62 |
}catch(IllegalArgumentException e){ |
|
63 |
context.getCounter(COUNTER_PROPAGATION,"not valid result id in result list for country propagation").increment(1); |
|
81 | 64 |
} |
82 | 65 |
|
83 |
boolean propagate = true; |
|
84 |
while(it.hasNext()) { |
|
66 |
} |
|
85 | 67 |
|
86 |
final String resultId = Value.fromJson(it.next().toString()).getValue(); |
|
87 |
|
|
88 |
if (!resultId.startsWith(("50|"))) { |
|
89 |
if(!resultId.equalsIgnoreCase(country)) { |
|
90 |
context.getCounter(COUNTER_PROPAGATION, "resultId expected in ordering was not found").increment(1); |
|
91 |
//throw new RuntimeException("Problem in country values of institutional repositories" + targetRowKey); |
|
92 |
propagate = false; |
|
93 |
} |
|
94 |
|
|
95 |
} else { |
|
96 |
if (propagate) { |
|
97 |
|
|
98 |
byte[] oafUpdate = getOafCountry(resultId, country, first.getTrust()); |
|
99 |
byte[] targetRowKey = Bytes.toBytes(resultId); |
|
100 |
final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_" + System.nanoTime()), oafUpdate); |
|
101 |
keyOut.set(targetRowKey); |
|
102 |
context.write(keyOut, put); |
|
103 |
context.getCounter(COUNTER_PROPAGATION, "added country to product").increment(1); |
|
104 |
} |
|
105 |
} |
|
106 |
} |
|
107 |
} else { |
|
108 |
context.getCounter(COUNTER_PROPAGATION,"not allowed dsType institutional datasource").increment(1); |
|
68 |
if (!rh.getPropagate()){ |
|
69 |
context.getCounter(COUNTER_PROPAGATION, "resultId expected in ordering was not found" ).increment(1); |
|
109 | 70 |
} |
110 | 71 |
|
111 |
} |
|
112 | 72 |
|
113 |
private FieldTypeProtos.DataInfo.Builder getDataInfo(String trust) { |
|
114 |
FieldTypeProtos.DataInfo.Builder builder = FieldTypeProtos.DataInfo.newBuilder() |
|
115 |
.setInferred(true) |
|
116 |
.setProvenanceaction( |
|
117 |
FieldTypeProtos.Qualifier.newBuilder() |
|
118 |
.setClassid(CLASS_ID) |
|
119 |
.setClassname("Propagation of country information from datasources belonging to institutional repositories") |
|
120 |
.setSchemeid(SCHEMA_ID) |
|
121 |
.setSchemename(SCHEMA_NAME)) |
|
122 |
.setInferenceprovenance(DATA_INFO_TYPE) |
|
123 |
.setTrust(trust); |
|
124 |
return builder; |
|
125 | 73 |
|
126 | 74 |
} |
127 | 75 |
|
128 |
private byte[] getOafCountry(String resultId, String countryValue, String trust) { |
|
129 | 76 |
|
130 |
final FieldTypeProtos.Qualifier.Builder country = FieldTypeProtos.Qualifier.newBuilder() |
|
131 |
.setClassid(countryValue) |
|
132 |
.setClassname(countryValue) |
|
133 |
.setSchemeid(DNETCOUNTRYSCHEMA) |
|
134 |
.setSchemename(DNETCOUNTRYSCHEMA); |
|
135 |
country.setDataInfo(getDataInfo(trust)); |
|
136 |
|
|
137 |
final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder().addCountry(country); |
|
138 |
final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata); |
|
139 |
final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder() |
|
140 |
.setType(TypeProtos.Type.result) |
|
141 |
.setId(resultId) |
|
142 |
.setResult(result); |
|
143 |
|
|
144 |
return OafProtos.Oaf.newBuilder() |
|
145 |
.setKind(KindProtos.Kind.entity) |
|
146 |
.setEntity(entity) |
|
147 |
.build() |
|
148 |
.toByteArray(); |
|
149 |
} |
|
150 |
|
|
151 | 77 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/country/institutionalrepositories/ResultCountryIterator.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories; |
|
2 |
|
|
3 |
import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException; |
|
4 |
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator; |
|
5 |
import eu.dnetlib.data.mapreduce.hbase.propagation.Value; |
|
6 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
|
7 |
import eu.dnetlib.data.proto.KindProtos; |
|
8 |
import eu.dnetlib.data.proto.OafProtos; |
|
9 |
import eu.dnetlib.data.proto.ResultProtos; |
|
10 |
import eu.dnetlib.data.proto.TypeProtos; |
|
11 |
import org.apache.hadoop.io.Text; |
|
12 |
|
|
13 |
import java.util.ArrayList; |
|
14 |
import java.util.Arrays; |
|
15 |
import java.util.List; |
|
16 |
|
|
17 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
|
18 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getCountry; |
|
19 |
|
|
20 |
public class ResultCountryIterator extends ResultIterator { |
|
21 |
private String country; |
|
22 |
|
|
23 |
|
|
24 |
public ResultCountryIterator(final Iterable<Text> values, int key) throws NotValidResultSequenceException { |
|
25 |
super(values,key); |
|
26 |
} |
|
27 |
|
|
28 |
@Override |
|
29 |
protected void checkSequence() throws NotValidResultSequenceException { |
|
30 |
if (!it.hasNext()) { |
|
31 |
throw new NotValidResultSequenceException("Empty information for key"); |
|
32 |
|
|
33 |
} |
|
34 |
final Value first = Value.fromJson(it.next().toString()); |
|
35 |
trust = first.getTrust(); |
|
36 |
if (!(first.getValue().equals(ZERO) || first.getValue().equals(ONE))) { |
|
37 |
if (key != TypeProtos.Type.datasource.getNumber()) { |
|
38 |
throw new NotValidResultSequenceException("First Element in reducer is not type of datasource, but the organization exists"); |
|
39 |
} else { |
|
40 |
while (it.hasNext()) { |
|
41 |
|
|
42 |
resultId = Value.fromJson(it.next().toString()).getValue(); |
|
43 |
if (!resultId.startsWith("50|")) { |
|
44 |
throw new NotValidResultSequenceException("ERROR ORDERING CHECK"); |
|
45 |
} |
|
46 |
} |
|
47 |
throw new NotValidResultSequenceException("WARNING: unexpected first element"); |
|
48 |
} |
|
49 |
} |
|
50 |
|
|
51 |
//ensure we are dealing with an institutional repository |
|
52 |
if (first.getValue().equals(ONE)) { |
|
53 |
//context.getCounter(COUNTER_PROPAGATION, "institutional datasource").increment(1); |
|
54 |
if (!it.hasNext()) { |
|
55 |
throw new NotValidResultSequenceException("No information apart of type of datasource"); |
|
56 |
} |
|
57 |
|
|
58 |
|
|
59 |
country = Value.fromJson(it.next().toString()).getValue(); // need to update the information for the country to each element in the iterator |
|
60 |
if (country.trim().length() != 2) { |
|
61 |
try { |
|
62 |
Integer.parseInt(country.trim()); |
|
63 |
} catch (Exception e) { |
|
64 |
|
|
65 |
} |
|
66 |
throw new NotValidResultSequenceException("Second element in reducer is not country"); |
|
67 |
} |
|
68 |
boolean iterate = true; |
|
69 |
resultId = TERMINATOR; |
|
70 |
while(it.hasNext() && iterate){ |
|
71 |
resultId = Value.fromJson(it.next().toString()).getValue(); |
|
72 |
if (!resultId.startsWith(("50|"))) { |
|
73 |
if (!resultId.equalsIgnoreCase(country)) { |
|
74 |
propagate = false; |
|
75 |
iterate = false; |
|
76 |
|
|
77 |
} |
|
78 |
}else{ |
|
79 |
propagate = true; |
|
80 |
iterate = false; |
|
81 |
} |
|
82 |
|
|
83 |
} |
|
84 |
if (!resultId.equals(TERMINATOR)){ |
|
85 |
try { |
|
86 |
OafRowKeyDecoder.decode(resultId); |
|
87 |
}catch(IllegalArgumentException e){ |
|
88 |
throw new NotValidResultSequenceException("No result in sequence"); |
|
89 |
} |
|
90 |
} |
|
91 |
}else |
|
92 |
throw new NotValidResultSequenceException("Not allowed dsType institutional datasource"); |
|
93 |
} |
|
94 |
|
|
95 |
@Override |
|
96 |
public List<OafProtos.Oaf> next() { |
|
97 |
final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder().addCountry( |
|
98 |
getCountry(country,trust,DNET_COUNTRY_SCHEMA,CLASS_COUNTRY_ID,SCHEMA_ID,SCHEMA_NAME,DATA_INFO_TYPE,"Propagation of country information from datasources belonging to institutional repositories")); |
|
99 |
final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata); |
|
100 |
final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder() |
|
101 |
.setType(TypeProtos.Type.result) |
|
102 |
.setId(resultId) |
|
103 |
.setResult(result); |
|
104 |
if(it.hasNext()) |
|
105 |
resultId=Value.fromJson(it.next().toString()).getValue(); |
|
106 |
else |
|
107 |
resultId = TERMINATOR; |
|
108 |
|
|
109 |
return new ArrayList<OafProtos.Oaf>(Arrays.asList(OafProtos.Oaf.newBuilder() |
|
110 |
.setKind(KindProtos.Kind.entity) |
|
111 |
.setEntity(entity) |
|
112 |
.build())); |
|
113 |
} |
|
114 |
|
|
115 |
|
|
116 |
|
|
117 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/country/institutionalrepositories/PropagationCountryFromDsOrgResultMapper.java | ||
---|---|---|
21 | 21 |
import java.util.stream.Collectors; |
22 | 22 |
|
23 | 23 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
24 |
|
|
24 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.*; |
|
25 | 25 |
/** |
26 | 26 |
* Created by miriam on 17/08/2018. |
27 | 27 |
*/ |
... | ... | |
111 | 111 |
String hostedBy = instance.getHostedby().getKey(); |
112 | 112 |
valueOut.set(Value.newInstance(entity.getId()).toJson()); |
113 | 113 |
context.write(InstOrgKey.publication(hostedBy),valueOut); |
114 |
context.getCounter(COUNTER_PROPAGATION, "emit publication ").increment(1); |
|
114 |
context.getCounter(COUNTER_PROPAGATION, "emit hostedby | collectedfrom for publication ").increment(1);
|
|
115 | 115 |
String collectedFrom = instance.getCollectedfrom().getKey(); |
116 | 116 |
if (!hostedBy.equals(collectedFrom)) { |
117 | 117 |
context.write(InstOrgKey.publication(collectedFrom), valueOut); |
118 |
context.getCounter(COUNTER_PROPAGATION, "emit publication ").increment(1); |
|
118 |
context.getCounter(COUNTER_PROPAGATION, "emit hostedby | collectedfrom for publication ").increment(1);
|
|
119 | 119 |
} |
120 | 120 |
} |
121 | 121 |
break; |
... | ... | |
135 | 135 |
context.getCounter(COUNTER_PROPAGATION, String.format("%s in propagation allowed list", dsType)).increment(1); |
136 | 136 |
} |
137 | 137 |
|
138 |
private OafProtos.OafEntity getEntity(Result value, TypeProtos.Type type) throws InvalidProtocolBufferException { |
|
139 |
final Map<byte[],byte[]> map = value.getFamilyMap(Bytes.toBytes(type.toString())); |
|
140 | 138 |
|
141 |
final byte[] body = map.get(Bytes.toBytes("body")); |
|
142 |
if (body != null){ |
|
143 |
OafProtos.Oaf oaf = OafProtos.Oaf.parseFrom(body); |
|
144 |
if(oaf.getDataInfo().getDeletedbyinference()) |
|
145 |
return null; |
|
146 |
return oaf.getEntity(); |
|
147 |
} |
|
148 |
return null; |
|
149 |
} |
|
150 | 139 |
|
151 | 140 |
private String getTrust(Result value) throws InvalidProtocolBufferException { |
152 | 141 |
final Map<byte[],byte[]> map = value.getFamilyMap(Bytes.toBytes("datasource")); |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/NotValidResultSequenceException.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation; |
|
2 |
|
|
3 |
public class NotValidResultSequenceException extends Exception { |
|
4 |
|
|
5 |
|
|
6 |
private String message; |
|
7 |
|
|
8 |
|
|
9 |
public NotValidResultSequenceException(String message){ |
|
10 |
this.message = message; |
|
11 |
} |
|
12 |
|
|
13 |
@Override |
|
14 |
public String getMessage() { |
|
15 |
return message; |
|
16 |
} |
|
17 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/projecttoresult/ResultProjectIterator.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult; |
|
2 |
|
|
3 |
import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException; |
|
4 |
import eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants; |
|
5 |
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator; |
|
6 |
import eu.dnetlib.data.mapreduce.hbase.propagation.Value; |
|
7 |
import eu.dnetlib.data.proto.KindProtos; |
|
8 |
import eu.dnetlib.data.proto.OafProtos; |
|
9 |
import eu.dnetlib.data.proto.RelMetadataProtos; |
|
10 |
import eu.dnetlib.data.proto.ResultProjectProtos; |
|
11 |
import org.apache.commons.lang3.StringUtils; |
|
12 |
import org.apache.hadoop.io.Text; |
|
13 |
|
|
14 |
import java.util.*; |
|
15 |
import java.util.stream.Collectors; |
|
16 |
|
|
17 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
|
18 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getDataInfo; |
|
19 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getQualifier; |
|
20 |
|
|
21 |
public class ResultProjectIterator extends ResultIterator { |
|
22 |
|
|
23 |
private Set<String> projectList; |
|
24 |
private Iterator<String> pli ; |
|
25 |
|
|
26 |
|
|
27 |
private final static String MSG = "Propagation of relation to project through semantic relation among results"; |
|
28 |
|
|
29 |
|
|
30 |
public ResultProjectIterator(final Iterable<Text> values, final String key) throws NotValidResultSequenceException{ |
|
31 |
super(values, key); |
|
32 |
} |
|
33 |
|
|
34 |
@Override |
|
35 |
protected void checkSequence() throws NotValidResultSequenceException { |
|
36 |
if(!it.hasNext()){ |
|
37 |
throw new NotValidResultSequenceException("Empty information for key"); |
|
38 |
} |
|
39 |
projectList = new HashSet<>(); |
|
40 |
loadProjectList(); |
|
41 |
pli = projectList.iterator(); |
|
42 |
getNext(); |
|
43 |
} |
|
44 |
|
|
45 |
private void getNext(){ |
|
46 |
if (pli.hasNext()) |
|
47 |
resultId = pli.next(); |
|
48 |
else |
|
49 |
resultId = TERMINATOR; |
|
50 |
} |
|
51 |
|
|
52 |
private void loadProjectList() { |
|
53 |
final Set<String> fromResult = new HashSet<>(); |
|
54 |
final Set<String> fromSemRel = new HashSet<>(); |
|
55 |
while (it.hasNext()) { |
|
56 |
Value v = Value.fromJson(it.next().toString()); |
|
57 |
if(trust == null) trust = v.getTrust(); |
|
58 |
if (v.getType() == PropagationConstants.Type.fromresult) { |
|
59 |
if(StringUtils.isNotBlank(v.getValue())) |
|
60 |
fromResult.addAll(Arrays.asList(StringUtils.split(v.getValue(), ","))); |
|
61 |
} else { |
|
62 |
fromSemRel.addAll(Arrays.asList(StringUtils.split(v.getValue(), ","))); |
|
63 |
} |
|
64 |
} |
|
65 |
//takes the projects to be associated to the result that have not already been associated to it |
|
66 |
|
|
67 |
projectList.addAll(fromSemRel.stream() |
|
68 |
.filter(it->!fromResult.contains(it)) |
|
69 |
.collect(Collectors.toCollection((HashSet::new)))); |
|
70 |
|
|
71 |
|
|
72 |
} |
|
73 |
@Override |
|
74 |
public List<OafProtos.Oaf> next() { |
|
75 |
|
|
76 |
ArrayList<OafProtos.Oaf> ret = new ArrayList<OafProtos.Oaf> (Arrays.asList( |
|
77 |
getOafRel(keyb,resultId,REL_RESULT_PROJECT), |
|
78 |
getOafRel(resultId,keyb,REL_PROJECT_RESULT) |
|
79 |
)); |
|
80 |
|
|
81 |
getNext(); |
|
82 |
|
|
83 |
return ret; |
|
84 |
} |
|
85 |
|
|
86 |
private OafProtos.Oaf getOafRel(String source, String target, String semantics){ |
|
87 |
final ResultProjectProtos.ResultProject.Builder rpb = ResultProjectProtos.ResultProject.newBuilder() |
|
88 |
.setOutcome( |
|
89 |
ResultProjectProtos.ResultProject.Outcome.newBuilder() |
|
90 |
.setRelMetadata( |
|
91 |
RelMetadataProtos.RelMetadata.newBuilder() |
|
92 |
.setSemantics( |
|
93 |
getQualifier(semantics,semantics,DNET_RELATION_SCHEMA,DNET_RELATION_SCHEMA) |
|
94 |
) |
|
95 |
) |
|
96 |
); |
|
97 |
|
|
98 |
final OafProtos.OafRel.Builder relation = OafProtos.OafRel.newBuilder() |
|
99 |
.setChild(false) |
|
100 |
.setSubRelType(SUBREL_TYPE) |
|
101 |
.setRelType(REL_TYPE) |
|
102 |
.setRelClass(semantics) |
|
103 |
.setTarget(target) |
|
104 |
.setSource(source) |
|
105 |
.setResultProject(rpb); |
|
106 |
|
|
107 |
return OafProtos.Oaf.newBuilder().setKind(KindProtos.Kind.relation) |
|
108 |
|
|
109 |
.setRel(relation) |
|
110 |
.setDataInfo(getDataInfo(trust,CLASS_RELATION_ID,SCHEMA_ID,SCHEMA_NAME,DATA_INFO_TYPE,MSG)) |
|
111 |
.build(); |
|
112 |
} |
|
113 |
|
|
114 |
|
|
115 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/projecttoresult/SemanticRelationCounter.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult; |
|
2 |
|
|
3 |
import com.google.protobuf.InvalidProtocolBufferException; |
|
4 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
|
5 |
import org.apache.hadoop.hbase.client.Result; |
|
6 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
7 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
|
8 |
import org.apache.hadoop.hbase.util.Bytes; |
|
9 |
import org.apache.hadoop.io.Writable; |
|
10 |
import java.io.IOException; |
|
11 |
import java.util.Map; |
|
12 |
|
|
13 |
public class SemanticRelationCounter extends TableMapper<ImmutableBytesWritable, Writable> { |
|
14 |
|
|
15 |
private String[] sem_rels; |
|
16 |
|
|
17 |
@Override |
|
18 |
protected void setup(final Context context) throws IOException, InterruptedException { |
|
19 |
super.setup(context); |
|
20 |
|
|
21 |
String[] default_set = {"resultResult_supplement_isSupplementedBy"}; |
|
22 |
|
|
23 |
sem_rels = context.getConfiguration().getStrings("propagatetoproject.semanticrelations",default_set); |
|
24 |
if (sem_rels.length == 0) |
|
25 |
throw new InterruptedException("No semantic relation over which propagate is specified"); |
|
26 |
} |
|
27 |
|
|
28 |
@Override |
|
29 |
protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException { |
|
30 |
|
|
31 |
final Map<byte[], byte[]> resultMap = value.getFamilyMap(Bytes.toBytes("result")); |
|
32 |
|
|
33 |
final byte[] body = resultMap.get(Bytes.toBytes("body")); |
|
34 |
|
|
35 |
|
|
36 |
if (body != null) { |
|
37 |
final Oaf res_oaf = Oaf.parseFrom(body); |
|
38 |
if (res_oaf.getDataInfo().getDeletedbyinference()){ |
|
39 |
context.getCounter("Relation counter","del by inference result").increment(1); |
|
40 |
return; |
|
41 |
} |
|
42 |
context.getCounter("Relation counter", "not null body ").increment(1); |
|
43 |
boolean foundRelation = false; |
|
44 |
for (String sem_rel : sem_rels) |
|
45 |
foundRelation = foundRelation || countRelation(sem_rel,context,value); |
|
46 |
if(countRelation("resultProject_outcome_isProducedBy",context,value)){ |
|
47 |
if (foundRelation) |
|
48 |
context.getCounter("Relation counter", "Association to project in result with semantic relation" ).increment(1); |
|
49 |
} |
|
50 |
|
|
51 |
} |
|
52 |
} |
|
53 |
|
|
54 |
private boolean countRelation(String rel_to_find, Context context, Result value) throws InvalidProtocolBufferException { |
|
55 |
final Map<byte[], byte[]> relationMap = value.getFamilyMap(Bytes.toBytes(rel_to_find)); |
|
56 |
boolean ret = false; |
|
57 |
for(byte[] relation:relationMap.values() ){ |
|
58 |
final Oaf rel_oaf = Oaf.parseFrom(relation); |
|
59 |
if (!isValid(rel_oaf)) { |
|
60 |
context.getCounter("Relation counter", rel_to_find + " not present in relation set").increment(1); |
|
61 |
return ret; |
|
62 |
} |
|
63 |
if (rel_oaf.getDataInfo().getDeletedbyinference()){ |
|
64 |
context.getCounter("Relation counter ",rel_to_find + " del by inference" ).increment(1); |
|
65 |
|
|
66 |
}else{ |
|
67 |
context.getCounter("Relation counter ",rel_to_find ).increment(1); |
|
68 |
ret = true; |
|
69 |
} |
|
70 |
} |
|
71 |
return ret; |
|
72 |
} |
|
73 |
|
|
74 |
|
|
75 |
|
|
76 |
private boolean isValid(final Oaf oaf) { |
|
77 |
return (oaf != null) && oaf.isInitialized(); |
|
78 |
} |
|
79 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/projecttoresult/ProjectToResultKey.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult; |
|
2 |
|
|
3 |
import com.google.common.collect.ComparisonChain; |
|
4 |
import eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult.ProjectToResultKey; |
|
5 |
import org.apache.hadoop.io.IntWritable; |
|
6 |
import org.apache.hadoop.io.Text; |
|
7 |
import org.apache.hadoop.io.WritableComparable; |
|
8 |
|
|
9 |
import java.io.DataInput; |
|
10 |
import java.io.DataOutput; |
|
11 |
import java.io.IOException; |
|
12 |
|
|
13 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.FROM_SEM_REL; |
|
14 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.FROM_RESULT; |
|
15 |
|
|
16 |
public class ProjectToResultKey implements WritableComparable<ProjectToResultKey> { |
|
17 |
|
|
18 |
|
|
19 |
private IntWritable keyType; |
|
20 |
|
|
21 |
private Text id; |
|
22 |
|
|
23 |
public ProjectToResultKey() { |
|
24 |
} |
|
25 |
|
|
26 |
public static ProjectToResultKey create(final int keyType, final String id) { |
|
27 |
return new ProjectToResultKey(keyType, id); |
|
28 |
} |
|
29 |
|
|
30 |
public static ProjectToResultKey existSemRel(final String id) { |
|
31 |
return new ProjectToResultKey(FROM_SEM_REL, id); |
|
32 |
} |
|
33 |
|
|
34 |
public static ProjectToResultKey doesNotExistSemRel(final String id) { |
|
35 |
return new ProjectToResultKey(FROM_RESULT, id); |
|
36 |
} |
|
37 |
|
|
38 |
|
|
39 |
public ProjectToResultKey(final int keyType, final String id) { |
|
40 |
this.id = new Text(id); |
|
41 |
this.keyType = new IntWritable(keyType); |
|
42 |
} |
|
43 |
|
|
44 |
public void setKeyType(final IntWritable keyType) { |
|
45 |
this.keyType = keyType; |
|
46 |
} |
|
47 |
|
|
48 |
public void setId(final Text id) { |
|
49 |
this.id = id; |
|
50 |
} |
|
51 |
|
|
52 |
public Text getId() { |
|
53 |
return id; |
|
54 |
} |
|
55 |
|
|
56 |
public IntWritable getKeyType() { |
|
57 |
return keyType; |
|
58 |
} |
|
59 |
|
|
60 |
@Override |
|
61 |
public int compareTo(final ProjectToResultKey o) { |
|
62 |
final int res = ComparisonChain.start() |
|
63 |
.compare(getId(), o.getId()) |
|
64 |
.compare(getKeyType(), o.getKeyType()) |
|
65 |
.result(); |
|
66 |
|
|
67 |
//System.out.println(String.format("%s.compareTo(%s) = %s", toString(), o.toString(), res)); |
|
68 |
return res; |
|
69 |
} |
|
70 |
|
|
71 |
@Override |
|
72 |
public void write(final DataOutput out) throws IOException { |
|
73 |
keyType.write(out); |
|
74 |
id.write(out); |
|
75 |
} |
|
76 |
|
|
77 |
@Override |
|
78 |
public void readFields(final DataInput in) throws IOException { |
|
79 |
keyType = new IntWritable(); |
|
80 |
keyType.readFields(in); |
|
81 |
id = new Text(); |
|
82 |
id.readFields(in); |
|
83 |
} |
|
84 |
|
|
85 |
@Override |
|
86 |
public String toString() { |
|
87 |
return (new StringBuilder()) |
|
88 |
.append('{') |
|
89 |
.append(getKeyType().get()) |
|
90 |
.append(',') |
|
91 |
.append(getId()) |
|
92 |
.append('}') |
|
93 |
.toString(); |
|
94 |
} |
|
95 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/projecttoresult/PropagationProjectToResultReducer.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult; |
|
2 |
|
|
3 |
import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException; |
|
4 |
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator; |
|
5 |
import eu.dnetlib.data.proto.*; |
|
6 |
import org.apache.commons.logging.Log; |
|
7 |
import org.apache.commons.logging.LogFactory; |
|
8 |
import org.apache.hadoop.hbase.client.Put; |
|
9 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
10 |
import org.apache.hadoop.hbase.mapreduce.TableReducer; |
|
11 |
import org.apache.hadoop.hbase.util.Bytes; |
|
12 |
import org.apache.hadoop.io.Text; |
|
13 |
|
|
14 |
import java.io.IOException; |
|
15 |
|
|
16 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
|
17 |
|
|
18 |
public class PropagationProjectToResultReducer extends TableReducer<ImmutableBytesWritable, Text, ImmutableBytesWritable> { |
|
19 |
private static final Log log = LogFactory.getLog(PropagationProjectToResultReducer.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
20 |
private ImmutableBytesWritable keyOut; |
|
21 |
|
|
22 |
|
|
23 |
|
|
24 |
@Override |
|
25 |
protected void setup(final Context context) throws IOException, InterruptedException { |
|
26 |
super.setup(context); |
|
27 |
keyOut = new ImmutableBytesWritable(); |
|
28 |
} |
|
29 |
|
|
30 |
|
|
31 |
@Override |
|
32 |
protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { |
|
33 |
ResultIterator rh = null; |
|
34 |
try { |
|
35 |
rh = new ResultProjectIterator(values,Bytes.toString(key.copyBytes())); |
|
36 |
} catch (NotValidResultSequenceException e) { |
|
37 |
context.getCounter(COUNTER_PROPAGATION,e.getMessage()).increment(1); |
|
38 |
return; |
|
39 |
} |
|
40 |
while(rh.hasNext()){ |
|
41 |
for(OafProtos.Oaf oaf:rh.next()){ |
|
42 |
final String source = oaf.getRel().getSource(); |
|
43 |
final Put put = new Put(Bytes.toBytes(source)).add(Bytes.toBytes(RELATION + oaf.getRel().getRelClass()),Bytes.toBytes(oaf.getRel().getTarget()),oaf.toByteArray()); |
|
44 |
keyOut.set(Bytes.toBytes(source)); |
|
45 |
context.write(keyOut, put); |
|
46 |
|
|
47 |
} |
|
48 |
context.getCounter(COUNTER_PROPAGATION,"Added relation to project").increment(1); |
|
49 |
} |
|
50 |
|
|
51 |
} |
|
52 |
|
|
53 |
|
|
54 |
|
|
55 |
|
|
56 |
|
|
57 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/projecttoresult/PropagationProjectToResultFileReducer.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult; |
|
2 |
|
|
3 |
|
|
4 |
import com.googlecode.protobuf.format.JsonFormat; |
|
5 |
import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException; |
|
6 |
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator; |
|
7 |
import eu.dnetlib.data.proto.*; |
|
8 |
import org.apache.commons.logging.Log; |
|
9 |
import org.apache.commons.logging.LogFactory; |
|
10 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
11 |
import org.apache.hadoop.hbase.util.Bytes; |
|
12 |
import org.apache.hadoop.io.Text; |
|
13 |
import org.apache.hadoop.mapreduce.Reducer; |
|
14 |
|
|
15 |
import java.io.IOException; |
|
16 |
|
|
17 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
|
18 |
|
|
19 |
public class PropagationProjectToResultFileReducer extends Reducer<ImmutableBytesWritable, Text, Text,Text> { |
|
20 |
private static final Log log = LogFactory.getLog(PropagationProjectToResultReducer.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
21 |
|
|
22 |
private Text keyOut; |
|
23 |
private Text outValue; |
|
24 |
|
|
25 |
|
|
26 |
@Override |
|
27 |
protected void setup(final Context context) throws IOException, InterruptedException { |
|
28 |
super.setup(context); |
|
29 |
keyOut = new Text(""); |
|
30 |
outValue = new Text(); |
|
31 |
} |
|
32 |
|
|
33 |
|
|
34 |
@Override |
|
35 |
protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { |
|
36 |
ResultIterator rh = null; |
|
37 |
try { |
|
38 |
rh = new ResultProjectIterator(values, Bytes.toString(key.copyBytes())); |
|
39 |
} catch (NotValidResultSequenceException e) { |
|
40 |
context.getCounter(COUNTER_PROPAGATION, e.getMessage()).increment(1); |
|
41 |
return; |
|
42 |
} |
|
43 |
while (rh.hasNext()) { |
|
44 |
for (OafProtos.Oaf oaf : rh.next()) { |
|
45 |
keyOut.set(oaf.getRel().getTarget()); |
|
46 |
outValue.set(JsonFormat.printToString(oaf).getBytes()); |
|
47 |
context.write(keyOut, outValue); |
|
48 |
} |
|
49 |
context.getCounter(COUNTER_PROPAGATION, "Added relation to project").increment(1); |
|
50 |
} |
|
51 |
|
|
52 |
} |
|
53 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/projecttoresult/NaturalProjectToResultKeyPartitioner.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult; |
|
2 |
|
|
3 |
import eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories.InstOrgKey; |
|
4 |
import org.apache.hadoop.io.Text; |
|
5 |
import org.apache.hadoop.mapreduce.Partitioner; |
|
6 |
|
|
7 |
/** |
|
8 |
* Created by miriam on 17/08/2018. |
|
9 |
*/ |
|
10 |
public class NaturalProjectToResultKeyPartitioner extends Partitioner<ProjectToResultKey, Text > { |
|
11 |
|
|
12 |
@Override |
|
13 |
public int getPartition(ProjectToResultKey key, Text val, int numPartitions) { |
|
14 |
final int res = Math.abs(key.getId().hashCode() % numPartitions); |
|
15 |
|
|
16 |
return res; |
|
17 |
} |
|
18 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/projecttoresult/NaturalProjectToResultKeyGroupingComparator.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult; |
|
2 |
|
|
3 |
import eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories.InstOrgKey; |
|
4 |
import org.apache.hadoop.io.WritableComparable; |
|
5 |
import org.apache.hadoop.io.WritableComparator; |
|
6 |
|
|
7 |
/** |
|
8 |
* Created by miriam on 17/08/2018. |
|
9 |
*/ |
|
10 |
public class NaturalProjectToResultKeyGroupingComparator extends WritableComparator { |
|
11 |
|
|
12 |
protected NaturalProjectToResultKeyGroupingComparator() { |
|
13 |
super(ProjectToResultKey.class, true); |
|
14 |
} |
|
15 |
|
|
16 |
@Override |
|
17 |
public int compare(WritableComparable w1, WritableComparable w2) { |
|
18 |
final InstOrgKey k1 = (InstOrgKey) w1; |
|
19 |
final InstOrgKey k2 = (InstOrgKey) w2; |
|
20 |
|
|
21 |
return k1.getId().compareTo(k2.getId()); |
|
22 |
} |
|
23 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/projecttoresult/ProjectToResultMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult; |
|
2 |
|
|
3 |
import com.google.protobuf.InvalidProtocolBufferException; |
|
4 |
import eu.dnetlib.data.mapreduce.hbase.propagation.Value; |
|
5 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
|
6 |
import eu.dnetlib.data.proto.OafProtos; |
|
7 |
import eu.dnetlib.data.proto.TypeProtos; |
|
8 |
import org.apache.commons.lang3.StringUtils; |
|
9 |
import org.apache.hadoop.hbase.client.Result; |
|
10 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
11 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
|
12 |
import org.apache.hadoop.hbase.util.Bytes; |
|
13 |
import org.apache.hadoop.io.Text; |
|
14 |
import java.io.IOException; |
|
15 |
import java.util.*; |
|
16 |
import java.util.stream.Collectors; |
|
17 |
|
|
18 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
|
19 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getEntity; |
|
20 |
|
|
21 |
|
|
22 |
//public class ProjectToResultMapper extends TableMapper<ProjectToResultKey, Text> { |
|
23 |
public class ProjectToResultMapper extends TableMapper<ImmutableBytesWritable, Text> { |
|
24 |
private static final String SEPARATOR = ","; |
|
25 |
private String[] sem_rels; |
|
26 |
private String trust; |
|
27 |
|
|
28 |
private ImmutableBytesWritable keyOut; |
|
29 |
private Text valueOut; |
|
30 |
|
|
31 |
|
|
32 |
|
|
33 |
@Override |
|
34 |
protected void setup(final Context context) throws IOException, InterruptedException { |
|
35 |
|
|
36 |
keyOut = new ImmutableBytesWritable(); |
|
37 |
valueOut = new Text(); |
|
38 |
|
|
39 |
sem_rels = context.getConfiguration().getStrings("propagatetoproject.semanticrelations", DEFAULT_RELATION_SET); |
|
40 |
trust = context.getConfiguration().get("propagatetoproject.trust","0.85"); |
|
41 |
|
|
42 |
} |
|
43 |
|
|
44 |
@Override |
|
45 |
protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException { |
|
46 |
final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType(); |
|
47 |
//If the type is not result I do not need to process it |
|
48 |
if(!type.equals(TypeProtos.Type.result)) { |
|
49 |
return; |
|
50 |
} |
|
51 |
//verify if entity is valid |
|
52 |
final OafProtos.OafEntity entity = getEntity(value, type); |
|
53 |
if (entity == null) { |
|
54 |
context.getCounter(COUNTER_PROPAGATION,"Del by inference or null body for result").increment(1); |
|
55 |
return; |
|
56 |
} |
|
57 |
|
|
58 |
context.getCounter(COUNTER_PROPAGATION, "Valid result ").increment(1); |
|
59 |
|
|
60 |
//selection of all the projects associated to this result |
|
61 |
String projectIdList = getProjectIdList(value); |
|
62 |
|
|
63 |
//if the list of projects is not empty, verify if it exists some allowed semantic relation to which propagate the project |
|
64 |
if(StringUtils.isNotBlank(projectIdList)){ |
|
65 |
final Set<String> toemitrelations = new HashSet<>(); |
|
66 |
//selection of all the results bind by this result considering all the allowed semantic relations |
|
67 |
for (String sem_rel : sem_rels) |
|
68 |
toemitrelations.addAll(getRelationTarget(value,sem_rel)); |
|
69 |
emit(context, toemitrelations, projectIdList); |
|
70 |
context.getCounter(COUNTER_PROPAGATION, "emit for semantic relation").increment(toemitrelations.size()); |
|
71 |
} |
|
72 |
|
|
73 |
keyOut.set(entity.getId().getBytes()); |
|
74 |
valueOut.set(Value.newInstance(projectIdList,Type.fromresult).toJson()); |
|
75 |
//context.write(ProjectToResultKey.create(FROM_RESULT,entity.getId()), valueOut); |
|
76 |
|
|
77 |
context.write(keyOut, valueOut); |
|
78 |
context.getCounter(COUNTER_PROPAGATION, "emit for result").increment(1); |
|
79 |
|
|
80 |
} |
|
81 |
|
|
82 |
//emit for each valid semantic relation the id of the relation target and the list of projects associated to the source of the relation |
|
83 |
private void emit( Context context, Set<String> toemitrelations, String projectIdList) throws IOException, InterruptedException { |
|
84 |
for(String relation : toemitrelations){ |
|
85 |
keyOut.set(relation.getBytes()); |
|
86 |
valueOut.set(Value.newInstance( projectIdList,trust,Type.fromsemrel).toJson()); |
|
87 |
context.write(keyOut, valueOut); |
|
88 |
} |
|
89 |
} |
|
90 |
|
|
91 |
//starting from the Result gets the list of projects it is related to and returns it as a csv |
|
92 |
private String getProjectIdList(Result value) throws InvalidProtocolBufferException { |
|
93 |
Set<String> ret = getRelationTarget(value, OUTCOME_PRODUCEDBY); |
|
94 |
return ret.size() == 0 ? null : String.join(SEPARATOR, ret); |
|
95 |
} |
|
96 |
|
|
97 |
private Set<String> getRelationTarget(Result value, String sem_rel) throws InvalidProtocolBufferException { |
|
98 |
|
|
99 |
final Map<byte[], byte[]> relationMap = value.getFamilyMap(Bytes.toBytes(sem_rel)); |
|
100 |
|
|
101 |
/* |
|
102 |
we could extract the target qualifiers from the familyMap's keyset, but we also need to check the relationship is not deletedbyinference |
|
103 |
return relationMap.keySet().stream() |
|
104 |
.map(String::new) |
|
105 |
.collect(Collectors.toCollection(HashSet::new)); |
|
106 |
*/ |
|
107 |
|
|
108 |
return relationMap.values().stream() |
|
109 |
.map(this::asOaf) |
|
110 |
.filter(Objects::nonNull) |
|
111 |
.filter(o -> isValid(o)) |
|
112 |
.filter(o -> !o.getDataInfo().getDeletedbyinference()) |
|
113 |
.map(o -> o.getRel().getTarget()) |
|
114 |
.collect(Collectors.toCollection(HashSet::new)); |
|
115 |
|
|
116 |
} |
|
117 |
|
|
118 |
private OafProtos.Oaf asOaf(byte[] r) { |
|
119 |
try { |
|
120 |
return OafProtos.Oaf.parseFrom(r); |
|
121 |
} catch (InvalidProtocolBufferException e) { |
|
122 |
return null; |
|
123 |
} |
|
124 |
} |
|
125 |
|
|
126 |
|
|
127 |
private boolean isValid(final OafProtos.Oaf oaf) { |
|
128 |
return (oaf != null) && oaf.isInitialized(); |
|
129 |
} |
|
130 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/ResultIterator.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation; |
|
2 |
|
|
3 |
import eu.dnetlib.data.proto.OafProtos; |
|
4 |
import org.apache.hadoop.io.Text; |
|
5 |
|
|
6 |
import java.util.Iterator; |
|
7 |
import java.util.List; |
|
8 |
|
|
9 |
|
|
10 |
public abstract class ResultIterator implements Iterator<List<OafProtos.Oaf>>{ |
|
11 |
|
|
12 |
protected Iterator<Text> it; |
|
13 |
protected int key; |
|
14 |
protected String keyb; |
|
15 |
protected String resultId; |
|
16 |
protected boolean propagate = true; |
|
17 |
protected final static String TERMINATOR = "FINITO"; |
|
18 |
protected String trust = null; |
|
19 |
|
|
20 |
public ResultIterator(final Iterable<Text> values, int key) throws NotValidResultSequenceException { |
|
21 |
it = values.iterator(); |
|
22 |
this.key = key; |
|
23 |
checkSequence(); |
|
24 |
} |
|
25 |
|
|
26 |
public ResultIterator(final Iterable<Text> values, final String key) throws NotValidResultSequenceException { |
|
27 |
it = values.iterator(); |
|
28 |
this.keyb = key; |
|
29 |
checkSequence(); |
|
30 |
} |
|
31 |
|
|
32 |
protected abstract void checkSequence() throws NotValidResultSequenceException ; |
|
33 |
public abstract List<OafProtos.Oaf> next() ; |
|
34 |
|
|
35 |
public boolean hasNext(){ |
|
36 |
return (!resultId.equals(TERMINATOR) && propagate); |
|
37 |
} |
|
38 |
|
|
39 |
public boolean getPropagate(){ |
|
40 |
return propagate; |
|
41 |
} |
|
42 |
|
|
43 |
public String getResultId(){ |
|
44 |
return resultId; |
|
45 |
} |
|
46 |
|
|
47 |
} |
|
48 |
|
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/Value.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.propagation; |
2 | 2 |
|
3 | 3 |
import com.google.gson.Gson; |
4 |
import eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.Type; |
|
4 | 5 |
|
5 | 6 |
public class Value { |
6 | 7 |
|
7 | 8 |
public static final String DEFAULT_TRUST = "0"; |
8 |
|
|
9 |
private Type type; |
|
9 | 10 |
private String value; |
10 | 11 |
private String trust; |
11 | 12 |
|
... | ... | |
13 | 14 |
this(value, DEFAULT_TRUST); |
14 | 15 |
} |
15 | 16 |
|
17 |
public Value(String value, Type type) { |
|
18 |
this.value = value; |
|
19 |
this.type=type; |
|
20 |
} |
|
21 |
|
|
22 |
public Value(String value, String trust, Type type) { |
|
23 |
|
|
24 |
this.value=value; |
|
25 |
this.trust=trust; |
|
26 |
this.type=type; |
|
27 |
} |
|
28 |
|
|
16 | 29 |
public static Value newInstance(String value, String trust) { |
17 | 30 |
return new Value(value, trust); |
18 | 31 |
} |
... | ... | |
21 | 34 |
return new Value(value); |
22 | 35 |
} |
23 | 36 |
|
37 |
public static Value newInstance(String value,Type type) { |
|
38 |
return new Value(value,type); |
|
39 |
} |
|
40 |
|
|
41 |
public static Value newInstance(String value, String trust, Type type){ |
|
42 |
return new Value(value,trust,type); |
|
43 |
} |
|
44 |
|
|
24 | 45 |
public static Value fromJson(final String json) { |
25 | 46 |
return new Gson().fromJson(json, Value.class); |
26 | 47 |
} |
... | ... | |
34 | 55 |
return value; |
35 | 56 |
} |
36 | 57 |
|
37 |
public void setValue(String value) {
|
|
58 |
public Value setValue(String value) {
|
|
38 | 59 |
this.value = value; |
60 |
return this; |
|
39 | 61 |
} |
40 | 62 |
|
41 | 63 |
public String getTrust() { |
42 | 64 |
return trust; |
43 | 65 |
} |
44 | 66 |
|
45 |
public void setTrust(String trust) {
|
|
67 |
public Value setTrust(String trust) {
|
|
46 | 68 |
this.trust = trust; |
69 |
return this; |
|
47 | 70 |
} |
48 | 71 |
|
49 | 72 |
public String toJson() { |
... | ... | |
51 | 74 |
} |
52 | 75 |
|
53 | 76 |
|
77 |
public Type getType() { |
|
78 |
return type; |
|
79 |
} |
|
80 |
|
|
81 |
public Value setType(Type type) { |
|
82 |
this.type = type; |
|
83 |
return this; |
|
84 |
} |
|
54 | 85 |
} |
Also available in: Unified diff
propagation of country from institutional repository and of result projects through semantic link