001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.hadoop.util.StringUtils;
022    import org.apache.oozie.action.control.EndActionExecutor;
023    import org.apache.oozie.action.control.ForkActionExecutor;
024    import org.apache.oozie.action.control.JoinActionExecutor;
025    import org.apache.oozie.action.control.KillActionExecutor;
026    import org.apache.oozie.action.control.StartActionExecutor;
027    import org.apache.oozie.command.wf.ReRunXCommand;
028    
029    import org.apache.oozie.client.WorkflowAction;
030    import org.apache.oozie.WorkflowActionBean;
031    import org.apache.oozie.WorkflowJobBean;
032    import org.apache.oozie.ErrorCode;
033    import org.apache.oozie.workflow.WorkflowException;
034    import org.apache.oozie.workflow.WorkflowInstance;
035    import org.apache.oozie.workflow.lite.ActionNodeHandler;
036    import org.apache.oozie.workflow.lite.ControlNodeHandler;
037    import org.apache.oozie.workflow.lite.DecisionNodeHandler;
038    import org.apache.oozie.workflow.lite.EndNodeDef;
039    import org.apache.oozie.workflow.lite.ForkNodeDef;
040    import org.apache.oozie.workflow.lite.JoinNodeDef;
041    import org.apache.oozie.workflow.lite.KillNodeDef;
042    import org.apache.oozie.workflow.lite.NodeDef;
043    import org.apache.oozie.workflow.lite.NodeHandler;
044    import org.apache.oozie.util.XLog;
045    import org.apache.oozie.util.XmlUtils;
046    import org.apache.oozie.workflow.lite.StartNodeDef;
047    import org.jdom.Element;
048    import org.jdom.JDOMException;
049    
050    import java.util.ArrayList;
051    import java.util.Collection;
052    import java.util.HashSet;
053    import java.util.List;
054    import java.util.Set;
055    
056    public abstract class LiteWorkflowStoreService extends WorkflowStoreService {
057    
058        public static final String CONF_PREFIX = Service.CONF_PREFIX + "LiteWorkflowStoreService.";
059        public static final String CONF_PREFIX_USER_RETRY = CONF_PREFIX + "user.retry.";
060        public static final String CONF_USER_RETRY_MAX = CONF_PREFIX_USER_RETRY + "max";
061        public static final String CONF_USER_RETRY_INTEVAL = CONF_PREFIX_USER_RETRY + "inteval";
062        public static final String CONF_USER_RETRY_ERROR_CODE = CONF_PREFIX_USER_RETRY + "error.code";
063        public static final String CONF_USER_RETRY_ERROR_CODE_EXT = CONF_PREFIX_USER_RETRY + "error.code.ext";
064    
065        public static final String NODE_DEF_VERSION_0 = "_oozie_inst_v_0";
066        public static final String NODE_DEF_VERSION_1 = "_oozie_inst_v_1";
067        public static final String CONF_NODE_DEF_VERSION = CONF_PREFIX + "node.def.version";
068    
069        /**
070         * Delegation method used by the Action and Decision {@link NodeHandler} on start. <p/> This method provides the
071         * necessary information to create ActionExecutors.
072         *
073         * @param context NodeHandler context.
074         * @param actionType the action type.
075         * @throws WorkflowException thrown if there was an error parsing the action configuration.
076         */
077        @SuppressWarnings("unchecked")
078        protected static void liteExecute(NodeHandler.Context context, String actionType) throws WorkflowException {
079            XLog log = XLog.getLog(LiteWorkflowStoreService.class);
080            String jobId = context.getProcessInstance().getId();
081            String nodeName = context.getNodeDef().getName();
082            String skipVar = context.getProcessInstance().getVar(context.getNodeDef().getName()
083                    + WorkflowInstance.NODE_VAR_SEPARATOR + ReRunXCommand.TO_SKIP);
084            boolean skipAction = false;
085            if (skipVar != null) {
086                skipAction = skipVar.equals("true");
087            }
088            WorkflowActionBean action = new WorkflowActionBean();
089            String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, nodeName);
090    
091            if (!skipAction) {
092                String nodeConf = context.getNodeDef().getConf();
093                String executionPath = context.getExecutionPath();
094    
095                if (actionType == null) {
096                    try {
097                        Element element = XmlUtils.parseXml(nodeConf);
098                        actionType = element.getName();
099                        nodeConf = XmlUtils.prettyPrint(element).toString();
100                    }
101                    catch (JDOMException ex) {
102                        throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
103                    }
104                }
105                log.debug(" Creating action for node [{0}]", nodeName);
106                action.setType(actionType);
107                action.setExecutionPath(executionPath);
108                action.setConf(nodeConf);
109                action.setLogToken(((WorkflowJobBean) context.getTransientVar(WORKFLOW_BEAN)).getLogToken());
110                action.setStatus(WorkflowAction.Status.PREP);
111                action.setJobId(jobId);
112            }
113            action.setCred(context.getNodeDef().getCred());
114            log.debug("Setting action for cred: '"+context.getNodeDef().getCred() +
115                            "', name: '"+ context.getNodeDef().getName() + "'");
116    
117            action.setUserRetryCount(0);
118            int userRetryMax = getUserRetryMax(context);
119            int userRetryInterval = getUserRetryInterval(context);
120            action.setUserRetryMax(userRetryMax);
121            action.setUserRetryInterval(userRetryInterval);
122            log.debug("Setting action for userRetryMax: '"+ userRetryMax +
123                            "', userRetryInterval: '" + userRetryInterval +
124                            "', name: '"+ context.getNodeDef().getName() + "'");
125    
126            action.setName(nodeName);
127            action.setId(actionId);
128            context.setVar(nodeName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID, actionId);
129            List list = (List) context.getTransientVar(ACTIONS_TO_START);
130            if (list == null) {
131                list = new ArrayList();
132                context.setTransientVar(ACTIONS_TO_START, list);
133            }
134            list.add(action);
135        }
136    
137        private static int getUserRetryInterval(NodeHandler.Context context) throws WorkflowException {
138            Configuration conf = Services.get().get(ConfigurationService.class).getConf();
139            int ret = conf.getInt(CONF_USER_RETRY_INTEVAL, 5);
140            String userRetryInterval = context.getNodeDef().getUserRetryInterval();
141    
142            if (!userRetryInterval.equals("null")) {
143                try {
144                    ret = Integer.parseInt(userRetryInterval);
145                }
146                catch (NumberFormatException nfe) {
147                    throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
148                }
149            }
150            return ret;
151        }
152    
153        private static int getUserRetryMax(NodeHandler.Context context) throws WorkflowException {
154            XLog log = XLog.getLog(LiteWorkflowStoreService.class);
155            Configuration conf = Services.get().get(ConfigurationService.class).getConf();
156            int ret = conf.getInt(CONF_USER_RETRY_MAX, 0);
157            int max = ret;
158            String userRetryMax = context.getNodeDef().getUserRetryMax();
159    
160            if (!userRetryMax.equals("null")) {
161                try {
162                    ret = Integer.parseInt(userRetryMax);
163                    if (ret > max) {
164                        ret = max;
165                        log.warn(ErrorCode.E0820.getTemplate(), ret, max);
166                    }
167                }
168                catch (NumberFormatException nfe) {
169                    throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
170                }
171            }
172            else {
173                ret = 0;
174            }
175            return ret;
176        }
177    
178        /**
179         * Get system defined and instance defined error codes for which USER_RETRY is allowed
180         *
181         * @return set of error code user-retry is allowed for
182         */
183        public static Set<String> getUserRetryErrorCode() {
184            Configuration conf = Services.get().get(ConfigurationService.class).getConf();
185            // eliminating whitespaces in the error codes value specification
186            String errorCodeString = conf.get(CONF_USER_RETRY_ERROR_CODE).replaceAll("\\s+", "");
187            Collection<String> strings = StringUtils.getStringCollection(errorCodeString);
188            String errorCodeExtString = conf.get(CONF_USER_RETRY_ERROR_CODE_EXT).replaceAll("\\s+", "");
189            Collection<String> extra = StringUtils.getStringCollection(errorCodeExtString);
190            Set<String> set = new HashSet<String>();
191            set.addAll(strings);
192            set.addAll(extra);
193            return set;
194        }
195    
196        /**
197         * Get NodeDef default version, _oozie_inst_v_0 or _oozie_inst_v_1
198         *
199         * @return nodedef default version
200         * @throws WorkflowException thrown if there was an error parsing the action configuration.
201        */
202        public static String getNodeDefDefaultVersion() throws WorkflowException {
203            Configuration conf = Services.get().get(ConfigurationService.class).getConf();
204            String ret = conf.get(CONF_NODE_DEF_VERSION);
205            if (ret == null) {
206                ret = NODE_DEF_VERSION_1;
207            }
208            return ret;
209        }
210    
211        /**
212         * Delegation method used when failing actions. <p/>
213         *
214         * @param context NodeHandler context.
215         */
216        @SuppressWarnings("unchecked")
217        protected static void liteFail(NodeHandler.Context context) {
218            liteTerminate(context, ACTIONS_TO_FAIL);
219        }
220    
221        /**
222         * Delegation method used when killing actions. <p/>
223         *
224         * @param context NodeHandler context.
225         */
226        @SuppressWarnings("unchecked")
227        protected static void liteKill(NodeHandler.Context context) {
228            liteTerminate(context, ACTIONS_TO_KILL);
229        }
230    
231        /**
232         * Used to terminate jobs - FAIL or KILL. <p/>
233         *
234         * @param context NodeHandler context.
235         * @param transientVar The transient variable name.
236         */
237        @SuppressWarnings("unchecked")
238        private static void liteTerminate(NodeHandler.Context context, String transientVar) {
239            List<String> list = (List<String>) context.getTransientVar(transientVar);
240            if (list == null) {
241                list = new ArrayList<String>();
242                context.setTransientVar(transientVar, list);
243            }
244            list.add(context.getVar(context.getNodeDef().getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID));
245        }
246    
247        // wires workflow lib action execution with Oozie Dag
248        public static class LiteActionHandler extends ActionNodeHandler {
249    
250            @Override
251            public void start(Context context) throws WorkflowException {
252                liteExecute(context, null);
253            }
254    
255            @Override
256            public void end(Context context) {
257            }
258    
259            @Override
260            public void kill(Context context) {
261                liteKill(context);
262            }
263    
264            @Override
265            public void fail(Context context) {
266                liteFail(context);
267            }
268        }
269    
270        // wires workflow lib decision execution with Oozie Dag
271        public static class LiteDecisionHandler extends DecisionNodeHandler {
272    
273            @Override
274            public void start(Context context) throws WorkflowException {
275                liteExecute(context, null);
276            }
277    
278            @Override
279            public void end(Context context) {
280            }
281    
282            @Override
283            public void kill(Context context) {
284                liteKill(context);
285            }
286    
287            @Override
288            public void fail(Context context) {
289                liteFail(context);
290            }
291        }
292    
293        // wires workflow lib control nodes with Oozie Dag
294        public static class LiteControlNodeHandler extends ControlNodeHandler {
295    
296          @Override
297          public void touch(Context context) throws WorkflowException {
298              Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
299              String nodeType;
300              if (nodeClass.equals(StartNodeDef.class)) {
301                nodeType = StartActionExecutor.TYPE;
302              }
303              else if (nodeClass.equals(EndNodeDef.class)) {
304                  nodeType = EndActionExecutor.TYPE;
305              }
306              else if (nodeClass.equals(KillNodeDef.class)) {
307                  nodeType = KillActionExecutor.TYPE;
308              }
309              else if (nodeClass.equals(ForkNodeDef.class)) {
310                  nodeType = ForkActionExecutor.TYPE;
311              }
312              else if (nodeClass.equals(JoinNodeDef.class)) {
313                  nodeType = JoinActionExecutor.TYPE;
314              } else {
315                throw new IllegalStateException("Invalid node type: " + nodeClass);
316              }
317    
318              liteExecute(context, nodeType);
319          }
320    
321        }
322    }