001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.wf;
019
020import java.util.ArrayList;
021import java.util.Date;
022import java.util.List;
023
024import javax.servlet.jsp.el.ELException;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.oozie.ErrorCode;
028import org.apache.oozie.FaultInjection;
029import org.apache.oozie.SLAEventBean;
030import org.apache.oozie.WorkflowActionBean;
031import org.apache.oozie.WorkflowJobBean;
032import org.apache.oozie.XException;
033import org.apache.oozie.action.ActionExecutor;
034import org.apache.oozie.action.ActionExecutorException;
035import org.apache.oozie.action.control.ControlNodeActionExecutor;
036import org.apache.oozie.client.OozieClient;
037import org.apache.oozie.client.WorkflowAction;
038import org.apache.oozie.client.WorkflowJob;
039import org.apache.oozie.client.SLAEvent.SlaAppType;
040import org.apache.oozie.client.SLAEvent.Status;
041import org.apache.oozie.client.rest.JsonBean;
042import org.apache.oozie.command.CommandException;
043import org.apache.oozie.command.PreconditionException;
044import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
045import org.apache.oozie.executor.jpa.BatchQueryExecutor;
046import org.apache.oozie.executor.jpa.JPAExecutorException;
047import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
048import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
049import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
050import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
051import org.apache.oozie.service.ActionService;
052import org.apache.oozie.service.EventHandlerService;
053import org.apache.oozie.service.JPAService;
054import org.apache.oozie.service.Services;
055import org.apache.oozie.service.UUIDService;
056import org.apache.oozie.util.ELEvaluationException;
057import org.apache.oozie.util.Instrumentation;
058import org.apache.oozie.util.LogUtils;
059import org.apache.oozie.util.XLog;
060import org.apache.oozie.util.XmlUtils;
061import org.apache.oozie.util.db.SLADbXOperations;
062
063@SuppressWarnings("deprecation")
064public class ActionStartXCommand extends ActionXCommand<Void> {
065    public static final String EL_ERROR = "EL_ERROR";
066    public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR";
067    public static final String COULD_NOT_START = "COULD_NOT_START";
068    public static final String START_DATA_MISSING = "START_DATA_MISSING";
069    public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
070
071    private String jobId = null;
072    private String actionId = null;
073    private WorkflowJobBean wfJob = null;
074    private WorkflowActionBean wfAction = null;
075    private JPAService jpaService = null;
076    private ActionExecutor executor = null;
077    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
078    private List<JsonBean> insertList = new ArrayList<JsonBean>();
079
080    public ActionStartXCommand(String actionId, String type) {
081        super("action.start", type, 0);
082        this.actionId = actionId;
083        this.jobId = Services.get().get(UUIDService.class).getId(actionId);
084    }
085
086    public ActionStartXCommand(WorkflowJobBean job, String actionId, String type) {
087        super("action.start", type, 0);
088        this.actionId = actionId;
089        this.wfJob = job;
090        this.jobId = wfJob.getId();
091    }
092
093    @Override
094    protected boolean isLockRequired() {
095        return true;
096    }
097
098    @Override
099    public String getEntityKey() {
100        return this.jobId;
101    }
102
103    @Override
104    protected void loadState() throws CommandException {
105        try {
106            jpaService = Services.get().get(JPAService.class);
107            if (jpaService != null) {
108                if (wfJob == null) {
109                    this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId);
110                }
111                this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION, actionId);
112                LogUtils.setLogInfo(wfJob, logInfo);
113                LogUtils.setLogInfo(wfAction, logInfo);
114            }
115            else {
116                throw new CommandException(ErrorCode.E0610);
117            }
118        }
119        catch (XException ex) {
120            throw new CommandException(ex);
121        }
122    }
123
124    @Override
125    protected void verifyPrecondition() throws CommandException, PreconditionException {
126        if (wfJob == null) {
127            throw new PreconditionException(ErrorCode.E0604, jobId);
128        }
129        if (wfAction == null) {
130            throw new PreconditionException(ErrorCode.E0605, actionId);
131        }
132        if (wfAction.isPending()
133                && (wfAction.getStatus() == WorkflowActionBean.Status.PREP
134                        || wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY
135                        || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL
136                        || wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY
137                        )) {
138            if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
139                throw new PreconditionException(ErrorCode.E0810, WorkflowJob.Status.RUNNING.toString());
140            }
141        }
142        else {
143            throw new PreconditionException(ErrorCode.E0816, wfAction.isPending(), wfAction.getStatusStr());
144        }
145
146        executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
147        if (executor == null) {
148            throw new CommandException(ErrorCode.E0802, wfAction.getType());
149        }
150    }
151
152    @Override
153    protected Void execute() throws CommandException {
154
155        LOG.debug("STARTED ActionStartXCommand for wf actionId=" + actionId);
156        Configuration conf = wfJob.getWorkflowInstance().getConf();
157
158        int maxRetries = 0;
159        long retryInterval = 0;
160        boolean execSynchronous = false;
161
162        if (!(executor instanceof ControlNodeActionExecutor)) {
163            maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
164            retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
165        }
166
167        executor.setMaxRetries(maxRetries);
168        executor.setRetryInterval(retryInterval);
169
170        ActionExecutorContext context = null;
171        try {
172            boolean isRetry = false;
173            if (wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY
174                    || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
175                isRetry = true;
176                prepareForRetry(wfAction);
177            }
178            boolean isUserRetry = false;
179            if (wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
180                isUserRetry = true;
181                prepareForRetry(wfAction);
182            }
183            context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
184            boolean caught = false;
185            try {
186                if (!(executor instanceof ControlNodeActionExecutor)) {
187                    String tmpActionConf = XmlUtils.removeComments(wfAction.getConf());
188                    String actionConf = context.getELEvaluator().evaluate(tmpActionConf, String.class);
189                    wfAction.setConf(actionConf);
190                    LOG.debug("Start, name [{0}] type [{1}] configuration{E}{E}{2}{E}", wfAction.getName(), wfAction
191                            .getType(), actionConf);
192                }
193            }
194            catch (ELEvaluationException ex) {
195                caught = true;
196                throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, EL_EVAL_ERROR, ex
197                        .getMessage(), ex);
198            }
199            catch (ELException ex) {
200                caught = true;
201                context.setErrorInfo(EL_ERROR, ex.getMessage());
202                LOG.warn("ELException in ActionStartXCommand ", ex.getMessage(), ex);
203                handleError(context, wfJob, wfAction);
204            }
205            catch (org.jdom.JDOMException je) {
206                caught = true;
207                context.setErrorInfo("ParsingError", je.getMessage());
208                LOG.warn("JDOMException in ActionStartXCommand ", je.getMessage(), je);
209                handleError(context, wfJob, wfAction);
210            }
211            catch (Exception ex) {
212                caught = true;
213                context.setErrorInfo(EL_ERROR, ex.getMessage());
214                LOG.warn("Exception in ActionStartXCommand ", ex.getMessage(), ex);
215                handleError(context, wfJob, wfAction);
216            }
217            if(!caught) {
218                wfAction.setErrorInfo(null, null);
219                incrActionCounter(wfAction.getType(), 1);
220
221                LOG.info("Start action [{0}] with user-retry state : userRetryCount [{1}], userRetryMax [{2}], userRetryInterval [{3}]",
222                                wfAction.getId(), wfAction.getUserRetryCount(), wfAction.getUserRetryMax(), wfAction
223                                        .getUserRetryInterval());
224
225                Instrumentation.Cron cron = new Instrumentation.Cron();
226                cron.start();
227                context.setStartTime();
228                executor.start(context, wfAction);
229                cron.stop();
230                FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
231                addActionCron(wfAction.getType(), cron);
232
233                wfAction.setRetries(0);
234                if (wfAction.isExecutionComplete()) {
235                    if (!context.isExecuted()) {
236                        LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor
237                                .getType());
238                        wfAction.setErrorInfo(EXEC_DATA_MISSING,
239                                "Execution Complete, but Execution Data Missing from Action");
240                        failJob(context);
241                    } else {
242                        wfAction.setPending();
243                        if (!(executor instanceof ControlNodeActionExecutor)) {
244                            queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
245                        }
246                        else {
247                            execSynchronous = true;
248                        }
249                    }
250                }
251                else {
252                    if (!context.isStarted()) {
253                        LOG.warn(XLog.OPS, "Action Started, ActionExecutor [{0}] must call setStartData()", executor
254                                .getType());
255                        wfAction.setErrorInfo(START_DATA_MISSING, "Execution Started, but Start Data Missing from Action");
256                        failJob(context);
257                    } else {
258                        queue(new NotificationXCommand(wfJob, wfAction));
259                    }
260                }
261
262                LOG.info(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action status=" + wfAction.getStatusStr());
263
264                updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
265                wfJob.setLastModifiedTime(new Date());
266                updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
267                // Add SLA status event (STARTED) for WF_ACTION
268                SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.STARTED,
269                        SlaAppType.WORKFLOW_ACTION);
270                if(slaEvent != null) {
271                    insertList.add(slaEvent);
272                }
273                LOG.info(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action updated in DB!");
274            }
275        }
276        catch (ActionExecutorException ex) {
277            LOG.warn("Error starting action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]",
278                    wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage(), ex);
279            wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
280            switch (ex.getErrorType()) {
281                case TRANSIENT:
282                    if (!handleTransient(context, executor, WorkflowAction.Status.START_RETRY)) {
283                        handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
284                        wfAction.setPendingAge(new Date());
285                        wfAction.setRetries(0);
286                        wfAction.setStartTime(null);
287                    }
288                    break;
289                case NON_TRANSIENT:
290                    handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
291                    break;
292                case ERROR:
293                    handleError(context, executor, WorkflowAction.Status.ERROR.toString(), true,
294                            WorkflowAction.Status.DONE);
295                    break;
296                case FAILED:
297                    try {
298                        failJob(context);
299                        updateParentIfNecessary(wfJob, 3);
300                        new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
301                        SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
302                                SlaAppType.WORKFLOW_ACTION);
303                        if(slaEvent1 != null) {
304                            insertList.add(slaEvent1);
305                        }
306                        SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED,
307                                SlaAppType.WORKFLOW_JOB);
308                        if(slaEvent2 != null) {
309                            insertList.add(slaEvent2);
310                        }
311                    }
312                    catch (XException x) {
313                        LOG.warn("ActionStartXCommand - case:FAILED ", x.getMessage());
314                    }
315                    break;
316            }
317            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
318            wfJob.setLastModifiedTime(new Date());
319            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
320        }
321        finally {
322            try {
323                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
324                if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
325                    generateEvent(wfAction, wfJob.getUser());
326                }
327                if (execSynchronous) {
328                    // Changing to synchronous call from asynchronous queuing to prevent
329                    // undue delay from ::start:: to action due to queuing
330                    new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call(getEntityKey());
331                }
332            }
333            catch (JPAExecutorException e) {
334                throw new CommandException(e);
335            }
336        }
337
338        LOG.debug("ENDED ActionStartXCommand for wf actionId=" + actionId + ", jobId=" + jobId);
339
340        return null;
341    }
342
343    private void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action)
344            throws CommandException {
345        failJob(context);
346        updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
347        wfJob.setLastModifiedTime(new Date());
348        updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
349        SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(),
350                Status.FAILED, SlaAppType.WORKFLOW_ACTION);
351        if(slaEvent1 != null) {
352            insertList.add(slaEvent1);
353        }
354        SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(workflow.getSlaXml(), workflow.getId(),
355                Status.FAILED, SlaAppType.WORKFLOW_JOB);
356        if(slaEvent2 != null) {
357            insertList.add(slaEvent2);
358        }
359
360        new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
361        return;
362    }
363
364    /* (non-Javadoc)
365     * @see org.apache.oozie.command.XCommand#getKey()
366     */
367    @Override
368    public String getKey(){
369        return getName() + "_" + actionId;
370    }
371
372    private void prepareForRetry(WorkflowActionBean wfAction) {
373        if (wfAction.getType().equals("map-reduce")) {
374            // need to delete child job id of original run
375            wfAction.setExternalChildIDs("");
376        }
377    }
378
379}