Revision 31415
Added by Claudio Atzori about 10 years ago

refactored submit job nodes, introducing AdminJobNodes
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hbase/CreateHBaseTableJobNode.java

@@ -4,65 +4,36 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 
 import com.googlecode.sarasvati.Arc;
 import com.googlecode.sarasvati.NodeToken;
 
-import eu.dnetlib.data.hadoop.rmi.HadoopService;
-import eu.dnetlib.enabling.tools.ServiceLocator;
 import eu.dnetlib.msro.openaireplus.workflows.hbase.HBaseTableUtils;
-import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
 
-public class CreateHBaseTableJobNode extends SimpleJobNode {
+public class CreateHBaseTableJobNode extends AbstractHBaseAdminJobNode {
 
     private static final Log log = LogFactory.getLog(CreateHBaseTableJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
 
-    private String hbaseTableProperty;
-    private String cluster;
-
     private String xqueryForColumnsProperty;
 
-    @Autowired
-    private ServiceLocator<HadoopService> hadoopServiceLocator;
-
     @Override
     protected String execute(final NodeToken token) throws Exception {
         final Set<String> columns = HBaseTableUtils.listColumns();
-        final String tableName = tableName();
+        final String tableName = tableName(token);
+        final String cluster = cluster(token);
 
-        log.debug("Ensuring table " + tableName + " - columns: " + columns);
+        log.debug("Ensuring table " + tableName + " on cluster: '" + cluster + "' - columns: " + columns);
 
         hadoopServiceLocator.getService().createHbaseTable(cluster, tableName, columns);
 
         return Arc.DEFAULT_ARC;
     }
 
-    private String tableName() {
-        return getPropertyFetcher().getProperty(getHbaseTableProperty());
-    }
-
-    public String getCluster() {
-        return cluster;
-    }
-
-    public void setCluster(final String cluster) {
-        this.cluster = cluster;
-    }
-
-    public String getHbaseTableProperty() {
-        return hbaseTableProperty;
-    }
-
-    public void setHbaseTableProperty(String hbaseTableProperty) {
-        this.hbaseTableProperty = hbaseTableProperty;
-    }
-
     public String getXqueryForColumnsProperty() {
         return xqueryForColumnsProperty;
     }
 
-    public void setXqueryForColumnsProperty(String xqueryForColumnsProperty) {
+    public void setXqueryForColumnsProperty(final String xqueryForColumnsProperty) {
         this.xqueryForColumnsProperty = xqueryForColumnsProperty;
     }
 
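The per-node tableName()/getCluster() plumbing removed here moves into AbstractHBaseAdminJobNode (full listing further down). The resolution order the subclasses inherit, condensed into a sketch (names as in the diff):

    // workflow-env override wins, otherwise fall back to the configured bean property
    String table = token.getEnv().hasAttribute("hbaseTable")
            ? token.getEnv().getAttribute("hbaseTable")
            : getPropertyFetcher().getProperty(getHbaseTableProperty());
    String cluster = token.getEnv().hasAttribute("cluster")
            ? token.getEnv().getAttribute("cluster")
            : getCluster();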
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hbase/SubmitHadoopJobNode.java

@@ -45,22 +45,33 @@
 
         job.setAction(type);
         job.getParameters().put("job.name", getHadoopJob());
-        job.getParameters().put("cluster", getCluster());
+        job.getParameters().put("cluster", cluster(token));
 
         job.getParameters().putAll(parseJsonParameters(token));
     }
 
+    private String cluster(final NodeToken token) {
+        if (token.getEnv().hasAttribute("cluster")) {
+            String cluster = token.getEnv().getAttribute("cluster");
+            log.info("found override value in wfEnv for 'cluster' param: " + cluster);
+            return cluster;
+        }
+        return getCluster();
+    }
+
     /**
      * reads the job type for the given job name
-     * 
+     *
      * @param jobName
      * @return
      * @throws ISLookUpException
      */
-    private String getJobType(String jobName) throws ISLookUpException {
-        List<String> res = lookupLocator.getService().quickSearchProfile(
-                "/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'HadoopJobConfigurationDSResourceType']//HADOOP_JOB[./@name='" + jobName + "']/@type/string()");
-        if (res.isEmpty()) { throw new IllegalStateException("unable to find job type for job: " + jobName); }
+    private String getJobType(final String jobName) throws ISLookUpException {
+        List<String> res =
+                lookupLocator.getService().quickSearchProfile(
+                        "/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'HadoopJobConfigurationDSResourceType']//HADOOP_JOB[./@name='" + jobName
+                                + "']/@type/string()");
+        if (res.isEmpty()) throw new IllegalStateException("unable to find job type for job: " + jobName);
 
         final HadoopJobType type = HadoopJobType.valueOf(Iterables.getOnlyElement(res));
 
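Note that SubmitHadoopJobNode does not extend the new AbstractHBaseAdminJobNode, so it carries its own private copy of cluster(token) with identical env-override semantics; the duplication looks like a candidate for a shared supertype later. The override is seeded by an upstream node writing into the workflow env; a sketch (in this revision PrepareCopyTableJobNode writes 'sourceCluster', which the copy node's envParams mapping then exposes as 'cluster'):

    // upstream node: make the submit node target a specific cluster for this run
    token.getEnv().setAttribute("cluster", "DM");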
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hbase/DropHBaseTableJobNode.java

@@ -2,58 +2,34 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 
 import com.googlecode.sarasvati.Arc;
 import com.googlecode.sarasvati.NodeToken;
 
-import eu.dnetlib.data.hadoop.config.ClusterName;
-import eu.dnetlib.data.hadoop.rmi.HadoopService;
-import eu.dnetlib.enabling.tools.ServiceLocator;
-import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
+/**
+ * The Class DropHBaseTableJobNode.
+ */
+public class DropHBaseTableJobNode extends AbstractHBaseAdminJobNode {
 
-public class DropHBaseTableJobNode extends SimpleJobNode {
-
+    /** The Constant log. */
     private static final Log log = LogFactory.getLog(DropHBaseTableJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
 
-    private String hbaseTableProperty;
-    private String cluster;
-
-    @Autowired
-    private ServiceLocator<HadoopService> hadoopServiceLocator;
-
+    /*
+     * (non-Javadoc)
+     *
+     * @see eu.dnetlib.msro.workflows.nodes.SimpleJobNode#execute(com.googlecode.sarasvati.NodeToken)
+     */
     @Override
     protected String execute(final NodeToken token) throws Exception {
 
-        final String tableName = tableName();
+        final String tableName = tableName(token);
+        final String cluster = cluster(token);
 
-        log.info("Dropping hbase table " + tableName);
+        log.info("Dropping hbase table '" + tableName + "' on cluster: '" + cluster + "'");
 
-        HadoopService hadoopService = hadoopServiceLocator.getService();
-        String clusterName = ClusterName.valueOf(getCluster()).toString();
-        hadoopService.dropHbaseTable(clusterName, tableName);
+        hadoopServiceLocator.getService().dropHbaseTable(cluster, tableName);
 
         return Arc.DEFAULT_ARC;
     }
 
-    private String tableName() {
-        return getPropertyFetcher().getProperty(getHbaseTableProperty());
-    }
-
-    public String getCluster() {
-        return cluster;
-    }
-
-    public void setCluster(final String cluster) {
-        this.cluster = cluster;
-    }
-
-    public String getHbaseTableProperty() {
-        return hbaseTableProperty;
-    }
-
-    public void setHbaseTableProperty(String hbaseTableProperty) {
-        this.hbaseTableProperty = hbaseTableProperty;
-    }
-
 }
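One behavioural change worth flagging: the old body round-tripped the configured name through ClusterName.valueOf(getCluster()) before calling the service, while the new code passes cluster(token) through untouched, so an unknown name now surfaces only as a service-side error. If fail-fast validation is still wanted, a one-line sketch (assuming ClusterName, from the removed import, is an enum of the known clusters):

    ClusterName.valueOf(cluster); // throws IllegalArgumentException for names not in the enum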
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hbase/AbstractHBaseAdminJobNode.java (new file)

package eu.dnetlib.msro.openaireplus.workflows.nodes.hbase;

import java.util.Map.Entry;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;

import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.data.hadoop.rmi.HadoopService;
import eu.dnetlib.enabling.tools.ServiceLocator;
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;

public abstract class AbstractHBaseAdminJobNode extends SimpleJobNode {

    /** The Constant log. */
    private static final Log log = LogFactory.getLog(AbstractHBaseAdminJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM

    private String hbaseTableProperty;
    private String cluster;

    @Autowired
    protected ServiceLocator<HadoopService> hadoopServiceLocator;

    @Override
    protected void beforeStart(final NodeToken token) {
        for (Entry<String, String> e : parseJsonParameters(token).entrySet()) {
            token.getEnv().setAttribute(e.getKey(), e.getValue());
        }
    }

    protected String tableName(final NodeToken token) {
        if (token.getEnv().hasAttribute("hbaseTable")) {
            String table = token.getEnv().getAttribute("hbaseTable");
            log.info("found override value in wfEnv for 'hbaseTable' param: " + table);
            return table;
        }
        return getPropertyFetcher().getProperty(getHbaseTableProperty());
    }

    protected String cluster(final NodeToken token) {
        if (token.getEnv().hasAttribute("cluster")) {
            String cluster = token.getEnv().getAttribute("cluster");
            log.info("found override value in wfEnv for 'cluster' param: " + cluster);
            return cluster;
        }
        return getCluster();
    }

    public String getCluster() {
        return cluster;
    }

    public void setCluster(final String cluster) {
        this.cluster = cluster;
    }

    public String getHbaseTableProperty() {
        return hbaseTableProperty;
    }

    public void setHbaseTableProperty(final String hbaseTableProperty) {
        this.hbaseTableProperty = hbaseTableProperty;
    }

}
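A hypothetical subclass, to show the extension point: everything cluster- and table-related is inherited, so a new admin node only supplies its execute() body (the class below is illustrative, not part of this revision):

    package eu.dnetlib.msro.openaireplus.workflows.nodes.hbase;

    import com.googlecode.sarasvati.Arc;
    import com.googlecode.sarasvati.NodeToken;

    public class LogHBaseTableStatusJobNode extends AbstractHBaseAdminJobNode {

        @Override
        protected String execute(final NodeToken token) throws Exception {
            final String table = tableName(token);   // wfEnv 'hbaseTable' override, else hbaseTableProperty
            final String cluster = cluster(token);   // wfEnv 'cluster' override, else the configured default
            final boolean exists = hadoopServiceLocator.getService().existHbaseTable(cluster, table);
            // returned strings are arc names, matched against <ARC name="..."/> in the profile
            return exists ? Arc.DEFAULT_ARC : "missing";
        }
    }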
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hbase/PrepareCopyTableJobNode.java

@@ -1,6 +1,7 @@
 package eu.dnetlib.msro.openaireplus.workflows.nodes.hbase;
 
 import java.util.Map;
+import java.util.Set;
 
 import javax.annotation.Resource;
 
@@ -8,10 +9,12 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import com.google.common.collect.Sets;
 import com.googlecode.sarasvati.Arc;
 import com.googlecode.sarasvati.NodeToken;
 
 import eu.dnetlib.data.hadoop.rmi.HadoopService;
+import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
 import eu.dnetlib.enabling.tools.ServiceLocator;
 import eu.dnetlib.msro.rmi.MSROException;
 import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
@@ -35,10 +38,14 @@
      */
     private static final Log log = LogFactory.getLog(PrepareCopyTableJobNode.class);
 
+    /** The source table. */
+    private String sourceCluster;
+
     /** The target cluster. */
     private String targetCluster;
 
-    /** The target table. */
+    private String sourceTable;
+
     private String targetTable;
 
     /** The hadoop locator. */
@@ -47,12 +54,38 @@
 
     /*
     * (non-Javadoc)
-     * 
+     *
     * @see eu.dnetlib.msro.workflows.nodes.SimpleJobNode#execute(com.googlecode.sarasvati.NodeToken)
     */
     @Override
     protected String execute(final NodeToken token) throws Exception {
 
+        checkNodeParams();
+
+        if (!StringUtils.equals(getSourceCluster(), getTargetCluster())) {
+            final String outputQuorum = getOutputQuorum();
+            log.info("build hbase quorum: " + outputQuorum);
+            token.getEnv().setAttribute("peer.addr", outputQuorum);
+        }
+        token.getEnv().setAttribute("sourceCluster", getSourceCluster());
+        token.getEnv().setAttribute("sourceTable", getSourceTable());
+
+        token.getEnv().setAttribute("targetCluster", getTargetCluster());
+        token.getEnv().setAttribute("targetTable", getTargetTable());
+
+        return Arc.DEFAULT_ARC;
+    }
+
+    /**
+     * Builds the output quorum.
+     *
+     * @return the output quorum
+     * @throws HadoopServiceException
+     *             when cannot retrieve the clustr configuration
+     * @throws MSROException
+     *             when some of the needed properties is missing in the cluster configuration
+     */
+    private String getOutputQuorum() throws HadoopServiceException, MSROException {
         Map<String, String> conf = hadoopLocator.getService().getClusterConfiguration(getTargetCluster());
         log.debug(conf);
 
@@ -60,23 +93,38 @@
         String hbasePort = conf.get(HBASE_ZOOKEEPER_CLIENT_PORT);
         String znodeParent = conf.get(ZOOKEEPER_ZNODE_PARENT);
 
-        checkParam(hbaseQuorum, String.format("unable to find property '%s' in cluster configuration: %s", HBASE_ZOOKEEPER_QUORUM, hbaseQuorum));
-        checkParam(hbasePort, String.format("unable to find property '%s' in cluster configuration: %s", HBASE_ZOOKEEPER_CLIENT_PORT, hbasePort));
-        checkParam(znodeParent, String.format("unable to find property '%s' in cluster configuration: %s", ZOOKEEPER_ZNODE_PARENT, znodeParent));
+        checkParamExist(hbaseQuorum, String.format("unable to find property '%s' in cluster configuration: %s", HBASE_ZOOKEEPER_QUORUM, hbaseQuorum));
+        checkParamExist(hbasePort, String.format("unable to find property '%s' in cluster configuration: %s", HBASE_ZOOKEEPER_CLIENT_PORT, hbasePort));
+        checkParamExist(znodeParent, String.format("unable to find property '%s' in cluster configuration: %s", ZOOKEEPER_ZNODE_PARENT, znodeParent));
 
-        checkParam(getTargetTable(), "targetTable cannot be null or empty");
-
         String outputQuorum = String.format("%s:%s:%s", hbaseQuorum, hbasePort, znodeParent);
-        log.info("build hbase quorum: " + outputQuorum);
+        return outputQuorum;
+    }
 
-        token.getEnv().setAttribute("outputQuorum", outputQuorum);
-        token.getEnv().setAttribute("targetTable", getTargetTable());
+    /**
+     * Checks the wf params.
+     *
+     * @throws MSROException
+     *             the MSRO exception
+     * @throws HadoopServiceException
+     */
+    private void checkNodeParams() throws MSROException, HadoopServiceException {
 
-        return Arc.DEFAULT_ARC;
+        checkParamExist(getSourceCluster(), "source cluster must be set");
+        checkParamExist(getTargetCluster(), "target cluster must be set");
+        checkParamExist(getSourceTable(), "source table must be set");
+        checkParamExist(getTargetTable(), "target table must be set");
+
+        Set<String> clusters = Sets.newHashSet(hadoopLocator.getService().listClusters());
+        if (!clusters.contains(getSourceCluster())) throw new MSROException(String.format("source cluster '%s' doesn not exists", getSourceCluster()));
+        if (!clusters.contains(getTargetCluster())) throw new MSROException(String.format("target cluster '%s' doesn not exists", getTargetCluster()));
+
+        if (!hadoopLocator.getService().existHbaseTable(getSourceCluster(), getSourceTable()))
+            throw new MSROException(String.format("source table '%s' doesn not exists on cluster '%s'", getSourceTable(), getSourceCluster()));
     }
 
     /**
-     * Check param.
+     * Check parameter existence.
     * 
     * @param param
     *            the param
@@ -85,44 +133,38 @@
     * @throws MSROException
     *             the MSRO exception
     */
-    private void checkParam(final String param, final String msg) throws MSROException {
+    private void checkParamExist(final String param, final String msg) throws MSROException {
         if (StringUtils.isBlank(param)) throw new MSROException(msg);
     }
 
-    /**
-     * Gets the target cluster.
-     *
-     * @return the target cluster
-     */
+    public String getSourceCluster() {
+        return sourceCluster;
+    }
+
+    public void setSourceCluster(final String sourceCluster) {
+        this.sourceCluster = sourceCluster;
+    }
+
     public String getTargetCluster() {
         return targetCluster;
     }
 
-    /**
-     * Sets the target cluster.
-     *
-     * @param targetCluster
-     *            the new target cluster
-     */
     public void setTargetCluster(final String targetCluster) {
         this.targetCluster = targetCluster;
     }
 
-    /**
-     * Gets the target table.
-     *
-     * @return the target table
-     */
+    public String getSourceTable() {
+        return sourceTable;
+    }
+
+    public void setSourceTable(final String sourceTable) {
+        this.sourceTable = sourceTable;
+    }
+
     public String getTargetTable() {
         return targetTable;
     }
 
-    /**
-     * Sets the target table.
-     *
-     * @param targetTable
-     *            the new target table
-     */
     public void setTargetTable(final String targetTable) {
         this.targetTable = targetTable;
     }
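A worked example of the quorum string, since its format is what CopyTable expects for a remote peer (values hypothetical; the constant names suggest the standard HBase keys hbase.zookeeper.quorum, hbase.zookeeper.property.clientPort and zookeeper.znode.parent):

    // target cluster configuration (hypothetical values):
    //   hbase.zookeeper.quorum              = zk1.example.org,zk2.example.org,zk3.example.org
    //   hbase.zookeeper.property.clientPort = 2181
    //   zookeeper.znode.parent              = /hbase
    // String.format("%s:%s:%s", hbaseQuorum, hbasePort, znodeParent) then yields
    //   zk1.example.org,zk2.example.org,zk3.example.org:2181:/hbase
    // which is stored in the wfEnv as 'peer.addr', and only when source and target clusters differ.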
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hbase/ExistHBaseTableJobNode.java

@@ -2,29 +2,19 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 
 import com.googlecode.sarasvati.NodeToken;
 
-import eu.dnetlib.data.hadoop.rmi.HadoopService;
-import eu.dnetlib.enabling.tools.ServiceLocator;
-import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
+public class ExistHBaseTableJobNode extends AbstractHBaseAdminJobNode {
 
-public class ExistHBaseTableJobNode extends SimpleJobNode {
-
     private static final Log log = LogFactory.getLog(ExistHBaseTableJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
 
-    private String hbaseTableProperty;
-    private String cluster;
-
-    @Autowired
-    private ServiceLocator<HadoopService> hadoopServiceLocator;
-
     @Override
     protected String execute(final NodeToken token) throws Exception {
-        final String tableName = tableName();
+        final String tableName = tableName(token);
+        final String cluster = cluster(token);
 
-        log.info("checking table existance: '" + tableName + "'");
+        log.info("checking table existance: '" + tableName + "' on cluster: '" + cluster + "'");
 
         boolean exists = hadoopServiceLocator.getService().existHbaseTable(cluster, tableName);
 
@@ -33,24 +23,4 @@
         return exists ? "drop" : "create";
     }
 
-    private String tableName() {
-        return getPropertyFetcher().getProperty(getHbaseTableProperty());
-    }
-
-    public String getCluster() {
-        return cluster;
-    }
-
-    public void setCluster(final String cluster) {
-        this.cluster = cluster;
-    }
-
-    public String getHbaseTableProperty() {
-        return hbaseTableProperty;
-    }
-
-    public void setHbaseTableProperty(String hbaseTableProperty) {
-        this.hbaseTableProperty = hbaseTableProperty;
-    }
-
 }
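The strings returned here are arc names, consumed by the checkTable node of the copyHbaseTable.xml profile below:

    <ARC to="create" name="create" />
    <ARC to="drop" name="drop" />

so a missing table goes straight to creation, while an existing one is dropped and recreated first.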
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/functionality/modular/ui/workflows/values/ListHBaseTables.java (new file)

package eu.dnetlib.functionality.modular.ui.workflows.values;

import java.util.List;
import java.util.Map;

import javax.annotation.Resource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

import eu.dnetlib.data.hadoop.rmi.HadoopService;
import eu.dnetlib.enabling.tools.ServiceLocator;
import eu.dnetlib.msro.workflows.util.ValidNodeValuesFetcher;

public class ListHBaseTables extends ValidNodeValuesFetcher {

    /**
     * logger.
     */
    private static final Log log = LogFactory.getLog(ListHBaseTables.class);

    /** The hadoop locator. */
    @Resource(name = "hadoopServiceLocator")
    private ServiceLocator<HadoopService> hadoopLocator;

    private Function<String, DnetParamValue> f = new Function<String, DnetParamValue>() {

        @Override
        public DnetParamValue apply(final String s) {

            return new DnetParamValue(s, s);
        }
    };

    @Override
    protected List<DnetParamValue> obtainValues(final Map<String, String> params) throws Exception {
        List<DnetParamValue> res = Lists.newArrayList(Iterables.transform(listTables(params.get("cluster")), f));
        return res;
    }

    private List<String> listTables(final String cluster) {
        try {
            log.info("list tables for cluster: " + cluster);
            return hadoopLocator.getService().listHbaseTables(cluster);
        } catch (Throwable e) {
            log.error(e);
            return Lists.newArrayList();
        }
    }
}
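obtainValues() receives the params map from the invoking PARAM function, and this fetcher reads a 'cluster' entry from it. The clusters fetcher is invoked in copyHbaseTable.xml below as obtainValues('hadoopClusters', {}); by analogy a table picker might look like the following, though this exact parameter-passing syntax is an assumption, not something this revision ships:

    <PARAM required="false" type="string" name="sourceTable" managedBy="user"
        function="obtainValues('hbaseTables', {'cluster' : 'DM'})"></PARAM>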
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/functionality/modular/ui/workflows/values/ListHadoopClusters.java (new file)

package eu.dnetlib.functionality.modular.ui.workflows.values;

import java.util.List;
import java.util.Map;

import javax.annotation.Resource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

import eu.dnetlib.data.hadoop.rmi.HadoopService;
import eu.dnetlib.enabling.tools.ServiceLocator;
import eu.dnetlib.msro.workflows.util.ValidNodeValuesFetcher;

public class ListHadoopClusters extends ValidNodeValuesFetcher {

    /**
     * logger.
     */
    private static final Log log = LogFactory.getLog(ListHadoopClusters.class);

    /** The hadoop locator. */
    @Resource(name = "hadoopServiceLocator")
    private ServiceLocator<HadoopService> hadoopLocator;

    private Function<String, DnetParamValue> f = new Function<String, DnetParamValue>() {

        @Override
        public DnetParamValue apply(final String s) {
            return new DnetParamValue(s, s);
        }
    };

    @Override
    protected List<DnetParamValue> obtainValues(final Map<String, String> params) throws Exception {

        List<DnetParamValue> res = Lists.newArrayList(Iterables.transform(listClusters(), f));
        return res;
    }

    private List<String> listClusters() {
        try {
            return hadoopLocator.getService().listClusters();
        } catch (Throwable e) {
            log.error(e);
            return Lists.newArrayList();
        }
    }
}
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/test/profiles/openaireplus/workflows/iis/copyHbaseTable.xml

@@ -15,31 +15,73 @@
 				<NODE name="prepare" type="PrepareCopyTable" isStart="true">
 					<DESCRIPTION>Prepare copy</DESCRIPTION>
 					<PARAMETERS>
-						<PARAM required="true" type="string" name="targetTable" managedBy="user">db_openaireplus_services</PARAM>
-						<PARAM required="true" type="string" name="targetCluster" managedBy="user" function="validValues(['IIS','DM'])">IIS</PARAM>
+						<PARAM required="true" type="string" name="sourceCluster" managedBy="user" function="obtainValues('hadoopClusters', {})">DM</PARAM>
+						<PARAM required="false" type="string" name="sourceTable" managedBy="user"></PARAM>
+						<PARAM required="true" type="string" name="targetCluster" managedBy="user" function="obtainValues('hadoopClusters', {})">IIS</PARAM>
+						<PARAM required="false" type="string" name="targetTable" managedBy="user"></PARAM>
 					</PARAMETERS>
 					<ARCS>
+						<ARC to="checkTable" />
+					</ARCS>
+				</NODE>
+				<NODE name="checkTable" type="CheckHBaseTable">
+					<DESCRIPTION>check hbase table</DESCRIPTION>
+					<PARAMETERS>
+						<PARAM required="true" type="string" name="envParams" managedBy="system">
+							{
+								'hbaseTable' : 'targetTable',
+								'cluster' : 'targetCluster'
+							}
+						</PARAM>
+					</PARAMETERS>
+					<ARCS>
+						<ARC to="create" name="create" />
+						<ARC to="drop" name="drop" />
+					</ARCS>
+				</NODE>
+				<NODE name="drop" type="DropHBaseTable">
+					<DESCRIPTION>drop hbase table</DESCRIPTION>
+					<PARAMETERS>
+						<PARAM required="true" type="string" name="envParams" managedBy="system">
+							{
+								'hbaseTable' : 'targetTable',
+								'cluster' : 'targetCluster'
+							}
+						</PARAM>
+					</PARAMETERS>
+					<ARCS>
+						<ARC to="create" />
+					</ARCS>
+				</NODE>
+				<NODE name="create" type="CreateHBaseTable">
+					<DESCRIPTION>create hbase table</DESCRIPTION>
+					<PARAMETERS>
+						<PARAM name="xqueryForColumnsProperty" type="string" managedBy="user" required="true">dnet.openaire.model.relationships.xquery</PARAM>
+						<PARAM required="true" type="string" name="envParams" managedBy="system">
+							{
+								'hbaseTable' : 'targetTable',
+								'cluster' : 'targetCluster'
+							}
+						</PARAM>
+					</PARAMETERS>
+					<ARCS>
 						<ARC to="copy" />
 					</ARCS>
 				</NODE>
 				<NODE name="copy" type="SubmitHadoopJob">
 					<DESCRIPTION>Copy table Job</DESCRIPTION>
 					<PARAMETERS>
-						<PARAM required="true" type="string" name="cluster" managedBy="system">DM</PARAM>
 						<PARAM required="true" type="string" name="hadoopJob" managedBy="system">copytable</PARAM>
 						<PARAM required="true" type="string" name="envParams" managedBy="system">
 							{
+								'cluster' : 'sourceCluster',
 								'hbase.mapreduce.inputtable' : 'targetTable',
 								'hbase.mapreduce.outputtable' : 'targetTable',
-								'hbase.mapred.output.quorum' : 'outputQuorum'
+								'hbase.mapred.output.quorum' : 'outputQuorum',
+								'hbase.mapred.inputtable' : 'targetTable',
+								'hbase.mapred.outputtable' : 'targetTable'
 							}
 						</PARAM>
-						<PARAM required="true" type="string" name="sysParams" managedBy="system">
-							{
-								'hbase.mapred.inputtable' : 'hbase.mapred.datatable',
-								'hbase.mapred.outputtable' : 'hbase.mapred.datatable'
-							}
-						</PARAM>
 					</PARAMETERS>
 					<ARCS>
 						<ARC to="success" />
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/functionality/modular/ui/workflows/values/applicationContext-ui-openaireplus.xml

@@ -9,5 +9,13 @@
 	<bean id="listActionManagerSetsValues"
 		class="eu.dnetlib.functionality.modular.ui.workflows.values.ListActionManagerSetsValues"
 		p:name="actionSets" />
+
+	<bean id="listHBaseTables"
+		class="eu.dnetlib.functionality.modular.ui.workflows.values.ListHBaseTables"
+		p:name="hbaseTables"/>
+
+	<bean id="listHadoopClusters"
+		class="eu.dnetlib.functionality.modular.ui.workflows.values.ListHadoopClusters"
+		p:name="hadoopClusters" />
 
 </beans>
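The p:name values are the lookup keys: obtainValues('hadoopClusters', {}) in the profile above resolves to the listHadoopClusters bean, and 'hbaseTables' would resolve to listHBaseTables the same way.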