001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import java.util.ArrayList;
021    import java.util.Date;
022    import java.util.List;
023    
024    import javax.servlet.jsp.el.ELException;
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.oozie.ErrorCode;
027    import org.apache.oozie.FaultInjection;
028    import org.apache.oozie.SLAEventBean;
029    import org.apache.oozie.WorkflowActionBean;
030    import org.apache.oozie.WorkflowJobBean;
031    import org.apache.oozie.XException;
032    import org.apache.oozie.action.ActionExecutor;
033    import org.apache.oozie.action.ActionExecutorException;
034    import org.apache.oozie.action.control.ControlNodeActionExecutor;
035    import org.apache.oozie.client.OozieClient;
036    import org.apache.oozie.client.WorkflowAction;
037    import org.apache.oozie.client.WorkflowJob;
038    import org.apache.oozie.client.SLAEvent.SlaAppType;
039    import org.apache.oozie.client.SLAEvent.Status;
040    import org.apache.oozie.client.rest.JsonBean;
041    import org.apache.oozie.command.CommandException;
042    import org.apache.oozie.command.PreconditionException;
043    import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
044    import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
045    import org.apache.oozie.executor.jpa.JPAExecutorException;
046    import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
047    import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
048    import org.apache.oozie.service.ActionService;
049    import org.apache.oozie.service.JPAService;
050    import org.apache.oozie.service.Services;
051    import org.apache.oozie.service.UUIDService;
052    import org.apache.oozie.util.ELEvaluationException;
053    import org.apache.oozie.util.Instrumentation;
054    import org.apache.oozie.util.LogUtils;
055    import org.apache.oozie.util.XLog;
056    import org.apache.oozie.util.XmlUtils;
057    import org.apache.oozie.util.db.SLADbXOperations;
058    
059    public class ActionStartXCommand extends ActionXCommand<Void> {
060        public static final String EL_ERROR = "EL_ERROR";
061        public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR";
062        public static final String COULD_NOT_START = "COULD_NOT_START";
063        public static final String START_DATA_MISSING = "START_DATA_MISSING";
064        public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
065    
066        private String jobId = null;
067        private String actionId = null;
068        private WorkflowJobBean wfJob = null;
069        private WorkflowActionBean wfAction = null;
070        private JPAService jpaService = null;
071        private ActionExecutor executor = null;
072        private List<JsonBean> updateList = new ArrayList<JsonBean>();
073        private List<JsonBean> insertList = new ArrayList<JsonBean>();
074    
075        public ActionStartXCommand(String actionId, String type) {
076            super("action.start", type, 0);
077            this.actionId = actionId;
078            this.jobId = Services.get().get(UUIDService.class).getId(actionId);
079        }
080    
081        @Override
082        protected boolean isLockRequired() {
083            return true;
084        }
085    
086        @Override
087        public String getEntityKey() {
088            return this.jobId;
089        }
090    
091        @Override
092        protected void loadState() throws CommandException {
093            try {
094                jpaService = Services.get().get(JPAService.class);
095                if (jpaService != null) {
096                    this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
097                    this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
098                    LogUtils.setLogInfo(wfJob, logInfo);
099                    LogUtils.setLogInfo(wfAction, logInfo);
100                }
101                else {
102                    throw new CommandException(ErrorCode.E0610);
103                }
104            }
105            catch (XException ex) {
106                throw new CommandException(ex);
107            }
108        }
109    
110        @Override
111        protected void verifyPrecondition() throws CommandException, PreconditionException {
112            if (wfJob == null) {
113                throw new PreconditionException(ErrorCode.E0604, jobId);
114            }
115            if (wfAction == null) {
116                throw new PreconditionException(ErrorCode.E0605, actionId);
117            }
118            if (wfAction.isPending()
119                    && (wfAction.getStatus() == WorkflowActionBean.Status.PREP
120                            || wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY
121                            || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL
122                            || wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY
123                            )) {
124                if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
125                    throw new PreconditionException(ErrorCode.E0810, WorkflowJob.Status.RUNNING.toString());
126                }
127            }
128            else {
129                throw new PreconditionException(ErrorCode.E0816, wfAction.getPending(), wfAction.getStatusStr());
130            }
131    
132            executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
133            if (executor == null) {
134                throw new CommandException(ErrorCode.E0802, wfAction.getType());
135            }
136        }
137    
138        @Override
139        protected Void execute() throws CommandException {
140    
141            LOG.debug("STARTED ActionStartXCommand for wf actionId=" + actionId);
142            Configuration conf = wfJob.getWorkflowInstance().getConf();
143    
144            int maxRetries = 0;
145            long retryInterval = 0;
146    
147            if (!(executor instanceof ControlNodeActionExecutor)) {
148                maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
149                retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
150            }
151    
152            executor.setMaxRetries(maxRetries);
153            executor.setRetryInterval(retryInterval);
154    
155            ActionExecutorContext context = null;
156            try {
157                boolean isRetry = false;
158                if (wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY
159                        || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
160                    isRetry = true;
161                }
162                boolean isUserRetry = false;
163                if (wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
164                    isUserRetry = true;
165                }
166                context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
167                boolean caught = false;
168                try {
169                    if (!(executor instanceof ControlNodeActionExecutor)) {
170                        String tmpActionConf = XmlUtils.removeComments(wfAction.getConf());
171                        String actionConf = context.getELEvaluator().evaluate(tmpActionConf, String.class);
172                        wfAction.setConf(actionConf);
173                        LOG.debug("Start, name [{0}] type [{1}] configuration{E}{E}{2}{E}", wfAction.getName(), wfAction
174                                .getType(), actionConf);
175                    }
176                }
177                catch (ELEvaluationException ex) {
178                    caught = true;
179                    throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, EL_EVAL_ERROR, ex
180                            .getMessage(), ex);
181                }
182                catch (ELException ex) {
183                    caught = true;
184                    context.setErrorInfo(EL_ERROR, ex.getMessage());
185                    LOG.warn("ELException in ActionStartXCommand ", ex.getMessage(), ex);
186                    handleError(context, wfJob, wfAction);
187                }
188                catch (org.jdom.JDOMException je) {
189                    caught = true;
190                    context.setErrorInfo("ParsingError", je.getMessage());
191                    LOG.warn("JDOMException in ActionStartXCommand ", je.getMessage(), je);
192                    handleError(context, wfJob, wfAction);
193                }
194                catch (Exception ex) {
195                    caught = true;
196                    context.setErrorInfo(EL_ERROR, ex.getMessage());
197                    LOG.warn("Exception in ActionStartXCommand ", ex.getMessage(), ex);
198                    handleError(context, wfJob, wfAction);
199                }
200                if(!caught) {
201                    wfAction.setErrorInfo(null, null);
202                    incrActionCounter(wfAction.getType(), 1);
203    
204                    LOG.info("Start action [{0}] with user-retry state : userRetryCount [{1}], userRetryMax [{2}], userRetryInterval [{3}]",
205                                    wfAction.getId(), wfAction.getUserRetryCount(), wfAction.getUserRetryMax(), wfAction
206                                            .getUserRetryInterval());
207    
208                    Instrumentation.Cron cron = new Instrumentation.Cron();
209                    cron.start();
210                    context.setStartTime();
211                    executor.start(context, wfAction);
212                    cron.stop();
213                    FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
214                    addActionCron(wfAction.getType(), cron);
215    
216                    wfAction.setRetries(0);
217                    if (wfAction.isExecutionComplete()) {
218                        if (!context.isExecuted()) {
219                            LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor
220                                    .getType());
221                            wfAction.setErrorInfo(EXEC_DATA_MISSING,
222                                    "Execution Complete, but Execution Data Missing from Action");
223                            failJob(context);
224                        } else {
225                            wfAction.setPending();
226                            queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
227                        }
228                    }
229                    else {
230                        if (!context.isStarted()) {
231                            LOG.warn(XLog.OPS, "Action Started, ActionExecutor [{0}] must call setStartData()", executor
232                                    .getType());
233                            wfAction.setErrorInfo(START_DATA_MISSING, "Execution Started, but Start Data Missing from Action");
234                            failJob(context);
235                        } else {
236                            queue(new NotificationXCommand(wfJob, wfAction));
237                        }
238                    }
239    
240                    LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action status=" + wfAction.getStatusStr());
241    
242                    updateList.add(wfAction);
243                    wfJob.setLastModifiedTime(new Date());
244                    updateList.add(wfJob);
245                    // Add SLA status event (STARTED) for WF_ACTION
246                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.STARTED,
247                            SlaAppType.WORKFLOW_ACTION);
248                    if(slaEvent != null) {
249                        insertList.add(slaEvent);
250                    }
251                    LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action updated in DB!");
252                }
253            }
254            catch (ActionExecutorException ex) {
255                LOG.warn("Error starting action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]",
256                        wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage(), ex);
257                wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
258                switch (ex.getErrorType()) {
259                    case TRANSIENT:
260                        if (!handleTransient(context, executor, WorkflowAction.Status.START_RETRY)) {
261                            handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
262                            wfAction.setPendingAge(new Date());
263                            wfAction.setRetries(0);
264                            wfAction.setStartTime(null);
265                        }
266                        break;
267                    case NON_TRANSIENT:
268                        handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
269                        break;
270                    case ERROR:
271                        handleError(context, executor, WorkflowAction.Status.ERROR.toString(), true,
272                                WorkflowAction.Status.DONE);
273                        break;
274                    case FAILED:
275                        try {
276                            failJob(context);
277                            // update coordinator action
278                            new CoordActionUpdateXCommand(wfJob, 3).call();
279                            new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
280                            SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
281                                    SlaAppType.WORKFLOW_ACTION);
282                            if(slaEvent1 != null) {
283                                insertList.add(slaEvent1);
284                            }
285                            SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED,
286                                    SlaAppType.WORKFLOW_JOB);
287                            if(slaEvent2 != null) {
288                                insertList.add(slaEvent2);
289                            }
290                        }
291                        catch (XException x) {
292                            LOG.warn("ActionStartXCommand - case:FAILED ", x.getMessage());
293                        }
294                        break;
295                }
296                updateList.add(wfAction);
297                wfJob.setLastModifiedTime(new Date());
298                updateList.add(wfJob);
299            }
300            finally {
301                try {
302                    jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
303                }
304                catch (JPAExecutorException e) {
305                    throw new CommandException(e);
306                }
307            }
308    
309            LOG.debug("ENDED ActionStartXCommand for wf actionId=" + actionId + ", jobId=" + jobId);
310    
311            return null;
312        }
313    
314        private void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action)
315                throws CommandException {
316            failJob(context);
317            updateList.add(wfAction);
318            wfJob.setLastModifiedTime(new Date());
319            updateList.add(wfJob);
320            SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(),
321                    Status.FAILED, SlaAppType.WORKFLOW_ACTION);
322            if(slaEvent1 != null) {
323                insertList.add(slaEvent1);
324            }
325            SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(workflow.getSlaXml(), workflow.getId(),
326                    Status.FAILED, SlaAppType.WORKFLOW_JOB);
327            if(slaEvent2 != null) {
328                insertList.add(slaEvent2);
329            }
330            // update coordinator action
331            new CoordActionUpdateXCommand(workflow, 3).call();
332            new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
333            return;
334        }
335    
336        /* (non-Javadoc)
337         * @see org.apache.oozie.command.XCommand#getKey()
338         */
339        @Override
340        public String getKey(){
341            return getName() + "_" + actionId;
342        }
343    
344    }