001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.hadoop.util.StringUtils;
022import org.apache.oozie.action.control.EndActionExecutor;
023import org.apache.oozie.action.control.ForkActionExecutor;
024import org.apache.oozie.action.control.JoinActionExecutor;
025import org.apache.oozie.action.control.KillActionExecutor;
026import org.apache.oozie.action.control.StartActionExecutor;
027import org.apache.oozie.command.wf.ReRunXCommand;
028
029import org.apache.oozie.client.WorkflowAction;
030import org.apache.oozie.WorkflowActionBean;
031import org.apache.oozie.WorkflowJobBean;
032import org.apache.oozie.ErrorCode;
033import org.apache.oozie.workflow.WorkflowException;
034import org.apache.oozie.workflow.WorkflowInstance;
035import org.apache.oozie.workflow.lite.ActionNodeHandler;
036import org.apache.oozie.workflow.lite.ControlNodeHandler;
037import org.apache.oozie.workflow.lite.DecisionNodeHandler;
038import org.apache.oozie.workflow.lite.EndNodeDef;
039import org.apache.oozie.workflow.lite.ForkNodeDef;
040import org.apache.oozie.workflow.lite.JoinNodeDef;
041import org.apache.oozie.workflow.lite.KillNodeDef;
042import org.apache.oozie.workflow.lite.NodeDef;
043import org.apache.oozie.workflow.lite.NodeHandler;
044import org.apache.oozie.util.XLog;
045import org.apache.oozie.util.XmlUtils;
046import org.apache.oozie.workflow.lite.StartNodeDef;
047import org.jdom.Element;
048import org.jdom.JDOMException;
049
050import java.util.ArrayList;
051import java.util.Collection;
052import java.util.HashSet;
053import java.util.List;
054import java.util.Set;
055
056public abstract class LiteWorkflowStoreService extends WorkflowStoreService {
057
    /** Common prefix of every configuration property defined by this service. */
    public static final String CONF_PREFIX = Service.CONF_PREFIX + "LiteWorkflowStoreService.";
    /** Prefix shared by the user-retry configuration properties declared below. */
    public static final String CONF_PREFIX_USER_RETRY = CONF_PREFIX + "user.retry.";
    /** Property holding the system-wide cap applied to the user-retry max requested by a node. */
    public static final String CONF_USER_RETRY_MAX = CONF_PREFIX_USER_RETRY + "max";
    // NOTE: the misspelling "inteval" is part of both the constant name and the property key; correcting either
    // would break existing references or change the key that is looked up in the configuration.
    /** Property holding the user-retry interval used when a node does not define its own (5 if unset). */
    public static final String CONF_USER_RETRY_INTEVAL = CONF_PREFIX_USER_RETRY + "inteval";
    /** Property holding the comma separated error codes for which user-retry is allowed. */
    public static final String CONF_USER_RETRY_ERROR_CODE = CONF_PREFIX_USER_RETRY + "error.code";
    /** Property holding additional comma separated error codes, merged with {@link #CONF_USER_RETRY_ERROR_CODE}. */
    public static final String CONF_USER_RETRY_ERROR_CODE_EXT = CONF_PREFIX_USER_RETRY + "error.code.ext";

    /** Node definition version identifier "_oozie_inst_v_0". */
    public static final String NODE_DEF_VERSION_0 = "_oozie_inst_v_0";
    /** Node definition version identifier "_oozie_inst_v_1"; used as default when none is configured. */
    public static final String NODE_DEF_VERSION_1 = "_oozie_inst_v_1";
    /** Property selecting the default node definition version. */
    public static final String CONF_NODE_DEF_VERSION = CONF_PREFIX + "node.def.version";
068
069    /**
070     * Delegation method used by the Action and Decision {@link NodeHandler} on start. <p/> This method provides the
071     * necessary information to create ActionExecutors.
072     *
073     * @param context NodeHandler context.
074     * @param actionType the action type.
075     * @throws WorkflowException thrown if there was an error parsing the action configuration.
076     */
077    @SuppressWarnings("unchecked")
078    protected static void liteExecute(NodeHandler.Context context, String actionType) throws WorkflowException {
079        XLog log = XLog.getLog(LiteWorkflowStoreService.class);
080        String jobId = context.getProcessInstance().getId();
081        String nodeName = context.getNodeDef().getName();
082        String skipVar = context.getProcessInstance().getVar(context.getNodeDef().getName()
083                + WorkflowInstance.NODE_VAR_SEPARATOR + ReRunXCommand.TO_SKIP);
084        boolean skipAction = false;
085        if (skipVar != null) {
086            skipAction = skipVar.equals("true");
087        }
088        WorkflowActionBean action = new WorkflowActionBean();
089        String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, nodeName);
090
091        if (!skipAction) {
092            String nodeConf = context.getNodeDef().getConf();
093            String executionPath = context.getExecutionPath();
094
095            if (actionType == null) {
096                try {
097                    Element element = XmlUtils.parseXml(nodeConf);
098                    actionType = element.getName();
099                    nodeConf = XmlUtils.prettyPrint(element).toString();
100                }
101                catch (JDOMException ex) {
102                    throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
103                }
104            }
105            log.debug(" Creating action for node [{0}]", nodeName);
106            action.setType(actionType);
107            action.setExecutionPath(executionPath);
108            action.setConf(nodeConf);
109            action.setLogToken(((WorkflowJobBean) context.getTransientVar(WORKFLOW_BEAN)).getLogToken());
110            action.setStatus(WorkflowAction.Status.PREP);
111            action.setJobId(jobId);
112        }
113        action.setCred(context.getNodeDef().getCred());
114        log.debug("Setting action for cred: '"+context.getNodeDef().getCred() +
115                        "', name: '"+ context.getNodeDef().getName() + "'");
116
117        action.setUserRetryCount(0);
118        int userRetryMax = getUserRetryMax(context);
119        int userRetryInterval = getUserRetryInterval(context);
120        action.setUserRetryMax(userRetryMax);
121        action.setUserRetryInterval(userRetryInterval);
122        log.debug("Setting action for userRetryMax: '"+ userRetryMax +
123                        "', userRetryInterval: '" + userRetryInterval +
124                        "', name: '"+ context.getNodeDef().getName() + "'");
125
126        action.setName(nodeName);
127        action.setId(actionId);
128        context.setVar(nodeName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID, actionId);
129        List list = (List) context.getTransientVar(ACTIONS_TO_START);
130        if (list == null) {
131            list = new ArrayList();
132            context.setTransientVar(ACTIONS_TO_START, list);
133        }
134        list.add(action);
135    }
136
137    private static int getUserRetryInterval(NodeHandler.Context context) throws WorkflowException {
138        Configuration conf = Services.get().get(ConfigurationService.class).getConf();
139        int ret = conf.getInt(CONF_USER_RETRY_INTEVAL, 5);
140        String userRetryInterval = context.getNodeDef().getUserRetryInterval();
141
142        if (!userRetryInterval.equals("null")) {
143            try {
144                ret = Integer.parseInt(userRetryInterval);
145            }
146            catch (NumberFormatException nfe) {
147                throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
148            }
149        }
150        return ret;
151    }
152
153    private static int getUserRetryMax(NodeHandler.Context context) throws WorkflowException {
154        XLog log = XLog.getLog(LiteWorkflowStoreService.class);
155        Configuration conf = Services.get().get(ConfigurationService.class).getConf();
156        int ret = conf.getInt(CONF_USER_RETRY_MAX, 0);
157        int max = ret;
158        String userRetryMax = context.getNodeDef().getUserRetryMax();
159
160        if (!userRetryMax.equals("null")) {
161            try {
162                ret = Integer.parseInt(userRetryMax);
163                if (ret > max) {
164                    ret = max;
165                    log.warn(ErrorCode.E0820.getTemplate(), ret, max);
166                }
167            }
168            catch (NumberFormatException nfe) {
169                throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
170            }
171        }
172        else {
173            ret = 0;
174        }
175        return ret;
176    }
177
178    /**
179     * Get system defined and instance defined error codes for which USER_RETRY is allowed
180     *
181     * @return set of error code user-retry is allowed for
182     */
183    public static Set<String> getUserRetryErrorCode() {
184        Configuration conf = Services.get().get(ConfigurationService.class).getConf();
185        // eliminating whitespaces in the error codes value specification
186        String errorCodeString = conf.get(CONF_USER_RETRY_ERROR_CODE).replaceAll("\\s+", "");
187        Collection<String> strings = StringUtils.getStringCollection(errorCodeString);
188        String errorCodeExtString = conf.get(CONF_USER_RETRY_ERROR_CODE_EXT).replaceAll("\\s+", "");
189        Collection<String> extra = StringUtils.getStringCollection(errorCodeExtString);
190        Set<String> set = new HashSet<String>();
191        set.addAll(strings);
192        set.addAll(extra);
193        return set;
194    }
195
196    /**
197     * Get NodeDef default version, _oozie_inst_v_0 or _oozie_inst_v_1
198     *
199     * @return nodedef default version
200     * @throws WorkflowException thrown if there was an error parsing the action configuration.
201    */
202    public static String getNodeDefDefaultVersion() throws WorkflowException {
203        Configuration conf = Services.get().get(ConfigurationService.class).getConf();
204        String ret = conf.get(CONF_NODE_DEF_VERSION);
205        if (ret == null) {
206            ret = NODE_DEF_VERSION_1;
207        }
208        return ret;
209    }
210
211    /**
212     * Delegation method used when failing actions. <p/>
213     *
214     * @param context NodeHandler context.
215     */
216    @SuppressWarnings("unchecked")
217    protected static void liteFail(NodeHandler.Context context) {
218        liteTerminate(context, ACTIONS_TO_FAIL);
219    }
220
221    /**
222     * Delegation method used when killing actions. <p/>
223     *
224     * @param context NodeHandler context.
225     */
226    @SuppressWarnings("unchecked")
227    protected static void liteKill(NodeHandler.Context context) {
228        liteTerminate(context, ACTIONS_TO_KILL);
229    }
230
231    /**
232     * Used to terminate jobs - FAIL or KILL. <p/>
233     *
234     * @param context NodeHandler context.
235     * @param transientVar The transient variable name.
236     */
237    @SuppressWarnings("unchecked")
238    private static void liteTerminate(NodeHandler.Context context, String transientVar) {
239        List<String> list = (List<String>) context.getTransientVar(transientVar);
240        if (list == null) {
241            list = new ArrayList<String>();
242            context.setTransientVar(transientVar, list);
243        }
244        list.add(context.getVar(context.getNodeDef().getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID));
245    }
246
247    // wires workflow lib action execution with Oozie Dag
248    public static class LiteActionHandler extends ActionNodeHandler {
249
250        @Override
251        public void start(Context context) throws WorkflowException {
252            liteExecute(context, null);
253        }
254
255        @Override
256        public void end(Context context) {
257        }
258
259        @Override
260        public void kill(Context context) {
261            liteKill(context);
262        }
263
264        @Override
265        public void fail(Context context) {
266            liteFail(context);
267        }
268    }
269
270    // wires workflow lib decision execution with Oozie Dag
271    public static class LiteDecisionHandler extends DecisionNodeHandler {
272
273        @Override
274        public void start(Context context) throws WorkflowException {
275            liteExecute(context, null);
276        }
277
278        @Override
279        public void end(Context context) {
280        }
281
282        @Override
283        public void kill(Context context) {
284            liteKill(context);
285        }
286
287        @Override
288        public void fail(Context context) {
289            liteFail(context);
290        }
291    }
292
293    // wires workflow lib control nodes with Oozie Dag
294    public static class LiteControlNodeHandler extends ControlNodeHandler {
295
296      @Override
297      public void touch(Context context) throws WorkflowException {
298          Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
299          String nodeType;
300          if (nodeClass.equals(StartNodeDef.class)) {
301            nodeType = StartActionExecutor.TYPE;
302          }
303          else if (nodeClass.equals(EndNodeDef.class)) {
304              nodeType = EndActionExecutor.TYPE;
305          }
306          else if (nodeClass.equals(KillNodeDef.class)) {
307              nodeType = KillActionExecutor.TYPE;
308          }
309          else if (nodeClass.equals(ForkNodeDef.class)) {
310              nodeType = ForkActionExecutor.TYPE;
311          }
312          else if (nodeClass.equals(JoinNodeDef.class)) {
313              nodeType = JoinActionExecutor.TYPE;
314          } else {
315            throw new IllegalStateException("Invalid node type: " + nodeClass);
316          }
317
318          liteExecute(context, nodeType);
319      }
320
321    }
322}