001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import org.apache.hadoop.conf.Configuration; 021 import org.apache.oozie.command.wf.ReRunXCommand; 022 023 import org.apache.oozie.client.WorkflowAction; 024 import org.apache.oozie.WorkflowActionBean; 025 import org.apache.oozie.WorkflowJobBean; 026 import org.apache.oozie.ErrorCode; 027 import org.apache.oozie.workflow.WorkflowException; 028 import org.apache.oozie.workflow.WorkflowInstance; 029 import org.apache.oozie.workflow.lite.ActionNodeHandler; 030 import org.apache.oozie.workflow.lite.DecisionNodeHandler; 031 import org.apache.oozie.workflow.lite.NodeHandler; 032 import org.apache.oozie.util.XLog; 033 import org.apache.oozie.util.XmlUtils; 034 import org.jdom.Element; 035 import org.jdom.JDOMException; 036 037 import java.util.ArrayList; 038 import java.util.Collection; 039 import java.util.HashSet; 040 import java.util.List; 041 import java.util.Set; 042 043 public abstract class LiteWorkflowStoreService extends WorkflowStoreService { 044 045 public static final String CONF_PREFIX = Service.CONF_PREFIX + "LiteWorkflowStoreService."; 046 public static final String CONF_PREFIX_USER_RETRY = CONF_PREFIX + "user.retry."; 047 public static final String CONF_USER_RETRY_MAX = CONF_PREFIX_USER_RETRY + "max"; 048 public static final String CONF_USER_RETRY_INTEVAL = CONF_PREFIX_USER_RETRY + "inteval"; 049 public static final String CONF_USER_RETRY_ERROR_CODE = CONF_PREFIX_USER_RETRY + "error.code"; 050 public static final String CONF_USER_RETRY_ERROR_CODE_EXT = CONF_PREFIX_USER_RETRY + "error.code.ext"; 051 052 public static final String NODE_DEF_VERSION_0 = "_oozie_inst_v_0"; 053 public static final String NODE_DEF_VERSION_1 = "_oozie_inst_v_1"; 054 public static final String CONF_NODE_DEF_VERSION = CONF_PREFIX + "node.def.version"; 055 056 /** 057 * Delegation method used by the Action and Decision {@link NodeHandler} on start. <p/> This method provides the 058 * necessary information to create ActionExecutors. 059 * 060 * @param context NodeHandler context. 061 * @throws WorkflowException thrown if there was an error parsing the action configuration. 062 */ 063 @SuppressWarnings("unchecked") 064 protected static void liteExecute(NodeHandler.Context context) throws WorkflowException { 065 XLog log = XLog.getLog(LiteWorkflowStoreService.class); 066 String jobId = context.getProcessInstance().getId(); 067 String nodeName = context.getNodeDef().getName(); 068 String skipVar = context.getProcessInstance().getVar(context.getNodeDef().getName() 069 + WorkflowInstance.NODE_VAR_SEPARATOR + ReRunXCommand.TO_SKIP); 070 boolean skipAction = false; 071 if (skipVar != null) { 072 skipAction = skipVar.equals("true"); 073 } 074 WorkflowActionBean action = new WorkflowActionBean(); 075 String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, nodeName); 076 077 if (!skipAction) { 078 String nodeConf = context.getNodeDef().getConf(); 079 String executionPath = context.getExecutionPath(); 080 081 String actionType; 082 try { 083 Element element = XmlUtils.parseXml(nodeConf); 084 actionType = element.getName(); 085 nodeConf = XmlUtils.prettyPrint(element).toString(); 086 } 087 catch (JDOMException ex) { 088 throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex); 089 } 090 091 log.debug(" Creating action for node [{0}]", nodeName); 092 action.setType(actionType); 093 action.setExecutionPath(executionPath); 094 action.setConf(nodeConf); 095 action.setLogToken(((WorkflowJobBean) context.getTransientVar(WORKFLOW_BEAN)).getLogToken()); 096 action.setStatus(WorkflowAction.Status.PREP); 097 action.setJobId(jobId); 098 } 099 action.setCred(context.getNodeDef().getCred()); 100 log.debug("Setting action for cred: '"+context.getNodeDef().getCred() + 101 "', name: '"+ context.getNodeDef().getName() + "'"); 102 103 action.setUserRetryCount(0); 104 int userRetryMax = getUserRetryMax(context); 105 int userRetryInterval = getUserRetryInterval(context); 106 action.setUserRetryMax(userRetryMax); 107 action.setUserRetryInterval(userRetryInterval); 108 log.debug("Setting action for userRetryMax: '"+ userRetryMax + 109 "', userRetryInterval: '" + userRetryInterval + 110 "', name: '"+ context.getNodeDef().getName() + "'"); 111 112 action.setName(nodeName); 113 action.setId(actionId); 114 context.setVar(nodeName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID, actionId); 115 List list = (List) context.getTransientVar(ACTIONS_TO_START); 116 if (list == null) { 117 list = new ArrayList(); 118 context.setTransientVar(ACTIONS_TO_START, list); 119 } 120 list.add(action); 121 } 122 123 private static int getUserRetryInterval(NodeHandler.Context context) throws WorkflowException { 124 Configuration conf = Services.get().get(ConfigurationService.class).getConf(); 125 int ret = conf.getInt(CONF_USER_RETRY_INTEVAL, 5); 126 String userRetryInterval = context.getNodeDef().getUserRetryInterval(); 127 128 if (!userRetryInterval.equals("null")) { 129 try { 130 ret = Integer.parseInt(userRetryInterval); 131 } 132 catch (NumberFormatException nfe) { 133 throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe); 134 } 135 } 136 return ret; 137 } 138 139 private static int getUserRetryMax(NodeHandler.Context context) throws WorkflowException { 140 XLog log = XLog.getLog(LiteWorkflowStoreService.class); 141 Configuration conf = Services.get().get(ConfigurationService.class).getConf(); 142 int ret = conf.getInt(CONF_USER_RETRY_MAX, 0); 143 int max = ret; 144 String userRetryMax = context.getNodeDef().getUserRetryMax(); 145 146 if (!userRetryMax.equals("null")) { 147 try { 148 ret = Integer.parseInt(userRetryMax); 149 if (ret > max) { 150 ret = max; 151 log.warn(ErrorCode.E0820.getTemplate(), ret, max); 152 } 153 } 154 catch (NumberFormatException nfe) { 155 throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe); 156 } 157 } 158 else { 159 ret = 0; 160 } 161 return ret; 162 } 163 164 /** 165 * Get system defined and instance defined error codes for which USER_RETRY is allowed 166 * 167 * @return set of error code user-retry is allowed for 168 */ 169 public static Set<String> getUserRetryErrorCode() { 170 Configuration conf = Services.get().get(ConfigurationService.class).getConf(); 171 Collection<String> strings = (Collection<String>) conf.getStringCollection(CONF_USER_RETRY_ERROR_CODE); 172 Collection<String> extra = (Collection<String>) conf.getStringCollection(CONF_USER_RETRY_ERROR_CODE_EXT); 173 Set<String> set = new HashSet<String>(); 174 set.addAll(strings); 175 set.addAll(extra); 176 return set; 177 } 178 179 /** 180 * Get NodeDef default version, _oozie_inst_v_0 or _oozie_inst_v_1 181 * 182 * @return nodedef default version 183 * @throws WorkflowException thrown if there was an error parsing the action configuration. 184 */ 185 public static String getNodeDefDefaultVersion() throws WorkflowException { 186 Configuration conf = Services.get().get(ConfigurationService.class).getConf(); 187 String ret = conf.get(CONF_NODE_DEF_VERSION); 188 if (ret == null) { 189 ret = NODE_DEF_VERSION_1; 190 } 191 return ret; 192 } 193 194 /** 195 * Delegation method used when failing actions. <p/> 196 * 197 * @param context NodeHandler context. 198 */ 199 @SuppressWarnings("unchecked") 200 protected static void liteFail(NodeHandler.Context context) { 201 liteTerminate(context, ACTIONS_TO_FAIL); 202 } 203 204 /** 205 * Delegation method used when killing actions. <p/> 206 * 207 * @param context NodeHandler context. 208 */ 209 @SuppressWarnings("unchecked") 210 protected static void liteKill(NodeHandler.Context context) { 211 liteTerminate(context, ACTIONS_TO_KILL); 212 } 213 214 /** 215 * Used to terminate jobs - FAIL or KILL. <p/> 216 * 217 * @param context NodeHandler context. 218 * @param transientVar The transient variable name. 219 */ 220 @SuppressWarnings("unchecked") 221 private static void liteTerminate(NodeHandler.Context context, String transientVar) { 222 List<String> list = (List<String>) context.getTransientVar(transientVar); 223 if (list == null) { 224 list = new ArrayList<String>(); 225 context.setTransientVar(transientVar, list); 226 } 227 list.add(context.getVar(context.getNodeDef().getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID)); 228 } 229 230 // wires workflow lib action execution with Oozie Dag 231 public static class LiteActionHandler extends ActionNodeHandler { 232 233 @Override 234 public void start(Context context) throws WorkflowException { 235 liteExecute(context); 236 } 237 238 @Override 239 public void end(Context context) { 240 } 241 242 @Override 243 public void kill(Context context) { 244 liteKill(context); 245 } 246 247 @Override 248 public void fail(Context context) { 249 liteFail(context); 250 } 251 } 252 253 // wires workflow lib decision execution with Oozie Dag 254 public static class LiteDecisionHandler extends DecisionNodeHandler { 255 256 @Override 257 public void start(Context context) throws WorkflowException { 258 liteExecute(context); 259 } 260 261 @Override 262 public void end(Context context) { 263 } 264 265 @Override 266 public void kill(Context context) { 267 liteKill(context); 268 } 269 270 @Override 271 public void fail(Context context) { 272 liteFail(context); 273 } 274 } 275 }