001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import org.apache.hadoop.conf.Configuration; 021 import org.apache.hadoop.util.StringUtils; 022 import org.apache.oozie.command.wf.ReRunXCommand; 023 024 import org.apache.oozie.client.WorkflowAction; 025 import org.apache.oozie.WorkflowActionBean; 026 import org.apache.oozie.WorkflowJobBean; 027 import org.apache.oozie.ErrorCode; 028 import org.apache.oozie.workflow.WorkflowException; 029 import org.apache.oozie.workflow.WorkflowInstance; 030 import org.apache.oozie.workflow.lite.ActionNodeHandler; 031 import org.apache.oozie.workflow.lite.DecisionNodeHandler; 032 import org.apache.oozie.workflow.lite.NodeHandler; 033 import org.apache.oozie.util.XLog; 034 import org.apache.oozie.util.XmlUtils; 035 import org.jdom.Element; 036 import org.jdom.JDOMException; 037 038 import java.util.ArrayList; 039 import java.util.Collection; 040 import java.util.HashSet; 041 import java.util.List; 042 import java.util.Set; 043 044 public abstract class LiteWorkflowStoreService extends WorkflowStoreService { 045 046 public static final String CONF_PREFIX = Service.CONF_PREFIX + "LiteWorkflowStoreService."; 047 public static final String CONF_PREFIX_USER_RETRY = CONF_PREFIX + "user.retry."; 048 public static final String CONF_USER_RETRY_MAX = CONF_PREFIX_USER_RETRY + "max"; 049 public static final String CONF_USER_RETRY_INTEVAL = CONF_PREFIX_USER_RETRY + "inteval"; 050 public static final String CONF_USER_RETRY_ERROR_CODE = CONF_PREFIX_USER_RETRY + "error.code"; 051 public static final String CONF_USER_RETRY_ERROR_CODE_EXT = CONF_PREFIX_USER_RETRY + "error.code.ext"; 052 053 public static final String NODE_DEF_VERSION_0 = "_oozie_inst_v_0"; 054 public static final String NODE_DEF_VERSION_1 = "_oozie_inst_v_1"; 055 public static final String CONF_NODE_DEF_VERSION = CONF_PREFIX + "node.def.version"; 056 057 /** 058 * Delegation method used by the Action and Decision {@link NodeHandler} on start. <p/> This method provides the 059 * necessary information to create ActionExecutors. 060 * 061 * @param context NodeHandler context. 062 * @throws WorkflowException thrown if there was an error parsing the action configuration. 063 */ 064 @SuppressWarnings("unchecked") 065 protected static void liteExecute(NodeHandler.Context context) throws WorkflowException { 066 XLog log = XLog.getLog(LiteWorkflowStoreService.class); 067 String jobId = context.getProcessInstance().getId(); 068 String nodeName = context.getNodeDef().getName(); 069 String skipVar = context.getProcessInstance().getVar(context.getNodeDef().getName() 070 + WorkflowInstance.NODE_VAR_SEPARATOR + ReRunXCommand.TO_SKIP); 071 boolean skipAction = false; 072 if (skipVar != null) { 073 skipAction = skipVar.equals("true"); 074 } 075 WorkflowActionBean action = new WorkflowActionBean(); 076 String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, nodeName); 077 078 if (!skipAction) { 079 String nodeConf = context.getNodeDef().getConf(); 080 String executionPath = context.getExecutionPath(); 081 082 String actionType; 083 try { 084 Element element = XmlUtils.parseXml(nodeConf); 085 actionType = element.getName(); 086 nodeConf = XmlUtils.prettyPrint(element).toString(); 087 } 088 catch (JDOMException ex) { 089 throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex); 090 } 091 092 log.debug(" Creating action for node [{0}]", nodeName); 093 action.setType(actionType); 094 action.setExecutionPath(executionPath); 095 action.setConf(nodeConf); 096 action.setLogToken(((WorkflowJobBean) context.getTransientVar(WORKFLOW_BEAN)).getLogToken()); 097 action.setStatus(WorkflowAction.Status.PREP); 098 action.setJobId(jobId); 099 } 100 action.setCred(context.getNodeDef().getCred()); 101 log.debug("Setting action for cred: '"+context.getNodeDef().getCred() + 102 "', name: '"+ context.getNodeDef().getName() + "'"); 103 104 action.setUserRetryCount(0); 105 int userRetryMax = getUserRetryMax(context); 106 int userRetryInterval = getUserRetryInterval(context); 107 action.setUserRetryMax(userRetryMax); 108 action.setUserRetryInterval(userRetryInterval); 109 log.debug("Setting action for userRetryMax: '"+ userRetryMax + 110 "', userRetryInterval: '" + userRetryInterval + 111 "', name: '"+ context.getNodeDef().getName() + "'"); 112 113 action.setName(nodeName); 114 action.setId(actionId); 115 context.setVar(nodeName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID, actionId); 116 List list = (List) context.getTransientVar(ACTIONS_TO_START); 117 if (list == null) { 118 list = new ArrayList(); 119 context.setTransientVar(ACTIONS_TO_START, list); 120 } 121 list.add(action); 122 } 123 124 private static int getUserRetryInterval(NodeHandler.Context context) throws WorkflowException { 125 Configuration conf = Services.get().get(ConfigurationService.class).getConf(); 126 int ret = conf.getInt(CONF_USER_RETRY_INTEVAL, 5); 127 String userRetryInterval = context.getNodeDef().getUserRetryInterval(); 128 129 if (!userRetryInterval.equals("null")) { 130 try { 131 ret = Integer.parseInt(userRetryInterval); 132 } 133 catch (NumberFormatException nfe) { 134 throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe); 135 } 136 } 137 return ret; 138 } 139 140 private static int getUserRetryMax(NodeHandler.Context context) throws WorkflowException { 141 XLog log = XLog.getLog(LiteWorkflowStoreService.class); 142 Configuration conf = Services.get().get(ConfigurationService.class).getConf(); 143 int ret = conf.getInt(CONF_USER_RETRY_MAX, 0); 144 int max = ret; 145 String userRetryMax = context.getNodeDef().getUserRetryMax(); 146 147 if (!userRetryMax.equals("null")) { 148 try { 149 ret = Integer.parseInt(userRetryMax); 150 if (ret > max) { 151 ret = max; 152 log.warn(ErrorCode.E0820.getTemplate(), ret, max); 153 } 154 } 155 catch (NumberFormatException nfe) { 156 throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe); 157 } 158 } 159 else { 160 ret = 0; 161 } 162 return ret; 163 } 164 165 /** 166 * Get system defined and instance defined error codes for which USER_RETRY is allowed 167 * 168 * @return set of error code user-retry is allowed for 169 */ 170 public static Set<String> getUserRetryErrorCode() { 171 Configuration conf = Services.get().get(ConfigurationService.class).getConf(); 172 // eliminating whitespaces in the error codes value specification 173 String errorCodeString = conf.get(CONF_USER_RETRY_ERROR_CODE).replaceAll("\\s+", ""); 174 Collection<String> strings = StringUtils.getStringCollection(errorCodeString); 175 String errorCodeExtString = conf.get(CONF_USER_RETRY_ERROR_CODE_EXT).replaceAll("\\s+", ""); 176 Collection<String> extra = StringUtils.getStringCollection(errorCodeExtString); 177 Set<String> set = new HashSet<String>(); 178 set.addAll(strings); 179 set.addAll(extra); 180 return set; 181 } 182 183 /** 184 * Get NodeDef default version, _oozie_inst_v_0 or _oozie_inst_v_1 185 * 186 * @return nodedef default version 187 * @throws WorkflowException thrown if there was an error parsing the action configuration. 188 */ 189 public static String getNodeDefDefaultVersion() throws WorkflowException { 190 Configuration conf = Services.get().get(ConfigurationService.class).getConf(); 191 String ret = conf.get(CONF_NODE_DEF_VERSION); 192 if (ret == null) { 193 ret = NODE_DEF_VERSION_1; 194 } 195 return ret; 196 } 197 198 /** 199 * Delegation method used when failing actions. <p/> 200 * 201 * @param context NodeHandler context. 202 */ 203 @SuppressWarnings("unchecked") 204 protected static void liteFail(NodeHandler.Context context) { 205 liteTerminate(context, ACTIONS_TO_FAIL); 206 } 207 208 /** 209 * Delegation method used when killing actions. <p/> 210 * 211 * @param context NodeHandler context. 212 */ 213 @SuppressWarnings("unchecked") 214 protected static void liteKill(NodeHandler.Context context) { 215 liteTerminate(context, ACTIONS_TO_KILL); 216 } 217 218 /** 219 * Used to terminate jobs - FAIL or KILL. <p/> 220 * 221 * @param context NodeHandler context. 222 * @param transientVar The transient variable name. 223 */ 224 @SuppressWarnings("unchecked") 225 private static void liteTerminate(NodeHandler.Context context, String transientVar) { 226 List<String> list = (List<String>) context.getTransientVar(transientVar); 227 if (list == null) { 228 list = new ArrayList<String>(); 229 context.setTransientVar(transientVar, list); 230 } 231 list.add(context.getVar(context.getNodeDef().getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID)); 232 } 233 234 // wires workflow lib action execution with Oozie Dag 235 public static class LiteActionHandler extends ActionNodeHandler { 236 237 @Override 238 public void start(Context context) throws WorkflowException { 239 liteExecute(context); 240 } 241 242 @Override 243 public void end(Context context) { 244 } 245 246 @Override 247 public void kill(Context context) { 248 liteKill(context); 249 } 250 251 @Override 252 public void fail(Context context) { 253 liteFail(context); 254 } 255 } 256 257 // wires workflow lib decision execution with Oozie Dag 258 public static class LiteDecisionHandler extends DecisionNodeHandler { 259 260 @Override 261 public void start(Context context) throws WorkflowException { 262 liteExecute(context); 263 } 264 265 @Override 266 public void end(Context context) { 267 } 268 269 @Override 270 public void kill(Context context) { 271 liteKill(context); 272 } 273 274 @Override 275 public void fail(Context context) { 276 liteFail(context); 277 } 278 } 279 }