001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import java.sql.Timestamp;
021    import java.util.Date;
022    import org.apache.oozie.ErrorCode;
023    import org.apache.oozie.WorkflowActionBean;
024    import org.apache.oozie.WorkflowJobBean;
025    import org.apache.oozie.XException;
026    import org.apache.oozie.action.ActionExecutor;
027    import org.apache.oozie.action.ActionExecutorException;
028    import org.apache.oozie.client.WorkflowJob;
029    import org.apache.oozie.client.WorkflowAction.Status;
030    import org.apache.oozie.command.CommandException;
031    import org.apache.oozie.command.PreconditionException;
032    import org.apache.oozie.executor.jpa.JPAExecutorException;
033    import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
034    import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
035    import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
036    import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
037    import org.apache.oozie.service.ActionService;
038    import org.apache.oozie.service.JPAService;
039    import org.apache.oozie.service.Services;
040    import org.apache.oozie.service.UUIDService;
041    import org.apache.oozie.util.InstrumentUtils;
042    import org.apache.oozie.util.Instrumentation;
043    import org.apache.oozie.util.LogUtils;
044    import org.apache.oozie.util.XLog;
045    
046    /**
047     * Executes the check command for ActionHandlers. </p> Ensures the action is in
048     * RUNNING state before executing
049     * {@link ActionExecutor#check(org.apache.oozie.action.ActionExecutor.Context, org.apache.oozie.client.WorkflowAction)}
050     */
051    public class ActionCheckXCommand extends ActionXCommand<Void> {
052        public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
053        private String actionId;
054        private String jobId;
055        private int actionCheckDelay;
056        private WorkflowJobBean wfJob = null;
057        private WorkflowActionBean wfAction = null;
058        private JPAService jpaService = null;
059        private ActionExecutor executor = null;
060    
061        public ActionCheckXCommand(String actionId) {
062            this(actionId, -1);
063        }
064    
065        public ActionCheckXCommand(String actionId, int priority, int checkDelay) {
066            super("action.check", "action.check", priority);
067            this.actionId = actionId;
068            this.actionCheckDelay = checkDelay;
069            this.jobId = Services.get().get(UUIDService.class).getId(actionId);
070        }
071    
072        public ActionCheckXCommand(String actionId, int checkDelay) {
073            this(actionId, 0, checkDelay);
074        }
075    
076        @Override
077        protected void eagerLoadState() throws CommandException {
078            try {
079                jpaService = Services.get().get(JPAService.class);
080                if (jpaService != null) {
081                    this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
082                    this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
083                    LogUtils.setLogInfo(wfJob, logInfo);
084                    LogUtils.setLogInfo(wfAction, logInfo);
085                }
086                else {
087                    throw new CommandException(ErrorCode.E0610);
088                }
089            }
090            catch (XException ex) {
091                throw new CommandException(ex);
092            }
093        }
094    
095        @Override
096        protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
097            if (wfJob == null) {
098                throw new PreconditionException(ErrorCode.E0604, jobId);
099            }
100            if (wfAction == null) {
101                throw new PreconditionException(ErrorCode.E0605, actionId);
102            }
103            // if the action has been updated, quit this command
104            if (actionCheckDelay > 0) {
105                Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000);
106                Timestamp actionLmt = wfAction.getLastCheckTimestamp();
107                if (actionLmt.after(actionCheckTs)) {
108                    throw new PreconditionException(ErrorCode.E0817, actionId);
109                }
110            }
111    
112            executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
113            if (executor == null) {
114                throw new CommandException(ErrorCode.E0802, wfAction.getType());
115            }
116        }
117    
118        @Override
119        protected boolean isLockRequired() {
120            return true;
121        }
122    
123        @Override
124        protected String getEntityKey() {
125            return this.jobId;
126        }
127    
128        @Override
129        protected void loadState() throws CommandException {
130        }
131    
132        @Override
133        protected void verifyPrecondition() throws CommandException, PreconditionException {
134            if (!wfAction.isPending() || wfAction.getStatus() != WorkflowActionBean.Status.RUNNING) {
135                throw new PreconditionException(ErrorCode.E0815, wfAction.getPending(), wfAction.getStatusStr());
136            }
137            if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
138                wfAction.setLastCheckTime(new Date());
139                try {
140                    jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
141                }
142                catch (JPAExecutorException e) {
143                    throw new CommandException(e);
144                }
145                throw new PreconditionException(ErrorCode.E0818, wfAction.getId(), wfJob.getId(), wfJob.getStatus());
146            }
147        }
148    
149        @Override
150        protected Void execute() throws CommandException {
151            LOG.debug("STARTED ActionCheckXCommand for wf actionId=" + actionId + " priority =" + getPriority());
152    
153            ActionExecutorContext context = null;
154            try {
155                boolean isRetry = false;
156                boolean isUserRetry = false;
157                context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
158                incrActionCounter(wfAction.getType(), 1);
159    
160                Instrumentation.Cron cron = new Instrumentation.Cron();
161                cron.start();
162                executor.check(context, wfAction);
163                cron.stop();
164                addActionCron(wfAction.getType(), cron);
165    
166                if (wfAction.isExecutionComplete()) {
167                    if (!context.isExecuted()) {
168                        LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor
169                                .getType());
170                        wfAction.setErrorInfo(EXEC_DATA_MISSING,
171                                "Execution Complete, but Execution Data Missing from Action");
172                        failJob(context);
173                        wfAction.setLastCheckTime(new Date());
174                        jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
175                        jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
176                        return null;
177                    }
178                    wfAction.setPending();
179                    queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
180                }
181                wfAction.setLastCheckTime(new Date());
182                jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
183                jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
184            }
185            catch (ActionExecutorException ex) {
186                LOG.warn("Exception while executing check(). Error Code [{0}], Message[{1}]", ex.getErrorCode(), ex
187                        .getMessage(), ex);
188    
189                wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
190    
191                switch (ex.getErrorType()) {
192                    case FAILED:
193                        failAction(wfJob, wfAction);
194                        break;
195                    case ERROR:
196                        handleUserRetry(wfAction);
197                        break;
198                }
199                wfAction.setLastCheckTime(new Date());
200                try {
201                    jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
202                    jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
203                }
204                catch (JPAExecutorException e) {
205                    throw new CommandException(e);
206                }
207                return null;
208            }
209            catch (JPAExecutorException e) {
210                throw new CommandException(e);
211            }
212    
213            LOG.debug("ENDED ActionCheckXCommand for wf actionId=" + actionId + ", jobId=" + jobId);
214            return null;
215        }
216    
217        private void failAction(WorkflowJobBean workflow, WorkflowActionBean action) throws CommandException {
218            if (!handleUserRetry(action)) {
219                LOG.warn("Failing Job [{0}] due to failed action [{1}]", workflow.getId(), action.getId());
220                action.resetPending();
221                action.setStatus(Status.FAILED);
222                workflow.setStatus(WorkflowJob.Status.FAILED);
223                InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1, getInstrumentation());
224            }
225        }
226    }