001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.wf;
020
021import java.sql.Timestamp;
022import java.util.ArrayList;
023import java.util.Date;
024import java.util.List;
025
026import org.apache.oozie.ErrorCode;
027import org.apache.oozie.WorkflowActionBean;
028import org.apache.oozie.WorkflowJobBean;
029import org.apache.oozie.action.ActionExecutor;
030import org.apache.oozie.action.ActionExecutorException;
031import org.apache.oozie.client.WorkflowAction;
032import org.apache.oozie.client.WorkflowJob;
033import org.apache.oozie.command.CommandException;
034import org.apache.oozie.command.PreconditionException;
035import org.apache.oozie.executor.jpa.JPAExecutorException;
036import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
037import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
038import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
039import org.apache.oozie.executor.jpa.BatchQueryExecutor;
040import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
041import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
042import org.apache.oozie.service.ActionCheckerService;
043import org.apache.oozie.service.ActionService;
044import org.apache.oozie.service.EventHandlerService;
045import org.apache.oozie.service.Services;
046import org.apache.oozie.service.UUIDService;
047import org.apache.oozie.util.Instrumentation;
048import org.apache.oozie.util.LogUtils;
049import org.apache.oozie.util.XLog;
050
051/**
052 * Executes the check command for ActionHandlers. <p> Ensures the action is in
053 * RUNNING state before executing
054 * {@link ActionExecutor#check(org.apache.oozie.action.ActionExecutor.Context, org.apache.oozie.client.WorkflowAction)}
055 */
056public class ActionCheckXCommand extends ActionXCommand<Void> {
057    public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
058    private String actionId;
059    private String jobId;
060    private int actionCheckDelay;
061    private WorkflowJobBean wfJob = null;
062    private WorkflowActionBean wfAction = null;
063    private ActionExecutor executor = null;
064    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
065    private boolean generateEvent = false;
066
067    public ActionCheckXCommand(String actionId) {
068        this(actionId, -1);
069    }
070
071    public ActionCheckXCommand(String actionId, int priority, int checkDelay) {
072        super("action.check", "action.check", priority);
073        this.actionId = actionId;
074        this.actionCheckDelay = checkDelay;
075        this.jobId = Services.get().get(UUIDService.class).getId(actionId);
076    }
077
078    public ActionCheckXCommand(String actionId, int checkDelay) {
079        this(actionId, 0, checkDelay);
080    }
081
082    @Override
083    protected void setLogInfo() {
084        LogUtils.setLogInfo(actionId);
085    }
086
087    @Override
088    protected void eagerLoadState() throws CommandException {
089        try {
090            this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STATUS, jobId);
091            this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_ID_TYPE_LASTCHECK,
092                    actionId);
093            LogUtils.setLogInfo(wfJob);
094            LogUtils.setLogInfo(wfAction);
095        }
096        catch (JPAExecutorException ex) {
097            throw new CommandException(ex);
098        }
099    }
100
101    @Override
102    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
103        if (wfJob == null) {
104            throw new PreconditionException(ErrorCode.E0604, jobId);
105        }
106        if (wfAction == null) {
107            throw new PreconditionException(ErrorCode.E0605, actionId);
108        }
109        // if the action has been updated, quit this command
110        if (actionCheckDelay > 0) {
111            Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000);
112            Timestamp actionLmt = wfAction.getLastCheckTimestamp();
113            if (actionLmt.after(actionCheckTs)) {
114                throw new PreconditionException(ErrorCode.E0817, actionId);
115            }
116        }
117
118        executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
119        if (executor == null) {
120            throw new CommandException(ErrorCode.E0802, wfAction.getType());
121        }
122    }
123
124    @Override
125    protected boolean isLockRequired() {
126        return true;
127    }
128
129    @Override
130    public String getEntityKey() {
131        return this.jobId;
132    }
133
134    @Override
135    protected void loadState() throws CommandException {
136        try {
137            this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_ACTION_OP, jobId);
138            this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_CHECK,
139                    actionId);
140        }
141        catch (JPAExecutorException e) {
142            throw new CommandException(e);
143        }
144        LogUtils.setLogInfo(wfJob);
145        LogUtils.setLogInfo(wfAction);
146    }
147
148    @Override
149    protected void verifyPrecondition() throws CommandException, PreconditionException {
150        if (!wfAction.isPending() || wfAction.getStatus() != WorkflowActionBean.Status.RUNNING) {
151            throw new PreconditionException(ErrorCode.E0815, wfAction.isPending(), wfAction.getStatusStr());
152        }
153        if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
154            wfAction.setLastCheckTime(new Date());
155            try {
156                WorkflowActionQueryExecutor.getInstance().executeUpdate(
157                        WorkflowActionQuery.UPDATE_ACTION_FOR_LAST_CHECKED_TIME, wfAction);
158            }
159            catch (JPAExecutorException e) {
160                throw new CommandException(e);
161            }
162            throw new PreconditionException(ErrorCode.E0818, wfAction.getId(), wfJob.getId(), wfJob.getStatus());
163        }
164    }
165
166    @Override
167    protected Void execute() throws CommandException {
168        LOG.debug("STARTED ActionCheckXCommand for wf actionId=" + actionId + " priority =" + getPriority());
169        ActionExecutorContext context = null;
170        boolean execSynchronous = false;
171        try {
172            boolean isRetry = false;
173            if (wfAction.getRetries() > 0) {
174                isRetry = true;
175            }
176            boolean isUserRetry = false;
177            context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
178            incrActionCounter(wfAction.getType(), 1);
179
180            Instrumentation.Cron cron = new Instrumentation.Cron();
181            cron.start();
182            executor.check(context, wfAction);
183            cron.stop();
184            addActionCron(wfAction.getType(), cron);
185
186            if (wfAction.isExecutionComplete()) {
187                if (!context.isExecuted()) {
188                    LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor
189                            .getType());
190                    wfAction.setErrorInfo(EXEC_DATA_MISSING,
191                            "Execution Complete, but Execution Data Missing from Action");
192                    failJob(context);
193                    generateEvent = true;
194                } else {
195                    wfAction.setPending();
196                    execSynchronous = true;
197                }
198            }
199            wfAction.setLastCheckTime(new Date());
200            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_CHECK, wfAction));
201            wfJob.setLastModifiedTime(new Date());
202            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
203                    wfJob));
204        }
205        catch (ActionExecutorException ex) {
206            LOG.warn("Exception while executing check(). Error Code [{0}], Message[{1}]", ex.getErrorCode(), ex
207                    .getMessage(), ex);
208
209            wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
210            switch (ex.getErrorType()) {
211                case ERROR:
212                    // If allowed to retry, this will handle it; otherwise, we should fall through to FAILED
213                    if (handleUserRetry(wfAction, wfJob)) {
214                        break;
215                    }
216                case FAILED:
217                    failJob(context, wfAction);
218                    generateEvent = true;
219                    break;
220                case TRANSIENT:                 // retry N times, then suspend workflow
221                    if (!handleTransient(context, executor, WorkflowAction.Status.RUNNING)) {
222                        handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
223                        generateEvent = true;
224                        wfAction.setPendingAge(new Date());
225                        wfAction.setRetries(0);
226                        wfAction.setStartTime(null);
227                    }
228                    break;
229            }
230            wfAction.setLastCheckTime(new Date());
231            updateList = new ArrayList<UpdateEntry>();
232            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_CHECK, wfAction));
233            wfJob.setLastModifiedTime(new Date());
234            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
235                    wfJob));
236        }
237        finally {
238            try {
239                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
240                if (generateEvent && EventHandlerService.isEnabled()) {
241                    generateEvent(wfAction, wfJob.getUser());
242                }
243                if (execSynchronous) {
244                    new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call();
245                }
246            }
247            catch (JPAExecutorException e) {
248                throw new CommandException(e);
249            }
250        }
251
252        LOG.debug("ENDED ActionCheckXCommand for wf actionId=" + actionId + ", jobId=" + jobId);
253        return null;
254    }
255
256    protected long getRetryInterval() {
257        return (executor != null) ? executor.getRetryInterval() : ActionExecutor.RETRY_INTERVAL;
258    }
259
260    @Override
261    public String getKey() {
262        return getName() + "_" + actionId;
263    }
264
265}