001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import java.util.ArrayList;
021    import java.util.Date;
022    import java.util.List;
023    
024    import org.apache.hadoop.conf.Configuration;
025    import org.apache.oozie.DagELFunctions;
026    import org.apache.oozie.ErrorCode;
027    import org.apache.oozie.SLAEventBean;
028    import org.apache.oozie.WorkflowActionBean;
029    import org.apache.oozie.WorkflowJobBean;
030    import org.apache.oozie.XException;
031    import org.apache.oozie.action.ActionExecutor;
032    import org.apache.oozie.action.ActionExecutorException;
033    import org.apache.oozie.action.control.ControlNodeActionExecutor;
034    import org.apache.oozie.client.OozieClient;
035    import org.apache.oozie.client.WorkflowAction;
036    import org.apache.oozie.client.WorkflowJob;
037    import org.apache.oozie.client.SLAEvent.SlaAppType;
038    import org.apache.oozie.client.SLAEvent.Status;
039    import org.apache.oozie.client.rest.JsonBean;
040    import org.apache.oozie.command.CommandException;
041    import org.apache.oozie.command.PreconditionException;
042    import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
043    import org.apache.oozie.executor.jpa.JPAExecutorException;
044    import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
045    import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
046    import org.apache.oozie.service.ActionService;
047    import org.apache.oozie.service.JPAService;
048    import org.apache.oozie.service.Services;
049    import org.apache.oozie.service.UUIDService;
050    import org.apache.oozie.util.Instrumentation;
051    import org.apache.oozie.util.LogUtils;
052    import org.apache.oozie.util.XLog;
053    import org.apache.oozie.util.db.SLADbXOperations;
054    import org.apache.oozie.workflow.WorkflowInstance;
055    
056    public class ActionEndXCommand extends ActionXCommand<Void> {
057        public static final String COULD_NOT_END = "COULD_NOT_END";
058        public static final String END_DATA_MISSING = "END_DATA_MISSING";
059    
060        private String jobId = null;
061        private String actionId = null;
062        private WorkflowJobBean wfJob = null;
063        private WorkflowActionBean wfAction = null;
064        private JPAService jpaService = null;
065        private ActionExecutor executor = null;
066        private List<JsonBean> updateList = new ArrayList<JsonBean>();
067        private List<JsonBean> insertList = new ArrayList<JsonBean>();
068    
069        public ActionEndXCommand(String actionId, String type) {
070            super("action.end", type, 0);
071            this.actionId = actionId;
072            this.jobId = Services.get().get(UUIDService.class).getId(actionId);
073        }
074    
075        @Override
076        protected boolean isLockRequired() {
077            return true;
078        }
079    
080        @Override
081        public String getEntityKey() {
082            return this.jobId;
083        }
084    
085        @Override
086        protected void loadState() throws CommandException {
087            try {
088                jpaService = Services.get().get(JPAService.class);
089                if (jpaService != null) {
090                    this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
091                    this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
092                    LogUtils.setLogInfo(wfJob, logInfo);
093                    LogUtils.setLogInfo(wfAction, logInfo);
094                }
095                else {
096                    throw new CommandException(ErrorCode.E0610);
097                }
098            }
099            catch (XException ex) {
100                throw new CommandException(ex);
101            }
102        }
103    
104        @Override
105        protected void verifyPrecondition() throws CommandException, PreconditionException {
106            if (wfJob == null) {
107                throw new PreconditionException(ErrorCode.E0604, jobId);
108            }
109            if (wfAction == null) {
110                throw new PreconditionException(ErrorCode.E0605, actionId);
111            }
112            if (wfAction.isPending()
113                    && (wfAction.getStatus() == WorkflowActionBean.Status.DONE
114                            || wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL)) {
115    
116                if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
117                    throw new PreconditionException(ErrorCode.E0811,  WorkflowJob.Status.RUNNING.toString());
118                }
119            }
120            else {
121                throw new PreconditionException(ErrorCode.E0812, wfAction.getPending(), wfAction.getStatusStr());
122            }
123    
124            executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
125            if (executor == null) {
126                throw new CommandException(ErrorCode.E0802, wfAction.getType());
127            }
128        }
129    
130        @Override
131        protected Void execute() throws CommandException {
132            LOG.debug("STARTED ActionEndXCommand for action " + actionId);
133    
134            Configuration conf = wfJob.getWorkflowInstance().getConf();
135    
136            int maxRetries = 0;
137            long retryInterval = 0;
138    
139            if (!(executor instanceof ControlNodeActionExecutor)) {
140                maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
141                retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
142            }
143    
144            executor.setMaxRetries(maxRetries);
145            executor.setRetryInterval(retryInterval);
146    
147            boolean isRetry = false;
148            if (wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY
149                    || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
150                isRetry = true;
151            }
152            boolean isUserRetry = false;
153            ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
154            try {
155    
156                LOG.debug(
157                        "End, name [{0}] type [{1}] status[{2}] external status [{3}] signal value [{4}]",
158                        wfAction.getName(), wfAction.getType(), wfAction.getStatus(), wfAction.getExternalStatus(),
159                        wfAction.getSignalValue());
160    
161                Instrumentation.Cron cron = new Instrumentation.Cron();
162                cron.start();
163                executor.end(context, wfAction);
164                cron.stop();
165                addActionCron(wfAction.getType(), cron);
166    
167                WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
168                DagELFunctions.setActionInfo(wfInstance, wfAction);
169                wfJob.setWorkflowInstance(wfInstance);
170                incrActionCounter(wfAction.getType(), 1);
171    
172                if (!context.isEnded()) {
173                    LOG.warn(XLog.OPS, "Action Ended, ActionExecutor [{0}] must call setEndData()",
174                            executor.getType());
175                    wfAction.setErrorInfo(END_DATA_MISSING, "Execution Ended, but End Data Missing from Action");
176                    failJob(context);
177                } else {
178                    wfAction.setRetries(0);
179                    wfAction.setEndTime(new Date());
180        
181                    boolean shouldHandleUserRetry = false;
182                    Status slaStatus = null;
183                    switch (wfAction.getStatus()) {
184                        case OK:
185                            slaStatus = Status.SUCCEEDED;
186                            break;
187                        case KILLED:
188                            slaStatus = Status.KILLED;
189                            break;
190                        case FAILED:
191                            slaStatus = Status.FAILED;
192                            shouldHandleUserRetry = true;
193                            break;
194                        case ERROR:
195                            LOG.info("ERROR is considered as FAILED for SLA");
196                            slaStatus = Status.KILLED;
197                            shouldHandleUserRetry = true;
198                            break;
199                        default:
200                            slaStatus = Status.FAILED;
201                            shouldHandleUserRetry = true;
202                            break;
203                    }
204                    if (!shouldHandleUserRetry || !handleUserRetry(wfAction)) {
205                        SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus, SlaAppType.WORKFLOW_ACTION);
206                        LOG.debug("Queuing commands for action=" + actionId + ", status=" + wfAction.getStatus()
207                                + ", Set pending=" + wfAction.getPending());
208                        if(slaEvent != null) {
209                            insertList.add(slaEvent);
210                        }
211                        queue(new SignalXCommand(jobId, actionId));
212                    }
213                }
214                updateList.add(wfAction);
215                wfJob.setLastModifiedTime(new Date());
216                updateList.add(wfJob);
217            }
218            catch (ActionExecutorException ex) {
219                LOG.warn(
220                        "Error ending action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]",
221                        wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage());
222                wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
223                wfAction.setEndTime(null);
224    
225                switch (ex.getErrorType()) {
226                    case TRANSIENT:
227                        if (!handleTransient(context, executor, WorkflowAction.Status.END_RETRY)) {
228                            handleNonTransient(context, executor, WorkflowAction.Status.END_MANUAL);
229                            wfAction.setPendingAge(new Date());
230                            wfAction.setRetries(0);
231                        }
232                        wfAction.setEndTime(null);
233                        break;
234                    case NON_TRANSIENT:
235                        handleNonTransient(context, executor, WorkflowAction.Status.END_MANUAL);
236                        wfAction.setEndTime(null);
237                        break;
238                    case ERROR:
239                        handleError(context, executor, COULD_NOT_END, false, WorkflowAction.Status.ERROR);
240                        queue(new SignalXCommand(jobId, actionId));
241                        break;
242                    case FAILED:
243                        failJob(context);
244                        break;
245                }
246    
247                WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
248                DagELFunctions.setActionInfo(wfInstance, wfAction);
249                wfJob.setWorkflowInstance(wfInstance);
250    
251                updateList.add(wfAction);
252                wfJob.setLastModifiedTime(new Date());
253                updateList.add(wfJob);
254            }
255            finally {
256                try {
257                    jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
258                }
259                catch (JPAExecutorException e) {
260                    throw new CommandException(e);
261                }
262            }
263    
264            LOG.debug("ENDED ActionEndXCommand for action " + actionId);
265            return null;
266        }
267    
268    }