001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.util.ArrayList;
021    import java.util.Date;
022    import java.util.HashMap;
023    import java.util.HashSet;
024    import java.util.List;
025    import java.util.Set;
026    
027    import org.apache.hadoop.conf.Configuration;
028    import org.apache.oozie.BundleActionBean;
029    import org.apache.oozie.BundleJobBean;
030    import org.apache.oozie.CoordinatorActionBean;
031    import org.apache.oozie.CoordinatorJobBean;
032    import org.apache.oozie.ErrorCode;
033    import org.apache.oozie.client.CoordinatorAction;
034    import org.apache.oozie.client.Job;
035    import org.apache.oozie.command.CommandException;
036    import org.apache.oozie.command.bundle.BundleKillXCommand;
037    import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
038    import org.apache.oozie.executor.jpa.BundleActionsGetByLastModifiedTimeJPAExecutor;
039    import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
040    import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
041    import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
042    import org.apache.oozie.executor.jpa.BundleJobsGetPendingJPAExecutor;
043    import org.apache.oozie.executor.jpa.BundleJobsGetRunningJPAExecutor;
044    import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor;
045    import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusByPendingFalseJPAExecutor;
046    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
047    import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
048    import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
049    import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
050    import org.apache.oozie.executor.jpa.JPAExecutorException;
051    import org.apache.oozie.util.DateUtils;
052    import org.apache.oozie.util.MemoryLocks;
053    import org.apache.oozie.util.StatusUtils;
054    import org.apache.oozie.util.XLog;
055    
/**
 * StatusTransitService is scheduled to run at the configured interval.
 * <p/>
 * It updates each job's status according to the status of its child actions. If no child action is pending (all
 * pending flags equal 0, i.e. the job is done), the job's pending flag is reset to 0. If all child actions have
 * succeeded, the job's status is set to SUCCEEDED.
 */
063    public class StatusTransitService implements Service {
064        public static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService.";
065        public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval";
066        public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + "backward.support.for.coord.status";
067        private static int limit = -1;
068        private static Date lastInstanceStartTime = null;
069        private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class);
070    
    /**
     * StatusTransitRunnable is the runnable which is scheduled to run at the configured interval.
     * <p/>
     * It updates each job's status according to the status of its child actions. If no child action is pending
     * (all pending flags equal 0, i.e. the job is done), the job's pending flag is reset to 0. If all child actions
     * have succeeded, the job's status is set to SUCCEEDED.
     */
078        static class StatusTransitRunnable implements Runnable {
079            private JPAService jpaService = null;
080            private MemoryLocks.LockToken lock;
081    
082            public StatusTransitRunnable() {
083                jpaService = Services.get().get(JPAService.class);
084                if (jpaService == null) {
085                    LOG.error("Missing JPAService");
086                }
087            }
088    
089            @Override
090            public void run() {
091                try {
092                    Date curDate = new Date(); // records the start time of this service run;
093    
094                    // first check if there is some other instance running;
095                    lock = Services.get().get(MemoryLocksService.class).getWriteLock(StatusTransitService.class.getName(),
096                            lockTimeout);
097                    if (lock == null) {
098                        LOG.info("This StatusTransitService instance"
099                                + " will not run since there is already an instance running");
100                    }
101                    else {
102                        LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName());
103                        // running coord jobs transit service
104                        coordTransit();
105                        // running bundle jobs transit service
106                        bundleTransit();
107    
108                        lastInstanceStartTime = curDate;
109                    }
110                }
111                catch (Exception ex) {
112                    LOG.warn("Exception happened during StatusTransitRunnable ", ex);
113                }
114                finally {
115                    // release lock;
116                    if (lock != null) {
117                        lock.release();
118                        LOG.info("Released lock for [{0}]", StatusTransitService.class.getName());
119                    }
120                }
121            }
122    
123            /**
124             * Aggregate bundle actions' status to bundle jobs
125             *
126             * @throws JPAExecutorException thrown if failed in db updates or retrievals
127             * @throws CommandException thrown if failed to run commands
128             */
129            private void bundleTransit() throws JPAExecutorException, CommandException {
130                List<BundleJobBean> pendingJobCheckList = null;
131                List<BundleJobBean> runningJobCheckList = null;
132                List<List<BundleJobBean>> bundleLists = new ArrayList<List<BundleJobBean>>();
133                if (lastInstanceStartTime == null) {
134                    LOG.info("Running bundle status service first instance");
135                    // this is the first instance, we need to check for all pending jobs;
136                    pendingJobCheckList = jpaService.execute(new BundleJobsGetPendingJPAExecutor(limit));
137                    runningJobCheckList = jpaService.execute(new BundleJobsGetRunningJPAExecutor(limit));
138                    bundleLists.add(pendingJobCheckList);
139                    bundleLists.add(runningJobCheckList);
140                }
141                else {
142                    LOG.info("Running bundle status service from last instance time =  "
143                            + DateUtils.convertDateToString(lastInstanceStartTime));
144                    // this is not the first instance, we should only check jobs that have actions been
145                    // updated >= start time of last service run;
146                    List<BundleActionBean> actionList = jpaService
147                            .execute(new BundleActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
148                    Set<String> bundleIds = new HashSet<String>();
149                    for (BundleActionBean action : actionList) {
150                        bundleIds.add(action.getBundleId());
151                    }
152                    pendingJobCheckList = new ArrayList<BundleJobBean>();
153                    for (String bundleId : bundleIds.toArray(new String[bundleIds.size()])) {
154                        BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId));
155                        // Running bundle job might have pending false
156                        if (bundle.isPending() || bundle.getStatus().equals(Job.Status.RUNNING)) {
157                            pendingJobCheckList.add(bundle);
158                        }
159                    }
160                    runningJobCheckList = pendingJobCheckList;
161                    bundleLists.add(pendingJobCheckList);
162                }
163                aggregateBundleJobsStatus(bundleLists);
164            }
165    
166            private void aggregateBundleJobsStatus(List<List<BundleJobBean>> bundleLists) throws JPAExecutorException,
167                    CommandException {
168                if (bundleLists != null) {
169                    for (List<BundleJobBean> listBundleBean : bundleLists) {
170                        for (BundleJobBean bundleJob : listBundleBean) {
171                            try {
172                                String jobId = bundleJob.getId();
173                                Job.Status[] bundleStatus = new Job.Status[1];
174                                bundleStatus[0] = bundleJob.getStatus();
175                                List<BundleActionBean> bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(
176                                        jobId));
177                                HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>();
178                                boolean foundPending = false;
179                                for (BundleActionBean bAction : bundleActions) {
180                                    if (!bAction.isPending()) {
181                                        int counter = 0;
182                                        if (bundleActionStatus.containsKey(bAction.getStatus())) {
183                                            counter = bundleActionStatus.get(bAction.getStatus()) + 1;
184                                        }
185                                        else {
186                                            ++counter;
187                                        }
188                                        bundleActionStatus.put(bAction.getStatus(), counter);
189                                        if (bAction.getCoordId() == null
190                                                && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED)) {
191                                            (new BundleKillXCommand(jobId)).call();
192                                            LOG.info("Bundle job ["+ jobId
193                                                            + "] has been killed since one of its coordinator job failed submission.");
194                                        }
195                                    }
196                                    else {
197                                        foundPending = true;
198                                        break;
199                                    }
200                                }
201    
202                                if (foundPending) {
203                                    continue;
204                                }
205    
206                                if (checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus)) {
207                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
208                                            + "' from '" + bundleJob.getStatus() + "'");
209                                    updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
210                                }
211                                else if (checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus)) {
212                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
213                                            + "' from '" + bundleJob.getStatus() + "'");
214                                    updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
215                                }
216                                else if (checkPausedStatus(bundleActionStatus, bundleActions, bundleStatus)) {
217                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
218                                            + "' from '" + bundleJob.getStatus() + "'");
219                                    updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
220                                }
221                                else if (checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus)) {
222                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
223                                            + "' from '" + bundleJob.getStatus() + "'");
224                                    updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
225                                }
226                                else if (checkRunningStatus(bundleActionStatus, bundleActions, bundleStatus)) {
227                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
228                                            + "' from '" + bundleJob.getStatus() + "'");
229                                    updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
230                                }
231                            }
232                            catch (Exception ex) {
233                                LOG.error("Exception happened during aggregate bundle job's status, job = "
234                                        + bundleJob.getId(), ex);
235                            }
236                        }
237                    }
238                }
239            }
240    
241            private void aggregateCoordJobsStatus(List<CoordinatorJobBean> CoordList) throws JPAExecutorException,
242                    CommandException {
243                if (CoordList != null) {
244                    Configuration conf = Services.get().getConf();
245                    boolean backwardSupportForCoordStatus = conf.getBoolean(CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
246    
247                    for (CoordinatorJobBean coordJob : CoordList) {
248                        try {
249                            // if namespace 0.1 is used and backward support is true, then ignore this coord job
250                            if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
251                                    && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
252                                continue;
253                            }
254                            String jobId = coordJob.getId();
255                            Job.Status[] coordStatus = new Job.Status[1];
256                            coordStatus[0] = coordJob.getStatus();
257                            //Get count of Coordinator actions with pending true
258                            int count = jpaService.execute(new CoordJobGetPendingActionsCountJPAExecutor(jobId));
259                            if (count > 0) {
260                                continue;
261                            }
262                            // Code below this is executed only when none of the Coordinator actions are pending
263                            // Get status of Coordinator actions with pending false
264                            List<CoordinatorAction.Status> coordActionStatusList = jpaService
265                                    .execute(new CoordJobGetActionsStatusByPendingFalseJPAExecutor(jobId));
266                            HashMap<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>();
267    
268                            for (CoordinatorAction.Status status : coordActionStatusList) {
269                                int counter = 0;
270                                if (coordActionStatus.containsKey(status)) {
271                                    counter = coordActionStatus.get(status) + 1;
272                                }
273                                else {
274                                    ++counter;
275                                }
276                                coordActionStatus.put(status, counter);
277                            }
278    
279                            int nonPendingCoordActionsCount = coordActionStatusList.size();
280                            if (coordJob.isDoneMaterialization()
281                                    && checkCoordTerminalStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
282                                LOG.info("Set coordinator job [" + jobId + "] status to '" + coordStatus[0].toString()
283                                        + "' from '" + coordJob.getStatus() + "'");
284                                updateCoordJob(coordActionStatus, nonPendingCoordActionsCount, coordJob, coordStatus[0]);
285                            }
286                            else if (coordJob.isDoneMaterialization()
287                                    && checkCoordSuspendStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
288                                LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
289                                        + "' from '" + coordJob.getStatus() + "'");
290                                updateCoordJob(coordActionStatus, nonPendingCoordActionsCount, coordJob, coordStatus[0]);
291                            }
292                            else if (checkCoordRunningStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
293                                LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
294                                        + "' from '" + coordJob.getStatus() + "'");
295                                updateCoordJob(coordActionStatus, nonPendingCoordActionsCount, coordJob, coordStatus[0]);
296                            }
297                            // checking pending flag for job when user killed or suspended the job
298                            else {
299                                checkCoordPending(coordActionStatus, nonPendingCoordActionsCount, coordJob, true);
300                            }
301                        }
302                        catch (Exception ex) {
303                            LOG.error("Exception happened during aggregate coordinator job's status, job = "
304                                    + coordJob.getId(), ex);
305                        }
306                    }
307    
308                }
309            }
310    
311            private boolean checkTerminalStatus(HashMap<Job.Status, Integer> bundleActionStatus,
312                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
313                boolean ret = false;
314                int totalValuesSucceed = 0;
315                if (bundleActionStatus.containsKey(Job.Status.SUCCEEDED)) {
316                    totalValuesSucceed = bundleActionStatus.get(Job.Status.SUCCEEDED);
317                }
318                int totalValuesFailed = 0;
319                if (bundleActionStatus.containsKey(Job.Status.FAILED)) {
320                    totalValuesFailed = bundleActionStatus.get(Job.Status.FAILED);
321                }
322                int totalValuesKilled = 0;
323                if (bundleActionStatus.containsKey(Job.Status.KILLED)) {
324                    totalValuesKilled = bundleActionStatus.get(Job.Status.KILLED);
325                }
326    
327                int totalValuesDoneWithError = 0;
328                if (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)) {
329                    totalValuesDoneWithError = bundleActionStatus.get(Job.Status.DONEWITHERROR);
330                }
331    
332                if (bundleActions.size() == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesDoneWithError)) {
333                    // If all the bundle actions are succeeded then bundle job should be succeeded.
334                    if (bundleActions.size() == totalValuesSucceed) {
335                        bundleStatus[0] = Job.Status.SUCCEEDED;
336                        ret = true;
337                    }
338                    else if (bundleActions.size() == totalValuesKilled) {
339                        // If all the bundle actions are KILLED then bundle job should be KILLED.
340                        bundleStatus[0] = Job.Status.KILLED;
341                        ret = true;
342                    }
343                    else if (bundleActions.size() == totalValuesFailed) {
344                        // If all the bundle actions are FAILED then bundle job should be FAILED.
345                        bundleStatus[0] = Job.Status.FAILED;
346                        ret = true;
347                    }
348                    else {
349                        bundleStatus[0] = Job.Status.DONEWITHERROR;
350                        ret = true;
351                    }
352                }
353                return ret;
354            }
355    
356            private boolean checkCoordTerminalStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
357                    int coordActionsCount, Job.Status[] coordStatus) {
358                boolean ret = false;
359                int totalValuesSucceed = 0;
360                if (coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED)) {
361                    totalValuesSucceed = coordActionStatus.get(CoordinatorAction.Status.SUCCEEDED);
362                }
363                int totalValuesFailed = 0;
364                if (coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)) {
365                    totalValuesFailed = coordActionStatus.get(CoordinatorAction.Status.FAILED);
366                }
367                int totalValuesKilled = 0;
368                if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)) {
369                    totalValuesKilled = coordActionStatus.get(CoordinatorAction.Status.KILLED);
370                }
371    
372                int totalValuesTimeOut = 0;
373                if (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
374                    totalValuesTimeOut = coordActionStatus.get(CoordinatorAction.Status.TIMEDOUT);
375                }
376    
377                if (coordActionsCount == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut)) {
378                    // If all the coordinator actions are succeeded then coordinator job should be succeeded.
379                    if (coordActionsCount == totalValuesSucceed) {
380                        coordStatus[0] = Job.Status.SUCCEEDED;
381                        ret = true;
382                    }
383                    else if (coordActionsCount == totalValuesKilled) {
384                        // If all the coordinator actions are KILLED then coordinator job should be KILLED.
385                        coordStatus[0] = Job.Status.KILLED;
386                        ret = true;
387                    }
388                    else if (coordActionsCount == totalValuesFailed) {
389                        // If all the coordinator actions are FAILED then coordinator job should be FAILED.
390                        coordStatus[0] = Job.Status.FAILED;
391                        ret = true;
392                    }
393                    else {
394                        coordStatus[0] = Job.Status.DONEWITHERROR;
395                        ret = true;
396                    }
397                }
398                return ret;
399            }
400    
401            private boolean checkPrepStatus(HashMap<Job.Status, Integer> bundleActionStatus,
402                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
403                boolean ret = false;
404                if (bundleActionStatus.containsKey(Job.Status.PREP)) {
405                    // If all the bundle actions are PREP then bundle job should be RUNNING.
406                    if (bundleActions.size() > bundleActionStatus.get(Job.Status.PREP)) {
407                        bundleStatus[0] = Job.Status.RUNNING;
408                        ret = true;
409                    }
410                }
411                return ret;
412            }
413    
414            private boolean checkPausedStatus(HashMap<Job.Status, Integer> bundleActionStatus,
415                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
416                boolean ret = false;
417                if (bundleActionStatus.containsKey(Job.Status.PAUSED)) {
418                    if (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED)) {
419                        bundleStatus[0] = Job.Status.PAUSED;
420                        ret = true;
421                    }
422                    else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)
423                            && (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED)
424                                    + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR))) {
425                        // bundleStatus = Job.Status.PAUSEDWITHERROR;
426                        // We need to change this to PAUSEDWITHERROR in future when we add this to coordinator
427                        bundleStatus[0] = Job.Status.PAUSED;
428                        ret = true;
429                    }
430                }
431                return ret;
432            }
433    
434            private boolean checkSuspendStatus(HashMap<Job.Status, Integer> bundleActionStatus,
435                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
436                boolean ret = false;
437                if (bundleActionStatus.containsKey(Job.Status.SUSPENDED)) {
438                    if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED)) {
439                        bundleStatus[0] = Job.Status.SUSPENDED;
440                        ret = true;
441                    }
442                    else if (bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
443                            && (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED)
444                                    + bundleActionStatus.get(Job.Status.SUSPENDEDWITHERROR))) {
445                        // bundleStatus = Job.Status.SUSPENDEDWITHERROR;
446                        // We need to change this to SUSPENDEDWITHERROR in future when we add this to coordinator
447                        bundleStatus[0] = Job.Status.SUSPENDED;
448                        ret = true;
449                    }
450                }
451                return ret;
452            }
453    
454            private boolean checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
455                    int coordActionsCount, Job.Status[] coordStatus) {
456                boolean ret = false;
457                if (coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
458                    if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)) {
459                        coordStatus[0] = Job.Status.SUSPENDED;
460                        ret = true;
461                    }
462                }
463                return ret;
464            }
465    
466            private boolean checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
467                    int coordActionsCount, Job.Status[] coordStatus) {
468                boolean ret = false;
469                if (coordActionStatus.containsKey(CoordinatorAction.Status.RUNNING)) {
470                    // If all the bundle actions are succeeded then bundle job should be succeeded.
471                    if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.RUNNING)) {
472                        coordStatus[0] = Job.Status.RUNNING;
473                        ret = true;
474                    }
475                    else if (coordActionStatus.get(CoordinatorAction.Status.RUNNING) > 0) {
476                        if ((coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) && coordActionStatus.get(CoordinatorAction.Status.FAILED) > 0)
477                                || (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) && coordActionStatus
478                                        .get(CoordinatorAction.Status.KILLED) > 0)
479                                || (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT) && coordActionStatus
480                                        .get(CoordinatorAction.Status.TIMEDOUT) > 0)) {
481                            // coordStatus = Job.Status.RUNNINGWITHERROR;
482                            // We need to change this to RUNNINGWIHERROR in future when we add this to coordinator
483                            coordStatus[0] = Job.Status.RUNNING;
484                            ret = true;
485                        }
486                    }
487                }
488                return ret;
489            }
490    
491            private boolean checkRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus,
492                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
493                boolean ret = false;
494                if (bundleActionStatus.containsKey(Job.Status.RUNNING)) {
495                    // If all the bundle actions are succeeded then bundle job should be succeeded.
496                    if (bundleActions.size() == bundleActionStatus.get(Job.Status.RUNNING)) {
497                        bundleStatus[0] = Job.Status.RUNNING;
498                        ret = true;
499                    }
500                    else if (bundleActionStatus.get(Job.Status.RUNNING) > 0) {
501                        if ((bundleActionStatus.containsKey(Job.Status.FAILED) && bundleActionStatus.get(Job.Status.FAILED) > 0)
502                                || (bundleActionStatus.containsKey(Job.Status.KILLED) && bundleActionStatus
503                                        .get(Job.Status.KILLED) > 0)
504                                || (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) && bundleActionStatus
505                                        .get(Job.Status.DONEWITHERROR) > 0)
506                                || (bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR) && bundleActionStatus
507                                        .get(Job.Status.RUNNINGWITHERROR) > 0)) {
508                            // bundleStatus = Job.Status.RUNNINGWITHERROR;
509                            // We need to change this to RUNNINGWIHERROR in future when we add this to coordinator
510                            bundleStatus[0] = Job.Status.RUNNING;
511                            ret = true;
512                        }
513                    }
514                }
515                return ret;
516            }
517    
518            private void updateBundleJob(HashMap<Job.Status, Integer> bundleActionStatus,
519                    List<BundleActionBean> bundleActions, BundleJobBean bundleJob, Job.Status bundleStatus)
520                    throws JPAExecutorException {
521                String jobId = bundleJob.getId();
522                boolean pendingBundleJob = bundleJob.isPending();
523                // Checking the bundle pending should be updated or not
524                int totalNonPendingActions = 0;
525                for (Job.Status js : bundleActionStatus.keySet()) {
526                    totalNonPendingActions += bundleActionStatus.get(js);
527                }
528    
529                if (totalNonPendingActions == bundleActions.size()) {
530                    pendingBundleJob = false;
531                }
532    
533                // Update the Bundle Job
534                bundleJob.setStatus(bundleStatus);
535                if (pendingBundleJob) {
536                    bundleJob.setPending();
537                    LOG.info("Bundle job [" + jobId + "] Pending set to TRUE");
538                }
539                else {
540                    bundleJob.resetPending();
541                    LOG.info("Bundle job [" + jobId + "] Pending set to FALSE");
542                }
543                jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
544            }
545    
546            private void updateCoordJob(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
547                    int coordActionsCount, CoordinatorJobBean coordJob, Job.Status coordStatus)
548                    throws JPAExecutorException, CommandException {
549                Job.Status prevStatus = coordJob.getStatus();
550                // Update the Coord Job
551                if (coordJob.getStatus() == Job.Status.SUCCEEDED || coordJob.getStatus() == Job.Status.FAILED
552                        || coordJob.getStatus() == Job.Status.KILLED || coordJob.getStatus() == Job.Status.DONEWITHERROR) {
553                    if (coordStatus == Job.Status.SUSPENDED) {
554                        LOG.info("Coord Job [" + coordJob.getId()
555                                + "] status can not be updated as its already in Terminal state");
556                        return;
557                    }
558                }
559    
560                checkCoordPending(coordActionStatus, coordActionsCount, coordJob, false);
561                coordJob.setStatus(coordStatus);
562                coordJob.setStatus(StatusUtils.getStatus(coordJob));
563                coordJob.setLastModifiedTime(new Date());
564                jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
565                // update bundle action only when status changes in coord job
566                if (coordJob.getBundleId() != null) {
567                    if (!prevStatus.equals(coordJob.getStatus())) {
568                        BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
569                        bundleStatusUpdate.call();
570                    }
571                }
572            }
573    
574            private void checkCoordPending(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
575                    int coordActionsCount, CoordinatorJobBean coordJob, boolean saveToDB) throws JPAExecutorException {
576                boolean pendingCoordJob = coordJob.isPending();
577                // Checking the coordinator pending should be updated or not
578                int totalNonPendingActions = 0;
579                for (CoordinatorAction.Status js : coordActionStatus.keySet()) {
580                    totalNonPendingActions += coordActionStatus.get(js);
581                }
582    
583                if (totalNonPendingActions == coordActionsCount) {
584                    pendingCoordJob = false;
585                }
586    
587                if (pendingCoordJob) {
588                    coordJob.setPending();
589                    LOG.info("Coord job [" + coordJob.getId() + "] Pending set to TRUE");
590                }
591                else {
592                    coordJob.resetPending();
593                    LOG.info("Coord job [" + coordJob.getId() + "] Pending set to FALSE");
594                }
595    
596                if (saveToDB) {
597                    jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
598                }
599            }
600    
601            /**
602             * Aggregate coordinator actions' status to coordinator jobs
603             *
604             * @throws JPAExecutorException thrown if failed in db updates or retrievals
605             * @throws CommandException thrown if failed to run commands
606             */
607            private void coordTransit() throws JPAExecutorException, CommandException {
608                List<CoordinatorJobBean> pendingJobCheckList = null;
609                if (lastInstanceStartTime == null) {
610                    LOG.info("Running coordinator status service first instance");
611                    // this is the first instance, we need to check for all pending jobs;
612                    pendingJobCheckList = jpaService.execute(new CoordJobsGetPendingJPAExecutor(limit));
613                }
614                else {
615                    LOG.info("Running coordinator status service from last instance time =  "
616                            + DateUtils.convertDateToString(lastInstanceStartTime));
617                    // this is not the first instance, we should only check jobs
618                    // that have actions been
619                    // updated >= start time of last service run;
620                    List<String> coordJobIdList = jpaService
621                            .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
622                    Set<String> coordIds = new HashSet<String>();
623                    for (String coordJobId : coordJobIdList) {
624                        coordIds.add(coordJobId);
625                    }
626                    pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
627                    for (String coordId : coordIds.toArray(new String[coordIds.size()])) {
628                        CoordinatorJobBean coordJob;
629                        try{
630                            coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordId));
631                        }
632                        catch (JPAExecutorException jpaee) {
633                            if (jpaee.getErrorCode().equals(ErrorCode.E0604)) {
634                                LOG.warn("Exception happened during StatusTransitRunnable; Coordinator Job doesn't exist", jpaee);
635                                continue;
636                            } else {
637                                throw jpaee;
638                            }
639                        }
640                        // Running coord job might have pending false
641                        if (coordJob.isPending() || coordJob.getStatus().equals(Job.Status.RUNNING)) {
642                            pendingJobCheckList.add(coordJob);
643                        }
644                    }
645                }
646                aggregateCoordJobsStatus(pendingJobCheckList);
647            }
648        }
649    
650        /**
651         * Initializes the {@link StatusTransitService}.
652         *
653         * @param services services instance.
654         */
655        @Override
656        public void init(Services services) {
657            Configuration conf = services.getConf();
658            Runnable stateTransitRunnable = new StatusTransitRunnable();
659            services.get(SchedulerService.class).schedule(stateTransitRunnable, 10,
660                    conf.getInt(CONF_STATUSTRANSIT_INTERVAL, 60), SchedulerService.Unit.SEC);
661        }
662    
663        /**
664         * Destroy the StateTransit Jobs Service.
665         */
666        @Override
667        public void destroy() {
668        }
669    
670        /**
671         * Return the public interface for the purge jobs service.
672         *
673         * @return {@link StatusTransitService}.
674         */
675        @Override
676        public Class<? extends Service> getInterface() {
677            return StatusTransitService.class;
678        }
679    }
680