This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.oozie.client.WorkflowAction;
022 import org.apache.oozie.service.CallbackService;
023 import org.apache.oozie.workflow.WorkflowInstance;
024 import org.apache.oozie.service.Services;
025 import org.apache.oozie.util.ELEvaluator;
026 import org.apache.oozie.util.PropertiesUtils;
027 import org.apache.oozie.util.XConfiguration;
028 import org.apache.oozie.util.ParamChecker;
029 import org.apache.oozie.util.XmlUtils;
030 import org.jdom.JDOMException;
031 import java.io.IOException;
032 import java.io.StringReader;
033 import java.util.Properties;
034 import java.util.Map;
035
036 /**
037 * DAG EL functions.
038 */
039 public class DagELFunctions {
040
041 private static final String WORKFLOW = "oozie.el.workflow.bean";
042 private static final String ACTION = "oozie.el.action.bean";
043 private static final String ACTION_PROTO_CONF = "oozie.el.action.proto.conf";
044
045 private static final String LAST_ACTION_IN_ERROR = "oozie.el.last.action.in.error";
046
047 private static final String ACTION_DATA = "action.data";
048 private static final String ACTION_ERROR_CODE = "action.error.code";
049 private static final String ACTION_ERROR_MESSAGE = "action.error.message";
050 private static final String ACTION_EXTERNAL_ID = "action.external.id";
051 private static final String ACTION_TRACKER_URI = "action.tracker.uri";
052 private static final String ACTION_EXTERNAL_STATUS = "action.external.status";
053
054 public static void configureEvaluator(ELEvaluator evaluator, WorkflowJobBean workflow, WorkflowActionBean action) {
055 evaluator.setVariable(WORKFLOW, workflow);
056 evaluator.setVariable(ACTION, action);
057
058 for (Map.Entry<String, String> entry : workflow.getWorkflowInstance().getConf()) {
059 if (ParamChecker.isValidIdentifier(entry.getKey())) {
060 String value = entry.getValue().trim();
061 try {
062 String valueElem = "<value>"+value+"</value>";
063 XmlUtils.parseXml(valueElem);
064 }
065 catch (JDOMException ex) {
066 // If happens, try escaping the characters for XML. The escaping may or
067 // may not solve the problem since the JDOMException could be for a range of issues.
068 value = XmlUtils.escapeCharsForXML(value);
069 }
070 evaluator.setVariable(entry.getKey().trim(), value);
071 }
072 }
073 try {
074 evaluator.setVariable(ACTION_PROTO_CONF,
075 new XConfiguration(new StringReader(workflow.getProtoActionConf())));
076 }
077 catch (IOException ex) {
078 throw new RuntimeException("It should not happen", ex);
079 }
080 }
081
082 public static WorkflowActionBean getAction() {
083 ELEvaluator eval = ELEvaluator.getCurrent();
084 return (WorkflowActionBean) eval.getVariable(ACTION);
085 }
086
087 public static WorkflowJobBean getWorkflow() {
088 ELEvaluator eval = ELEvaluator.getCurrent();
089 return (WorkflowJobBean) eval.getVariable(WORKFLOW);
090 }
091
092 public static Configuration getProtoActionConf() {
093 ELEvaluator eval = ELEvaluator.getCurrent();
094 return (Configuration) eval.getVariable(ACTION_PROTO_CONF);
095 }
096
097 public static void setActionInfo(WorkflowInstance workflowInstance, WorkflowAction action) {
098 if (action.getExternalId() != null) {
099 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_ID,
100 action.getExternalId());
101 }
102 if (action.getTrackerUri() != null) {
103 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_TRACKER_URI,
104 action.getTrackerUri());
105 }
106 if (action.getExternalStatus() != null) {
107 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_STATUS,
108 action.getExternalStatus());
109 }
110 if (action.getData() != null) {
111 workflowInstance
112 .setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_DATA, action.getData());
113 }
114 if (action.getErrorCode() != null) {
115 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_CODE,
116 action.getErrorCode());
117 }
118 if (action.getErrorMessage() != null) {
119 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_MESSAGE,
120 action.getErrorMessage());
121 }
122 if (action.getStatus() == WorkflowAction.Status.ERROR) {
123 workflowInstance.setVar(LAST_ACTION_IN_ERROR, action.getName());
124 }
125 }
126
127 /**
128 * Return the job Id.
129 *
130 * @return the job Id.
131 */
132 public static String wf_id() {
133 return getWorkflow().getId();
134 }
135
136 /**
137 * Return the application name.
138 *
139 * @return the application name.
140 */
141 public static String wf_name() {
142 return getWorkflow().getAppName();
143 }
144
145 /**
146 * Return the application path.
147 *
148 * @return the application path.
149 */
150 public static String wf_appPath() {
151 return getWorkflow().getAppPath();
152 }
153
154 /**
155 * Return a job configuration property.
156 *
157 * @param property property name.
158 * @return the value of the property, <code>null</code> if the property is undefined.
159 */
160 public static String wf_conf(String property) {
161 return getWorkflow().getWorkflowInstance().getConf().get(property);
162 }
163
164 /**
165 * Return the job owner user name.
166 *
167 * @return the job owner user name.
168 */
169 public static String wf_user() {
170 return getWorkflow().getUser();
171 }
172
173 /**
174 * Return the job owner group name.
175 *
176 * @return the job owner group name.
177 */
178 public static String wf_group() {
179 return getWorkflow().getGroup();
180 }
181
182 /**
183 * Create a callback URL for the current action.
184 *
185 * @param externalStatusVar variable for the caller to inject the external status.
186 * @return the callback URL for the current action.
187 */
188 public static String wf_callback(String externalStatusVar) {
189 return Services.get().get(CallbackService.class).createCallBackUrl(getAction().getId(), externalStatusVar);
190 }
191
192 /**
193 * Return the transition taken by a workflow job action/decision action.
194 *
195 * @param actionName action/decision action name.
196 * @return the transition taken, <code>null</code> if the action has not completed yet.
197 */
198 public static String wf_transition(String actionName) {
199 return getWorkflow().getWorkflowInstance().getTransition(actionName);
200 }
201
202 /**
203 * Return the name of the last action that ended in error.
204 *
205 * @return the name of the last action that ended in error, <code>null</code> if no action in the workflow job has
206 * ended in error.
207 */
208 public static String wf_lastErrorNode() {
209 return getWorkflow().getWorkflowInstance().getVar(LAST_ACTION_IN_ERROR);
210 }
211
212 /**
213 * Return the error code for an action.
214 *
215 * @param actionName action name.
216 * @return the error code for the action, <code>null</code> if the action has not ended in error.
217 */
218 public static String wf_errorCode(String actionName) {
219 return getWorkflow().getWorkflowInstance()
220 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_CODE);
221 }
222
223 /**
224 * Return the error message for an action.
225 *
226 * @param actionName action name.
227 * @return the error message for the action, <code>null</code> if the action has not ended in error.
228 */
229 public static String wf_errorMessage(String actionName) {
230 return getWorkflow().getWorkflowInstance()
231 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_MESSAGE);
232 }
233
234 /**
235 * Return the workflow run number, unless a rerun it is always 1.
236 *
237 * @return the workflow run number, unless a rerun it is always 1.
238 */
239 public static int wf_run() {
240 return getWorkflow().getRun();
241 }
242
243 /**
244 * Return the action data for an action.
245 *
246 * @param actionName action name.
247 * @return value of the property.
248 */
249 @SuppressWarnings("unchecked")
250 public static Map<String, String> wf_actionData(String actionName) {
251 ELEvaluator eval = ELEvaluator.getCurrent();
252 Properties props = (Properties) eval.getVariable(actionName + ACTION_ERROR_MESSAGE);
253
254 if (props == null) {
255 String data = getWorkflow().getWorkflowInstance()
256 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_DATA);
257 if (data != null) {
258 props = PropertiesUtils.stringToProperties(data);
259 }
260 else {
261 props = new Properties();
262 }
263 eval.setVariable(actionName + ACTION_ERROR_MESSAGE, props);
264 }
265 return (Map<String, String>) (Map) props;
266 }
267
268 /**
269 * Return the external ID of an action.
270 *
271 * @param actionName action name.
272 * @return the external ID of an action.
273 */
274 public static String wf_actionExternalId(String actionName) {
275 return getWorkflow().getWorkflowInstance()
276 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_ID);
277 }
278
279 /**
280 * Return the tracker URI of an action.
281 *
282 * @param actionName action name.
283 * @return the tracker URI of an action.
284 */
285 public static String wf_actionTrackerUri(String actionName) {
286 return getWorkflow().getWorkflowInstance()
287 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_TRACKER_URI);
288 }
289
290 /**
291 * Return the action external status.
292 *
293 * @param actionName action/decision action name.
294 * @return the action external status.
295 */
296 public static String wf_actionExternalStatus(String actionName) {
297 return getWorkflow().getWorkflowInstance()
298 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_STATUS);
299 }
300
301 public static String getActionVar(String actionName, String varName) {
302 return getWorkflow().getWorkflowInstance().getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + varName);
303 }
304
305 }