001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.oozie.command.wf.ReRunXCommand;
022    
023    import org.apache.oozie.client.WorkflowAction;
024    import org.apache.oozie.WorkflowActionBean;
025    import org.apache.oozie.WorkflowJobBean;
026    import org.apache.oozie.ErrorCode;
027    import org.apache.oozie.workflow.WorkflowException;
028    import org.apache.oozie.workflow.WorkflowInstance;
029    import org.apache.oozie.workflow.lite.ActionNodeHandler;
030    import org.apache.oozie.workflow.lite.DecisionNodeHandler;
031    import org.apache.oozie.workflow.lite.NodeHandler;
032    import org.apache.oozie.util.XLog;
033    import org.apache.oozie.util.XmlUtils;
034    import org.jdom.Element;
035    import org.jdom.JDOMException;
036    
037    import java.util.ArrayList;
038    import java.util.Collection;
039    import java.util.HashSet;
040    import java.util.List;
041    import java.util.Set;
042    
043    public abstract class LiteWorkflowStoreService extends WorkflowStoreService {
044    
045        public static final String CONF_PREFIX = Service.CONF_PREFIX + "LiteWorkflowStoreService.";
046        public static final String CONF_PREFIX_USER_RETRY = CONF_PREFIX + "user.retry.";
047        public static final String CONF_USER_RETRY_MAX = CONF_PREFIX_USER_RETRY + "max";
048        public static final String CONF_USER_RETRY_INTEVAL = CONF_PREFIX_USER_RETRY + "inteval";
049        public static final String CONF_USER_RETRY_ERROR_CODE = CONF_PREFIX_USER_RETRY + "error.code";
050        public static final String CONF_USER_RETRY_ERROR_CODE_EXT = CONF_PREFIX_USER_RETRY + "error.code.ext";
051    
052        public static final String NODE_DEF_VERSION_0 = "_oozie_inst_v_0";
053        public static final String NODE_DEF_VERSION_1 = "_oozie_inst_v_1";
054        public static final String CONF_NODE_DEF_VERSION = CONF_PREFIX + "node.def.version";
055    
056        /**
057         * Delegation method used by the Action and Decision {@link NodeHandler} on start. <p/> This method provides the
058         * necessary information to create ActionExecutors.
059         *
060         * @param context NodeHandler context.
061         * @throws WorkflowException thrown if there was an error parsing the action configuration.
062         */
063        @SuppressWarnings("unchecked")
064        protected static void liteExecute(NodeHandler.Context context) throws WorkflowException {
065            XLog log = XLog.getLog(LiteWorkflowStoreService.class);
066            String jobId = context.getProcessInstance().getId();
067            String nodeName = context.getNodeDef().getName();
068            String skipVar = context.getProcessInstance().getVar(context.getNodeDef().getName()
069                    + WorkflowInstance.NODE_VAR_SEPARATOR + ReRunXCommand.TO_SKIP);
070            boolean skipAction = false;
071            if (skipVar != null) {
072                skipAction = skipVar.equals("true");
073            }
074            WorkflowActionBean action = new WorkflowActionBean();
075            String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, nodeName);
076    
077            if (!skipAction) {
078                String nodeConf = context.getNodeDef().getConf();
079                String executionPath = context.getExecutionPath();
080    
081                String actionType;
082                try {
083                    Element element = XmlUtils.parseXml(nodeConf);
084                    actionType = element.getName();
085                    nodeConf = XmlUtils.prettyPrint(element).toString();
086                }
087                catch (JDOMException ex) {
088                    throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
089                }
090    
091                log.debug(" Creating action for node [{0}]", nodeName);
092                action.setType(actionType);
093                action.setExecutionPath(executionPath);
094                action.setConf(nodeConf);
095                action.setLogToken(((WorkflowJobBean) context.getTransientVar(WORKFLOW_BEAN)).getLogToken());
096                action.setStatus(WorkflowAction.Status.PREP);
097                action.setJobId(jobId);
098            }
099            action.setCred(context.getNodeDef().getCred());
100            log.debug("Setting action for cred: '"+context.getNodeDef().getCred() +
101                            "', name: '"+ context.getNodeDef().getName() + "'");
102    
103            action.setUserRetryCount(0);
104            int userRetryMax = getUserRetryMax(context);
105            int userRetryInterval = getUserRetryInterval(context);
106            action.setUserRetryMax(userRetryMax);
107            action.setUserRetryInterval(userRetryInterval);
108            log.debug("Setting action for userRetryMax: '"+ userRetryMax +
109                            "', userRetryInterval: '" + userRetryInterval +
110                            "', name: '"+ context.getNodeDef().getName() + "'");
111    
112            action.setName(nodeName);
113            action.setId(actionId);
114            context.setVar(nodeName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID, actionId);
115            List list = (List) context.getTransientVar(ACTIONS_TO_START);
116            if (list == null) {
117                list = new ArrayList();
118                context.setTransientVar(ACTIONS_TO_START, list);
119            }
120            list.add(action);
121        }
122    
123        private static int getUserRetryInterval(NodeHandler.Context context) throws WorkflowException {
124            Configuration conf = Services.get().get(ConfigurationService.class).getConf();
125            int ret = conf.getInt(CONF_USER_RETRY_INTEVAL, 5);
126            String userRetryInterval = context.getNodeDef().getUserRetryInterval();
127    
128            if (!userRetryInterval.equals("null")) {
129                try {
130                    ret = Integer.parseInt(userRetryInterval);
131                }
132                catch (NumberFormatException nfe) {
133                    throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
134                }
135            }
136            return ret;
137        }
138    
139        private static int getUserRetryMax(NodeHandler.Context context) throws WorkflowException {
140            XLog log = XLog.getLog(LiteWorkflowStoreService.class);
141            Configuration conf = Services.get().get(ConfigurationService.class).getConf();
142            int ret = conf.getInt(CONF_USER_RETRY_MAX, 0);
143            int max = ret;
144            String userRetryMax = context.getNodeDef().getUserRetryMax();
145    
146            if (!userRetryMax.equals("null")) {
147                try {
148                    ret = Integer.parseInt(userRetryMax);
149                    if (ret > max) {
150                        ret = max;
151                        log.warn(ErrorCode.E0820.getTemplate(), ret, max);
152                    }
153                }
154                catch (NumberFormatException nfe) {
155                    throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
156                }
157            }
158            else {
159                ret = 0;
160            }
161            return ret;
162        }
163    
164        /**
165         * Get system defined and instance defined error codes for which USER_RETRY is allowed
166         *
167         * @return set of error code user-retry is allowed for
168         */
169        public static Set<String> getUserRetryErrorCode() {
170            Configuration conf = Services.get().get(ConfigurationService.class).getConf();
171            Collection<String> strings = (Collection<String>) conf.getStringCollection(CONF_USER_RETRY_ERROR_CODE);
172            Collection<String> extra = (Collection<String>) conf.getStringCollection(CONF_USER_RETRY_ERROR_CODE_EXT);
173            Set<String> set = new HashSet<String>();
174            set.addAll(strings);
175            set.addAll(extra);
176            return set;
177        }
178    
179        /**
180         * Get NodeDef default version, _oozie_inst_v_0 or _oozie_inst_v_1
181         *
182         * @return nodedef default version
183         * @throws WorkflowException thrown if there was an error parsing the action configuration.
184        */
185        public static String getNodeDefDefaultVersion() throws WorkflowException {
186            Configuration conf = Services.get().get(ConfigurationService.class).getConf();
187            String ret = conf.get(CONF_NODE_DEF_VERSION);
188            if (ret == null) {
189                ret = NODE_DEF_VERSION_1;
190            }
191            return ret;
192        }
193    
194        /**
195         * Delegation method used when failing actions. <p/>
196         *
197         * @param context NodeHandler context.
198         */
199        @SuppressWarnings("unchecked")
200        protected static void liteFail(NodeHandler.Context context) {
201            liteTerminate(context, ACTIONS_TO_FAIL);
202        }
203    
204        /**
205         * Delegation method used when killing actions. <p/>
206         *
207         * @param context NodeHandler context.
208         */
209        @SuppressWarnings("unchecked")
210        protected static void liteKill(NodeHandler.Context context) {
211            liteTerminate(context, ACTIONS_TO_KILL);
212        }
213    
214        /**
215         * Used to terminate jobs - FAIL or KILL. <p/>
216         *
217         * @param context NodeHandler context.
218         * @param transientVar The transient variable name.
219         */
220        @SuppressWarnings("unchecked")
221        private static void liteTerminate(NodeHandler.Context context, String transientVar) {
222            List<String> list = (List<String>) context.getTransientVar(transientVar);
223            if (list == null) {
224                list = new ArrayList<String>();
225                context.setTransientVar(transientVar, list);
226            }
227            list.add(context.getVar(context.getNodeDef().getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID));
228        }
229    
230        // wires workflow lib action execution with Oozie Dag
231        public static class LiteActionHandler extends ActionNodeHandler {
232    
233            @Override
234            public void start(Context context) throws WorkflowException {
235                liteExecute(context);
236            }
237    
238            @Override
239            public void end(Context context) {
240            }
241    
242            @Override
243            public void kill(Context context) {
244                liteKill(context);
245            }
246    
247            @Override
248            public void fail(Context context) {
249                liteFail(context);
250            }
251        }
252    
253        // wires workflow lib decision execution with Oozie Dag
254        public static class LiteDecisionHandler extends DecisionNodeHandler {
255    
256            @Override
257            public void start(Context context) throws WorkflowException {
258                liteExecute(context);
259            }
260    
261            @Override
262            public void end(Context context) {
263            }
264    
265            @Override
266            public void kill(Context context) {
267                liteKill(context);
268            }
269    
270            @Override
271            public void fail(Context context) {
272                liteFail(context);
273            }
274        }
275    }