001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.oozie.action.hadoop.MapReduceActionExecutor;
022import org.apache.oozie.client.WorkflowAction;
023import org.apache.oozie.service.CallbackService;
024import org.apache.oozie.workflow.WorkflowInstance;
025import org.apache.oozie.service.Services;
026import org.apache.oozie.util.ELEvaluator;
027import org.apache.oozie.util.PropertiesUtils;
028import org.apache.oozie.util.XConfiguration;
029import org.apache.oozie.util.ParamChecker;
030import org.apache.oozie.util.XmlUtils;
031import org.jdom.JDOMException;
032import java.io.IOException;
033import java.io.StringReader;
034import java.util.Properties;
035import java.util.Map;
036
037/**
038 * DAG EL functions.
039 */
040public class DagELFunctions {
041
042    public static final String HADOOP_JOBS_PREFIX = "hadoopJobs:";
043    private static final String WORKFLOW = "oozie.el.workflow.bean";
044    private static final String ACTION = "oozie.el.action.bean";
045    private static final String ACTION_PROTO_CONF = "oozie.el.action.proto.conf";
046
047    private static final String LAST_ACTION_IN_ERROR = "oozie.el.last.action.in.error";
048
049    private static final String ACTION_DATA = "action.data";
050    private static final String ACTION_ERROR_CODE = "action.error.code";
051    private static final String ACTION_ERROR_MESSAGE = "action.error.message";
052    private static final String ACTION_EXTERNAL_ID = "action.external.id";
053    private static final String ACTION_TRACKER_URI = "action.tracker.uri";
054    private static final String ACTION_EXTERNAL_STATUS = "action.external.status";
055
056    public static void configureEvaluator(ELEvaluator evaluator, WorkflowJobBean workflow, WorkflowActionBean action) {
057        evaluator.setVariable(WORKFLOW, workflow);
058        evaluator.setVariable(ACTION, action);
059
060        for (Map.Entry<String, String> entry : workflow.getWorkflowInstance().getConf()) {
061            if (ParamChecker.isValidIdentifier(entry.getKey())) {
062                String value = entry.getValue().trim();
063                try {
064                    String valueElem = "<value>"+value+"</value>";
065                    XmlUtils.parseXml(valueElem);
066                }
067                catch (JDOMException ex) {
068                    // If happens, try escaping the characters for XML. The escaping may or
069                    // may not solve the problem since the JDOMException could be for a range of issues.
070                    value = XmlUtils.escapeCharsForXML(value);
071                }
072                evaluator.setVariable(entry.getKey().trim(), value);
073            }
074        }
075        try {
076            evaluator.setVariable(ACTION_PROTO_CONF,
077                                  new XConfiguration(new StringReader(workflow.getProtoActionConf())));
078        }
079        catch (IOException ex) {
080            throw new RuntimeException("It should not happen", ex);
081        }
082    }
083
084    public static WorkflowActionBean getAction() {
085        ELEvaluator eval = ELEvaluator.getCurrent();
086        return (WorkflowActionBean) eval.getVariable(ACTION);
087    }
088
089    public static WorkflowJobBean getWorkflow() {
090        ELEvaluator eval = ELEvaluator.getCurrent();
091        return (WorkflowJobBean) eval.getVariable(WORKFLOW);
092    }
093
094    public static Configuration getProtoActionConf() {
095        ELEvaluator eval = ELEvaluator.getCurrent();
096        return (Configuration) eval.getVariable(ACTION_PROTO_CONF);
097    }
098
099    public static void setActionInfo(WorkflowInstance workflowInstance, WorkflowAction action) {
100        if (action.getExternalId() != null) {
101            workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_ID,
102                                    action.getExternalId());
103        }
104        if (action.getTrackerUri() != null) {
105            workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_TRACKER_URI,
106                                    action.getTrackerUri());
107        }
108        if (action.getExternalStatus() != null) {
109            workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_STATUS,
110                                    action.getExternalStatus());
111        }
112        if (action.getData() != null) {
113            workflowInstance
114                    .setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_DATA, action.getData());
115        }
116        if (action.getExternalChildIDs() != null) {
117            workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_DATA,
118                    HADOOP_JOBS_PREFIX + action.getExternalChildIDs());
119        }
120        if (action.getStats() != null) {
121            workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + MapReduceActionExecutor.HADOOP_COUNTERS,
122                    action.getStats());
123        }
124        if (action.getErrorCode() != null) {
125            workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_CODE,
126                                    action.getErrorCode());
127        }
128        if (action.getErrorMessage() != null) {
129            workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_MESSAGE,
130                                    action.getErrorMessage());
131        }
132        if (action.getStatus() == WorkflowAction.Status.ERROR) {
133            workflowInstance.setVar(LAST_ACTION_IN_ERROR, action.getName());
134        }
135    }
136
137    /**
138     * Return the job Id.
139     *
140     * @return the job Id.
141     */
142    public static String wf_id() {
143        return getWorkflow().getId();
144    }
145
146    /**
147     * Return the application name.
148     *
149     * @return the application name.
150     */
151    public static String wf_name() {
152        return getWorkflow().getAppName();
153    }
154
155    /**
156     * Return the application path.
157     *
158     * @return the application path.
159     */
160    public static String wf_appPath() {
161        return getWorkflow().getAppPath();
162    }
163
164    /**
165     * Return a job configuration property.
166     *
167     * @param property property name.
168     * @return the value of the property, <code>null</code> if the property is undefined.
169     */
170    public static String wf_conf(String property) {
171        return getWorkflow().getWorkflowInstance().getConf().get(property);
172    }
173
174    /**
175     * Return the job owner user name.
176     *
177     * @return the job owner user name.
178     */
179    public static String wf_user() {
180        return getWorkflow().getUser();
181    }
182
183    /**
184     * Return the job owner group name.
185     *
186     * @return the job owner group name.
187     */
188    public static String wf_group() {
189        return getWorkflow().getGroup();
190    }
191
192    /**
193     * Create a callback URL for the current action.
194     *
195     * @param externalStatusVar variable for the caller to inject the external status.
196     * @return the callback URL for the current action.
197     */
198    public static String wf_callback(String externalStatusVar) {
199        return Services.get().get(CallbackService.class).createCallBackUrl(getAction().getId(), externalStatusVar);
200    }
201
202    /**
203     * Return the transition taken by a workflow job action/decision action.
204     *
205     * @param actionName action/decision action name.
206     * @return the transition taken, <code>null</code> if the action has not completed yet.
207     */
208    public static String wf_transition(String actionName) {
209        return getWorkflow().getWorkflowInstance().getTransition(actionName);
210    }
211
212    /**
213     * Return the name of the last action that ended in error.
214     *
215     * @return the name of the last action that ended in error, <code>null</code> if no action in the workflow job has
216     *         ended in error.
217     */
218    public static String wf_lastErrorNode() {
219        return getWorkflow().getWorkflowInstance().getVar(LAST_ACTION_IN_ERROR);
220    }
221
222    /**
223     * Return the error code for an action.
224     *
225     * @param actionName action name.
226     * @return the error code for the action, <code>null</code> if the action has not ended in error.
227     */
228    public static String wf_errorCode(String actionName) {
229        return getWorkflow().getWorkflowInstance()
230                .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_CODE);
231    }
232
233    /**
234     * Return the error message for an action.
235     *
236     * @param actionName action name.
237     * @return the error message for the action, <code>null</code> if the action has not ended in error.
238     */
239    public static String wf_errorMessage(String actionName) {
240        return getWorkflow().getWorkflowInstance()
241                .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_MESSAGE);
242    }
243
244    /**
245     * Return the workflow run number, unless a rerun it is always 1.
246     *
247     * @return the workflow run number, unless a rerun it is always 1.
248     */
249    public static int wf_run() {
250        return getWorkflow().getRun();
251    }
252
253    /**
254     * Return the action data for an action.
255     *
256     * @param actionName action name.
257     * @return value of the property.
258     */
259    @SuppressWarnings("unchecked")
260    public static Map<String, String> wf_actionData(String actionName) {
261        ELEvaluator eval = ELEvaluator.getCurrent();
262        Properties props = (Properties) eval.getVariable(actionName + ACTION_ERROR_MESSAGE);
263
264        if (props == null) {
265            String data = getWorkflow().getWorkflowInstance()
266                    .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_DATA);
267            if (data != null) {
268                props = PropertiesUtils.stringToProperties(data);
269            }
270            else {
271                props = new Properties();
272            }
273            eval.setVariable(actionName + ACTION_ERROR_MESSAGE, props);
274        }
275        return (Map<String, String>) (Map) props;
276    }
277
278    /**
279     * Return the external ID of an action.
280     *
281     * @param actionName action name.
282     * @return the external ID of an action.
283     */
284    public static String wf_actionExternalId(String actionName) {
285        return getWorkflow().getWorkflowInstance()
286                .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_ID);
287    }
288
289    /**
290     * Return the tracker URI of an action.
291     *
292     * @param actionName action name.
293     * @return the tracker URI of an action.
294     */
295    public static String wf_actionTrackerUri(String actionName) {
296        return getWorkflow().getWorkflowInstance()
297                .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_TRACKER_URI);
298    }
299
300    /**
301     * Return the action external status.
302     *
303     * @param actionName action/decision action name.
304     * @return the action external status.
305     */
306    public static String wf_actionExternalStatus(String actionName) {
307        return getWorkflow().getWorkflowInstance()
308                .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_STATUS);
309    }
310
311    public static String getActionVar(String actionName, String varName) {
312        return getWorkflow().getWorkflowInstance().getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + varName);
313    }
314
315}