Creating a D-Net Workflow¶
Typologies of workflows:
- collection workflows: perform the systematic collection of metadata or files from data sources;
- processing workflows: implement the sequence of steps needed to stage collected metadata (or files) and form the target Information Spaces.
Prerequisites¶
In order to implement workflow nodes you need to depend on the Maven artifact dnet-msro-service, which provides the building blocks. The latest release is currently 3.0.0, and this module is a work in progress.
<dependency>
    <groupId>eu.dnetlib</groupId>
    <artifactId>dnet-msro-service</artifactId>
    <version>[3.0.0,4.0.0)</version>
</dependency>
Definition of workflow¶
The definition of a processing workflow basically consists in the creation of (at least) two D-Net profiles:
- One (or more) workflow (WorkflowDSResourceType): it defines the sequence of steps to execute (in sequence or in parallel).
- One meta-workflow (MetaWorkflowDSResourceType): it describes a set of related workflows that can run in parallel or in sequence to accomplish a high-level functionality. For example, in order to export the Information Space via OAI-PMH, we first have to feed the OAI-PMH back-end (workflow 1: "OAI store feed"), then we can configure the back-end with the needed indexes (workflow 2: "OAI Post feed").
Workflow definition¶
Let's see a simple example of a workflow with only one step that performs a sleep:
<?xml version="1.0" encoding="UTF-8"?>
<RESOURCE_PROFILE xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <HEADER>
        <RESOURCE_IDENTIFIER value="wfUUID_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/>
        <RESOURCE_TYPE value="WorkflowDSResourceType"/>
        <RESOURCE_KIND value="WorkflowDSResources"/>
        <RESOURCE_URI value=""/>
        <DATE_OF_CREATION value="2015-02-06T11:13:51.0Z"/>
    </HEADER>
    <BODY>
        <WORKFLOW_NAME>Sleeping Beauty</WORKFLOW_NAME>
        <WORKFLOW_TYPE>Tutorial</WORKFLOW_TYPE>
        <WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
        <CONFIGURATION start="manual">
            <NODE name="sleep" isStart="true" type="Sleep">
                <DESCRIPTION>Sleep for a while</DESCRIPTION>
                <PARAMETERS>
                    <PARAM name="sleepTimeMs" managedBy="user" type="int" required="true">1000</PARAM>
                </PARAMETERS>
                <ARCS>
                    <ARC to="success"/>
                </ARCS>
            </NODE>
        </CONFIGURATION>
        <STATUS/>
    </BODY>
</RESOURCE_PROFILE>

The CONFIGURATION element of the profile tells us that the workflow is composed of one step. The step is defined by the NODE element:
name="sleep"
: the name that will appear in the UI when showing the workflowisStart="true"
: marks the node as starting node. Each workflow must have at least one start nodetype="Sleep"
: the type of the node. Through this value we refer to a Java bean with namewfNodeSleep
, whose class implements the business logic. Therefore we expect a bean like the following to be defined in an application-context XML:<bean id="wfNodeSleep" class="eu.dnetlib.msro.tutorial.SleepJobNode" scope="prototype"/>
Note that the attributescope="prototype"
is important because it tells the Spring framework to create a new bean instance of the object every time a request for that specific bean is made.
We'll see how the actual classSleepJobNode
in a while.DESCRIPTION
: brief description of the logics of the node. It will appear in the UI when showing the workflowPARAMETERS
: parameters needed by the node. A parameter (PARAM
), has the follwoing attributes:name
andtype
: name and type of the param. The Java class must define a property (accessible via public getter/setter) with the same name and type.required
: tells if the param is required or not. If it is, then the workflow can be run only if the param is set.managedBy
: who's in charge of setting the value. Allowed values are:user
: the UI will enable user to set the valuesystem
: the UI will not enable the user to set the value, as the system is expected to set the parameter automatically (more on this in a while)- More attributes are available. TODO: add other optional attributes for parameter (e.g.
function
)
ARCS
: which node must be executed after this node. In the example, we simply go to thesuccess
node, a pre-defined node that completes the workflow successfully. If there is an exception in one node, the pre-definedfailure
node is executed automatically.
JobNode types¶
The D-Net workflow engine is based on Sarasvati. Every jobNode extends the Sarasvati class com.googlecode.sarasvati.mem.MemNode and defines the following abstract method to override:
abstract protected String execute(final NodeToken token) throws Exception;
The method must return the name of the arc to follow: through this mechanism you can guide the workflow to execute one path or another, depending on what happens during the execution of the current step.
The D-Net workflow engine offers three JobNode super classes to choose from, depending on the specific workflow requirements:
eu.dnetlib.msro.workflows.nodes.SimpleJobNode
Basic workflow node implementation. The execution is synchronous: the execute method runs in the main thread. This kind of job node is suited for short-lasting synchronous service calls, e.g. isLookup, mapped resultsets. The node is synchronous with Sarasvati, which therefore cannot execute more than one instance of such nodes in parallel at the same time. To overcome this limitation, use the following node type.
eu.dnetlib.msro.workflows.nodes.AsyncJobNode
The AsyncJobNode delegates the execution of the execute method to a separate thread. This implies that the jobNode is decoupled from Sarasvati, allowing parallel execution of different AsyncJobNodes at the same time. The workflow doesn't advance past this node until the delegated thread running the execute method completes.
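For illustration, here is a minimal sketch of an AsyncJobNode subclass. The class name, the long-running helper, and the "done" arc are hypothetical (not part of dnet-msro-service); the sketch assumes AsyncJobNode exposes the same execute(NodeToken) template method as SimpleJobNode:

import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;

// Hypothetical node performing a long-lasting operation: execute runs in a
// separate thread, so other nodes of the same workflow can proceed in parallel.
public class LongTaskJobNode extends AsyncJobNode {

    @Override
    protected String execute(final NodeToken token) throws Exception {
        performLongLastingCall(); // hypothetical helper, stands for any slow service call
        // the workflow profile must declare a matching <ARC name="done" .../>
        return "done";
    }

    private void performLongLastingCall() throws InterruptedException {
        Thread.sleep(5000); // placeholder for the slow operation
    }
}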
eu.dnetlib.msro.workflows.nodes.BlackboardJobNode
The BlackboardJobNode allows you to send blackboard messages to D-Net services implementing the blackboard protocol and, differently from the previous two, exposes a different signature:
abstract protected String obtainServiceId(NodeToken token);

abstract protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception;
JobNode examples¶
Now let's see how the business logic of the node is implemented in the Java class eu.dnetlib.msro.tutorial.SleepJobNode. The node should:
- sleep and go to success if sleepTimeMs < 2000
- sleep and go to failure if sleepTimeMs >= 2000
public class SleepJobNode extends SimpleJobNode {

    private int sleepTimeMs;

    @Override
    protected String execute(final NodeToken token) throws Exception {
        // removed exception handling for readability
        Thread.sleep(sleepTimeMs);
        if (sleepTimeMs < 2000) {
            return "ok";
        } else {
            return "oops";
        }
    }

    public int getSleepTimeMs() { return sleepTimeMs; }

    public void setSleepTimeMs(final int time) { sleepTimeMs = time; }
}
The workflow should then be adapted to consider the two different arcs, "ok" and "oops":
<NODE name="sleep" isStart="true" type="Sleep" > <DESCRIPTION>Sleep for a while</DESCRIPTION> <PARAMETERS> <PARAM name="sleepTimeMs" managedBy="user" type="int" required="true">1000</PARAM> </PARAMETERS> <ARCS> <ARC name="ok" to="success"/> <ARC name="oops" to="failure"/> </ARCS> </NODE>
Extending BlackboardJobNode¶
D-Net services can implement the BlackBoard (BB) protocol, which allows them to receive messages. If a message is known to a service, the service executes the corresponding action and notifies the caller about its execution status by updating the blackboard in its service profile. The management of reading/writing BB messages is implemented in the BlackboardJobNode class, so you only have to:
- Specify the profile id of the service you want to call by overriding the method
abstract protected String obtainServiceId(NodeToken token);
- Prepare the BB message by overriding the method
abstract protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception;
Let's suppose we have a service accepting the BB message "SLEEP" that requires a parameter named sleepTimeMs. Our SleepJobNode class becomes:
public class SleepJobNode extends BlackboardJobNode {

    private String xqueryForServiceId;

    private int sleepTimeMs;

    @Override
    protected String obtainServiceId(final NodeToken token) {
        return getServiceLocator().getService(ISLookUpService.class).getResourceProfileByQuery(xqueryForServiceId);
    }

    @Override
    protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
        job.setAction("SLEEP");
        job.getParameters().put("sleepTimeMs", sleepTimeMs);
    }

    public int getSleepTimeMs() { return sleepTimeMs; }

    public void setSleepTimeMs(final int time) { sleepTimeMs = time; }

    public String getXqueryForServiceId() { return xqueryForServiceId; }

    public void setXqueryForServiceId(final String xqueryForServiceId) { this.xqueryForServiceId = xqueryForServiceId; }
}
And the workflow node must declare a new parameter for the xqueryForServiceId:
<NODE name="sleep" isStart="true" type="Sleep"> <DESCRIPTION>Sleep for a while</DESCRIPTION> <PARAMETERS> <PARAM name="sleepTimeMs" managedBy="user" type="int" required="true">1000</PARAM> <PARAM name="xqueryForServiceId" managedBy="user" type="string" required="true">/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='SleepServiceResourceType']/HEADER/RESOURCE_IDENTIFIER/@value/string()</PARAM> </PARAMETERS> <ARCS> <ARC to="success"/> </ARCS> </NODE>
But what if one of your params should not be set by an end-user, but should rather be picked from the workflow env? Let's suppose this is the case for the xquery. Then the SleepJobNode class should expect to receive the name of the env parameter where to look for the actual xquery to use:
public class SleepJobNode extends BlackboardJobNode {

    private String xqueryForServiceIdParam;

    . . .

    @Override
    protected String obtainServiceId(final NodeToken token) {
        final String xquery = token.getEnv().getAttribute(getXqueryForServiceIdParam());
        return getServiceLocator().getService(ISLookUpService.class).getResourceProfileByQuery(xquery);
    }

    . . .
<NODE name="sleep" isStart="true" type="Sleep"> <DESCRIPTION>Sleep for a while</DESCRIPTION> <PARAMETERS> <PARAM name="sleepTimeMs" managedBy="user" type="int" required="true">1000</PARAM> <PARAM managedBy="system" name="xqueryForServiceIdParam" required="true" type="string">xqueryForServiceId_in_env</PARAM> </PARAMETERS> <ARCS> <ARC to="success"/> </ARCS> </NODE>
Special parameters¶
Note that in case of parameter name clashes, the precedence order is sysparams < envparams < params: XML node params override params in the env, which in turn override sys params.
envParams¶
envParams is a special parameter that allows you to map one value of the wf env onto another parameter of the same wf env, allowing parameters to be passed along from node to node.
This can be useful when different nodes of the same workflow want to use the same value but are coded to read it from different env parameters.
Example:
<PARAM required="true" type="string" name="envParams" managedBy="system">
    { 'path' : 'rottenRecordsPath' }
</PARAM>
The value of envParams is a list of parameter mappings. In the example we have one mapping: the value of the env parameter named "rottenRecordsPath" is copied into another env parameter named "path". Given the above configuration, a node will be able to find the value it needs in the expected env parameter ("path").
sysParams¶
sysParams is a special parameter that allows you to set a parameter with a value coming from a system property (those you set in *.properties files).
Example:
<PARAM required="true" type="string" name="sysParams" managedBy="system">
    {
        'hbase.mapred.inputtable' : 'hbase.mapred.datatable',
        'hbase.mapreduce.inputtable' : 'hbase.mapred.datatable'
    }
</PARAM>
The value of sysParams is a list of parameter mappings. In the example we have two mappings: the value of the system property "hbase.mapred.datatable" will be copied into two new parameters, "hbase.mapred.inputtable" and "hbase.mapreduce.inputtable".
Re-using existing job node beans¶
Often you will need generic nodes in your workflow that have already been implemented and used.
Before you implement a new node for a generic feature, it is better to check on "where is my bean" and filter by "wfNode": "where is my bean" will tell you which module and which application context define the bean you need.
Consider that the service looks into the whole dnet40/modules svn directory, hence you will usually find several entries for the same bean.
Typically, you'll need beans located in dnet-msro-service and dnet-*-workflows (e.g. dnet-openaireplus-workflows for the OpenAIRE project).
CNR suggests checking out those modules from svn so you can quickly refer to the node implementations when you need to.
Meta-Workflow definition¶
A meta-workflow implements a high-level functionality by composing workflows.
In the simplest scenario, one meta-workflow refers to only one workflow, as follows:
<RESOURCE_PROFILE>
    <HEADER>
        <RESOURCE_IDENTIFIER value="metaWF-UUID_TWV0YVdvcmtmbG93RFNSZXNvdXJjZXMvTWV0YVdvcmtmbG93RFNSZXNvdXJjZVR5cGU="/>
        <RESOURCE_TYPE value="MetaWorkflowDSResourceType"/>
        <RESOURCE_KIND value="MetaWorkflowDSResources"/>
        <RESOURCE_URI value=""/>
        <DATE_OF_CREATION value="2006-05-04T18:13:51.0Z"/>
    </HEADER>
    <BODY>
        <METAWORKFLOW_NAME family="tutorial">Sleeping</METAWORKFLOW_NAME>
        <METAWORKFLOW_DESCRIPTION>A sample meta-wf</METAWORKFLOW_DESCRIPTION>
        <METAWORKFLOW_SECTION>Tutorial</METAWORKFLOW_SECTION>
        <ADMIN_EMAIL>wf-admin-1@wf.com,wf-admin-2@wf.com</ADMIN_EMAIL>
        <CONFIGURATION status="EXECUTABLE">
            <WORKFLOW id="wf-profileID" name="Sleeping Beauty"/>
        </CONFIGURATION>
        <SCHEDULING enabled="false">
            <CRON>29 5 22 ? * *</CRON>
            <MININTERVAL>10080</MININTERVAL>
        </SCHEDULING>
    </BODY>
</RESOURCE_PROFILE>

Let's have a look at the information in the BODY element:
- METAWORKFLOW_NAME (and its family attribute), METAWORKFLOW_DESCRIPTION, and METAWORKFLOW_SECTION: guide the UI for a correct visualization
- ADMIN_EMAIL: comma-separated list of emails. Put here the addresses of the people that want to be notified about the failure and completion of the workflows included in this meta-workflow.
- WORKFLOW: reference to an existing workflow
- SCHEDULING:
  - enabled: set to true if you want the meta-workflow to run automatically according to the given schedule
  - CRON: cron expression to schedule the meta-workflow execution
  - MININTERVAL: milliseconds (?) between two subsequent runs of the meta-workflow.
More complex meta-workflows may include different workflows running in sequence, in parallel, or a mix of them.
The data provision meta-workflow of OpenAIRE is a good example of a complex meta-workflow. It composes a total of 10 different workflows, where workflows profileID2, profileID3, profileID4, and profileID5 run in parallel. Note that the parallel/sequence composition is defined by the level of nesting of the WORKFLOW XML element.
<CONFIGURATION status="EXECUTABLE">
    <WORKFLOW id="profileID1" name="reset">
        <WORKFLOW id="profileID2" name="db2hbase"/>
        <WORKFLOW id="profileID3" name="oaf2hbase"/>
        <WORKFLOW id="profileID4" name="odf2hbase"/>
        <WORKFLOW id="profileID5" name="actions2hbase">
            <WORKFLOW id="profileID6" name="deduplication organizations">
                <WORKFLOW id="profileID7" name="deduplication persons">
                    <WORKFLOW id="profileID8" name="deduplication results">
                        <WORKFLOW id="profileID9" name="update provision">
                            <WORKFLOW id="profileID10" name="prepare pre-public info space"/>
                        </WORKFLOW>
                    </WORKFLOW>
                </WORKFLOW>
            </WORKFLOW>
        </WORKFLOW>
    </WORKFLOW>
</CONFIGURATION>
Workflow use case: metadata aggregation¶
COMING SOON