Revision 39862
Added by Claudio Atzori about 9 years ago
modules/dnet-deduplication/trunk/src/main/java/eu/dnetlib/msro/workflows/hadoop/hbase/CreateHBaseTableJobNode.java | ||
---|---|---|
2 | 2 |
|
3 | 3 |
import java.util.Set; |
4 | 4 |
|
5 |
import org.apache.commons.logging.Log; |
|
6 |
import org.apache.commons.logging.LogFactory; |
|
7 |
|
|
8 | 5 |
import com.googlecode.sarasvati.Arc; |
9 | 6 |
import com.googlecode.sarasvati.NodeToken; |
10 |
|
|
11 | 7 |
import eu.dnetlib.data.hadoop.rmi.HadoopService; |
8 |
import eu.dnetlib.msro.rmi.MSROException; |
|
9 |
import org.apache.commons.lang.StringUtils; |
|
10 |
import org.apache.commons.logging.Log; |
|
11 |
import org.apache.commons.logging.LogFactory; |
|
12 | 12 |
|
13 | 13 |
public class CreateHBaseTableJobNode extends AbstractHBaseAdminJobNode { |
14 | 14 |
|
15 | 15 |
private static final Log log = LogFactory.getLog(CreateHBaseTableJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM |
16 | 16 |
|
17 |
private boolean reuseRegionInfo = false; |
|
18 |
|
|
17 | 19 |
@Override |
18 | 20 |
protected String execute(final NodeToken token) throws Exception { |
19 | 21 |
final Set<String> columns = getColumns(token); |
... | ... | |
22 | 24 |
|
23 | 25 |
log.info("Ensuring table " + tableName + " on cluster: '" + cluster + "' - columns: " + columns); |
24 | 26 |
|
25 |
getServiceLocator().getService(HadoopService.class).createHbaseTable(cluster, tableName, columns); |
|
27 |
if (isReuseRegionInfo()) { |
|
28 |
String jsonConf = token.getEnv().getAttribute(getTableConfigurationParamName()); |
|
29 |
if (StringUtils.isBlank(jsonConf)) { |
|
30 |
throw new MSROException("cannot find HBase table configuration in workflow env"); |
|
31 |
} |
|
26 | 32 |
|
33 |
getServiceLocator().getService(HadoopService.class).createConfiguredHbaseTable(cluster, tableName, jsonConf); |
|
34 |
} else { |
|
35 |
getServiceLocator().getService(HadoopService.class).createHbaseTable(cluster, tableName, columns); |
|
36 |
} |
|
27 | 37 |
return Arc.DEFAULT_ARC; |
28 | 38 |
} |
29 | 39 |
|
40 |
public boolean isReuseRegionInfo() { |
|
41 |
return reuseRegionInfo; |
|
42 |
} |
|
43 |
|
|
44 |
public void setReuseRegionInfo(final boolean reuseRegionInfo) { |
|
45 |
this.reuseRegionInfo = reuseRegionInfo; |
|
46 |
} |
|
30 | 47 |
} |
modules/dnet-deduplication/trunk/src/main/java/eu/dnetlib/msro/workflows/hadoop/hbase/AbstractHBaseAdminJobNode.java | ||
---|---|---|
2 | 2 |
|
3 | 3 |
import java.util.Map.Entry; |
4 | 4 |
import java.util.Set; |
5 |
|
|
6 | 5 |
import javax.annotation.Resource; |
7 | 6 |
|
8 |
import org.apache.commons.lang.StringUtils; |
|
9 |
import org.apache.commons.logging.Log; |
|
10 |
import org.apache.commons.logging.LogFactory; |
|
11 |
|
|
12 | 7 |
import com.google.common.base.Joiner; |
13 | 8 |
import com.google.common.base.Splitter; |
14 | 9 |
import com.google.common.collect.Sets; |
15 | 10 |
import com.googlecode.sarasvati.NodeToken; |
16 |
|
|
17 | 11 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
18 | 12 |
import eu.dnetlib.msro.rmi.MSROException; |
19 | 13 |
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode; |
14 |
import org.apache.commons.lang.StringUtils; |
|
15 |
import org.apache.commons.logging.Log; |
|
16 |
import org.apache.commons.logging.LogFactory; |
|
20 | 17 |
|
21 | 18 |
public abstract class AbstractHBaseAdminJobNode extends SimpleJobNode { |
22 | 19 |
|
... | ... | |
24 | 21 |
private static final Log log = LogFactory.getLog(AbstractHBaseAdminJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM |
25 | 22 |
|
26 | 23 |
private String tableColumnsParamName = "columns"; |
24 |
private String tableConfigurationParamName = "tableConf"; |
|
27 | 25 |
private String hbaseTableProperty; |
28 | 26 |
private String cluster; |
29 | 27 |
|
... | ... | |
94 | 92 |
return serviceLocator; |
95 | 93 |
} |
96 | 94 |
|
95 |
public String getTableConfigurationParamName() { |
|
96 |
return tableConfigurationParamName; |
|
97 |
} |
|
98 |
|
|
99 |
public void setTableConfigurationParamName(final String tableConfigurationParamName) { |
|
100 |
this.tableConfigurationParamName = tableConfigurationParamName; |
|
101 |
} |
|
97 | 102 |
} |
modules/dnet-deduplication/trunk/src/main/java/eu/dnetlib/msro/workflows/hadoop/hbase/ExistHBaseTableJobNode.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.msro.workflows.hadoop.hbase; |
2 | 2 |
|
3 |
import com.googlecode.sarasvati.NodeToken; |
|
4 |
import eu.dnetlib.data.hadoop.rmi.HadoopService; |
|
3 | 5 |
import org.apache.commons.logging.Log; |
4 | 6 |
import org.apache.commons.logging.LogFactory; |
5 | 7 |
|
6 |
import com.googlecode.sarasvati.NodeToken; |
|
7 |
|
|
8 |
import eu.dnetlib.data.hadoop.rmi.HadoopService; |
|
9 |
|
|
10 | 8 |
public class ExistHBaseTableJobNode extends AbstractHBaseAdminJobNode { |
11 | 9 |
|
12 | 10 |
private static final Log log = LogFactory.getLog(ExistHBaseTableJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM |
... | ... | |
26 | 24 |
|
27 | 25 |
log.info("table '" + tableName + "' exists: " + exists); |
28 | 26 |
|
27 |
if (exists) { |
|
28 |
final String tableDesc = getServiceLocator().getService(HadoopService.class).describeHBaseTableConfiguration(cluster, tableName); |
|
29 |
token.getEnv().setAttribute(getTableConfigurationParamName(), tableDesc); |
|
30 |
} |
|
31 |
|
|
29 | 32 |
return exists ? getExistOutNode() : getDontExistOutNode(); |
30 | 33 |
} |
31 | 34 |
|
modules/dnet-deduplication/trunk/src/main/resources/eu/dnetlib/test/profiles/meta/workflows/reset.hbase.xml | ||
---|---|---|
1 | 1 |
<?xml version="1.0" encoding="UTF-8"?> |
2 |
<RESOURCE_PROFILE xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
|
|
2 |
<RESOURCE_PROFILE> |
|
3 | 3 |
<HEADER> |
4 | 4 |
<RESOURCE_IDENTIFIER value="ce304c65-5836-4cf0-9a48-53472b9f6f36_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/> |
5 | 5 |
<RESOURCE_TYPE value="WorkflowDSResourceType"/> |
... | ... | |
27 | 27 |
<PARAM name="hbaseTableProperty" type="string" managedBy="system" required="true">hbase.mapred.datatable</PARAM> |
28 | 28 |
<PARAM name="cluster" type="string" managedBy="system" required="true">DM</PARAM> |
29 | 29 |
<PARAM name="tableColumnsParamName" type="string" managedBy="system" required="true">hTableColumns</PARAM> |
30 |
<PARAM name="tableConfigurationParamName" type="string" managedBy="system" required="true">tableConf</PARAM> |
|
30 | 31 |
<PARAM name="existOutNode" type="string" managedBy="system" required="true">drop</PARAM> |
31 | 32 |
<PARAM name="dontExistOutNode" type="string" required="true" managedBy="system">define</PARAM> |
32 | 33 |
</PARAMETERS> |
... | ... | |
62 | 63 |
<PARAM name="hbaseTableProperty" type="string" managedBy="system" required="true">hbase.mapred.datatable</PARAM> |
63 | 64 |
<PARAM name="cluster" type="string" managedBy="system" required="true">DM</PARAM> |
64 | 65 |
<PARAM name="tableColumnsParamName" type="string" managedBy="system" required="true">hTableColumns</PARAM> |
66 |
<PARAM name="tableConfigurationParamName" type="string" managedBy="system" required="true">tableConf</PARAM> |
|
67 |
<PARAM name="reuseRegionInfo" type="boolean" managedBy="user" required="true">true</PARAM> |
|
65 | 68 |
</PARAMETERS> |
66 | 69 |
<ARCS> |
67 | 70 |
<ARC to="success" /> |
... | ... | |
70 | 73 |
</CONFIGURATION> |
71 | 74 |
<STATUS /> |
72 | 75 |
</BODY> |
73 |
</RESOURCE_PROFILE> |
|
76 |
</RESOURCE_PROFILE> |
Also available in: Unified diff
create hbase table considers the existing region splits