This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.oozie.command.wf.ReRunXCommand;
022
023 import org.apache.oozie.client.WorkflowAction;
024 import org.apache.oozie.WorkflowActionBean;
025 import org.apache.oozie.WorkflowJobBean;
026 import org.apache.oozie.ErrorCode;
027 import org.apache.oozie.workflow.WorkflowException;
028 import org.apache.oozie.workflow.WorkflowInstance;
029 import org.apache.oozie.workflow.lite.ActionNodeHandler;
030 import org.apache.oozie.workflow.lite.DecisionNodeHandler;
031 import org.apache.oozie.workflow.lite.NodeHandler;
032 import org.apache.oozie.util.XLog;
033 import org.apache.oozie.util.XmlUtils;
034 import org.jdom.Element;
035 import org.jdom.JDOMException;
036
037 import java.util.ArrayList;
038 import java.util.Collection;
039 import java.util.HashSet;
040 import java.util.List;
041 import java.util.Set;
042
043 public abstract class LiteWorkflowStoreService extends WorkflowStoreService {
044
045 public static final String CONF_PREFIX = Service.CONF_PREFIX + "LiteWorkflowStoreService.";
046 public static final String CONF_PREFIX_USER_RETRY = CONF_PREFIX + "user.retry.";
047 public static final String CONF_USER_RETRY_MAX = CONF_PREFIX_USER_RETRY + "max";
048 public static final String CONF_USER_RETRY_INTEVAL = CONF_PREFIX_USER_RETRY + "inteval";
049 public static final String CONF_USER_RETRY_ERROR_CODE = CONF_PREFIX_USER_RETRY + "error.code";
050 public static final String CONF_USER_RETRY_ERROR_CODE_EXT = CONF_PREFIX_USER_RETRY + "error.code.ext";
051
052 public static final String NODE_DEF_VERSION_0 = "_oozie_inst_v_0";
053 public static final String NODE_DEF_VERSION_1 = "_oozie_inst_v_1";
054 public static final String CONF_NODE_DEF_VERSION = CONF_PREFIX + "node.def.version";
055
056 /**
057 * Delegation method used by the Action and Decision {@link NodeHandler} on start. <p/> This method provides the
058 * necessary information to create ActionExecutors.
059 *
060 * @param context NodeHandler context.
061 * @throws WorkflowException thrown if there was an error parsing the action configuration.
062 */
063 @SuppressWarnings("unchecked")
064 protected static void liteExecute(NodeHandler.Context context) throws WorkflowException {
065 XLog log = XLog.getLog(LiteWorkflowStoreService.class);
066 String jobId = context.getProcessInstance().getId();
067 String nodeName = context.getNodeDef().getName();
068 String skipVar = context.getProcessInstance().getVar(context.getNodeDef().getName()
069 + WorkflowInstance.NODE_VAR_SEPARATOR + ReRunXCommand.TO_SKIP);
070 boolean skipAction = false;
071 if (skipVar != null) {
072 skipAction = skipVar.equals("true");
073 }
074 WorkflowActionBean action = new WorkflowActionBean();
075 String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, nodeName);
076
077 if (!skipAction) {
078 String nodeConf = context.getNodeDef().getConf();
079 String executionPath = context.getExecutionPath();
080
081 String actionType;
082 try {
083 Element element = XmlUtils.parseXml(nodeConf);
084 actionType = element.getName();
085 nodeConf = XmlUtils.prettyPrint(element).toString();
086 }
087 catch (JDOMException ex) {
088 throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
089 }
090
091 log.debug(" Creating action for node [{0}]", nodeName);
092 action.setType(actionType);
093 action.setExecutionPath(executionPath);
094 action.setConf(nodeConf);
095 action.setLogToken(((WorkflowJobBean) context.getTransientVar(WORKFLOW_BEAN)).getLogToken());
096 action.setStatus(WorkflowAction.Status.PREP);
097 action.setJobId(jobId);
098 }
099 action.setCred(context.getNodeDef().getCred());
100 log.debug("Setting action for cred: '"+context.getNodeDef().getCred() +
101 "', name: '"+ context.getNodeDef().getName() + "'");
102
103 action.setUserRetryCount(0);
104 int userRetryMax = getUserRetryMax(context);
105 int userRetryInterval = getUserRetryInterval(context);
106 action.setUserRetryMax(userRetryMax);
107 action.setUserRetryInterval(userRetryInterval);
108 log.debug("Setting action for userRetryMax: '"+ userRetryMax +
109 "', userRetryInterval: '" + userRetryInterval +
110 "', name: '"+ context.getNodeDef().getName() + "'");
111
112 action.setName(nodeName);
113 action.setId(actionId);
114 context.setVar(nodeName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID, actionId);
115 List list = (List) context.getTransientVar(ACTIONS_TO_START);
116 if (list == null) {
117 list = new ArrayList();
118 context.setTransientVar(ACTIONS_TO_START, list);
119 }
120 list.add(action);
121 }
122
123 private static int getUserRetryInterval(NodeHandler.Context context) throws WorkflowException {
124 Configuration conf = Services.get().get(ConfigurationService.class).getConf();
125 int ret = conf.getInt(CONF_USER_RETRY_INTEVAL, 5);
126 String userRetryInterval = context.getNodeDef().getUserRetryInterval();
127
128 if (!userRetryInterval.equals("null")) {
129 try {
130 ret = Integer.parseInt(userRetryInterval);
131 }
132 catch (NumberFormatException nfe) {
133 throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
134 }
135 }
136 return ret;
137 }
138
139 private static int getUserRetryMax(NodeHandler.Context context) throws WorkflowException {
140 XLog log = XLog.getLog(LiteWorkflowStoreService.class);
141 Configuration conf = Services.get().get(ConfigurationService.class).getConf();
142 int ret = conf.getInt(CONF_USER_RETRY_MAX, 0);
143 int max = ret;
144 String userRetryMax = context.getNodeDef().getUserRetryMax();
145
146 if (!userRetryMax.equals("null")) {
147 try {
148 ret = Integer.parseInt(userRetryMax);
149 if (ret > max) {
150 ret = max;
151 log.warn(ErrorCode.E0820.getTemplate(), ret, max);
152 }
153 }
154 catch (NumberFormatException nfe) {
155 throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
156 }
157 }
158 else {
159 ret = 0;
160 }
161 return ret;
162 }
163
164 /**
165 * Get system defined and instance defined error codes for which USER_RETRY is allowed
166 *
167 * @return set of error code user-retry is allowed for
168 */
169 public static Set<String> getUserRetryErrorCode() {
170 Configuration conf = Services.get().get(ConfigurationService.class).getConf();
171 Collection<String> strings = (Collection<String>) conf.getStringCollection(CONF_USER_RETRY_ERROR_CODE);
172 Collection<String> extra = (Collection<String>) conf.getStringCollection(CONF_USER_RETRY_ERROR_CODE_EXT);
173 Set<String> set = new HashSet<String>();
174 set.addAll(strings);
175 set.addAll(extra);
176 return set;
177 }
178
179 /**
180 * Get NodeDef default version, _oozie_inst_v_0 or _oozie_inst_v_1
181 *
182 * @return nodedef default version
183 * @throws WorkflowException thrown if there was an error parsing the action configuration.
184 */
185 public static String getNodeDefDefaultVersion() throws WorkflowException {
186 Configuration conf = Services.get().get(ConfigurationService.class).getConf();
187 String ret = conf.get(CONF_NODE_DEF_VERSION);
188 if (ret == null) {
189 ret = NODE_DEF_VERSION_1;
190 }
191 return ret;
192 }
193
194 /**
195 * Delegation method used when failing actions. <p/>
196 *
197 * @param context NodeHandler context.
198 */
199 @SuppressWarnings("unchecked")
200 protected static void liteFail(NodeHandler.Context context) {
201 liteTerminate(context, ACTIONS_TO_FAIL);
202 }
203
204 /**
205 * Delegation method used when killing actions. <p/>
206 *
207 * @param context NodeHandler context.
208 */
209 @SuppressWarnings("unchecked")
210 protected static void liteKill(NodeHandler.Context context) {
211 liteTerminate(context, ACTIONS_TO_KILL);
212 }
213
214 /**
215 * Used to terminate jobs - FAIL or KILL. <p/>
216 *
217 * @param context NodeHandler context.
218 * @param transientVar The transient variable name.
219 */
220 @SuppressWarnings("unchecked")
221 private static void liteTerminate(NodeHandler.Context context, String transientVar) {
222 List<String> list = (List<String>) context.getTransientVar(transientVar);
223 if (list == null) {
224 list = new ArrayList<String>();
225 context.setTransientVar(transientVar, list);
226 }
227 list.add(context.getVar(context.getNodeDef().getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID));
228 }
229
230 // wires workflow lib action execution with Oozie Dag
231 public static class LiteActionHandler extends ActionNodeHandler {
232
233 @Override
234 public void start(Context context) throws WorkflowException {
235 liteExecute(context);
236 }
237
238 @Override
239 public void end(Context context) {
240 }
241
242 @Override
243 public void kill(Context context) {
244 liteKill(context);
245 }
246
247 @Override
248 public void fail(Context context) {
249 liteFail(context);
250 }
251 }
252
253 // wires workflow lib decision execution with Oozie Dag
254 public static class LiteDecisionHandler extends DecisionNodeHandler {
255
256 @Override
257 public void start(Context context) throws WorkflowException {
258 liteExecute(context);
259 }
260
261 @Override
262 public void end(Context context) {
263 }
264
265 @Override
266 public void kill(Context context) {
267 liteKill(context);
268 }
269
270 @Override
271 public void fail(Context context) {
272 liteFail(context);
273 }
274 }
275 }