001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import java.util.Date;
021    
022    import javax.servlet.jsp.el.ELException;
023    
024    import org.apache.hadoop.conf.Configuration;
025    import org.apache.oozie.ErrorCode;
026    import org.apache.oozie.FaultInjection;
027    import org.apache.oozie.WorkflowActionBean;
028    import org.apache.oozie.WorkflowJobBean;
029    import org.apache.oozie.XException;
030    import org.apache.oozie.action.ActionExecutor;
031    import org.apache.oozie.action.ActionExecutorException;
032    import org.apache.oozie.client.OozieClient;
033    import org.apache.oozie.client.WorkflowAction;
034    import org.apache.oozie.client.WorkflowJob;
035    import org.apache.oozie.client.SLAEvent.SlaAppType;
036    import org.apache.oozie.client.SLAEvent.Status;
037    import org.apache.oozie.command.CommandException;
038    import org.apache.oozie.command.coord.CoordActionUpdateCommand;
039    import org.apache.oozie.service.ActionService;
040    import org.apache.oozie.service.Services;
041    import org.apache.oozie.service.UUIDService;
042    import org.apache.oozie.store.StoreException;
043    import org.apache.oozie.store.WorkflowStore;
044    import org.apache.oozie.util.ELEvaluationException;
045    import org.apache.oozie.util.Instrumentation;
046    import org.apache.oozie.util.XLog;
047    import org.apache.oozie.util.XmlUtils;
048    import org.apache.oozie.util.db.SLADbOperations;
049    
050    public class ActionStartCommand extends ActionCommand<Void> {
051        public static final String EL_ERROR = "EL_ERROR";
052        public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR";
053        public static final String COULD_NOT_START = "COULD_NOT_START";
054        public static final String START_DATA_MISSING = "START_DATA_MISSING";
055        public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
056    
057        private String id;
058        private String jobId;
059    
060        public ActionStartCommand(String id, String type) {
061            super("action.start", type, 0);
062            this.id = id;
063        }
064    
065        @Override
066        protected Void call(WorkflowStore store) throws StoreException, CommandException {
067            WorkflowJobBean workflow = store.getWorkflow(jobId, false);
068            setLogInfo(workflow);
069            WorkflowActionBean action = store.getAction(id, false);
070            XLog.getLog(getClass()).warn(XLog.STD,
071                                         "[***" + action.getId() + "***]" + "In call()....status=" + action.getStatusStr());
072            setLogInfo(action);
073            if (action.isPending()
074                    && (action.getStatus() == WorkflowActionBean.Status.PREP
075                    || action.getStatus() == WorkflowActionBean.Status.START_RETRY || action.getStatus() == WorkflowActionBean.Status.START_MANUAL)) {
076                if (workflow.getStatus() == WorkflowJob.Status.RUNNING) {
077    
078                    ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(action.getType());
079                    Configuration conf = workflow.getWorkflowInstance().getConf();
080    
081                    int maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
082                    long retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
083                    executor.setMaxRetries(maxRetries);
084                    executor.setRetryInterval(retryInterval);
085    
086                    if (executor != null) {
087                        ActionExecutorContext context = null;
088                        try {
089                            boolean isRetry = false;
090                            if (action.getStatus() == WorkflowActionBean.Status.START_RETRY
091                                    || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
092                                isRetry = true;
093                            }
094                            context = new ActionCommand.ActionExecutorContext(workflow, action, isRetry);
095                            try {
096                                String tmpActionConf = XmlUtils.removeComments(action.getConf());
097                                String actionConf = context.getELEvaluator().evaluate(tmpActionConf, String.class);
098                                action.setConf(actionConf);
099    
100                                XLog.getLog(getClass()).debug("Start, name [{0}] type [{1}] configuration{E}{E}{2}{E}",
101                                                              action.getName(), action.getType(), actionConf);
102    
103                            }
104                            catch (ELEvaluationException ex) {
105                                throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT,
106                                                                  EL_EVAL_ERROR, ex.getMessage(), ex);
107                            }
108                            catch (ELException ex) {
109                                context.setErrorInfo(EL_ERROR, ex.getMessage());
110                                XLog.getLog(getClass()).warn("ELException in ActionStartCommand ", ex.getMessage(), ex);
111                                handleError(context, store, workflow, action);
112                                return null;
113                            }
114                            catch (org.jdom.JDOMException je) {
115                                context.setErrorInfo("ParsingError", je.getMessage());
116                                XLog.getLog(getClass()).warn("JDOMException in ActionStartCommand ", je.getMessage(), je);
117                                handleError(context, store, workflow, action);
118                                return null;
119                            }
120                            catch (Exception ex) {
121                                context.setErrorInfo(EL_ERROR, ex.getMessage());
122                                XLog.getLog(getClass()).warn("Exception in ActionStartCommand ", ex.getMessage(), ex);
123                                handleError(context, store, workflow, action);
124                                return null;
125                            }
126                            action.setErrorInfo(null, null);
127                            incrActionCounter(action.getType(), 1);
128    
129                            Instrumentation.Cron cron = new Instrumentation.Cron();
130                            cron.start();
131                            executor.start(context, action);
132                            cron.stop();
133                            FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
134                            addActionCron(action.getType(), cron);
135    
136                            action.setRetries(0);
137                            if (action.isExecutionComplete()) {
138                                if (!context.isExecuted()) {
139                                    XLog.getLog(getClass()).warn(XLog.OPS,
140                                                                 "Action Completed, ActionExecutor [{0}] must call setExecutionData()",
141                                                                 executor.getType());
142                                    action.setErrorInfo(EXEC_DATA_MISSING,
143                                                        "Execution Complete, but Execution Data Missing from Action");
144                                    failJob(context);
145                                    store.updateAction(action);
146                                    store.updateWorkflow(workflow);
147                                    return null;
148                                }
149                                action.setPending();
150                                queueCallable(new ActionEndCommand(action.getId(), action.getType()));
151                            }
152                            else {
153                                if (!context.isStarted()) {
154                                    XLog.getLog(getClass()).warn(XLog.OPS,
155                                                                 "Action Started, ActionExecutor [{0}] must call setStartData()",
156                                                                 executor.getType());
157                                    action.setErrorInfo(START_DATA_MISSING,
158                                                        "Execution Started, but Start Data Missing from Action");
159                                    failJob(context);
160                                    store.updateAction(action);
161                                    store.updateWorkflow(workflow);
162                                    return null;
163                                }
164                                queueCallable(new NotificationCommand(workflow, action));
165                            }
166    
167                            XLog.getLog(getClass()).warn(XLog.STD,
168                                                         "[***" + action.getId() + "***]" + "Action status=" + action.getStatusStr());
169    
170                            store.updateAction(action);
171                            store.updateWorkflow(workflow);
172                            // Add SLA status event (STARTED) for WF_ACTION
173                            // SLADbOperations.writeSlaStatusEvent(eSla,
174                            // action.getId(), Status.STARTED, store);
175                            SLADbOperations.writeStausEvent(action.getSlaXml(), action.getId(), store, Status.STARTED,
176                                                            SlaAppType.WORKFLOW_ACTION);
177                            XLog.getLog(getClass()).warn(XLog.STD,
178                                                         "[***" + action.getId() + "***]" + "Action updated in DB!");
179    
180                        }
181                        catch (ActionExecutorException ex) {
182                            XLog.getLog(getClass()).warn(
183                                    "Error starting action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]",
184                                    action.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage(), ex);
185                            action.setErrorInfo(ex.getErrorCode(), ex.getMessage());
186                            switch (ex.getErrorType()) {
187                                case TRANSIENT:
188                                    if (!handleTransient(context, executor, WorkflowAction.Status.START_RETRY)) {
189                                        handleNonTransient(store, context, executor, WorkflowAction.Status.START_MANUAL);
190                                        action.setPendingAge(new Date());
191                                        action.setRetries(0);
192                                        action.setStartTime(null);
193                                    }
194                                    break;
195                                case NON_TRANSIENT:
196                                    handleNonTransient(store, context, executor, WorkflowAction.Status.START_MANUAL);
197                                    break;
198                                case ERROR:
199                                    handleError(context, executor, WorkflowAction.Status.ERROR.toString(), true,
200                                                WorkflowAction.Status.DONE);
201                                    break;
202                                case FAILED:
203                                    try {
204                                        failJob(context);
205                                        queueCallable(new CoordActionUpdateCommand(workflow));
206                                        SLADbOperations.writeStausEvent(action.getSlaXml(), action.getId(), store,
207                                                                        Status.FAILED, SlaAppType.WORKFLOW_ACTION);
208                                        SLADbOperations.writeStausEvent(workflow.getSlaXml(), workflow.getId(), store,
209                                                                        Status.FAILED, SlaAppType.WORKFLOW_JOB);
210                                    }
211                                    catch (XException x) {
212                                        XLog.getLog(getClass()).warn("ActionStartCommand - case:FAILED ", x.getMessage());
213                                    }
214                                    break;
215                            }
216                            store.updateAction(action);
217                            store.updateWorkflow(workflow);
218                        }
219                    }
220                    else {
221                        throw new CommandException(ErrorCode.E0802, action.getType());
222                    }
223    
224                }
225                else {
226                    XLog.getLog(getClass()).warn("Job state is not {0}. Skipping Action Execution",
227                                                 WorkflowJob.Status.RUNNING.toString());
228                }
229            }
230            return null;
231        }
232    
233        private void handleError(ActionExecutorContext context, WorkflowStore store, WorkflowJobBean workflow,
234                                 WorkflowActionBean action) throws CommandException, StoreException {
235            failJob(context);
236            store.updateAction(action);
237            store.updateWorkflow(workflow);
238            SLADbOperations.writeStausEvent(action.getSlaXml(), action.getId(), store, Status.FAILED,
239                                            SlaAppType.WORKFLOW_ACTION);
240            SLADbOperations.writeStausEvent(workflow.getSlaXml(), workflow.getId(), store, Status.FAILED,
241                                            SlaAppType.WORKFLOW_JOB);
242            queueCallable(new CoordActionUpdateCommand(workflow));
243            return;
244        }
245    
246        @Override
247        protected Void execute(WorkflowStore store) throws CommandException, StoreException {
248            try {
249                XLog.getLog(getClass()).debug("STARTED ActionStartCommand for wf actionId=" + id);
250                jobId = Services.get().get(UUIDService.class).getId(id);
251                if (lock(jobId)) {
252                    call(store);
253                }
254                else {
255                    queueCallable(new ActionStartCommand(id, getType()), LOCK_FAILURE_REQUEUE_INTERVAL);
256                    XLog.getLog(getClass()).warn("ActionStartCommand lock was not acquired - failed {0}", id);
257                }
258            }
259            catch (InterruptedException e) {
260                queueCallable(new ActionStartCommand(id, getType()), LOCK_FAILURE_REQUEUE_INTERVAL);
261                XLog.getLog(getClass()).warn("ActionStartCommand lock was not acquired - interrupted exception failed {0}",
262                                             id);
263            }
264            XLog.getLog(getClass()).debug("ENDED ActionStartCommand for wf actionId=" + id + ", jobId=" + jobId);
265            return null;
266        }
267    
268    }