This project has retired. For details please refer to its Attic page.
001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.util.ArrayList;
021    import java.util.Date;
022    import java.util.HashMap;
023    import java.util.HashSet;
024    import java.util.List;
025    import java.util.Set;
026    import java.util.TreeSet;
027    import java.util.Comparator;
028    
029    import org.apache.hadoop.conf.Configuration;
030    import org.apache.oozie.BundleActionBean;
031    import org.apache.oozie.BundleJobBean;
032    import org.apache.oozie.CoordinatorJobBean;
033    import org.apache.oozie.ErrorCode;
034    import org.apache.oozie.client.CoordinatorAction;
035    import org.apache.oozie.client.Job;
036    import org.apache.oozie.command.CommandException;
037    import org.apache.oozie.command.bundle.BundleKillXCommand;
038    import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
039    import org.apache.oozie.executor.jpa.BundleActionsGetByLastModifiedTimeJPAExecutor;
040    import org.apache.oozie.executor.jpa.BundleActionsGetStatusPendingJPAExecutor;
041    import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
042    import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
043    import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor;
044    import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor;
045    import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor;
046    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
047    import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
048    import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
049    import org.apache.oozie.executor.jpa.CoordJobsGetChangedJPAExecutor;
050    import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
051    import org.apache.oozie.executor.jpa.JPAExecutorException;
052    import org.apache.oozie.util.DateUtils;
053    import org.apache.oozie.util.MemoryLocks;
054    import org.apache.oozie.util.StatusUtils;
055    import org.apache.oozie.util.XLog;
056    
057    /**
058     * StatusTransitService is scheduled to run at the configured interval.
059     * <p/>
060     * It updates each job's status according to the status of its child actions. If the pending flags of all child
061     * actions equal 0 (job done), the job's pending flag is reset to 0. If all child actions have succeeded, the job's
062     * status is set to SUCCEEDED.
063     */
064    public class StatusTransitService implements Service {
065        public static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService.";
066        public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval";
067        public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + "backward.support.for.coord.status";
068        public static final String CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR = CONF_PREFIX + "backward.support.for.states.without.error";
069        private static int limit = -1;
070        private static Date lastInstanceStartTime = null;
071        private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class);
072    
073        /**
074         * StatusTransitRunnable is the runnable which is scheduled to run at the configured interval.
075         * <p/>
076         * It updates each job's status according to the status of its child actions. If the pending flags of all
077         * child actions equal 0 (job done), the job's pending flag is reset to 0. If all child actions have succeeded,
078         * the job's status is set to SUCCEEDED.
079         */
080        public static class StatusTransitRunnable implements Runnable {
081            private JPAService jpaService = null;
082            private MemoryLocks.LockToken lock;
083    
084            public StatusTransitRunnable() {
085                jpaService = Services.get().get(JPAService.class);
086                if (jpaService == null) {
087                    LOG.error("Missing JPAService");
088                }
089            }
090    
091            @Override
092            public void run() {
093                try {
094                    Date curDate = new Date(); // records the start time of this service run;
095    
096                    // first check if there is some other instance running;
097                    lock = Services.get().get(MemoryLocksService.class).getWriteLock(StatusTransitService.class.getName(),
098                            lockTimeout);
099                    if (lock == null) {
100                        LOG.info("This StatusTransitService instance"
101                                + " will not run since there is already an instance running");
102                    }
103                    else {
104                        LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName());
105                        // running coord jobs transit service
106                        coordTransit();
107                        // running bundle jobs transit service
108                        bundleTransit();
109    
110                        lastInstanceStartTime = curDate;
111                    }
112                }
113                catch (Exception ex) {
114                    LOG.warn("Exception happened during StatusTransitRunnable ", ex);
115                }
116                finally {
117                    // release lock;
118                    if (lock != null) {
119                        lock.release();
120                        LOG.info("Released lock for [{0}]", StatusTransitService.class.getName());
121                    }
122                }
123            }
124    
125            public List<BundleJobBean> removeDuplicates(List<BundleJobBean> pendingJobList) {
126                Set<BundleJobBean> s = new TreeSet<BundleJobBean>(new Comparator<BundleJobBean>() {
127                    @Override
128                    public int compare(BundleJobBean b1, BundleJobBean b2) {
129                        if (b1.getId().equals(b2.getId())) {
130                            return 0;
131                        }
132                        else
133                            return 1;
134                    }
135                });
136                s.addAll(pendingJobList);
137                return new ArrayList<BundleJobBean>(s);
138            }
139    
140            /**
141             * Aggregate bundle actions' status to bundle jobs
142             *
143             * @throws JPAExecutorException thrown if failed in db updates or retrievals
144             * @throws CommandException thrown if failed to run commands
145             */
146            private void bundleTransit() throws JPAExecutorException, CommandException {
147                List<BundleJobBean> pendingJobCheckList = null;
148    
149                if (lastInstanceStartTime == null) {
150                    LOG.info("Running bundle status service first instance");
151                    // this is the first instance, we need to check for all pending or running jobs;
152                    pendingJobCheckList = jpaService.execute(new BundleJobsGetRunningOrPendingJPAExecutor(limit));
153                }
154                else {
155                    LOG.info("Running bundle status service from last instance time =  "
156                            + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
157                    // this is not the first instance, we should only check jobs that have actions been
158                    // updated >= start time of last service run;
159                    List<BundleActionBean> actionList = jpaService
160                            .execute(new BundleActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
161                    Set<String> bundleIds = new HashSet<String>();
162                    for (BundleActionBean action : actionList) {
163                        bundleIds.add(action.getBundleId());
164                    }
165                    pendingJobCheckList = new ArrayList<BundleJobBean>();
166                    for (String bundleId : bundleIds.toArray(new String[bundleIds.size()])) {
167                        BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId));
168                        // Running bundle job might have pending false
169                        if (bundle.isPending() || bundle.getStatus().equals(Job.Status.RUNNING)
170                                || bundle.getStatus().equals(Job.Status.RUNNINGWITHERROR)
171                                || bundle.getStatus().equals(Job.Status.PAUSED)
172                                || bundle.getStatus().equals(Job.Status.PAUSEDWITHERROR)) {
173                            pendingJobCheckList.add(bundle);
174                        }
175                    }
176                }
177                aggregateBundleJobsStatus(pendingJobCheckList);
178            }
179    
180            private void aggregateBundleJobsStatus(List<BundleJobBean> bundleLists) throws JPAExecutorException,
181                    CommandException {
182                if (bundleLists != null) {
183                        for (BundleJobBean bundleJob : bundleLists) {
184                            try {
185                                String jobId = bundleJob.getId();
186                                Job.Status[] bundleStatus = new Job.Status[1];
187                                bundleStatus[0] = bundleJob.getStatus();
188                                List<BundleActionBean> bundleActions = jpaService.execute(new BundleActionsGetStatusPendingJPAExecutor(
189                                        jobId));
190                                HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>();
191                                boolean foundPending = false;
192                                for (BundleActionBean bAction : bundleActions) {
193                                        int counter = 0;
194                                        if (bundleActionStatus.containsKey(bAction.getStatus())) {
195                                            counter = bundleActionStatus.get(bAction.getStatus()) + 1;
196                                        }
197                                        else {
198                                            ++counter;
199                                        }
200                                        bundleActionStatus.put(bAction.getStatus(), counter);
201                                        if (bAction.getCoordId() == null
202                                                && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED)) {
203                                            (new BundleKillXCommand(jobId)).call();
204                                            LOG.info("Bundle job ["+ jobId
205                                                            + "] has been killed since one of its coordinator job failed submission.");
206                                        }
207    
208                                     if (bAction.isPending()) {
209                                        foundPending = true;
210                                    }
211                                }
212    
213                                if (!foundPending && checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus)) {
214                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
215                                            + "' from '" + bundleJob.getStatus() + "'");
216                                    updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
217                                }
218                                else if (!foundPending && checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus)) {
219                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
220                                            + "' from '" + bundleJob.getStatus() + "'");
221                                    updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
222                                }
223                                else if (checkPausedStatus(bundleActionStatus, bundleActions, bundleStatus)) {
224                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
225                                            + "' from '" + bundleJob.getStatus() + "'");
226                                    updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
227                                }
228                                else if (checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus, foundPending)) {
229                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
230                                            + "' from '" + bundleJob.getStatus() + "'");
231                                    updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
232                                }
233                                else if (checkRunningStatus(bundleActionStatus, bundleActions, bundleStatus)) {
234                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
235                                            + "' from '" + bundleJob.getStatus() + "'");
236                                    updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
237                                }
238                            }
239                            catch (Exception ex) {
240                                LOG.error("Exception happened during aggregate bundle job's status, job = "
241                                        + bundleJob.getId(), ex);
242                            }
243                        }
244                    }
245    
246            }
247    
248            private void aggregateCoordJobsStatus(List<CoordinatorJobBean> CoordList) throws JPAExecutorException,
249                    CommandException {
250                if (CoordList != null) {
251                    Configuration conf = Services.get().getConf();
252                    boolean backwardSupportForCoordStatus = conf.getBoolean(CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
253    
254                    for (CoordinatorJobBean coordJob : CoordList) {
255                        try {
256                            // if namespace 0.1 is used and backward support is true, then ignore this coord job
257                            if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
258                                    && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
259                                continue;
260                            }
261                            String jobId = coordJob.getId();
262                            Job.Status[] coordStatus = new Job.Status[1];
263                            coordStatus[0] = coordJob.getStatus();
264                            //Get count of Coordinator actions with pending true
265                            boolean isPending = false;
266                            int count = jpaService.execute(new CoordJobGetPendingActionsCountJPAExecutor(jobId));
267                            if (count > 0) {
268                                 isPending = true;
269                            }
270                            // Get status of Coordinator actions
271                            List<CoordinatorAction.Status> coordActionStatusList = jpaService
272                                    .execute(new CoordJobGetActionsStatusJPAExecutor(jobId));
273                            HashMap<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>();
274    
275                            for (CoordinatorAction.Status status : coordActionStatusList) {
276                                int counter = 0;
277                                if (coordActionStatus.containsKey(status)) {
278                                    counter = coordActionStatus.get(status) + 1;
279                                }
280                                else {
281                                    ++counter;
282                                }
283                                coordActionStatus.put(status, counter);
284                            }
285    
286                            int nonPendingCoordActionsCount = coordActionStatusList.size();
287                            boolean isDoneMaterialization = coordJob.isDoneMaterialization();
288                            if ((isDoneMaterialization || coordStatus[0] == Job.Status.FAILED || coordStatus[0] == Job.Status.KILLED)
289                                    && checkCoordTerminalStatus(coordActionStatus, nonPendingCoordActionsCount,
290                                            coordStatus, isDoneMaterialization)) {
291                                LOG.info("Set coordinator job [" + jobId + "] status to '" + coordStatus[0].toString()
292                                        + "' from '" + coordJob.getStatus() + "'");
293                                updateCoordJob(isPending, coordJob, coordStatus[0]);
294                            }
295                            else if (checkCoordPausedStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
296                                LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
297                                        + "' from '" + coordJob.getStatus() + "'");
298                                updateCoordJob(isPending, coordJob, coordStatus[0]);
299                            }
300                            else if(checkCoordSuspendStatus( coordActionStatus, nonPendingCoordActionsCount, coordStatus, coordJob.isDoneMaterialization(), isPending)) {
301                                LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
302                                        + "' from '" + coordJob.getStatus() + "'");
303                                updateCoordJob(isPending, coordJob, coordStatus[0]);
304                            }
305                            else if (checkCoordRunningStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
306                                LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
307                                        + "' from '" + coordJob.getStatus() + "'");
308                                updateCoordJob(isPending, coordJob, coordStatus[0]);
309                            }
310                            else {
311                                checkCoordPending(isPending, coordJob, true);
312                            }
313                        }
314                        catch (Exception ex) {
315                            LOG.error("Exception happened during aggregate coordinator job's status, job = "
316                                    + coordJob.getId(), ex);
317                        }
318                    }
319    
320                }
321            }
322    
323            private boolean checkTerminalStatus(HashMap<Job.Status, Integer> bundleActionStatus,
324                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
325                boolean ret = false;
326                int totalValuesSucceed = 0;
327                if (bundleActionStatus.containsKey(Job.Status.SUCCEEDED)) {
328                    totalValuesSucceed = bundleActionStatus.get(Job.Status.SUCCEEDED);
329                }
330                int totalValuesFailed = 0;
331                if (bundleActionStatus.containsKey(Job.Status.FAILED)) {
332                    totalValuesFailed = bundleActionStatus.get(Job.Status.FAILED);
333                }
334                int totalValuesKilled = 0;
335                if (bundleActionStatus.containsKey(Job.Status.KILLED)) {
336                    totalValuesKilled = bundleActionStatus.get(Job.Status.KILLED);
337                }
338    
339                int totalValuesDoneWithError = 0;
340                if (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)) {
341                    totalValuesDoneWithError = bundleActionStatus.get(Job.Status.DONEWITHERROR);
342                }
343    
344                if (bundleActions.size() == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesDoneWithError)) {
345                    // If all the bundle actions are succeeded then bundle job should be succeeded.
346                    if (bundleActions.size() == totalValuesSucceed) {
347                        bundleStatus[0] = Job.Status.SUCCEEDED;
348                        ret = true;
349                    }
350                    else if (bundleActions.size() == totalValuesKilled) {
351                        // If all the bundle actions are KILLED then bundle job should be KILLED.
352                        bundleStatus[0] = Job.Status.KILLED;
353                        ret = true;
354                    }
355                    else if (bundleActions.size() == totalValuesFailed) {
356                        // If all the bundle actions are FAILED then bundle job should be FAILED.
357                        bundleStatus[0] = Job.Status.FAILED;
358                        ret = true;
359                    }
360                    else {
361                        bundleStatus[0] = Job.Status.DONEWITHERROR;
362                        ret = true;
363                    }
364                }
365                return ret;
366            }
367    
368            private boolean checkCoordTerminalStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
369                    int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization) {
370                boolean ret = false;
371                int totalValuesSucceed = 0;
372                if (coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED)) {
373                    totalValuesSucceed = coordActionStatus.get(CoordinatorAction.Status.SUCCEEDED);
374                }
375                int totalValuesFailed = 0;
376                if (coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)) {
377                    totalValuesFailed = coordActionStatus.get(CoordinatorAction.Status.FAILED);
378                }
379                int totalValuesKilled = 0;
380                if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)) {
381                    totalValuesKilled = coordActionStatus.get(CoordinatorAction.Status.KILLED);
382                }
383    
384                int totalValuesTimeOut = 0;
385                if (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
386                    totalValuesTimeOut = coordActionStatus.get(CoordinatorAction.Status.TIMEDOUT);
387                }
388    
389                if (coordActionsCount == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut)) {
390                    // If all the coordinator actions are succeeded then coordinator job should be succeeded.
391                    if (coordActionsCount == totalValuesSucceed && isDoneMaterialization) {
392                        coordStatus[0] = Job.Status.SUCCEEDED;
393                        ret = true;
394                    }
395                    else if (coordActionsCount == totalValuesKilled) {
396                        // If all the coordinator actions are KILLED then coordinator job should be KILLED.
397                        coordStatus[0] = Job.Status.KILLED;
398                        ret = true;
399                    }
400                    else if (coordActionsCount == totalValuesFailed) {
401                        // If all the coordinator actions are FAILED then coordinator job should be FAILED.
402                        coordStatus[0] = Job.Status.FAILED;
403                        ret = true;
404                    }
405                    else {
406                        coordStatus[0] = Job.Status.DONEWITHERROR;
407                        ret = true;
408                    }
409                }
410                return ret;
411            }
412    
413            private boolean checkPrepStatus(HashMap<Job.Status, Integer> bundleActionStatus,
414                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
415                boolean ret = false;
416                if (bundleActionStatus.containsKey(Job.Status.PREP)) {
417                    // If all the bundle actions are PREP then bundle job should be RUNNING.
418                    if (bundleActions.size() > bundleActionStatus.get(Job.Status.PREP)) {
419                        bundleStatus[0] = Job.Status.RUNNING;
420                        ret = true;
421                    }
422                }
423                return ret;
424            }
425    
426            private boolean checkPausedStatus(HashMap<Job.Status, Integer> bundleActionStatus,
427                    List<BundleActionBean> bundleActions, Job.Status[] bundleJobStatus) {
428                boolean ret = false;
429    
430                // TODO - When bottom up cmds are allowed to change the status of parent job,
431                // if none of the bundle actions are in paused or pausedwitherror, the function should return
432                // false
433    
434                // top down
435                // If the bundle job is PAUSED or PAUSEDINERROR and no children are in error
436                // state, then job should be PAUSED otherwise it should be pausedwitherror
437                if (bundleJobStatus[0] == Job.Status.PAUSED || bundleJobStatus[0] == Job.Status.PAUSEDWITHERROR) {
438                    if (bundleActionStatus.containsKey(Job.Status.KILLED)
439                            || bundleActionStatus.containsKey(Job.Status.FAILED)
440                            || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
441                            || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
442                            || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)
443                            || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
444                        bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
445                    }
446                    else {
447                        bundleJobStatus[0] = Job.Status.PAUSED;
448                    }
449                    ret = true;
450                }
451    
452                // bottom up; check the status of parent through their children
453                else if (bundleActionStatus.containsKey(Job.Status.PAUSED)
454                        && (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED))) {
455                    bundleJobStatus[0] = Job.Status.PAUSED;
456                    ret = true;
457                }
458                else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
459                    int pausedActions = bundleActionStatus.containsKey(Job.Status.PAUSED) ? bundleActionStatus
460                            .get(Job.Status.PAUSED) : 0;
461                    if (bundleActions.size() == pausedActions + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR)) {
462                        bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
463                        ret = true;
464                    }
465                }
466                else {
467                    ret = false;
468                }
469                return ret;
470            }
471    
472    
473            private boolean checkSuspendStatus(HashMap<Job.Status, Integer> bundleActionStatus,
474                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus, boolean isPending) {
475                boolean ret = false;
476    
477                // TODO - When bottom up cmds are allowed to change the status of parent job,
478                // if none of the bundle actions are in suspended or suspendedwitherror, the function should return
479                // false
480    
481                // top down
482                // if job is suspended
483                if (bundleStatus[0] == Job.Status.SUSPENDED
484                        || bundleStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
485                    if (bundleActionStatus.containsKey(Job.Status.KILLED)
486                            || bundleActionStatus.containsKey(Job.Status.FAILED)
487                            || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
488                            || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
489                            || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
490                        bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
491                    }
492                    else {
493                        bundleStatus[0] = Job.Status.SUSPENDED;
494                    }
495                    ret =true;
496                }
497    
498                // bottom up
499                // Update status of parent from the status of its children
500                else if (!isPending && bundleActionStatus.containsKey(Job.Status.SUSPENDED)
501                        || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)) {
502                    int succeededActions = bundleActionStatus.containsKey(Job.Status.SUCCEEDED) ? bundleActionStatus
503                            .get(Job.Status.SUCCEEDED) : 0;
504                    int killedActions = bundleActionStatus.containsKey(Job.Status.KILLED) ? bundleActionStatus
505                            .get(Job.Status.KILLED) : 0;
506                    int failedActions = bundleActionStatus.containsKey(Job.Status.FAILED) ? bundleActionStatus
507                            .get(Job.Status.FAILED) : 0;
508                    int doneWithErrorActions = bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) ? bundleActionStatus
509                            .get(Job.Status.DONEWITHERROR) : 0;
510    
511                    if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions) {
512                        bundleStatus[0] = Job.Status.SUSPENDED;
513                        ret = true;
514                    }
515                    else if (bundleActions.size()  == bundleActionStatus.get(Job.Status.SUSPENDEDWITHERROR)
516                            + bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions + killedActions + failedActions + doneWithErrorActions) {
517                        bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
518                        ret = true;
519                    }
520                }
521                return ret;
522    
523            }
524    
525            private boolean checkCoordPausedStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
526                    int coordActionsCount, Job.Status[] coordStatus){
527                boolean ret = false;
528                if (coordStatus[0].equals(Job.Status.PAUSED) || coordStatus[0].equals(Job.Status.PAUSEDWITHERROR)) {
529                    if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
530                            || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
531                            || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
532                        coordStatus[0] = Job.Status.PAUSEDWITHERROR;
533                    }
534                    else {
535                        coordStatus[0] = Job.Status.PAUSED;
536                    }
537                    ret = true;
538                }
539                return ret;
540            }
541            private boolean checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
542                    int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization, boolean isPending) {
543                boolean ret = false;
544    
545                // TODO - When bottom up cmds are allowed to change the status of parent job
546                //if none of the coord actions are in suspended or suspendedwitherror and materialization done is false
547                //,then the function should return
548                // false
549    
550                // top down
551                // check for children only when parent is suspended
552                if (coordStatus[0] == Job.Status.SUSPENDED || coordStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
553    
554                    if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
555                            || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
556                            || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
557                        coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
558                    }
559                    else {
560                        coordStatus[0] = Job.Status.SUSPENDED;
561                    }
562                    ret = true;
563                }
564                // bottom up
565                // look for children to check the parent's status only if materialization is
566                // done and all actions are non-pending
567                else if (isDoneMaterialization && !isPending && coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
568                    int succeededActions = coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED) ? coordActionStatus
569                           .get(CoordinatorAction.Status.SUCCEEDED) : 0;
570                    int killedActions = coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) ? coordActionStatus
571                            .get(CoordinatorAction.Status.KILLED) : 0;
572                    int failedActions = coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) ? coordActionStatus
573                            .get(CoordinatorAction.Status.FAILED) : 0;
574                    int timedoutActions = coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT) ? coordActionStatus
575                            .get(CoordinatorAction.Status.TIMEDOUT) : 0;
576    
577                    if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED) + succeededActions) {
578                        coordStatus[0] = Job.Status.SUSPENDED;
579                        ret = true;
580                    }
581                    else if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)
582                            + succeededActions + killedActions + failedActions + timedoutActions) {
583                        coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
584                        ret = true;
585                    }
586                }
587                return ret;
588            }
589    
590            private boolean checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
591                    int coordActionsCount, Job.Status[] coordStatus) {
592                boolean ret = false;
593                if (coordStatus[0] != Job.Status.PREP) {
594                    if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
595                            || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
596                            || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
597                        coordStatus[0] = Job.Status.RUNNINGWITHERROR;
598                    }
599                    else {
600                        coordStatus[0] = Job.Status.RUNNING;
601                    }
602                    ret = true;
603                }
604                return ret;
605            }
606    
607            private boolean checkRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus,
608                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
609                boolean ret = false;
610                if (bundleStatus[0] != Job.Status.PREP) {
611                    if (bundleActionStatus.containsKey(Job.Status.FAILED)
612                            || bundleActionStatus.containsKey(Job.Status.KILLED)
613                            || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
614                            || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)) {
615                        bundleStatus[0] = Job.Status.RUNNINGWITHERROR;
616                    }
617                    else {
618                        bundleStatus[0] = Job.Status.RUNNING;
619                    }
620                    ret = true;
621                }
622                return ret;
623    
624            }
625    
626            private void updateBundleJob(boolean isPending, BundleJobBean bundleJob, Job.Status bundleStatus)
627                    throws JPAExecutorException {
628                String jobId = bundleJob.getId();
629                // Update the Bundle Job
630                // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
631                // PAUSEDWITHERROR is not supported
632                bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(bundleStatus));
633                if (isPending) {
634                    bundleJob.setPending();
635                    LOG.info("Bundle job [" + jobId + "] Pending set to TRUE");
636                }
637                else {
638                    bundleJob.resetPending();
639                    LOG.info("Bundle job [" + jobId + "] Pending set to FALSE");
640                }
641                jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
642            }
643    
644            private void updateCoordJob(boolean isPending, CoordinatorJobBean coordJob, Job.Status coordStatus)
645                    throws JPAExecutorException, CommandException {
646                Job.Status prevStatus = coordJob.getStatus();
647                // Update the Coord Job
648                if (coordJob.getStatus() == Job.Status.SUCCEEDED || coordJob.getStatus() == Job.Status.FAILED
649                        || coordJob.getStatus() == Job.Status.KILLED || coordJob.getStatus() == Job.Status.DONEWITHERROR) {
650                    if (coordStatus == Job.Status.SUSPENDED || coordStatus == Job.Status.SUSPENDEDWITHERROR) {
651                        LOG.info("Coord Job [" + coordJob.getId()
652                                + "] status to "+ coordStatus +" can not be updated as its already in Terminal state");
653                        return;
654                    }
655                }
656    
657                boolean isPendingStateChanged = checkCoordPending(isPending, coordJob, false);
658                // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and PAUSEDWITHERROR is
659                // not supported
660                coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(coordStatus));
661                // Backward support when coordinator namespace is 0.1
662                coordJob.setStatus(StatusUtils.getStatus(coordJob));
663                if (coordJob.getStatus() != prevStatus || isPendingStateChanged) {
664                    LOG.debug("Updating coord job " + coordJob.getId());
665                    coordJob.setLastModifiedTime(new Date());
666                    jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
667                }
668                // update bundle action only when status changes in coord job
669                if (coordJob.getBundleId() != null) {
670                    if (!prevStatus.equals(coordJob.getStatus())) {
671                        BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
672                        bundleStatusUpdate.call();
673                    }
674                }
675            }
676    
677            private boolean checkCoordPending(boolean isPending, CoordinatorJobBean coordJob, boolean saveToDB)
678                    throws JPAExecutorException {
679                // Checking the coordinator pending should be updated or not
680                boolean prevPending = coordJob.isPending();
681                if (isPending) {
682                    coordJob.setPending();
683                    LOG.info("Coord job [" + coordJob.getId() + "] Pending set to TRUE");
684                }
685                else {
686                    coordJob.resetPending();
687                    LOG.info("Coord job [" + coordJob.getId() + "] Pending set to FALSE");
688                }
689                boolean hasChange = prevPending != coordJob.isPending();
690                if (saveToDB && hasChange) {
691                    jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
692                }
693                return hasChange;
694    
695            }
696    
697            /**
698             * Aggregate coordinator actions' status to coordinator jobs
699             *
700             * @throws JPAExecutorException thrown if failed in db updates or retrievals
701             * @throws CommandException thrown if failed to run commands
702             */
703            private void coordTransit() throws JPAExecutorException, CommandException {
704                List<CoordinatorJobBean> pendingJobCheckList = null;
705                if (lastInstanceStartTime == null) {
706                    LOG.info("Running coordinator status service first instance");
707                    // this is the first instance, we need to check for all pending jobs;
708                    pendingJobCheckList = jpaService.execute(new CoordJobsGetPendingJPAExecutor(limit));
709                }
710                else {
711                    LOG.info("Running coordinator status service from last instance time =  "
712                            + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
713                    // this is not the first instance, we should only check jobs
714                    // that have actions or jobs been
715                    // updated >= start time of last service run;
716                    List<String> coordJobIdList = jpaService
717                            .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
718                    Set<String> coordIds = new HashSet<String>();
719                    for (String coordJobId : coordJobIdList) {
720                        coordIds.add(coordJobId);
721                    }
722                    coordIds.addAll(jpaService.execute(new CoordJobsGetChangedJPAExecutor(lastInstanceStartTime)));
723    
724                    pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
725                    for (String coordId : coordIds.toArray(new String[coordIds.size()])) {
726                        CoordinatorJobBean coordJob;
727                        try{
728                            coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordId));
729                        }
730                        catch (JPAExecutorException jpaee) {
731                            if (jpaee.getErrorCode().equals(ErrorCode.E0604)) {
732                                LOG.warn("Exception happened during StatusTransitRunnable; Coordinator Job doesn't exist", jpaee);
733                                continue;
734                            } else {
735                                throw jpaee;
736                            }
737                        }
738                        // Running coord job might have pending false
739                        Job.Status coordJobStatus = coordJob.getStatus();
740                        if (coordJob.isPending() || coordJobStatus.equals(Job.Status.PAUSED)
741                                || coordJobStatus.equals(Job.Status.RUNNING)
742                                || coordJobStatus.equals(Job.Status.RUNNINGWITHERROR)
743                                || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
744                            pendingJobCheckList.add(coordJob);
745                        }
746                    }
747                }
748                aggregateCoordJobsStatus(pendingJobCheckList);
749            }
750        }
751    
752        /**
753         * Initializes the {@link StatusTransitService}.
754         *
755         * @param services services instance.
756         */
757        @Override
758        public void init(Services services) {
759            Configuration conf = services.getConf();
760            Runnable stateTransitRunnable = new StatusTransitRunnable();
761            services.get(SchedulerService.class).schedule(stateTransitRunnable, 10,
762                    conf.getInt(CONF_STATUSTRANSIT_INTERVAL, 60), SchedulerService.Unit.SEC);
763        }
764    
765        /**
766         * Destroy the StateTransit Jobs Service.
767         */
768        @Override
769        public void destroy() {
770        }
771    
772        /**
773         * Return the public interface for the purge jobs service.
774         *
775         * @return {@link StatusTransitService}.
776         */
777        @Override
778        public Class<? extends Service> getInterface() {
779            return StatusTransitService.class;
780        }
781    }
782