001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.hadoop.util.StringUtils;
022    import org.apache.oozie.command.wf.ReRunXCommand;
023    
024    import org.apache.oozie.client.WorkflowAction;
025    import org.apache.oozie.WorkflowActionBean;
026    import org.apache.oozie.WorkflowJobBean;
027    import org.apache.oozie.ErrorCode;
028    import org.apache.oozie.workflow.WorkflowException;
029    import org.apache.oozie.workflow.WorkflowInstance;
030    import org.apache.oozie.workflow.lite.ActionNodeHandler;
031    import org.apache.oozie.workflow.lite.DecisionNodeHandler;
032    import org.apache.oozie.workflow.lite.NodeHandler;
033    import org.apache.oozie.util.XLog;
034    import org.apache.oozie.util.XmlUtils;
035    import org.jdom.Element;
036    import org.jdom.JDOMException;
037    
038    import java.util.ArrayList;
039    import java.util.Collection;
040    import java.util.HashSet;
041    import java.util.List;
042    import java.util.Set;
043    
044    public abstract class LiteWorkflowStoreService extends WorkflowStoreService {
045    
    /** Prefix shared by every configuration property defined by this service. */
    public static final String CONF_PREFIX = Service.CONF_PREFIX + "LiteWorkflowStoreService.";
    /** Prefix shared by the user-retry configuration properties below. */
    public static final String CONF_PREFIX_USER_RETRY = CONF_PREFIX + "user.retry.";
    /** System-wide maximum for user retries; a node asking for more is capped to this value. */
    public static final String CONF_USER_RETRY_MAX = CONF_PREFIX_USER_RETRY + "max";
    /**
     * Retry interval used when a node does not declare its own. The key really ends in
     * "inteval" (sic): the spelling is part of the property name read at runtime.
     */
    public static final String CONF_USER_RETRY_INTEVAL = CONF_PREFIX_USER_RETRY + "inteval";
    /** Comma separated error codes for which user retry is allowed. */
    public static final String CONF_USER_RETRY_ERROR_CODE = CONF_PREFIX_USER_RETRY + "error.code";
    /** Additional comma separated error codes, merged with {@link #CONF_USER_RETRY_ERROR_CODE}. */
    public static final String CONF_USER_RETRY_ERROR_CODE_EXT = CONF_PREFIX_USER_RETRY + "error.code.ext";

    /** Identifier of node definition version 0. */
    public static final String NODE_DEF_VERSION_0 = "_oozie_inst_v_0";
    /** Identifier of node definition version 1; used when no default version is configured. */
    public static final String NODE_DEF_VERSION_1 = "_oozie_inst_v_1";
    /** Property naming the default node definition version. */
    public static final String CONF_NODE_DEF_VERSION = CONF_PREFIX + "node.def.version";
056    
057        /**
058         * Delegation method used by the Action and Decision {@link NodeHandler} on start. <p/> This method provides the
059         * necessary information to create ActionExecutors.
060         *
061         * @param context NodeHandler context.
062         * @throws WorkflowException thrown if there was an error parsing the action configuration.
063         */
064        @SuppressWarnings("unchecked")
065        protected static void liteExecute(NodeHandler.Context context) throws WorkflowException {
066            XLog log = XLog.getLog(LiteWorkflowStoreService.class);
067            String jobId = context.getProcessInstance().getId();
068            String nodeName = context.getNodeDef().getName();
069            String skipVar = context.getProcessInstance().getVar(context.getNodeDef().getName()
070                    + WorkflowInstance.NODE_VAR_SEPARATOR + ReRunXCommand.TO_SKIP);
071            boolean skipAction = false;
072            if (skipVar != null) {
073                skipAction = skipVar.equals("true");
074            }
075            WorkflowActionBean action = new WorkflowActionBean();
076            String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, nodeName);
077    
078            if (!skipAction) {
079                String nodeConf = context.getNodeDef().getConf();
080                String executionPath = context.getExecutionPath();
081    
082                String actionType;
083                try {
084                    Element element = XmlUtils.parseXml(nodeConf);
085                    actionType = element.getName();
086                    nodeConf = XmlUtils.prettyPrint(element).toString();
087                }
088                catch (JDOMException ex) {
089                    throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
090                }
091    
092                log.debug(" Creating action for node [{0}]", nodeName);
093                action.setType(actionType);
094                action.setExecutionPath(executionPath);
095                action.setConf(nodeConf);
096                action.setLogToken(((WorkflowJobBean) context.getTransientVar(WORKFLOW_BEAN)).getLogToken());
097                action.setStatus(WorkflowAction.Status.PREP);
098                action.setJobId(jobId);
099            }
100            action.setCred(context.getNodeDef().getCred());
101            log.debug("Setting action for cred: '"+context.getNodeDef().getCred() +
102                            "', name: '"+ context.getNodeDef().getName() + "'");
103    
104            action.setUserRetryCount(0);
105            int userRetryMax = getUserRetryMax(context);
106            int userRetryInterval = getUserRetryInterval(context);
107            action.setUserRetryMax(userRetryMax);
108            action.setUserRetryInterval(userRetryInterval);
109            log.debug("Setting action for userRetryMax: '"+ userRetryMax +
110                            "', userRetryInterval: '" + userRetryInterval +
111                            "', name: '"+ context.getNodeDef().getName() + "'");
112    
113            action.setName(nodeName);
114            action.setId(actionId);
115            context.setVar(nodeName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID, actionId);
116            List list = (List) context.getTransientVar(ACTIONS_TO_START);
117            if (list == null) {
118                list = new ArrayList();
119                context.setTransientVar(ACTIONS_TO_START, list);
120            }
121            list.add(action);
122        }
123    
124        private static int getUserRetryInterval(NodeHandler.Context context) throws WorkflowException {
125            Configuration conf = Services.get().get(ConfigurationService.class).getConf();
126            int ret = conf.getInt(CONF_USER_RETRY_INTEVAL, 5);
127            String userRetryInterval = context.getNodeDef().getUserRetryInterval();
128    
129            if (!userRetryInterval.equals("null")) {
130                try {
131                    ret = Integer.parseInt(userRetryInterval);
132                }
133                catch (NumberFormatException nfe) {
134                    throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
135                }
136            }
137            return ret;
138        }
139    
140        private static int getUserRetryMax(NodeHandler.Context context) throws WorkflowException {
141            XLog log = XLog.getLog(LiteWorkflowStoreService.class);
142            Configuration conf = Services.get().get(ConfigurationService.class).getConf();
143            int ret = conf.getInt(CONF_USER_RETRY_MAX, 0);
144            int max = ret;
145            String userRetryMax = context.getNodeDef().getUserRetryMax();
146    
147            if (!userRetryMax.equals("null")) {
148                try {
149                    ret = Integer.parseInt(userRetryMax);
150                    if (ret > max) {
151                        ret = max;
152                        log.warn(ErrorCode.E0820.getTemplate(), ret, max);
153                    }
154                }
155                catch (NumberFormatException nfe) {
156                    throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
157                }
158            }
159            else {
160                ret = 0;
161            }
162            return ret;
163        }
164    
165        /**
166         * Get system defined and instance defined error codes for which USER_RETRY is allowed
167         *
168         * @return set of error code user-retry is allowed for
169         */
170        public static Set<String> getUserRetryErrorCode() {
171            Configuration conf = Services.get().get(ConfigurationService.class).getConf();
172            // eliminating whitespaces in the error codes value specification
173            String errorCodeString = conf.get(CONF_USER_RETRY_ERROR_CODE).replaceAll("\\s+", "");
174            Collection<String> strings = StringUtils.getStringCollection(errorCodeString);
175            String errorCodeExtString = conf.get(CONF_USER_RETRY_ERROR_CODE_EXT).replaceAll("\\s+", "");
176            Collection<String> extra = StringUtils.getStringCollection(errorCodeExtString);
177            Set<String> set = new HashSet<String>();
178            set.addAll(strings);
179            set.addAll(extra);
180            return set;
181        }
182    
183        /**
184         * Get NodeDef default version, _oozie_inst_v_0 or _oozie_inst_v_1
185         *
186         * @return nodedef default version
187         * @throws WorkflowException thrown if there was an error parsing the action configuration.
188        */
189        public static String getNodeDefDefaultVersion() throws WorkflowException {
190            Configuration conf = Services.get().get(ConfigurationService.class).getConf();
191            String ret = conf.get(CONF_NODE_DEF_VERSION);
192            if (ret == null) {
193                ret = NODE_DEF_VERSION_1;
194            }
195            return ret;
196        }
197    
198        /**
199         * Delegation method used when failing actions. <p/>
200         *
201         * @param context NodeHandler context.
202         */
203        @SuppressWarnings("unchecked")
204        protected static void liteFail(NodeHandler.Context context) {
205            liteTerminate(context, ACTIONS_TO_FAIL);
206        }
207    
208        /**
209         * Delegation method used when killing actions. <p/>
210         *
211         * @param context NodeHandler context.
212         */
213        @SuppressWarnings("unchecked")
214        protected static void liteKill(NodeHandler.Context context) {
215            liteTerminate(context, ACTIONS_TO_KILL);
216        }
217    
218        /**
219         * Used to terminate jobs - FAIL or KILL. <p/>
220         *
221         * @param context NodeHandler context.
222         * @param transientVar The transient variable name.
223         */
224        @SuppressWarnings("unchecked")
225        private static void liteTerminate(NodeHandler.Context context, String transientVar) {
226            List<String> list = (List<String>) context.getTransientVar(transientVar);
227            if (list == null) {
228                list = new ArrayList<String>();
229                context.setTransientVar(transientVar, list);
230            }
231            list.add(context.getVar(context.getNodeDef().getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID));
232        }
233    
234        // wires workflow lib action execution with Oozie Dag
235        public static class LiteActionHandler extends ActionNodeHandler {
236    
237            @Override
238            public void start(Context context) throws WorkflowException {
239                liteExecute(context);
240            }
241    
242            @Override
243            public void end(Context context) {
244            }
245    
246            @Override
247            public void kill(Context context) {
248                liteKill(context);
249            }
250    
251            @Override
252            public void fail(Context context) {
253                liteFail(context);
254            }
255        }
256    
257        // wires workflow lib decision execution with Oozie Dag
258        public static class LiteDecisionHandler extends DecisionNodeHandler {
259    
260            @Override
261            public void start(Context context) throws WorkflowException {
262                liteExecute(context);
263            }
264    
265            @Override
266            public void end(Context context) {
267            }
268    
269            @Override
270            public void kill(Context context) {
271                liteKill(context);
272            }
273    
274            @Override
275            public void fail(Context context) {
276                liteFail(context);
277            }
278        }
279    }