001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import java.util.Date;
021    
022    import javax.servlet.jsp.el.ELException;
023    import org.apache.hadoop.conf.Configuration;
024    import org.apache.oozie.ErrorCode;
025    import org.apache.oozie.FaultInjection;
026    import org.apache.oozie.WorkflowActionBean;
027    import org.apache.oozie.WorkflowJobBean;
028    import org.apache.oozie.XException;
029    import org.apache.oozie.action.ActionExecutor;
030    import org.apache.oozie.action.ActionExecutorException;
031    import org.apache.oozie.client.OozieClient;
032    import org.apache.oozie.client.WorkflowAction;
033    import org.apache.oozie.client.WorkflowJob;
034    import org.apache.oozie.client.SLAEvent.SlaAppType;
035    import org.apache.oozie.client.SLAEvent.Status;
036    import org.apache.oozie.command.CommandException;
037    import org.apache.oozie.command.PreconditionException;
038    import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
039    import org.apache.oozie.executor.jpa.JPAExecutorException;
040    import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
041    import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
042    import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
043    import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
044    import org.apache.oozie.service.ActionService;
045    import org.apache.oozie.service.JPAService;
046    import org.apache.oozie.service.Services;
047    import org.apache.oozie.service.UUIDService;
048    import org.apache.oozie.util.ELEvaluationException;
049    import org.apache.oozie.util.Instrumentation;
050    import org.apache.oozie.util.LogUtils;
051    import org.apache.oozie.util.XLog;
052    import org.apache.oozie.util.XmlUtils;
053    import org.apache.oozie.util.db.SLADbXOperations;
054    
055    public class ActionStartXCommand extends ActionXCommand<Void> {
056        public static final String EL_ERROR = "EL_ERROR";
057        public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR";
058        public static final String COULD_NOT_START = "COULD_NOT_START";
059        public static final String START_DATA_MISSING = "START_DATA_MISSING";
060        public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
061    
062        private String jobId = null;
063        private String actionId = null;
064        private WorkflowJobBean wfJob = null;
065        private WorkflowActionBean wfAction = null;
066        private JPAService jpaService = null;
067        private ActionExecutor executor = null;
068    
069        public ActionStartXCommand(String actionId, String type) {
070            super("action.start", type, 0);
071            this.actionId = actionId;
072            this.jobId = Services.get().get(UUIDService.class).getId(actionId);
073        }
074    
075        @Override
076        protected boolean isLockRequired() {
077            return true;
078        }
079    
080        @Override
081        protected String getEntityKey() {
082            return this.jobId;
083        }
084    
085        @Override
086        protected void loadState() throws CommandException {
087            try {
088                jpaService = Services.get().get(JPAService.class);
089                if (jpaService != null) {
090                    this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
091                    this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
092                    LogUtils.setLogInfo(wfJob, logInfo);
093                    LogUtils.setLogInfo(wfAction, logInfo);
094                }
095                else {
096                    throw new CommandException(ErrorCode.E0610);
097                }
098            }
099            catch (XException ex) {
100                throw new CommandException(ex);
101            }
102        }
103    
104        @Override
105        protected void verifyPrecondition() throws CommandException, PreconditionException {
106            if (wfJob == null) {
107                throw new PreconditionException(ErrorCode.E0604, jobId);
108            }
109            if (wfAction == null) {
110                throw new PreconditionException(ErrorCode.E0605, actionId);
111            }
112            if (wfAction.isPending()
113                    && (wfAction.getStatus() == WorkflowActionBean.Status.PREP
114                            || wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY
115                            || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL
116                            || wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY
117                            )) {
118                if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
119                    throw new PreconditionException(ErrorCode.E0810, WorkflowJob.Status.RUNNING.toString());
120                }
121            }
122            else {
123                throw new PreconditionException(ErrorCode.E0816, wfAction.getPending(), wfAction.getStatusStr());
124            }
125    
126            executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
127            if (executor == null) {
128                throw new CommandException(ErrorCode.E0802, wfAction.getType());
129            }
130        }
131    
132        @Override
133        protected Void execute() throws CommandException {
134    
135            LOG.debug("STARTED ActionStartXCommand for wf actionId=" + actionId);
136            Configuration conf = wfJob.getWorkflowInstance().getConf();
137    
138            int maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
139            long retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
140            executor.setMaxRetries(maxRetries);
141            executor.setRetryInterval(retryInterval);
142    
143            ActionExecutorContext context = null;
144            try {
145                boolean isRetry = false;
146                if (wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY
147                        || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
148                    isRetry = true;
149                }
150                boolean isUserRetry = false;
151                if (wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
152                    isUserRetry = true;
153                }
154                context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
155                try {
156                    String tmpActionConf = XmlUtils.removeComments(wfAction.getConf());
157                    String actionConf = context.getELEvaluator().evaluate(tmpActionConf, String.class);
158                    wfAction.setConf(actionConf);
159                    LOG.debug("Start, name [{0}] type [{1}] configuration{E}{E}{2}{E}", wfAction.getName(), wfAction
160                            .getType(), actionConf);
161                }
162                catch (ELEvaluationException ex) {
163                    throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, EL_EVAL_ERROR, ex
164                            .getMessage(), ex);
165                }
166                catch (ELException ex) {
167                    context.setErrorInfo(EL_ERROR, ex.getMessage());
168                    LOG.warn("ELException in ActionStartXCommand ", ex.getMessage(), ex);
169                    handleError(context, wfJob, wfAction);
170                    return null;
171                }
172                catch (org.jdom.JDOMException je) {
173                    context.setErrorInfo("ParsingError", je.getMessage());
174                    LOG.warn("JDOMException in ActionStartXCommand ", je.getMessage(), je);
175                    handleError(context, wfJob, wfAction);
176                    return null;
177                }
178                catch (Exception ex) {
179                    context.setErrorInfo(EL_ERROR, ex.getMessage());
180                    LOG.warn("Exception in ActionStartXCommand ", ex.getMessage(), ex);
181                    handleError(context, wfJob, wfAction);
182                    return null;
183                }
184                wfAction.setErrorInfo(null, null);
185                incrActionCounter(wfAction.getType(), 1);
186    
187                LOG.info("Start action [{0}] with user-retry state : userRetryCount [{1}], userRetryMax [{2}], userRetryInterval [{3}]",
188                                wfAction.getId(), wfAction.getUserRetryCount(), wfAction.getUserRetryMax(), wfAction
189                                        .getUserRetryInterval());
190    
191                Instrumentation.Cron cron = new Instrumentation.Cron();
192                cron.start();
193                executor.start(context, wfAction);
194                cron.stop();
195                FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
196                addActionCron(wfAction.getType(), cron);
197    
198                wfAction.setRetries(0);
199                if (wfAction.isExecutionComplete()) {
200                    if (!context.isExecuted()) {
201                        LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor
202                                .getType());
203                        wfAction.setErrorInfo(EXEC_DATA_MISSING,
204                                "Execution Complete, but Execution Data Missing from Action");
205                        failJob(context);
206                        jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
207                        jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
208                        return null;
209                    }
210                    wfAction.setPending();
211                    queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
212                }
213                else {
214                    if (!context.isStarted()) {
215                        LOG.warn(XLog.OPS, "Action Started, ActionExecutor [{0}] must call setStartData()", executor
216                                .getType());
217                        wfAction.setErrorInfo(START_DATA_MISSING, "Execution Started, but Start Data Missing from Action");
218                        failJob(context);
219                        jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
220                        jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
221                        return null;
222                    }
223                    queue(new NotificationXCommand(wfJob, wfAction));
224                }
225    
226                LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action status=" + wfAction.getStatusStr());
227    
228                jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
229                jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
230                // Add SLA status event (STARTED) for WF_ACTION
231                SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.STARTED,
232                        SlaAppType.WORKFLOW_ACTION);
233                LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action updated in DB!");
234    
235            }
236            catch (ActionExecutorException ex) {
237                LOG.warn("Error starting action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]",
238                        wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage(), ex);
239                wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
240                switch (ex.getErrorType()) {
241                    case TRANSIENT:
242                        if (!handleTransient(context, executor, WorkflowAction.Status.START_RETRY)) {
243                            handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
244                            wfAction.setPendingAge(new Date());
245                            wfAction.setRetries(0);
246                            wfAction.setStartTime(null);
247                        }
248                        break;
249                    case NON_TRANSIENT:
250                        handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
251                        break;
252                    case ERROR:
253                        handleError(context, executor, WorkflowAction.Status.ERROR.toString(), true,
254                                WorkflowAction.Status.DONE);
255                        break;
256                    case FAILED:
257                        try {
258                            failJob(context);
259                            // update coordinator action
260                            new CoordActionUpdateXCommand(wfJob, 3).call();
261                            new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
262                            SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
263                                    SlaAppType.WORKFLOW_ACTION);
264                            SLADbXOperations.writeStausEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED,
265                                    SlaAppType.WORKFLOW_JOB);
266                        }
267                        catch (XException x) {
268                            LOG.warn("ActionStartXCommand - case:FAILED ", x.getMessage());
269                        }
270                        break;
271                }
272                try {
273                    jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
274                    jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
275                }
276                catch (JPAExecutorException je) {
277                    throw new CommandException(je);
278                }
279            }
280            catch (JPAExecutorException je) {
281                throw new CommandException(je);
282            }
283    
284            LOG.debug("ENDED ActionStartXCommand for wf actionId=" + actionId + ", jobId=" + jobId);
285    
286            return null;
287        }
288    
289        private void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action)
290                throws CommandException {
291            failJob(context);
292            try {
293                jpaService.execute(new WorkflowActionUpdateJPAExecutor(action));
294                jpaService.execute(new WorkflowJobUpdateJPAExecutor(workflow));
295            }
296            catch (JPAExecutorException je) {
297                throw new CommandException(je);
298            }
299            SLADbXOperations.writeStausEvent(action.getSlaXml(), action.getId(), Status.FAILED, SlaAppType.WORKFLOW_ACTION);
300            SLADbXOperations.writeStausEvent(workflow.getSlaXml(), workflow.getId(), Status.FAILED, SlaAppType.WORKFLOW_JOB);
301            // update coordinator action
302            new CoordActionUpdateXCommand(workflow, 3).call();
303            new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
304            return;
305        }
306    
307        /* (non-Javadoc)
308         * @see org.apache.oozie.command.XCommand#getKey()
309         */
310        @Override
311        public String getKey(){
312            return getName() + "_" + actionId;
313        }
314    
315    }