001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.util.ArrayList;
021    import java.util.Arrays;
022    import java.util.Date;
023    import java.util.HashMap;
024    import java.util.HashSet;
025    import java.util.List;
026    import java.util.Set;
027    import java.util.TreeSet;
028    import java.util.Comparator;
029    
030    import org.apache.hadoop.conf.Configuration;
031    import org.apache.oozie.BundleActionBean;
032    import org.apache.oozie.BundleJobBean;
033    import org.apache.oozie.CoordinatorActionBean;
034    import org.apache.oozie.CoordinatorJobBean;
035    import org.apache.oozie.ErrorCode;
036    import org.apache.oozie.client.CoordinatorAction;
037    import org.apache.oozie.client.Job;
038    import org.apache.oozie.command.CommandException;
039    import org.apache.oozie.command.bundle.BundleKillXCommand;
040    import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
041    import org.apache.oozie.executor.jpa.BundleActionsGetByLastModifiedTimeJPAExecutor;
042    import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
043    import org.apache.oozie.executor.jpa.BundleActionsGetStatusPendingJPAExecutor;
044    import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
045    import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
046    import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor;
047    import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor;
048    import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor;
049    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
050    import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
051    import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
052    import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
053    import org.apache.oozie.executor.jpa.JPAExecutorException;
054    import org.apache.oozie.util.DateUtils;
055    import org.apache.oozie.util.MemoryLocks;
056    import org.apache.oozie.util.StatusUtils;
057    import org.apache.oozie.util.XLog;
058    
/**
 * StatusTransitService is scheduled to run at the configured interval.
 * <p/>
 * It updates each job's status according to the status of its child actions. If no child action is pending (the job
 * is done), the job's pending flag is reset to 0. If all child actions have succeeded, the job's status is set to
 * SUCCEEDED.
 */
066    public class StatusTransitService implements Service {
    /** Prefix shared by every configuration key of this service. */
    public static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService.";
    /** Configuration key for the interval at which the status transit runnable runs; not read in this part of the file. */
    public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval";
    /**
     * Configuration key (read with default {@code false} in aggregateCoordJobsStatus); when true, coordinator jobs
     * whose app namespace equals SchemaService.COORDINATOR_NAMESPACE_URI_1 are skipped by status aggregation.
     */
    public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + "backward.support.for.coord.status";
    /**
     * Configuration key for backward support of job states without the *WITHERROR variants.
     * NOTE(review): not read in this part of the file - confirm where it is consumed.
     */
    public static final String CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR = CONF_PREFIX + "backward.support.for.states.without.error";
    /**
     * Limit handed to BundleJobsGetRunningOrPendingJPAExecutor on the first bundle transit run; presumably -1 means
     * "no limit" - TODO confirm against the executor.
     */
    private static int limit = -1;
    /**
     * Start time of the last run that held the lock and completed both the coordinator and the bundle transit;
     * null until such a run happens. bundleTransit uses it to re-check only bundles whose actions changed since then.
     */
    private static Date lastInstanceStartTime = null;
    /** Logger; note it is created for the nested StatusTransitRunnable class rather than for this class. */
    private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class);
074    
    /**
     * StatusTransitRunnable is the runnable which is scheduled to run at the configured interval.
     * <p/>
     * It updates each job's status according to the status of its child actions. If no child action is pending
     * (the job is done), the job's pending flag is reset to 0. If all child actions have succeeded, the job's status
     * is set to SUCCEEDED.
     */
082        static class StatusTransitRunnable implements Runnable {
083            private JPAService jpaService = null;
084            private MemoryLocks.LockToken lock;
085    
086            public StatusTransitRunnable() {
087                jpaService = Services.get().get(JPAService.class);
088                if (jpaService == null) {
089                    LOG.error("Missing JPAService");
090                }
091            }
092    
093            @Override
094            public void run() {
095                try {
096                    Date curDate = new Date(); // records the start time of this service run;
097    
098                    // first check if there is some other instance running;
099                    lock = Services.get().get(MemoryLocksService.class).getWriteLock(StatusTransitService.class.getName(),
100                            lockTimeout);
101                    if (lock == null) {
102                        LOG.info("This StatusTransitService instance"
103                                + " will not run since there is already an instance running");
104                    }
105                    else {
106                        LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName());
107                        // running coord jobs transit service
108                        coordTransit();
109                        // running bundle jobs transit service
110                        bundleTransit();
111    
112                        lastInstanceStartTime = curDate;
113                    }
114                }
115                catch (Exception ex) {
116                    LOG.warn("Exception happened during StatusTransitRunnable ", ex);
117                }
118                finally {
119                    // release lock;
120                    if (lock != null) {
121                        lock.release();
122                        LOG.info("Released lock for [{0}]", StatusTransitService.class.getName());
123                    }
124                }
125            }
126    
127            public List<BundleJobBean> removeDuplicates(List<BundleJobBean> pendingJobList) {
128                Set<BundleJobBean> s = new TreeSet<BundleJobBean>(new Comparator<BundleJobBean>() {
129                    @Override
130                    public int compare(BundleJobBean b1, BundleJobBean b2) {
131                        if (b1.getId().equals(b2.getId())) {
132                            return 0;
133                        }
134                        else
135                            return 1;
136                    }
137                });
138                s.addAll(pendingJobList);
139                return new ArrayList<BundleJobBean>(s);
140            }
141    
142            /**
143             * Aggregate bundle actions' status to bundle jobs
144             *
145             * @throws JPAExecutorException thrown if failed in db updates or retrievals
146             * @throws CommandException thrown if failed to run commands
147             */
148            private void bundleTransit() throws JPAExecutorException, CommandException {
149                List<BundleJobBean> pendingJobCheckList = null;
150    
151                if (lastInstanceStartTime == null) {
152                    LOG.info("Running bundle status service first instance");
153                    // this is the first instance, we need to check for all pending or running jobs;
154                    pendingJobCheckList = jpaService.execute(new BundleJobsGetRunningOrPendingJPAExecutor(limit));
155                }
156                else {
157                    LOG.info("Running bundle status service from last instance time =  "
158                            + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
159                    // this is not the first instance, we should only check jobs that have actions been
160                    // updated >= start time of last service run;
161                    List<BundleActionBean> actionList = jpaService
162                            .execute(new BundleActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
163                    Set<String> bundleIds = new HashSet<String>();
164                    for (BundleActionBean action : actionList) {
165                        bundleIds.add(action.getBundleId());
166                    }
167                    pendingJobCheckList = new ArrayList<BundleJobBean>();
168                    for (String bundleId : bundleIds.toArray(new String[bundleIds.size()])) {
169                        BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId));
170                        // Running bundle job might have pending false
171                        if (bundle.isPending() || bundle.getStatus().equals(Job.Status.RUNNING)
172                                || bundle.getStatus().equals(Job.Status.RUNNINGWITHERROR)
173                                || bundle.getStatus().equals(Job.Status.PAUSED)
174                                || bundle.getStatus().equals(Job.Status.PAUSEDWITHERROR)) {
175                            pendingJobCheckList.add(bundle);
176                        }
177                    }
178                }
179                aggregateBundleJobsStatus(pendingJobCheckList);
180            }
181    
182            private void aggregateBundleJobsStatus(List<BundleJobBean> bundleLists) throws JPAExecutorException,
183                    CommandException {
184                if (bundleLists != null) {
185                        for (BundleJobBean bundleJob : bundleLists) {
186                            try {
187                                String jobId = bundleJob.getId();
188                                Job.Status[] bundleStatus = new Job.Status[1];
189                                bundleStatus[0] = bundleJob.getStatus();
190                                List<BundleActionBean> bundleActions = jpaService.execute(new BundleActionsGetStatusPendingJPAExecutor(
191                                        jobId));
192                                HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>();
193                                boolean foundPending = false;
194                                for (BundleActionBean bAction : bundleActions) {
195                                        int counter = 0;
196                                        if (bundleActionStatus.containsKey(bAction.getStatus())) {
197                                            counter = bundleActionStatus.get(bAction.getStatus()) + 1;
198                                        }
199                                        else {
200                                            ++counter;
201                                        }
202                                        bundleActionStatus.put(bAction.getStatus(), counter);
203                                        if (bAction.getCoordId() == null
204                                                && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED)) {
205                                            (new BundleKillXCommand(jobId)).call();
206                                            LOG.info("Bundle job ["+ jobId
207                                                            + "] has been killed since one of its coordinator job failed submission.");
208                                        }
209    
210                                     if (bAction.isPending()) {
211                                        foundPending = true;
212                                    }
213                                }
214    
215                                if (!foundPending && checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus)) {
216                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
217                                            + "' from '" + bundleJob.getStatus() + "'");
218                                    updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
219                                }
220                                else if (!foundPending && checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus)) {
221                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
222                                            + "' from '" + bundleJob.getStatus() + "'");
223                                    updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
224                                }
225                                else if (checkPausedStatus(bundleActionStatus, bundleActions, bundleStatus)) {
226                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
227                                            + "' from '" + bundleJob.getStatus() + "'");
228                                    updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
229                                }
230                                else if (checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus, foundPending)) {
231                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
232                                            + "' from '" + bundleJob.getStatus() + "'");
233                                    updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
234                                }
235                                else if (checkRunningStatus(bundleActionStatus, bundleActions, bundleStatus)) {
236                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
237                                            + "' from '" + bundleJob.getStatus() + "'");
238                                    updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
239                                }
240                            }
241                            catch (Exception ex) {
242                                LOG.error("Exception happened during aggregate bundle job's status, job = "
243                                        + bundleJob.getId(), ex);
244                            }
245                        }
246                    }
247    
248            }
249    
250            private void aggregateCoordJobsStatus(List<CoordinatorJobBean> CoordList) throws JPAExecutorException,
251                    CommandException {
252                if (CoordList != null) {
253                    Configuration conf = Services.get().getConf();
254                    boolean backwardSupportForCoordStatus = conf.getBoolean(CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
255    
256                    for (CoordinatorJobBean coordJob : CoordList) {
257                        try {
258                            // if namespace 0.1 is used and backward support is true, then ignore this coord job
259                            if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
260                                    && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
261                                continue;
262                            }
263                            String jobId = coordJob.getId();
264                            Job.Status[] coordStatus = new Job.Status[1];
265                            coordStatus[0] = coordJob.getStatus();
266                            //Get count of Coordinator actions with pending true
267                            boolean isPending = false;
268                            int count = jpaService.execute(new CoordJobGetPendingActionsCountJPAExecutor(jobId));
269                            if (count > 0) {
270                                 isPending = true;
271                            }
272                            // Get status of Coordinator actions
273                            List<CoordinatorAction.Status> coordActionStatusList = jpaService
274                                    .execute(new CoordJobGetActionsStatusJPAExecutor(jobId));
275                            HashMap<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>();
276    
277                            for (CoordinatorAction.Status status : coordActionStatusList) {
278                                int counter = 0;
279                                if (coordActionStatus.containsKey(status)) {
280                                    counter = coordActionStatus.get(status) + 1;
281                                }
282                                else {
283                                    ++counter;
284                                }
285                                coordActionStatus.put(status, counter);
286                            }
287    
288                            int nonPendingCoordActionsCount = coordActionStatusList.size();
289    
290                            if ((coordJob.isDoneMaterialization() || coordStatus[0] == Job.Status.FAILED || coordStatus[0] == Job.Status.KILLED)
291                                    && checkCoordTerminalStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
292                                LOG.info("Set coordinator job [" + jobId + "] status to '" + coordStatus[0].toString()
293                                        + "' from '" + coordJob.getStatus() + "'");
294                                updateCoordJob(isPending, coordJob, coordStatus[0]);
295                            }
296                            else if (checkCoordPausedStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
297                                LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
298                                        + "' from '" + coordJob.getStatus() + "'");
299                                updateCoordJob(isPending, coordJob, coordStatus[0]);
300                            }
301                            else if(checkCoordSuspendStatus( coordActionStatus, nonPendingCoordActionsCount, coordStatus, coordJob.isDoneMaterialization(), isPending)) {
302                                LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
303                                        + "' from '" + coordJob.getStatus() + "'");
304                                updateCoordJob(isPending, coordJob, coordStatus[0]);
305                            }
306                            else if (checkCoordRunningStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
307                                LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
308                                        + "' from '" + coordJob.getStatus() + "'");
309                                updateCoordJob(isPending, coordJob, coordStatus[0]);
310                            }
311                            else {
312                                checkCoordPending(isPending, coordJob, true);
313                            }
314                        }
315                        catch (Exception ex) {
316                            LOG.error("Exception happened during aggregate coordinator job's status, job = "
317                                    + coordJob.getId(), ex);
318                        }
319                    }
320    
321                }
322            }
323    
324            private boolean checkTerminalStatus(HashMap<Job.Status, Integer> bundleActionStatus,
325                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
326                boolean ret = false;
327                int totalValuesSucceed = 0;
328                if (bundleActionStatus.containsKey(Job.Status.SUCCEEDED)) {
329                    totalValuesSucceed = bundleActionStatus.get(Job.Status.SUCCEEDED);
330                }
331                int totalValuesFailed = 0;
332                if (bundleActionStatus.containsKey(Job.Status.FAILED)) {
333                    totalValuesFailed = bundleActionStatus.get(Job.Status.FAILED);
334                }
335                int totalValuesKilled = 0;
336                if (bundleActionStatus.containsKey(Job.Status.KILLED)) {
337                    totalValuesKilled = bundleActionStatus.get(Job.Status.KILLED);
338                }
339    
340                int totalValuesDoneWithError = 0;
341                if (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)) {
342                    totalValuesDoneWithError = bundleActionStatus.get(Job.Status.DONEWITHERROR);
343                }
344    
345                if (bundleActions.size() == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesDoneWithError)) {
346                    // If all the bundle actions are succeeded then bundle job should be succeeded.
347                    if (bundleActions.size() == totalValuesSucceed) {
348                        bundleStatus[0] = Job.Status.SUCCEEDED;
349                        ret = true;
350                    }
351                    else if (bundleActions.size() == totalValuesKilled) {
352                        // If all the bundle actions are KILLED then bundle job should be KILLED.
353                        bundleStatus[0] = Job.Status.KILLED;
354                        ret = true;
355                    }
356                    else if (bundleActions.size() == totalValuesFailed) {
357                        // If all the bundle actions are FAILED then bundle job should be FAILED.
358                        bundleStatus[0] = Job.Status.FAILED;
359                        ret = true;
360                    }
361                    else {
362                        bundleStatus[0] = Job.Status.DONEWITHERROR;
363                        ret = true;
364                    }
365                }
366                return ret;
367            }
368    
369            private boolean checkCoordTerminalStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
370                    int coordActionsCount, Job.Status[] coordStatus) {
371                boolean ret = false;
372                int totalValuesSucceed = 0;
373                if (coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED)) {
374                    totalValuesSucceed = coordActionStatus.get(CoordinatorAction.Status.SUCCEEDED);
375                }
376                int totalValuesFailed = 0;
377                if (coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)) {
378                    totalValuesFailed = coordActionStatus.get(CoordinatorAction.Status.FAILED);
379                }
380                int totalValuesKilled = 0;
381                if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)) {
382                    totalValuesKilled = coordActionStatus.get(CoordinatorAction.Status.KILLED);
383                }
384    
385                int totalValuesTimeOut = 0;
386                if (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
387                    totalValuesTimeOut = coordActionStatus.get(CoordinatorAction.Status.TIMEDOUT);
388                }
389    
390                if (coordActionsCount == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut)) {
391                    // If all the coordinator actions are succeeded then coordinator job should be succeeded.
392                    if (coordActionsCount == totalValuesSucceed) {
393                        coordStatus[0] = Job.Status.SUCCEEDED;
394                        ret = true;
395                    }
396                    else if (coordActionsCount == totalValuesKilled) {
397                        // If all the coordinator actions are KILLED then coordinator job should be KILLED.
398                        coordStatus[0] = Job.Status.KILLED;
399                        ret = true;
400                    }
401                    else if (coordActionsCount == totalValuesFailed) {
402                        // If all the coordinator actions are FAILED then coordinator job should be FAILED.
403                        coordStatus[0] = Job.Status.FAILED;
404                        ret = true;
405                    }
406                    else {
407                        coordStatus[0] = Job.Status.DONEWITHERROR;
408                        ret = true;
409                    }
410                }
411                return ret;
412            }
413    
414            private boolean checkPrepStatus(HashMap<Job.Status, Integer> bundleActionStatus,
415                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
416                boolean ret = false;
417                if (bundleActionStatus.containsKey(Job.Status.PREP)) {
418                    // If all the bundle actions are PREP then bundle job should be RUNNING.
419                    if (bundleActions.size() > bundleActionStatus.get(Job.Status.PREP)) {
420                        bundleStatus[0] = Job.Status.RUNNING;
421                        ret = true;
422                    }
423                }
424                return ret;
425            }
426    
427            private boolean checkPausedStatus(HashMap<Job.Status, Integer> bundleActionStatus,
428                    List<BundleActionBean> bundleActions, Job.Status[] bundleJobStatus) {
429                boolean ret = false;
430    
431                // TODO - When bottom up cmds are allowed to change the status of parent job,
432                // if none of the bundle actions are in paused or pausedwitherror, the function should return
433                // false
434    
435                // top down
436                // If the bundle job is PAUSED or PAUSEDINERROR and no children are in error
437                // state, then job should be PAUSED otherwise it should be pausedwitherror
438                if (bundleJobStatus[0] == Job.Status.PAUSED || bundleJobStatus[0] == Job.Status.PAUSEDWITHERROR) {
439                    if (bundleActionStatus.containsKey(Job.Status.KILLED)
440                            || bundleActionStatus.containsKey(Job.Status.FAILED)
441                            || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
442                            || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
443                            || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)
444                            || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
445                        bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
446                    }
447                    else {
448                        bundleJobStatus[0] = Job.Status.PAUSED;
449                    }
450                    ret = true;
451                }
452    
453                // bottom up; check the status of parent through their children
454                else if (bundleActionStatus.containsKey(Job.Status.PAUSED)
455                        && (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED))) {
456                    bundleJobStatus[0] = Job.Status.PAUSED;
457                    ret = true;
458                }
459                else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
460                    int pausedActions = bundleActionStatus.containsKey(Job.Status.PAUSED) ? bundleActionStatus
461                            .get(Job.Status.PAUSED) : 0;
462                    if (bundleActions.size() == pausedActions + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR)) {
463                        bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
464                        ret = true;
465                    }
466                }
467                else {
468                    ret = false;
469                }
470                return ret;
471            }
472    
473    
474            private boolean checkSuspendStatus(HashMap<Job.Status, Integer> bundleActionStatus,
475                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus, boolean isPending) {
476                boolean ret = false;
477    
478                // TODO - When bottom up cmds are allowed to change the status of parent job,
479                // if none of the bundle actions are in suspended or suspendedwitherror, the function should return
480                // false
481    
482                // top down
483                // if job is suspended
484                if (bundleStatus[0] == Job.Status.SUSPENDED
485                        || bundleStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
486                    if (bundleActionStatus.containsKey(Job.Status.KILLED)
487                            || bundleActionStatus.containsKey(Job.Status.FAILED)
488                            || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
489                            || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
490                            || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
491                        bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
492                    }
493                    else {
494                        bundleStatus[0] = Job.Status.SUSPENDED;
495                    }
496                    ret =true;
497                }
498    
499                // bottom up
500                // Update status of parent from the status of its children
501                else if (!isPending && bundleActionStatus.containsKey(Job.Status.SUSPENDED)
502                        || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)) {
503                    int succeededActions = bundleActionStatus.containsKey(Job.Status.SUCCEEDED) ? bundleActionStatus
504                            .get(Job.Status.SUCCEEDED) : 0;
505                    int killedActions = bundleActionStatus.containsKey(Job.Status.KILLED) ? bundleActionStatus
506                            .get(Job.Status.KILLED) : 0;
507                    int failedActions = bundleActionStatus.containsKey(Job.Status.FAILED) ? bundleActionStatus
508                            .get(Job.Status.FAILED) : 0;
509                    int doneWithErrorActions = bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) ? bundleActionStatus
510                            .get(Job.Status.DONEWITHERROR) : 0;
511    
512                    if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions) {
513                        bundleStatus[0] = Job.Status.SUSPENDED;
514                        ret = true;
515                    }
516                    else if (bundleActions.size()  == bundleActionStatus.get(Job.Status.SUSPENDEDWITHERROR)
517                            + bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions + killedActions + failedActions + doneWithErrorActions) {
518                        bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
519                        ret = true;
520                    }
521                }
522                return ret;
523    
524            }
525    
526            private boolean checkCoordPausedStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
527                    int coordActionsCount, Job.Status[] coordStatus){
528                boolean ret = false;
529                if (coordStatus[0].equals(Job.Status.PAUSED) || coordStatus[0].equals(Job.Status.PAUSEDWITHERROR)) {
530                    if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
531                            || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
532                            || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
533                        coordStatus[0] = Job.Status.PAUSEDWITHERROR;
534                    }
535                    else {
536                        coordStatus[0] = Job.Status.PAUSED;
537                    }
538                    ret = true;
539                }
540                return ret;
541            }
542            private boolean checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
543                    int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization, boolean isPending) {
544                boolean ret = false;
545    
546                // TODO - When bottom up cmds are allowed to change the status of parent job
547                //if none of the coord actions are in suspended or suspendedwitherror and materialization done is false
548                //,then the function should return
549                // false
550    
551                // top down
552                // check for children only when parent is suspended
553                if (coordStatus[0] == Job.Status.SUSPENDED || coordStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
554    
555                    if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
556                            || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
557                            || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
558                        coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
559                    }
560                    else {
561                        coordStatus[0] = Job.Status.SUSPENDED;
562                    }
563                    ret = true;
564                }
565                // bottom up
566                // look for children to check the parent's status only if materialization is
567                // done and all actions are non-pending
568                else if (isDoneMaterialization && !isPending && coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
569                    int succeededActions = coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED) ? coordActionStatus
570                           .get(CoordinatorAction.Status.SUCCEEDED) : 0;
571                    int killedActions = coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) ? coordActionStatus
572                            .get(CoordinatorAction.Status.KILLED) : 0;
573                    int failedActions = coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) ? coordActionStatus
574                            .get(CoordinatorAction.Status.FAILED) : 0;
575                    int timedoutActions = coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT) ? coordActionStatus
576                            .get(CoordinatorAction.Status.TIMEDOUT) : 0;
577    
578                    if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED) + succeededActions) {
579                        coordStatus[0] = Job.Status.SUSPENDED;
580                        ret = true;
581                    }
582                    else if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)
583                            + succeededActions + killedActions + failedActions + timedoutActions) {
584                        coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
585                        ret = true;
586                    }
587                }
588                return ret;
589            }
590    
591            private boolean checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
592                    int coordActionsCount, Job.Status[] coordStatus) {
593                boolean ret = false;
594                if (coordStatus[0] != Job.Status.PREP) {
595                    if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
596                            || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
597                            || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
598                        coordStatus[0] = Job.Status.RUNNINGWITHERROR;
599                    }
600                    else {
601                        coordStatus[0] = Job.Status.RUNNING;
602                    }
603                    ret = true;
604                }
605                return ret;
606            }
607    
608            private boolean checkRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus,
609                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
610                boolean ret = false;
611                if (bundleStatus[0] != Job.Status.PREP) {
612                    if (bundleActionStatus.containsKey(Job.Status.FAILED)
613                            || bundleActionStatus.containsKey(Job.Status.KILLED)
614                            || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
615                            || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)) {
616                        bundleStatus[0] = Job.Status.RUNNINGWITHERROR;
617                    }
618                    else {
619                        bundleStatus[0] = Job.Status.RUNNING;
620                    }
621                    ret = true;
622                }
623                return ret;
624    
625            }
626    
627            private void updateBundleJob(boolean isPending, BundleJobBean bundleJob, Job.Status bundleStatus)
628                    throws JPAExecutorException {
629                String jobId = bundleJob.getId();
630                // Update the Bundle Job
631                // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
632                // PAUSEDWITHERROR is not supported
633                bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(bundleStatus));
634                if (isPending) {
635                    bundleJob.setPending();
636                    LOG.info("Bundle job [" + jobId + "] Pending set to TRUE");
637                }
638                else {
639                    bundleJob.resetPending();
640                    LOG.info("Bundle job [" + jobId + "] Pending set to FALSE");
641                }
642                jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
643            }
644    
645            private void updateCoordJob(boolean isPending, CoordinatorJobBean coordJob, Job.Status coordStatus)
646                    throws JPAExecutorException, CommandException {
647                Job.Status prevStatus = coordJob.getStatus();
648                // Update the Coord Job
649                if (coordJob.getStatus() == Job.Status.SUCCEEDED || coordJob.getStatus() == Job.Status.FAILED
650                        || coordJob.getStatus() == Job.Status.KILLED || coordJob.getStatus() == Job.Status.DONEWITHERROR) {
651                    if (coordStatus == Job.Status.SUSPENDED || coordStatus == Job.Status.SUSPENDEDWITHERROR) {
652                        LOG.info("Coord Job [" + coordJob.getId()
653                                + "] status to "+ coordStatus +" can not be updated as its already in Terminal state");
654                        return;
655                    }
656                }
657    
658                checkCoordPending(isPending, coordJob, false);
659                // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and PAUSEDWITHERROR is
660                // not supported
661                coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(coordStatus));
662                // Backward support when coordinator namespace is 0.1
663                coordJob.setStatus(StatusUtils.getStatus(coordJob));
664                coordJob.setLastModifiedTime(new Date());
665                jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
666                // update bundle action only when status changes in coord job
667                if (coordJob.getBundleId() != null) {
668                    if (!prevStatus.equals(coordJob.getStatus())) {
669                        BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
670                        bundleStatusUpdate.call();
671                    }
672                }
673            }
674    
675            private void checkCoordPending(boolean isPending, CoordinatorJobBean coordJob, boolean saveToDB) throws JPAExecutorException {
676                // Checking the coordinator pending should be updated or not
677                if (isPending) {
678                    coordJob.setPending();
679                    LOG.info("Coord job [" + coordJob.getId() + "] Pending set to TRUE");
680                }
681                else {
682                    coordJob.resetPending();
683                    LOG.info("Coord job [" + coordJob.getId() + "] Pending set to FALSE");
684                }
685    
686                if (saveToDB) {
687                    jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
688                }
689            }
690    
691            /**
692             * Aggregate coordinator actions' status to coordinator jobs
693             *
694             * @throws JPAExecutorException thrown if failed in db updates or retrievals
695             * @throws CommandException thrown if failed to run commands
696             */
697            private void coordTransit() throws JPAExecutorException, CommandException {
698                List<CoordinatorJobBean> pendingJobCheckList = null;
699                if (lastInstanceStartTime == null) {
700                    LOG.info("Running coordinator status service first instance");
701                    // this is the first instance, we need to check for all pending jobs;
702                    pendingJobCheckList = jpaService.execute(new CoordJobsGetPendingJPAExecutor(limit));
703                }
704                else {
705                    LOG.info("Running coordinator status service from last instance time =  "
706                            + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
707                    // this is not the first instance, we should only check jobs
708                    // that have actions been
709                    // updated >= start time of last service run;
710                    List<String> coordJobIdList = jpaService
711                            .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
712                    Set<String> coordIds = new HashSet<String>();
713                    for (String coordJobId : coordJobIdList) {
714                        coordIds.add(coordJobId);
715                    }
716                    pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
717                    for (String coordId : coordIds.toArray(new String[coordIds.size()])) {
718                        CoordinatorJobBean coordJob;
719                        try{
720                            coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordId));
721                        }
722                        catch (JPAExecutorException jpaee) {
723                            if (jpaee.getErrorCode().equals(ErrorCode.E0604)) {
724                                LOG.warn("Exception happened during StatusTransitRunnable; Coordinator Job doesn't exist", jpaee);
725                                continue;
726                            } else {
727                                throw jpaee;
728                            }
729                        }
730                        // Running coord job might have pending false
731                        Job.Status coordJobStatus = coordJob.getStatus();
732                        if (coordJob.isPending() || coordJobStatus.equals(Job.Status.PAUSED)
733                                || coordJobStatus.equals(Job.Status.RUNNING)
734                                || coordJobStatus.equals(Job.Status.RUNNINGWITHERROR)
735                                || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
736                            pendingJobCheckList.add(coordJob);
737                        }
738                    }
739                }
740                aggregateCoordJobsStatus(pendingJobCheckList);
741            }
742        }
743    
744        /**
745         * Initializes the {@link StatusTransitService}.
746         *
747         * @param services services instance.
748         */
749        @Override
750        public void init(Services services) {
751            Configuration conf = services.getConf();
752            Runnable stateTransitRunnable = new StatusTransitRunnable();
753            services.get(SchedulerService.class).schedule(stateTransitRunnable, 10,
754                    conf.getInt(CONF_STATUSTRANSIT_INTERVAL, 60), SchedulerService.Unit.SEC);
755        }
756    
757        /**
758         * Destroy the StateTransit Jobs Service.
759         */
760        @Override
761        public void destroy() {
762        }
763    
764        /**
765         * Return the public interface for the purge jobs service.
766         *
767         * @return {@link StatusTransitService}.
768         */
769        @Override
770        public Class<? extends Service> getInterface() {
771            return StatusTransitService.class;
772        }
773    }
774