This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.oozie.action.hadoop.MapReduceActionExecutor;
022 import org.apache.oozie.client.WorkflowAction;
023 import org.apache.oozie.service.CallbackService;
024 import org.apache.oozie.workflow.WorkflowInstance;
025 import org.apache.oozie.service.Services;
026 import org.apache.oozie.util.ELEvaluator;
027 import org.apache.oozie.util.PropertiesUtils;
028 import org.apache.oozie.util.XConfiguration;
029 import org.apache.oozie.util.ParamChecker;
030 import org.apache.oozie.util.XmlUtils;
031 import org.jdom.JDOMException;
032 import java.io.IOException;
033 import java.io.StringReader;
034 import java.util.Properties;
035 import java.util.Map;
036
037 /**
038 * DAG EL functions.
039 */
040 public class DagELFunctions {
041
042 public static final String HADOOP_JOBS_PREFIX = "hadoopJobs:";
043 private static final String WORKFLOW = "oozie.el.workflow.bean";
044 private static final String ACTION = "oozie.el.action.bean";
045 private static final String ACTION_PROTO_CONF = "oozie.el.action.proto.conf";
046
047 private static final String LAST_ACTION_IN_ERROR = "oozie.el.last.action.in.error";
048
049 private static final String ACTION_DATA = "action.data";
050 private static final String ACTION_ERROR_CODE = "action.error.code";
051 private static final String ACTION_ERROR_MESSAGE = "action.error.message";
052 private static final String ACTION_EXTERNAL_ID = "action.external.id";
053 private static final String ACTION_TRACKER_URI = "action.tracker.uri";
054 private static final String ACTION_EXTERNAL_STATUS = "action.external.status";
055
056 public static void configureEvaluator(ELEvaluator evaluator, WorkflowJobBean workflow, WorkflowActionBean action) {
057 evaluator.setVariable(WORKFLOW, workflow);
058 evaluator.setVariable(ACTION, action);
059
060 for (Map.Entry<String, String> entry : workflow.getWorkflowInstance().getConf()) {
061 if (ParamChecker.isValidIdentifier(entry.getKey())) {
062 String value = entry.getValue().trim();
063 try {
064 String valueElem = "<value>"+value+"</value>";
065 XmlUtils.parseXml(valueElem);
066 }
067 catch (JDOMException ex) {
068 // If happens, try escaping the characters for XML. The escaping may or
069 // may not solve the problem since the JDOMException could be for a range of issues.
070 value = XmlUtils.escapeCharsForXML(value);
071 }
072 evaluator.setVariable(entry.getKey().trim(), value);
073 }
074 }
075 try {
076 evaluator.setVariable(ACTION_PROTO_CONF,
077 new XConfiguration(new StringReader(workflow.getProtoActionConf())));
078 }
079 catch (IOException ex) {
080 throw new RuntimeException("It should not happen", ex);
081 }
082 }
083
084 public static WorkflowActionBean getAction() {
085 ELEvaluator eval = ELEvaluator.getCurrent();
086 return (WorkflowActionBean) eval.getVariable(ACTION);
087 }
088
089 public static WorkflowJobBean getWorkflow() {
090 ELEvaluator eval = ELEvaluator.getCurrent();
091 return (WorkflowJobBean) eval.getVariable(WORKFLOW);
092 }
093
094 public static Configuration getProtoActionConf() {
095 ELEvaluator eval = ELEvaluator.getCurrent();
096 return (Configuration) eval.getVariable(ACTION_PROTO_CONF);
097 }
098
099 public static void setActionInfo(WorkflowInstance workflowInstance, WorkflowAction action) {
100 if (action.getExternalId() != null) {
101 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_ID,
102 action.getExternalId());
103 }
104 if (action.getTrackerUri() != null) {
105 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_TRACKER_URI,
106 action.getTrackerUri());
107 }
108 if (action.getExternalStatus() != null) {
109 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_STATUS,
110 action.getExternalStatus());
111 }
112 if (action.getData() != null) {
113 workflowInstance
114 .setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_DATA, action.getData());
115 }
116 if (action.getExternalChildIDs() != null) {
117 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_DATA,
118 HADOOP_JOBS_PREFIX + action.getExternalChildIDs());
119 }
120 if (action.getStats() != null) {
121 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + MapReduceActionExecutor.HADOOP_COUNTERS,
122 action.getStats());
123 }
124 if (action.getErrorCode() != null) {
125 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_CODE,
126 action.getErrorCode());
127 }
128 if (action.getErrorMessage() != null) {
129 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_MESSAGE,
130 action.getErrorMessage());
131 }
132 if (action.getStatus() == WorkflowAction.Status.ERROR) {
133 workflowInstance.setVar(LAST_ACTION_IN_ERROR, action.getName());
134 }
135 }
136
137 /**
138 * Return the job Id.
139 *
140 * @return the job Id.
141 */
142 public static String wf_id() {
143 return getWorkflow().getId();
144 }
145
146 /**
147 * Return the application name.
148 *
149 * @return the application name.
150 */
151 public static String wf_name() {
152 return getWorkflow().getAppName();
153 }
154
155 /**
156 * Return the application path.
157 *
158 * @return the application path.
159 */
160 public static String wf_appPath() {
161 return getWorkflow().getAppPath();
162 }
163
164 /**
165 * Return a job configuration property.
166 *
167 * @param property property name.
168 * @return the value of the property, <code>null</code> if the property is undefined.
169 */
170 public static String wf_conf(String property) {
171 return getWorkflow().getWorkflowInstance().getConf().get(property);
172 }
173
174 /**
175 * Return the job owner user name.
176 *
177 * @return the job owner user name.
178 */
179 public static String wf_user() {
180 return getWorkflow().getUser();
181 }
182
183 /**
184 * Return the job owner group name.
185 *
186 * @return the job owner group name.
187 */
188 public static String wf_group() {
189 return getWorkflow().getGroup();
190 }
191
192 /**
193 * Create a callback URL for the current action.
194 *
195 * @param externalStatusVar variable for the caller to inject the external status.
196 * @return the callback URL for the current action.
197 */
198 public static String wf_callback(String externalStatusVar) {
199 return Services.get().get(CallbackService.class).createCallBackUrl(getAction().getId(), externalStatusVar);
200 }
201
202 /**
203 * Return the transition taken by a workflow job action/decision action.
204 *
205 * @param actionName action/decision action name.
206 * @return the transition taken, <code>null</code> if the action has not completed yet.
207 */
208 public static String wf_transition(String actionName) {
209 return getWorkflow().getWorkflowInstance().getTransition(actionName);
210 }
211
212 /**
213 * Return the name of the last action that ended in error.
214 *
215 * @return the name of the last action that ended in error, <code>null</code> if no action in the workflow job has
216 * ended in error.
217 */
218 public static String wf_lastErrorNode() {
219 return getWorkflow().getWorkflowInstance().getVar(LAST_ACTION_IN_ERROR);
220 }
221
222 /**
223 * Return the error code for an action.
224 *
225 * @param actionName action name.
226 * @return the error code for the action, <code>null</code> if the action has not ended in error.
227 */
228 public static String wf_errorCode(String actionName) {
229 return getWorkflow().getWorkflowInstance()
230 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_CODE);
231 }
232
233 /**
234 * Return the error message for an action.
235 *
236 * @param actionName action name.
237 * @return the error message for the action, <code>null</code> if the action has not ended in error.
238 */
239 public static String wf_errorMessage(String actionName) {
240 return getWorkflow().getWorkflowInstance()
241 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_MESSAGE);
242 }
243
244 /**
245 * Return the workflow run number, unless a rerun it is always 1.
246 *
247 * @return the workflow run number, unless a rerun it is always 1.
248 */
249 public static int wf_run() {
250 return getWorkflow().getRun();
251 }
252
253 /**
254 * Return the action data for an action.
255 *
256 * @param actionName action name.
257 * @return value of the property.
258 */
259 @SuppressWarnings("unchecked")
260 public static Map<String, String> wf_actionData(String actionName) {
261 ELEvaluator eval = ELEvaluator.getCurrent();
262 Properties props = (Properties) eval.getVariable(actionName + ACTION_ERROR_MESSAGE);
263
264 if (props == null) {
265 String data = getWorkflow().getWorkflowInstance()
266 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_DATA);
267 if (data != null) {
268 props = PropertiesUtils.stringToProperties(data);
269 }
270 else {
271 props = new Properties();
272 }
273 eval.setVariable(actionName + ACTION_ERROR_MESSAGE, props);
274 }
275 return (Map<String, String>) (Map) props;
276 }
277
278 /**
279 * Return the external ID of an action.
280 *
281 * @param actionName action name.
282 * @return the external ID of an action.
283 */
284 public static String wf_actionExternalId(String actionName) {
285 return getWorkflow().getWorkflowInstance()
286 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_ID);
287 }
288
289 /**
290 * Return the tracker URI of an action.
291 *
292 * @param actionName action name.
293 * @return the tracker URI of an action.
294 */
295 public static String wf_actionTrackerUri(String actionName) {
296 return getWorkflow().getWorkflowInstance()
297 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_TRACKER_URI);
298 }
299
300 /**
301 * Return the action external status.
302 *
303 * @param actionName action/decision action name.
304 * @return the action external status.
305 */
306 public static String wf_actionExternalStatus(String actionName) {
307 return getWorkflow().getWorkflowInstance()
308 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_STATUS);
309 }
310
311 public static String getActionVar(String actionName, String varName) {
312 return getWorkflow().getWorkflowInstance().getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + varName);
313 }
314
315 }