This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.hadoop.util.StringUtils;
022 import org.apache.oozie.action.control.EndActionExecutor;
023 import org.apache.oozie.action.control.ForkActionExecutor;
024 import org.apache.oozie.action.control.JoinActionExecutor;
025 import org.apache.oozie.action.control.KillActionExecutor;
026 import org.apache.oozie.action.control.StartActionExecutor;
027 import org.apache.oozie.command.wf.ReRunXCommand;
028
029 import org.apache.oozie.client.WorkflowAction;
030 import org.apache.oozie.WorkflowActionBean;
031 import org.apache.oozie.WorkflowJobBean;
032 import org.apache.oozie.ErrorCode;
033 import org.apache.oozie.workflow.WorkflowException;
034 import org.apache.oozie.workflow.WorkflowInstance;
035 import org.apache.oozie.workflow.lite.ActionNodeHandler;
036 import org.apache.oozie.workflow.lite.ControlNodeHandler;
037 import org.apache.oozie.workflow.lite.DecisionNodeHandler;
038 import org.apache.oozie.workflow.lite.EndNodeDef;
039 import org.apache.oozie.workflow.lite.ForkNodeDef;
040 import org.apache.oozie.workflow.lite.JoinNodeDef;
041 import org.apache.oozie.workflow.lite.KillNodeDef;
042 import org.apache.oozie.workflow.lite.NodeDef;
043 import org.apache.oozie.workflow.lite.NodeHandler;
044 import org.apache.oozie.util.XLog;
045 import org.apache.oozie.util.XmlUtils;
046 import org.apache.oozie.workflow.lite.StartNodeDef;
047 import org.jdom.Element;
048 import org.jdom.JDOMException;
049
050 import java.util.ArrayList;
051 import java.util.Collection;
052 import java.util.HashSet;
053 import java.util.List;
054 import java.util.Set;
055
056 public abstract class LiteWorkflowStoreService extends WorkflowStoreService {
057
058 public static final String CONF_PREFIX = Service.CONF_PREFIX + "LiteWorkflowStoreService.";
059 public static final String CONF_PREFIX_USER_RETRY = CONF_PREFIX + "user.retry.";
060 public static final String CONF_USER_RETRY_MAX = CONF_PREFIX_USER_RETRY + "max";
061 public static final String CONF_USER_RETRY_INTEVAL = CONF_PREFIX_USER_RETRY + "inteval";
062 public static final String CONF_USER_RETRY_ERROR_CODE = CONF_PREFIX_USER_RETRY + "error.code";
063 public static final String CONF_USER_RETRY_ERROR_CODE_EXT = CONF_PREFIX_USER_RETRY + "error.code.ext";
064
065 public static final String NODE_DEF_VERSION_0 = "_oozie_inst_v_0";
066 public static final String NODE_DEF_VERSION_1 = "_oozie_inst_v_1";
067 public static final String CONF_NODE_DEF_VERSION = CONF_PREFIX + "node.def.version";
068
069 /**
070 * Delegation method used by the Action and Decision {@link NodeHandler} on start. <p/> This method provides the
071 * necessary information to create ActionExecutors.
072 *
073 * @param context NodeHandler context.
074 * @param actionType the action type.
075 * @throws WorkflowException thrown if there was an error parsing the action configuration.
076 */
077 @SuppressWarnings("unchecked")
078 protected static void liteExecute(NodeHandler.Context context, String actionType) throws WorkflowException {
079 XLog log = XLog.getLog(LiteWorkflowStoreService.class);
080 String jobId = context.getProcessInstance().getId();
081 String nodeName = context.getNodeDef().getName();
082 String skipVar = context.getProcessInstance().getVar(context.getNodeDef().getName()
083 + WorkflowInstance.NODE_VAR_SEPARATOR + ReRunXCommand.TO_SKIP);
084 boolean skipAction = false;
085 if (skipVar != null) {
086 skipAction = skipVar.equals("true");
087 }
088 WorkflowActionBean action = new WorkflowActionBean();
089 String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, nodeName);
090
091 if (!skipAction) {
092 String nodeConf = context.getNodeDef().getConf();
093 String executionPath = context.getExecutionPath();
094
095 if (actionType == null) {
096 try {
097 Element element = XmlUtils.parseXml(nodeConf);
098 actionType = element.getName();
099 nodeConf = XmlUtils.prettyPrint(element).toString();
100 }
101 catch (JDOMException ex) {
102 throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
103 }
104 }
105 log.debug(" Creating action for node [{0}]", nodeName);
106 action.setType(actionType);
107 action.setExecutionPath(executionPath);
108 action.setConf(nodeConf);
109 action.setLogToken(((WorkflowJobBean) context.getTransientVar(WORKFLOW_BEAN)).getLogToken());
110 action.setStatus(WorkflowAction.Status.PREP);
111 action.setJobId(jobId);
112 }
113 action.setCred(context.getNodeDef().getCred());
114 log.debug("Setting action for cred: '"+context.getNodeDef().getCred() +
115 "', name: '"+ context.getNodeDef().getName() + "'");
116
117 action.setUserRetryCount(0);
118 int userRetryMax = getUserRetryMax(context);
119 int userRetryInterval = getUserRetryInterval(context);
120 action.setUserRetryMax(userRetryMax);
121 action.setUserRetryInterval(userRetryInterval);
122 log.debug("Setting action for userRetryMax: '"+ userRetryMax +
123 "', userRetryInterval: '" + userRetryInterval +
124 "', name: '"+ context.getNodeDef().getName() + "'");
125
126 action.setName(nodeName);
127 action.setId(actionId);
128 context.setVar(nodeName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID, actionId);
129 List list = (List) context.getTransientVar(ACTIONS_TO_START);
130 if (list == null) {
131 list = new ArrayList();
132 context.setTransientVar(ACTIONS_TO_START, list);
133 }
134 list.add(action);
135 }
136
137 private static int getUserRetryInterval(NodeHandler.Context context) throws WorkflowException {
138 Configuration conf = Services.get().get(ConfigurationService.class).getConf();
139 int ret = conf.getInt(CONF_USER_RETRY_INTEVAL, 5);
140 String userRetryInterval = context.getNodeDef().getUserRetryInterval();
141
142 if (!userRetryInterval.equals("null")) {
143 try {
144 ret = Integer.parseInt(userRetryInterval);
145 }
146 catch (NumberFormatException nfe) {
147 throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
148 }
149 }
150 return ret;
151 }
152
153 private static int getUserRetryMax(NodeHandler.Context context) throws WorkflowException {
154 XLog log = XLog.getLog(LiteWorkflowStoreService.class);
155 Configuration conf = Services.get().get(ConfigurationService.class).getConf();
156 int ret = conf.getInt(CONF_USER_RETRY_MAX, 0);
157 int max = ret;
158 String userRetryMax = context.getNodeDef().getUserRetryMax();
159
160 if (!userRetryMax.equals("null")) {
161 try {
162 ret = Integer.parseInt(userRetryMax);
163 if (ret > max) {
164 ret = max;
165 log.warn(ErrorCode.E0820.getTemplate(), ret, max);
166 }
167 }
168 catch (NumberFormatException nfe) {
169 throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
170 }
171 }
172 else {
173 ret = 0;
174 }
175 return ret;
176 }
177
178 /**
179 * Get system defined and instance defined error codes for which USER_RETRY is allowed
180 *
181 * @return set of error code user-retry is allowed for
182 */
183 public static Set<String> getUserRetryErrorCode() {
184 Configuration conf = Services.get().get(ConfigurationService.class).getConf();
185 // eliminating whitespaces in the error codes value specification
186 String errorCodeString = conf.get(CONF_USER_RETRY_ERROR_CODE).replaceAll("\\s+", "");
187 Collection<String> strings = StringUtils.getStringCollection(errorCodeString);
188 String errorCodeExtString = conf.get(CONF_USER_RETRY_ERROR_CODE_EXT).replaceAll("\\s+", "");
189 Collection<String> extra = StringUtils.getStringCollection(errorCodeExtString);
190 Set<String> set = new HashSet<String>();
191 set.addAll(strings);
192 set.addAll(extra);
193 return set;
194 }
195
196 /**
197 * Get NodeDef default version, _oozie_inst_v_0 or _oozie_inst_v_1
198 *
199 * @return nodedef default version
200 * @throws WorkflowException thrown if there was an error parsing the action configuration.
201 */
202 public static String getNodeDefDefaultVersion() throws WorkflowException {
203 Configuration conf = Services.get().get(ConfigurationService.class).getConf();
204 String ret = conf.get(CONF_NODE_DEF_VERSION);
205 if (ret == null) {
206 ret = NODE_DEF_VERSION_1;
207 }
208 return ret;
209 }
210
211 /**
212 * Delegation method used when failing actions. <p/>
213 *
214 * @param context NodeHandler context.
215 */
216 @SuppressWarnings("unchecked")
217 protected static void liteFail(NodeHandler.Context context) {
218 liteTerminate(context, ACTIONS_TO_FAIL);
219 }
220
221 /**
222 * Delegation method used when killing actions. <p/>
223 *
224 * @param context NodeHandler context.
225 */
226 @SuppressWarnings("unchecked")
227 protected static void liteKill(NodeHandler.Context context) {
228 liteTerminate(context, ACTIONS_TO_KILL);
229 }
230
231 /**
232 * Used to terminate jobs - FAIL or KILL. <p/>
233 *
234 * @param context NodeHandler context.
235 * @param transientVar The transient variable name.
236 */
237 @SuppressWarnings("unchecked")
238 private static void liteTerminate(NodeHandler.Context context, String transientVar) {
239 List<String> list = (List<String>) context.getTransientVar(transientVar);
240 if (list == null) {
241 list = new ArrayList<String>();
242 context.setTransientVar(transientVar, list);
243 }
244 list.add(context.getVar(context.getNodeDef().getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID));
245 }
246
247 // wires workflow lib action execution with Oozie Dag
248 public static class LiteActionHandler extends ActionNodeHandler {
249
250 @Override
251 public void start(Context context) throws WorkflowException {
252 liteExecute(context, null);
253 }
254
255 @Override
256 public void end(Context context) {
257 }
258
259 @Override
260 public void kill(Context context) {
261 liteKill(context);
262 }
263
264 @Override
265 public void fail(Context context) {
266 liteFail(context);
267 }
268 }
269
270 // wires workflow lib decision execution with Oozie Dag
271 public static class LiteDecisionHandler extends DecisionNodeHandler {
272
273 @Override
274 public void start(Context context) throws WorkflowException {
275 liteExecute(context, null);
276 }
277
278 @Override
279 public void end(Context context) {
280 }
281
282 @Override
283 public void kill(Context context) {
284 liteKill(context);
285 }
286
287 @Override
288 public void fail(Context context) {
289 liteFail(context);
290 }
291 }
292
293 // wires workflow lib control nodes with Oozie Dag
294 public static class LiteControlNodeHandler extends ControlNodeHandler {
295
296 @Override
297 public void touch(Context context) throws WorkflowException {
298 Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
299 String nodeType;
300 if (nodeClass.equals(StartNodeDef.class)) {
301 nodeType = StartActionExecutor.TYPE;
302 }
303 else if (nodeClass.equals(EndNodeDef.class)) {
304 nodeType = EndActionExecutor.TYPE;
305 }
306 else if (nodeClass.equals(KillNodeDef.class)) {
307 nodeType = KillActionExecutor.TYPE;
308 }
309 else if (nodeClass.equals(ForkNodeDef.class)) {
310 nodeType = ForkActionExecutor.TYPE;
311 }
312 else if (nodeClass.equals(JoinNodeDef.class)) {
313 nodeType = JoinActionExecutor.TYPE;
314 } else {
315 throw new IllegalStateException("Invalid node type: " + nodeClass);
316 }
317
318 liteExecute(context, nodeType);
319 }
320
321 }
322 }