001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie; 019 020 import org.apache.hadoop.conf.Configuration; 021 import org.apache.oozie.action.hadoop.MapReduceActionExecutor; 022 import org.apache.oozie.client.WorkflowAction; 023 import org.apache.oozie.service.CallbackService; 024 import org.apache.oozie.workflow.WorkflowInstance; 025 import org.apache.oozie.service.Services; 026 import org.apache.oozie.util.ELEvaluator; 027 import org.apache.oozie.util.PropertiesUtils; 028 import org.apache.oozie.util.XConfiguration; 029 import org.apache.oozie.util.ParamChecker; 030 import org.apache.oozie.util.XmlUtils; 031 import org.jdom.JDOMException; 032 import java.io.IOException; 033 import java.io.StringReader; 034 import java.util.Properties; 035 import java.util.Map; 036 037 /** 038 * DAG EL functions. 039 */ 040 public class DagELFunctions { 041 042 public static final String HADOOP_JOBS_PREFIX = "hadoopJobs:"; 043 private static final String WORKFLOW = "oozie.el.workflow.bean"; 044 private static final String ACTION = "oozie.el.action.bean"; 045 private static final String ACTION_PROTO_CONF = "oozie.el.action.proto.conf"; 046 047 private static final String LAST_ACTION_IN_ERROR = "oozie.el.last.action.in.error"; 048 049 private static final String ACTION_DATA = "action.data"; 050 private static final String ACTION_ERROR_CODE = "action.error.code"; 051 private static final String ACTION_ERROR_MESSAGE = "action.error.message"; 052 private static final String ACTION_EXTERNAL_ID = "action.external.id"; 053 private static final String ACTION_TRACKER_URI = "action.tracker.uri"; 054 private static final String ACTION_EXTERNAL_STATUS = "action.external.status"; 055 056 public static void configureEvaluator(ELEvaluator evaluator, WorkflowJobBean workflow, WorkflowActionBean action) { 057 evaluator.setVariable(WORKFLOW, workflow); 058 evaluator.setVariable(ACTION, action); 059 060 for (Map.Entry<String, String> entry : workflow.getWorkflowInstance().getConf()) { 061 if (ParamChecker.isValidIdentifier(entry.getKey())) { 062 String value = entry.getValue().trim(); 063 try { 064 String valueElem = "<value>"+value+"</value>"; 065 XmlUtils.parseXml(valueElem); 066 } 067 catch (JDOMException ex) { 068 // If happens, try escaping the characters for XML. The escaping may or 069 // may not solve the problem since the JDOMException could be for a range of issues. 070 value = XmlUtils.escapeCharsForXML(value); 071 } 072 evaluator.setVariable(entry.getKey().trim(), value); 073 } 074 } 075 try { 076 evaluator.setVariable(ACTION_PROTO_CONF, 077 new XConfiguration(new StringReader(workflow.getProtoActionConf()))); 078 } 079 catch (IOException ex) { 080 throw new RuntimeException("It should not happen", ex); 081 } 082 } 083 084 public static WorkflowActionBean getAction() { 085 ELEvaluator eval = ELEvaluator.getCurrent(); 086 return (WorkflowActionBean) eval.getVariable(ACTION); 087 } 088 089 public static WorkflowJobBean getWorkflow() { 090 ELEvaluator eval = ELEvaluator.getCurrent(); 091 return (WorkflowJobBean) eval.getVariable(WORKFLOW); 092 } 093 094 public static Configuration getProtoActionConf() { 095 ELEvaluator eval = ELEvaluator.getCurrent(); 096 return (Configuration) eval.getVariable(ACTION_PROTO_CONF); 097 } 098 099 public static void setActionInfo(WorkflowInstance workflowInstance, WorkflowAction action) { 100 if (action.getExternalId() != null) { 101 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_ID, 102 action.getExternalId()); 103 } 104 if (action.getTrackerUri() != null) { 105 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_TRACKER_URI, 106 action.getTrackerUri()); 107 } 108 if (action.getExternalStatus() != null) { 109 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_STATUS, 110 action.getExternalStatus()); 111 } 112 if (action.getData() != null) { 113 workflowInstance 114 .setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_DATA, action.getData()); 115 } 116 if (action.getExternalChildIDs() != null) { 117 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_DATA, 118 HADOOP_JOBS_PREFIX + action.getExternalChildIDs()); 119 } 120 if (action.getStats() != null) { 121 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + MapReduceActionExecutor.HADOOP_COUNTERS, 122 action.getStats()); 123 } 124 if (action.getErrorCode() != null) { 125 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_CODE, 126 action.getErrorCode()); 127 } 128 if (action.getErrorMessage() != null) { 129 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_MESSAGE, 130 action.getErrorMessage()); 131 } 132 if (action.getStatus() == WorkflowAction.Status.ERROR) { 133 workflowInstance.setVar(LAST_ACTION_IN_ERROR, action.getName()); 134 } 135 } 136 137 /** 138 * Return the job Id. 139 * 140 * @return the job Id. 141 */ 142 public static String wf_id() { 143 return getWorkflow().getId(); 144 } 145 146 /** 147 * Return the application name. 148 * 149 * @return the application name. 150 */ 151 public static String wf_name() { 152 return getWorkflow().getAppName(); 153 } 154 155 /** 156 * Return the application path. 157 * 158 * @return the application path. 159 */ 160 public static String wf_appPath() { 161 return getWorkflow().getAppPath(); 162 } 163 164 /** 165 * Return a job configuration property. 166 * 167 * @param property property name. 168 * @return the value of the property, <code>null</code> if the property is undefined. 169 */ 170 public static String wf_conf(String property) { 171 return getWorkflow().getWorkflowInstance().getConf().get(property); 172 } 173 174 /** 175 * Return the job owner user name. 176 * 177 * @return the job owner user name. 178 */ 179 public static String wf_user() { 180 return getWorkflow().getUser(); 181 } 182 183 /** 184 * Return the job owner group name. 185 * 186 * @return the job owner group name. 187 */ 188 public static String wf_group() { 189 return getWorkflow().getGroup(); 190 } 191 192 /** 193 * Create a callback URL for the current action. 194 * 195 * @param externalStatusVar variable for the caller to inject the external status. 196 * @return the callback URL for the current action. 197 */ 198 public static String wf_callback(String externalStatusVar) { 199 return Services.get().get(CallbackService.class).createCallBackUrl(getAction().getId(), externalStatusVar); 200 } 201 202 /** 203 * Return the transition taken by a workflow job action/decision action. 204 * 205 * @param actionName action/decision action name. 206 * @return the transition taken, <code>null</code> if the action has not completed yet. 207 */ 208 public static String wf_transition(String actionName) { 209 return getWorkflow().getWorkflowInstance().getTransition(actionName); 210 } 211 212 /** 213 * Return the name of the last action that ended in error. 214 * 215 * @return the name of the last action that ended in error, <code>null</code> if no action in the workflow job has 216 * ended in error. 217 */ 218 public static String wf_lastErrorNode() { 219 return getWorkflow().getWorkflowInstance().getVar(LAST_ACTION_IN_ERROR); 220 } 221 222 /** 223 * Return the error code for an action. 224 * 225 * @param actionName action name. 226 * @return the error code for the action, <code>null</code> if the action has not ended in error. 227 */ 228 public static String wf_errorCode(String actionName) { 229 return getWorkflow().getWorkflowInstance() 230 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_CODE); 231 } 232 233 /** 234 * Return the error message for an action. 235 * 236 * @param actionName action name. 237 * @return the error message for the action, <code>null</code> if the action has not ended in error. 238 */ 239 public static String wf_errorMessage(String actionName) { 240 return getWorkflow().getWorkflowInstance() 241 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_MESSAGE); 242 } 243 244 /** 245 * Return the workflow run number, unless a rerun it is always 1. 246 * 247 * @return the workflow run number, unless a rerun it is always 1. 248 */ 249 public static int wf_run() { 250 return getWorkflow().getRun(); 251 } 252 253 /** 254 * Return the action data for an action. 255 * 256 * @param actionName action name. 257 * @return value of the property. 258 */ 259 @SuppressWarnings("unchecked") 260 public static Map<String, String> wf_actionData(String actionName) { 261 ELEvaluator eval = ELEvaluator.getCurrent(); 262 Properties props = (Properties) eval.getVariable(actionName + ACTION_ERROR_MESSAGE); 263 264 if (props == null) { 265 String data = getWorkflow().getWorkflowInstance() 266 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_DATA); 267 if (data != null) { 268 props = PropertiesUtils.stringToProperties(data); 269 } 270 else { 271 props = new Properties(); 272 } 273 eval.setVariable(actionName + ACTION_ERROR_MESSAGE, props); 274 } 275 return (Map<String, String>) (Map) props; 276 } 277 278 /** 279 * Return the external ID of an action. 280 * 281 * @param actionName action name. 282 * @return the external ID of an action. 283 */ 284 public static String wf_actionExternalId(String actionName) { 285 return getWorkflow().getWorkflowInstance() 286 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_ID); 287 } 288 289 /** 290 * Return the tracker URI of an action. 291 * 292 * @param actionName action name. 293 * @return the tracker URI of an action. 294 */ 295 public static String wf_actionTrackerUri(String actionName) { 296 return getWorkflow().getWorkflowInstance() 297 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_TRACKER_URI); 298 } 299 300 /** 301 * Return the action external status. 302 * 303 * @param actionName action/decision action name. 304 * @return the action external status. 305 */ 306 public static String wf_actionExternalStatus(String actionName) { 307 return getWorkflow().getWorkflowInstance() 308 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_STATUS); 309 } 310 311 public static String getActionVar(String actionName, String varName) { 312 return getWorkflow().getWorkflowInstance().getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + varName); 313 } 314 315 }