001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie; 019 020 import org.apache.hadoop.conf.Configuration; 021 import org.apache.oozie.client.WorkflowAction; 022 import org.apache.oozie.service.CallbackService; 023 import org.apache.oozie.workflow.WorkflowInstance; 024 import org.apache.oozie.service.Services; 025 import org.apache.oozie.util.ELEvaluator; 026 import org.apache.oozie.util.PropertiesUtils; 027 import org.apache.oozie.util.XConfiguration; 028 import org.apache.oozie.util.ParamChecker; 029 import org.apache.oozie.util.XmlUtils; 030 import org.jdom.JDOMException; 031 import java.io.IOException; 032 import java.io.StringReader; 033 import java.util.Properties; 034 import java.util.Map; 035 036 /** 037 * DAG EL functions. 038 */ 039 public class DagELFunctions { 040 041 private static final String WORKFLOW = "oozie.el.workflow.bean"; 042 private static final String ACTION = "oozie.el.action.bean"; 043 private static final String ACTION_PROTO_CONF = "oozie.el.action.proto.conf"; 044 045 private static final String LAST_ACTION_IN_ERROR = "oozie.el.last.action.in.error"; 046 047 private static final String ACTION_DATA = "action.data"; 048 private static final String ACTION_ERROR_CODE = "action.error.code"; 049 private static final String ACTION_ERROR_MESSAGE = "action.error.message"; 050 private static final String ACTION_EXTERNAL_ID = "action.external.id"; 051 private static final String ACTION_TRACKER_URI = "action.tracker.uri"; 052 private static final String ACTION_EXTERNAL_STATUS = "action.external.status"; 053 054 public static void configureEvaluator(ELEvaluator evaluator, WorkflowJobBean workflow, WorkflowActionBean action) { 055 evaluator.setVariable(WORKFLOW, workflow); 056 evaluator.setVariable(ACTION, action); 057 058 for (Map.Entry<String, String> entry : workflow.getWorkflowInstance().getConf()) { 059 if (ParamChecker.isValidIdentifier(entry.getKey())) { 060 String value = entry.getValue().trim(); 061 try { 062 String valueElem = "<value>"+value+"</value>"; 063 XmlUtils.parseXml(valueElem); 064 } 065 catch (JDOMException ex) { 066 // If happens, try escaping the characters for XML. The escaping may or 067 // may not solve the problem since the JDOMException could be for a range of issues. 068 value = XmlUtils.escapeCharsForXML(value); 069 } 070 evaluator.setVariable(entry.getKey().trim(), value); 071 } 072 } 073 try { 074 evaluator.setVariable(ACTION_PROTO_CONF, 075 new XConfiguration(new StringReader(workflow.getProtoActionConf()))); 076 } 077 catch (IOException ex) { 078 throw new RuntimeException("It should not happen", ex); 079 } 080 } 081 082 public static WorkflowActionBean getAction() { 083 ELEvaluator eval = ELEvaluator.getCurrent(); 084 return (WorkflowActionBean) eval.getVariable(ACTION); 085 } 086 087 public static WorkflowJobBean getWorkflow() { 088 ELEvaluator eval = ELEvaluator.getCurrent(); 089 return (WorkflowJobBean) eval.getVariable(WORKFLOW); 090 } 091 092 public static Configuration getProtoActionConf() { 093 ELEvaluator eval = ELEvaluator.getCurrent(); 094 return (Configuration) eval.getVariable(ACTION_PROTO_CONF); 095 } 096 097 public static void setActionInfo(WorkflowInstance workflowInstance, WorkflowAction action) { 098 if (action.getExternalId() != null) { 099 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_ID, 100 action.getExternalId()); 101 } 102 if (action.getTrackerUri() != null) { 103 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_TRACKER_URI, 104 action.getTrackerUri()); 105 } 106 if (action.getExternalStatus() != null) { 107 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_STATUS, 108 action.getExternalStatus()); 109 } 110 if (action.getData() != null) { 111 workflowInstance 112 .setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_DATA, action.getData()); 113 } 114 if (action.getErrorCode() != null) { 115 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_CODE, 116 action.getErrorCode()); 117 } 118 if (action.getErrorMessage() != null) { 119 workflowInstance.setVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_MESSAGE, 120 action.getErrorMessage()); 121 } 122 if (action.getStatus() == WorkflowAction.Status.ERROR) { 123 workflowInstance.setVar(LAST_ACTION_IN_ERROR, action.getName()); 124 } 125 } 126 127 /** 128 * Return the job Id. 129 * 130 * @return the job Id. 131 */ 132 public static String wf_id() { 133 return getWorkflow().getId(); 134 } 135 136 /** 137 * Return the application name. 138 * 139 * @return the application name. 140 */ 141 public static String wf_name() { 142 return getWorkflow().getAppName(); 143 } 144 145 /** 146 * Return the application path. 147 * 148 * @return the application path. 149 */ 150 public static String wf_appPath() { 151 return getWorkflow().getAppPath(); 152 } 153 154 /** 155 * Return a job configuration property. 156 * 157 * @param property property name. 158 * @return the value of the property, <code>null</code> if the property is undefined. 159 */ 160 public static String wf_conf(String property) { 161 return getWorkflow().getWorkflowInstance().getConf().get(property); 162 } 163 164 /** 165 * Return the job owner user name. 166 * 167 * @return the job owner user name. 168 */ 169 public static String wf_user() { 170 return getWorkflow().getUser(); 171 } 172 173 /** 174 * Return the job owner group name. 175 * 176 * @return the job owner group name. 177 */ 178 public static String wf_group() { 179 return getWorkflow().getGroup(); 180 } 181 182 /** 183 * Create a callback URL for the current action. 184 * 185 * @param externalStatusVar variable for the caller to inject the external status. 186 * @return the callback URL for the current action. 187 */ 188 public static String wf_callback(String externalStatusVar) { 189 return Services.get().get(CallbackService.class).createCallBackUrl(getAction().getId(), externalStatusVar); 190 } 191 192 /** 193 * Return the transition taken by a workflow job action/decision action. 194 * 195 * @param actionName action/decision action name. 196 * @return the transition taken, <code>null</code> if the action has not completed yet. 197 */ 198 public static String wf_transition(String actionName) { 199 return getWorkflow().getWorkflowInstance().getTransition(actionName); 200 } 201 202 /** 203 * Return the name of the last action that ended in error. 204 * 205 * @return the name of the last action that ended in error, <code>null</code> if no action in the workflow job has 206 * ended in error. 207 */ 208 public static String wf_lastErrorNode() { 209 return getWorkflow().getWorkflowInstance().getVar(LAST_ACTION_IN_ERROR); 210 } 211 212 /** 213 * Return the error code for an action. 214 * 215 * @param actionName action name. 216 * @return the error code for the action, <code>null</code> if the action has not ended in error. 217 */ 218 public static String wf_errorCode(String actionName) { 219 return getWorkflow().getWorkflowInstance() 220 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_CODE); 221 } 222 223 /** 224 * Return the error message for an action. 225 * 226 * @param actionName action name. 227 * @return the error message for the action, <code>null</code> if the action has not ended in error. 228 */ 229 public static String wf_errorMessage(String actionName) { 230 return getWorkflow().getWorkflowInstance() 231 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ERROR_MESSAGE); 232 } 233 234 /** 235 * Return the workflow run number, unless a rerun it is always 1. 236 * 237 * @return the workflow run number, unless a rerun it is always 1. 238 */ 239 public static int wf_run() { 240 return getWorkflow().getRun(); 241 } 242 243 /** 244 * Return the action data for an action. 245 * 246 * @param actionName action name. 247 * @return value of the property. 248 */ 249 @SuppressWarnings("unchecked") 250 public static Map<String, String> wf_actionData(String actionName) { 251 ELEvaluator eval = ELEvaluator.getCurrent(); 252 Properties props = (Properties) eval.getVariable(actionName + ACTION_ERROR_MESSAGE); 253 254 if (props == null) { 255 String data = getWorkflow().getWorkflowInstance() 256 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_DATA); 257 if (data != null) { 258 props = PropertiesUtils.stringToProperties(data); 259 } 260 else { 261 props = new Properties(); 262 } 263 eval.setVariable(actionName + ACTION_ERROR_MESSAGE, props); 264 } 265 return (Map<String, String>) (Map) props; 266 } 267 268 /** 269 * Return the external ID of an action. 270 * 271 * @param actionName action name. 272 * @return the external ID of an action. 273 */ 274 public static String wf_actionExternalId(String actionName) { 275 return getWorkflow().getWorkflowInstance() 276 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_ID); 277 } 278 279 /** 280 * Return the tracker URI of an action. 281 * 282 * @param actionName action name. 283 * @return the tracker URI of an action. 284 */ 285 public static String wf_actionTrackerUri(String actionName) { 286 return getWorkflow().getWorkflowInstance() 287 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_TRACKER_URI); 288 } 289 290 /** 291 * Return the action external status. 292 * 293 * @param actionName action/decision action name. 294 * @return the action external status. 295 */ 296 public static String wf_actionExternalStatus(String actionName) { 297 return getWorkflow().getWorkflowInstance() 298 .getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_EXTERNAL_STATUS); 299 } 300 301 public static String getActionVar(String actionName, String varName) { 302 return getWorkflow().getWorkflowInstance().getVar(actionName + WorkflowInstance.NODE_VAR_SEPARATOR + varName); 303 } 304 305 }