001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.wf;
020
021import java.util.ArrayList;
022import java.util.Date;
023import java.util.List;
024
025import javax.servlet.jsp.el.ELException;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.oozie.ErrorCode;
029import org.apache.oozie.FaultInjection;
030import org.apache.oozie.SLAEventBean;
031import org.apache.oozie.WorkflowActionBean;
032import org.apache.oozie.WorkflowJobBean;
033import org.apache.oozie.XException;
034import org.apache.oozie.action.ActionExecutor;
035import org.apache.oozie.action.ActionExecutorException;
036import org.apache.oozie.action.control.ControlNodeActionExecutor;
037import org.apache.oozie.client.OozieClient;
038import org.apache.oozie.client.WorkflowAction;
039import org.apache.oozie.client.WorkflowJob;
040import org.apache.oozie.client.SLAEvent.SlaAppType;
041import org.apache.oozie.client.SLAEvent.Status;
042import org.apache.oozie.client.rest.JsonBean;
043import org.apache.oozie.command.CommandException;
044import org.apache.oozie.command.PreconditionException;
045import org.apache.oozie.command.XCommand;
046import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
047import org.apache.oozie.executor.jpa.BatchQueryExecutor;
048import org.apache.oozie.executor.jpa.JPAExecutorException;
049import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
050import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
051import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
052import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
053import org.apache.oozie.service.ActionService;
054import org.apache.oozie.service.EventHandlerService;
055import org.apache.oozie.service.JPAService;
056import org.apache.oozie.service.Services;
057import org.apache.oozie.service.UUIDService;
058import org.apache.oozie.util.ELEvaluationException;
059import org.apache.oozie.util.Instrumentation;
060import org.apache.oozie.util.LogUtils;
061import org.apache.oozie.util.XLog;
062import org.apache.oozie.util.XmlUtils;
063import org.apache.oozie.util.db.SLADbXOperations;
064
065@SuppressWarnings("deprecation")
066public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext> {
067    public static final String EL_ERROR = "EL_ERROR";
068    public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR";
069    public static final String COULD_NOT_START = "COULD_NOT_START";
070    public static final String START_DATA_MISSING = "START_DATA_MISSING";
071    public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
072
073    private String jobId = null;
074    protected String actionId = null;
075    protected WorkflowJobBean wfJob = null;
076    protected WorkflowActionBean wfAction = null;
077    private JPAService jpaService = null;
078    private ActionExecutor executor = null;
079    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
080    private List<JsonBean> insertList = new ArrayList<JsonBean>();
081    protected ActionExecutorContext context = null;
082
083    public ActionStartXCommand(String actionId, String type) {
084        super("action.start", type, 0);
085        this.actionId = actionId;
086        this.jobId = Services.get().get(UUIDService.class).getId(actionId);
087    }
088
089    public ActionStartXCommand(WorkflowJobBean job, String actionId, String type) {
090        super("action.start", type, 0);
091        this.actionId = actionId;
092        this.wfJob = job;
093        this.jobId = wfJob.getId();
094    }
095
096    @Override
097    protected void setLogInfo() {
098        LogUtils.setLogInfo(actionId);
099    }
100
101    @Override
102    protected boolean isLockRequired() {
103        return true;
104    }
105
106    @Override
107    public String getEntityKey() {
108        return this.jobId;
109    }
110
111    @Override
112    protected void loadState() throws CommandException {
113        try {
114            jpaService = Services.get().get(JPAService.class);
115            if (jpaService != null) {
116                if (wfJob == null) {
117                    this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId);
118                }
119                this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION, actionId);
120                LogUtils.setLogInfo( wfJob);
121                LogUtils.setLogInfo(wfAction);
122            }
123            else {
124                throw new CommandException(ErrorCode.E0610);
125            }
126        }
127        catch (XException ex) {
128            throw new CommandException(ex);
129        }
130    }
131
132    @Override
133    protected void verifyPrecondition() throws CommandException, PreconditionException {
134        if (wfJob == null) {
135            throw new PreconditionException(ErrorCode.E0604, jobId);
136        }
137        if (wfAction == null) {
138            throw new PreconditionException(ErrorCode.E0605, actionId);
139        }
140        if (wfAction.isPending()
141                && (wfAction.getStatus() == WorkflowActionBean.Status.PREP
142                        || wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY
143                        || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL
144                        || wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY
145                        )) {
146            if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
147                throw new PreconditionException(ErrorCode.E0810, WorkflowJob.Status.RUNNING.toString());
148            }
149        }
150        else {
151            throw new PreconditionException(ErrorCode.E0816, wfAction.isPending(), wfAction.getStatusStr());
152        }
153
154        executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
155        if (executor == null) {
156            throw new CommandException(ErrorCode.E0802, wfAction.getType());
157        }
158    }
159
160    @Override
161    protected ActionExecutorContext execute() throws CommandException {
162        LOG.debug("STARTED ActionStartXCommand for wf actionId=" + actionId);
163        Configuration conf = wfJob.getWorkflowInstance().getConf();
164
165        int maxRetries = 0;
166        long retryInterval = 0;
167        boolean execSynchronous = false;
168
169        if (!(executor instanceof ControlNodeActionExecutor)) {
170            maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
171            retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
172        }
173
174        executor.setMaxRetries(maxRetries);
175        executor.setRetryInterval(retryInterval);
176
177        try {
178            boolean isRetry = false;
179            if (wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY
180                    || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
181                isRetry = true;
182                prepareForRetry(wfAction);
183            }
184            boolean isUserRetry = false;
185            if (wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
186                isUserRetry = true;
187                prepareForRetry(wfAction);
188            }
189            context = getContext(isRetry, isUserRetry);
190            boolean caught = false;
191            try {
192                if (!(executor instanceof ControlNodeActionExecutor)) {
193                    String tmpActionConf = XmlUtils.removeComments(wfAction.getConf());
194                    String actionConf = context.getELEvaluator().evaluate(tmpActionConf, String.class);
195                    wfAction.setConf(actionConf);
196                    LOG.debug("Start, name [{0}] type [{1}] configuration{E}{E}{2}{E}", wfAction.getName(), wfAction
197                            .getType(), actionConf);
198                }
199            }
200            catch (ELEvaluationException ex) {
201                caught = true;
202                throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, EL_EVAL_ERROR, ex
203                        .getMessage(), ex);
204            }
205            catch (ELException ex) {
206                caught = true;
207                context.setErrorInfo(EL_ERROR, ex.getMessage());
208                LOG.warn("ELException in ActionStartXCommand ", ex.getMessage(), ex);
209                handleError(context, wfJob, wfAction);
210            }
211            catch (org.jdom.JDOMException je) {
212                caught = true;
213                context.setErrorInfo("ParsingError", je.getMessage());
214                LOG.warn("JDOMException in ActionStartXCommand ", je.getMessage(), je);
215                handleError(context, wfJob, wfAction);
216            }
217            catch (Exception ex) {
218                caught = true;
219                context.setErrorInfo(EL_ERROR, ex.getMessage());
220                LOG.warn("Exception in ActionStartXCommand ", ex.getMessage(), ex);
221                handleError(context, wfJob, wfAction);
222            }
223            if(!caught) {
224                wfAction.setErrorInfo(null, null);
225                incrActionCounter(wfAction.getType(), 1);
226
227                LOG.info("Start action [{0}] with user-retry state : userRetryCount [{1}], userRetryMax [{2}], userRetryInterval [{3}]",
228                                wfAction.getId(), wfAction.getUserRetryCount(), wfAction.getUserRetryMax(), wfAction
229                                        .getUserRetryInterval());
230
231                Instrumentation.Cron cron = new Instrumentation.Cron();
232                cron.start();
233                context.setStartTime();
234                executor.start(context, wfAction);
235                cron.stop();
236                FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
237                addActionCron(wfAction.getType(), cron);
238
239                wfAction.setRetries(0);
240                if (wfAction.isExecutionComplete()) {
241                    if (!context.isExecuted()) {
242                        LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor
243                                .getType());
244                        wfAction.setErrorInfo(EXEC_DATA_MISSING,
245                                "Execution Complete, but Execution Data Missing from Action");
246                        failJob(context);
247                    } else {
248                        wfAction.setPending();
249                        if (!(executor instanceof ControlNodeActionExecutor)) {
250                            queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
251                        }
252                        else {
253                            execSynchronous = true;
254                        }
255                    }
256                }
257                else {
258                    if (!context.isStarted()) {
259                        LOG.warn(XLog.OPS, "Action Started, ActionExecutor [{0}] must call setStartData()", executor
260                                .getType());
261                        wfAction.setErrorInfo(START_DATA_MISSING, "Execution Started, but Start Data Missing from Action");
262                        failJob(context);
263                    } else {
264                        queue(new WorkflowNotificationXCommand(wfJob, wfAction));
265                    }
266                }
267
268                LOG.info(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action status=" + wfAction.getStatusStr());
269
270                updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
271                updateJobLastModified();
272                // Add SLA status event (STARTED) for WF_ACTION
273                SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.STARTED,
274                        SlaAppType.WORKFLOW_ACTION);
275                if(slaEvent != null) {
276                    insertList.add(slaEvent);
277                }
278                LOG.info(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action updated in DB!");
279            }
280        }
281        catch (ActionExecutorException ex) {
282            LOG.warn("Error starting action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]",
283                    wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage(), ex);
284            wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
285            switch (ex.getErrorType()) {
286                case TRANSIENT:
287                    if (!handleTransient(context, executor, WorkflowAction.Status.START_RETRY)) {
288                        handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
289                        wfAction.setPendingAge(new Date());
290                        wfAction.setRetries(0);
291                        wfAction.setStartTime(null);
292                    }
293                    break;
294                case NON_TRANSIENT:
295                    handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
296                    break;
297                case ERROR:
298                    handleError(context, executor, WorkflowAction.Status.ERROR.toString(), true,
299                            WorkflowAction.Status.DONE);
300                    break;
301                case FAILED:
302                    try {
303                        failJob(context);
304                        endWF();
305                        SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
306                                SlaAppType.WORKFLOW_ACTION);
307                        if(slaEvent1 != null) {
308                            insertList.add(slaEvent1);
309                        }
310                    }
311                    catch (XException x) {
312                        LOG.warn("ActionStartXCommand - case:FAILED ", x.getMessage());
313                    }
314                    break;
315            }
316            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
317            updateJobLastModified();
318        }
319        finally {
320            try {
321                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
322                if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
323                    generateEvent(wfAction, wfJob.getUser());
324                }
325                if (execSynchronous) {
326                    // Changing to synchronous call from asynchronous queuing to prevent
327                    // undue delay from ::start:: to action due to queuing
328                    callActionEnd();
329                }
330            }
331            catch (JPAExecutorException e) {
332                throw new CommandException(e);
333            }
334        }
335
336        LOG.debug("ENDED ActionStartXCommand for wf actionId=" + actionId + ", jobId=" + jobId);
337
338        return null;
339    }
340
341    protected void callActionEnd() throws CommandException {
342        new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call();
343    }
344
345    /**
346     * Get action executor context
347     * @param isRetry
348     * @param isUserRetry
349     * @return
350     */
351    protected ActionExecutorContext getContext(boolean isRetry, boolean isUserRetry) {
352        return new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
353    }
354
355    protected void updateJobLastModified(){
356        wfJob.setLastModifiedTime(new Date());
357        updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
358    }
359
360    protected void endWF() throws CommandException{
361        updateParentIfNecessary(wfJob, 3);
362        new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
363        SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED,
364                SlaAppType.WORKFLOW_JOB);
365        if(slaEvent2 != null) {
366            insertList.add(slaEvent2);
367        }
368    }
369
370    protected void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action)
371            throws CommandException {
372        failJob(context);
373        updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
374        updateJobLastModified();
375        SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(),
376                Status.FAILED, SlaAppType.WORKFLOW_ACTION);
377        if(slaEvent1 != null) {
378            insertList.add(slaEvent1);
379        }
380        endWF();
381        return;
382    }
383
384    /* (non-Javadoc)
385     * @see org.apache.oozie.command.XCommand#getKey()
386     */
387    @Override
388    public String getKey(){
389        return getName() + "_" + actionId;
390    }
391
392    private void prepareForRetry(WorkflowActionBean wfAction) {
393        if (wfAction.getType().equals("map-reduce")) {
394            // need to delete child job id of original run
395            wfAction.setExternalChildIDs("");
396        }
397    }
398
399    @Override
400    protected void queueCommandForTransientFailure(long retryDelayMillis){
401        queue(new ActionStartXCommand(wfAction.getId(), wfAction.getType()), retryDelayMillis);
402    }
403
404    protected void queue(XCommand<?> command, long msDelay) {
405        // ActionStartXCommand is synchronously called from SignalXCommand passing wfJob so that it doesn't have to
406        //reload wfJob again. We need set wfJob to null, so that it get reloaded when the requeued command executes.
407        if (command instanceof ActionStartXCommand) {
408            ((ActionStartXCommand)command).wfJob = null;
409        }
410        super.queue(command, msDelay);
411    }
412}