001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.bundle;
020
021import java.util.Date;
022import java.util.HashMap;
023import java.util.List;
024
025import org.apache.oozie.BundleActionBean;
026import org.apache.oozie.BundleJobBean;
027import org.apache.oozie.ErrorCode;
028import org.apache.oozie.client.Job;
029import org.apache.oozie.client.Job.Status;
030import org.apache.oozie.command.CommandException;
031import org.apache.oozie.command.PreconditionException;
032import org.apache.oozie.command.StatusTransitXCommand;
033import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
034import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
035import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
036import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
037import org.apache.oozie.executor.jpa.JPAExecutorException;
038import org.apache.oozie.util.LogUtils;
039import org.apache.oozie.util.StatusUtils;
040
041/**
042 * BundleStatusTransitXCommand update job's status according to its child actions' status. If all child actions' pending
043 * flag equals 0 (job done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's
044 * status to SUCCEEDED.
045 */
046public class BundleStatusTransitXCommand extends StatusTransitXCommand {
047
048    private String jobId;
049    private List<BundleActionBean> bundleActions;
050    private BundleJobBean bundleJob;
051    private boolean foundPending;
052    private HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>();
053
054    public BundleStatusTransitXCommand(String id) {
055        super("bundle_status_transit", "bundle_status_transit", 0);
056        this.jobId = id;
057    }
058
059    @Override
060    public String getEntityKey() {
061        return jobId;
062    }
063
064    @Override
065    protected void loadState() throws CommandException {
066        try {
067            bundleJob = BundleJobQueryExecutor.getInstance().get(
068                    BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME, jobId);
069
070            bundleActions = BundleActionQueryExecutor.getInstance().getList(
071                    BundleActionQuery.GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE, jobId);
072            for (BundleActionBean bAction : bundleActions) {
073                int counter = 0;
074                if (bundleActionStatus.containsKey(bAction.getStatus())) {
075                    counter = getActionStatusCount(bAction.getStatus()) + 1;
076                }
077                else {
078                    ++counter;
079                }
080                bundleActionStatus.put(bAction.getStatus(), counter);
081                if (bAction.getCoordId() == null
082                        && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED) ) {
083                    new BundleKillXCommand(jobId).call();
084                    bundleJob = BundleJobQueryExecutor.getInstance().get(
085                            BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME, jobId);
086                    bundleJob.setStatus(Job.Status.FAILED);
087                    bundleJob.setLastModifiedTime(new Date());
088                    BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS,
089                            bundleJob);
090                }
091
092                if (bAction.isPending()) {
093                    LOG.debug(bAction + " has pending flag set");
094                    foundPending = true;
095                }
096            }
097            LogUtils.setLogInfo(bundleJob);
098        }
099        catch (JPAExecutorException e) {
100            throw new CommandException(ErrorCode.E1322, e);
101        }
102    }
103
104    @Override
105    protected Job.Status getJobStatus() throws CommandException {
106        Job.Status jobStatus = super.getJobStatus();
107        if (jobStatus == null) {
108            if (isPrepRunningState()) {
109                return getPrepRunningStatus();
110            }
111        }
112
113        return jobStatus;
114    }
115
116    @Override
117    protected boolean isTerminalState() {
118        return !foundPending
119                && bundleActions.size() == getActionStatusCount(Job.Status.SUCCEEDED)
120                        + getActionStatusCount(Job.Status.FAILED) + getActionStatusCount(Job.Status.KILLED)
121                        + getActionStatusCount(Job.Status.DONEWITHERROR);
122    }
123
124    @Override
125    protected Job.Status getTerminalStatus() {
126
127        // If all bundle action is done and bundle is killed, then don't change the status.
128        if (bundleJob.getStatus().equals(Job.Status.KILLED)) {
129            return Job.Status.KILLED;
130
131        }
132        // If all the bundle actions are succeeded then bundle job should be succeeded.
133        if (bundleActions.size() == getActionStatusCount(Job.Status.SUCCEEDED)) {
134            return Job.Status.SUCCEEDED;
135
136        }
137        else if (bundleActions.size() == getActionStatusCount(Job.Status.KILLED)) {
138            // If all the bundle actions are KILLED then bundle job should be KILLED.
139            return Job.Status.KILLED;
140        }
141        else if (bundleActions.size() == getActionStatusCount(Job.Status.FAILED)) {
142            // If all the bundle actions are FAILED then bundle job should be FAILED.
143            return Job.Status.FAILED;
144        }
145        else {
146            return Job.Status.DONEWITHERROR;
147
148        }
149    }
150
151    @Override
152    protected boolean isPausedState() {
153        //If bundle is paused then timestamp will be set.
154        //If bundleJob.getPauseTime() is not set, that means that status has to be computed from bottom-up.
155        if (bundleJob.getStatus() == Job.Status.PAUSED || bundleJob.getStatus() == Job.Status.PAUSEDWITHERROR
156                && bundleJob.getPauseTime() != null) {
157            return true;
158        }
159        else {
160            return getBottomUpPauseStatus() != null;
161        }
162
163    }
164
165    @Override
166    protected Job.Status getPausedState() {
167        if (bundleJob.getStatus() == Job.Status.PAUSED || bundleJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
168            if (hasTerminatedActions() || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
169                    || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)
170                    || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
171                return Job.Status.PAUSEDWITHERROR;
172            }
173            else {
174                return Job.Status.PAUSED;
175            }
176        }
177        return getBottomUpPauseStatus();
178
179    }
180
181    @Override
182    protected boolean isSuspendedState() {
183        //If bundle is suspended then timestamp will be set.
184        //If bundleJob.getSuspendedTimestamp() is not set, that means that status has to be computed from bottom-up.
185        if ((bundleJob.getStatus() == Job.Status.SUSPENDED || bundleJob.getStatus() == Job.Status.SUSPENDEDWITHERROR)
186                && bundleJob.getSuspendedTimestamp() != null) {
187            return true;
188        }
189
190        return getBottomUpSuspendedState() != null;
191
192    }
193
194    @Override
195    protected Job.Status getSuspendedStatus() {
196        if (bundleJob.getStatus() == Job.Status.SUSPENDED || bundleJob.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
197            if (hasTerminatedActions() || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
198                    || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
199                return Job.Status.SUSPENDEDWITHERROR;
200            }
201            else {
202                return Job.Status.SUSPENDED;
203            }
204
205        }
206        return getBottomUpSuspendedState();
207
208    }
209
210    @Override
211    protected boolean isRunningState() {
212        return true;
213    }
214
215    @Override
216    protected Status getRunningState() {
217        if (bundleJob.getStatus() != Job.Status.PREP) {
218            return getRunningStatus(bundleActionStatus);
219        }
220        else
221            return null;
222    }
223
224    @Override
225    protected void updateJobStatus(Job.Status bundleStatus) throws JPAExecutorException {
226        LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus + "' from '" + bundleJob.getStatus() + "'");
227
228        String jobId = bundleJob.getId();
229        // Update the Bundle Job
230        // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
231        // PAUSEDWITHERROR is not supported
232        bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(bundleStatus));
233        bundleJob.setLastModifiedTime(new Date());
234        if (foundPending) {
235            bundleJob.setPending();
236            LOG.info("Bundle job [" + jobId + "] Pending set to TRUE");
237        }
238        else {
239            bundleJob.resetPending();
240            LOG.info("Bundle job [" + jobId + "] Pending set to FALSE");
241        }
242        BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME,
243                bundleJob);
244    }
245
246    /**
247     * bottom up; check the status of parent through their children.
248     *
249     * @return the bottom up pause status
250     */
251    private Job.Status getBottomUpPauseStatus() {
252
253        if (bundleActionStatus.containsKey(Job.Status.PAUSED)
254                && bundleActions.size() == getActionStatusCount(Job.Status.PAUSED)) {
255            return Job.Status.PAUSED;
256
257        }
258        else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)
259                && bundleActions.size() == getActionStatusCount(Job.Status.PAUSED)
260                        + getActionStatusCount(Job.Status.PAUSEDWITHERROR)) {
261            return Job.Status.PAUSEDWITHERROR;
262        }
263
264        return null;
265    }
266
267    /**
268     * Bottom up update status of parent from the status of its children.
269     *
270     * @return the bottom up suspended state
271     */
272    private Job.Status getBottomUpSuspendedState() {
273
274        if (!foundPending && bundleActionStatus.containsKey(Job.Status.SUSPENDED)
275                || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)) {
276
277            if (bundleActions.size() == getActionStatusCount(Job.Status.SUSPENDED)
278                    + getActionStatusCount(Job.Status.SUCCEEDED)) {
279                return Job.Status.SUSPENDED;
280            }
281            else if (bundleActions.size() == getActionStatusCount(Job.Status.SUSPENDEDWITHERROR)
282                    + getActionStatusCount(Job.Status.SUSPENDED) + getActionStatusCount(Job.Status.SUCCEEDED)
283                    + getActionStatusCount(Job.Status.KILLED) + getActionStatusCount(Job.Status.FAILED)
284                    + getActionStatusCount(Job.Status.DONEWITHERROR)) {
285                return Job.Status.SUSPENDEDWITHERROR;
286
287            }
288        }
289        return null;
290    }
291
292    private boolean hasTerminatedActions() {
293        return bundleActionStatus.containsKey(Job.Status.KILLED) || bundleActionStatus.containsKey(Job.Status.FAILED)
294                || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR);
295
296    }
297
298    private boolean isPrepRunningState() {
299        return !foundPending && bundleActionStatus.containsKey(Job.Status.PREP)
300                && bundleActions.size() > getActionStatusCount(Job.Status.PREP);
301    }
302
303    private Status getPrepRunningStatus() {
304        return getRunningStatus(bundleActionStatus);
305
306    }
307
308    private int getActionStatusCount(final Job.Status status) {
309
310        if (bundleActionStatus.containsKey(status)) {
311            return bundleActionStatus.get(status);
312        }
313        else {
314            return 0;
315        }
316    }
317
318    private Job.Status getRunningStatus(HashMap<Job.Status, Integer> actionStatus) {
319        if (actionStatus.containsKey(Job.Status.FAILED) || actionStatus.containsKey(Job.Status.KILLED)
320                || actionStatus.containsKey(Job.Status.DONEWITHERROR)
321                || actionStatus.containsKey(Job.Status.RUNNINGWITHERROR)) {
322            return Job.Status.RUNNINGWITHERROR;
323        }
324        else {
325            return Job.Status.RUNNING;
326        }
327    }
328
329    @Override
330    protected void verifyPrecondition() throws CommandException, PreconditionException {
331    }
332
333}