001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.wf;
020
021import java.util.Properties;
022
023import org.apache.oozie.ErrorCode;
024import org.apache.oozie.WorkflowActionBean;
025import org.apache.oozie.action.ActionExecutor;
026import org.apache.oozie.command.CommandException;
027import org.apache.oozie.command.PreconditionException;
028import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
029import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
030import org.apache.oozie.service.ActionService;
031import org.apache.oozie.service.CallbackService;
032import org.apache.oozie.service.Services;
033import org.apache.oozie.util.LogUtils;
034import org.apache.oozie.util.ParamChecker;
035
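/**
 * This command is executed when a callback reporting the external status of a workflow action is received.
 * If the reported status means the action has completed, an {@link ActionCheckXCommand} is queued for it.
 */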
039public class CompletedActionXCommand extends WorkflowXCommand<Void> {
040    private final String actionId;
041    private final String externalStatus;
042    private WorkflowActionBean wfactionBean;
043    private int earlyRequeueCount;
044
045    public CompletedActionXCommand(String actionId, String externalStatus, Properties actionData, int priority,
046                                   int earlyRequeueCount) {
047        super("callback", "callback", priority);
048        this.actionId = ParamChecker.notEmpty(actionId, "actionId");
049        this.externalStatus = ParamChecker.notEmpty(externalStatus, "externalStatus");
050        this.earlyRequeueCount = earlyRequeueCount;
051    }
052
053    public CompletedActionXCommand(String actionId, String externalStatus, Properties actionData, int priority) {
054        this(actionId, externalStatus, actionData, priority, 0);
055    }
056
057    public CompletedActionXCommand(String actionId, String externalStatus, Properties actionData) {
058        this(actionId, externalStatus, actionData, 1);
059    }
060
061    @Override
062    protected void setLogInfo() {
063        LogUtils.setLogInfo(actionId);
064    }
065
066    /*
067     * (non-Javadoc)
068     *
069     * @see org.apache.oozie.command.XCommand#eagerLoadState()
070     */
071    @Override
072    protected void eagerLoadState() throws CommandException {
073        try {
074            this.wfactionBean = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_COMPLETED,
075                    this.actionId);
076        }
077        catch (Exception ex) {
078            throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
079        }
080        LogUtils.setLogInfo(this.wfactionBean);
081    }
082
083    /*
084     * (non-Javadoc)
085     *
086     * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
087     */
088    @Override
089    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
090        if (this.wfactionBean.getStatus() != WorkflowActionBean.Status.RUNNING
091                && this.wfactionBean.getStatus() != WorkflowActionBean.Status.PREP) {
092            throw new CommandException(ErrorCode.E0800, actionId, this.wfactionBean.getStatus());
093        }
094    }
095
096    /*
097     * (non-Javadoc)
098     *
099     * @see org.apache.oozie.command.XCommand#execute()
100     */
101    @Override
102    protected Void execute() throws CommandException {
103        // If the action is still in PREP, we probably received a callback before Oozie was able to update from PREP to RUNNING;
104        // we'll requeue this command a few times and hope that it switches to RUNNING before giving up
105        if (this.wfactionBean.getStatus() == WorkflowActionBean.Status.PREP) {
106            int maxEarlyRequeueCount = Services.get().get(CallbackService.class).getEarlyRequeueMaxRetries();
107            if (this.earlyRequeueCount < maxEarlyRequeueCount) {
108                long delay = getRequeueDelay();
109                LOG.warn("Received early callback for action still in PREP state; will wait [{0}]ms and requeue up to [{1}] more"
110                        + " times", delay, (maxEarlyRequeueCount - earlyRequeueCount));
111                queue(new CompletedActionXCommand(this.actionId, this.externalStatus, null, this.getPriority(),
112                        this.earlyRequeueCount + 1), delay);
113            } else {
114                throw new CommandException(ErrorCode.E0822, actionId);
115            }
116        } else {    // RUNNING
117            ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(this.wfactionBean.getType());
118            // this is done because oozie notifications (of sub-wfs) is send
119            // every status change, not only on completion.
120            if (executor.isCompleted(externalStatus)) {
121                queue(new ActionCheckXCommand(this.wfactionBean.getId(), getPriority(), -1));
122            }
123        }
124        return null;
125    }
126
127    /*
128     * (non-Javadoc)
129     *
130     * @see org.apache.oozie.command.XCommand#getEntityKey()
131     */
132    @Override
133    public String getEntityKey() {
134        return null;
135    }
136
137    /*
138     * (non-Javadoc)
139     *
140     * @see org.apache.oozie.command.XCommand#isLockRequired()
141     */
142    @Override
143    protected boolean isLockRequired() {
144        return false;
145    }
146
147    /*
148     * (non-Javadoc)
149     *
150     * @see org.apache.oozie.command.XCommand#loadState()
151     */
152    @Override
153    protected void loadState() throws CommandException {
154        eagerLoadState();
155    }
156
157    /*
158     * (non-Javadoc)
159     *
160     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
161     */
162    @Override
163    protected void verifyPrecondition() throws CommandException, PreconditionException {
164        eagerVerifyPrecondition();
165    }
166}