/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.oozie.action.control.EndActionExecutor;
import org.apache.oozie.action.control.ForkActionExecutor;
import org.apache.oozie.action.control.JoinActionExecutor;
import org.apache.oozie.action.control.KillActionExecutor;
import org.apache.oozie.action.control.StartActionExecutor;
import org.apache.oozie.command.wf.ReRunXCommand;

import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.ActionNodeHandler;
import org.apache.oozie.workflow.lite.ControlNodeHandler;
import org.apache.oozie.workflow.lite.DecisionNodeHandler;
import org.apache.oozie.workflow.lite.EndNodeDef;
import org.apache.oozie.workflow.lite.ForkNodeDef;
import org.apache.oozie.workflow.lite.JoinNodeDef;
import org.apache.oozie.workflow.lite.KillNodeDef;
import org.apache.oozie.workflow.lite.NodeDef;
import org.apache.oozie.workflow.lite.NodeHandler;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.workflow.lite.StartNodeDef;
import org.jdom.Element;
import org.jdom.JDOMException;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Base workflow store service that wires the lite workflow library node handlers to the creation and
 * termination of Oozie workflow actions.
 * <p>
 * The static {@code lite*} methods do not run anything themselves: they record action beans / action ids
 * in transient variables of the {@link NodeHandler.Context} ({@code ACTIONS_TO_START},
 * {@code ACTIONS_TO_KILL}, {@code ACTIONS_TO_FAIL}).
 */
public abstract class LiteWorkflowStoreService extends WorkflowStoreService {

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "LiteWorkflowStoreService.";
    public static final String CONF_PREFIX_USER_RETRY = CONF_PREFIX + "user.retry.";
    public static final String CONF_USER_RETRY_MAX = CONF_PREFIX_USER_RETRY + "max";
    // NOTE: "INTEVAL"/"inteval" is misspelled, but both the constant name and the property key are part of
    // the public contract, so they are intentionally left as they are.
    public static final String CONF_USER_RETRY_INTEVAL = CONF_PREFIX_USER_RETRY + "inteval";
    public static final String CONF_USER_RETRY_ERROR_CODE = CONF_PREFIX_USER_RETRY + "error.code";
    public static final String CONF_USER_RETRY_ERROR_CODE_EXT = CONF_PREFIX_USER_RETRY + "error.code.ext";

    public static final String NODE_DEF_VERSION_0 = "_oozie_inst_v_0";
    public static final String NODE_DEF_VERSION_1 = "_oozie_inst_v_1";
    public static final String CONF_NODE_DEF_VERSION = CONF_PREFIX + "node.def.version";

    /**
     * Delegation method used by the Action, Decision and Control {@link NodeHandler}s on start.
     * <p>
     * Builds a {@link WorkflowActionBean} for the current node, stores its id in a workflow variable and
     * appends the bean to the {@code ACTIONS_TO_START} transient list (creating the list if needed).
     * <p>
     * If the node is flagged to be skipped (its {@link ReRunXCommand#TO_SKIP} variable equals
     * {@code "true"}), the type, execution path, configuration, log token, status and job id are not set on
     * the bean; credentials, user-retry settings, name and id are always set.
     *
     * @param context NodeHandler context.
     * @param actionType the action type; if <code>null</code> it is taken from the root element name of the
     *        node configuration XML.
     * @throws WorkflowException thrown if there was an error parsing the action configuration.
     */
    @SuppressWarnings("unchecked")
    protected static void liteExecute(NodeHandler.Context context, String actionType) throws WorkflowException {
        XLog log = XLog.getLog(LiteWorkflowStoreService.class);
        String jobId = context.getProcessInstance().getId();
        String nodeName = context.getNodeDef().getName();
        String skipVar = context.getProcessInstance().getVar(nodeName
                + WorkflowInstance.NODE_VAR_SEPARATOR + ReRunXCommand.TO_SKIP);
        // null-safe: an unset variable means "do not skip"
        boolean skipAction = "true".equals(skipVar);
        WorkflowActionBean action = new WorkflowActionBean();
        String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, nodeName);

        if (!skipAction) {
            String nodeConf = context.getNodeDef().getConf();
            String executionPath = context.getExecutionPath();

            if (actionType == null) {
                // no explicit type given: derive it from the configuration's root element
                try {
                    Element element = XmlUtils.parseXml(nodeConf);
                    actionType = element.getName();
                    nodeConf = XmlUtils.prettyPrint(element).toString();
                }
                catch (JDOMException ex) {
                    throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
                }
            }
            log.debug(" Creating action for node [{0}]", nodeName);
            action.setType(actionType);
            action.setExecutionPath(executionPath);
            action.setConf(nodeConf);
            action.setLogToken(((WorkflowJobBean) context.getTransientVar(WORKFLOW_BEAN)).getLogToken());
            action.setStatus(WorkflowAction.Status.PREP);
            action.setJobId(jobId);
        }
        String cred = context.getNodeDef().getCred();
        action.setCred(cred);
        log.debug("Setting action for cred: '" + cred +
                "', name: '" + nodeName + "'");

        action.setUserRetryCount(0);
        int userRetryMax = getUserRetryMax(context);
        int userRetryInterval = getUserRetryInterval(context);
        action.setUserRetryMax(userRetryMax);
        action.setUserRetryInterval(userRetryInterval);
        log.debug("Setting action for userRetryMax: '" + userRetryMax +
                "', userRetryInterval: '" + userRetryInterval +
                "', name: '" + nodeName + "'");

        action.setName(nodeName);
        action.setId(actionId);
        context.setVar(nodeName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID, actionId);
        List<WorkflowActionBean> actionsToStart =
                (List<WorkflowActionBean>) context.getTransientVar(ACTIONS_TO_START);
        if (actionsToStart == null) {
            actionsToStart = new ArrayList<WorkflowActionBean>();
            context.setTransientVar(ACTIONS_TO_START, actionsToStart);
        }
        actionsToStart.add(action);
    }

    /**
     * Tells whether a node-level retry attribute carries an actual value.
     *
     * @param value attribute value as returned by the node definition.
     * @return <code>false</code> if the value is <code>null</code> or the literal string {@code "null"},
     *         <code>true</code> otherwise.
     */
    private static boolean isSpecified(String value) {
        return value != null && !value.equals("null");
    }

    /**
     * Resolves the user-retry interval for the current node: the node-level value if specified, otherwise
     * the {@link #CONF_USER_RETRY_INTEVAL} system setting (default 5).
     *
     * @param context NodeHandler context.
     * @return the user-retry interval.
     * @throws WorkflowException thrown if the node-level value is not a valid integer.
     */
    private static int getUserRetryInterval(NodeHandler.Context context) throws WorkflowException {
        Configuration conf = Services.get().get(ConfigurationService.class).getConf();
        int ret = conf.getInt(CONF_USER_RETRY_INTEVAL, 5);
        String userRetryInterval = context.getNodeDef().getUserRetryInterval();

        if (isSpecified(userRetryInterval)) {
            try {
                ret = Integer.parseInt(userRetryInterval);
            }
            catch (NumberFormatException nfe) {
                throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
            }
        }
        return ret;
    }

    /**
     * Resolves the maximum number of user retries for the current node.
     * <p>
     * If the node does not specify a value the result is 0. Otherwise the node-level value is used, capped
     * at the {@link #CONF_USER_RETRY_MAX} system setting (default 0); a warning is logged when capping.
     *
     * @param context NodeHandler context.
     * @return the user-retry maximum.
     * @throws WorkflowException thrown if the node-level value is not a valid integer.
     */
    private static int getUserRetryMax(NodeHandler.Context context) throws WorkflowException {
        XLog log = XLog.getLog(LiteWorkflowStoreService.class);
        Configuration conf = Services.get().get(ConfigurationService.class).getConf();
        int max = conf.getInt(CONF_USER_RETRY_MAX, 0);
        String userRetryMax = context.getNodeDef().getUserRetryMax();

        if (!isSpecified(userRetryMax)) {
            return 0;
        }
        int ret;
        try {
            ret = Integer.parseInt(userRetryMax);
        }
        catch (NumberFormatException nfe) {
            throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
        }
        if (ret > max) {
            // log the requested value BEFORE clamping, so the message shows what was actually asked for
            log.warn(ErrorCode.E0820.getTemplate(), ret, max);
            ret = max;
        }
        return ret;
    }

    /**
     * Get system defined and instance defined error codes for which USER_RETRY is allowed
     *
     * @return set of error code user-retry is allowed for; empty if neither property is configured.
     */
    public static Set<String> getUserRetryErrorCode() {
        Configuration conf = Services.get().get(ConfigurationService.class).getConf();
        Set<String> set = new HashSet<String>();
        set.addAll(parseErrorCodes(conf.get(CONF_USER_RETRY_ERROR_CODE)));
        set.addAll(parseErrorCodes(conf.get(CONF_USER_RETRY_ERROR_CODE_EXT)));
        return set;
    }

    /**
     * Splits an error-code list property value into individual codes.
     *
     * @param value raw property value, may be <code>null</code> when the property is not set.
     * @return the codes with all whitespace removed; empty if the value is <code>null</code>.
     */
    private static Collection<String> parseErrorCodes(String value) {
        if (value == null) {
            return new ArrayList<String>();
        }
        // eliminating whitespaces in the error codes value specification
        return StringUtils.getStringCollection(value.replaceAll("\\s+", ""));
    }

    /**
     * Get NodeDef default version, _oozie_inst_v_0 or _oozie_inst_v_1
     *
     * @return the configured {@link #CONF_NODE_DEF_VERSION} value, or {@link #NODE_DEF_VERSION_1} if it is
     *         not set.
     * @throws WorkflowException declared for interface compatibility.
     */
    public static String getNodeDefDefaultVersion() throws WorkflowException {
        Configuration conf = Services.get().get(ConfigurationService.class).getConf();
        String ret = conf.get(CONF_NODE_DEF_VERSION);
        if (ret == null) {
            ret = NODE_DEF_VERSION_1;
        }
        return ret;
    }

    /**
     * Delegation method used when failing actions.
     *
     * @param context NodeHandler context.
     */
    protected static void liteFail(NodeHandler.Context context) {
        liteTerminate(context, ACTIONS_TO_FAIL);
    }

    /**
     * Delegation method used when killing actions.
     *
     * @param context NodeHandler context.
     */
    protected static void liteKill(NodeHandler.Context context) {
        liteTerminate(context, ACTIONS_TO_KILL);
    }

    /**
     * Used to terminate jobs - FAIL or KILL.
     * <p>
     * Appends the action id stored for the current node to the list held in the given transient variable,
     * creating the list if it does not exist yet.
     *
     * @param context NodeHandler context.
     * @param transientVar The transient variable name.
     */
    @SuppressWarnings("unchecked")
    private static void liteTerminate(NodeHandler.Context context, String transientVar) {
        List<String> actionIds = (List<String>) context.getTransientVar(transientVar);
        if (actionIds == null) {
            actionIds = new ArrayList<String>();
            context.setTransientVar(transientVar, actionIds);
        }
        String nodeName = context.getNodeDef().getName();
        actionIds.add(context.getVar(nodeName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID));
    }

    // wires workflow lib action execution with Oozie Dag
    public static class LiteActionHandler extends ActionNodeHandler {

        @Override
        public void start(Context context) throws WorkflowException {
            liteExecute(context, null);
        }

        @Override
        public void end(Context context) {
        }

        @Override
        public void kill(Context context) {
            liteKill(context);
        }

        @Override
        public void fail(Context context) {
            liteFail(context);
        }
    }

    // wires workflow lib decision execution with Oozie Dag
    public static class LiteDecisionHandler extends DecisionNodeHandler {

        @Override
        public void start(Context context) throws WorkflowException {
            liteExecute(context, null);
        }

        @Override
        public void end(Context context) {
        }

        @Override
        public void kill(Context context) {
            liteKill(context);
        }

        @Override
        public void fail(Context context) {
            liteFail(context);
        }
    }

    // wires workflow lib control nodes with Oozie Dag
    public static class LiteControlNodeHandler extends ControlNodeHandler {

        /**
         * Maps the control node definition class to its action executor type and delegates to
         * {@link LiteWorkflowStoreService#liteExecute(NodeHandler.Context, String)}.
         *
         * @param context NodeHandler context.
         * @throws WorkflowException propagated from <code>liteExecute</code>.
         * @throws IllegalStateException if the node definition is not a start, end, kill, fork or join node.
         */
        @Override
        public void touch(Context context) throws WorkflowException {
            Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
            String nodeType;
            if (nodeClass.equals(StartNodeDef.class)) {
                nodeType = StartActionExecutor.TYPE;
            }
            else if (nodeClass.equals(EndNodeDef.class)) {
                nodeType = EndActionExecutor.TYPE;
            }
            else if (nodeClass.equals(KillNodeDef.class)) {
                nodeType = KillActionExecutor.TYPE;
            }
            else if (nodeClass.equals(ForkNodeDef.class)) {
                nodeType = ForkActionExecutor.TYPE;
            }
            else if (nodeClass.equals(JoinNodeDef.class)) {
                nodeType = JoinActionExecutor.TYPE;
            }
            else {
                throw new IllegalStateException("Invalid node type: " + nodeClass);
            }

            liteExecute(context, nodeType);
        }

    }
}