001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import java.util.ArrayList;
021    import java.util.Date;
022    import java.util.List;
023    
024    import javax.servlet.jsp.el.ELException;
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.oozie.ErrorCode;
027    import org.apache.oozie.FaultInjection;
028    import org.apache.oozie.SLAEventBean;
029    import org.apache.oozie.WorkflowActionBean;
030    import org.apache.oozie.WorkflowJobBean;
031    import org.apache.oozie.XException;
032    import org.apache.oozie.action.ActionExecutor;
033    import org.apache.oozie.action.ActionExecutorException;
034    import org.apache.oozie.action.control.ControlNodeActionExecutor;
035    import org.apache.oozie.client.OozieClient;
036    import org.apache.oozie.client.WorkflowAction;
037    import org.apache.oozie.client.WorkflowJob;
038    import org.apache.oozie.client.SLAEvent.SlaAppType;
039    import org.apache.oozie.client.SLAEvent.Status;
040    import org.apache.oozie.client.rest.JsonBean;
041    import org.apache.oozie.command.CommandException;
042    import org.apache.oozie.command.PreconditionException;
043    import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
044    import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
045    import org.apache.oozie.executor.jpa.JPAExecutorException;
046    import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
047    import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
048    import org.apache.oozie.service.ActionService;
049    import org.apache.oozie.service.EventHandlerService;
050    import org.apache.oozie.service.JPAService;
051    import org.apache.oozie.service.Services;
052    import org.apache.oozie.service.UUIDService;
053    import org.apache.oozie.util.ELEvaluationException;
054    import org.apache.oozie.util.Instrumentation;
055    import org.apache.oozie.util.LogUtils;
056    import org.apache.oozie.util.XLog;
057    import org.apache.oozie.util.XmlUtils;
058    import org.apache.oozie.util.db.SLADbXOperations;
059    
060    @SuppressWarnings("deprecation")
061    public class ActionStartXCommand extends ActionXCommand<Void> {
062        public static final String EL_ERROR = "EL_ERROR";
063        public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR";
064        public static final String COULD_NOT_START = "COULD_NOT_START";
065        public static final String START_DATA_MISSING = "START_DATA_MISSING";
066        public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
067    
068        private String jobId = null;
069        private String actionId = null;
070        private WorkflowJobBean wfJob = null;
071        private WorkflowActionBean wfAction = null;
072        private JPAService jpaService = null;
073        private ActionExecutor executor = null;
074        private List<JsonBean> updateList = new ArrayList<JsonBean>();
075        private List<JsonBean> insertList = new ArrayList<JsonBean>();
076    
077        public ActionStartXCommand(String actionId, String type) {
078            super("action.start", type, 0);
079            this.actionId = actionId;
080            this.jobId = Services.get().get(UUIDService.class).getId(actionId);
081        }
082    
083        @Override
084        protected boolean isLockRequired() {
085            return true;
086        }
087    
088        @Override
089        public String getEntityKey() {
090            return this.jobId;
091        }
092    
093        @Override
094        protected void loadState() throws CommandException {
095            try {
096                jpaService = Services.get().get(JPAService.class);
097                if (jpaService != null) {
098                    this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
099                    this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
100                    LogUtils.setLogInfo(wfJob, logInfo);
101                    LogUtils.setLogInfo(wfAction, logInfo);
102                }
103                else {
104                    throw new CommandException(ErrorCode.E0610);
105                }
106            }
107            catch (XException ex) {
108                throw new CommandException(ex);
109            }
110        }
111    
112        @Override
113        protected void verifyPrecondition() throws CommandException, PreconditionException {
114            if (wfJob == null) {
115                throw new PreconditionException(ErrorCode.E0604, jobId);
116            }
117            if (wfAction == null) {
118                throw new PreconditionException(ErrorCode.E0605, actionId);
119            }
120            if (wfAction.isPending()
121                    && (wfAction.getStatus() == WorkflowActionBean.Status.PREP
122                            || wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY
123                            || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL
124                            || wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY
125                            )) {
126                if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
127                    throw new PreconditionException(ErrorCode.E0810, WorkflowJob.Status.RUNNING.toString());
128                }
129            }
130            else {
131                throw new PreconditionException(ErrorCode.E0816, wfAction.getPending(), wfAction.getStatusStr());
132            }
133    
134            executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
135            if (executor == null) {
136                throw new CommandException(ErrorCode.E0802, wfAction.getType());
137            }
138        }
139    
140        @Override
141        protected Void execute() throws CommandException {
142    
143            LOG.debug("STARTED ActionStartXCommand for wf actionId=" + actionId);
144            Configuration conf = wfJob.getWorkflowInstance().getConf();
145    
146            int maxRetries = 0;
147            long retryInterval = 0;
148    
149            if (!(executor instanceof ControlNodeActionExecutor)) {
150                maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
151                retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
152            }
153    
154            executor.setMaxRetries(maxRetries);
155            executor.setRetryInterval(retryInterval);
156    
157            ActionExecutorContext context = null;
158            try {
159                boolean isRetry = false;
160                if (wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY
161                        || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
162                    isRetry = true;
163                    prepareForRetry(wfAction);
164                }
165                boolean isUserRetry = false;
166                if (wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
167                    isUserRetry = true;
168                    prepareForRetry(wfAction);
169                }
170                context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
171                boolean caught = false;
172                try {
173                    if (!(executor instanceof ControlNodeActionExecutor)) {
174                        String tmpActionConf = XmlUtils.removeComments(wfAction.getConf());
175                        String actionConf = context.getELEvaluator().evaluate(tmpActionConf, String.class);
176                        wfAction.setConf(actionConf);
177                        LOG.debug("Start, name [{0}] type [{1}] configuration{E}{E}{2}{E}", wfAction.getName(), wfAction
178                                .getType(), actionConf);
179                    }
180                }
181                catch (ELEvaluationException ex) {
182                    caught = true;
183                    throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, EL_EVAL_ERROR, ex
184                            .getMessage(), ex);
185                }
186                catch (ELException ex) {
187                    caught = true;
188                    context.setErrorInfo(EL_ERROR, ex.getMessage());
189                    LOG.warn("ELException in ActionStartXCommand ", ex.getMessage(), ex);
190                    handleError(context, wfJob, wfAction);
191                }
192                catch (org.jdom.JDOMException je) {
193                    caught = true;
194                    context.setErrorInfo("ParsingError", je.getMessage());
195                    LOG.warn("JDOMException in ActionStartXCommand ", je.getMessage(), je);
196                    handleError(context, wfJob, wfAction);
197                }
198                catch (Exception ex) {
199                    caught = true;
200                    context.setErrorInfo(EL_ERROR, ex.getMessage());
201                    LOG.warn("Exception in ActionStartXCommand ", ex.getMessage(), ex);
202                    handleError(context, wfJob, wfAction);
203                }
204                if(!caught) {
205                    wfAction.setErrorInfo(null, null);
206                    incrActionCounter(wfAction.getType(), 1);
207    
208                    LOG.info("Start action [{0}] with user-retry state : userRetryCount [{1}], userRetryMax [{2}], userRetryInterval [{3}]",
209                                    wfAction.getId(), wfAction.getUserRetryCount(), wfAction.getUserRetryMax(), wfAction
210                                            .getUserRetryInterval());
211    
212                    Instrumentation.Cron cron = new Instrumentation.Cron();
213                    cron.start();
214                    context.setStartTime();
215                    executor.start(context, wfAction);
216                    cron.stop();
217                    FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
218                    addActionCron(wfAction.getType(), cron);
219    
220                    wfAction.setRetries(0);
221                    if (wfAction.isExecutionComplete()) {
222                        if (!context.isExecuted()) {
223                            LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor
224                                    .getType());
225                            wfAction.setErrorInfo(EXEC_DATA_MISSING,
226                                    "Execution Complete, but Execution Data Missing from Action");
227                            failJob(context);
228                        } else {
229                            wfAction.setPending();
230                            queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
231                        }
232                    }
233                    else {
234                        if (!context.isStarted()) {
235                            LOG.warn(XLog.OPS, "Action Started, ActionExecutor [{0}] must call setStartData()", executor
236                                    .getType());
237                            wfAction.setErrorInfo(START_DATA_MISSING, "Execution Started, but Start Data Missing from Action");
238                            failJob(context);
239                        } else {
240                            queue(new NotificationXCommand(wfJob, wfAction));
241                        }
242                    }
243    
244                    LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action status=" + wfAction.getStatusStr());
245    
246                    updateList.add(wfAction);
247                    wfJob.setLastModifiedTime(new Date());
248                    updateList.add(wfJob);
249                    // Add SLA status event (STARTED) for WF_ACTION
250                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.STARTED,
251                            SlaAppType.WORKFLOW_ACTION);
252                    if(slaEvent != null) {
253                        insertList.add(slaEvent);
254                    }
255                    LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action updated in DB!");
256                }
257            }
258            catch (ActionExecutorException ex) {
259                LOG.warn("Error starting action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]",
260                        wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage(), ex);
261                wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
262                switch (ex.getErrorType()) {
263                    case TRANSIENT:
264                        if (!handleTransient(context, executor, WorkflowAction.Status.START_RETRY)) {
265                            handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
266                            wfAction.setPendingAge(new Date());
267                            wfAction.setRetries(0);
268                            wfAction.setStartTime(null);
269                        }
270                        break;
271                    case NON_TRANSIENT:
272                        handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
273                        break;
274                    case ERROR:
275                        handleError(context, executor, WorkflowAction.Status.ERROR.toString(), true,
276                                WorkflowAction.Status.DONE);
277                        break;
278                    case FAILED:
279                        try {
280                            failJob(context);
281                            // update coordinator action
282                            new CoordActionUpdateXCommand(wfJob, 3).call();
283                            new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
284                            SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
285                                    SlaAppType.WORKFLOW_ACTION);
286                            if(slaEvent1 != null) {
287                                insertList.add(slaEvent1);
288                            }
289                            SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED,
290                                    SlaAppType.WORKFLOW_JOB);
291                            if(slaEvent2 != null) {
292                                insertList.add(slaEvent2);
293                            }
294                        }
295                        catch (XException x) {
296                            LOG.warn("ActionStartXCommand - case:FAILED ", x.getMessage());
297                        }
298                        break;
299                }
300                updateList.add(wfAction);
301                wfJob.setLastModifiedTime(new Date());
302                updateList.add(wfJob);
303            }
304            finally {
305                try {
306                    jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
307                    if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
308                        generateEvent(wfAction, wfJob.getUser());
309                    }
310                }
311                catch (JPAExecutorException e) {
312                    throw new CommandException(e);
313                }
314            }
315    
316            LOG.debug("ENDED ActionStartXCommand for wf actionId=" + actionId + ", jobId=" + jobId);
317    
318            return null;
319        }
320    
321        private void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action)
322                throws CommandException {
323            failJob(context);
324            updateList.add(wfAction);
325            wfJob.setLastModifiedTime(new Date());
326            updateList.add(wfJob);
327            SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(),
328                    Status.FAILED, SlaAppType.WORKFLOW_ACTION);
329            if(slaEvent1 != null) {
330                insertList.add(slaEvent1);
331            }
332            SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(workflow.getSlaXml(), workflow.getId(),
333                    Status.FAILED, SlaAppType.WORKFLOW_JOB);
334            if(slaEvent2 != null) {
335                insertList.add(slaEvent2);
336            }
337    
338            new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
339            return;
340        }
341    
342        /* (non-Javadoc)
343         * @see org.apache.oozie.command.XCommand#getKey()
344         */
345        @Override
346        public String getKey(){
347            return getName() + "_" + actionId;
348        }
349    
350        private void prepareForRetry(WorkflowActionBean wfAction) {
351            if (wfAction.getType().equals("map-reduce")) {
352                // need to delete child job id of original run
353                wfAction.setExternalChildIDs("");
354            }
355        }
356    
357    }