This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.hadoop.util.StringUtils;
022 import org.apache.oozie.command.wf.ReRunXCommand;
023
024 import org.apache.oozie.client.WorkflowAction;
025 import org.apache.oozie.WorkflowActionBean;
026 import org.apache.oozie.WorkflowJobBean;
027 import org.apache.oozie.ErrorCode;
028 import org.apache.oozie.workflow.WorkflowException;
029 import org.apache.oozie.workflow.WorkflowInstance;
030 import org.apache.oozie.workflow.lite.ActionNodeHandler;
031 import org.apache.oozie.workflow.lite.DecisionNodeHandler;
032 import org.apache.oozie.workflow.lite.NodeHandler;
033 import org.apache.oozie.util.XLog;
034 import org.apache.oozie.util.XmlUtils;
035 import org.jdom.Element;
036 import org.jdom.JDOMException;
037
038 import java.util.ArrayList;
039 import java.util.Collection;
040 import java.util.HashSet;
041 import java.util.List;
042 import java.util.Set;
043
044 public abstract class LiteWorkflowStoreService extends WorkflowStoreService {
045
046 public static final String CONF_PREFIX = Service.CONF_PREFIX + "LiteWorkflowStoreService.";
047 public static final String CONF_PREFIX_USER_RETRY = CONF_PREFIX + "user.retry.";
048 public static final String CONF_USER_RETRY_MAX = CONF_PREFIX_USER_RETRY + "max";
049 public static final String CONF_USER_RETRY_INTEVAL = CONF_PREFIX_USER_RETRY + "inteval";
050 public static final String CONF_USER_RETRY_ERROR_CODE = CONF_PREFIX_USER_RETRY + "error.code";
051 public static final String CONF_USER_RETRY_ERROR_CODE_EXT = CONF_PREFIX_USER_RETRY + "error.code.ext";
052
053 public static final String NODE_DEF_VERSION_0 = "_oozie_inst_v_0";
054 public static final String NODE_DEF_VERSION_1 = "_oozie_inst_v_1";
055 public static final String CONF_NODE_DEF_VERSION = CONF_PREFIX + "node.def.version";
056
057 /**
058 * Delegation method used by the Action and Decision {@link NodeHandler} on start. <p/> This method provides the
059 * necessary information to create ActionExecutors.
060 *
061 * @param context NodeHandler context.
062 * @throws WorkflowException thrown if there was an error parsing the action configuration.
063 */
064 @SuppressWarnings("unchecked")
065 protected static void liteExecute(NodeHandler.Context context) throws WorkflowException {
066 XLog log = XLog.getLog(LiteWorkflowStoreService.class);
067 String jobId = context.getProcessInstance().getId();
068 String nodeName = context.getNodeDef().getName();
069 String skipVar = context.getProcessInstance().getVar(context.getNodeDef().getName()
070 + WorkflowInstance.NODE_VAR_SEPARATOR + ReRunXCommand.TO_SKIP);
071 boolean skipAction = false;
072 if (skipVar != null) {
073 skipAction = skipVar.equals("true");
074 }
075 WorkflowActionBean action = new WorkflowActionBean();
076 String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, nodeName);
077
078 if (!skipAction) {
079 String nodeConf = context.getNodeDef().getConf();
080 String executionPath = context.getExecutionPath();
081
082 String actionType;
083 try {
084 Element element = XmlUtils.parseXml(nodeConf);
085 actionType = element.getName();
086 nodeConf = XmlUtils.prettyPrint(element).toString();
087 }
088 catch (JDOMException ex) {
089 throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
090 }
091
092 log.debug(" Creating action for node [{0}]", nodeName);
093 action.setType(actionType);
094 action.setExecutionPath(executionPath);
095 action.setConf(nodeConf);
096 action.setLogToken(((WorkflowJobBean) context.getTransientVar(WORKFLOW_BEAN)).getLogToken());
097 action.setStatus(WorkflowAction.Status.PREP);
098 action.setJobId(jobId);
099 }
100 action.setCred(context.getNodeDef().getCred());
101 log.debug("Setting action for cred: '"+context.getNodeDef().getCred() +
102 "', name: '"+ context.getNodeDef().getName() + "'");
103
104 action.setUserRetryCount(0);
105 int userRetryMax = getUserRetryMax(context);
106 int userRetryInterval = getUserRetryInterval(context);
107 action.setUserRetryMax(userRetryMax);
108 action.setUserRetryInterval(userRetryInterval);
109 log.debug("Setting action for userRetryMax: '"+ userRetryMax +
110 "', userRetryInterval: '" + userRetryInterval +
111 "', name: '"+ context.getNodeDef().getName() + "'");
112
113 action.setName(nodeName);
114 action.setId(actionId);
115 context.setVar(nodeName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID, actionId);
116 List list = (List) context.getTransientVar(ACTIONS_TO_START);
117 if (list == null) {
118 list = new ArrayList();
119 context.setTransientVar(ACTIONS_TO_START, list);
120 }
121 list.add(action);
122 }
123
124 private static int getUserRetryInterval(NodeHandler.Context context) throws WorkflowException {
125 Configuration conf = Services.get().get(ConfigurationService.class).getConf();
126 int ret = conf.getInt(CONF_USER_RETRY_INTEVAL, 5);
127 String userRetryInterval = context.getNodeDef().getUserRetryInterval();
128
129 if (!userRetryInterval.equals("null")) {
130 try {
131 ret = Integer.parseInt(userRetryInterval);
132 }
133 catch (NumberFormatException nfe) {
134 throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
135 }
136 }
137 return ret;
138 }
139
140 private static int getUserRetryMax(NodeHandler.Context context) throws WorkflowException {
141 XLog log = XLog.getLog(LiteWorkflowStoreService.class);
142 Configuration conf = Services.get().get(ConfigurationService.class).getConf();
143 int ret = conf.getInt(CONF_USER_RETRY_MAX, 0);
144 int max = ret;
145 String userRetryMax = context.getNodeDef().getUserRetryMax();
146
147 if (!userRetryMax.equals("null")) {
148 try {
149 ret = Integer.parseInt(userRetryMax);
150 if (ret > max) {
151 ret = max;
152 log.warn(ErrorCode.E0820.getTemplate(), ret, max);
153 }
154 }
155 catch (NumberFormatException nfe) {
156 throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
157 }
158 }
159 else {
160 ret = 0;
161 }
162 return ret;
163 }
164
165 /**
166 * Get system defined and instance defined error codes for which USER_RETRY is allowed
167 *
168 * @return set of error code user-retry is allowed for
169 */
170 public static Set<String> getUserRetryErrorCode() {
171 Configuration conf = Services.get().get(ConfigurationService.class).getConf();
172 // eliminating whitespaces in the error codes value specification
173 String errorCodeString = conf.get(CONF_USER_RETRY_ERROR_CODE).replaceAll("\\s+", "");
174 Collection<String> strings = StringUtils.getStringCollection(errorCodeString);
175 String errorCodeExtString = conf.get(CONF_USER_RETRY_ERROR_CODE_EXT).replaceAll("\\s+", "");
176 Collection<String> extra = StringUtils.getStringCollection(errorCodeExtString);
177 Set<String> set = new HashSet<String>();
178 set.addAll(strings);
179 set.addAll(extra);
180 return set;
181 }
182
183 /**
184 * Get NodeDef default version, _oozie_inst_v_0 or _oozie_inst_v_1
185 *
186 * @return nodedef default version
187 * @throws WorkflowException thrown if there was an error parsing the action configuration.
188 */
189 public static String getNodeDefDefaultVersion() throws WorkflowException {
190 Configuration conf = Services.get().get(ConfigurationService.class).getConf();
191 String ret = conf.get(CONF_NODE_DEF_VERSION);
192 if (ret == null) {
193 ret = NODE_DEF_VERSION_1;
194 }
195 return ret;
196 }
197
198 /**
199 * Delegation method used when failing actions. <p/>
200 *
201 * @param context NodeHandler context.
202 */
203 @SuppressWarnings("unchecked")
204 protected static void liteFail(NodeHandler.Context context) {
205 liteTerminate(context, ACTIONS_TO_FAIL);
206 }
207
208 /**
209 * Delegation method used when killing actions. <p/>
210 *
211 * @param context NodeHandler context.
212 */
213 @SuppressWarnings("unchecked")
214 protected static void liteKill(NodeHandler.Context context) {
215 liteTerminate(context, ACTIONS_TO_KILL);
216 }
217
218 /**
219 * Used to terminate jobs - FAIL or KILL. <p/>
220 *
221 * @param context NodeHandler context.
222 * @param transientVar The transient variable name.
223 */
224 @SuppressWarnings("unchecked")
225 private static void liteTerminate(NodeHandler.Context context, String transientVar) {
226 List<String> list = (List<String>) context.getTransientVar(transientVar);
227 if (list == null) {
228 list = new ArrayList<String>();
229 context.setTransientVar(transientVar, list);
230 }
231 list.add(context.getVar(context.getNodeDef().getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID));
232 }
233
234 // wires workflow lib action execution with Oozie Dag
235 public static class LiteActionHandler extends ActionNodeHandler {
236
237 @Override
238 public void start(Context context) throws WorkflowException {
239 liteExecute(context);
240 }
241
242 @Override
243 public void end(Context context) {
244 }
245
246 @Override
247 public void kill(Context context) {
248 liteKill(context);
249 }
250
251 @Override
252 public void fail(Context context) {
253 liteFail(context);
254 }
255 }
256
257 // wires workflow lib decision execution with Oozie Dag
258 public static class LiteDecisionHandler extends DecisionNodeHandler {
259
260 @Override
261 public void start(Context context) throws WorkflowException {
262 liteExecute(context);
263 }
264
265 @Override
266 public void end(Context context) {
267 }
268
269 @Override
270 public void kill(Context context) {
271 liteKill(context);
272 }
273
274 @Override
275 public void fail(Context context) {
276 liteFail(context);
277 }
278 }
279 }