001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.util.ArrayList;
021    import java.util.Date;
022    import java.util.HashMap;
023    import java.util.HashSet;
024    import java.util.List;
025    import java.util.Set;
026    
027    import org.apache.hadoop.conf.Configuration;
028    import org.apache.oozie.BundleActionBean;
029    import org.apache.oozie.BundleJobBean;
030    import org.apache.oozie.CoordinatorActionBean;
031    import org.apache.oozie.CoordinatorJobBean;
032    import org.apache.oozie.client.CoordinatorAction;
033    import org.apache.oozie.client.Job;
034    import org.apache.oozie.command.CommandException;
035    import org.apache.oozie.command.bundle.BundleKillXCommand;
036    import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
037    import org.apache.oozie.executor.jpa.BundleActionsGetByLastModifiedTimeJPAExecutor;
038    import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
039    import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
040    import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
041    import org.apache.oozie.executor.jpa.BundleJobsGetPendingJPAExecutor;
042    import org.apache.oozie.executor.jpa.BundleJobsGetRunningJPAExecutor;
043    import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor;
044    import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor;
045    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
046    import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
047    import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
048    import org.apache.oozie.executor.jpa.JPAExecutorException;
049    import org.apache.oozie.util.DateUtils;
050    import org.apache.oozie.util.MemoryLocks;
051    import org.apache.oozie.util.StatusUtils;
052    import org.apache.oozie.util.XLog;
053    
054    /**
 * StatusTransitService is scheduled to run at the configured interval.
 * <p/>
 * It updates each job's status according to the status of its child actions. If no child action is pending
 * (the job is done), the job's pending flag is reset to 0. If all child actions have succeeded, the job's status is
 * set to SUCCEEDED.
060     */
061    public class StatusTransitService implements Service {
    public static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService.";
    // Configuration key for the run interval of this service (presumably read where the runnable is scheduled;
    // that code is not visible here — confirm).
    public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval";
    // Configuration key (default false); when true, coordinator jobs whose app namespace is
    // SchemaService.COORDINATOR_NAMESPACE_URI_1 are skipped by aggregateCoordJobsStatus.
    public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + "backward.support.for.coord.status";
    // Passed as the limit argument to the pending/running bundle job queries on the first run.
    // NOTE(review): -1 presumably means "no limit" — confirm against the JPA executors.
    private static int limit = -1;
    // Start time of the last run that completed both coordTransit() and bundleTransit(); null until then, which
    // makes bundleTransit() check all pending and running bundle jobs.
    private static Date lastInstanceStartTime = null;
    // Note: the logger is named after the nested StatusTransitRunnable class, not this service class.
    private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class);
068    
069        /**
     * StatusTransitRunnable is the runnable which is scheduled to run at the configured interval.
     * <p/>
     * It updates each job's status according to the status of its child actions. If no child action is pending
     * (the job is done), the job's pending flag is reset to 0. If all child actions have succeeded, the job's status
     * is set to SUCCEEDED.
075         */
076        static class StatusTransitRunnable implements Runnable {
077            private JPAService jpaService = null;
078            private MemoryLocks.LockToken lock;
079    
080            public StatusTransitRunnable() {
081                jpaService = Services.get().get(JPAService.class);
082                if (jpaService == null) {
083                    LOG.error("Missing JPAService");
084                }
085            }
086    
087            public void run() {
088                try {
089                    Date curDate = new Date(); // records the start time of this service run;
090    
091                    // first check if there is some other instance running;
092                    lock = Services.get().get(MemoryLocksService.class).getWriteLock(StatusTransitService.class.getName(),
093                            lockTimeout);
094                    if (lock == null) {
095                        LOG.info("This StatusTransitService instance"
096                                + " will not run since there is already an instance running");
097                    }
098                    else {
099                        LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName());
100                        // running coord jobs transit service
101                        coordTransit();
102                        // running bundle jobs transit service
103                        bundleTransit();
104    
105                        lastInstanceStartTime = curDate;
106                    }
107                }
108                catch (Exception ex) {
109                    LOG.warn("Exception happened during StatusTransitRunnable ", ex);
110                }
111                finally {
112                    // release lock;
113                    if (lock != null) {
114                        lock.release();
115                        LOG.info("Released lock for [{0}]", StatusTransitService.class.getName());
116                    }
117                }
118            }
119    
120            /**
121             * Aggregate bundle actions' status to bundle jobs
122             *
123             * @throws JPAExecutorException thrown if failed in db updates or retrievals
124             * @throws CommandException thrown if failed to run commands
125             */
126            private void bundleTransit() throws JPAExecutorException, CommandException {
127                List<BundleJobBean> pendingJobCheckList = null;
128                List<BundleJobBean> runningJobCheckList = null;
129                List<List<BundleJobBean>> bundleLists = new ArrayList<List<BundleJobBean>>();
130                if (lastInstanceStartTime == null) {
131                    LOG.info("Running bundle status service first instance");
132                    // this is the first instance, we need to check for all pending jobs;
133                    pendingJobCheckList = jpaService.execute(new BundleJobsGetPendingJPAExecutor(limit));
134                    runningJobCheckList = jpaService.execute(new BundleJobsGetRunningJPAExecutor(limit));
135                    bundleLists.add(pendingJobCheckList);
136                    bundleLists.add(runningJobCheckList);
137                }
138                else {
139                    LOG.info("Running bundle status service from last instance time =  "
140                            + DateUtils.convertDateToString(lastInstanceStartTime));
141                    // this is not the first instance, we should only check jobs that have actions been
142                    // updated >= start time of last service run;
143                    List<BundleActionBean> actionList = jpaService
144                            .execute(new BundleActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
145                    Set<String> bundleIds = new HashSet<String>();
146                    for (BundleActionBean action : actionList) {
147                        bundleIds.add(action.getBundleId());
148                    }
149                    pendingJobCheckList = new ArrayList<BundleJobBean>();
150                    for (String bundleId : bundleIds.toArray(new String[bundleIds.size()])) {
151                        BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId));
152                        // Running bundle job might have pending false
153                        if (bundle.isPending() || bundle.getStatus().equals(Job.Status.RUNNING)) {
154                            pendingJobCheckList.add(bundle);
155                        }
156                    }
157                    runningJobCheckList = pendingJobCheckList;
158                    bundleLists.add(pendingJobCheckList);
159                }
160                aggregateBundleJobsStatus(bundleLists);
161            }
162    
163            private void aggregateBundleJobsStatus(List<List<BundleJobBean>> bundleLists) throws JPAExecutorException,
164                    CommandException {
165                if (bundleLists != null) {
166                    for (List<BundleJobBean> listBundleBean : bundleLists) {
167                        for (BundleJobBean bundleJob : listBundleBean) {
168                            try {
169                                String jobId = bundleJob.getId();
170                                Job.Status[] bundleStatus = new Job.Status[1];
171                                bundleStatus[0] = bundleJob.getStatus();
172                                List<BundleActionBean> bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(
173                                        jobId));
174                                HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>();
175                                boolean foundPending = false;
176                                for (BundleActionBean bAction : bundleActions) {
177                                    if (!bAction.isPending()) {
178                                        int counter = 0;
179                                        if (bundleActionStatus.containsKey(bAction.getStatus())) {
180                                            counter = bundleActionStatus.get(bAction.getStatus()) + 1;
181                                        }
182                                        else {
183                                            ++counter;
184                                        }
185                                        bundleActionStatus.put(bAction.getStatus(), counter);
186                                        if (bAction.getCoordId() == null
187                                                && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED)) {
188                                            (new BundleKillXCommand(jobId)).call();
189                                            LOG.info("Bundle job ["+ jobId
190                                                            + "] has been killed since one of its coordinator job failed submission.");
191                                        }
192                                    }
193                                    else {
194                                        foundPending = true;
195                                        break;
196                                    }
197                                }
198    
199                                if (foundPending) {
200                                    continue;
201                                }
202    
203                                if (checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus)) {
204                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
205                                            + "' from '" + bundleJob.getStatus() + "'");
206                                    updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
207                                }
208                                else if (checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus)) {
209                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
210                                            + "' from '" + bundleJob.getStatus() + "'");
211                                    updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
212                                }
213                                else if (checkPausedStatus(bundleActionStatus, bundleActions, bundleStatus)) {
214                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
215                                            + "' from '" + bundleJob.getStatus() + "'");
216                                    updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
217                                }
218                                else if (checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus)) {
219                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
220                                            + "' from '" + bundleJob.getStatus() + "'");
221                                    updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
222                                }
223                                else if (checkRunningStatus(bundleActionStatus, bundleActions, bundleStatus)) {
224                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
225                                            + "' from '" + bundleJob.getStatus() + "'");
226                                    updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
227                                }
228                            }
229                            catch (Exception ex) {
230                                LOG.error("Exception happened during aggregate bundle job's status, job = "
231                                        + bundleJob.getId(), ex);
232                            }
233                        }
234                    }
235                }
236            }
237    
238            private void aggregateCoordJobsStatus(List<CoordinatorJobBean> CoordList) throws JPAExecutorException,
239                    CommandException {
240                if (CoordList != null) {
241                    Configuration conf = Services.get().getConf();
242                    boolean backwardSupportForCoordStatus = conf.getBoolean(CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
243    
244                    for (CoordinatorJobBean coordJob : CoordList) {
245                        try {
246                            // if namespace 0.1 is used and backward support is true, then ignore this coord job
247                            if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
248                                    && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
249                                continue;
250                            }
251                            String jobId = coordJob.getId();
252                            Job.Status[] coordStatus = new Job.Status[1];
253                            coordStatus[0] = coordJob.getStatus();
254                            List<CoordinatorActionBean> coordActions = jpaService
255                                    .execute(new CoordJobGetActionsJPAExecutor(jobId));
256                            HashMap<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>();
257                            boolean foundPending = false;
258                            for (CoordinatorActionBean cAction : coordActions) {
259                                if (!cAction.isPending()) {
260                                    int counter = 0;
261                                    if (coordActionStatus.containsKey(cAction.getStatus())) {
262                                        counter = coordActionStatus.get(cAction.getStatus()) + 1;
263                                    }
264                                    else {
265                                        ++counter;
266                                    }
267                                    coordActionStatus.put(cAction.getStatus(), counter);
268                                }
269                                else {
270                                    foundPending = true;
271                                    break;
272                                }
273                            }
274    
275                            if (foundPending) {
276                                continue;
277                            }
278    
279                            if (coordJob.isDoneMaterialization()
280                                    && checkCoordTerminalStatus(coordActionStatus, coordActions, coordStatus)) {
281                                LOG.info("Set coordinator job [" + jobId + "] status to '" + coordStatus[0].toString()
282                                        + "' from '" + coordJob.getStatus() + "'");
283                                updateCoordJob(coordActionStatus, coordActions, coordJob, coordStatus[0]);
284                            }
285                            else if (coordJob.isDoneMaterialization()
286                                    && checkCoordSuspendStatus(coordActionStatus, coordActions, coordStatus)) {
287                                LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
288                                        + "' from '" + coordJob.getStatus() + "'");
289                                updateCoordJob(coordActionStatus, coordActions, coordJob, coordStatus[0]);
290                            }
291                            else if (checkCoordRunningStatus(coordActionStatus, coordActions, coordStatus)) {
292                                LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
293                                        + "' from '" + coordJob.getStatus() + "'");
294                                updateCoordJob(coordActionStatus, coordActions, coordJob, coordStatus[0]);
295                            }
296                            // checking pending flag for job when user killed or suspended the job
297                            else {
298                                checkCoordPending(coordActionStatus, coordActions, coordJob, true);
299                            }
300                        }
301                        catch (Exception ex) {
302                            LOG.error("Exception happened during aggregate coordinator job's status, job = "
303                                    + coordJob.getId(), ex);
304                        }
305                    }
306    
307                }
308            }
309    
310            private boolean checkTerminalStatus(HashMap<Job.Status, Integer> bundleActionStatus,
311                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
312                boolean ret = false;
313                int totalValuesSucceed = 0;
314                if (bundleActionStatus.containsKey(Job.Status.SUCCEEDED)) {
315                    totalValuesSucceed = bundleActionStatus.get(Job.Status.SUCCEEDED);
316                }
317                int totalValuesFailed = 0;
318                if (bundleActionStatus.containsKey(Job.Status.FAILED)) {
319                    totalValuesFailed = bundleActionStatus.get(Job.Status.FAILED);
320                }
321                int totalValuesKilled = 0;
322                if (bundleActionStatus.containsKey(Job.Status.KILLED)) {
323                    totalValuesKilled = bundleActionStatus.get(Job.Status.KILLED);
324                }
325    
326                int totalValuesDoneWithError = 0;
327                if (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)) {
328                    totalValuesDoneWithError = bundleActionStatus.get(Job.Status.DONEWITHERROR);
329                }
330    
331                if (bundleActions.size() == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesDoneWithError)) {
332                    // If all the bundle actions are succeeded then bundle job should be succeeded.
333                    if (bundleActions.size() == totalValuesSucceed) {
334                        bundleStatus[0] = Job.Status.SUCCEEDED;
335                        ret = true;
336                    }
337                    else if (bundleActions.size() == totalValuesKilled) {
338                        // If all the bundle actions are KILLED then bundle job should be KILLED.
339                        bundleStatus[0] = Job.Status.KILLED;
340                        ret = true;
341                    }
342                    else if (bundleActions.size() == totalValuesFailed) {
343                        // If all the bundle actions are FAILED then bundle job should be FAILED.
344                        bundleStatus[0] = Job.Status.FAILED;
345                        ret = true;
346                    }
347                    else {
348                        bundleStatus[0] = Job.Status.DONEWITHERROR;
349                        ret = true;
350                    }
351                }
352                return ret;
353            }
354    
355            private boolean checkCoordTerminalStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
356                    List<CoordinatorActionBean> coordActions, Job.Status[] coordStatus) {
357                boolean ret = false;
358                int totalValuesSucceed = 0;
359                if (coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED)) {
360                    totalValuesSucceed = coordActionStatus.get(CoordinatorAction.Status.SUCCEEDED);
361                }
362                int totalValuesFailed = 0;
363                if (coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)) {
364                    totalValuesFailed = coordActionStatus.get(CoordinatorAction.Status.FAILED);
365                }
366                int totalValuesKilled = 0;
367                if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)) {
368                    totalValuesKilled = coordActionStatus.get(CoordinatorAction.Status.KILLED);
369                }
370    
371                int totalValuesTimeOut = 0;
372                if (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
373                    totalValuesTimeOut = coordActionStatus.get(CoordinatorAction.Status.TIMEDOUT);
374                }
375    
376                if (coordActions.size() == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut)) {
377                    // If all the coordinator actions are succeeded then coordinator job should be succeeded.
378                    if (coordActions.size() == totalValuesSucceed) {
379                        coordStatus[0] = Job.Status.SUCCEEDED;
380                        ret = true;
381                    }
382                    else if (coordActions.size() == totalValuesKilled) {
383                        // If all the coordinator actions are KILLED then coordinator job should be KILLED.
384                        coordStatus[0] = Job.Status.KILLED;
385                        ret = true;
386                    }
387                    else if (coordActions.size() == totalValuesFailed) {
388                        // If all the coordinator actions are FAILED then coordinator job should be FAILED.
389                        coordStatus[0] = Job.Status.FAILED;
390                        ret = true;
391                    }
392                    else {
393                        coordStatus[0] = Job.Status.DONEWITHERROR;
394                        ret = true;
395                    }
396                }
397                return ret;
398            }
399    
400            private boolean checkPrepStatus(HashMap<Job.Status, Integer> bundleActionStatus,
401                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
402                boolean ret = false;
403                if (bundleActionStatus.containsKey(Job.Status.PREP)) {
404                    // If all the bundle actions are PREP then bundle job should be RUNNING.
405                    if (bundleActions.size() > bundleActionStatus.get(Job.Status.PREP)) {
406                        bundleStatus[0] = Job.Status.RUNNING;
407                        ret = true;
408                    }
409                }
410                return ret;
411            }
412    
413            private boolean checkPausedStatus(HashMap<Job.Status, Integer> bundleActionStatus,
414                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
415                boolean ret = false;
416                if (bundleActionStatus.containsKey(Job.Status.PAUSED)) {
417                    if (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED)) {
418                        bundleStatus[0] = Job.Status.PAUSED;
419                        ret = true;
420                    }
421                    else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)
422                            && (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED)
423                                    + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR))) {
424                        // bundleStatus = Job.Status.PAUSEDWITHERROR;
425                        // We need to change this to PAUSEDWITHERROR in future when we add this to coordinator
426                        bundleStatus[0] = Job.Status.PAUSED;
427                        ret = true;
428                    }
429                }
430                return ret;
431            }
432    
433            private boolean checkSuspendStatus(HashMap<Job.Status, Integer> bundleActionStatus,
434                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
435                boolean ret = false;
436                if (bundleActionStatus.containsKey(Job.Status.SUSPENDED)) {
437                    if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED)) {
438                        bundleStatus[0] = Job.Status.SUSPENDED;
439                        ret = true;
440                    }
441                    else if (bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
442                            && (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED)
443                                    + bundleActionStatus.get(Job.Status.SUSPENDEDWITHERROR))) {
444                        // bundleStatus = Job.Status.SUSPENDEDWITHERROR;
445                        // We need to change this to SUSPENDEDWITHERROR in future when we add this to coordinator
446                        bundleStatus[0] = Job.Status.SUSPENDED;
447                        ret = true;
448                    }
449                }
450                return ret;
451            }
452    
453            private boolean checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
454                    List<CoordinatorActionBean> coordActions, Job.Status[] coordStatus) {
455                boolean ret = false;
456                if (coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
457                    if (coordActions.size() == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)) {
458                        coordStatus[0] = Job.Status.SUSPENDED;
459                        ret = true;
460                    }
461                }
462                return ret;
463            }
464    
465            private boolean checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
466                    List<CoordinatorActionBean> coordActions, Job.Status[] coordStatus) {
467                boolean ret = false;
468                if (coordActionStatus.containsKey(CoordinatorAction.Status.RUNNING)) {
469                    // If all the bundle actions are succeeded then bundle job should be succeeded.
470                    if (coordActions.size() == coordActionStatus.get(CoordinatorAction.Status.RUNNING)) {
471                        coordStatus[0] = Job.Status.RUNNING;
472                        ret = true;
473                    }
474                    else if (coordActionStatus.get(CoordinatorAction.Status.RUNNING) > 0) {
475                        if ((coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) && coordActionStatus.get(CoordinatorAction.Status.FAILED) > 0)
476                                || (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) && coordActionStatus
477                                        .get(CoordinatorAction.Status.KILLED) > 0)
478                                || (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT) && coordActionStatus
479                                        .get(CoordinatorAction.Status.TIMEDOUT) > 0)) {
480                            // coordStatus = Job.Status.RUNNINGWITHERROR;
481                            // We need to change this to RUNNINGWIHERROR in future when we add this to coordinator
482                            coordStatus[0] = Job.Status.RUNNING;
483                            ret = true;
484                        }
485                    }
486                }
487                return ret;
488            }
489    
490            private boolean checkRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus,
491                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
492                boolean ret = false;
493                if (bundleActionStatus.containsKey(Job.Status.RUNNING)) {
494                    // If all the bundle actions are succeeded then bundle job should be succeeded.
495                    if (bundleActions.size() == bundleActionStatus.get(Job.Status.RUNNING)) {
496                        bundleStatus[0] = Job.Status.RUNNING;
497                        ret = true;
498                    }
499                    else if (bundleActionStatus.get(Job.Status.RUNNING) > 0) {
500                        if ((bundleActionStatus.containsKey(Job.Status.FAILED) && bundleActionStatus.get(Job.Status.FAILED) > 0)
501                                || (bundleActionStatus.containsKey(Job.Status.KILLED) && bundleActionStatus
502                                        .get(Job.Status.KILLED) > 0)
503                                || (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) && bundleActionStatus
504                                        .get(Job.Status.DONEWITHERROR) > 0)
505                                || (bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR) && bundleActionStatus
506                                        .get(Job.Status.RUNNINGWITHERROR) > 0)) {
507                            // bundleStatus = Job.Status.RUNNINGWITHERROR;
508                            // We need to change this to RUNNINGWIHERROR in future when we add this to coordinator
509                            bundleStatus[0] = Job.Status.RUNNING;
510                            ret = true;
511                        }
512                    }
513                }
514                return ret;
515            }
516    
517            private void updateBundleJob(HashMap<Job.Status, Integer> bundleActionStatus,
518                    List<BundleActionBean> bundleActions, BundleJobBean bundleJob, Job.Status bundleStatus)
519                    throws JPAExecutorException {
520                String jobId = bundleJob.getId();
521                boolean pendingBundleJob = bundleJob.isPending();
522                // Checking the bundle pending should be updated or not
523                int totalNonPendingActions = 0;
524                for (Job.Status js : bundleActionStatus.keySet()) {
525                    totalNonPendingActions += bundleActionStatus.get(js);
526                }
527    
528                if (totalNonPendingActions == bundleActions.size()) {
529                    pendingBundleJob = false;
530                }
531    
532                // Update the Bundle Job
533                bundleJob.setStatus(bundleStatus);
534                if (pendingBundleJob) {
535                    bundleJob.setPending();
536                    LOG.info("Bundle job [" + jobId + "] Pending set to TRUE");
537                }
538                else {
539                    bundleJob.resetPending();
540                    LOG.info("Bundle job [" + jobId + "] Pending set to FALSE");
541                }
542                jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
543            }
544    
545            private void updateCoordJob(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
546                    List<CoordinatorActionBean> coordActions, CoordinatorJobBean coordJob, Job.Status coordStatus)
547                    throws JPAExecutorException, CommandException {
548                Job.Status prevStatus = coordJob.getStatus();
549                // Update the Coord Job
550                if (coordJob.getStatus() == Job.Status.SUCCEEDED || coordJob.getStatus() == Job.Status.FAILED
551                        || coordJob.getStatus() == Job.Status.KILLED || coordJob.getStatus() == Job.Status.DONEWITHERROR) {
552                    if (coordStatus == Job.Status.SUSPENDED) {
553                        LOG.info("Coord Job [" + coordJob.getId()
554                                + "] status can not be updated as its already in Terminal state");
555                        return;
556                    }
557                }
558    
559                checkCoordPending(coordActionStatus, coordActions, coordJob, false);
560                coordJob.setStatus(coordStatus);
561                coordJob.setStatus(StatusUtils.getStatus(coordJob));
562                coordJob.setLastModifiedTime(new Date());
563                jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
564                // update bundle action only when status changes in coord job
565                if (coordJob.getBundleId() != null) {
566                    if (!prevStatus.equals(coordJob.getStatus())) {
567                        BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
568                        bundleStatusUpdate.call();
569                    }
570                }
571            }
572    
573            private void checkCoordPending(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
574                    List<CoordinatorActionBean> coordActions, CoordinatorJobBean coordJob, boolean saveToDB) throws JPAExecutorException {
575                boolean pendingCoordJob = coordJob.isPending();
576                // Checking the coordinator pending should be updated or not
577                int totalNonPendingActions = 0;
578                for (CoordinatorAction.Status js : coordActionStatus.keySet()) {
579                    totalNonPendingActions += coordActionStatus.get(js);
580                }
581    
582                if (totalNonPendingActions == coordActions.size()) {
583                    pendingCoordJob = false;
584                }
585    
586                if (pendingCoordJob) {
587                    coordJob.setPending();
588                    LOG.info("Coord job [" + coordJob.getId() + "] Pending set to TRUE");
589                }
590                else {
591                    coordJob.resetPending();
592                    LOG.info("Coord job [" + coordJob.getId() + "] Pending set to FALSE");
593                }
594    
595                if (saveToDB) {
596                    jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
597                }
598            }
599    
600            /**
601             * Aggregate coordinator actions' status to coordinator jobs
602             *
603             * @throws JPAExecutorException thrown if failed in db updates or retrievals
604             * @throws CommandException thrown if failed to run commands
605             */
606            private void coordTransit() throws JPAExecutorException, CommandException {
607                List<CoordinatorJobBean> pendingJobCheckList = null;
608                if (lastInstanceStartTime == null) {
609                    LOG.info("Running coordinator status service first instance");
610                    // this is the first instance, we need to check for all pending jobs;
611                    pendingJobCheckList = jpaService.execute(new CoordJobsGetPendingJPAExecutor(limit));
612                }
613                else {
614                    LOG.info("Running coordinator status service from last instance time =  "
615                            + DateUtils.convertDateToString(lastInstanceStartTime));
616                    // this is not the first instance, we should only check jobs that have actions been
617                    // updated >= start time of last service run;
618                    List<CoordinatorActionBean> actionList = jpaService
619                            .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
620                    Set<String> coordIds = new HashSet<String>();
621                    for (CoordinatorActionBean action : actionList) {
622                        coordIds.add(action.getJobId());
623                    }
624                    pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
625                    for (String coordId : coordIds.toArray(new String[coordIds.size()])) {
626                        CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordId));
627                        // Running coord job might have pending false
628                        if (coordJob.isPending() || coordJob.getStatus().equals(Job.Status.RUNNING)) {
629                            pendingJobCheckList.add(coordJob);
630                        }
631                    }
632                }
633                aggregateCoordJobsStatus(pendingJobCheckList);
634            }
635        }
636    
637        /**
638         * Initializes the {@link StatusTransitService}.
639         *
640         * @param services services instance.
641         */
642        @Override
643        public void init(Services services) {
644            Configuration conf = services.getConf();
645            Runnable stateTransitRunnable = new StatusTransitRunnable();
646            services.get(SchedulerService.class).schedule(stateTransitRunnable, 10,
647                    conf.getInt(CONF_STATUSTRANSIT_INTERVAL, 60), SchedulerService.Unit.SEC);
648        }
649    
650        /**
651         * Destroy the StateTransit Jobs Service.
652         */
653        @Override
654        public void destroy() {
655        }
656    
657        /**
658         * Return the public interface for the purge jobs service.
659         *
660         * @return {@link StatusTransitService}.
661         */
662        @Override
663        public Class<? extends Service> getInterface() {
664            return StatusTransitService.class;
665        }
666    }