/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.service;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.oozie.action.control.EndActionExecutor;
import org.apache.oozie.action.control.ForkActionExecutor;
import org.apache.oozie.action.control.JoinActionExecutor;
import org.apache.oozie.action.control.KillActionExecutor;
import org.apache.oozie.action.control.StartActionExecutor;
import org.apache.oozie.command.wf.ReRunXCommand;

import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.ActionNodeHandler;
import org.apache.oozie.workflow.lite.ControlNodeHandler;
import org.apache.oozie.workflow.lite.DecisionNodeHandler;
import org.apache.oozie.workflow.lite.EndNodeDef;
import org.apache.oozie.workflow.lite.ForkNodeDef;
import org.apache.oozie.workflow.lite.JoinNodeDef;
import org.apache.oozie.workflow.lite.KillNodeDef;
import org.apache.oozie.workflow.lite.NodeDef;
import org.apache.oozie.workflow.lite.NodeHandler;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.workflow.lite.StartNodeDef;
import org.jdom.Element;
import org.jdom.JDOMException;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Base workflow store service that wires the "lite" workflow library node handlers to Oozie actions.
 * <p>
 * The static helpers in this class translate node-handler callbacks (start, kill, fail, touch) into entries
 * stored in the handler context's transient variables; the nested handler classes simply delegate to them.
 */
public abstract class LiteWorkflowStoreService extends WorkflowStoreService {

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "LiteWorkflowStoreService.";
    public static final String CONF_PREFIX_USER_RETRY = CONF_PREFIX + "user.retry.";
    public static final String CONF_USER_RETRY_MAX = CONF_PREFIX_USER_RETRY + "max";
    // NOTE: "inteval" is misspelled, but both the constant name and the property key are public,
    // so they are intentionally left unchanged for backward compatibility.
    public static final String CONF_USER_RETRY_INTEVAL = CONF_PREFIX_USER_RETRY + "inteval";
    public static final String CONF_USER_RETRY_POLICY = CONF_PREFIX_USER_RETRY + "policy";
    public static final String CONF_USER_RETRY_ERROR_CODE = CONF_PREFIX_USER_RETRY + "error.code";
    public static final String CONF_USER_RETRY_ERROR_CODE_EXT = CONF_PREFIX_USER_RETRY + "error.code.ext";
    public static final String DEFAULT_USER_RETRY_POLICY = "PERIODIC";

    public static final String NODE_DEF_VERSION_0 = "_oozie_inst_v_0";
    public static final String NODE_DEF_VERSION_1 = "_oozie_inst_v_1";
    public static final String NODE_DEF_VERSION_2 = "_oozie_inst_v_2";
    public static final String CONF_NODE_DEF_VERSION = CONF_PREFIX + "node.def.version";

    public static final String USER_ERROR_CODE_ALL = "ALL";

    /**
     * Delegation method used by the Action, Decision and Control {@link NodeHandler}s when a node starts. <p> It
     * builds a {@link WorkflowActionBean} for the current node and appends it to the list kept in the
     * {@code ACTIONS_TO_START} transient variable (creating that list on first use).
     * <p>
     * Unless the node is flagged to be skipped (its {@link ReRunXCommand#TO_SKIP} workflow variable equals
     * {@code "true"}), the bean receives its type, configuration, log token, {@code PREP} status and job id. The
     * execution path, credentials, user-retry settings, name and id are always set. The generated action id is
     * also stored in the node's {@code ACTION_ID} workflow variable.
     *
     * @param context NodeHandler context.
     * @param actionType the action type; if <code>null</code> it is taken from the root element name of the
     * node configuration XML, and the configuration is replaced by its pretty-printed form.
     * @throws WorkflowException thrown if there was an error parsing the action configuration or the node's
     * user-retry settings.
     */
    @SuppressWarnings("unchecked")
    protected static void liteExecute(NodeHandler.Context context, String actionType) throws WorkflowException {
        XLog log = XLog.getLog(LiteWorkflowStoreService.class);
        NodeDef nodeDef = context.getNodeDef();
        String jobId = context.getProcessInstance().getId();
        String nodeName = nodeDef.getName();
        String skipVar = context.getProcessInstance().getVar(nodeName
                + WorkflowInstance.NODE_VAR_SEPARATOR + ReRunXCommand.TO_SKIP);
        // Constant-first equals is null-safe: an absent variable means "do not skip".
        boolean skipAction = "true".equals(skipVar);
        WorkflowActionBean action = new WorkflowActionBean();
        String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, nodeName);

        if (!skipAction) {
            String nodeConf = nodeDef.getConf();

            if (actionType == null) {
                try {
                    Element element = XmlUtils.parseXml(nodeConf);
                    actionType = element.getName();
                    nodeConf = XmlUtils.prettyPrint(element).toString();
                }
                catch (JDOMException ex) {
                    throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
                }
            }
            log.debug(" Creating action for node [{0}]", nodeName);
            action.setType(actionType);
            action.setConf(nodeConf);
            action.setLogToken(((WorkflowJobBean) context.getTransientVar(WORKFLOW_BEAN)).getLogToken());
            action.setStatus(WorkflowAction.Status.PREP);
            action.setJobId(jobId);
        }

        String executionPath = context.getExecutionPath();
        action.setExecutionPath(executionPath);
        action.setCred(nodeDef.getCred());
        log.debug("Setting action for cred: '"+nodeDef.getCred() +
                "', name: '"+ nodeName + "'");

        action.setUserRetryCount(0);
        int userRetryMax = getUserRetryMax(context);
        int userRetryInterval = getUserRetryInterval(context);
        action.setUserRetryMax(userRetryMax);
        action.setUserRetryInterval(userRetryInterval);
        log.debug("Setting action for userRetryMax: '"+ userRetryMax +
                "', userRetryInterval: '" + userRetryInterval +
                "', name: '"+ nodeName + "'");

        action.setName(nodeName);
        action.setId(actionId);
        context.setVar(nodeName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID, actionId);
        List<WorkflowActionBean> actionsToStart =
                (List<WorkflowActionBean>) context.getTransientVar(ACTIONS_TO_START);
        if (actionsToStart == null) {
            actionsToStart = new ArrayList<WorkflowActionBean>();
            context.setTransientVar(ACTIONS_TO_START, actionsToStart);
        }
        actionsToStart.add(action);
    }

    /**
     * Tell whether a node-level user-retry setting was left unspecified.
     * <p>
     * The node definition reports an unspecified value as the literal string {@code "null"}; a real
     * <code>null</code> reference is treated the same way so callers never dereference it.
     *
     * @param value raw setting read from the node definition.
     * @return <code>true</code> if the setting should be considered unspecified.
     */
    private static boolean isUserRetrySettingUnset(String value) {
        return value == null || "null".equals(value);
    }

    /**
     * Resolve the user-retry interval for the current node.
     *
     * @param context NodeHandler context.
     * @return the node's own interval when it specifies one, otherwise the value configured under
     * {@link #CONF_USER_RETRY_INTEVAL}.
     * @throws WorkflowException thrown with {@link ErrorCode#E0700} if the node's value is not an integer.
     */
    private static int getUserRetryInterval(NodeHandler.Context context) throws WorkflowException {
        int ret = ConfigurationService.getInt(CONF_USER_RETRY_INTEVAL);
        String userRetryInterval = context.getNodeDef().getUserRetryInterval();

        if (!isUserRetrySettingUnset(userRetryInterval)) {
            try {
                ret = Integer.parseInt(userRetryInterval);
            }
            catch (NumberFormatException nfe) {
                throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
            }
        }
        return ret;
    }

    /**
     * Resolve the maximum number of user retries for the current node.
     * <p>
     * A node that does not specify a value gets <code>0</code>. A value above the system limit configured under
     * {@link #CONF_USER_RETRY_MAX} is reduced to that limit and a warning is logged.
     *
     * @param context NodeHandler context.
     * @return the effective user-retry maximum for the node.
     * @throws WorkflowException thrown with {@link ErrorCode#E0700} if the node's value is not an integer.
     */
    private static int getUserRetryMax(NodeHandler.Context context) throws WorkflowException {
        XLog log = XLog.getLog(LiteWorkflowStoreService.class);
        int max = ConfigurationService.getInt(CONF_USER_RETRY_MAX);
        String userRetryMax = context.getNodeDef().getUserRetryMax();

        if (isUserRetrySettingUnset(userRetryMax)) {
            return 0;
        }

        int ret;
        try {
            ret = Integer.parseInt(userRetryMax);
        }
        catch (NumberFormatException nfe) {
            throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
        }
        if (ret > max) {
            // Log BEFORE clamping so the warning carries the value the workflow actually requested;
            // logging afterwards would print the system max for both arguments.
            // NOTE(review): assumes the E0820 template takes (requested, system max) in that order - confirm.
            log.warn(ErrorCode.E0820.getTemplate(), ret, max);
            ret = max;
        }
        return ret;
    }

    /**
     * Get system defined and instance defined error codes for which USER_RETRY is allowed
     * <p>
     * The values configured under {@link #CONF_USER_RETRY_ERROR_CODE} and
     * {@link #CONF_USER_RETRY_ERROR_CODE_EXT} are stripped of all whitespace, split into individual codes and
     * merged into a single set.
     *
     * @return set of error code user-retry is allowed for
     */
    public static Set<String> getUserRetryErrorCode() {
        // eliminating whitespaces in the error codes value specification
        String errorCodeString = ConfigurationService.get(CONF_USER_RETRY_ERROR_CODE).replaceAll("\\s+", "");
        Collection<String> strings = StringUtils.getStringCollection(errorCodeString);
        String errorCodeExtString = ConfigurationService.get(CONF_USER_RETRY_ERROR_CODE_EXT).replaceAll("\\s+", "");
        Collection<String> extra = StringUtils.getStringCollection(errorCodeExtString);
        Set<String> set = new HashSet<String>();
        set.addAll(strings);
        set.addAll(extra);
        return set;
    }

    /**
     * Get NodeDef default version, _oozie_inst_v_0, _oozie_inst_v_1 or
     * _oozie_inst_v_2
     * <p>
     * The value is read from {@link #CONF_NODE_DEF_VERSION}; {@link #NODE_DEF_VERSION_2} is used when the
     * configuration lookup returns <code>null</code>.
     *
     * @return nodedef default version
     * @throws WorkflowException declared for callers; this implementation does not throw it itself.
     */
    public static String getNodeDefDefaultVersion() throws WorkflowException {
        String ret = ConfigurationService.get(CONF_NODE_DEF_VERSION);
        if (ret == null) {
            ret = NODE_DEF_VERSION_2;
        }
        return ret;
    }

    /**
     * Delegation method used when failing actions. <p> Records the current node's action id in the
     * {@code ACTIONS_TO_FAIL} transient list.
     *
     * @param context NodeHandler context.
     */
    protected static void liteFail(NodeHandler.Context context) {
        liteTerminate(context, ACTIONS_TO_FAIL);
    }

    /**
     * Delegation method used when killing actions. <p> Records the current node's action id in the
     * {@code ACTIONS_TO_KILL} transient list.
     *
     * @param context NodeHandler context.
     */
    protected static void liteKill(NodeHandler.Context context) {
        liteTerminate(context, ACTIONS_TO_KILL);
    }

    /**
     * Used to terminate jobs - FAIL or KILL. <p> Appends the action id stored in the current node's
     * {@code ACTION_ID} workflow variable to the list held in the given transient variable, creating the list
     * on first use.
     *
     * @param context NodeHandler context.
     * @param transientVar The transient variable name.
     */
    @SuppressWarnings("unchecked")
    private static void liteTerminate(NodeHandler.Context context, String transientVar) {
        List<String> list = (List<String>) context.getTransientVar(transientVar);
        if (list == null) {
            list = new ArrayList<String>();
            context.setTransientVar(transientVar, list);
        }
        list.add(context.getVar(context.getNodeDef().getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID));
    }

    // wires workflow lib action execution with Oozie Dag
    public static class LiteActionHandler extends ActionNodeHandler {

        @Override
        public void start(Context context) throws WorkflowException {
            // null action type: liteExecute derives it from the node configuration XML
            liteExecute(context, null);
        }

        @Override
        public void end(Context context) {
            // nothing to do when an action node ends
        }

        @Override
        public void kill(Context context) {
            liteKill(context);
        }

        @Override
        public void fail(Context context) {
            liteFail(context);
        }
    }

    // wires workflow lib decision execution with Oozie Dag
    public static class LiteDecisionHandler extends DecisionNodeHandler {

        @Override
        public void start(Context context) throws WorkflowException {
            // null action type: liteExecute derives it from the node configuration XML
            liteExecute(context, null);
        }

        @Override
        public void end(Context context) {
            // nothing to do when a decision node ends
        }

        @Override
        public void kill(Context context) {
            liteKill(context);
        }

        @Override
        public void fail(Context context) {
            liteFail(context);
        }
    }

    // wires workflow lib control nodes with Oozie Dag
    public static class LiteControlNodeHandler extends ControlNodeHandler {

        /**
         * Map the control node's definition class to the matching control action executor type and create
         * the action through {@link LiteWorkflowStoreService#liteExecute}.
         *
         * @param context NodeHandler context.
         * @throws WorkflowException propagated from {@code liteExecute}.
         * @throws IllegalStateException if the node definition is not a start, end, kill, fork or join node.
         */
        @Override
        public void touch(Context context) throws WorkflowException {
            Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
            String nodeType;
            if (nodeClass.equals(StartNodeDef.class)) {
                nodeType = StartActionExecutor.TYPE;
            }
            else if (nodeClass.equals(EndNodeDef.class)) {
                nodeType = EndActionExecutor.TYPE;
            }
            else if (nodeClass.equals(KillNodeDef.class)) {
                nodeType = KillActionExecutor.TYPE;
            }
            else if (nodeClass.equals(ForkNodeDef.class)) {
                nodeType = ForkActionExecutor.TYPE;
            }
            else if (nodeClass.equals(JoinNodeDef.class)) {
                nodeType = JoinActionExecutor.TYPE;
            } else {
                throw new IllegalStateException("Invalid node type: " + nodeClass);
            }

            liteExecute(context, nodeType);
        }

    }
}