001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.util.ArrayList;
021import java.util.Date;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Set;
026import java.util.TreeSet;
027import java.util.Comparator;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.oozie.BundleActionBean;
031import org.apache.oozie.BundleJobBean;
032import org.apache.oozie.CoordinatorActionBean;
033import org.apache.oozie.CoordinatorJobBean;
034import org.apache.oozie.ErrorCode;
035import org.apache.oozie.client.CoordinatorAction;
036import org.apache.oozie.client.Job;
037import org.apache.oozie.command.CommandException;
038import org.apache.oozie.command.bundle.BundleKillXCommand;
039import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
040import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
041import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
042import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
043import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
044import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor;
045import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
046import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
047import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor;
048import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
049import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
050import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
051import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
052import org.apache.oozie.executor.jpa.JPAExecutorException;
053import org.apache.oozie.util.DateUtils;
054import org.apache.oozie.lock.LockToken;
055import org.apache.oozie.util.StatusUtils;
056import org.apache.oozie.util.XLog;
057
/**
 * StatusTransitService is scheduled to run at the configured interval.
 * <p/>
 * It updates each job's status according to the status of its child actions. If the pending flag of every child
 * action equals 0 (job done), the job's pending flag is reset to 0. If all child actions have succeeded, the job's
 * status is set to SUCCEEDED.
 */
065public class StatusTransitService implements Service {
066    public static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService.";
067    public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval";
068    public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + "backward.support.for.coord.status";
069    public static final String CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR = CONF_PREFIX + "backward.support.for.states.without.error";
070    private static int limit = -1;
071    private static Date lastInstanceStartTime = null;
072    private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class);
073
    /**
     * StatusTransitRunnable is the runnable which is scheduled to run at the configured interval.
     * <p/>
     * It updates each job's status according to the status of its child actions. If the pending flag of every child
     * action equals 0 (job done), the job's pending flag is reset to 0. If all child actions have succeeded, the
     * job's status is set to SUCCEEDED.
     */
081    public static class StatusTransitRunnable implements Runnable {
082        private JPAService jpaService = null;
083        private LockToken lock;
084
085        public StatusTransitRunnable() {
086            jpaService = Services.get().get(JPAService.class);
087            if (jpaService == null) {
088                LOG.error("Missing JPAService");
089            }
090        }
091
092        @Override
093        public void run() {
094            try {
095                Date curDate = new Date(); // records the start time of this service run;
096
097                // first check if there is some other instance running;
098                lock = Services.get().get(MemoryLocksService.class).getWriteLock(StatusTransitService.class.getName(),
099                        lockTimeout);
100                if (lock == null) {
101                    LOG.info("This StatusTransitService instance"
102                            + " will not run since there is already an instance running");
103                }
104                else {
105                    LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName());
106                    // running coord jobs transit service
107                    coordTransit();
108                    // running bundle jobs transit service
109                    bundleTransit();
110
111                    lastInstanceStartTime = curDate;
112                }
113            }
114            catch (Exception ex) {
115                LOG.warn("Exception happened during StatusTransitRunnable ", ex);
116            }
117            finally {
118                // release lock;
119                if (lock != null) {
120                    lock.release();
121                    LOG.info("Released lock for [{0}]", StatusTransitService.class.getName());
122                }
123            }
124        }
125
126        public List<BundleJobBean> removeDuplicates(List<BundleJobBean> pendingJobList) {
127            Set<BundleJobBean> s = new TreeSet<BundleJobBean>(new Comparator<BundleJobBean>() {
128                @Override
129                public int compare(BundleJobBean b1, BundleJobBean b2) {
130                    if (b1.getId().equals(b2.getId())) {
131                        return 0;
132                    }
133                    else {
134                        return 1;
135                    }
136                }
137            });
138            s.addAll(pendingJobList);
139            return new ArrayList<BundleJobBean>(s);
140        }
141
142        /**
143         * Aggregate bundle actions' status to bundle jobs
144         *
145         * @throws JPAExecutorException thrown if failed in db updates or retrievals
146         * @throws CommandException thrown if failed to run commands
147         */
148        private void bundleTransit() throws JPAExecutorException, CommandException {
149            List<BundleJobBean> pendingJobCheckList = null;
150
151            if (lastInstanceStartTime == null) {
152                LOG.info("Running bundle status service first instance");
153                // this is the first instance, we need to check for all pending or running jobs;
154                pendingJobCheckList = jpaService.execute(new BundleJobsGetRunningOrPendingJPAExecutor(limit));
155            }
156            else {
157                LOG.info("Running bundle status service from last instance time =  "
158                        + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
159                // this is not the first instance, we should only check jobs that have actions been
160                // updated >= start time of last service run;
161                List<BundleActionBean> actionList = BundleActionQueryExecutor.getInstance().getList(
162                        BundleActionQuery.GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME, lastInstanceStartTime);
163                Set<String> bundleIds = new HashSet<String>();
164                for (BundleActionBean action : actionList) {
165                    bundleIds.add(action.getBundleId());
166                }
167                pendingJobCheckList = new ArrayList<BundleJobBean>();
168                for (String bundleId : bundleIds.toArray(new String[bundleIds.size()])) {
169                    BundleJobBean bundle = BundleJobQueryExecutor.getInstance().get(
170                            BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME, bundleId);
171                    // Running bundle job might have pending false
172                    if (bundle.isPending() || bundle.getStatus().equals(Job.Status.RUNNING)
173                            || bundle.getStatus().equals(Job.Status.RUNNINGWITHERROR)
174                            || bundle.getStatus().equals(Job.Status.PAUSED)
175                            || bundle.getStatus().equals(Job.Status.PAUSEDWITHERROR)) {
176                        pendingJobCheckList.add(bundle);
177                    }
178                }
179            }
180            aggregateBundleJobsStatus(pendingJobCheckList);
181        }
182
183        private void aggregateBundleJobsStatus(List<BundleJobBean> bundleLists) throws JPAExecutorException,
184                CommandException {
185            if (bundleLists != null) {
186                    for (BundleJobBean bundleJob : bundleLists) {
187                        try {
188                            String jobId = bundleJob.getId();
189                            Job.Status[] bundleStatus = new Job.Status[1];
190                            bundleStatus[0] = bundleJob.getStatus();
191                            List<BundleActionBean> bundleActions = BundleActionQueryExecutor.getInstance().getList(
192                                BundleActionQuery.GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE, jobId);
193                            HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>();
194                            boolean foundPending = false;
195                            for (BundleActionBean bAction : bundleActions) {
196                                    int counter = 0;
197                                    if (bundleActionStatus.containsKey(bAction.getStatus())) {
198                                        counter = bundleActionStatus.get(bAction.getStatus()) + 1;
199                                    }
200                                    else {
201                                        ++counter;
202                                    }
203                                    bundleActionStatus.put(bAction.getStatus(), counter);
204                                    if (bAction.getCoordId() == null
205                                            && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED)) {
206                                        (new BundleKillXCommand(jobId)).call();
207                                        LOG.info("Bundle job ["+ jobId
208                                                        + "] has been killed since one of its coordinator job failed submission.");
209                                    }
210
211                                 if (bAction.isPending()) {
212                                    foundPending = true;
213                                }
214                            }
215
216                            if (!foundPending && checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus)) {
217                                LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
218                                        + "' from '" + bundleJob.getStatus() + "'");
219                                updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
220                            }
221                            else if (!foundPending && checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus)) {
222                                LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
223                                        + "' from '" + bundleJob.getStatus() + "'");
224                                updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
225                            }
226                            else if (checkPausedStatus(bundleActionStatus, bundleActions, bundleStatus)) {
227                                LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
228                                        + "' from '" + bundleJob.getStatus() + "'");
229                                updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
230                            }
231                            else if (checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus, foundPending)) {
232                                LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
233                                        + "' from '" + bundleJob.getStatus() + "'");
234                                updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
235                            }
236                            else if (checkRunningStatus(bundleActionStatus, bundleActions, bundleStatus)) {
237                                LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
238                                        + "' from '" + bundleJob.getStatus() + "'");
239                                updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
240                            }
241                        }
242                        catch (Exception ex) {
243                            LOG.error("Exception happened during aggregate bundle job's status, job = "
244                                    + bundleJob.getId(), ex);
245                        }
246                    }
247                }
248
249        }
250
251        private void aggregateCoordJobsStatus(List<CoordinatorJobBean> CoordList) throws JPAExecutorException,
252                CommandException {
253            if (CoordList != null) {
254                Configuration conf = Services.get().getConf();
255                boolean backwardSupportForCoordStatus = conf.getBoolean(CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
256
257                for (CoordinatorJobBean coordJob : CoordList) {
258                    try {
259                        // if namespace 0.1 is used and backward support is true, then ignore this coord job
260                        if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
261                                && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
262                            continue;
263                        }
264                        String jobId = coordJob.getId();
265                        Job.Status[] coordStatus = new Job.Status[1];
266                        coordStatus[0] = coordJob.getStatus();
267                        //Get count of Coordinator actions with pending true
268                        boolean isPending = false;
269                        int count = jpaService.execute(new CoordJobGetPendingActionsCountJPAExecutor(jobId));
270                        if (count > 0) {
271                             isPending = true;
272                        }
273                        // Get status of Coordinator actions
274                        List<CoordinatorAction.Status> coordActionStatusList = jpaService
275                                .execute(new CoordJobGetActionsStatusJPAExecutor(jobId));
276                        HashMap<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>();
277
278                        for (CoordinatorAction.Status status : coordActionStatusList) {
279                            int counter = 0;
280                            if (coordActionStatus.containsKey(status)) {
281                                counter = coordActionStatus.get(status) + 1;
282                            }
283                            else {
284                                ++counter;
285                            }
286                            coordActionStatus.put(status, counter);
287                        }
288
289                        int nonPendingCoordActionsCount = coordActionStatusList.size();
290                        boolean isDoneMaterialization = coordJob.isDoneMaterialization();
291                        if ((isDoneMaterialization || coordStatus[0] == Job.Status.FAILED || coordStatus[0] == Job.Status.KILLED)
292                                && checkCoordTerminalStatus(coordActionStatus, nonPendingCoordActionsCount,
293                                        coordStatus, isDoneMaterialization)) {
294                            updateCoordJob(isPending, coordJob, coordStatus[0]);
295                        }
296                        else if (checkCoordPausedStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
297                            updateCoordJob(isPending, coordJob, coordStatus[0]);
298                        }
299                        else if(checkCoordSuspendStatus( coordActionStatus, nonPendingCoordActionsCount, coordStatus, coordJob.isDoneMaterialization(), isPending)) {
300                            updateCoordJob(isPending, coordJob, coordStatus[0]);
301                        }
302                        else if (checkCoordRunningStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
303                            updateCoordJob(isPending, coordJob, coordStatus[0]);
304                        }
305                        else {
306                            checkCoordPending(isPending, coordJob, true);
307                        }
308                    }
309                    catch (Exception ex) {
310                        LOG.error("Exception happened during aggregate coordinator job's status, job = "
311                                + coordJob.getId(), ex);
312                    }
313                }
314
315            }
316        }
317
318        private boolean checkTerminalStatus(HashMap<Job.Status, Integer> bundleActionStatus,
319                List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
320            boolean ret = false;
321            int totalValuesSucceed = 0;
322            if (bundleActionStatus.containsKey(Job.Status.SUCCEEDED)) {
323                totalValuesSucceed = bundleActionStatus.get(Job.Status.SUCCEEDED);
324            }
325            int totalValuesFailed = 0;
326            if (bundleActionStatus.containsKey(Job.Status.FAILED)) {
327                totalValuesFailed = bundleActionStatus.get(Job.Status.FAILED);
328            }
329            int totalValuesKilled = 0;
330            if (bundleActionStatus.containsKey(Job.Status.KILLED)) {
331                totalValuesKilled = bundleActionStatus.get(Job.Status.KILLED);
332            }
333
334            int totalValuesDoneWithError = 0;
335            if (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)) {
336                totalValuesDoneWithError = bundleActionStatus.get(Job.Status.DONEWITHERROR);
337            }
338
339            if (bundleActions.size() == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesDoneWithError)) {
340                // If all bundle action is done and bundle is killed, then don't change the status.
341                if (bundleStatus[0].equals(Job.Status.KILLED)) {
342                    bundleStatus[0] = Job.Status.KILLED;
343                    return true;
344                }
345                // If all the bundle actions are succeeded then bundle job should be succeeded.
346                if (bundleActions.size() == totalValuesSucceed) {
347                    bundleStatus[0] = Job.Status.SUCCEEDED;
348                    ret = true;
349                }
350                else if (bundleActions.size() == totalValuesKilled) {
351                    // If all the bundle actions are KILLED then bundle job should be KILLED.
352                    bundleStatus[0] = Job.Status.KILLED;
353                    ret = true;
354                }
355                else if (bundleActions.size() == totalValuesFailed) {
356                    // If all the bundle actions are FAILED then bundle job should be FAILED.
357                    bundleStatus[0] = Job.Status.FAILED;
358                    ret = true;
359                }
360                else {
361                    bundleStatus[0] = Job.Status.DONEWITHERROR;
362                    ret = true;
363                }
364            }
365            return ret;
366        }
367
368        private boolean checkCoordTerminalStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
369                int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization) {
370            boolean ret = false;
371            int totalValuesSucceed = 0;
372            if (coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED)) {
373                totalValuesSucceed = coordActionStatus.get(CoordinatorAction.Status.SUCCEEDED);
374            }
375            int totalValuesFailed = 0;
376            if (coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)) {
377                totalValuesFailed = coordActionStatus.get(CoordinatorAction.Status.FAILED);
378            }
379            int totalValuesKilled = 0;
380            if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)) {
381                totalValuesKilled = coordActionStatus.get(CoordinatorAction.Status.KILLED);
382            }
383            int totalValuesTimeOut = 0;
384            if (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
385                totalValuesTimeOut = coordActionStatus.get(CoordinatorAction.Status.TIMEDOUT);
386            }
387            int totalValuesSkipped = 0;
388            if (coordActionStatus.containsKey(CoordinatorAction.Status.SKIPPED)) {
389                totalValuesSkipped = coordActionStatus.get(CoordinatorAction.Status.SKIPPED);
390            }
391
392            if (coordActionsCount ==
393                    (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut + totalValuesSkipped)) {
394
395                // If all coord action is done and coord is killed, then don't change the status.
396                if (coordStatus[0].equals(Job.Status.KILLED)) {
397                    coordStatus[0] = Job.Status.KILLED;
398                    return true;
399                }
400                // If all the coordinator actions are succeeded then coordinator job should be succeeded.
401                if (coordActionsCount == (totalValuesSucceed + totalValuesSkipped) && isDoneMaterialization) {
402                    coordStatus[0] = Job.Status.SUCCEEDED;
403                    ret = true;
404                }
405                else if (coordActionsCount == totalValuesKilled) {
406                    // If all the coordinator actions are KILLED then coordinator job should be KILLED.
407                    coordStatus[0] = Job.Status.KILLED;
408                    ret = true;
409                }
410                else if (coordActionsCount == totalValuesFailed) {
411                    // If all the coordinator actions are FAILED then coordinator job should be FAILED.
412                    coordStatus[0] = Job.Status.FAILED;
413                    ret = true;
414                }
415                else {
416                    coordStatus[0] = Job.Status.DONEWITHERROR;
417                    ret = true;
418                }
419            }
420            return ret;
421        }
422
423        private boolean checkPrepStatus(HashMap<Job.Status, Integer> bundleActionStatus,
424                List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
425            boolean ret = false;
426            if (bundleActionStatus.containsKey(Job.Status.PREP)) {
427                // If all the bundle actions are PREP then bundle job should be RUNNING.
428                if (bundleActions.size() > bundleActionStatus.get(Job.Status.PREP)) {
429                    bundleStatus[0] = getRunningStatus(bundleActionStatus);
430                    ret = true;
431                }
432            }
433            return ret;
434        }
435
436        private boolean checkPausedStatus(HashMap<Job.Status, Integer> bundleActionStatus,
437                List<BundleActionBean> bundleActions, Job.Status[] bundleJobStatus) {
438            boolean ret = false;
439
440            // TODO - When bottom up cmds are allowed to change the status of parent job,
441            // if none of the bundle actions are in paused or pausedwitherror, the function should return
442            // false
443
444            // top down
445            // If the bundle job is PAUSED or PAUSEDINERROR and no children are in error
446            // state, then job should be PAUSED otherwise it should be pausedwitherror
447            if (bundleJobStatus[0] == Job.Status.PAUSED || bundleJobStatus[0] == Job.Status.PAUSEDWITHERROR) {
448                if (bundleActionStatus.containsKey(Job.Status.KILLED)
449                        || bundleActionStatus.containsKey(Job.Status.FAILED)
450                        || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
451                        || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
452                        || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)
453                        || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
454                    bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
455                }
456                else {
457                    bundleJobStatus[0] = Job.Status.PAUSED;
458                }
459                ret = true;
460            }
461
462            // bottom up; check the status of parent through their children
463            else if (bundleActionStatus.containsKey(Job.Status.PAUSED)
464                    && (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED))) {
465                bundleJobStatus[0] = Job.Status.PAUSED;
466                ret = true;
467            }
468            else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
469                int pausedActions = bundleActionStatus.containsKey(Job.Status.PAUSED) ? bundleActionStatus
470                        .get(Job.Status.PAUSED) : 0;
471                if (bundleActions.size() == pausedActions + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR)) {
472                    bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
473                    ret = true;
474                }
475            }
476            else {
477                ret = false;
478            }
479            return ret;
480        }
481
482
483        private boolean checkSuspendStatus(HashMap<Job.Status, Integer> bundleActionStatus,
484                List<BundleActionBean> bundleActions, Job.Status[] bundleStatus, boolean isPending) {
485            boolean ret = false;
486
487            // TODO - When bottom up cmds are allowed to change the status of parent job,
488            // if none of the bundle actions are in suspended or suspendedwitherror, the function should return
489            // false
490
491            // top down
492            // if job is suspended
493            if (bundleStatus[0] == Job.Status.SUSPENDED
494                    || bundleStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
495                if (bundleActionStatus.containsKey(Job.Status.KILLED)
496                        || bundleActionStatus.containsKey(Job.Status.FAILED)
497                        || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
498                        || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
499                        || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
500                    bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
501                }
502                else {
503                    bundleStatus[0] = Job.Status.SUSPENDED;
504                }
505                ret =true;
506            }
507
508            // bottom up
509            // Update status of parent from the status of its children
510            else if (!isPending && bundleActionStatus.containsKey(Job.Status.SUSPENDED)
511                    || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)) {
512                int succeededActions = bundleActionStatus.containsKey(Job.Status.SUCCEEDED) ? bundleActionStatus
513                        .get(Job.Status.SUCCEEDED) : 0;
514                int killedActions = bundleActionStatus.containsKey(Job.Status.KILLED) ? bundleActionStatus
515                        .get(Job.Status.KILLED) : 0;
516                int failedActions = bundleActionStatus.containsKey(Job.Status.FAILED) ? bundleActionStatus
517                        .get(Job.Status.FAILED) : 0;
518                int doneWithErrorActions = bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) ? bundleActionStatus
519                        .get(Job.Status.DONEWITHERROR) : 0;
520
521                if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions) {
522                    bundleStatus[0] = Job.Status.SUSPENDED;
523                    ret = true;
524                }
525                else if (bundleActions.size()  == bundleActionStatus.get(Job.Status.SUSPENDEDWITHERROR)
526                        + bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions + killedActions + failedActions + doneWithErrorActions) {
527                    bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
528                    ret = true;
529                }
530            }
531            return ret;
532
533        }
534
535        private boolean checkCoordPausedStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
536                int coordActionsCount, Job.Status[] coordStatus){
537            boolean ret = false;
538            if (coordStatus[0].equals(Job.Status.PAUSED) || coordStatus[0].equals(Job.Status.PAUSEDWITHERROR)) {
539                if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
540                        || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
541                        || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
542                    coordStatus[0] = Job.Status.PAUSEDWITHERROR;
543                }
544                else {
545                    coordStatus[0] = Job.Status.PAUSED;
546                }
547                ret = true;
548            }
549            return ret;
550        }
551        private boolean checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
552                int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization, boolean isPending) {
553            boolean ret = false;
554
555            // TODO - When bottom up cmds are allowed to change the status of parent job
556            //if none of the coord actions are in suspended or suspendedwitherror and materialization done is false
557            //,then the function should return
558            // false
559
560            // top down
561            // check for children only when parent is suspended
562            if (coordStatus[0] == Job.Status.SUSPENDED || coordStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
563
564                if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
565                        || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
566                        || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
567                    coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
568                }
569                else {
570                    coordStatus[0] = Job.Status.SUSPENDED;
571                }
572                ret = true;
573            }
574            // bottom up
575            // look for children to check the parent's status only if materialization is
576            // done and all actions are non-pending
577            else if (isDoneMaterialization && !isPending && coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
578                int succeededActions = coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED) ? coordActionStatus
579                       .get(CoordinatorAction.Status.SUCCEEDED) : 0;
580                int killedActions = coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) ? coordActionStatus
581                        .get(CoordinatorAction.Status.KILLED) : 0;
582                int failedActions = coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) ? coordActionStatus
583                        .get(CoordinatorAction.Status.FAILED) : 0;
584                int timedoutActions = coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT) ? coordActionStatus
585                        .get(CoordinatorAction.Status.TIMEDOUT) : 0;
586
587                if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED) + succeededActions) {
588                    coordStatus[0] = Job.Status.SUSPENDED;
589                    ret = true;
590                }
591                else if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)
592                        + succeededActions + killedActions + failedActions + timedoutActions) {
593                    coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
594                    ret = true;
595                }
596            }
597            return ret;
598        }
599
600        private boolean checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
601                int coordActionsCount, Job.Status[] coordStatus) {
602            boolean ret = false;
603            if (coordStatus[0] != Job.Status.PREP
604                    && coordStatus[0] != Job.Status.PREPSUSPENDED
605                    && coordStatus[0] !=  Job.Status.PREPPAUSED) {
606                if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
607                        || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
608                        || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
609                    coordStatus[0] = Job.Status.RUNNINGWITHERROR;
610                }
611                else {
612                    coordStatus[0] = Job.Status.RUNNING;
613                }
614                ret = true;
615            }
616            return ret;
617        }
618
619        private boolean checkRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus,
620                List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
621            boolean ret = false;
622            if (bundleStatus[0] != Job.Status.PREP) {
623                bundleStatus[0] = getRunningStatus(bundleActionStatus);
624                ret = true;
625            }
626            return ret;
627
628        }
629
630        private Job.Status getRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus) {
631            if (bundleActionStatus.containsKey(Job.Status.FAILED)
632                    || bundleActionStatus.containsKey(Job.Status.KILLED)
633                    || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
634                    || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)) {
635                return Job.Status.RUNNINGWITHERROR;
636            }
637            else {
638                return Job.Status.RUNNING;
639            }
640        }
641
642        private void updateBundleJob(boolean isPending, BundleJobBean bundleJob, Job.Status bundleStatus)
643                throws JPAExecutorException {
644            String jobId = bundleJob.getId();
645            // Update the Bundle Job
646            // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
647            // PAUSEDWITHERROR is not supported
648            bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(bundleStatus));
649            if (isPending) {
650                bundleJob.setPending();
651                LOG.info("Bundle job [" + jobId + "] Pending set to TRUE");
652            }
653            else {
654                bundleJob.resetPending();
655                LOG.info("Bundle job [" + jobId + "] Pending set to FALSE");
656            }
657            BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME,
658                    bundleJob);
659        }
660
661        private void updateCoordJob(boolean isPending, CoordinatorJobBean coordJob, Job.Status coordStatus)
662                throws JPAExecutorException, CommandException {
663            Job.Status prevStatus = coordJob.getStatus();
664            // Update the Coord Job
665            if (coordJob.getStatus() == Job.Status.SUCCEEDED || coordJob.getStatus() == Job.Status.FAILED
666                    || coordJob.getStatus() == Job.Status.KILLED || coordJob.getStatus() == Job.Status.DONEWITHERROR) {
667                if (coordStatus == Job.Status.SUSPENDED || coordStatus == Job.Status.SUSPENDEDWITHERROR) {
668                    LOG.info("Coord Job [" + coordJob.getId()
669                            + "] status to "+ coordStatus +" can not be updated as its already in Terminal state");
670                    return;
671                }
672            }
673
674            boolean isPendingStateChanged = checkCoordPending(isPending, coordJob, false);
675            // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and PAUSEDWITHERROR is
676            // not supported
677            coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(coordStatus));
678            // Backward support when coordinator namespace is 0.1
679            coordJob.setStatus(StatusUtils.getStatus(coordJob));
680            if (coordJob.getStatus() != prevStatus || isPendingStateChanged) {
681                LOG.info("Set coordinator job [" + coordJob.getId() + "] status to '" + coordJob.getStatus() + "' from '"
682                        + prevStatus + "'");
683                coordJob.setLastModifiedTime(new Date());
684                CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME, coordJob);
685            }
686            // update bundle action only when status changes in coord job
687            if (coordJob.getBundleId() != null) {
688                if (!prevStatus.equals(coordJob.getStatus())) {
689                    BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
690                    bundleStatusUpdate.call();
691                }
692            }
693        }
694
695        private boolean checkCoordPending(boolean isPending, CoordinatorJobBean coordJob, boolean saveToDB)
696                throws JPAExecutorException {
697            // Checking the coordinator pending should be updated or not
698            boolean prevPending = coordJob.isPending();
699            if (isPending) {
700                coordJob.setPending();
701            }
702            else {
703                coordJob.resetPending();
704            }
705            boolean hasChange = prevPending != coordJob.isPending();
706            if (saveToDB && hasChange) {
707                LOG.info("Change coordinator job [" + coordJob.getId() + "] pending to '" + coordJob.isPending()
708                        + "' from '" + prevPending + "'");
709                CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME, coordJob);
710            }
711            return hasChange;
712
713        }
714
715        /**
716         * Aggregate coordinator actions' status to coordinator jobs
717         *
718         * @throws JPAExecutorException thrown if failed in db updates or retrievals
719         * @throws CommandException thrown if failed to run commands
720         */
721        private void coordTransit() throws JPAExecutorException, CommandException {
722            List<CoordinatorJobBean> pendingJobCheckList = null;
723            if (lastInstanceStartTime == null) {
724                LOG.info("Running coordinator status service first instance");
725                // this is the first instance, we need to check for all pending jobs;
726                pendingJobCheckList = jpaService.execute(new CoordJobsGetPendingJPAExecutor(limit));
727            }
728            else {
729                LOG.info("Running coordinator status service from last instance time =  "
730                        + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
731                // this is not the first instance, we should only check jobs
732                // that have actions or jobs been
733                // updated >= start time of last service run;
734                List<CoordinatorActionBean> actionsList = CoordActionQueryExecutor.getInstance().getList(
735                        CoordActionQuery.GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME, lastInstanceStartTime);
736                Set<String> coordIds = new HashSet<String>();
737                for (CoordinatorActionBean action : actionsList) {
738                    coordIds.add(action.getJobId());
739                }
740
741                pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
742                for (String coordId : coordIds.toArray(new String[coordIds.size()])) {
743                    CoordinatorJobBean coordJob;
744                    try {
745                        coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coordId);
746                    }
747                    catch (JPAExecutorException jpaee) {
748                        if (jpaee.getErrorCode().equals(ErrorCode.E0604)) {
749                            LOG.warn("Exception happened during StatusTransitRunnable; Coordinator Job doesn't exist", jpaee);
750                            continue;
751                        } else {
752                            throw jpaee;
753                        }
754                    }
755                    // Running coord job might have pending false
756                    Job.Status coordJobStatus = coordJob.getStatus();
757                    if ((coordJob.isPending() || coordJobStatus.equals(Job.Status.PAUSED)
758                            || coordJobStatus.equals(Job.Status.RUNNING)
759                            || coordJobStatus.equals(Job.Status.RUNNINGWITHERROR)
760                            || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR))
761                            && !coordJobStatus.equals(Job.Status.IGNORED)) {
762                        pendingJobCheckList.add(coordJob);
763                    }
764                }
765                pendingJobCheckList.addAll(CoordJobQueryExecutor.getInstance().getList(
766                        CoordJobQuery.GET_COORD_JOBS_CHANGED, lastInstanceStartTime));
767            }
768            aggregateCoordJobsStatus(pendingJobCheckList);
769        }
770    }
771
772    /**
773     * Initializes the {@link StatusTransitService}.
774     *
775     * @param services services instance.
776     */
777    @Override
778    public void init(Services services) {
779        Configuration conf = services.getConf();
780        Runnable stateTransitRunnable = new StatusTransitRunnable();
781        services.get(SchedulerService.class).schedule(stateTransitRunnable, 10,
782                conf.getInt(CONF_STATUSTRANSIT_INTERVAL, 60), SchedulerService.Unit.SEC);
783    }
784
785    /**
786     * Destroy the StateTransit Jobs Service.
787     */
788    @Override
789    public void destroy() {
790    }
791
792    /**
793     * Return the public interface for the purge jobs service.
794     *
795     * @return {@link StatusTransitService}.
796     */
797    @Override
798    public Class<? extends Service> getInterface() {
799        return StatusTransitService.class;
800    }
801}
802