001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import org.apache.hadoop.util.StringUtils;
022import org.apache.oozie.action.control.EndActionExecutor;
023import org.apache.oozie.action.control.ForkActionExecutor;
024import org.apache.oozie.action.control.JoinActionExecutor;
025import org.apache.oozie.action.control.KillActionExecutor;
026import org.apache.oozie.action.control.StartActionExecutor;
027import org.apache.oozie.command.wf.ReRunXCommand;
028
029import org.apache.oozie.client.WorkflowAction;
030import org.apache.oozie.WorkflowActionBean;
031import org.apache.oozie.WorkflowJobBean;
032import org.apache.oozie.ErrorCode;
033import org.apache.oozie.workflow.WorkflowException;
034import org.apache.oozie.workflow.WorkflowInstance;
035import org.apache.oozie.workflow.lite.ActionNodeHandler;
036import org.apache.oozie.workflow.lite.ControlNodeHandler;
037import org.apache.oozie.workflow.lite.DecisionNodeHandler;
038import org.apache.oozie.workflow.lite.EndNodeDef;
039import org.apache.oozie.workflow.lite.ForkNodeDef;
040import org.apache.oozie.workflow.lite.JoinNodeDef;
041import org.apache.oozie.workflow.lite.KillNodeDef;
042import org.apache.oozie.workflow.lite.NodeDef;
043import org.apache.oozie.workflow.lite.NodeHandler;
044import org.apache.oozie.util.XLog;
045import org.apache.oozie.util.XmlUtils;
046import org.apache.oozie.workflow.lite.StartNodeDef;
047import org.jdom.Element;
048import org.jdom.JDOMException;
049
050import java.util.ArrayList;
051import java.util.Collection;
052import java.util.HashSet;
053import java.util.List;
054import java.util.Set;
055
056public abstract class LiteWorkflowStoreService extends WorkflowStoreService {
057
058    public static final String CONF_PREFIX = Service.CONF_PREFIX + "LiteWorkflowStoreService.";
059    public static final String CONF_PREFIX_USER_RETRY = CONF_PREFIX + "user.retry.";
060    public static final String CONF_USER_RETRY_MAX = CONF_PREFIX_USER_RETRY + "max";
061    public static final String CONF_USER_RETRY_INTEVAL = CONF_PREFIX_USER_RETRY + "inteval";
062    public static final String CONF_USER_RETRY_POLICY = CONF_PREFIX_USER_RETRY + "policy";
063    public static final String CONF_USER_RETRY_ERROR_CODE = CONF_PREFIX_USER_RETRY + "error.code";
064    public static final String CONF_USER_RETRY_ERROR_CODE_EXT = CONF_PREFIX_USER_RETRY + "error.code.ext";
065    public static final String DEFAULT_USER_RETRY_POLICY = "PERIODIC";
066
067    public static final String NODE_DEF_VERSION_0 = "_oozie_inst_v_0";
068    public static final String NODE_DEF_VERSION_1 = "_oozie_inst_v_1";
069    public static final String NODE_DEF_VERSION_2 = "_oozie_inst_v_2";
070    public static final String CONF_NODE_DEF_VERSION = CONF_PREFIX + "node.def.version";
071
072    public static final String USER_ERROR_CODE_ALL = "ALL";
073
074    /**
075     * Delegation method used by the Action and Decision {@link NodeHandler} on start. <p> This method provides the
076     * necessary information to create ActionExecutors.
077     *
078     * @param context NodeHandler context.
079     * @param actionType the action type.
080     * @throws WorkflowException thrown if there was an error parsing the action configuration.
081     */
082    @SuppressWarnings("unchecked")
083    protected static void liteExecute(NodeHandler.Context context, String actionType) throws WorkflowException {
084        XLog log = XLog.getLog(LiteWorkflowStoreService.class);
085        String jobId = context.getProcessInstance().getId();
086        String nodeName = context.getNodeDef().getName();
087        String skipVar = context.getProcessInstance().getVar(context.getNodeDef().getName()
088                + WorkflowInstance.NODE_VAR_SEPARATOR + ReRunXCommand.TO_SKIP);
089        boolean skipAction = false;
090        if (skipVar != null) {
091            skipAction = skipVar.equals("true");
092        }
093        WorkflowActionBean action = new WorkflowActionBean();
094        String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, nodeName);
095
096        if (!skipAction) {
097            String nodeConf = context.getNodeDef().getConf();
098
099            if (actionType == null) {
100                try {
101                    Element element = XmlUtils.parseXml(nodeConf);
102                    actionType = element.getName();
103                    nodeConf = XmlUtils.prettyPrint(element).toString();
104                }
105                catch (JDOMException ex) {
106                    throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
107                }
108            }
109            log.debug(" Creating action for node [{0}]", nodeName);
110            action.setType(actionType);
111            action.setConf(nodeConf);
112            action.setLogToken(((WorkflowJobBean) context.getTransientVar(WORKFLOW_BEAN)).getLogToken());
113            action.setStatus(WorkflowAction.Status.PREP);
114            action.setJobId(jobId);
115        }
116
117        String executionPath = context.getExecutionPath();
118        action.setExecutionPath(executionPath);
119        action.setCred(context.getNodeDef().getCred());
120        log.debug("Setting action for cred: '"+context.getNodeDef().getCred() +
121                        "', name: '"+ context.getNodeDef().getName() + "'");
122
123        action.setUserRetryCount(0);
124        int userRetryMax = getUserRetryMax(context);
125        int userRetryInterval = getUserRetryInterval(context);
126        action.setUserRetryMax(userRetryMax);
127        action.setUserRetryInterval(userRetryInterval);
128        log.debug("Setting action for userRetryMax: '"+ userRetryMax +
129                        "', userRetryInterval: '" + userRetryInterval +
130                        "', name: '"+ context.getNodeDef().getName() + "'");
131
132        action.setName(nodeName);
133        action.setId(actionId);
134        context.setVar(nodeName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID, actionId);
135        List list = (List) context.getTransientVar(ACTIONS_TO_START);
136        if (list == null) {
137            list = new ArrayList();
138            context.setTransientVar(ACTIONS_TO_START, list);
139        }
140        list.add(action);
141    }
142
143    private static int getUserRetryInterval(NodeHandler.Context context) throws WorkflowException {
144        int ret = ConfigurationService.getInt(CONF_USER_RETRY_INTEVAL);
145        String userRetryInterval = context.getNodeDef().getUserRetryInterval();
146
147        if (!userRetryInterval.equals("null")) {
148            try {
149                ret = Integer.parseInt(userRetryInterval);
150            }
151            catch (NumberFormatException nfe) {
152                throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
153            }
154        }
155        return ret;
156    }
157
158    private static int getUserRetryMax(NodeHandler.Context context) throws WorkflowException {
159        XLog log = XLog.getLog(LiteWorkflowStoreService.class);
160        int ret = ConfigurationService.getInt(CONF_USER_RETRY_MAX);
161        int max = ret;
162        String userRetryMax = context.getNodeDef().getUserRetryMax();
163
164        if (!userRetryMax.equals("null")) {
165            try {
166                ret = Integer.parseInt(userRetryMax);
167                if (ret > max) {
168                    ret = max;
169                    log.warn(ErrorCode.E0820.getTemplate(), ret, max);
170                }
171            }
172            catch (NumberFormatException nfe) {
173                throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
174            }
175        }
176        else {
177            ret = 0;
178        }
179        return ret;
180    }
181
182    /**
183     * Get system defined and instance defined error codes for which USER_RETRY is allowed
184     *
185     * @return set of error code user-retry is allowed for
186     */
187    public static Set<String> getUserRetryErrorCode() {
188        // eliminating whitespaces in the error codes value specification
189        String errorCodeString = ConfigurationService.get(CONF_USER_RETRY_ERROR_CODE).replaceAll("\\s+", "");
190        Collection<String> strings = StringUtils.getStringCollection(errorCodeString);
191        String errorCodeExtString = ConfigurationService.get(CONF_USER_RETRY_ERROR_CODE_EXT).replaceAll("\\s+", "");
192        Collection<String> extra = StringUtils.getStringCollection(errorCodeExtString);
193        Set<String> set = new HashSet<String>();
194        set.addAll(strings);
195        set.addAll(extra);
196        return set;
197    }
198
199    /**
200     * Get NodeDef default version, _oozie_inst_v_0, _oozie_inst_v_1 or
201     * _oozie_inst_v_2
202     *
203     * @return nodedef default version
204     * @throws WorkflowException thrown if there was an error parsing the action
205     *         configuration.
206     */
207    public static String getNodeDefDefaultVersion() throws WorkflowException {
208        String ret = ConfigurationService.get(CONF_NODE_DEF_VERSION);
209        if (ret == null) {
210            ret = NODE_DEF_VERSION_2;
211        }
212        return ret;
213    }
214
215    /**
216     * Delegation method used when failing actions. <p>
217     *
218     * @param context NodeHandler context.
219     */
220    @SuppressWarnings("unchecked")
221    protected static void liteFail(NodeHandler.Context context) {
222        liteTerminate(context, ACTIONS_TO_FAIL);
223    }
224
225    /**
226     * Delegation method used when killing actions. <p>
227     *
228     * @param context NodeHandler context.
229     */
230    @SuppressWarnings("unchecked")
231    protected static void liteKill(NodeHandler.Context context) {
232        liteTerminate(context, ACTIONS_TO_KILL);
233    }
234
235    /**
236     * Used to terminate jobs - FAIL or KILL. <p>
237     *
238     * @param context NodeHandler context.
239     * @param transientVar The transient variable name.
240     */
241    @SuppressWarnings("unchecked")
242    private static void liteTerminate(NodeHandler.Context context, String transientVar) {
243        List<String> list = (List<String>) context.getTransientVar(transientVar);
244        if (list == null) {
245            list = new ArrayList<String>();
246            context.setTransientVar(transientVar, list);
247        }
248        list.add(context.getVar(context.getNodeDef().getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID));
249    }
250
251    // wires workflow lib action execution with Oozie Dag
252    public static class LiteActionHandler extends ActionNodeHandler {
253
254        @Override
255        public void start(Context context) throws WorkflowException {
256            liteExecute(context, null);
257        }
258
259        @Override
260        public void end(Context context) {
261        }
262
263        @Override
264        public void kill(Context context) {
265            liteKill(context);
266        }
267
268        @Override
269        public void fail(Context context) {
270            liteFail(context);
271        }
272    }
273
274    // wires workflow lib decision execution with Oozie Dag
275    public static class LiteDecisionHandler extends DecisionNodeHandler {
276
277        @Override
278        public void start(Context context) throws WorkflowException {
279            liteExecute(context, null);
280        }
281
282        @Override
283        public void end(Context context) {
284        }
285
286        @Override
287        public void kill(Context context) {
288            liteKill(context);
289        }
290
291        @Override
292        public void fail(Context context) {
293            liteFail(context);
294        }
295    }
296
297    // wires workflow lib control nodes with Oozie Dag
298    public static class LiteControlNodeHandler extends ControlNodeHandler {
299
300      @Override
301      public void touch(Context context) throws WorkflowException {
302          Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
303          String nodeType;
304          if (nodeClass.equals(StartNodeDef.class)) {
305            nodeType = StartActionExecutor.TYPE;
306          }
307          else if (nodeClass.equals(EndNodeDef.class)) {
308              nodeType = EndActionExecutor.TYPE;
309          }
310          else if (nodeClass.equals(KillNodeDef.class)) {
311              nodeType = KillActionExecutor.TYPE;
312          }
313          else if (nodeClass.equals(ForkNodeDef.class)) {
314              nodeType = ForkActionExecutor.TYPE;
315          }
316          else if (nodeClass.equals(JoinNodeDef.class)) {
317              nodeType = JoinActionExecutor.TYPE;
318          } else {
319            throw new IllegalStateException("Invalid node type: " + nodeClass);
320          }
321
322          liteExecute(context, nodeType);
323      }
324
325    }
326}