001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie; 020 021import org.apache.hadoop.conf.Configuration; 022import org.apache.oozie.action.hadoop.MapReduceActionExecutor; 023import org.apache.oozie.client.WorkflowAction; 024import org.apache.oozie.service.CallbackService; 025import org.apache.oozie.workflow.WorkflowInstance; 026import org.apache.oozie.service.Services; 027import org.apache.oozie.util.ELEvaluator; 028import org.apache.oozie.util.PropertiesUtils; 029import org.apache.oozie.util.XConfiguration; 030import org.apache.oozie.util.ParamChecker; 031import org.apache.oozie.util.XmlUtils; 032import org.jdom.JDOMException; 033import java.io.IOException; 034import java.io.StringReader; 035import java.util.Properties; 036import java.util.Map; 037 038/** 039 * DAG EL functions. 040 */ 041public class DagELFunctions { 042 043 public static final String HADOOP_JOBS_PREFIX = "hadoopJobs:"; 044 private static final String WORKFLOW = "oozie.el.workflow.bean"; 045 private static final String ACTION = "oozie.el.action.bean"; 046 private static final String ACTION_PROTO_CONF = "oozie.el.action.proto.conf"; 047 048 private static final String LAST_ACTION_IN_ERROR = "oozie.el.last.action.in.error"; 049 050 private static final String ACTION_DATA = "action.data"; 051 private static final String ACTION_ERROR_CODE = "action.error.code"; 052 private static final String ACTION_ERROR_MESSAGE = "action.error.message"; 053 private static final String ACTION_EXTERNAL_ID = "action.external.id"; 054 private static final String ACTION_TRACKER_URI = "action.tracker.uri"; 055 private static final String ACTION_EXTERNAL_STATUS = "action.external.status"; 056 057 public static void configureEvaluator(ELEvaluator evaluator, WorkflowJobBean workflow, WorkflowActionBean action) { 058 evaluator.setVariable(WORKFLOW, workflow); 059 evaluator.setVariable(ACTION, action); 060 061 for (Map.Entry<String, String> entry : workflow.getWorkflowInstance().getConf()) { 062 if (ParamChecker.isValidIdentifier(entry.getKey())) { 063 String value = entry.getValue().trim(); 064 try { 065 String valueElem = "<value>"+value+"</value>"; 066 XmlUtils.parseXml(valueElem); 067 } 068 catch (JDOMException ex) { 069 // If happens, try escaping the characters for XML. The escaping may or 070 // may not solve the problem since the JDOMException could be for a range of issues. 071 value = XmlUtils.escapeCharsForXML(value); 072 } 073 evaluator.setVariable(entry.getKey().trim(), value); 074 } 075 } 076 try { 077 evaluator.setVariable(ACTION_PROTO_CONF, 078 new XConfiguration(new StringReader(workflow.getProtoActionConf()))); 079 } 080 catch (IOException ex) { 081 throw new RuntimeException("It should not happen", ex); 082 } 083 } 084 085 public static WorkflowActionBean getAction() { 086 ELEvaluator eval = ELEvaluator.getCurrent(); 087 return (WorkflowActionBean) eval.getVariable(ACTION); 088 } 089 090 public static WorkflowJobBean getWorkflow() { 091 ELEvaluator eval = ELEvaluator.getCurrent(); 092 return (WorkflowJobBean) eval.getVariable(WORKFLOW); 093 } 094 095 public static Configuration getProtoActionConf() { 096 ELEvaluator eval = ELEvaluator.getCurrent(); 097 return (Configuration) eval.getVariable(ACTION_PROTO_CONF); 098 } 099 100 public static void setActionInfo(WorkflowInstance workflowInstance, WorkflowAction action) { 101 if (action.getExternalId() != null) { 102 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_ID, 103 action.getExternalId()); 104 } 105 if (action.getTrackerUri() != null) { 106 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_TRACKER_URI, 107 action.getTrackerUri()); 108 } 109 if (action.getExternalStatus() != null) { 110 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_STATUS, 111 action.getExternalStatus()); 112 } 113 if (action.getData() != null) { 114 workflowInstance 115 .setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_DATA, action.getData()); 116 } 117 if (action.getExternalChildIDs() != null) { 118 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_DATA, 119 HADOOP_JOBS_PREFIX + action.getExternalChildIDs()); 120 } 121 if (action.getStats() != null) { 122 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + MapReduceActionExecutor.HADOOP_COUNTERS, 123 action.getStats()); 124 } 125 if (action.getErrorCode() != null) { 126 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_CODE, 127 action.getErrorCode()); 128 } 129 if (action.getErrorMessage() != null) { 130 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_MESSAGE, 131 action.getErrorMessage()); 132 } 133 if (action.getStatus() == WorkflowAction.Status.ERROR) { 134 workflowInstance.setVar(LAST_ACTION_IN_ERROR, action.getName()); 135 } 136 } 137 138 /** 139 * Return the job Id. 140 * 141 * @return the job Id. 142 */ 143 public static String wf_id() { 144 return getWorkflow().getId(); 145 } 146 147 /** 148 * Return the application name. 149 * 150 * @return the application name. 151 */ 152 public static String wf_name() { 153 return getWorkflow().getAppName(); 154 } 155 156 /** 157 * Return the application path. 158 * 159 * @return the application path. 160 */ 161 public static String wf_appPath() { 162 return getWorkflow().getAppPath(); 163 } 164 165 /** 166 * Return a job configuration property. 167 * 168 * @param property property name. 169 * @return the value of the property, <code>null</code> if the property is undefined. 170 */ 171 public static String wf_conf(String property) { 172 return getWorkflow().getWorkflowInstance().getConf().get(property); 173 } 174 175 /** 176 * Return the job owner user name. 177 * 178 * @return the job owner user name. 179 */ 180 public static String wf_user() { 181 return getWorkflow().getUser(); 182 } 183 184 /** 185 * Return the job owner group name. 186 * 187 * @return the job owner group name. 188 */ 189 public static String wf_group() { 190 return getWorkflow().getGroup(); 191 } 192 193 /** 194 * Create a callback URL for the current action. 195 * 196 * @param externalStatusVar variable for the caller to inject the external status. 197 * @return the callback URL for the current action. 198 */ 199 public static String wf_callback(String externalStatusVar) { 200 return Services.get().get(CallbackService.class).createCallBackUrl(getAction().getId(), externalStatusVar); 201 } 202 203 /** 204 * Return the transition taken by a workflow job action/decision action. 205 * 206 * @param actionName action/decision action name. 207 * @return the transition taken, <code>null</code> if the action has not completed yet. 208 */ 209 public static String wf_transition(String actionName) { 210 return getWorkflow().getWorkflowInstance().getTransition(actionName); 211 } 212 213 /** 214 * Return the name of the last action that ended in error. 215 * 216 * @return the name of the last action that ended in error, <code>null</code> if no action in the workflow job has 217 * ended in error. 218 */ 219 public static String wf_lastErrorNode() { 220 return getWorkflow().getWorkflowInstance().getVar(LAST_ACTION_IN_ERROR); 221 } 222 223 /** 224 * Return the error code for an action. 225 * 226 * @param actionName action name. 227 * @return the error code for the action, <code>null</code> if the action has not ended in error. 228 */ 229 public static String wf_errorCode(String actionName) { 230 return getWorkflow().getWorkflowInstance() 231 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_CODE); 232 } 233 234 /** 235 * Return the error message for an action. 236 * 237 * @param actionName action name. 238 * @return the error message for the action, <code>null</code> if the action has not ended in error. 239 */ 240 public static String wf_errorMessage(String actionName) { 241 return getWorkflow().getWorkflowInstance() 242 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_MESSAGE); 243 } 244 245 /** 246 * Return the workflow run number, unless a rerun it is always 1. 247 * 248 * @return the workflow run number, unless a rerun it is always 1. 249 */ 250 public static int wf_run() { 251 return getWorkflow().getRun(); 252 } 253 254 /** 255 * Return the action data for an action. 256 * 257 * @param actionName action name. 258 * @return value of the property. 259 */ 260 @SuppressWarnings("unchecked") 261 public static Map<String, String> wf_actionData(String actionName) { 262 ELEvaluator eval = ELEvaluator.getCurrent(); 263 Properties props = (Properties) eval.getVariable(actionName + ACTION_ERROR_MESSAGE); 264 265 if (props == null) { 266 String data = getWorkflow().getWorkflowInstance() 267 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_DATA); 268 if (data != null) { 269 props = PropertiesUtils.stringToProperties(data); 270 } 271 else { 272 props = new Properties(); 273 } 274 eval.setVariable(actionName + ACTION_ERROR_MESSAGE, props); 275 } 276 return (Map<String, String>) (Map) props; 277 } 278 279 /** 280 * Return the external ID of an action. 281 * 282 * @param actionName action name. 283 * @return the external ID of an action. 284 */ 285 public static String wf_actionExternalId(String actionName) { 286 return getWorkflow().getWorkflowInstance() 287 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_ID); 288 } 289 290 /** 291 * Return the tracker URI of an action. 292 * 293 * @param actionName action name. 294 * @return the tracker URI of an action. 295 */ 296 public static String wf_actionTrackerUri(String actionName) { 297 return getWorkflow().getWorkflowInstance() 298 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_TRACKER_URI); 299 } 300 301 /** 302 * Return the action external status. 303 * 304 * @param actionName action/decision action name. 305 * @return the action external status. 306 */ 307 public static String wf_actionExternalStatus(String actionName) { 308 return getWorkflow().getWorkflowInstance() 309 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_STATUS); 310 } 311 312 public static String getActionVar(String actionName, String varName) { 313 return getWorkflow().getWorkflowInstance().getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + varName); 314 } 315 316}