001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import java.sql.Timestamp;
021    import java.util.ArrayList;
022    import java.util.Date;
023    import java.util.List;
024    
025    import org.apache.oozie.ErrorCode;
026    import org.apache.oozie.WorkflowActionBean;
027    import org.apache.oozie.WorkflowJobBean;
028    import org.apache.oozie.XException;
029    import org.apache.oozie.action.ActionExecutor;
030    import org.apache.oozie.action.ActionExecutorException;
031    import org.apache.oozie.client.WorkflowAction;
032    import org.apache.oozie.client.WorkflowJob;
033    import org.apache.oozie.client.rest.JsonBean;
034    import org.apache.oozie.command.CommandException;
035    import org.apache.oozie.command.PreconditionException;
036    import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
037    import org.apache.oozie.executor.jpa.JPAExecutorException;
038    import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
039    import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
040    import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
041    import org.apache.oozie.service.ActionCheckerService;
042    import org.apache.oozie.service.ActionService;
043    import org.apache.oozie.service.EventHandlerService;
044    import org.apache.oozie.service.JPAService;
045    import org.apache.oozie.service.Services;
046    import org.apache.oozie.service.UUIDService;
047    import org.apache.oozie.util.Instrumentation;
048    import org.apache.oozie.util.LogUtils;
049    import org.apache.oozie.util.XLog;
050    
051    /**
052     * Executes the check command for ActionHandlers. </p> Ensures the action is in
053     * RUNNING state before executing
054     * {@link ActionExecutor#check(org.apache.oozie.action.ActionExecutor.Context, org.apache.oozie.client.WorkflowAction)}
055     */
056    public class ActionCheckXCommand extends ActionXCommand<Void> {
057        public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
058        private String actionId;
059        private String jobId;
060        private int actionCheckDelay;
061        private WorkflowJobBean wfJob = null;
062        private WorkflowActionBean wfAction = null;
063        private JPAService jpaService = null;
064        private ActionExecutor executor = null;
065        private List<JsonBean> updateList = new ArrayList<JsonBean>();
066        private boolean generateEvent = false;
067    
068        public ActionCheckXCommand(String actionId) {
069            this(actionId, -1);
070        }
071    
072        public ActionCheckXCommand(String actionId, int priority, int checkDelay) {
073            super("action.check", "action.check", priority);
074            this.actionId = actionId;
075            this.actionCheckDelay = checkDelay;
076            this.jobId = Services.get().get(UUIDService.class).getId(actionId);
077        }
078    
079        public ActionCheckXCommand(String actionId, int checkDelay) {
080            this(actionId, 0, checkDelay);
081        }
082    
083        @Override
084        protected void eagerLoadState() throws CommandException {
085            try {
086                jpaService = Services.get().get(JPAService.class);
087                if (jpaService != null) {
088                    this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
089                    this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
090                    LogUtils.setLogInfo(wfJob, logInfo);
091                    LogUtils.setLogInfo(wfAction, logInfo);
092                }
093                else {
094                    throw new CommandException(ErrorCode.E0610);
095                }
096            }
097            catch (XException ex) {
098                throw new CommandException(ex);
099            }
100        }
101    
102        @Override
103        protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
104            if (wfJob == null) {
105                throw new PreconditionException(ErrorCode.E0604, jobId);
106            }
107            if (wfAction == null) {
108                throw new PreconditionException(ErrorCode.E0605, actionId);
109            }
110            // if the action has been updated, quit this command
111            if (actionCheckDelay > 0) {
112                Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000);
113                Timestamp actionLmt = wfAction.getLastCheckTimestamp();
114                if (actionLmt.after(actionCheckTs)) {
115                    throw new PreconditionException(ErrorCode.E0817, actionId);
116                }
117            }
118    
119            executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
120            if (executor == null) {
121                throw new CommandException(ErrorCode.E0802, wfAction.getType());
122            }
123        }
124    
125        @Override
126        protected boolean isLockRequired() {
127            return true;
128        }
129    
130        @Override
131        public String getEntityKey() {
132            return this.jobId;
133        }
134    
135        @Override
136        protected void loadState() throws CommandException {
137            eagerLoadState();
138        }
139    
140        @Override
141        protected void verifyPrecondition() throws CommandException, PreconditionException {
142            if (!wfAction.isPending() || wfAction.getStatus() != WorkflowActionBean.Status.RUNNING) {
143                throw new PreconditionException(ErrorCode.E0815, wfAction.getPending(), wfAction.getStatusStr());
144            }
145            if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
146                wfAction.setLastCheckTime(new Date());
147                try {
148                    jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
149                }
150                catch (JPAExecutorException e) {
151                    throw new CommandException(e);
152                }
153                throw new PreconditionException(ErrorCode.E0818, wfAction.getId(), wfJob.getId(), wfJob.getStatus());
154            }
155        }
156    
157        @Override
158        protected Void execute() throws CommandException {
159            LOG.debug("STARTED ActionCheckXCommand for wf actionId=" + actionId + " priority =" + getPriority());
160    
161            long retryInterval = Services.get().getConf().getLong(ActionCheckerService.CONF_ACTION_CHECK_INTERVAL, executor
162                    .getRetryInterval());
163            executor.setRetryInterval(retryInterval);
164    
165            ActionExecutorContext context = null;
166            try {
167                boolean isRetry = false;
168                if (wfAction.getRetries() > 0) {
169                    isRetry = true;
170                }
171                boolean isUserRetry = false;
172                context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
173                incrActionCounter(wfAction.getType(), 1);
174    
175                Instrumentation.Cron cron = new Instrumentation.Cron();
176                cron.start();
177                executor.check(context, wfAction);
178                cron.stop();
179                addActionCron(wfAction.getType(), cron);
180    
181                if (wfAction.isExecutionComplete()) {
182                    if (!context.isExecuted()) {
183                        LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor
184                                .getType());
185                        wfAction.setErrorInfo(EXEC_DATA_MISSING,
186                                "Execution Complete, but Execution Data Missing from Action");
187                        failJob(context);
188                        generateEvent = true;
189                    } else {
190                        wfAction.setPending();
191                        queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
192                    }
193                }
194                wfAction.setLastCheckTime(new Date());
195                updateList.add(wfAction);
196                wfJob.setLastModifiedTime(new Date());
197                updateList.add(wfJob);
198            }
199            catch (ActionExecutorException ex) {
200                LOG.warn("Exception while executing check(). Error Code [{0}], Message[{1}]", ex.getErrorCode(), ex
201                        .getMessage(), ex);
202    
203                wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
204                switch (ex.getErrorType()) {
205                    case FAILED:
206                        failJob(context, wfAction);
207                        generateEvent = true;
208                        break;
209                    case ERROR:
210                        handleUserRetry(wfAction);
211                        break;
212                    case TRANSIENT:                 // retry N times, then suspend workflow
213                        if (!handleTransient(context, executor, WorkflowAction.Status.RUNNING)) {
214                            handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
215                            generateEvent = true;
216                            wfAction.setPendingAge(new Date());
217                            wfAction.setRetries(0);
218                            wfAction.setStartTime(null);
219                        }
220                        break;
221                }
222                wfAction.setLastCheckTime(new Date());
223                updateList = new ArrayList<JsonBean>();
224                updateList.add(wfAction);
225                wfJob.setLastModifiedTime(new Date());
226                updateList.add(wfJob);
227            }
228            finally {
229                try {
230                    jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
231                    if (generateEvent && EventHandlerService.isEnabled()) {
232                        generateEvent(wfAction, wfJob.getUser());
233                    }
234                }
235                catch (JPAExecutorException e) {
236                    throw new CommandException(e);
237                }
238            }
239    
240            LOG.debug("ENDED ActionCheckXCommand for wf actionId=" + actionId + ", jobId=" + jobId);
241            return null;
242        }
243    
244        protected long getRetryInterval() {
245            return (executor != null) ? executor.getRetryInterval() : ActionExecutor.RETRY_INTERVAL;
246        }
247    
248        @Override
249        public String getKey() {
250            return getName() + "_" + actionId;
251        }
252    
253    }