Project

General

Profile

« Previous | Next » 

Revision 53510

[maven-release-plugin] copy for tag dnet-openaireplus-workflows-6.3.19

View differences:

modules/dnet-openaireplus-workflows/tags/dnet-openaireplus-workflows-6.3.19/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/contexts/GenerateFETH2020ContextJobNode.java
1
package eu.dnetlib.msro.openaireplus.workflows.nodes.contexts;
2

  
3
import java.util.List;
4
import javax.annotation.Resource;
5

  
6
import com.googlecode.sarasvati.Arc;
7
import com.googlecode.sarasvati.NodeToken;
8
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
9
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException;
10
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
11
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
12
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
13
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
14
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
15
import eu.dnetlib.msro.workflows.resultset.ProcessCountingResultSetFactory;
16
import eu.dnetlib.msro.workflows.util.ProgressProvider;
17
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider;
18
import org.apache.commons.logging.Log;
19
import org.apache.commons.logging.LogFactory;
20

  
21
public class GenerateFETH2020ContextJobNode extends SimpleJobNode implements ProgressJobNode {
22

  
23
	private static final Log log = LogFactory.getLog(GenerateFETH2020ContextJobNode.class);
24
	private String eprParam;
25
	private ResultsetProgressProvider progressProvider;
26
	@Resource
27
	private ResultSetClientFactory resultSetClientFactory;
28
	@Resource
29
	private ProcessCountingResultSetFactory processCountingResultSetFactory;
30
	@Resource
31
	private UniqueServiceLocator serviceLocator;
32

  
33
	@Override
34
	public ProgressProvider getProgressProvider() {
35
		return this.progressProvider;
36
	}
37

  
38
	@Override
39
	protected String execute(NodeToken token) throws Exception {
40
		final String epr = token.getEnv().getAttribute(eprParam);
41

  
42
		this.progressProvider = processCountingResultSetFactory.createProgressProvider(token.getProcess(), epr);
43

  
44
		final Iterable<String> iter = resultSetClientFactory.getClient(progressProvider.getEpr());
45

  
46
		BuildH2020FETTaxonomy builder = new BuildH2020FETTaxonomy();
47
		builder.setIterator(iter.iterator());
48
		String taxonomy = builder.parseProjects();
49
		final String xquery = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType') "
50
				+ "where $x//CONFIGURATION/context[@id='fet-h2020'] return $x//RESOURCE_IDENTIFIER/@value/string()";
51
		List<String> list = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery);
52
		if (list.isEmpty()) {
53
			registerProfile(taxonomy);
54
		} else {
55
			deleteProfile(list.get(0));
56
			registerProfile(taxonomy);
57
		}
58
		return Arc.DEFAULT_ARC;
59
	}
60

  
61
	private void registerProfile(String profile) throws ISRegistryException {
62
		log.info("registering fet-h2020 profile");
63
		ISRegistryService is = serviceLocator.getService(ISRegistryService.class);
64
		String id = is.registerProfile(profile);
65
		log.info("Generating profile with id " + id);
66
	}
67

  
68
	private void deleteProfile(String profId) throws ISRegistryException {
69
		log.info("deleting fet-h2020 profile with id " + profId);
70
		ISRegistryService is = serviceLocator.getService(ISRegistryService.class);
71
		is.deleteProfile(profId);
72
	}
73

  
74
	public String getEprParam() {
75
		return eprParam;
76
	}
77

  
78
	public void setEprParam(final String eprParam) {
79
		this.eprParam = eprParam;
80
	}
81

  
82
	public void setProgressProvider(final ResultsetProgressProvider progressProvider) {
83
		this.progressProvider = progressProvider;
84
	}
85

  
86
	public ResultSetClientFactory getResultSetClientFactory() {
87
		return resultSetClientFactory;
88
	}
89

  
90
	public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
91
		this.resultSetClientFactory = resultSetClientFactory;
92
	}
93

  
94
	public ProcessCountingResultSetFactory getProcessCountingResultSetFactory() {
95
		return processCountingResultSetFactory;
96
	}
97

  
98
	public void setProcessCountingResultSetFactory(final ProcessCountingResultSetFactory processCountingResultSetFactory) {
99
		this.processCountingResultSetFactory = processCountingResultSetFactory;
100
	}
101

  
102
	public UniqueServiceLocator getServiceLocator() {
103
		return serviceLocator;
104
	}
105

  
106
	public void setServiceLocator(final UniqueServiceLocator serviceLocator) {
107
		this.serviceLocator = serviceLocator;
108
	}
109

  
110
}
modules/dnet-openaireplus-workflows/tags/dnet-openaireplus-workflows-6.3.19/pom.xml
1
<?xml version="1.0" ?>
2
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
3
	<parent>
4
		<groupId>eu.dnetlib</groupId>
5
		<artifactId>dnet45-parent</artifactId>
6
		<version>1.0.0</version>
7
		<relativePath />
8
	</parent>
9
	<modelVersion>4.0.0</modelVersion>
10
	<groupId>eu.dnetlib</groupId>
11
	<artifactId>dnet-openaireplus-workflows</artifactId>
12
	<packaging>jar</packaging>
13
	<version>6.3.19</version>
14

  
15
	<scm>
16
		<developerConnection>scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-openaireplus-workflows/tags/dnet-openaireplus-workflows-6.3.19</developerConnection>
17
	</scm>
18
	<dependencies>
19
		<dependency>
20
			<groupId>eu.dnetlib</groupId>
21
			<artifactId>cnr-data-flow-monitoring-core</artifactId>
22
			<version>[2.0.0,3.0.0)</version>
23
			<exclusions>
24
				<exclusion>
25
					<artifactId>wstx-asl</artifactId>
26
					<groupId>org.codehaus.woodstox</groupId>
27
				</exclusion>
28
			</exclusions>
29
		</dependency>
30
		<dependency>
31
			<groupId>eu.dnetlib</groupId>
32
			<artifactId>cnr-enabling-database-api</artifactId>
33
			<version>[2.0.0,3.0.0)</version>
34
		</dependency>
35
		<dependency>
36
			<groupId>eu.dnetlib</groupId>
37
			<artifactId>cnr-enabling-database-service</artifactId>
38
			<version>[3.0.0,4.0.0)</version>
39
		</dependency>
40
		<dependency>
41
			<groupId>eu.dnetlib</groupId>
42
			<artifactId>dnet-msro-service</artifactId>
43
			<version>[3.0.0,4.0.0)</version>
44
		</dependency>
45
		<dependency>
46
			<groupId>eu.dnetlib</groupId>
47
			<artifactId>cnr-resultset-service</artifactId>
48
			<version>[2.0.0,3.0.0)</version>
49
		</dependency>
50
		<dependency>
51
			<groupId>eu.dnetlib</groupId>
52
			<artifactId>dnet-openaire-datasource-manager</artifactId>
53
			<version>[1.0.0-SNAPSHOT,2.0.0)</version>
54
		</dependency>
55
		<dependency>
56
			<groupId>eu.dnetlib</groupId>
57
			<artifactId>dnet-openaireplus-mapping-utils</artifactId>
58
			<version>[6.0.0,7.0.0)</version>
59
		</dependency>
60
		<dependency>
61
			<groupId>eu.dnetlib</groupId>
62
			<artifactId>dnet-hadoop-service-rmi</artifactId>
63
			<version>[1.0.0,2.0.0)</version>
64
		</dependency>
65
		<dependency>
66
			<groupId>eu.dnetlib</groupId>
67
			<artifactId>dnet-index-solr-common</artifactId>
68
			<version>[1.0.0,2.0.0)</version>
69
		</dependency>
70
		<dependency>
71
			<groupId>eu.dnetlib</groupId>
72
			<artifactId>dnet-collector-plugins</artifactId>
73
			<version>[1.0.0,2.0.0)</version>
74
		</dependency>
75

  
76
		<dependency>
77
			<groupId>eu.dnetlib</groupId>
78
			<artifactId>dnet-actionmanager-api</artifactId>
79
			<version>[4.0.0,5.0.0)</version>
80
		</dependency>
81

  
82
		<dependency>
83
			<groupId>eu.dnetlib</groupId>
84
			<artifactId>dnet-oai-common-workflows</artifactId>
85
			<version>[5.0.0,6.0.0)</version>
86
		</dependency>
87
		
88
		<dependency>
89
			<groupId>eu.dnetlib</groupId>
90
			<artifactId>cnr-mongo-mdstore</artifactId>
91
			<version>[6.0.0,7.0.0)</version>
92
			<scope>provided</scope>
93
		</dependency>
94
		
95
		<dependency>
96
			<groupId>eu.dnetlib</groupId>
97
			<artifactId>dnet-modular-objectstore-service</artifactId>
98
			<version>[4.2.1,5.0.0)</version>
99
			<scope>provided</scope>
100
		</dependency>
101

  
102
		<dependency>
103
			<groupId>eu.dnetlib</groupId>
104
			<artifactId>dnet-index-solr-client</artifactId>
105
			<version>[2.0.0,2.9.9)</version>
106
		</dependency>
107

  
108
		<dependency>
109
			<groupId>eu.dnetlib</groupId>
110
			<artifactId>dnet-validator-workflows</artifactId>
111
			<version>[1.0.0,2.0.0)</version>
112
		</dependency>
113

  
114
		<dependency>
115
			<groupId>eu.dnetlib</groupId>
116
			<artifactId>dnet-deduplication</artifactId>
117
			<version>[1.0.0,2.0.0)</version>
118
		</dependency>
119

  
120
		<!-- modular ui and servlet api are here because of the stats controller -->
121
		<dependency>
122
			<groupId>eu.dnetlib</groupId>
123
			<artifactId>dnet-directindex-api</artifactId>
124
			<version>[1.0.0,2.0.0)</version>
125
		</dependency>
126

  
127
		<dependency>
128
			<groupId>org.apache.velocity</groupId>
129
			<artifactId>velocity</artifactId>
130
			<version>1.7</version>
131
			<exclusions>
132
				<exclusion>
133
					<artifactId>antlr</artifactId>
134
					<groupId>antlr</groupId>
135
				</exclusion>
136
			</exclusions>
137
		</dependency>
138
		<dependency>
139
			<groupId>org.apache.velocity</groupId>
140
			<artifactId>velocity-tools</artifactId>
141
			<version>2.0</version>
142
			<exclusions>
143
				<exclusion>
144
					<artifactId>antlr</artifactId>
145
					<groupId>antlr</groupId>
146
				</exclusion>
147
			</exclusions>
148
		</dependency>
149
		<dependency>
150
			<groupId>javax.servlet</groupId>
151
			<artifactId>javax.servlet-api</artifactId>
152
			<version>${javax.servlet.version}</version>
153
			<scope>provided</scope>
154
		</dependency>
155
		<dependency>
156
			<groupId>junit</groupId>
157
			<artifactId>junit</artifactId>
158
			<version>${junit.version}</version>
159
			<scope>test</scope>
160
		</dependency>
161
		<dependency>
162
			<groupId>eu.dnetlib</groupId>
163
			<artifactId>dnet-openaireplus-profiles</artifactId>
164
			<version>[1.0.0,2.0.0)</version>
165
			<scope>test</scope>
166
		</dependency>
167
		<dependency>
168
			<groupId>io.springfox</groupId>
169
			<artifactId>springfox-swagger2</artifactId>
170
			<version>${springfox-version}</version>
171
		</dependency>
172
		<dependency>
173
			<groupId>org.mockito</groupId>
174
			<artifactId>mockito-core</artifactId>
175
			<version>1.9.5</version>
176
		</dependency>
177
		<dependency>
178
			<groupId>org.springframework</groupId>
179
			<artifactId>spring-jdbc</artifactId>
180
			<version>${spring.version}</version>
181
		</dependency>
182

  
183
		<dependency>
184
			<groupId>eu.dnetlib</groupId>
185
			<artifactId>cnr-misc-utils</artifactId>
186
			<version>[1.0.5, 2.0.0)</version>
187
		</dependency>
188
	</dependencies>
189

  
190
	<properties>
191
		<springfox-version>2.5.0</springfox-version>
192
	</properties>
193
</project>
modules/dnet-openaireplus-workflows/tags/dnet-openaireplus-workflows-6.3.19/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/index/PrepareIndexDataJobNode.java
1
package eu.dnetlib.msro.openaireplus.workflows.nodes.index;
2

  
3
import java.io.IOException;
4
import java.io.InputStream;
5
import java.io.StringReader;
6
import java.io.StringWriter;
7
import javax.annotation.Resource;
8
import javax.xml.transform.Transformer;
9
import javax.xml.transform.TransformerException;
10
import javax.xml.transform.TransformerFactory;
11
import javax.xml.transform.stream.StreamResult;
12
import javax.xml.transform.stream.StreamSource;
13

  
14
import com.googlecode.sarasvati.Arc;
15
import com.googlecode.sarasvati.NodeToken;
16
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
17
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
18
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
19
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
20
import eu.dnetlib.miscutils.datetime.DateUtils;
21
import eu.dnetlib.miscutils.functional.hash.Hashing;
22
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
23
import org.apache.commons.io.IOUtils;
24
import org.apache.commons.lang.StringUtils;
25
import org.apache.commons.logging.Log;
26
import org.apache.commons.logging.LogFactory;
27
import org.springframework.beans.factory.annotation.Required;
28
import org.springframework.core.io.ClassPathResource;
29

  
30
public class PrepareIndexDataJobNode extends SimpleJobNode {
31

  
32
	/**
33
	 * logger.
34
	 */
35
	private static final Log log = LogFactory.getLog(PrepareIndexDataJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
36

  
37
	@Resource
38
	private UniqueServiceLocator serviceLocator;
39

  
40
	/**
41
	 * Stylesheet which transforms a layout to another stylesheet which converts a input record to a index record.
42
	 */
43
	private String layoutToRecordStylesheet;
44

  
45
	private String outputRecordsPathParam;
46

  
47
	private String rottenRecordsPathParam;
48

  
49
	private String hbaseTable;
50

  
51
	private String oafSchemaLocationProperty;
52

  
53
	@Override
54
	protected String execute(final NodeToken token) throws Exception {
55

  
56
		log.info("start preparing job");
57

  
58
		final String xslt = prepareXslt(env("format", token), env("layout", token));
59

  
60
		token.getEnv().setAttribute("index.xslt", xslt);
61

  
62
		if (!StringUtils.isBlank(getOutputRecordsPathParam())) {
63
			token.getEnv().setAttribute(getOutputRecordsPathParam(), "/tmp" + getFileName(token, "indexrecords"));
64
		}
65
		if (!StringUtils.isBlank(getRottenRecordsPathParam())) {
66
			token.getEnv().setAttribute(getRottenRecordsPathParam(), "/tmp" + getFileName(token, "rottenrecords"));
67
		}
68

  
69
		token.getEnv().setAttribute("index.solr.url", getIndexSolrUrlZk());
70
		token.getEnv().setAttribute("index.solr.collection", getCollectionName(token));
71

  
72
		token.getEnv().setAttribute("index.shutdown.wait.time", getIndexSolrShutdownWait());
73
		token.getEnv().setAttribute("index.buffer.flush.threshold", getIndexBufferFlushTreshold());
74
		token.getEnv().setAttribute("index.solr.sim.mode", isFeedingSimulationMode());
75

  
76
		token.getEnv().setAttribute("index.feed.timestamp", DateUtils.now_ISO8601());
77

  
78
		token.getEnv().setAttribute(getOafSchemaLocationProperty(), getPropertyFetcher().getProperty(getOafSchemaLocationProperty()));
79

  
80
		return Arc.DEFAULT_ARC;
81
	}
82

  
83
	protected String tableName(final NodeToken token) {
84
		if (token.getEnv().hasAttribute("hbaseTable")) {
85
			String table = token.getEnv().getAttribute("hbaseTable");
86
			log.debug("found override value in wfEnv for 'hbaseTable' param: " + table);
87
			return table;
88
		}
89
		return getHbaseTable();
90
	}
91

  
92
	public String getIndexSolrUrlZk() throws ISLookUpException {
93
		return getServiceConfigValue(
94
				"for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()");
95
	}
96

  
97
	public String getIndexSolrShutdownWait() throws ISLookUpException {
98
		return queryForServiceProperty("solr:feedingShutdownTolerance");
99
	}
100

  
101
	public String getIndexBufferFlushTreshold() throws ISLookUpException {
102
		return queryForServiceProperty("solr:feedingBufferFlushThreshold");
103
	}
104

  
105
	public String isFeedingSimulationMode() throws ISLookUpException {
106
		return queryForServiceProperty("solr:feedingSimulationMode");
107
	}
108

  
109
	private String queryForServiceProperty(final String key) throws ISLookUpException {
110
		return getServiceConfigValue(
111
				"for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//SERVICE_PROPERTIES/PROPERTY[./@ key='"
112
						+ key + "']/@value/string()");
113
	}
114

  
115
	private String getServiceConfigValue(final String xquery) throws ISLookUpException {
116
		log.debug("quering for service property: " + xquery);
117
		final String res = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(xquery);
118
		if (StringUtils.isBlank(res)) { throw new IllegalStateException("unable to find unique service property, xquery: " + xquery); }
119
		return res;
120
	}
121

  
122
	private String getFileName(final NodeToken token, final String fileNamePrefix) {
123
		return "/" + fileNamePrefix + "_" + tableName(token) + "_" + token.getEnv().getAttribute("format") + ".seq";
124
	}
125

  
126
	private String getCollectionName(final NodeToken token) {
127
		return env("format", token) + "-" + env("layout", token) + "-" + env("interpretation", token);
128
	}
129

  
130
	private String env(final String s, final NodeToken token) {
131
		return token.getEnv().getAttribute(s);
132
	}
133

  
134
	/**
135
	 * Transforms each OAF record into a index record.
136
	 *
137
	 * @param format         format
138
	 * @param layout         layout
139
	 * @return resultset with transformed records
140
	 * @throws ISLookUpException    could happen
141
	 * @throws IOException          could happen
142
	 * @throws TransformerException could happen
143
	 */
144
	protected String prepareXslt(final String format, final String layout) throws ISLookUpException, IOException, TransformerException {
145

  
146
		final TransformerFactory factory = TransformerFactory.newInstance();
147
		final Transformer layoutTransformer = factory.newTransformer(new StreamSource(new StringReader(readXslt(getLayoutToRecordStylesheet()))));
148

  
149
		final StreamResult layoutToXsltXslt = new StreamResult(new StringWriter());
150

  
151
		layoutTransformer.setParameter("format", format);
152
		layoutTransformer.transform(new StreamSource(new StringReader(getLayoutSource(format, layout))), layoutToXsltXslt);
153

  
154
		return new String(Hashing.encodeBase64(layoutToXsltXslt.getWriter().toString()));
155
	}
156

  
157
	private String readXslt(final String s) throws IOException {
158
		ClassPathResource resource = new ClassPathResource(s);
159
		InputStream inputStream = resource.getInputStream();
160
		return IOUtils.toString(inputStream);
161
	}
162

  
163
	private String getLayoutSource(final String format, final String layout) throws ISLookUpDocumentNotFoundException, ISLookUpException {
164
		return serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(
165
				"collection('')//RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and .//NAME='" + format + "']//LAYOUT[@name='" + layout
166
						+ "']");
167
	}
168

  
169
	public String getLayoutToRecordStylesheet() {
170
		return layoutToRecordStylesheet;
171
	}
172

  
173
	public void setLayoutToRecordStylesheet(final String layoutToRecordStylesheet) {
174
		this.layoutToRecordStylesheet = layoutToRecordStylesheet;
175
	}
176

  
177
	public String getHbaseTable() {
178
		return hbaseTable;
179
	}
180

  
181
	@Required
182
	public void setHbaseTable(final String hbaseTable) {
183
		this.hbaseTable = hbaseTable;
184
	}
185

  
186
	public String getOutputRecordsPathParam() {
187
		return outputRecordsPathParam;
188
	}
189

  
190
	public void setOutputRecordsPathParam(final String outputRecordsPathParam) {
191
		this.outputRecordsPathParam = outputRecordsPathParam;
192
	}
193

  
194
	public String getRottenRecordsPathParam() {
195
		return rottenRecordsPathParam;
196
	}
197

  
198
	public void setRottenRecordsPathParam(final String rottenRecordsPathParam) {
199
		this.rottenRecordsPathParam = rottenRecordsPathParam;
200
	}
201

  
202
	public String getOafSchemaLocationProperty() {
203
		return oafSchemaLocationProperty;
204
	}
205

  
206
	public void setOafSchemaLocationProperty(final String oafSchemaLocationProperty) {
207
		this.oafSchemaLocationProperty = oafSchemaLocationProperty;
208
	}
209

  
210
}
modules/dnet-openaireplus-workflows/tags/dnet-openaireplus-workflows-6.3.19/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/index/SwitchIndexesJobNode.java
1
package eu.dnetlib.msro.openaireplus.workflows.nodes.index;
2

  
3
import java.util.Queue;
4

  
5
import com.googlecode.sarasvati.Arc;
6
import com.googlecode.sarasvati.NodeToken;
7
import eu.dnetlib.msro.rmi.MSROException;
8
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11

  
12
/**
13
 * SwitchIndexesJobNode guides the index switches on all search services whose ids are in the queue available in the parameter named 'searchService_ids'.
14
 *
15
 * @author claudio, alessia
16
 * @see eu.dnetlib.msro.openaireplus.workflows.nodes.index.FindSearchServicesJobNode
17
 * @see eu.dnetlib.msro.openaireplus.workflows.nodes.index.SwitchIndexJobNode
18
 */
19
public class SwitchIndexesJobNode extends SimpleJobNode {
20

  
21
	private static final Log log = LogFactory.getLog(SwitchIndexesJobNode.class);
22

  
23
	@Override
24
	protected String execute(final NodeToken token) throws Exception {
25
		Queue<String> q = (Queue<String>) token.getEnv().getTransientAttribute("searchService_ids");
26
		log.debug("Got the searchService_ids queue: " + q.toString());
27
		if (q == null) throw new MSROException("Transient param 'searchService_ids' with queue of string could not be found");
28
		if (q.isEmpty()) {
29
			log.info("searchService_ids queue consumed, now ending cycle");
30
			return Arc.DEFAULT_ARC;
31
		} else {
32
			//we have something to do: setting the xqueryForServiceIdParam for the SwitchIndexJobNode
33
			String id = q.poll();
34
			log.debug("Polled id: " + id);
35
			log.debug("And now the queue is " + q);
36
			token.getEnv().setAttribute("search_service_ID", id);
37
			log.debug("Asking to switch on profile with id: " + id);
38
			//updating the queue for next iteration
39
			token.getEnv().setTransientAttribute("searchService_ids", q);
40
			return "switch";
41
		}
42
	}
43

  
44
}
modules/dnet-openaireplus-workflows/tags/dnet-openaireplus-workflows-6.3.19/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/index/SwitchSearchServiceJobNode.java
1
package eu.dnetlib.msro.openaireplus.workflows.nodes.index;
2

  
3
import com.googlecode.sarasvati.NodeToken;
4
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
5
import eu.dnetlib.msro.rmi.MSROException;
6
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
7
import org.apache.commons.lang.StringUtils;
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10

  
11
/**
12
 * SwitchSearchServiceJobNode switches the index used by a SearchService whose profile id is in 'search_service_ID' env param.
13
 *
14
 * @author claudio, alessia
15
 */
16
public class SwitchSearchServiceJobNode extends BlackboardJobNode {
17

  
18
	private static final Log log = LogFactory.getLog(SwitchSearchServiceJobNode.class);
19
	private static final String BB_ACTION_SWITCH_INDEX = "UpdateIndex";
20

  
21
	private String inputIndexIdParam;
22

  
23
	private String outputIndexIdParam;
24

  
25
	@Override
26
	protected String obtainServiceId(final NodeToken token) {
27
		final String id = token.getEnv().getAttribute("search_service_ID");
28
		if (StringUtils.isBlank(id)) throw new RuntimeException("No id found in env attribute 'search_service_ID'");
29
		else return id;
30
	}
31

  
32
	@Override
33
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
34
		job.setAction(BB_ACTION_SWITCH_INDEX);
35
		String indexId = token.getEnv().getAttribute(getInputIndexIdParam());
36

  
37
		checkParam(getInputIndexIdParam(), "output indexId param is missing");
38
		checkParam(indexId, "indexId is required to perform switch");
39
		log.info("Switching " + obtainServiceId(token) + " to index " + indexId);
40
		job.getParameters().put(getOutputIndexIdParam(), indexId);
41
	}
42

  
43
	private void checkParam(final String param, final String msg) throws MSROException {
44
		if (StringUtils.isBlank(param)) throw new MSROException(msg);
45
	}
46

  
47
	public String getInputIndexIdParam() {
48
		return inputIndexIdParam;
49
	}
50

  
51
	public void setInputIndexIdParam(final String inputIndexIdParam) {
52
		this.inputIndexIdParam = inputIndexIdParam;
53
	}
54

  
55
	public String getOutputIndexIdParam() {
56
		return outputIndexIdParam;
57
	}
58

  
59
	public void setOutputIndexIdParam(final String outputIndexIdParam) {
60
		this.outputIndexIdParam = outputIndexIdParam;
61
	}
62

  
63
}
modules/dnet-openaireplus-workflows/tags/dnet-openaireplus-workflows-6.3.19/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/index/SwitchIndexJobNode.java
1
package eu.dnetlib.msro.openaireplus.workflows.nodes.index;
2

  
3
import java.util.List;
4

  
5
import com.googlecode.sarasvati.NodeToken;
6
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
7
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
8
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
9
import eu.dnetlib.msro.rmi.MSROException;
10
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
11
import org.apache.commons.lang.StringUtils;
12
import org.apache.commons.logging.Log;
13
import org.apache.commons.logging.LogFactory;
14

  
15
/**
16
 * SwitchIndexJobNode performs index switch.
17
 *
18
 * @author claudio
19
 * @see eu.dnetlib.msro.openaireplus.workflows.nodes.index.SwitchSearchServiceJobNode
20
 * @see eu.dnetlib.msro.openaireplus.workflows.nodes.index.SwitchIndexesJobNode
21
 * @deprecated
22
 */
23
@Deprecated
24
public class SwitchIndexJobNode extends BlackboardJobNode {
25

  
26
	private static final Log log = LogFactory.getLog(SwitchIndexJobNode.class);
27
	private static final String BB_ACTION_SWITCH_INDEX = "UpdateIndex";
28

  
29
	private String inputIndexIdParam;
30

  
31
	private String outputIndexIdParam;
32

  
33
	private String xqueryForServiceIdParam;
34

  
35
	@Override
36
	protected String obtainServiceId(final NodeToken token) {
37
		final String xquery = token.getEnv().getAttribute(getXqueryForServiceIdParam());
38
		List<String> searchServiceIds;
39
		try {
40
			searchServiceIds = getServiceLocator().getService(ISLookUpService.class).quickSearchProfile(xquery);
41
			if (searchServiceIds.size() > 1) throw new RuntimeException("Too many SearchService ids found using query: " + xquery);
42
			if (searchServiceIds.size() < 1) throw new RuntimeException("SearchService id not found using query: " + xquery);
43
			return searchServiceIds.get(0);
44
		} catch (ISLookUpException e) {
45
			throw new RuntimeException(e);
46
		}
47
	}
48

  
49
	@Override
50
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
51
		job.setAction(BB_ACTION_SWITCH_INDEX);
52
		String indexId = token.getEnv().getAttribute(getInputIndexIdParam());
53

  
54
		checkParam(getInputIndexIdParam(), "output indexId param is missing");
55
		checkParam(indexId, "indexId is required to perform switch");
56
		log.info("Switching " + obtainServiceId(token) + " to index " + indexId);
57
		job.getParameters().put(getOutputIndexIdParam(), indexId);
58
	}
59

  
60
	private void checkParam(final String param, final String msg) throws MSROException {
61
		if (StringUtils.isBlank(param)) throw new MSROException(msg);
62
	}
63

  
64
	public String getInputIndexIdParam() {
65
		return inputIndexIdParam;
66
	}
67

  
68
	public void setInputIndexIdParam(final String inputIndexIdParam) {
69
		this.inputIndexIdParam = inputIndexIdParam;
70
	}
71

  
72
	public String getOutputIndexIdParam() {
73
		return outputIndexIdParam;
74
	}
75

  
76
	public void setOutputIndexIdParam(final String outputIndexIdParam) {
77
		this.outputIndexIdParam = outputIndexIdParam;
78
	}
79

  
80
	public String getXqueryForServiceIdParam() {
81
		return xqueryForServiceIdParam;
82
	}
83

  
84
	public void setXqueryForServiceIdParam(final String xqueryForServiceIdParam) {
85
		this.xqueryForServiceIdParam = xqueryForServiceIdParam;
86
	}
87

  
88
}
modules/dnet-openaireplus-workflows/tags/dnet-openaireplus-workflows-6.3.19/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/IncrementalTransformationJobNode.java
1
package eu.dnetlib.msro.openaireplus.workflows.nodes;
2

  
3
import java.util.List;
4

  
5
import com.googlecode.sarasvati.NodeToken;
6
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
7
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
8
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
9
import eu.dnetlib.miscutils.datetime.DateUtils;
10
import org.apache.commons.logging.Log;
11
import org.apache.commons.logging.LogFactory;
12
import org.springframework.beans.factory.annotation.Autowired;
13

  
14
public class IncrementalTransformationJobNode extends IncrementalOperationJobNode {
15

  
16
    private static final Log log = LogFactory.getLog(IncrementalTransformationJobNode.class);
17

  
18
    @Autowired
19
    private UniqueServiceLocator locator;
20

  
21
    private static final DateUtils dateUtils = new DateUtils();
22

  
23
    @Override
24
    protected boolean forceRefresh(final NodeToken nodeToken, final Long lastSuccessEndDate, final String currentWfProfileId) throws Exception {
25
        if (lastSuccessEndDate < 0) {
26
            nodeToken.getFullEnv().setAttribute("OperationTypeInfo", "Last success date < 0, transformation forced to REFRESH");
27
            nodeToken.getFullEnv().setAttribute("operationType", "REFRESH");
28
            return true;
29
        }
30
        String trRuleId = getTransformationId(currentWfProfileId);
31
        final long lastUpdateDate = getLastTransformationRuleUpdate(trRuleId);
32
        log.info(String.format("Last update date of the transformation rule with id %s is %s", trRuleId, DateUtils.calculate_ISO8601(lastUpdateDate)));
33

  
34
        if (lastUpdateDate > lastSuccessEndDate) {
35
            nodeToken.getFullEnv().setAttribute("OperationTypeInfo", "Transformation Rule has been updated, transformation forced to REFRESH");
36
            nodeToken.getFullEnv().setAttribute("operationType", "REFRESH");
37
            return true;
38
        }
39
        return false;
40
    }
41

  
42

  
43

  
44

  
45

  
46
    private String getTransformationId(final String workflowId) throws ISLookUpException {
47

  
48
        final String query="for $x in collection('/db/DRIVER/WorkflowDSResources/WorkflowDSResourceType') where $x//RESOURCE_IDENTIFIER/@value='%s' " +
49
                "return $x//PARAM[./@category='TRANSFORMATION_RULE_ID']/text()";
50
        final ISLookUpService lookUpService = locator.getService(ISLookUpService.class);
51
        final String queryInstance = String.format(query, workflowId);
52
        log.debug("Query to find the Transformation Rule");
53
        List<String> transformationId = lookUpService.quickSearchProfile(queryInstance);
54
        if(transformationId== null || transformationId.isEmpty())
55
            throw new RuntimeException("Error unable to find the Transformation rule ID on workflow profile "+workflowId);
56
        return transformationId.get(0);
57
    }
58

  
59
    private Long getLastTransformationRuleUpdate(final String trId) throws ISLookUpException {
60
        final String query = "for $x in collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType') where $x//RESOURCE_IDENTIFIER/@value='%s' return $x//DATE_OF_CREATION/@value/string()";
61
        log.debug("retrieve creation date from transformation ID "+trId);
62
        final ISLookUpService lookUpService = locator.getService(ISLookUpService.class);
63
        final String queryInstance = String.format(query, trId);
64
        log.debug("Query to find the Transformation Rule");
65
        List<String> currentDate = lookUpService.quickSearchProfile(queryInstance);
66
        if(currentDate== null || currentDate.isEmpty())
67
            throw new RuntimeException("Error unable to find the creation date of the  Transformation rule "+trId);
68
        return dateUtils.parse(currentDate.get(0)).getTime();
69
    }
70
    
71
}
modules/dnet-openaireplus-workflows/tags/dnet-openaireplus-workflows-6.3.19/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/repohi/RetrieveInterfaceInfoJobNode.java
1
package eu.dnetlib.msro.openaireplus.workflows.nodes.repohi;
2

  
3
import java.io.StringReader;
4
import javax.annotation.Resource;
5

  
6
import com.googlecode.sarasvati.Arc;
7
import com.googlecode.sarasvati.NodeToken;
8
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
9
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
10
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
11
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
12
import org.dom4j.Document;
13
import org.dom4j.Node;
14
import org.dom4j.io.SAXReader;
15

  
16
public class RetrieveInterfaceInfoJobNode extends SimpleJobNode {
17

  
18
	@Resource
19
	private UniqueServiceLocator serviceLocator;
20

  
21
	@Override
22
	protected String execute(final NodeToken token) throws Exception {
23

  
24
		String datasourceId = token.getFullEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ID);
25
		String interfaceID = token.getFullEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE);
26
		final String profile = serviceLocator.getService(ISLookUpService.class).getResourceProfile(datasourceId);
27
		final Document doc = new SAXReader().read(new StringReader(profile));
28
		final Node ifcNode = doc.selectSingleNode("//INTERFACE[@id='" + interfaceID + "']");
29

  
30
		String contentDescription = ifcNode.valueOf("./@contentDescription");
31
		token.getEnv().setAttribute("objectStoreContentDescription", contentDescription);
32

  
33
		final Node acProtNode = doc.selectSingleNode("//INTERFACE[@id='" + interfaceID + "']/ACCESS_PROTOCOL");
34

  
35
		String basePath = acProtNode.valueOf("./@basePath");
36
		token.getEnv().setAttribute("objectStoreBasePath", basePath);
37

  
38
		return Arc.DEFAULT_ARC;
39
	}
40

  
41
}
modules/dnet-openaireplus-workflows/tags/dnet-openaireplus-workflows-6.3.19/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/index/ContextLoader.java
1
package eu.dnetlib.msro.openaireplus.workflows.nodes.index;
2

  
3
import javax.annotation.Resource;
4

  
5
import com.google.common.base.Joiner;
6
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
7
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
8
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
9
import eu.dnetlib.miscutils.functional.xml.IndentXmlString;
10
import org.apache.commons.logging.Log;
11
import org.apache.commons.logging.LogFactory;
12
import org.springframework.beans.factory.annotation.Required;
13

  
14
public class ContextLoader {
15

  
16
	/**
17
	 * logger.
18
	 */
19
	private static final Log log = LogFactory.getLog(ContextLoader.class); // NOPMD by marko on 11/24/08 5:02 PM
20

  
21
	private String xquery;
22

  
23
	@Resource
24
	private UniqueServiceLocator serviceLocator;
25

  
26
	public String load() throws ISLookUpException {
27

  
28
		log.info("loading ContextDSResources: " + getXquery());
29

  
30
		StringBuilder sb = new StringBuilder("<ContextDSResources>");
31
		Joiner.on("").appendTo(sb, serviceLocator.getService(ISLookUpService.class).quickSearchProfile(getXquery()));
32
		sb.append("</ContextDSResources>");
33

  
34
		log.debug("got ContextDSResources: \n" + IndentXmlString.apply(sb.toString()));
35

  
36
		return sb.toString();
37
	}
38

  
39
	public String getXquery() {
40
		return xquery;
41
	}
42

  
43
	@Required
44
	public void setXquery(final String xquery) {
45
		this.xquery = xquery;
46
	}
47

  
48
}
modules/dnet-openaireplus-workflows/tags/dnet-openaireplus-workflows-6.3.19/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/IncrementalOperationJobNode.java
1
package eu.dnetlib.msro.openaireplus.workflows.nodes;
2

  
3
import java.util.Iterator;
4
import java.util.Map;
5

  
6
import com.googlecode.sarasvati.Arc;
7
import com.googlecode.sarasvati.NodeToken;
8
import eu.dnetlib.common.logging.DnetLogger;
9
import eu.dnetlib.miscutils.datetime.DateUtils;
10
import eu.dnetlib.msro.rmi.MSROException;
11
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
12
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
13
import org.apache.commons.lang.math.NumberUtils;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16
import org.springframework.beans.factory.annotation.Autowired;
17

  
18
public class IncrementalOperationJobNode extends SimpleJobNode {
19

  
20
    private static final Log log = LogFactory.getLog(IncrementalOperationJobNode.class);
21

  
22
    //incremental or refresh
23
    private String operationType;
24

  
25
    @Autowired
26
    private DnetLogger dnetLogger;
27

  
28
    @Override
29
    protected String execute(NodeToken nodeToken) throws Exception {
30

  
31
        if ("incremental".equalsIgnoreCase(operationType)) {
32
            final String currentWfProfileId = findCurrentWfProfileId(nodeToken);
33
            final Long lastSuccessEndDate = findLastSuccessEndDate(currentWfProfileId);
34
            if(forceRefresh(nodeToken, lastSuccessEndDate, currentWfProfileId)) return Arc.DEFAULT_ARC;
35
            log.info("Last success date "+ lastSuccessEndDate);
36

  
37
            nodeToken.getFullEnv().setAttribute("OperationTypeInfo", "Operation type set to INCREMENTAL with date "+DateUtils.calculate_ISO8601(lastSuccessEndDate));
38
            nodeToken.getFullEnv().setAttribute("operationType", "INCREMENTAL");
39
            nodeToken.getFullEnv().setAttribute("DateFromFilter", lastSuccessEndDate);            
40
            return Arc.DEFAULT_ARC;
41
        }
42
        nodeToken.getFullEnv().setAttribute("operationType", "REFRESH");
43
        nodeToken.getFullEnv().setAttribute("OperationTypeInfo", "Operation type manually set to REFRESH");
44
        return Arc.DEFAULT_ARC;
45
    }
46

  
47
    protected boolean forceRefresh(final NodeToken nodeToken, final Long lastSuccessEndDate, final String currentWfProfileId) throws Exception {
48
        if (lastSuccessEndDate < 0) {
49
            nodeToken.getFullEnv().setAttribute("OperationTypeInfo", "Last success date < 0, operation forced to REFRESH");
50
            nodeToken.getFullEnv().setAttribute("operationType", "REFRESH");
51
            return true;
52
        }
53
        return false;
54
    }
55

  
56
    private Long findLastSuccessEndDate(String profId) {
57
        long res = -1;
58

  
59
        final Iterator<Map<String, String>> iter = dnetLogger.find(WorkflowsConstants.SYSTEM_WF_PROFILE_ID, profId);
60
        while (iter.hasNext()) {
61
            final Map<String, String> map = iter.next();
62
            log.debug("Iterating on the logs");
63
            if ("true".equalsIgnoreCase(map.get(WorkflowsConstants.SYSTEM_COMPLETED_SUCCESSFULLY))) {
64
                final long curr = NumberUtils.toLong(map.get(WorkflowsConstants.SYSTEM_END_DATE), -1);
65
                if (curr > res) {
66
                    res = curr;
67
                }
68
            }
69
        }
70
        return res;
71
    }
72

  
73
    private String findCurrentWfProfileId(NodeToken token) throws MSROException {
74
        log.debug("Start to find the current profile Id");
75
        final String p1 = token.getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
76
        if (p1 != null && !p1.isEmpty()) {
77
            log.debug("The profile Id found is "+p1);
78
            return p1;
79
        }
80
        final String p2 = token.getFullEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
81
        if (p2 != null && !p2.isEmpty()) {
82
            log.debug("The profile Id found is "+p2);
83
            return p2;
84
        }
85
        final String p3 = token.getProcess().getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
86
        if (p3 != null && !p3.isEmpty()) {
87
            log.debug("The profile Id found is "+p3);
88
            return p3;
89
        }
90
        throw new MSROException("Missing property in env: " + WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
91
    }
92

  
93
    public String getOperationType() {
94
        return operationType;
95
    }
96

  
97
    public void setOperationType(final String operationType) {
98
        this.operationType = operationType;
99
    }
100
}
modules/dnet-openaireplus-workflows/tags/dnet-openaireplus-workflows-6.3.19/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/repobye/DeleteOpenaireMetaWfJobNode.java
1
package eu.dnetlib.msro.openaireplus.workflows.nodes.repobye;
2

  
3
import eu.dnetlib.enabling.datasources.common.LocalDatasourceManager;
4
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
5
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
6
import eu.dnetlib.msro.workflows.nodes.repobye.DeleteMetaWfJobNode;
7
import org.springframework.beans.factory.annotation.Autowired;
8

  
9
public class DeleteOpenaireMetaWfJobNode extends DeleteMetaWfJobNode {
10

  
11
	@Autowired
12
	private UniqueServiceLocator serviceLocator;
13

  
14
	@Autowired
15
	private LocalDatasourceManager dsManager;
16

  
17
	@Override
18
	protected void setActivationStatus(final String dsId, final String ifaceId, final boolean active) throws Exception {
19

  
20
		final String openaireDsId = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(
21
				"/*[.//RESOURCE_IDENTIFIER/@value = '" + dsId + "']//FIELD/value[../key='OpenAireDataSourceId']/text()");
22

  
23
		if (openaireDsId.equals("openaire____::bootstrap")) {
24
			dsManager.setActive(dsId, ifaceId, active);
25
		} else {
26
			dsManager.setActive(openaireDsId, ifaceId, active);
27
		}
28
	}
29
}
modules/dnet-openaireplus-workflows/tags/dnet-openaireplus-workflows-6.3.19/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/datacite/SplitterDatasetsIterator.java
1
package eu.dnetlib.msro.openaireplus.workflows.nodes.datacite;
2

  
3
import java.io.ByteArrayInputStream;
4
import java.io.InputStream;
5
import java.io.StringWriter;
6
import java.nio.charset.StandardCharsets;
7
import java.util.ArrayList;
8
import java.util.List;
9
import java.util.Map;
10
import java.util.Queue;
11
import javax.xml.XMLConstants;
12
import javax.xml.parsers.DocumentBuilderFactory;
13
import javax.xml.transform.Transformer;
14
import javax.xml.transform.TransformerFactory;
15
import javax.xml.transform.dom.DOMSource;
16
import javax.xml.transform.stream.StreamResult;
17
import javax.xml.xpath.XPath;
18
import javax.xml.xpath.XPathConstants;
19
import javax.xml.xpath.XPathFactory;
20

  
21
import com.google.common.collect.Maps;
22
import org.apache.commons.lang.StringUtils;
23
import org.apache.commons.logging.Log;
24
import org.apache.commons.logging.LogFactory;
25
import org.w3c.dom.*;
26

  
27
/**
28
 * The Class SplitterDatasetsIterator.
29
 */
30
public class SplitterDatasetsIterator {
31

  
32
	/**
33
	 * The Constant log.
34
	 */
35
	private static final Log log = LogFactory.getLog(SplitterDatasetsIterator.class);
36

  
37
	/**
38
	 * The end queue.
39
	 */
40
	public static String END_QUEUE = "END_QUEUE";
41

  
42
	/**
43
	 * The publications.
44
	 */
45
	private Queue<String> publications;
46

  
47
	/**
48
	 * The input epr.
49
	 */
50
	private Iterable<String> inputEPR;
51

  
52
	/**
53
	 * The root name.
54
	 */
55
	private String rootName;
56

  
57
	/**
58
	 * Instantiates a new splitter datasets iterator.
59
	 *
60
	 * @param publicationsQueue the publications queue
61
	 * @param inputEPR          the input epr
62
	 * @param rootName          the root name
63
	 */
64
	public SplitterDatasetsIterator(final Queue<String> publicationsQueue, final Iterable<String> inputEPR, final String rootName) {
65
		this.publications = publicationsQueue;
66
		this.inputEPR = inputEPR;
67
		this.rootName = rootName;
68
	}
69

  
70
	/**
71
	 * Populate queues.
72
	 */
73
	public void populateQueues() {
74

  
75
		if (this.inputEPR == null) return;
76
		for (String inputXML : inputEPR) {
77

  
78
			final ByteArrayInputStream bais = new ByteArrayInputStream(inputXML.getBytes(StandardCharsets.UTF_8));
79
			final List<String> publication_extracted = extractByTag(bais, "publication");
80

  
81
			if (publication_extracted != null) {
82
				publications.addAll(publication_extracted);
83

  
84
			}
85
		}
86
		publications.add(END_QUEUE);
87
	}
88

  
89
	/**
90
	 * Extract by tag.
91
	 *
92
	 * @param inputXML the input xml
93
	 * @param tag      the tag
94
	 * @return the list
95
	 */
96
	private List<String> extractByTag(final InputStream inputXML, final String tag) {
97
		try {
98

  
99
			DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
100
			Document doc = dbf.newDocumentBuilder().parse(inputXML);
101

  
102
			XPath xpath = XPathFactory.newInstance().newXPath();
103

  
104
			Node rootNode = (Node) xpath.evaluate("//*[local-name()='" + this.rootName + "']", doc, XPathConstants.NODE);
105

  
106
			NamedNodeMap attributes = rootNode.getAttributes();
107

  
108
			Map<String, String> nameSpaces = Maps.newHashMap();
109

  
110
			for (int i = 0; i < attributes.getLength(); i++) {
111
				Node node = attributes.item(i);
112
				String name = node.getNodeName();
113
				if (name.startsWith("xmlns:")) {
114
					nameSpaces.put(StringUtils.substringAfter(name, "xmlns:"), node.getNodeValue());
115
				}
116

  
117
			}
118
			xpath = XPathFactory.newInstance().newXPath();
119
			NodeList nodes = (NodeList) xpath.evaluate("//*[local-name()='" + tag + "']/*[local-name()='record']", doc, XPathConstants.NODESET);
120

  
121
			if ((nodes != null) && (nodes.getLength() > 0)) {
122
				List<String> result = new ArrayList<>();
123
				for (int i = 0; i < nodes.getLength(); i++) {
124
					Document currentDoc = dbf.newDocumentBuilder().newDocument();
125
					Node imported = currentDoc.importNode(nodes.item(i), true);
126
					for (String key : nameSpaces.keySet()) {
127
						Element element = (Element) imported;
128
						element.setAttributeNS(XMLConstants.XMLNS_ATTRIBUTE_NS_URI, "xmlns:" + key, nameSpaces.get(key));
129
					}
130
					Transformer transformer = TransformerFactory.newInstance().newTransformer();
131
					DOMSource mydoc = new DOMSource(imported);
132
					StringWriter writer = new StringWriter();
133
					transformer.transform(mydoc, new StreamResult(writer));
134
					String record = writer.toString();
135
					result.add(record);
136
				}
137
				return result;
138
			}
139
		} catch (Exception e) {
140
			log.error("Error on extracting " + tag, e);
141
			return null;
142
		}
143
		return null;
144
	}
145
}
modules/dnet-openaireplus-workflows/tags/dnet-openaireplus-workflows-6.3.19/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/consistency/FixRepoMdstoreSizesJobNode.java
1
package eu.dnetlib.msro.openaireplus.workflows.nodes.consistency;
2

  
3
import java.io.IOException;
4
import java.io.StringReader;
5
import java.util.Date;
6
import java.util.HashMap;
7
import java.util.List;
8
import java.util.Map;
9
import java.util.Objects;
10
import java.util.Set;
11
import java.util.stream.Collectors;
12

  
13
import javax.annotation.Resource;
14

  
15
import org.antlr.stringtemplate.StringTemplate;
16
import org.apache.commons.io.IOUtils;
17
import org.apache.commons.lang.math.NumberUtils;
18
import org.apache.commons.lang3.StringUtils;
19
import org.apache.commons.logging.Log;
20
import org.apache.commons.logging.LogFactory;
21
import org.dom4j.Document;
22
import org.dom4j.DocumentException;
23
import org.dom4j.io.SAXReader;
24
import org.springframework.beans.factory.annotation.Autowired;
25

  
26
import com.googlecode.sarasvati.Arc;
27
import com.googlecode.sarasvati.NodeToken;
28

  
29
import eu.dnetlib.data.mdstore.MDStoreServiceException;
30
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
31
import eu.dnetlib.data.objectstore.modular.connector.ObjectStoreDao;
32
import eu.dnetlib.data.objectstore.rmi.ObjectStoreServiceException;
33
import eu.dnetlib.enabling.datasources.LocalOpenaireDatasourceManager;
34
import eu.dnetlib.enabling.datasources.common.Api;
35
import eu.dnetlib.enabling.datasources.common.ApiParam;
36
import eu.dnetlib.enabling.datasources.common.DsmException;
37
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
38
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
39
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException;
40
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
41
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
42
import eu.dnetlib.miscutils.datetime.DateUtils;
43
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
44
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
45
import eu.dnetlib.msro.workflows.util.ProgressProvider;
46
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
47

  
48
public class FixRepoMdstoreSizesJobNode extends SimpleJobNode implements ProgressJobNode {
49

  
50
	@Autowired
51
	private LocalOpenaireDatasourceManager dsManager;
52

  
53
	@Autowired
54
	private UniqueServiceLocator serviceLocator;
55

  
56
	@Resource(name = "mongodbMDStoreDao")
57
	private MDStoreDao mdstoreDao;
58

  
59
	@Autowired
60
	private ObjectStoreDao objectStoreDao;
61

  
62
	private final DateUtils dateUtils = new DateUtils();
63

  
64
	private int current = 0;
65
	private int total = 0;
66

  
67
	private ISRegistryService registry;
68
	private ISLookUpService lookup;
69

  
70
	private final Map<String, String> openaireIds = new HashMap<>();
71
	private boolean alwaysUpdate = false;
72

  
73
	private static final Log log = LogFactory.getLog(FixRepoMdstoreSizesJobNode.class);
74

  
75
	public void init(final int total) {
76
		this.current = 0;
77
		this.total = total;
78
		this.lookup = serviceLocator.getService(ISLookUpService.class);
79
		this.registry = serviceLocator.getService(ISRegistryService.class);
80
		try {
81
			openaireIds.putAll(lookup.quickSearchProfile(
82
					"for $x in collection('/db/DRIVER/RepositoryServiceResources/RepositoryServiceResourceType') return concat($x//DATASOURCE_ORIGINAL_ID, ' @@@ ', $x//RESOURCE_IDENTIFIER/@value)")
83
					.stream()
84
					.collect(Collectors.toMap(
85
							s -> StringUtils.substringBefore(s, "@@@").trim(),
86
							s -> StringUtils.substringAfter(s, "@@@").trim())));
87
		} catch (final ISLookUpException e) {
88
			// TODO Auto-generated catch block
89
			e.printStackTrace();
90
		}
91

  
92
	}
93

  
94
	@Override
95
	protected String execute(final NodeToken token) throws Exception {
96
		final Set<String> list = dsManager.listManagedDatasourceIds();
97

  
98
		init(list.size());
99

  
100
		for (final String dsId : list) {
101
			log.info("Processing ds: " + dsId);
102

  
103
			current++;
104

  
105
			try {
106
				for (final Api<ApiParam> api : dsManager.getApis(dsId)) {
107
					verifyApi(dsId, api);
108
				}
109
			} catch (final Throwable e) {
110
				log.error("Error processing ds: " + dsId, e);
111
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR + "::" + dsId, e.getMessage());
112
			}
113
		}
114

  
115
		return Arc.DEFAULT_ARC;
116
	}
117

  
118
	private void verifyApi(final String dsId, final Api<ApiParam> api)
119
			throws DsmException, ISRegistryException, IOException, ISLookUpException, MDStoreServiceException, ObjectStoreServiceException {
120

  
121
		for (final Document doc : listCollectionMdStores(dsId, api.getId())) {
122
			final String mdId = doc.valueOf("//RESOURCE_IDENTIFIER/@value");
123
			final int actualSize = NumberUtils.toInt(doc.valueOf("//NUMBER_OF_RECORDS"), 0);
124
			final int size = mdstoreDao.getMDStore(mdId).getSize();
125
			if (alwaysUpdate || size != actualSize) {
126
				log.info("  -- Updating size of api " + api.getId() + ", new value = " + size);
127
				updateMdStoreProfile(mdId, doc, size);
128
				dsManager.setLastCollectionInfo(dsId, api.getId(), mdId, size, calculateLastDate(doc));
129
			}
130
		}
131

  
132
		for (final Document doc : listTransformationMdStores(dsId, api.getId())) {
133
			final String mdId = doc.valueOf("//RESOURCE_IDENTIFIER/@value");
134
			final int actualSize = NumberUtils.toInt(doc.valueOf("//NUMBER_OF_RECORDS"), 0);
135
			final int size = mdstoreDao.getMDStore(mdId).getSize();
136
			if (alwaysUpdate || size != actualSize) {
137
				updateMdStoreProfile(mdId, doc, size);
138
				dsManager.setLastAggregationInfo(dsId, api.getId(), mdId, size, calculateLastDate(doc));
139
			}
140
		}
141

  
142
		for (final Document doc : listDownloadObjectStores(dsId, api.getId())) {
143
			final String objId = doc.valueOf("//RESOURCE_IDENTIFIER/@value");
144
			final int actualSize = NumberUtils.toInt(doc.valueOf("//STORE_SIZE"), 0);
145
			final int size = objectStoreDao.getObjectStore(objId).getSize();
146
			if (alwaysUpdate || size != actualSize) {
147
				updateObjStoreProfile(objId, doc, size);
148
				dsManager.setLastDownloadInfo(dsId, api.getId(), objId, size, calculateLastDate(doc));
149
			}
150
		}
151
	}
152

  
153
	private Date calculateLastDate(final Document doc) {
154
		final String dateS = doc.valueOf("//LAST_STORAGE_DATE");
155
		final Date date = StringUtils.isNoneBlank(dateS) ? dateUtils.parse(dateS) : new Date();
156
		return date;
157
	}
158

  
159
	private List<Document> listCollectionMdStores(final String dsId, final String apiId) throws ISLookUpException, IOException {
160
		return executeXquery("listCollectionMdStores.xquery.st", dsId, apiId);
161
	}
162

  
163
	private List<Document> listTransformationMdStores(final String dsId, final String apiId) throws ISLookUpException, IOException {
164
		return executeXquery("listTransformationMdStores.xquery.st", dsId, apiId);
165
	}
166

  
167
	private List<Document> listDownloadObjectStores(final String dsId, final String apiId) throws ISLookUpException, IOException {
168
		return executeXquery("listDownloadObjectStores.xquery.st", dsId, apiId);
169
	}
170

  
171
	private List<Document> executeXquery(final String template, final String dsId, final String apiId) throws ISLookUpException, IOException {
172
		final StringTemplate st = new StringTemplate(IOUtils.toString(getClass().getResourceAsStream(template)));
173
		st.setAttribute("dsId", openaireIds.get(dsId));
174
		st.setAttribute("apiId", apiId);
175

  
176
		final SAXReader reader = new SAXReader();
177

  
178
		return lookup.quickSearchProfile(st.toString())
179
				.stream()
180
				.map(s -> {
181
					try {
182
						return reader.read(new StringReader(s));
183
					} catch (final DocumentException e) {
184
						return null;
185
					}
186
				})
187
				.filter(Objects::nonNull)
188
				.collect(Collectors.toList());
189
	}
190

  
191
	private void updateMdStoreProfile(final String mdId, final Document doc, final int size) throws ISRegistryException {
192
		doc.selectSingleNode("//NUMBER_OF_RECORDS").setText(Integer.toString(size));
193
		registry.updateProfile(mdId, doc.asXML(), "MDStoreDSResourceType");
194
	}
195

  
196
	private void updateObjStoreProfile(final String objId, final Document doc, final int size) throws ISRegistryException {
197
		doc.selectSingleNode("//COUNT_STORE").setText(Integer.toString(size));
198
		doc.selectSingleNode("//STORE_SIZE").setText(Integer.toString(size));
199
		registry.updateProfile(objId, doc.asXML(), "ObjectStoreDSResourceType");
200
	}
201

  
202
	public boolean isAlwaysUpdate() {
203
		return alwaysUpdate;
204
	}
205

  
206
	public void setAlwaysUpdate(final boolean alwaysUpdate) {
207
		this.alwaysUpdate = alwaysUpdate;
208
	}
209

  
210
	@Override
211
	public ProgressProvider getProgressProvider() {
212
		return new ProgressProvider() {
213

  
214
			@Override
215
			public int getTotalValue() {
216
				return total;
217
			}
218

  
219
			@Override
220
			public int getCurrentValue() {
221
				return current;
222
			}
223

  
224
			@Override
225
			public boolean isInaccurate() {
226
				return false;
227
			}
228
		};
229
	}
230

  
231
}
modules/dnet-openaireplus-workflows/tags/dnet-openaireplus-workflows-6.3.19/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hostedby/PatchHostedBy.java
1
package eu.dnetlib.msro.openaireplus.workflows.nodes.hostedby;
2

  
3
import java.io.StringReader;
4
import java.util.Map;
5

  
6
import eu.dnetlib.miscutils.functional.UnaryFunction;
7
import org.apache.commons.logging.Log;
8
import org.apache.commons.logging.LogFactory;
9
import org.dom4j.Document;
10
import org.dom4j.Element;
11
import org.dom4j.Node;
12
import org.dom4j.io.SAXReader;
13

  
14
/**
15
 * The Class PatchHostedBy.
16
 */
17
public class PatchHostedBy implements UnaryFunction<String, String> {
18

  
19
	/**
20
	 * The set spec hosted by map.
21
	 */
22
	private Map<String, HostedByEntry> setSpecHostedByMap;
23

  
24
	/**
25
	 * The counters.
26
	 */
27
	private HostedByCounters counters;
28

  
29
	/**
30
	 * The xpath.
31
	 */
32
	private String xpath;
33

  
34
	/**
35
	 * The reader.
36
	 */
37
	private final SAXReader reader = new SAXReader();
38

  
39
	/**
40
	 * The Constant log.
41
	 */
42
	private static final Log log = LogFactory.getLog(PatchHostedBy.class);
43

  
44
	/**
45
	 * Instantiates a new patch hosted by.
46
	 *
47
	 * @param setSpecHostedByMap the set spec hosted by map
48
	 * @param xpath              the xpath
49
	 * @param counters           the counters
50
	 */
51
	public PatchHostedBy(final Map<String, HostedByEntry> setSpecHostedByMap, final String xpath, final HostedByCounters counters) {
52
		this.setSpecHostedByMap = setSpecHostedByMap;
53
		this.xpath = xpath;
54
		this.counters = counters;
55
		if (log.isDebugEnabled()) {
56
			log.debug("****************************************");
57
			log.debug("SetSpec/hostedBy map:");
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff