001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import java.sql.Timestamp;
021    import java.util.ArrayList;
022    import java.util.Date;
023    import java.util.List;
024    
025    import org.apache.oozie.ErrorCode;
026    import org.apache.oozie.WorkflowActionBean;
027    import org.apache.oozie.WorkflowJobBean;
028    import org.apache.oozie.XException;
029    import org.apache.oozie.action.ActionExecutor;
030    import org.apache.oozie.action.ActionExecutorException;
031    import org.apache.oozie.client.WorkflowAction;
032    import org.apache.oozie.client.WorkflowJob;
033    import org.apache.oozie.client.WorkflowAction.Status;
034    import org.apache.oozie.client.rest.JsonBean;
035    import org.apache.oozie.command.CommandException;
036    import org.apache.oozie.command.PreconditionException;
037    import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
038    import org.apache.oozie.executor.jpa.JPAExecutorException;
039    import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
040    import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
041    import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
042    import org.apache.oozie.service.ActionService;
043    import org.apache.oozie.service.JPAService;
044    import org.apache.oozie.service.Services;
045    import org.apache.oozie.service.UUIDService;
046    import org.apache.oozie.util.InstrumentUtils;
047    import org.apache.oozie.util.Instrumentation;
048    import org.apache.oozie.util.LogUtils;
049    import org.apache.oozie.util.XLog;
050    
051    /**
052     * Executes the check command for ActionHandlers. </p> Ensures the action is in
053     * RUNNING state before executing
054     * {@link ActionExecutor#check(org.apache.oozie.action.ActionExecutor.Context, org.apache.oozie.client.WorkflowAction)}
055     */
056    public class ActionCheckXCommand extends ActionXCommand<Void> {
057        public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
058        private String actionId;
059        private String jobId;
060        private int actionCheckDelay;
061        private WorkflowJobBean wfJob = null;
062        private WorkflowActionBean wfAction = null;
063        private JPAService jpaService = null;
064        private ActionExecutor executor = null;
065        private List<JsonBean> updateList = new ArrayList<JsonBean>();
066    
067        public ActionCheckXCommand(String actionId) {
068            this(actionId, -1);
069        }
070    
071        public ActionCheckXCommand(String actionId, int priority, int checkDelay) {
072            super("action.check", "action.check", priority);
073            this.actionId = actionId;
074            this.actionCheckDelay = checkDelay;
075            this.jobId = Services.get().get(UUIDService.class).getId(actionId);
076        }
077    
078        public ActionCheckXCommand(String actionId, int checkDelay) {
079            this(actionId, 0, checkDelay);
080        }
081    
082        @Override
083        protected void eagerLoadState() throws CommandException {
084            try {
085                jpaService = Services.get().get(JPAService.class);
086                if (jpaService != null) {
087                    this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
088                    this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
089                    LogUtils.setLogInfo(wfJob, logInfo);
090                    LogUtils.setLogInfo(wfAction, logInfo);
091                }
092                else {
093                    throw new CommandException(ErrorCode.E0610);
094                }
095            }
096            catch (XException ex) {
097                throw new CommandException(ex);
098            }
099        }
100    
101        @Override
102        protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
103            if (wfJob == null) {
104                throw new PreconditionException(ErrorCode.E0604, jobId);
105            }
106            if (wfAction == null) {
107                throw new PreconditionException(ErrorCode.E0605, actionId);
108            }
109            // if the action has been updated, quit this command
110            if (actionCheckDelay > 0) {
111                Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000);
112                Timestamp actionLmt = wfAction.getLastCheckTimestamp();
113                if (actionLmt.after(actionCheckTs)) {
114                    throw new PreconditionException(ErrorCode.E0817, actionId);
115                }
116            }
117    
118            executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
119            if (executor == null) {
120                throw new CommandException(ErrorCode.E0802, wfAction.getType());
121            }
122        }
123    
124        @Override
125        protected boolean isLockRequired() {
126            return true;
127        }
128    
129        @Override
130        public String getEntityKey() {
131            return this.jobId;
132        }
133    
134        @Override
135        protected void loadState() throws CommandException {
136        }
137    
138        @Override
139        protected void verifyPrecondition() throws CommandException, PreconditionException {
140            if (!wfAction.isPending() || wfAction.getStatus() != WorkflowActionBean.Status.RUNNING) {
141                throw new PreconditionException(ErrorCode.E0815, wfAction.getPending(), wfAction.getStatusStr());
142            }
143            if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
144                wfAction.setLastCheckTime(new Date());
145                try {
146                    jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
147                }
148                catch (JPAExecutorException e) {
149                    throw new CommandException(e);
150                }
151                throw new PreconditionException(ErrorCode.E0818, wfAction.getId(), wfJob.getId(), wfJob.getStatus());
152            }
153        }
154    
155        @Override
156        protected Void execute() throws CommandException {
157            LOG.debug("STARTED ActionCheckXCommand for wf actionId=" + actionId + " priority =" + getPriority());
158    
159            ActionExecutorContext context = null;
160            try {
161                boolean isRetry = false;
162                if (wfAction.getRetries() > 0) {
163                    isRetry = true;
164                }
165                boolean isUserRetry = false;
166                context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
167                incrActionCounter(wfAction.getType(), 1);
168    
169                Instrumentation.Cron cron = new Instrumentation.Cron();
170                cron.start();
171                executor.check(context, wfAction);
172                cron.stop();
173                addActionCron(wfAction.getType(), cron);
174    
175                if (wfAction.isExecutionComplete()) {
176                    if (!context.isExecuted()) {
177                        LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor
178                                .getType());
179                        wfAction.setErrorInfo(EXEC_DATA_MISSING,
180                                "Execution Complete, but Execution Data Missing from Action");
181                        failJob(context);
182                    } else {
183                        wfAction.setPending();
184                        queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
185                    }
186                }
187                wfAction.setLastCheckTime(new Date());
188                updateList.add(wfAction);
189                wfJob.setLastModifiedTime(new Date());
190                updateList.add(wfJob);
191            }
192            catch (ActionExecutorException ex) {
193                LOG.warn("Exception while executing check(). Error Code [{0}], Message[{1}]", ex.getErrorCode(), ex
194                        .getMessage(), ex);
195    
196                wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
197    
198                switch (ex.getErrorType()) {
199                    case FAILED:
200                        failAction(wfJob, wfAction);
201                        break;
202                    case ERROR:
203                        handleUserRetry(wfAction);
204                        break;
205                    case TRANSIENT:                 // retry N times, then suspend workflow
206                        if (!handleTransient(context, executor, WorkflowAction.Status.RUNNING)) {
207                            handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
208                            wfAction.setPendingAge(new Date());
209                            wfAction.setRetries(0);
210                            wfAction.setStartTime(null);
211                        }
212                        break;
213                }
214                wfAction.setLastCheckTime(new Date());
215                updateList = new ArrayList<JsonBean>();
216                updateList.add(wfAction);
217                wfJob.setLastModifiedTime(new Date());
218                updateList.add(wfJob);
219            }
220            finally {
221                try {
222                    jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
223                }
224                catch (JPAExecutorException e) {
225                    throw new CommandException(e);
226                }
227            }
228    
229            LOG.debug("ENDED ActionCheckXCommand for wf actionId=" + actionId + ", jobId=" + jobId);
230            return null;
231        }
232    
233        private void failAction(WorkflowJobBean workflow, WorkflowActionBean action) throws CommandException {
234            if (!handleUserRetry(action)) {
235                LOG.warn("Failing Job [{0}] due to failed action [{1}]", workflow.getId(), action.getId());
236                action.resetPending();
237                action.setStatus(Status.FAILED);
238                workflow.setStatus(WorkflowJob.Status.FAILED);
239                InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1, getInstrumentation());
240            }
241        }
242    }