001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import java.util.Date;
021    
022    import org.apache.hadoop.conf.Configuration;
023    import org.apache.oozie.DagELFunctions;
024    import org.apache.oozie.ErrorCode;
025    import org.apache.oozie.WorkflowActionBean;
026    import org.apache.oozie.WorkflowJobBean;
027    import org.apache.oozie.XException;
028    import org.apache.oozie.action.ActionExecutor;
029    import org.apache.oozie.action.ActionExecutorException;
030    import org.apache.oozie.client.OozieClient;
031    import org.apache.oozie.client.WorkflowAction;
032    import org.apache.oozie.client.WorkflowJob;
033    import org.apache.oozie.client.SLAEvent.SlaAppType;
034    import org.apache.oozie.client.SLAEvent.Status;
035    import org.apache.oozie.command.CommandException;
036    import org.apache.oozie.command.PreconditionException;
037    import org.apache.oozie.executor.jpa.JPAExecutorException;
038    import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
039    import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
040    import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
041    import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
042    import org.apache.oozie.service.ActionService;
043    import org.apache.oozie.service.JPAService;
044    import org.apache.oozie.service.Services;
045    import org.apache.oozie.service.UUIDService;
046    import org.apache.oozie.util.Instrumentation;
047    import org.apache.oozie.util.LogUtils;
048    import org.apache.oozie.util.XLog;
049    import org.apache.oozie.util.db.SLADbXOperations;
050    import org.apache.oozie.workflow.WorkflowInstance;
051    
052    public class ActionEndXCommand extends ActionXCommand<Void> {
053        public static final String COULD_NOT_END = "COULD_NOT_END";
054        public static final String END_DATA_MISSING = "END_DATA_MISSING";
055    
056        private String jobId = null;
057        private String actionId = null;
058        private WorkflowJobBean wfJob = null;
059        private WorkflowActionBean wfAction = null;
060        private JPAService jpaService = null;
061        private ActionExecutor executor = null;
062    
063        public ActionEndXCommand(String actionId, String type) {
064            super("action.end", type, 0);
065            this.actionId = actionId;
066            this.jobId = Services.get().get(UUIDService.class).getId(actionId);
067        }
068    
069        @Override
070        protected boolean isLockRequired() {
071            return true;
072        }
073    
074        @Override
075        public String getEntityKey() {
076            return this.jobId;
077        }
078    
079        @Override
080        protected void loadState() throws CommandException {
081            try {
082                jpaService = Services.get().get(JPAService.class);
083                if (jpaService != null) {
084                    this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
085                    this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
086                    LogUtils.setLogInfo(wfJob, logInfo);
087                    LogUtils.setLogInfo(wfAction, logInfo);
088                }
089                else {
090                    throw new CommandException(ErrorCode.E0610);
091                }
092            }
093            catch (XException ex) {
094                throw new CommandException(ex);
095            }
096        }
097    
098        @Override
099        protected void verifyPrecondition() throws CommandException, PreconditionException {
100            if (wfJob == null) {
101                throw new PreconditionException(ErrorCode.E0604, jobId);
102            }
103            if (wfAction == null) {
104                throw new PreconditionException(ErrorCode.E0605, actionId);
105            }
106            if (wfAction.isPending()
107                    && (wfAction.getStatus() == WorkflowActionBean.Status.DONE
108                            || wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL)) {
109    
110                if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
111                    throw new PreconditionException(ErrorCode.E0811,  WorkflowJob.Status.RUNNING.toString());
112                }
113            }
114            else {
115                throw new PreconditionException(ErrorCode.E0812, wfAction.getPending(), wfAction.getStatusStr());
116            }
117    
118            executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
119            if (executor == null) {
120                throw new CommandException(ErrorCode.E0802, wfAction.getType());
121            }
122        }
123    
124        @Override
125        protected Void execute() throws CommandException {
126            LOG.debug("STARTED ActionEndXCommand for action " + actionId);
127    
128            Configuration conf = wfJob.getWorkflowInstance().getConf();
129            int maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
130            long retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
131            executor.setMaxRetries(maxRetries);
132            executor.setRetryInterval(retryInterval);
133    
134            boolean isRetry = false;
135            if (wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY
136                    || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
137                isRetry = true;
138            }
139            boolean isUserRetry = false;
140            ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
141            try {
142    
143                LOG.debug(
144                        "End, name [{0}] type [{1}] status[{2}] external status [{3}] signal value [{4}]",
145                        wfAction.getName(), wfAction.getType(), wfAction.getStatus(), wfAction.getExternalStatus(),
146                        wfAction.getSignalValue());
147    
148                Instrumentation.Cron cron = new Instrumentation.Cron();
149                cron.start();
150                executor.end(context, wfAction);
151                cron.stop();
152                addActionCron(wfAction.getType(), cron);
153    
154                WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
155                DagELFunctions.setActionInfo(wfInstance, wfAction);
156                wfJob.setWorkflowInstance(wfInstance);
157                incrActionCounter(wfAction.getType(), 1);
158    
159                if (!context.isEnded()) {
160                    LOG.warn(XLog.OPS, "Action Ended, ActionExecutor [{0}] must call setEndData()",
161                            executor.getType());
162                    wfAction.setErrorInfo(END_DATA_MISSING, "Execution Ended, but End Data Missing from Action");
163                    failJob(context);
164                    jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
165                    jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
166                    return null;
167                }
168                wfAction.setRetries(0);
169                wfAction.setEndTime(new Date());
170    
171                boolean shouldHandleUserRetry = false;
172                Status slaStatus = null;
173                switch (wfAction.getStatus()) {
174                    case OK:
175                        slaStatus = Status.SUCCEEDED;
176                        break;
177                    case KILLED:
178                        slaStatus = Status.KILLED;
179                        break;
180                    case FAILED:
181                        slaStatus = Status.FAILED;
182                        shouldHandleUserRetry = true;
183                        break;
184                    case ERROR:
185                        LOG.info("ERROR is considered as FAILED for SLA");
186                        slaStatus = Status.KILLED;
187                        shouldHandleUserRetry = true;
188                        break;
189                    default:
190                        slaStatus = Status.FAILED;
191                        shouldHandleUserRetry = true;
192                        break;
193                }
194                if (!shouldHandleUserRetry || !handleUserRetry(wfAction)) {
195                    SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus, SlaAppType.WORKFLOW_ACTION);
196                    LOG.debug(
197                            "Queuing commands for action=" + actionId + ", status=" + wfAction.getStatus()
198                            + ", Set pending=" + wfAction.getPending());
199                    queue(new SignalXCommand(jobId, actionId));
200                }
201    
202                jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
203                jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
204            }
205            catch (ActionExecutorException ex) {
206                LOG.warn(
207                        "Error ending action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]",
208                        wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage());
209                wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
210                wfAction.setEndTime(null);
211    
212                switch (ex.getErrorType()) {
213                    case TRANSIENT:
214                        if (!handleTransient(context, executor, WorkflowAction.Status.END_RETRY)) {
215                            handleNonTransient(context, executor, WorkflowAction.Status.END_MANUAL);
216                            wfAction.setPendingAge(new Date());
217                            wfAction.setRetries(0);
218                        }
219                        wfAction.setEndTime(null);
220                        break;
221                    case NON_TRANSIENT:
222                        handleNonTransient(context, executor, WorkflowAction.Status.END_MANUAL);
223                        wfAction.setEndTime(null);
224                        break;
225                    case ERROR:
226                        handleError(context, executor, COULD_NOT_END, false, WorkflowAction.Status.ERROR);
227                        queue(new SignalXCommand(jobId, actionId));
228                        break;
229                    case FAILED:
230                        failJob(context);
231                        break;
232                }
233    
234                WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
235                DagELFunctions.setActionInfo(wfInstance, wfAction);
236                wfJob.setWorkflowInstance(wfInstance);
237    
238                try {
239                    jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
240                    jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
241                }
242                catch (JPAExecutorException je) {
243                    throw new CommandException(je);
244                }
245    
246            }
247            catch (JPAExecutorException je) {
248                throw new CommandException(je);
249            }
250    
251    
252            LOG.debug("ENDED ActionEndXCommand for action " + actionId);
253            return null;
254        }
255    
256    }