001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.oozie.action.hadoop.MapReduceActionExecutor;
022    import org.apache.oozie.client.WorkflowAction;
023    import org.apache.oozie.service.CallbackService;
024    import org.apache.oozie.workflow.WorkflowInstance;
025    import org.apache.oozie.service.Services;
026    import org.apache.oozie.util.ELEvaluator;
027    import org.apache.oozie.util.PropertiesUtils;
028    import org.apache.oozie.util.XConfiguration;
029    import org.apache.oozie.util.ParamChecker;
030    import org.apache.oozie.util.XmlUtils;
031    import org.jdom.JDOMException;
032    import java.io.IOException;
033    import java.io.StringReader;
034    import java.util.Properties;
035    import java.util.Map;
036    
037    /**
038     * DAG EL functions.
039     */
040    public class DagELFunctions {
041    
042        public static final String HADOOP_JOBS_PREFIX = "hadoopJobs:";
043        private static final String WORKFLOW = "oozie.el.workflow.bean";
044        private static final String ACTION = "oozie.el.action.bean";
045        private static final String ACTION_PROTO_CONF = "oozie.el.action.proto.conf";
046    
047        private static final String LAST_ACTION_IN_ERROR = "oozie.el.last.action.in.error";
048    
049        private static final String ACTION_DATA = "action.data";
050        private static final String ACTION_ERROR_CODE = "action.error.code";
051        private static final String ACTION_ERROR_MESSAGE = "action.error.message";
052        private static final String ACTION_EXTERNAL_ID = "action.external.id";
053        private static final String ACTION_TRACKER_URI = "action.tracker.uri";
054        private static final String ACTION_EXTERNAL_STATUS = "action.external.status";
055    
056        public static void configureEvaluator(ELEvaluator evaluator, WorkflowJobBean workflow, WorkflowActionBean action) {
057            evaluator.setVariable(WORKFLOW, workflow);
058            evaluator.setVariable(ACTION, action);
059    
060            for (Map.Entry<String, String> entry : workflow.getWorkflowInstance().getConf()) {
061                if (ParamChecker.isValidIdentifier(entry.getKey())) {
062                    String value = entry.getValue().trim();
063                    try {
064                        String valueElem = "<value>"+value+"</value>";
065                        XmlUtils.parseXml(valueElem);
066                    }
067                    catch (JDOMException ex) {
068                        // If happens, try escaping the characters for XML. The escaping may or
069                        // may not solve the problem since the JDOMException could be for a range of issues.
070                        value = XmlUtils.escapeCharsForXML(value);
071                    }
072                    evaluator.setVariable(entry.getKey().trim(), value);
073                }
074            }
075            try {
076                evaluator.setVariable(ACTION_PROTO_CONF,
077                                      new XConfiguration(new StringReader(workflow.getProtoActionConf())));
078            }
079            catch (IOException ex) {
080                throw new RuntimeException("It should not happen", ex);
081            }
082        }
083    
084        public static WorkflowActionBean getAction() {
085            ELEvaluator eval = ELEvaluator.getCurrent();
086            return (WorkflowActionBean) eval.getVariable(ACTION);
087        }
088    
089        public static WorkflowJobBean getWorkflow() {
090            ELEvaluator eval = ELEvaluator.getCurrent();
091            return (WorkflowJobBean) eval.getVariable(WORKFLOW);
092        }
093    
094        public static Configuration getProtoActionConf() {
095            ELEvaluator eval = ELEvaluator.getCurrent();
096            return (Configuration) eval.getVariable(ACTION_PROTO_CONF);
097        }
098    
099        public static void setActionInfo(WorkflowInstance workflowInstance, WorkflowAction action) {
100            if (action.getExternalId() != null) {
101                workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_ID,
102                                        action.getExternalId());
103            }
104            if (action.getTrackerUri() != null) {
105                workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_TRACKER_URI,
106                                        action.getTrackerUri());
107            }
108            if (action.getExternalStatus() != null) {
109                workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_STATUS,
110                                        action.getExternalStatus());
111            }
112            if (action.getData() != null) {
113                workflowInstance
114                        .setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_DATA, action.getData());
115            }
116            if (action.getExternalChildIDs() != null) {
117                workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_DATA,
118                        HADOOP_JOBS_PREFIX + action.getExternalChildIDs());
119            }
120            if (action.getStats() != null) {
121                workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + MapReduceActionExecutor.HADOOP_COUNTERS,
122                        action.getStats());
123            }
124            if (action.getErrorCode() != null) {
125                workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_CODE,
126                                        action.getErrorCode());
127            }
128            if (action.getErrorMessage() != null) {
129                workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_MESSAGE,
130                                        action.getErrorMessage());
131            }
132            if (action.getStatus() == WorkflowAction.Status.ERROR) {
133                workflowInstance.setVar(LAST_ACTION_IN_ERROR, action.getName());
134            }
135        }
136    
137        /**
138         * Return the job Id.
139         *
140         * @return the job Id.
141         */
142        public static String wf_id() {
143            return getWorkflow().getId();
144        }
145    
146        /**
147         * Return the application name.
148         *
149         * @return the application name.
150         */
151        public static String wf_name() {
152            return getWorkflow().getAppName();
153        }
154    
155        /**
156         * Return the application path.
157         *
158         * @return the application path.
159         */
160        public static String wf_appPath() {
161            return getWorkflow().getAppPath();
162        }
163    
164        /**
165         * Return a job configuration property.
166         *
167         * @param property property name.
168         * @return the value of the property, <code>null</code> if the property is undefined.
169         */
170        public static String wf_conf(String property) {
171            return getWorkflow().getWorkflowInstance().getConf().get(property);
172        }
173    
174        /**
175         * Return the job owner user name.
176         *
177         * @return the job owner user name.
178         */
179        public static String wf_user() {
180            return getWorkflow().getUser();
181        }
182    
183        /**
184         * Return the job owner group name.
185         *
186         * @return the job owner group name.
187         */
188        public static String wf_group() {
189            return getWorkflow().getGroup();
190        }
191    
192        /**
193         * Create a callback URL for the current action.
194         *
195         * @param externalStatusVar variable for the caller to inject the external status.
196         * @return the callback URL for the current action.
197         */
198        public static String wf_callback(String externalStatusVar) {
199            return Services.get().get(CallbackService.class).createCallBackUrl(getAction().getId(), externalStatusVar);
200        }
201    
202        /**
203         * Return the transition taken by a workflow job action/decision action.
204         *
205         * @param actionName action/decision action name.
206         * @return the transition taken, <code>null</code> if the action has not completed yet.
207         */
208        public static String wf_transition(String actionName) {
209            return getWorkflow().getWorkflowInstance().getTransition(actionName);
210        }
211    
212        /**
213         * Return the name of the last action that ended in error.
214         *
215         * @return the name of the last action that ended in error, <code>null</code> if no action in the workflow job has
216         *         ended in error.
217         */
218        public static String wf_lastErrorNode() {
219            return getWorkflow().getWorkflowInstance().getVar(LAST_ACTION_IN_ERROR);
220        }
221    
222        /**
223         * Return the error code for an action.
224         *
225         * @param actionName action name.
226         * @return the error code for the action, <code>null</code> if the action has not ended in error.
227         */
228        public static String wf_errorCode(String actionName) {
229            return getWorkflow().getWorkflowInstance()
230                    .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_CODE);
231        }
232    
233        /**
234         * Return the error message for an action.
235         *
236         * @param actionName action name.
237         * @return the error message for the action, <code>null</code> if the action has not ended in error.
238         */
239        public static String wf_errorMessage(String actionName) {
240            return getWorkflow().getWorkflowInstance()
241                    .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_MESSAGE);
242        }
243    
244        /**
245         * Return the workflow run number, unless a rerun it is always 1.
246         *
247         * @return the workflow run number, unless a rerun it is always 1.
248         */
249        public static int wf_run() {
250            return getWorkflow().getRun();
251        }
252    
253        /**
254         * Return the action data for an action.
255         *
256         * @param actionName action name.
257         * @return value of the property.
258         */
259        @SuppressWarnings("unchecked")
260        public static Map<String, String> wf_actionData(String actionName) {
261            ELEvaluator eval = ELEvaluator.getCurrent();
262            Properties props = (Properties) eval.getVariable(actionName + ACTION_ERROR_MESSAGE);
263    
264            if (props == null) {
265                String data = getWorkflow().getWorkflowInstance()
266                        .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_DATA);
267                if (data != null) {
268                    props = PropertiesUtils.stringToProperties(data);
269                }
270                else {
271                    props = new Properties();
272                }
273                eval.setVariable(actionName + ACTION_ERROR_MESSAGE, props);
274            }
275            return (Map<String, String>) (Map) props;
276        }
277    
278        /**
279         * Return the external ID of an action.
280         *
281         * @param actionName action name.
282         * @return the external ID of an action.
283         */
284        public static String wf_actionExternalId(String actionName) {
285            return getWorkflow().getWorkflowInstance()
286                    .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_ID);
287        }
288    
289        /**
290         * Return the tracker URI of an action.
291         *
292         * @param actionName action name.
293         * @return the tracker URI of an action.
294         */
295        public static String wf_actionTrackerUri(String actionName) {
296            return getWorkflow().getWorkflowInstance()
297                    .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_TRACKER_URI);
298        }
299    
300        /**
301         * Return the action external status.
302         *
303         * @param actionName action/decision action name.
304         * @return the action external status.
305         */
306        public static String wf_actionExternalStatus(String actionName) {
307            return getWorkflow().getWorkflowInstance()
308                    .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_STATUS);
309        }
310    
311        public static String getActionVar(String actionName, String varName) {
312            return getWorkflow().getWorkflowInstance().getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + varName);
313        }
314    
315    }