/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.service;

import org.apache.hadoop.util.StringUtils;
import org.apache.oozie.action.control.EndActionExecutor;
import org.apache.oozie.action.control.ForkActionExecutor;
import org.apache.oozie.action.control.JoinActionExecutor;
import org.apache.oozie.action.control.KillActionExecutor;
import org.apache.oozie.action.control.StartActionExecutor;
import org.apache.oozie.command.wf.ReRunXCommand;

import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.ActionNodeHandler;
import org.apache.oozie.workflow.lite.ControlNodeHandler;
import org.apache.oozie.workflow.lite.DecisionNodeHandler;
import org.apache.oozie.workflow.lite.EndNodeDef;
import org.apache.oozie.workflow.lite.ForkNodeDef;
import org.apache.oozie.workflow.lite.JoinNodeDef;
import org.apache.oozie.workflow.lite.KillNodeDef;
import org.apache.oozie.workflow.lite.NodeDef;
import org.apache.oozie.workflow.lite.NodeHandler;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.workflow.lite.StartNodeDef;
import org.jdom.Element;
import org.jdom.JDOMException;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Workflow store service whose nested node handlers turn workflow-library node events (start, kill, fail, touch)
 * into {@link WorkflowActionBean} records and action ids that are queued in the handler context's transient
 * variables.
 */
public abstract class LiteWorkflowStoreService extends WorkflowStoreService {

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "LiteWorkflowStoreService.";
    public static final String CONF_PREFIX_USER_RETRY = CONF_PREFIX + "user.retry.";
    public static final String CONF_USER_RETRY_MAX = CONF_PREFIX_USER_RETRY + "max";
    // NOTE: the "INTEVAL"/"inteval" spelling is part of the public constant name and of the configuration key
    // that is read at runtime, so it is intentionally left unchanged here.
    public static final String CONF_USER_RETRY_INTEVAL = CONF_PREFIX_USER_RETRY + "inteval";
    public static final String CONF_USER_RETRY_POLICY = CONF_PREFIX_USER_RETRY + "policy";
    public static final String CONF_USER_RETRY_ERROR_CODE = CONF_PREFIX_USER_RETRY + "error.code";
    public static final String CONF_USER_RETRY_ERROR_CODE_EXT = CONF_PREFIX_USER_RETRY + "error.code.ext";
    public static final String DEFAULT_USER_RETRY_POLICY = "PERIODIC";

    public static final String NODE_DEF_VERSION_0 = "_oozie_inst_v_0";
    public static final String NODE_DEF_VERSION_1 = "_oozie_inst_v_1";
    public static final String NODE_DEF_VERSION_2 = "_oozie_inst_v_2";
    public static final String CONF_NODE_DEF_VERSION = CONF_PREFIX + "node.def.version";

    public static final String USER_ERROR_CODE_ALL = "ALL";

    /**
     * Delegation method used by the Action and Decision {@link NodeHandler} on start. <p> This method provides the
     * necessary information to create ActionExecutors.
     * <p>
     * A {@link WorkflowActionBean} is built for the current node and appended to the list stored under the
     * {@code ACTIONS_TO_START} transient variable (the list is created on first use). The generated action id is
     * also stored as a node variable. When the node is flagged to be skipped (its {@link ReRunXCommand#TO_SKIP}
     * variable equals {@code "true"}), the type, configuration, log token, status and job id are left unset on the
     * bean.
     *
     * @param context NodeHandler context.
     * @param actionType the action type; when {@code null} it is taken from the root element name of the node
     *        configuration XML, and the configuration is re-serialized via {@link XmlUtils#prettyPrint}.
     * @throws WorkflowException thrown if there was an error parsing the action configuration.
     */
    @SuppressWarnings("unchecked")
    protected static void liteExecute(NodeHandler.Context context, String actionType) throws WorkflowException {
        XLog log = XLog.getLog(LiteWorkflowStoreService.class);
        String jobId = context.getProcessInstance().getId();
        String nodeName = context.getNodeDef().getName();
        String skipVar = context.getProcessInstance().getVar(nodeName
                + WorkflowInstance.NODE_VAR_SEPARATOR + ReRunXCommand.TO_SKIP);
        // A missing variable means "do not skip".
        boolean skipAction = "true".equals(skipVar);
        WorkflowActionBean action = new WorkflowActionBean();
        String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, nodeName);

        if (!skipAction) {
            String nodeConf = context.getNodeDef().getConf();

            if (actionType == null) {
                try {
                    Element element = XmlUtils.parseXml(nodeConf);
                    actionType = element.getName();
                    nodeConf = XmlUtils.prettyPrint(element).toString();
                }
                catch (JDOMException ex) {
                    throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
                }
            }
            log.debug(" Creating action for node [{0}]", nodeName);
            action.setType(actionType);
            action.setConf(nodeConf);
            action.setLogToken(((WorkflowJobBean) context.getTransientVar(WORKFLOW_BEAN)).getLogToken());
            action.setStatus(WorkflowAction.Status.PREP);
            action.setJobId(jobId);
        }

        String executionPath = context.getExecutionPath();
        action.setExecutionPath(executionPath);
        action.setCred(context.getNodeDef().getCred());
        log.debug("Setting action for cred: '"+context.getNodeDef().getCred() +
                "', name: '"+ nodeName + "'");

        action.setUserRetryCount(0);
        int userRetryMax = getUserRetryMax(context);
        int userRetryInterval = getUserRetryInterval(context);
        action.setUserRetryMax(userRetryMax);
        action.setUserRetryInterval(userRetryInterval);
        log.debug("Setting action for userRetryMax: '"+ userRetryMax +
                "', userRetryInterval: '" + userRetryInterval +
                "', name: '"+ nodeName + "'");

        action.setName(nodeName);
        action.setId(actionId);
        context.setVar(nodeName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID, actionId);
        // The transient variable is untyped, hence the unchecked cast suppressed on this method.
        List<WorkflowActionBean> actionsToStart =
                (List<WorkflowActionBean>) context.getTransientVar(ACTIONS_TO_START);
        if (actionsToStart == null) {
            actionsToStart = new ArrayList<WorkflowActionBean>();
            context.setTransientVar(ACTIONS_TO_START, actionsToStart);
        }
        actionsToStart.add(action);
    }

    /**
     * Resolve the user-retry interval for the current node.
     * <p>
     * The value configured under {@link #CONF_USER_RETRY_INTEVAL} is used unless the node definition carries its
     * own interval. A node value of {@code null} or the literal string {@code "null"} is treated as "not
     * specified".
     *
     * @param context NodeHandler context.
     * @return the interval declared on the node, or the configured value when the node declares none.
     * @throws WorkflowException thrown (with {@link ErrorCode#E0700}) if the node value is not a valid integer.
     */
    private static int getUserRetryInterval(NodeHandler.Context context) throws WorkflowException {
        int configuredInterval = ConfigurationService.getInt(CONF_USER_RETRY_INTEVAL);
        String userRetryInterval = context.getNodeDef().getUserRetryInterval();

        if (userRetryInterval == null || "null".equals(userRetryInterval)) {
            return configuredInterval;
        }
        try {
            return Integer.parseInt(userRetryInterval);
        }
        catch (NumberFormatException nfe) {
            throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
        }
    }

    /**
     * Resolve the maximum number of user retries for the current node.
     * <p>
     * A node value of {@code null} or the literal string {@code "null"} is treated as "not specified" and yields
     * {@code 0}. Otherwise the node value is parsed and capped at the value configured under
     * {@link #CONF_USER_RETRY_MAX}; when capping occurs a warning is logged using the {@link ErrorCode#E0820}
     * template.
     *
     * @param context NodeHandler context.
     * @return {@code 0} when the node declares no maximum, otherwise the node value capped at the configured
     *         maximum.
     * @throws WorkflowException thrown (with {@link ErrorCode#E0700}) if the node value is not a valid integer.
     */
    private static int getUserRetryMax(NodeHandler.Context context) throws WorkflowException {
        XLog log = XLog.getLog(LiteWorkflowStoreService.class);
        int systemMax = ConfigurationService.getInt(CONF_USER_RETRY_MAX);
        String userRetryMax = context.getNodeDef().getUserRetryMax();

        if (userRetryMax == null || "null".equals(userRetryMax)) {
            return 0;
        }

        int requestedMax;
        try {
            requestedMax = Integer.parseInt(userRetryMax);
        }
        catch (NumberFormatException nfe) {
            throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
        }

        if (requestedMax > systemMax) {
            // Log the value the user asked for, not the already-capped one; otherwise both template arguments
            // would carry the same number and the requested value would be lost from the warning.
            log.warn(ErrorCode.E0820.getTemplate(), requestedMax, systemMax);
            return systemMax;
        }
        return requestedMax;
    }

    /**
     * Get system defined and instance defined error codes for which USER_RETRY is allowed
     * <p>
     * The values of {@link #CONF_USER_RETRY_ERROR_CODE} and {@link #CONF_USER_RETRY_ERROR_CODE_EXT} are each
     * stripped of all whitespace, split with {@link StringUtils#getStringCollection(String)} and merged into one
     * set. A property whose value is {@code null} contributes no codes.
     *
     * @return set of error code user-retry is allowed for
     */
    public static Set<String> getUserRetryErrorCode() {
        Set<String> errorCodes = new HashSet<String>();
        errorCodes.addAll(readErrorCodes(CONF_USER_RETRY_ERROR_CODE));
        errorCodes.addAll(readErrorCodes(CONF_USER_RETRY_ERROR_CODE_EXT));
        return errorCodes;
    }

    /**
     * Read one error-code property and split it into individual codes.
     *
     * @param confKey name of the configuration property to read.
     * @return the codes found in the property, or an empty collection when the property value is {@code null}.
     */
    private static Collection<String> readErrorCodes(String confKey) {
        String rawValue = ConfigurationService.get(confKey);
        if (rawValue == null) {
            return new ArrayList<String>();
        }
        // eliminating whitespaces in the error codes value specification
        return StringUtils.getStringCollection(rawValue.replaceAll("\\s+", ""));
    }

    /**
     * Get NodeDef default version, _oozie_inst_v_0, _oozie_inst_v_1 or
     * _oozie_inst_v_2
     * <p>
     * Falls back to {@link #NODE_DEF_VERSION_2} when {@link #CONF_NODE_DEF_VERSION} yields {@code null}.
     *
     * @return nodedef default version
     * @throws WorkflowException declared for callers; the body shown here does not throw it itself.
     */
    public static String getNodeDefDefaultVersion() throws WorkflowException {
        String ret = ConfigurationService.get(CONF_NODE_DEF_VERSION);
        if (ret == null) {
            ret = NODE_DEF_VERSION_2;
        }
        return ret;
    }

    /**
     * Delegation method used when failing actions. <p>
     * Queues the current node's action id under the {@code ACTIONS_TO_FAIL} transient variable.
     *
     * @param context NodeHandler context.
     */
    protected static void liteFail(NodeHandler.Context context) {
        liteTerminate(context, ACTIONS_TO_FAIL);
    }

    /**
     * Delegation method used when killing actions. <p>
     * Queues the current node's action id under the {@code ACTIONS_TO_KILL} transient variable.
     *
     * @param context NodeHandler context.
     */
    protected static void liteKill(NodeHandler.Context context) {
        liteTerminate(context, ACTIONS_TO_KILL);
    }

    /**
     * Used to terminate jobs - FAIL or KILL. <p>
     * Appends the action id stored for the current node to the list held in the given transient variable,
     * creating and registering that list first if it does not exist yet.
     *
     * @param context NodeHandler context.
     * @param transientVar The transient variable name.
     */
    @SuppressWarnings("unchecked")
    private static void liteTerminate(NodeHandler.Context context, String transientVar) {
        // The transient variable is untyped, hence the unchecked cast suppressed on this method.
        List<String> actionIds = (List<String>) context.getTransientVar(transientVar);
        if (actionIds == null) {
            actionIds = new ArrayList<String>();
            context.setTransientVar(transientVar, actionIds);
        }
        String nodeName = context.getNodeDef().getName();
        actionIds.add(context.getVar(nodeName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID));
    }

    /**
     * Wires workflow lib action execution with Oozie Dag. The action type is passed as {@code null}, so
     * {@link #liteExecute} derives it from the node configuration.
     */
    public static class LiteActionHandler extends ActionNodeHandler {

        @Override
        public void start(Context context) throws WorkflowException {
            liteExecute(context, null);
        }

        @Override
        public void end(Context context) {
            // Nothing to do on end.
        }

        @Override
        public void kill(Context context) {
            liteKill(context);
        }

        @Override
        public void fail(Context context) {
            liteFail(context);
        }
    }

    /**
     * Wires workflow lib decision execution with Oozie Dag. The action type is passed as {@code null}, so
     * {@link #liteExecute} derives it from the node configuration.
     */
    public static class LiteDecisionHandler extends DecisionNodeHandler {

        @Override
        public void start(Context context) throws WorkflowException {
            liteExecute(context, null);
        }

        @Override
        public void end(Context context) {
            // Nothing to do on end.
        }

        @Override
        public void kill(Context context) {
            liteKill(context);
        }

        @Override
        public void fail(Context context) {
            liteFail(context);
        }
    }

    /**
     * Wires workflow lib control nodes with Oozie Dag. The action type is chosen from the concrete
     * {@link NodeDef} class of the node being touched.
     */
    public static class LiteControlNodeHandler extends ControlNodeHandler {

        /**
         * Map the node definition class to the matching control action type and delegate to
         * {@link #liteExecute}.
         *
         * @param context NodeHandler context.
         * @throws WorkflowException propagated from {@link #liteExecute}.
         * @throws IllegalStateException if the node definition is not a start, end, kill, fork or join node.
         */
        @Override
        public void touch(Context context) throws WorkflowException {
            Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
            String nodeType;
            // Exact class comparison: subclasses of these node definitions are not matched.
            if (nodeClass.equals(StartNodeDef.class)) {
                nodeType = StartActionExecutor.TYPE;
            }
            else if (nodeClass.equals(EndNodeDef.class)) {
                nodeType = EndActionExecutor.TYPE;
            }
            else if (nodeClass.equals(KillNodeDef.class)) {
                nodeType = KillActionExecutor.TYPE;
            }
            else if (nodeClass.equals(ForkNodeDef.class)) {
                nodeType = ForkActionExecutor.TYPE;
            }
            else if (nodeClass.equals(JoinNodeDef.class)) {
                nodeType = JoinActionExecutor.TYPE;
            }
            else {
                throw new IllegalStateException("Invalid node type: " + nodeClass);
            }

            liteExecute(context, nodeType);
        }

    }
}