Project

General

Profile

« Previous | Next » 

Revision 31415

refactored submit job nodes, introducing AdminJobNodes

View differences:

modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hbase/CreateHBaseTableJobNode.java
4 4

  
5 5
import org.apache.commons.logging.Log;
6 6
import org.apache.commons.logging.LogFactory;
7
import org.springframework.beans.factory.annotation.Autowired;
8 7

  
9 8
import com.googlecode.sarasvati.Arc;
10 9
import com.googlecode.sarasvati.NodeToken;
11 10

  
12
import eu.dnetlib.data.hadoop.rmi.HadoopService;
13
import eu.dnetlib.enabling.tools.ServiceLocator;
14 11
import eu.dnetlib.msro.openaireplus.workflows.hbase.HBaseTableUtils;
15
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
16 12

  
17
public class CreateHBaseTableJobNode extends SimpleJobNode {
13
public class CreateHBaseTableJobNode extends AbstractHBaseAdminJobNode {
18 14

  
19 15
	private static final Log log = LogFactory.getLog(CreateHBaseTableJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
20 16

  
21
	private String hbaseTableProperty;
22
	private String cluster;
23

  
24 17
	private String xqueryForColumnsProperty;
25 18

  
26
	@Autowired
27
	private ServiceLocator<HadoopService> hadoopServiceLocator;
28

  
29 19
	@Override
30 20
	protected String execute(final NodeToken token) throws Exception {
31 21
		final Set<String> columns = HBaseTableUtils.listColumns();
32
		final String tableName = tableName();
22
		final String tableName = tableName(token);
23
		final String cluster = cluster(token);
33 24

  
34
		log.debug("Ensuring table " + tableName + " - columns: " + columns);
25
		log.debug("Ensuring table " + tableName + " on cluster: '" + cluster + "' - columns: " + columns);
35 26

  
36 27
		hadoopServiceLocator.getService().createHbaseTable(cluster, tableName, columns);
37 28

  
38 29
		return Arc.DEFAULT_ARC;
39 30
	}
40 31

  
41
	private String tableName() {
42
		return getPropertyFetcher().getProperty(getHbaseTableProperty());
43
	}
44

  
45
	public String getCluster() {
46
		return cluster;
47
	}
48

  
49
	public void setCluster(final String cluster) {
50
		this.cluster = cluster;
51
	}
52

  
53
	public String getHbaseTableProperty() {
54
		return hbaseTableProperty;
55
	}
56

  
57
	public void setHbaseTableProperty(String hbaseTableProperty) {
58
		this.hbaseTableProperty = hbaseTableProperty;
59
	}
60

  
61 32
	public String getXqueryForColumnsProperty() {
62 33
		return xqueryForColumnsProperty;
63 34
	}
64 35

  
65
	public void setXqueryForColumnsProperty(String xqueryForColumnsProperty) {
36
	public void setXqueryForColumnsProperty(final String xqueryForColumnsProperty) {
66 37
		this.xqueryForColumnsProperty = xqueryForColumnsProperty;
67 38
	}
68 39

  
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hbase/SubmitHadoopJobNode.java
45 45

  
46 46
		job.setAction(type);
47 47
		job.getParameters().put("job.name", getHadoopJob());
48
		job.getParameters().put("cluster", getCluster());
48
		job.getParameters().put("cluster", cluster(token));
49 49

  
50 50
		job.getParameters().putAll(parseJsonParameters(token));
51 51
	}
52 52

  
53
	private String cluster(final NodeToken token) {
54
		if (token.getEnv().hasAttribute("cluster")) {
55
			String cluster = token.getEnv().getAttribute("cluster");
56
			log.info("found override value in wfEnv for 'cluster' param: " + cluster);
57
			return cluster;
58
		}
59
		return getCluster();
60
	}
61

  
53 62
	/**
54 63
	 * reads the job type for the given job name
55
	 * 
64
	 *
56 65
	 * @param jobName
57 66
	 * @return
58 67
	 * @throws ISLookUpException
59 68
	 */
60
	private String getJobType(String jobName) throws ISLookUpException {
61
		List<String> res = lookupLocator.getService().quickSearchProfile(
62
				"/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'HadoopJobConfigurationDSResourceType']//HADOOP_JOB[./@name='" + jobName + "']/@type/string()");
63
		if (res.isEmpty()) { throw new IllegalStateException("unable to find job type for job: " + jobName); }
69
	private String getJobType(final String jobName) throws ISLookUpException {
70
		List<String> res =
71
				lookupLocator.getService().quickSearchProfile(
72
						"/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'HadoopJobConfigurationDSResourceType']//HADOOP_JOB[./@name='" + jobName
73
								+ "']/@type/string()");
74
		if (res.isEmpty()) throw new IllegalStateException("unable to find job type for job: " + jobName);
64 75

  
65 76
		final HadoopJobType type = HadoopJobType.valueOf(Iterables.getOnlyElement(res));
66 77

  
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hbase/DropHBaseTableJobNode.java
2 2

  
3 3
import org.apache.commons.logging.Log;
4 4
import org.apache.commons.logging.LogFactory;
5
import org.springframework.beans.factory.annotation.Autowired;
6 5

  
7 6
import com.googlecode.sarasvati.Arc;
8 7
import com.googlecode.sarasvati.NodeToken;
9 8

  
10
import eu.dnetlib.data.hadoop.config.ClusterName;
11
import eu.dnetlib.data.hadoop.rmi.HadoopService;
12
import eu.dnetlib.enabling.tools.ServiceLocator;
13
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
9
/**
10
 * The Class DropHBaseTableJobNode.
11
 */
12
public class DropHBaseTableJobNode extends AbstractHBaseAdminJobNode {
14 13

  
15
public class DropHBaseTableJobNode extends SimpleJobNode {
16

  
14
	/** The Constant log. */
17 15
	private static final Log log = LogFactory.getLog(DropHBaseTableJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
18 16

  
19
	private String hbaseTableProperty;
20
	private String cluster;
21

  
22
	@Autowired
23
	private ServiceLocator<HadoopService> hadoopServiceLocator;
24

  
17
	/*
18
	 * (non-Javadoc)
19
	 * 
20
	 * @see eu.dnetlib.msro.workflows.nodes.SimpleJobNode#execute(com.googlecode.sarasvati.NodeToken)
21
	 */
25 22
	@Override
26 23
	protected String execute(final NodeToken token) throws Exception {
27 24

  
28
		final String tableName = tableName();
25
		final String tableName = tableName(token);
26
		final String cluster = cluster(token);
29 27

  
30
		log.info("Dropping hbase table " + tableName);
28
		log.info("Dropping hbase table '" + tableName + "' on cluster: '" + cluster + "'");
31 29

  
32
		HadoopService hadoopService = hadoopServiceLocator.getService();
33
		String clusterName = ClusterName.valueOf(getCluster()).toString();
34
		hadoopService.dropHbaseTable(clusterName, tableName);
30
		hadoopServiceLocator.getService().dropHbaseTable(cluster, tableName);
35 31

  
36 32
		return Arc.DEFAULT_ARC;
37 33
	}
38 34

  
39
	private String tableName() {
40
		return getPropertyFetcher().getProperty(getHbaseTableProperty());
41
	}
42

  
43
	public String getCluster() {
44
		return cluster;
45
	}
46

  
47
	public void setCluster(final String cluster) {
48
		this.cluster = cluster;
49
	}
50

  
51
	public String getHbaseTableProperty() {
52
		return hbaseTableProperty;
53
	}
54

  
55
	public void setHbaseTableProperty(String hbaseTableProperty) {
56
		this.hbaseTableProperty = hbaseTableProperty;
57
	}
58

  
59 35
}
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hbase/AbstractHBaseAdminJobNode.java
1
package eu.dnetlib.msro.openaireplus.workflows.nodes.hbase;
2

  
3
import java.util.Map.Entry;
4

  
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7
import org.springframework.beans.factory.annotation.Autowired;
8

  
9
import com.googlecode.sarasvati.NodeToken;
10

  
11
import eu.dnetlib.data.hadoop.rmi.HadoopService;
12
import eu.dnetlib.enabling.tools.ServiceLocator;
13
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
14

  
15
public abstract class AbstractHBaseAdminJobNode extends SimpleJobNode {
16

  
17
	/** The Constant log. */
18
	private static final Log log = LogFactory.getLog(AbstractHBaseAdminJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
19

  
20
	private String hbaseTableProperty;
21
	private String cluster;
22

  
23
	@Autowired
24
	protected ServiceLocator<HadoopService> hadoopServiceLocator;
25

  
26
	@Override
27
	protected void beforeStart(final NodeToken token) {
28
		for (Entry<String, String> e : parseJsonParameters(token).entrySet()) {
29
			token.getEnv().setAttribute(e.getKey(), e.getValue());
30
		}
31
	}
32

  
33
	protected String tableName(final NodeToken token) {
34
		if (token.getEnv().hasAttribute("hbaseTable")) {
35
			String table = token.getEnv().getAttribute("hbaseTable");
36
			log.info("found override value in wfEnv for 'hbaseTable' param: " + table);
37
			return table;
38
		}
39
		return getPropertyFetcher().getProperty(getHbaseTableProperty());
40
	}
41

  
42
	protected String cluster(final NodeToken token) {
43
		if (token.getEnv().hasAttribute("cluster")) {
44
			String cluster = token.getEnv().getAttribute("cluster");
45
			log.info("found override value in wfEnv for 'cluster' param: " + cluster);
46
			return cluster;
47
		}
48
		return getCluster();
49
	}
50

  
51
	public String getCluster() {
52
		return cluster;
53
	}
54

  
55
	public void setCluster(final String cluster) {
56
		this.cluster = cluster;
57
	}
58

  
59
	public String getHbaseTableProperty() {
60
		return hbaseTableProperty;
61
	}
62

  
63
	public void setHbaseTableProperty(final String hbaseTableProperty) {
64
		this.hbaseTableProperty = hbaseTableProperty;
65
	}
66

  
67
}
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hbase/PrepareCopyTableJobNode.java
1 1
package eu.dnetlib.msro.openaireplus.workflows.nodes.hbase;
2 2

  
3 3
import java.util.Map;
4
import java.util.Set;
4 5

  
5 6
import javax.annotation.Resource;
6 7

  
......
8 9
import org.apache.commons.logging.Log;
9 10
import org.apache.commons.logging.LogFactory;
10 11

  
12
import com.google.common.collect.Sets;
11 13
import com.googlecode.sarasvati.Arc;
12 14
import com.googlecode.sarasvati.NodeToken;
13 15

  
14 16
import eu.dnetlib.data.hadoop.rmi.HadoopService;
17
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
15 18
import eu.dnetlib.enabling.tools.ServiceLocator;
16 19
import eu.dnetlib.msro.rmi.MSROException;
17 20
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
......
35 38
	 */
36 39
	private static final Log log = LogFactory.getLog(PrepareCopyTableJobNode.class);
37 40

  
41
	/** The source table. */
42
	private String sourceCluster;
43

  
38 44
	/** The target cluster. */
39 45
	private String targetCluster;
40 46

  
41
	/** The target table. */
47
	private String sourceTable;
48

  
42 49
	private String targetTable;
43 50

  
44 51
	/** The hadoop locator. */
......
47 54

  
48 55
	/*
49 56
	 * (non-Javadoc)
50
	 * 
57
	 *
51 58
	 * @see eu.dnetlib.msro.workflows.nodes.SimpleJobNode#execute(com.googlecode.sarasvati.NodeToken)
52 59
	 */
53 60
	@Override
54 61
	protected String execute(final NodeToken token) throws Exception {
55 62

  
63
		checkNodeParams();
64

  
65
		if (!StringUtils.equals(getSourceCluster(), getTargetCluster())) {
66
			final String outputQuorum = getOutputQuorum();
67
			log.info("build hbase quorum: " + outputQuorum);
68
			token.getEnv().setAttribute("peer.addr", outputQuorum);
69
		}
70
		token.getEnv().setAttribute("sourceCluster", getSourceCluster());
71
		token.getEnv().setAttribute("sourceTable", getSourceTable());
72

  
73
		token.getEnv().setAttribute("targetCluster", getTargetCluster());
74
		token.getEnv().setAttribute("targetTable", getTargetTable());
75

  
76
		return Arc.DEFAULT_ARC;
77
	}
78

  
79
	/**
80
	 * Builds the output quorum.
81
	 *
82
	 * @return the output quorum
83
	 * @throws HadoopServiceException
84
	 *             when cannot retrieve the clustr configuration
85
	 * @throws MSROException
86
	 *             when some of the needed properties is missing in the cluster configuration
87
	 */
88
	private String getOutputQuorum() throws HadoopServiceException, MSROException {
56 89
		Map<String, String> conf = hadoopLocator.getService().getClusterConfiguration(getTargetCluster());
57 90
		log.debug(conf);
58 91

  
......
60 93
		String hbasePort = conf.get(HBASE_ZOOKEEPER_CLIENT_PORT);
61 94
		String znodeParent = conf.get(ZOOKEEPER_ZNODE_PARENT);
62 95

  
63
		checkParam(hbaseQuorum, String.format("unable to find property '%s' in cluster configuration: %s", HBASE_ZOOKEEPER_QUORUM, hbaseQuorum));
64
		checkParam(hbasePort, String.format("unable to find property '%s' in cluster configuration: %s", HBASE_ZOOKEEPER_CLIENT_PORT, hbasePort));
65
		checkParam(znodeParent, String.format("unable to find property '%s' in cluster configuration: %s", ZOOKEEPER_ZNODE_PARENT, znodeParent));
96
		checkParamExist(hbaseQuorum, String.format("unable to find property '%s' in cluster configuration: %s", HBASE_ZOOKEEPER_QUORUM, hbaseQuorum));
97
		checkParamExist(hbasePort, String.format("unable to find property '%s' in cluster configuration: %s", HBASE_ZOOKEEPER_CLIENT_PORT, hbasePort));
98
		checkParamExist(znodeParent, String.format("unable to find property '%s' in cluster configuration: %s", ZOOKEEPER_ZNODE_PARENT, znodeParent));
66 99

  
67
		checkParam(getTargetTable(), "targetTable cannot be null or empty");
68

  
69 100
		String outputQuorum = String.format("%s:%s:%s", hbaseQuorum, hbasePort, znodeParent);
70
		log.info("build hbase quorum: " + outputQuorum);
101
		return outputQuorum;
102
	}
71 103

  
72
		token.getEnv().setAttribute("outputQuorum", outputQuorum);
73
		token.getEnv().setAttribute("targetTable", getTargetTable());
104
	/**
105
	 * Checks the wf params.
106
	 *
107
	 * @throws MSROException
108
	 *             the MSRO exception
109
	 * @throws HadoopServiceException
110
	 */
111
	private void checkNodeParams() throws MSROException, HadoopServiceException {
74 112

  
75
		return Arc.DEFAULT_ARC;
113
		checkParamExist(getSourceCluster(), "source cluster must be set");
114
		checkParamExist(getTargetCluster(), "target cluster must be set");
115
		checkParamExist(getSourceTable(), "source table must be set");
116
		checkParamExist(getTargetTable(), "target table must be set");
117

  
118
		Set<String> clusters = Sets.newHashSet(hadoopLocator.getService().listClusters());
119
		if (!clusters.contains(getSourceCluster())) throw new MSROException(String.format("source cluster '%s' doesn not exists", getSourceCluster()));
120
		if (!clusters.contains(getTargetCluster())) throw new MSROException(String.format("target cluster '%s' doesn not exists", getTargetCluster()));
121

  
122
		if (!hadoopLocator.getService().existHbaseTable(getSourceCluster(), getSourceTable()))
123
			throw new MSROException(String.format("source table '%s' doesn not exists on cluster '%s'", getSourceTable(), getSourceCluster()));
76 124
	}
77 125

  
78 126
	/**
79
	 * Check param.
127
	 * Check parameter existence.
80 128
	 *
81 129
	 * @param param
82 130
	 *            the param
......
85 133
	 * @throws MSROException
86 134
	 *             the MSRO exception
87 135
	 */
88
	private void checkParam(final String param, final String msg) throws MSROException {
136
	private void checkParamExist(final String param, final String msg) throws MSROException {
89 137
		if (StringUtils.isBlank(param)) throw new MSROException(msg);
90 138
	}
91 139

  
92
	/**
93
	 * Gets the target cluster.
94
	 *
95
	 * @return the target cluster
96
	 */
140
	public String getSourceCluster() {
141
		return sourceCluster;
142
	}
143

  
144
	public void setSourceCluster(final String sourceCluster) {
145
		this.sourceCluster = sourceCluster;
146
	}
147

  
97 148
	public String getTargetCluster() {
98 149
		return targetCluster;
99 150
	}
100 151

  
101
	/**
102
	 * Sets the target cluster.
103
	 *
104
	 * @param targetCluster
105
	 *            the new target cluster
106
	 */
107 152
	public void setTargetCluster(final String targetCluster) {
108 153
		this.targetCluster = targetCluster;
109 154
	}
110 155

  
111
	/**
112
	 * Gets the target table.
113
	 *
114
	 * @return the target table
115
	 */
156
	public String getSourceTable() {
157
		return sourceTable;
158
	}
159

  
160
	public void setSourceTable(final String sourceTable) {
161
		this.sourceTable = sourceTable;
162
	}
163

  
116 164
	public String getTargetTable() {
117 165
		return targetTable;
118 166
	}
119 167

  
120
	/**
121
	 * Sets the target table.
122
	 *
123
	 * @param targetTable
124
	 *            the new target table
125
	 */
126 168
	public void setTargetTable(final String targetTable) {
127 169
		this.targetTable = targetTable;
128 170
	}
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hbase/ExistHBaseTableJobNode.java
2 2

  
3 3
import org.apache.commons.logging.Log;
4 4
import org.apache.commons.logging.LogFactory;
5
import org.springframework.beans.factory.annotation.Autowired;
6 5

  
7 6
import com.googlecode.sarasvati.NodeToken;
8 7

  
9
import eu.dnetlib.data.hadoop.rmi.HadoopService;
10
import eu.dnetlib.enabling.tools.ServiceLocator;
11
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
8
public class ExistHBaseTableJobNode extends AbstractHBaseAdminJobNode {
12 9

  
13
public class ExistHBaseTableJobNode extends SimpleJobNode {
14

  
15 10
	private static final Log log = LogFactory.getLog(ExistHBaseTableJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
16 11

  
17
	private String hbaseTableProperty;
18
	private String cluster;
19

  
20
	@Autowired
21
	private ServiceLocator<HadoopService> hadoopServiceLocator;
22

  
23 12
	@Override
24 13
	protected String execute(final NodeToken token) throws Exception {
25
		final String tableName = tableName();
14
		final String tableName = tableName(token);
15
		final String cluster = cluster(token);
26 16

  
27
		log.info("checking table existance: '" + tableName + "'");
17
		log.info("checking table existance: '" + tableName + "' on cluster: '" + cluster + "'");
28 18

  
29 19
		boolean exists = hadoopServiceLocator.getService().existHbaseTable(cluster, tableName);
30 20

  
......
33 23
		return exists ? "drop" : "create";
34 24
	}
35 25

  
36
	private String tableName() {
37
		return getPropertyFetcher().getProperty(getHbaseTableProperty());
38
	}
39

  
40
	public String getCluster() {
41
		return cluster;
42
	}
43

  
44
	public void setCluster(final String cluster) {
45
		this.cluster = cluster;
46
	}
47

  
48
	public String getHbaseTableProperty() {
49
		return hbaseTableProperty;
50
	}
51

  
52
	public void setHbaseTableProperty(String hbaseTableProperty) {
53
		this.hbaseTableProperty = hbaseTableProperty;
54
	}
55

  
56 26
}
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/functionality/modular/ui/workflows/values/ListHBaseTables.java
1
package eu.dnetlib.functionality.modular.ui.workflows.values;
2

  
3
import java.util.List;
4
import java.util.Map;
5

  
6
import javax.annotation.Resource;
7

  
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10

  
11
import com.google.common.base.Function;
12
import com.google.common.collect.Iterables;
13
import com.google.common.collect.Lists;
14

  
15
import eu.dnetlib.data.hadoop.rmi.HadoopService;
16
import eu.dnetlib.enabling.tools.ServiceLocator;
17
import eu.dnetlib.msro.workflows.util.ValidNodeValuesFetcher;
18

  
19
public class ListHBaseTables extends ValidNodeValuesFetcher {
20

  
21
	/**
22
	 * logger.
23
	 */
24
	private static final Log log = LogFactory.getLog(ListHBaseTables.class);
25

  
26
	/** The hadoop locator. */
27
	@Resource(name = "hadoopServiceLocator")
28
	private ServiceLocator<HadoopService> hadoopLocator;
29

  
30
	private Function<String, DnetParamValue> f = new Function<String, DnetParamValue>() {
31

  
32
		@Override
33
		public DnetParamValue apply(final String s) {
34

  
35
			return new DnetParamValue(s, s);
36
		}
37
	};
38

  
39
	@Override
40
	protected List<DnetParamValue> obtainValues(final Map<String, String> params) throws Exception {
41
		List<DnetParamValue> res = Lists.newArrayList(Iterables.transform(listTables(params.get("cluster")), f));
42
		return res;
43
	}
44

  
45
	private List<String> listTables(final String cluster) {
46
		try {
47
			log.info("list tables for cluster: " + cluster);
48
			return hadoopLocator.getService().listHbaseTables(cluster);
49
		} catch (Throwable e) {
50
			log.error(e);
51
			return Lists.newArrayList();
52
		}
53
	}
54
}
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/functionality/modular/ui/workflows/values/ListHadoopClusters.java
1
package eu.dnetlib.functionality.modular.ui.workflows.values;
2

  
3
import java.util.List;
4
import java.util.Map;
5

  
6
import javax.annotation.Resource;
7

  
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10

  
11
import com.google.common.base.Function;
12
import com.google.common.collect.Iterables;
13
import com.google.common.collect.Lists;
14

  
15
import eu.dnetlib.data.hadoop.rmi.HadoopService;
16
import eu.dnetlib.enabling.tools.ServiceLocator;
17
import eu.dnetlib.msro.workflows.util.ValidNodeValuesFetcher;
18

  
19
public class ListHadoopClusters extends ValidNodeValuesFetcher {
20

  
21
	/**
22
	 * logger.
23
	 */
24
	private static final Log log = LogFactory.getLog(ListHadoopClusters.class);
25

  
26
	/** The hadoop locator. */
27
	@Resource(name = "hadoopServiceLocator")
28
	private ServiceLocator<HadoopService> hadoopLocator;
29

  
30
	private Function<String, DnetParamValue> f = new Function<String, DnetParamValue>() {
31

  
32
		@Override
33
		public DnetParamValue apply(final String s) {
34
			return new DnetParamValue(s, s);
35
		}
36
	};
37

  
38
	@Override
39
	protected List<DnetParamValue> obtainValues(final Map<String, String> params) throws Exception {
40

  
41
		List<DnetParamValue> res = Lists.newArrayList(Iterables.transform(listClusters(), f));
42
		return res;
43
	}
44

  
45
	private List<String> listClusters() {
46
		try {
47
			return hadoopLocator.getService().listClusters();
48
		} catch (Throwable e) {
49
			log.error(e);
50
			return Lists.newArrayList();
51
		}
52
	}
53
}
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/test/profiles/openaireplus/workflows/iis/copyHbaseTable.xml
15 15
			<NODE name="prepare" type="PrepareCopyTable" isStart="true">
16 16
				<DESCRIPTION>Prepare copy</DESCRIPTION>
17 17
				<PARAMETERS>
18
					<PARAM required="true" type="string" name="targetTable" managedBy="user">db_openaireplus_services</PARAM>
19
					<PARAM required="true" type="string" name="targetCluster" managedBy="user" function="validValues(['IIS','DM'])">IIS</PARAM>
18
					<PARAM required="true" type="string" name="sourceCluster" managedBy="user" function="obtainValues('hadoopClusters', {})">DM</PARAM>
19
					<PARAM required="false" type="string" name="sourceTable" managedBy="user"></PARAM>
20
					<PARAM required="true" type="string" name="targetCluster" managedBy="user" function="obtainValues('hadoopClusters', {})">IIS</PARAM>
21
					<PARAM required="false" type="string" name="targetTable" managedBy="user"></PARAM>
20 22
				</PARAMETERS>
21 23
				<ARCS>
24
					<ARC to="checkTable" />
25
				</ARCS>
26
			</NODE>
27
			<NODE name="checkTable" type="CheckHBaseTable">
28
				<DESCRIPTION>check hbase table</DESCRIPTION>
29
				<PARAMETERS>
30
					<PARAM required="true" type="string" name="envParams" managedBy="system">
31
						{ 
32
							'hbaseTable' : 'targetTable',
33
							'cluster' : 'targetCluster'
34
						}
35
					</PARAM>	
36
				</PARAMETERS>
37
				<ARCS>
38
					<ARC to="create" name="create" />
39
					<ARC to="drop" name ="drop" />
40
				</ARCS>
41
			</NODE> 
42
			<NODE name="drop" type="DropHBaseTable">
43
				<DESCRIPTION>drop hbase table</DESCRIPTION>
44
				<PARAMETERS>
45
					<PARAM required="true" type="string" name="envParams" managedBy="system">
46
						{ 
47
							'hbaseTable' : 'targetTable',
48
							'cluster' : 'targetCluster'
49
						}
50
					</PARAM>					
51
				</PARAMETERS>
52
				<ARCS>
53
					<ARC to="create" />
54
				</ARCS>
55
			</NODE>        
56
			<NODE name="create" type="CreateHBaseTable">
57
				<DESCRIPTION>create hbase table</DESCRIPTION>
58
				<PARAMETERS>
59
					<PARAM name="xqueryForColumnsProperty" type="string" managedBy="user" required="true">dnet.openaire.model.relationships.xquery</PARAM>
60
					<PARAM required="true" type="string" name="envParams" managedBy="system">
61
						{ 
62
							'hbaseTable' : 'targetTable',
63
							'cluster' : 'targetCluster'
64
						}
65
					</PARAM>					
66
				</PARAMETERS>
67
				<ARCS>
22 68
					<ARC to="copy" />
23 69
				</ARCS>
24
			</NODE>
70
			</NODE>			
25 71
			<NODE name="copy" type="SubmitHadoopJob">
26 72
				<DESCRIPTION>Copy table Job</DESCRIPTION>
27 73
				<PARAMETERS>
28
					<PARAM required="true" type="string" name="cluster" managedBy="system">DM</PARAM>
29 74
					<PARAM required="true" type="string" name="hadoopJob" managedBy="system">copytable</PARAM>
30 75
					<PARAM required="true" type="string" name="envParams" managedBy="system">
31
						{ 
76
						{
77
							'cluster' : 'sourceCluster', 
32 78
							'hbase.mapreduce.inputtable' : 'targetTable', 
33 79
							'hbase.mapreduce.outputtable' : 'targetTable',
34
							'hbase.mapred.output.quorum' : 'outputQuorum'
80
							'hbase.mapred.output.quorum' : 'outputQuorum',
81
							'hbase.mapred.inputtable' : 'targetTable', 
82
							'hbase.mapred.outputtable' : 'targetTable'
35 83
						}
36 84
					</PARAM>	
37
					<PARAM required="true" type="string" name="sysParams" managedBy="system">
38
						{ 	
39
							'hbase.mapred.inputtable' : 'hbase.mapred.datatable', 
40
							'hbase.mapred.outputtable' : 'hbase.mapred.datatable'
41
						}
42
					</PARAM>
43 85
				</PARAMETERS>
44 86
				<ARCS>
45 87
					<ARC to="success" />
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/functionality/modular/ui/workflows/values/applicationContext-ui-openaireplus.xml
9 9
	<bean id="listActionManagerSetsValues" 
10 10
		class="eu.dnetlib.functionality.modular.ui.workflows.values.ListActionManagerSetsValues"
11 11
		p:name="actionSets" />
12
		
13
	<bean id="listHBaseTables" 
14
		class="eu.dnetlib.functionality.modular.ui.workflows.values.ListHBaseTables"
15
		p:name="hbaseTables"/>
16
		
17
	<bean id="listHadoopClusters" 
18
		class="eu.dnetlib.functionality.modular.ui.workflows.values.ListHadoopClusters"
19
		p:name="hadoopClusters" />	
12 20

  
13 21
</beans>

Also available in: Unified diff