001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.oozie.client.WorkflowAction;
022    import org.apache.oozie.service.CallbackService;
023    import org.apache.oozie.workflow.WorkflowInstance;
024    import org.apache.oozie.service.Services;
025    import org.apache.oozie.util.ELEvaluator;
026    import org.apache.oozie.util.PropertiesUtils;
027    import org.apache.oozie.util.XConfiguration;
028    import org.apache.oozie.util.ParamChecker;
029    import org.apache.oozie.util.XmlUtils;
030    import org.jdom.JDOMException;
031    import java.io.IOException;
032    import java.io.StringReader;
033    import java.util.Properties;
034    import java.util.Map;
035    
036    /**
037     * DAG EL functions.
038     */
039    public class DagELFunctions {
040    
041        private static final String WORKFLOW = "oozie.el.workflow.bean";
042        private static final String ACTION = "oozie.el.action.bean";
043        private static final String ACTION_PROTO_CONF = "oozie.el.action.proto.conf";
044    
045        private static final String LAST_ACTION_IN_ERROR = "oozie.el.last.action.in.error";
046    
047        private static final String ACTION_DATA = "action.data";
048        private static final String ACTION_ERROR_CODE = "action.error.code";
049        private static final String ACTION_ERROR_MESSAGE = "action.error.message";
050        private static final String ACTION_EXTERNAL_ID = "action.external.id";
051        private static final String ACTION_TRACKER_URI = "action.tracker.uri";
052        private static final String ACTION_EXTERNAL_STATUS = "action.external.status";
053    
054        public static void configureEvaluator(ELEvaluator evaluator, WorkflowJobBean workflow, WorkflowActionBean action) {
055            evaluator.setVariable(WORKFLOW, workflow);
056            evaluator.setVariable(ACTION, action);
057    
058            for (Map.Entry<String, String> entry : workflow.getWorkflowInstance().getConf()) {
059                if (ParamChecker.isValidIdentifier(entry.getKey())) {
060                    String value = entry.getValue().trim();
061                    try {
062                        String valueElem = "<value>"+value+"</value>";
063                        XmlUtils.parseXml(valueElem);
064                    }
065                    catch (JDOMException ex) {
066                        // If happens, try escaping the characters for XML. The escaping may or
067                        // may not solve the problem since the JDOMException could be for a range of issues.
068                        value = XmlUtils.escapeCharsForXML(value);
069                    }
070                    evaluator.setVariable(entry.getKey().trim(), value);
071                }
072            }
073            try {
074                evaluator.setVariable(ACTION_PROTO_CONF,
075                                      new XConfiguration(new StringReader(workflow.getProtoActionConf())));
076            }
077            catch (IOException ex) {
078                throw new RuntimeException("It should not happen", ex);
079            }
080        }
081    
082        public static WorkflowActionBean getAction() {
083            ELEvaluator eval = ELEvaluator.getCurrent();
084            return (WorkflowActionBean) eval.getVariable(ACTION);
085        }
086    
087        public static WorkflowJobBean getWorkflow() {
088            ELEvaluator eval = ELEvaluator.getCurrent();
089            return (WorkflowJobBean) eval.getVariable(WORKFLOW);
090        }
091    
092        public static Configuration getProtoActionConf() {
093            ELEvaluator eval = ELEvaluator.getCurrent();
094            return (Configuration) eval.getVariable(ACTION_PROTO_CONF);
095        }
096    
097        public static void setActionInfo(WorkflowInstance workflowInstance, WorkflowAction action) {
098            if (action.getExternalId() != null) {
099                workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_ID,
100                                        action.getExternalId());
101            }
102            if (action.getTrackerUri() != null) {
103                workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_TRACKER_URI,
104                                        action.getTrackerUri());
105            }
106            if (action.getExternalStatus() != null) {
107                workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_STATUS,
108                                        action.getExternalStatus());
109            }
110            if (action.getData() != null) {
111                workflowInstance
112                        .setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_DATA, action.getData());
113            }
114            if (action.getErrorCode() != null) {
115                workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_CODE,
116                                        action.getErrorCode());
117            }
118            if (action.getErrorMessage() != null) {
119                workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_MESSAGE,
120                                        action.getErrorMessage());
121            }
122            if (action.getStatus() == WorkflowAction.Status.ERROR) {
123                workflowInstance.setVar(LAST_ACTION_IN_ERROR, action.getName());
124            }
125        }
126    
127        /**
128         * Return the job Id.
129         *
130         * @return the job Id.
131         */
132        public static String wf_id() {
133            return getWorkflow().getId();
134        }
135    
136        /**
137         * Return the application name.
138         *
139         * @return the application name.
140         */
141        public static String wf_name() {
142            return getWorkflow().getAppName();
143        }
144    
145        /**
146         * Return the application path.
147         *
148         * @return the application path.
149         */
150        public static String wf_appPath() {
151            return getWorkflow().getAppPath();
152        }
153    
154        /**
155         * Return a job configuration property.
156         *
157         * @param property property name.
158         * @return the value of the property, <code>null</code> if the property is undefined.
159         */
160        public static String wf_conf(String property) {
161            return getWorkflow().getWorkflowInstance().getConf().get(property);
162        }
163    
164        /**
165         * Return the job owner user name.
166         *
167         * @return the job owner user name.
168         */
169        public static String wf_user() {
170            return getWorkflow().getUser();
171        }
172    
173        /**
174         * Return the job owner group name.
175         *
176         * @return the job owner group name.
177         */
178        public static String wf_group() {
179            return getWorkflow().getGroup();
180        }
181    
182        /**
183         * Create a callback URL for the current action.
184         *
185         * @param externalStatusVar variable for the caller to inject the external status.
186         * @return the callback URL for the current action.
187         */
188        public static String wf_callback(String externalStatusVar) {
189            return Services.get().get(CallbackService.class).createCallBackUrl(getAction().getId(), externalStatusVar);
190        }
191    
192        /**
193         * Return the transition taken by a workflow job action/decision action.
194         *
195         * @param actionName action/decision action name.
196         * @return the transition taken, <code>null</code> if the action has not completed yet.
197         */
198        public static String wf_transition(String actionName) {
199            return getWorkflow().getWorkflowInstance().getTransition(actionName);
200        }
201    
202        /**
203         * Return the name of the last action that ended in error.
204         *
205         * @return the name of the last action that ended in error, <code>null</code> if no action in the workflow job has
206         *         ended in error.
207         */
208        public static String wf_lastErrorNode() {
209            return getWorkflow().getWorkflowInstance().getVar(LAST_ACTION_IN_ERROR);
210        }
211    
212        /**
213         * Return the error code for an action.
214         *
215         * @param actionName action name.
216         * @return the error code for the action, <code>null</code> if the action has not ended in error.
217         */
218        public static String wf_errorCode(String actionName) {
219            return getWorkflow().getWorkflowInstance()
220                    .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_CODE);
221        }
222    
223        /**
224         * Return the error message for an action.
225         *
226         * @param actionName action name.
227         * @return the error message for the action, <code>null</code> if the action has not ended in error.
228         */
229        public static String wf_errorMessage(String actionName) {
230            return getWorkflow().getWorkflowInstance()
231                    .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_MESSAGE);
232        }
233    
234        /**
235         * Return the workflow run number, unless a rerun it is always 1.
236         *
237         * @return the workflow run number, unless a rerun it is always 1.
238         */
239        public static int wf_run() {
240            return getWorkflow().getRun();
241        }
242    
243        /**
244         * Return the action data for an action.
245         *
246         * @param actionName action name.
247         * @return value of the property.
248         */
249        @SuppressWarnings("unchecked")
250        public static Map<String, String> wf_actionData(String actionName) {
251            ELEvaluator eval = ELEvaluator.getCurrent();
252            Properties props = (Properties) eval.getVariable(actionName + ACTION_ERROR_MESSAGE);
253    
254            if (props == null) {
255                String data = getWorkflow().getWorkflowInstance()
256                        .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_DATA);
257                if (data != null) {
258                    props = PropertiesUtils.stringToProperties(data);
259                }
260                else {
261                    props = new Properties();
262                }
263                eval.setVariable(actionName + ACTION_ERROR_MESSAGE, props);
264            }
265            return (Map<String, String>) (Map) props;
266        }
267    
268        /**
269         * Return the external ID of an action.
270         *
271         * @param actionName action name.
272         * @return the external ID of an action.
273         */
274        public static String wf_actionExternalId(String actionName) {
275            return getWorkflow().getWorkflowInstance()
276                    .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_ID);
277        }
278    
279        /**
280         * Return the tracker URI of an action.
281         *
282         * @param actionName action name.
283         * @return the tracker URI of an action.
284         */
285        public static String wf_actionTrackerUri(String actionName) {
286            return getWorkflow().getWorkflowInstance()
287                    .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_TRACKER_URI);
288        }
289    
290        /**
291         * Return the action external status.
292         *
293         * @param actionName action/decision action name.
294         * @return the action external status.
295         */
296        public static String wf_actionExternalStatus(String actionName) {
297            return getWorkflow().getWorkflowInstance()
298                    .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_STATUS);
299        }
300    
301        public static String getActionVar(String actionName, String varName) {
302            return getWorkflow().getWorkflowInstance().getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + varName);
303        }
304    
305    }