001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.util.ArrayList;
021    import java.util.Date;
022    import java.util.HashMap;
023    import java.util.HashSet;
024    import java.util.List;
025    import java.util.Set;
026    import java.util.TreeSet;
027    import java.util.Comparator;
028    
029    import org.apache.hadoop.conf.Configuration;
030    import org.apache.oozie.BundleActionBean;
031    import org.apache.oozie.BundleJobBean;
032    import org.apache.oozie.CoordinatorJobBean;
033    import org.apache.oozie.ErrorCode;
034    import org.apache.oozie.client.CoordinatorAction;
035    import org.apache.oozie.client.Job;
036    import org.apache.oozie.command.CommandException;
037    import org.apache.oozie.command.bundle.BundleKillXCommand;
038    import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
039    import org.apache.oozie.executor.jpa.BundleActionsGetByLastModifiedTimeJPAExecutor;
040    import org.apache.oozie.executor.jpa.BundleActionsGetStatusPendingJPAExecutor;
041    import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
042    import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
043    import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor;
044    import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor;
045    import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor;
046    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
047    import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
048    import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
049    import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
050    import org.apache.oozie.executor.jpa.JPAExecutorException;
051    import org.apache.oozie.util.DateUtils;
052    import org.apache.oozie.util.MemoryLocks;
053    import org.apache.oozie.util.StatusUtils;
054    import org.apache.oozie.util.XLog;
055    
       /**
        * StatusTransitService is scheduled to run at the configured interval.
        * <p/>
        * It updates a job's status according to the status of its child actions. If the pending flag of every child
        * action equals 0 (job done), the job's pending flag is reset to 0. If all child actions have succeeded, the
        * job's status is set to SUCCEEDED.
        */
063    public class StatusTransitService implements Service {
064        public static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService.";
065        public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval";
066        public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + "backward.support.for.coord.status";
067        public static final String CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR = CONF_PREFIX + "backward.support.for.states.without.error";
068        private static int limit = -1;
069        private static Date lastInstanceStartTime = null;
070        private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class);
071    
           /**
            * StatusTransitRunnable is the runnable which is scheduled to run at the configured interval.
            * <p/>
            * It updates a job's status according to the status of its child actions. If the pending flag of every child
            * action equals 0 (job done), the job's pending flag is reset to 0. If all child actions have succeeded, the
            * job's status is set to SUCCEEDED.
            */
079        public static class StatusTransitRunnable implements Runnable {
080            private JPAService jpaService = null;
081            private MemoryLocks.LockToken lock;
082    
083            public StatusTransitRunnable() {
084                jpaService = Services.get().get(JPAService.class);
085                if (jpaService == null) {
086                    LOG.error("Missing JPAService");
087                }
088            }
089    
090            @Override
091            public void run() {
092                try {
093                    Date curDate = new Date(); // records the start time of this service run;
094    
095                    // first check if there is some other instance running;
096                    lock = Services.get().get(MemoryLocksService.class).getWriteLock(StatusTransitService.class.getName(),
097                            lockTimeout);
098                    if (lock == null) {
099                        LOG.info("This StatusTransitService instance"
100                                + " will not run since there is already an instance running");
101                    }
102                    else {
103                        LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName());
104                        // running coord jobs transit service
105                        coordTransit();
106                        // running bundle jobs transit service
107                        bundleTransit();
108    
109                        lastInstanceStartTime = curDate;
110                    }
111                }
112                catch (Exception ex) {
113                    LOG.warn("Exception happened during StatusTransitRunnable ", ex);
114                }
115                finally {
116                    // release lock;
117                    if (lock != null) {
118                        lock.release();
119                        LOG.info("Released lock for [{0}]", StatusTransitService.class.getName());
120                    }
121                }
122            }
123    
124            public List<BundleJobBean> removeDuplicates(List<BundleJobBean> pendingJobList) {
125                Set<BundleJobBean> s = new TreeSet<BundleJobBean>(new Comparator<BundleJobBean>() {
126                    @Override
127                    public int compare(BundleJobBean b1, BundleJobBean b2) {
128                        if (b1.getId().equals(b2.getId())) {
129                            return 0;
130                        }
131                        else
132                            return 1;
133                    }
134                });
135                s.addAll(pendingJobList);
136                return new ArrayList<BundleJobBean>(s);
137            }
138    
139            /**
140             * Aggregate bundle actions' status to bundle jobs
141             *
142             * @throws JPAExecutorException thrown if failed in db updates or retrievals
143             * @throws CommandException thrown if failed to run commands
144             */
145            private void bundleTransit() throws JPAExecutorException, CommandException {
146                List<BundleJobBean> pendingJobCheckList = null;
147    
148                if (lastInstanceStartTime == null) {
149                    LOG.info("Running bundle status service first instance");
150                    // this is the first instance, we need to check for all pending or running jobs;
151                    pendingJobCheckList = jpaService.execute(new BundleJobsGetRunningOrPendingJPAExecutor(limit));
152                }
153                else {
154                    LOG.info("Running bundle status service from last instance time =  "
155                            + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
156                    // this is not the first instance, we should only check jobs that have actions been
157                    // updated >= start time of last service run;
158                    List<BundleActionBean> actionList = jpaService
159                            .execute(new BundleActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
160                    Set<String> bundleIds = new HashSet<String>();
161                    for (BundleActionBean action : actionList) {
162                        bundleIds.add(action.getBundleId());
163                    }
164                    pendingJobCheckList = new ArrayList<BundleJobBean>();
165                    for (String bundleId : bundleIds.toArray(new String[bundleIds.size()])) {
166                        BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId));
167                        // Running bundle job might have pending false
168                        if (bundle.isPending() || bundle.getStatus().equals(Job.Status.RUNNING)
169                                || bundle.getStatus().equals(Job.Status.RUNNINGWITHERROR)
170                                || bundle.getStatus().equals(Job.Status.PAUSED)
171                                || bundle.getStatus().equals(Job.Status.PAUSEDWITHERROR)) {
172                            pendingJobCheckList.add(bundle);
173                        }
174                    }
175                }
176                aggregateBundleJobsStatus(pendingJobCheckList);
177            }
178    
179            private void aggregateBundleJobsStatus(List<BundleJobBean> bundleLists) throws JPAExecutorException,
180                    CommandException {
181                if (bundleLists != null) {
182                        for (BundleJobBean bundleJob : bundleLists) {
183                            try {
184                                String jobId = bundleJob.getId();
185                                Job.Status[] bundleStatus = new Job.Status[1];
186                                bundleStatus[0] = bundleJob.getStatus();
187                                List<BundleActionBean> bundleActions = jpaService.execute(new BundleActionsGetStatusPendingJPAExecutor(
188                                        jobId));
189                                HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>();
190                                boolean foundPending = false;
191                                for (BundleActionBean bAction : bundleActions) {
192                                        int counter = 0;
193                                        if (bundleActionStatus.containsKey(bAction.getStatus())) {
194                                            counter = bundleActionStatus.get(bAction.getStatus()) + 1;
195                                        }
196                                        else {
197                                            ++counter;
198                                        }
199                                        bundleActionStatus.put(bAction.getStatus(), counter);
200                                        if (bAction.getCoordId() == null
201                                                && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED)) {
202                                            (new BundleKillXCommand(jobId)).call();
203                                            LOG.info("Bundle job ["+ jobId
204                                                            + "] has been killed since one of its coordinator job failed submission.");
205                                        }
206    
207                                     if (bAction.isPending()) {
208                                        foundPending = true;
209                                    }
210                                }
211    
212                                if (!foundPending && checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus)) {
213                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
214                                            + "' from '" + bundleJob.getStatus() + "'");
215                                    updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
216                                }
217                                else if (!foundPending && checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus)) {
218                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
219                                            + "' from '" + bundleJob.getStatus() + "'");
220                                    updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
221                                }
222                                else if (checkPausedStatus(bundleActionStatus, bundleActions, bundleStatus)) {
223                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
224                                            + "' from '" + bundleJob.getStatus() + "'");
225                                    updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
226                                }
227                                else if (checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus, foundPending)) {
228                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
229                                            + "' from '" + bundleJob.getStatus() + "'");
230                                    updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
231                                }
232                                else if (checkRunningStatus(bundleActionStatus, bundleActions, bundleStatus)) {
233                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
234                                            + "' from '" + bundleJob.getStatus() + "'");
235                                    updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
236                                }
237                            }
238                            catch (Exception ex) {
239                                LOG.error("Exception happened during aggregate bundle job's status, job = "
240                                        + bundleJob.getId(), ex);
241                            }
242                        }
243                    }
244    
245            }
246    
247            private void aggregateCoordJobsStatus(List<CoordinatorJobBean> CoordList) throws JPAExecutorException,
248                    CommandException {
249                if (CoordList != null) {
250                    Configuration conf = Services.get().getConf();
251                    boolean backwardSupportForCoordStatus = conf.getBoolean(CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
252    
253                    for (CoordinatorJobBean coordJob : CoordList) {
254                        try {
255                            // if namespace 0.1 is used and backward support is true, then ignore this coord job
256                            if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
257                                    && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
258                                continue;
259                            }
260                            String jobId = coordJob.getId();
261                            Job.Status[] coordStatus = new Job.Status[1];
262                            coordStatus[0] = coordJob.getStatus();
263                            //Get count of Coordinator actions with pending true
264                            boolean isPending = false;
265                            int count = jpaService.execute(new CoordJobGetPendingActionsCountJPAExecutor(jobId));
266                            if (count > 0) {
267                                 isPending = true;
268                            }
269                            // Get status of Coordinator actions
270                            List<CoordinatorAction.Status> coordActionStatusList = jpaService
271                                    .execute(new CoordJobGetActionsStatusJPAExecutor(jobId));
272                            HashMap<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>();
273    
274                            for (CoordinatorAction.Status status : coordActionStatusList) {
275                                int counter = 0;
276                                if (coordActionStatus.containsKey(status)) {
277                                    counter = coordActionStatus.get(status) + 1;
278                                }
279                                else {
280                                    ++counter;
281                                }
282                                coordActionStatus.put(status, counter);
283                            }
284    
285                            int nonPendingCoordActionsCount = coordActionStatusList.size();
286                            boolean isDoneMaterialization = coordJob.isDoneMaterialization();
287                            if ((isDoneMaterialization || coordStatus[0] == Job.Status.FAILED || coordStatus[0] == Job.Status.KILLED)
288                                    && checkCoordTerminalStatus(coordActionStatus, nonPendingCoordActionsCount,
289                                            coordStatus, isDoneMaterialization)) {
290                                LOG.info("Set coordinator job [" + jobId + "] status to '" + coordStatus[0].toString()
291                                        + "' from '" + coordJob.getStatus() + "'");
292                                updateCoordJob(isPending, coordJob, coordStatus[0]);
293                            }
294                            else if (checkCoordPausedStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
295                                LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
296                                        + "' from '" + coordJob.getStatus() + "'");
297                                updateCoordJob(isPending, coordJob, coordStatus[0]);
298                            }
299                            else if(checkCoordSuspendStatus( coordActionStatus, nonPendingCoordActionsCount, coordStatus, coordJob.isDoneMaterialization(), isPending)) {
300                                LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
301                                        + "' from '" + coordJob.getStatus() + "'");
302                                updateCoordJob(isPending, coordJob, coordStatus[0]);
303                            }
304                            else if (checkCoordRunningStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
305                                LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
306                                        + "' from '" + coordJob.getStatus() + "'");
307                                updateCoordJob(isPending, coordJob, coordStatus[0]);
308                            }
309                            else {
310                                checkCoordPending(isPending, coordJob, true);
311                            }
312                        }
313                        catch (Exception ex) {
314                            LOG.error("Exception happened during aggregate coordinator job's status, job = "
315                                    + coordJob.getId(), ex);
316                        }
317                    }
318    
319                }
320            }
321    
322            private boolean checkTerminalStatus(HashMap<Job.Status, Integer> bundleActionStatus,
323                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
324                boolean ret = false;
325                int totalValuesSucceed = 0;
326                if (bundleActionStatus.containsKey(Job.Status.SUCCEEDED)) {
327                    totalValuesSucceed = bundleActionStatus.get(Job.Status.SUCCEEDED);
328                }
329                int totalValuesFailed = 0;
330                if (bundleActionStatus.containsKey(Job.Status.FAILED)) {
331                    totalValuesFailed = bundleActionStatus.get(Job.Status.FAILED);
332                }
333                int totalValuesKilled = 0;
334                if (bundleActionStatus.containsKey(Job.Status.KILLED)) {
335                    totalValuesKilled = bundleActionStatus.get(Job.Status.KILLED);
336                }
337    
338                int totalValuesDoneWithError = 0;
339                if (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)) {
340                    totalValuesDoneWithError = bundleActionStatus.get(Job.Status.DONEWITHERROR);
341                }
342    
343                if (bundleActions.size() == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesDoneWithError)) {
344                    // If all the bundle actions are succeeded then bundle job should be succeeded.
345                    if (bundleActions.size() == totalValuesSucceed) {
346                        bundleStatus[0] = Job.Status.SUCCEEDED;
347                        ret = true;
348                    }
349                    else if (bundleActions.size() == totalValuesKilled) {
350                        // If all the bundle actions are KILLED then bundle job should be KILLED.
351                        bundleStatus[0] = Job.Status.KILLED;
352                        ret = true;
353                    }
354                    else if (bundleActions.size() == totalValuesFailed) {
355                        // If all the bundle actions are FAILED then bundle job should be FAILED.
356                        bundleStatus[0] = Job.Status.FAILED;
357                        ret = true;
358                    }
359                    else {
360                        bundleStatus[0] = Job.Status.DONEWITHERROR;
361                        ret = true;
362                    }
363                }
364                return ret;
365            }
366    
367            private boolean checkCoordTerminalStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
368                    int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization) {
369                boolean ret = false;
370                int totalValuesSucceed = 0;
371                if (coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED)) {
372                    totalValuesSucceed = coordActionStatus.get(CoordinatorAction.Status.SUCCEEDED);
373                }
374                int totalValuesFailed = 0;
375                if (coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)) {
376                    totalValuesFailed = coordActionStatus.get(CoordinatorAction.Status.FAILED);
377                }
378                int totalValuesKilled = 0;
379                if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)) {
380                    totalValuesKilled = coordActionStatus.get(CoordinatorAction.Status.KILLED);
381                }
382    
383                int totalValuesTimeOut = 0;
384                if (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
385                    totalValuesTimeOut = coordActionStatus.get(CoordinatorAction.Status.TIMEDOUT);
386                }
387    
388                if (coordActionsCount == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut)) {
389                    // If all the coordinator actions are succeeded then coordinator job should be succeeded.
390                    if (coordActionsCount == totalValuesSucceed && isDoneMaterialization) {
391                        coordStatus[0] = Job.Status.SUCCEEDED;
392                        ret = true;
393                    }
394                    else if (coordActionsCount == totalValuesKilled) {
395                        // If all the coordinator actions are KILLED then coordinator job should be KILLED.
396                        coordStatus[0] = Job.Status.KILLED;
397                        ret = true;
398                    }
399                    else if (coordActionsCount == totalValuesFailed) {
400                        // If all the coordinator actions are FAILED then coordinator job should be FAILED.
401                        coordStatus[0] = Job.Status.FAILED;
402                        ret = true;
403                    }
404                    else {
405                        coordStatus[0] = Job.Status.DONEWITHERROR;
406                        ret = true;
407                    }
408                }
409                return ret;
410            }
411    
412            private boolean checkPrepStatus(HashMap<Job.Status, Integer> bundleActionStatus,
413                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
414                boolean ret = false;
415                if (bundleActionStatus.containsKey(Job.Status.PREP)) {
416                    // If all the bundle actions are PREP then bundle job should be RUNNING.
417                    if (bundleActions.size() > bundleActionStatus.get(Job.Status.PREP)) {
418                        bundleStatus[0] = Job.Status.RUNNING;
419                        ret = true;
420                    }
421                }
422                return ret;
423            }
424    
425            private boolean checkPausedStatus(HashMap<Job.Status, Integer> bundleActionStatus,
426                    List<BundleActionBean> bundleActions, Job.Status[] bundleJobStatus) {
427                boolean ret = false;
428    
429                // TODO - When bottom up cmds are allowed to change the status of parent job,
430                // if none of the bundle actions are in paused or pausedwitherror, the function should return
431                // false
432    
433                // top down
434                // If the bundle job is PAUSED or PAUSEDINERROR and no children are in error
435                // state, then job should be PAUSED otherwise it should be pausedwitherror
436                if (bundleJobStatus[0] == Job.Status.PAUSED || bundleJobStatus[0] == Job.Status.PAUSEDWITHERROR) {
437                    if (bundleActionStatus.containsKey(Job.Status.KILLED)
438                            || bundleActionStatus.containsKey(Job.Status.FAILED)
439                            || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
440                            || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
441                            || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)
442                            || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
443                        bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
444                    }
445                    else {
446                        bundleJobStatus[0] = Job.Status.PAUSED;
447                    }
448                    ret = true;
449                }
450    
451                // bottom up; check the status of parent through their children
452                else if (bundleActionStatus.containsKey(Job.Status.PAUSED)
453                        && (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED))) {
454                    bundleJobStatus[0] = Job.Status.PAUSED;
455                    ret = true;
456                }
457                else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
458                    int pausedActions = bundleActionStatus.containsKey(Job.Status.PAUSED) ? bundleActionStatus
459                            .get(Job.Status.PAUSED) : 0;
460                    if (bundleActions.size() == pausedActions + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR)) {
461                        bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
462                        ret = true;
463                    }
464                }
465                else {
466                    ret = false;
467                }
468                return ret;
469            }
470    
471    
472            private boolean checkSuspendStatus(HashMap<Job.Status, Integer> bundleActionStatus,
473                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus, boolean isPending) {
474                boolean ret = false;
475    
476                // TODO - When bottom up cmds are allowed to change the status of parent job,
477                // if none of the bundle actions are in suspended or suspendedwitherror, the function should return
478                // false
479    
480                // top down
481                // if job is suspended
482                if (bundleStatus[0] == Job.Status.SUSPENDED
483                        || bundleStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
484                    if (bundleActionStatus.containsKey(Job.Status.KILLED)
485                            || bundleActionStatus.containsKey(Job.Status.FAILED)
486                            || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
487                            || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
488                            || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
489                        bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
490                    }
491                    else {
492                        bundleStatus[0] = Job.Status.SUSPENDED;
493                    }
494                    ret =true;
495                }
496    
497                // bottom up
498                // Update status of parent from the status of its children
499                else if (!isPending && bundleActionStatus.containsKey(Job.Status.SUSPENDED)
500                        || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)) {
501                    int succeededActions = bundleActionStatus.containsKey(Job.Status.SUCCEEDED) ? bundleActionStatus
502                            .get(Job.Status.SUCCEEDED) : 0;
503                    int killedActions = bundleActionStatus.containsKey(Job.Status.KILLED) ? bundleActionStatus
504                            .get(Job.Status.KILLED) : 0;
505                    int failedActions = bundleActionStatus.containsKey(Job.Status.FAILED) ? bundleActionStatus
506                            .get(Job.Status.FAILED) : 0;
507                    int doneWithErrorActions = bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) ? bundleActionStatus
508                            .get(Job.Status.DONEWITHERROR) : 0;
509    
510                    if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions) {
511                        bundleStatus[0] = Job.Status.SUSPENDED;
512                        ret = true;
513                    }
514                    else if (bundleActions.size()  == bundleActionStatus.get(Job.Status.SUSPENDEDWITHERROR)
515                            + bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions + killedActions + failedActions + doneWithErrorActions) {
516                        bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
517                        ret = true;
518                    }
519                }
520                return ret;
521    
522            }
523    
524            private boolean checkCoordPausedStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
525                    int coordActionsCount, Job.Status[] coordStatus){
526                boolean ret = false;
527                if (coordStatus[0].equals(Job.Status.PAUSED) || coordStatus[0].equals(Job.Status.PAUSEDWITHERROR)) {
528                    if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
529                            || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
530                            || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
531                        coordStatus[0] = Job.Status.PAUSEDWITHERROR;
532                    }
533                    else {
534                        coordStatus[0] = Job.Status.PAUSED;
535                    }
536                    ret = true;
537                }
538                return ret;
539            }
540            private boolean checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
541                    int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization, boolean isPending) {
542                boolean ret = false;
543    
544                // TODO - When bottom up cmds are allowed to change the status of parent job
545                //if none of the coord actions are in suspended or suspendedwitherror and materialization done is false
546                //,then the function should return
547                // false
548    
549                // top down
550                // check for children only when parent is suspended
551                if (coordStatus[0] == Job.Status.SUSPENDED || coordStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
552    
553                    if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
554                            || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
555                            || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
556                        coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
557                    }
558                    else {
559                        coordStatus[0] = Job.Status.SUSPENDED;
560                    }
561                    ret = true;
562                }
563                // bottom up
564                // look for children to check the parent's status only if materialization is
565                // done and all actions are non-pending
566                else if (isDoneMaterialization && !isPending && coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
567                    int succeededActions = coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED) ? coordActionStatus
568                           .get(CoordinatorAction.Status.SUCCEEDED) : 0;
569                    int killedActions = coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) ? coordActionStatus
570                            .get(CoordinatorAction.Status.KILLED) : 0;
571                    int failedActions = coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) ? coordActionStatus
572                            .get(CoordinatorAction.Status.FAILED) : 0;
573                    int timedoutActions = coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT) ? coordActionStatus
574                            .get(CoordinatorAction.Status.TIMEDOUT) : 0;
575    
576                    if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED) + succeededActions) {
577                        coordStatus[0] = Job.Status.SUSPENDED;
578                        ret = true;
579                    }
580                    else if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)
581                            + succeededActions + killedActions + failedActions + timedoutActions) {
582                        coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
583                        ret = true;
584                    }
585                }
586                return ret;
587            }
588    
589            private boolean checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
590                    int coordActionsCount, Job.Status[] coordStatus) {
591                boolean ret = false;
592                if (coordStatus[0] != Job.Status.PREP) {
593                    if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
594                            || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
595                            || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
596                        coordStatus[0] = Job.Status.RUNNINGWITHERROR;
597                    }
598                    else {
599                        coordStatus[0] = Job.Status.RUNNING;
600                    }
601                    ret = true;
602                }
603                return ret;
604            }
605    
606            private boolean checkRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus,
607                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
608                boolean ret = false;
609                if (bundleStatus[0] != Job.Status.PREP) {
610                    if (bundleActionStatus.containsKey(Job.Status.FAILED)
611                            || bundleActionStatus.containsKey(Job.Status.KILLED)
612                            || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
613                            || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)) {
614                        bundleStatus[0] = Job.Status.RUNNINGWITHERROR;
615                    }
616                    else {
617                        bundleStatus[0] = Job.Status.RUNNING;
618                    }
619                    ret = true;
620                }
621                return ret;
622    
623            }
624    
625            private void updateBundleJob(boolean isPending, BundleJobBean bundleJob, Job.Status bundleStatus)
626                    throws JPAExecutorException {
627                String jobId = bundleJob.getId();
628                // Update the Bundle Job
629                // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
630                // PAUSEDWITHERROR is not supported
631                bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(bundleStatus));
632                if (isPending) {
633                    bundleJob.setPending();
634                    LOG.info("Bundle job [" + jobId + "] Pending set to TRUE");
635                }
636                else {
637                    bundleJob.resetPending();
638                    LOG.info("Bundle job [" + jobId + "] Pending set to FALSE");
639                }
640                jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
641            }
642    
643            private void updateCoordJob(boolean isPending, CoordinatorJobBean coordJob, Job.Status coordStatus)
644                    throws JPAExecutorException, CommandException {
645                Job.Status prevStatus = coordJob.getStatus();
646                // Update the Coord Job
647                if (coordJob.getStatus() == Job.Status.SUCCEEDED || coordJob.getStatus() == Job.Status.FAILED
648                        || coordJob.getStatus() == Job.Status.KILLED || coordJob.getStatus() == Job.Status.DONEWITHERROR) {
649                    if (coordStatus == Job.Status.SUSPENDED || coordStatus == Job.Status.SUSPENDEDWITHERROR) {
650                        LOG.info("Coord Job [" + coordJob.getId()
651                                + "] status to "+ coordStatus +" can not be updated as its already in Terminal state");
652                        return;
653                    }
654                }
655    
656                boolean isPendingStateChanged = checkCoordPending(isPending, coordJob, false);
657                // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and PAUSEDWITHERROR is
658                // not supported
659                coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(coordStatus));
660                // Backward support when coordinator namespace is 0.1
661                coordJob.setStatus(StatusUtils.getStatus(coordJob));
662                if (coordJob.getStatus() != prevStatus || isPendingStateChanged) {
663                    LOG.debug("Updating coord job " + coordJob.getId());
664                    coordJob.setLastModifiedTime(new Date());
665                    jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
666                }
667                // update bundle action only when status changes in coord job
668                if (coordJob.getBundleId() != null) {
669                    if (!prevStatus.equals(coordJob.getStatus())) {
670                        BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
671                        bundleStatusUpdate.call();
672                    }
673                }
674            }
675    
676            private boolean checkCoordPending(boolean isPending, CoordinatorJobBean coordJob, boolean saveToDB)
677                    throws JPAExecutorException {
678                // Checking the coordinator pending should be updated or not
679                boolean prevPending = coordJob.isPending();
680                if (isPending) {
681                    coordJob.setPending();
682                    LOG.info("Coord job [" + coordJob.getId() + "] Pending set to TRUE");
683                }
684                else {
685                    coordJob.resetPending();
686                    LOG.info("Coord job [" + coordJob.getId() + "] Pending set to FALSE");
687                }
688                boolean hasChange = prevPending != coordJob.isPending();
689                if (saveToDB && hasChange) {
690                    jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
691                }
692                return hasChange;
693    
694            }
695    
696            /**
697             * Aggregate coordinator actions' status to coordinator jobs
698             *
699             * @throws JPAExecutorException thrown if failed in db updates or retrievals
700             * @throws CommandException thrown if failed to run commands
701             */
702            private void coordTransit() throws JPAExecutorException, CommandException {
703                List<CoordinatorJobBean> pendingJobCheckList = null;
704                if (lastInstanceStartTime == null) {
705                    LOG.info("Running coordinator status service first instance");
706                    // this is the first instance, we need to check for all pending jobs;
707                    pendingJobCheckList = jpaService.execute(new CoordJobsGetPendingJPAExecutor(limit));
708                }
709                else {
710                    LOG.info("Running coordinator status service from last instance time =  "
711                            + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
712                    // this is not the first instance, we should only check jobs
713                    // that have actions been
714                    // updated >= start time of last service run;
715                    List<String> coordJobIdList = jpaService
716                            .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
717                    Set<String> coordIds = new HashSet<String>();
718                    for (String coordJobId : coordJobIdList) {
719                        coordIds.add(coordJobId);
720                    }
721                    pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
722                    for (String coordId : coordIds.toArray(new String[coordIds.size()])) {
723                        CoordinatorJobBean coordJob;
724                        try{
725                            coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordId));
726                        }
727                        catch (JPAExecutorException jpaee) {
728                            if (jpaee.getErrorCode().equals(ErrorCode.E0604)) {
729                                LOG.warn("Exception happened during StatusTransitRunnable; Coordinator Job doesn't exist", jpaee);
730                                continue;
731                            } else {
732                                throw jpaee;
733                            }
734                        }
735                        // Running coord job might have pending false
736                        Job.Status coordJobStatus = coordJob.getStatus();
737                        if (coordJob.isPending() || coordJobStatus.equals(Job.Status.PAUSED)
738                                || coordJobStatus.equals(Job.Status.RUNNING)
739                                || coordJobStatus.equals(Job.Status.RUNNINGWITHERROR)
740                                || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
741                            pendingJobCheckList.add(coordJob);
742                        }
743                    }
744                }
745                aggregateCoordJobsStatus(pendingJobCheckList);
746            }
747        }
748    
749        /**
750         * Initializes the {@link StatusTransitService}.
751         *
752         * @param services services instance.
753         */
754        @Override
755        public void init(Services services) {
756            Configuration conf = services.getConf();
757            Runnable stateTransitRunnable = new StatusTransitRunnable();
758            services.get(SchedulerService.class).schedule(stateTransitRunnable, 10,
759                    conf.getInt(CONF_STATUSTRANSIT_INTERVAL, 60), SchedulerService.Unit.SEC);
760        }
761    
762        /**
763         * Destroy the StateTransit Jobs Service.
764         */
765        @Override
766        public void destroy() {
767        }
768    
769        /**
770         * Return the public interface for the purge jobs service.
771         *
772         * @return {@link StatusTransitService}.
773         */
774        @Override
775        public Class<? extends Service> getInterface() {
776            return StatusTransitService.class;
777        }
778    }
779