001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.wf;
019
020import java.sql.Timestamp;
021import java.util.ArrayList;
022import java.util.Date;
023import java.util.List;
024
025import org.apache.oozie.ErrorCode;
026import org.apache.oozie.WorkflowActionBean;
027import org.apache.oozie.WorkflowJobBean;
028import org.apache.oozie.action.ActionExecutor;
029import org.apache.oozie.action.ActionExecutorException;
030import org.apache.oozie.client.WorkflowAction;
031import org.apache.oozie.client.WorkflowJob;
032import org.apache.oozie.command.CommandException;
033import org.apache.oozie.command.PreconditionException;
034import org.apache.oozie.executor.jpa.JPAExecutorException;
035import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
036import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
037import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
038import org.apache.oozie.executor.jpa.BatchQueryExecutor;
039import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
040import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
041import org.apache.oozie.service.ActionCheckerService;
042import org.apache.oozie.service.ActionService;
043import org.apache.oozie.service.EventHandlerService;
044import org.apache.oozie.service.Services;
045import org.apache.oozie.service.UUIDService;
046import org.apache.oozie.util.Instrumentation;
047import org.apache.oozie.util.LogUtils;
048import org.apache.oozie.util.XLog;
049
050/**
051 * Executes the check command for ActionHandlers. </p> Ensures the action is in
052 * RUNNING state before executing
053 * {@link ActionExecutor#check(org.apache.oozie.action.ActionExecutor.Context, org.apache.oozie.client.WorkflowAction)}
054 */
055public class ActionCheckXCommand extends ActionXCommand<Void> {
056    public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
057    private String actionId;
058    private String jobId;
059    private int actionCheckDelay;
060    private WorkflowJobBean wfJob = null;
061    private WorkflowActionBean wfAction = null;
062    private ActionExecutor executor = null;
063    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
064    private boolean generateEvent = false;
065
066    public ActionCheckXCommand(String actionId) {
067        this(actionId, -1);
068    }
069
070    public ActionCheckXCommand(String actionId, int priority, int checkDelay) {
071        super("action.check", "action.check", priority);
072        this.actionId = actionId;
073        this.actionCheckDelay = checkDelay;
074        this.jobId = Services.get().get(UUIDService.class).getId(actionId);
075    }
076
077    public ActionCheckXCommand(String actionId, int checkDelay) {
078        this(actionId, 0, checkDelay);
079    }
080
081    @Override
082    protected void eagerLoadState() throws CommandException {
083        try {
084            this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STATUS, jobId);
085            this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_ID_TYPE_LASTCHECK,
086                    actionId);
087            LogUtils.setLogInfo(wfJob, logInfo);
088            LogUtils.setLogInfo(wfAction, logInfo);
089        }
090        catch (JPAExecutorException ex) {
091            throw new CommandException(ex);
092        }
093    }
094
095    @Override
096    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
097        if (wfJob == null) {
098            throw new PreconditionException(ErrorCode.E0604, jobId);
099        }
100        if (wfAction == null) {
101            throw new PreconditionException(ErrorCode.E0605, actionId);
102        }
103        // if the action has been updated, quit this command
104        if (actionCheckDelay > 0) {
105            Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000);
106            Timestamp actionLmt = wfAction.getLastCheckTimestamp();
107            if (actionLmt.after(actionCheckTs)) {
108                throw new PreconditionException(ErrorCode.E0817, actionId);
109            }
110        }
111
112        executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
113        if (executor == null) {
114            throw new CommandException(ErrorCode.E0802, wfAction.getType());
115        }
116    }
117
118    @Override
119    protected boolean isLockRequired() {
120        return true;
121    }
122
123    @Override
124    public String getEntityKey() {
125        return this.jobId;
126    }
127
128    @Override
129    protected void loadState() throws CommandException {
130        try {
131            this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_ACTION_OP, jobId);
132            this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_CHECK,
133                    actionId);
134        }
135        catch (JPAExecutorException e) {
136            throw new CommandException(e);
137        }
138        LogUtils.setLogInfo(wfJob, logInfo);
139        LogUtils.setLogInfo(wfAction, logInfo);
140    }
141
142    @Override
143    protected void verifyPrecondition() throws CommandException, PreconditionException {
144        if (!wfAction.isPending() || wfAction.getStatus() != WorkflowActionBean.Status.RUNNING) {
145            throw new PreconditionException(ErrorCode.E0815, wfAction.isPending(), wfAction.getStatusStr());
146        }
147        if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
148            wfAction.setLastCheckTime(new Date());
149            try {
150                WorkflowActionQueryExecutor.getInstance().executeUpdate(
151                        WorkflowActionQuery.UPDATE_ACTION_FOR_LAST_CHECKED_TIME, wfAction);
152            }
153            catch (JPAExecutorException e) {
154                throw new CommandException(e);
155            }
156            throw new PreconditionException(ErrorCode.E0818, wfAction.getId(), wfJob.getId(), wfJob.getStatus());
157        }
158    }
159
160    @Override
161    protected Void execute() throws CommandException {
162        LOG.debug("STARTED ActionCheckXCommand for wf actionId=" + actionId + " priority =" + getPriority());
163
164        long retryInterval = Services.get().getConf().getLong(ActionCheckerService.CONF_ACTION_CHECK_INTERVAL, executor
165                .getRetryInterval());
166        executor.setRetryInterval(retryInterval);
167
168        ActionExecutorContext context = null;
169        boolean execSynchronous = false;
170        try {
171            boolean isRetry = false;
172            if (wfAction.getRetries() > 0) {
173                isRetry = true;
174            }
175            boolean isUserRetry = false;
176            context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
177            incrActionCounter(wfAction.getType(), 1);
178
179            Instrumentation.Cron cron = new Instrumentation.Cron();
180            cron.start();
181            executor.check(context, wfAction);
182            cron.stop();
183            addActionCron(wfAction.getType(), cron);
184
185            if (wfAction.isExecutionComplete()) {
186                if (!context.isExecuted()) {
187                    LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor
188                            .getType());
189                    wfAction.setErrorInfo(EXEC_DATA_MISSING,
190                            "Execution Complete, but Execution Data Missing from Action");
191                    failJob(context);
192                    generateEvent = true;
193                } else {
194                    wfAction.setPending();
195                    execSynchronous = true;
196                }
197            }
198            wfAction.setLastCheckTime(new Date());
199            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_CHECK, wfAction));
200            wfJob.setLastModifiedTime(new Date());
201            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
202                    wfJob));
203        }
204        catch (ActionExecutorException ex) {
205            LOG.warn("Exception while executing check(). Error Code [{0}], Message[{1}]", ex.getErrorCode(), ex
206                    .getMessage(), ex);
207
208            wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
209            switch (ex.getErrorType()) {
210                case FAILED:
211                    failJob(context, wfAction);
212                    generateEvent = true;
213                    break;
214                case ERROR:
215                    handleUserRetry(wfAction);
216                    break;
217                case TRANSIENT:                 // retry N times, then suspend workflow
218                    if (!handleTransient(context, executor, WorkflowAction.Status.RUNNING)) {
219                        handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
220                        generateEvent = true;
221                        wfAction.setPendingAge(new Date());
222                        wfAction.setRetries(0);
223                        wfAction.setStartTime(null);
224                    }
225                    break;
226            }
227            wfAction.setLastCheckTime(new Date());
228            updateList = new ArrayList<UpdateEntry>();
229            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_CHECK, wfAction));
230            wfJob.setLastModifiedTime(new Date());
231            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
232                    wfJob));
233        }
234        finally {
235            try {
236                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
237                if (generateEvent && EventHandlerService.isEnabled()) {
238                    generateEvent(wfAction, wfJob.getUser());
239                }
240                if (execSynchronous) {
241                    new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call(getEntityKey());
242                }
243            }
244            catch (JPAExecutorException e) {
245                throw new CommandException(e);
246            }
247        }
248
249        LOG.debug("ENDED ActionCheckXCommand for wf actionId=" + actionId + ", jobId=" + jobId);
250        return null;
251    }
252
253    protected long getRetryInterval() {
254        return (executor != null) ? executor.getRetryInterval() : ActionExecutor.RETRY_INTERVAL;
255    }
256
257    @Override
258    public String getKey() {
259        return getName() + "_" + actionId;
260    }
261
262}