001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.util.StringUtils;
023import org.apache.oozie.action.control.EndActionExecutor;
024import org.apache.oozie.action.control.ForkActionExecutor;
025import org.apache.oozie.action.control.JoinActionExecutor;
026import org.apache.oozie.action.control.KillActionExecutor;
027import org.apache.oozie.action.control.StartActionExecutor;
028import org.apache.oozie.command.wf.ReRunXCommand;
029
030import org.apache.oozie.client.WorkflowAction;
031import org.apache.oozie.WorkflowActionBean;
032import org.apache.oozie.WorkflowJobBean;
033import org.apache.oozie.ErrorCode;
034import org.apache.oozie.workflow.WorkflowException;
035import org.apache.oozie.workflow.WorkflowInstance;
036import org.apache.oozie.workflow.lite.ActionNodeHandler;
037import org.apache.oozie.workflow.lite.ControlNodeHandler;
038import org.apache.oozie.workflow.lite.DecisionNodeHandler;
039import org.apache.oozie.workflow.lite.EndNodeDef;
040import org.apache.oozie.workflow.lite.ForkNodeDef;
041import org.apache.oozie.workflow.lite.JoinNodeDef;
042import org.apache.oozie.workflow.lite.KillNodeDef;
043import org.apache.oozie.workflow.lite.NodeDef;
044import org.apache.oozie.workflow.lite.NodeHandler;
045import org.apache.oozie.util.XLog;
046import org.apache.oozie.util.XmlUtils;
047import org.apache.oozie.workflow.lite.StartNodeDef;
048import org.jdom.Element;
049import org.jdom.JDOMException;
050
051import java.util.ArrayList;
052import java.util.Collection;
053import java.util.HashSet;
054import java.util.List;
055import java.util.Set;
056
057public abstract class LiteWorkflowStoreService extends WorkflowStoreService {
058
    /** Prefix shared by every configuration key declared by this service. */
    public static final String CONF_PREFIX = Service.CONF_PREFIX + "LiteWorkflowStoreService.";
    /** Prefix of the user-retry related configuration keys. */
    public static final String CONF_PREFIX_USER_RETRY = CONF_PREFIX + "user.retry.";
    /** Key of the system-wide upper bound applied to a node's requested user-retry count. */
    public static final String CONF_USER_RETRY_MAX = CONF_PREFIX_USER_RETRY + "max";
    /**
     * Key of the user-retry interval used when a node does not specify its own.
     * NOTE: the spelling "inteval" is part of the key's runtime value (and of this public constant's name);
     * correcting it would change the configuration property that is looked up.
     */
    public static final String CONF_USER_RETRY_INTEVAL = CONF_PREFIX_USER_RETRY + "inteval";
    /** Key of the user-retry policy; not read inside this class. */
    public static final String CONF_USER_RETRY_POLICY = CONF_PREFIX_USER_RETRY + "policy";
    /** Key of the comma separated error codes for which user-retry is allowed. */
    public static final String CONF_USER_RETRY_ERROR_CODE = CONF_PREFIX_USER_RETRY + "error.code";
    /** Key of additional error codes that are merged with {@link #CONF_USER_RETRY_ERROR_CODE}. */
    public static final String CONF_USER_RETRY_ERROR_CODE_EXT = CONF_PREFIX_USER_RETRY + "error.code.ext";
    /** Presumably the policy used when none is configured; not referenced inside this class - confirm with callers. */
    public static final String DEFAULT_USER_RETRY_POLICY = "PERIODIC";

    // Node definition version identifiers.
    public static final String NODE_DEF_VERSION_0 = "_oozie_inst_v_0";
    public static final String NODE_DEF_VERSION_1 = "_oozie_inst_v_1";
    /** Version returned by {@link #getNodeDefDefaultVersion()} when no version is configured. */
    public static final String NODE_DEF_VERSION_2 = "_oozie_inst_v_2";
    /** Key of the node definition version to use by default. */
    public static final String CONF_NODE_DEF_VERSION = CONF_PREFIX + "node.def.version";

    /** Presumably a wildcard standing for every error code; not referenced inside this class - confirm with callers. */
    public static final String USER_ERROR_CODE_ALL = "ALL";
074
075    /**
076     * Delegation method used by the Action and Decision {@link NodeHandler} on start. <p> This method provides the
077     * necessary information to create ActionExecutors.
078     *
079     * @param context NodeHandler context.
080     * @param actionType the action type.
081     * @throws WorkflowException thrown if there was an error parsing the action configuration.
082     */
083    @SuppressWarnings("unchecked")
084    protected static void liteExecute(NodeHandler.Context context, String actionType) throws WorkflowException {
085        XLog log = XLog.getLog(LiteWorkflowStoreService.class);
086        String jobId = context.getProcessInstance().getId();
087        String nodeName = context.getNodeDef().getName();
088        String skipVar = context.getProcessInstance().getVar(context.getNodeDef().getName()
089                + WorkflowInstance.NODE_VAR_SEPARATOR + ReRunXCommand.TO_SKIP);
090        boolean skipAction = false;
091        if (skipVar != null) {
092            skipAction = skipVar.equals("true");
093        }
094        WorkflowActionBean action = new WorkflowActionBean();
095        String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, nodeName);
096
097        if (!skipAction) {
098            String nodeConf = context.getNodeDef().getConf();
099
100            if (actionType == null) {
101                try {
102                    Element element = XmlUtils.parseXml(nodeConf);
103                    actionType = element.getName();
104                    nodeConf = XmlUtils.prettyPrint(element).toString();
105                }
106                catch (JDOMException ex) {
107                    throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
108                }
109            }
110            log.debug(" Creating action for node [{0}]", nodeName);
111            action.setType(actionType);
112            action.setConf(nodeConf);
113            action.setLogToken(((WorkflowJobBean) context.getTransientVar(WORKFLOW_BEAN)).getLogToken());
114            action.setStatus(WorkflowAction.Status.PREP);
115            action.setJobId(jobId);
116        }
117
118        String executionPath = context.getExecutionPath();
119        action.setExecutionPath(executionPath);
120        action.setCred(context.getNodeDef().getCred());
121        log.debug("Setting action for cred: '"+context.getNodeDef().getCred() +
122                        "', name: '"+ context.getNodeDef().getName() + "'");
123
124        action.setUserRetryCount(0);
125        int userRetryMax = getUserRetryMax(context);
126        int userRetryInterval = getUserRetryInterval(context);
127        action.setUserRetryMax(userRetryMax);
128        action.setUserRetryInterval(userRetryInterval);
129        log.debug("Setting action for userRetryMax: '"+ userRetryMax +
130                        "', userRetryInterval: '" + userRetryInterval +
131                        "', name: '"+ context.getNodeDef().getName() + "'");
132
133        action.setName(nodeName);
134        action.setId(actionId);
135        context.setVar(nodeName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID, actionId);
136        List list = (List) context.getTransientVar(ACTIONS_TO_START);
137        if (list == null) {
138            list = new ArrayList();
139            context.setTransientVar(ACTIONS_TO_START, list);
140        }
141        list.add(action);
142    }
143
144    private static int getUserRetryInterval(NodeHandler.Context context) throws WorkflowException {
145        int ret = ConfigurationService.getInt(CONF_USER_RETRY_INTEVAL);
146        String userRetryInterval = context.getNodeDef().getUserRetryInterval();
147
148        if (!userRetryInterval.equals("null")) {
149            try {
150                ret = Integer.parseInt(userRetryInterval);
151            }
152            catch (NumberFormatException nfe) {
153                throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
154            }
155        }
156        return ret;
157    }
158
159    private static int getUserRetryMax(NodeHandler.Context context) throws WorkflowException {
160        XLog log = XLog.getLog(LiteWorkflowStoreService.class);
161        int ret = ConfigurationService.getInt(CONF_USER_RETRY_MAX);
162        int max = ret;
163        String userRetryMax = context.getNodeDef().getUserRetryMax();
164
165        if (!userRetryMax.equals("null")) {
166            try {
167                ret = Integer.parseInt(userRetryMax);
168                if (ret > max) {
169                    ret = max;
170                    log.warn(ErrorCode.E0820.getTemplate(), ret, max);
171                }
172            }
173            catch (NumberFormatException nfe) {
174                throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
175            }
176        }
177        else {
178            ret = 0;
179        }
180        return ret;
181    }
182
183    /**
184     * Get system defined and instance defined error codes for which USER_RETRY is allowed
185     *
186     * @return set of error code user-retry is allowed for
187     */
188    public static Set<String> getUserRetryErrorCode() {
189        // eliminating whitespaces in the error codes value specification
190        String errorCodeString = ConfigurationService.get(CONF_USER_RETRY_ERROR_CODE).replaceAll("\\s+", "");
191        Collection<String> strings = StringUtils.getStringCollection(errorCodeString);
192        String errorCodeExtString = ConfigurationService.get(CONF_USER_RETRY_ERROR_CODE_EXT).replaceAll("\\s+", "");
193        Collection<String> extra = StringUtils.getStringCollection(errorCodeExtString);
194        Set<String> set = new HashSet<String>();
195        set.addAll(strings);
196        set.addAll(extra);
197        return set;
198    }
199
200    /**
201     * Get NodeDef default version, _oozie_inst_v_0, _oozie_inst_v_1 or
202     * _oozie_inst_v_2
203     *
204     * @return nodedef default version
205     * @throws WorkflowException thrown if there was an error parsing the action
206     *         configuration.
207     */
208    public static String getNodeDefDefaultVersion() throws WorkflowException {
209        String ret = ConfigurationService.get(CONF_NODE_DEF_VERSION);
210        if (ret == null) {
211            ret = NODE_DEF_VERSION_2;
212        }
213        return ret;
214    }
215
216    /**
217     * Delegation method used when failing actions. <p>
218     *
219     * @param context NodeHandler context.
220     */
221    @SuppressWarnings("unchecked")
222    protected static void liteFail(NodeHandler.Context context) {
223        liteTerminate(context, ACTIONS_TO_FAIL);
224    }
225
226    /**
227     * Delegation method used when killing actions. <p>
228     *
229     * @param context NodeHandler context.
230     */
231    @SuppressWarnings("unchecked")
232    protected static void liteKill(NodeHandler.Context context) {
233        liteTerminate(context, ACTIONS_TO_KILL);
234    }
235
236    /**
237     * Used to terminate jobs - FAIL or KILL. <p>
238     *
239     * @param context NodeHandler context.
240     * @param transientVar The transient variable name.
241     */
242    @SuppressWarnings("unchecked")
243    private static void liteTerminate(NodeHandler.Context context, String transientVar) {
244        List<String> list = (List<String>) context.getTransientVar(transientVar);
245        if (list == null) {
246            list = new ArrayList<String>();
247            context.setTransientVar(transientVar, list);
248        }
249        list.add(context.getVar(context.getNodeDef().getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID));
250    }
251
    /**
     * Wires workflow lib action execution with Oozie Dag: each node handler callback is forwarded to the
     * corresponding static delegation method of {@link LiteWorkflowStoreService}.
     */
    public static class LiteActionHandler extends ActionNodeHandler {

        /**
         * Queues the action of the current node for start. The action type is passed as {@code null}, so
         * {@code liteExecute} derives it from the root element of the node's XML configuration.
         *
         * @param context NodeHandler context.
         * @throws WorkflowException thrown if the action could not be prepared.
         */
        @Override
        public void start(Context context) throws WorkflowException {
            liteExecute(context, null);
        }

        /**
         * Does nothing when the node ends.
         *
         * @param context NodeHandler context.
         */
        @Override
        public void end(Context context) {
        }

        /**
         * Queues the action id of the current node for kill.
         *
         * @param context NodeHandler context.
         */
        @Override
        public void kill(Context context) {
            liteKill(context);
        }

        /**
         * Queues the action id of the current node for fail.
         *
         * @param context NodeHandler context.
         */
        @Override
        public void fail(Context context) {
            liteFail(context);
        }
    }
274
    /**
     * Wires workflow lib decision execution with Oozie Dag: each node handler callback is forwarded to the
     * corresponding static delegation method of {@link LiteWorkflowStoreService}.
     */
    public static class LiteDecisionHandler extends DecisionNodeHandler {

        /**
         * Queues the action of the current decision node for start. The action type is passed as {@code null},
         * so {@code liteExecute} derives it from the root element of the node's XML configuration.
         *
         * @param context NodeHandler context.
         * @throws WorkflowException thrown if the action could not be prepared.
         */
        @Override
        public void start(Context context) throws WorkflowException {
            liteExecute(context, null);
        }

        /**
         * Does nothing when the node ends.
         *
         * @param context NodeHandler context.
         */
        @Override
        public void end(Context context) {
        }

        /**
         * Queues the action id of the current node for kill.
         *
         * @param context NodeHandler context.
         */
        @Override
        public void kill(Context context) {
            liteKill(context);
        }

        /**
         * Queues the action id of the current node for fail.
         *
         * @param context NodeHandler context.
         */
        @Override
        public void fail(Context context) {
            liteFail(context);
        }
    }
297
298    // wires workflow lib control nodes with Oozie Dag
299    public static class LiteControlNodeHandler extends ControlNodeHandler {
300
301      @Override
302      public void touch(Context context) throws WorkflowException {
303          Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
304          String nodeType;
305          if (nodeClass.equals(StartNodeDef.class)) {
306            nodeType = StartActionExecutor.TYPE;
307          }
308          else if (nodeClass.equals(EndNodeDef.class)) {
309              nodeType = EndActionExecutor.TYPE;
310          }
311          else if (nodeClass.equals(KillNodeDef.class)) {
312              nodeType = KillActionExecutor.TYPE;
313          }
314          else if (nodeClass.equals(ForkNodeDef.class)) {
315              nodeType = ForkActionExecutor.TYPE;
316          }
317          else if (nodeClass.equals(JoinNodeDef.class)) {
318              nodeType = JoinActionExecutor.TYPE;
319          } else {
320            throw new IllegalStateException("Invalid node type: " + nodeClass);
321          }
322
323          liteExecute(context, nodeType);
324      }
325
326    }
327}