001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.util.ArrayList;
021    import java.util.Arrays;
022    import java.util.Date;
023    import java.util.HashMap;
024    import java.util.HashSet;
025    import java.util.List;
026    import java.util.Set;
027    import java.util.TreeSet;
028    import java.util.Comparator;
029    
030    import org.apache.hadoop.conf.Configuration;
031    import org.apache.oozie.BundleActionBean;
032    import org.apache.oozie.BundleJobBean;
033    import org.apache.oozie.CoordinatorActionBean;
034    import org.apache.oozie.CoordinatorJobBean;
035    import org.apache.oozie.ErrorCode;
036    import org.apache.oozie.client.CoordinatorAction;
037    import org.apache.oozie.client.Job;
038    import org.apache.oozie.command.CommandException;
039    import org.apache.oozie.command.bundle.BundleKillXCommand;
040    import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
041    import org.apache.oozie.executor.jpa.BundleActionsGetByLastModifiedTimeJPAExecutor;
042    import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
043    import org.apache.oozie.executor.jpa.BundleActionsGetStatusPendingJPAExecutor;
044    import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
045    import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
046    import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor;
047    import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor;
048    import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor;
049    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
050    import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
051    import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
052    import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
053    import org.apache.oozie.executor.jpa.JPAExecutorException;
054    import org.apache.oozie.util.DateUtils;
055    import org.apache.oozie.util.MemoryLocks;
056    import org.apache.oozie.util.StatusUtils;
057    import org.apache.oozie.util.XLog;
058    
059    /**
060     * StatusTransitService is scheduled to run at the configured interval.
061     * <p/>
062     * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0 (job
063     * done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to
064     * SUCCEEDED.
065     */
066    public class StatusTransitService implements Service {
067        public static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService.";
068        public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval";
069        public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + "backward.support.for.coord.status";
070        public static final String CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR = CONF_PREFIX + "backward.support.for.states.without.error";
071        private static int limit = -1;
072        private static Date lastInstanceStartTime = null;
073        private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class);
074    
075        /**
076         * StatusTransitRunnable is the runnable which is scheduled to run at the configured interval.
077         * <p/>
078         * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0
079         * (job done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to
080         * SUCCEEDED.
081         */
082        static class StatusTransitRunnable implements Runnable {
083            private JPAService jpaService = null;
084            private MemoryLocks.LockToken lock;
085    
086            public StatusTransitRunnable() {
087                jpaService = Services.get().get(JPAService.class);
088                if (jpaService == null) {
089                    LOG.error("Missing JPAService");
090                }
091            }
092    
093            @Override
094            public void run() {
095                try {
096                    Date curDate = new Date(); // records the start time of this service run;
097    
098                    // first check if there is some other instance running;
099                    lock = Services.get().get(MemoryLocksService.class).getWriteLock(StatusTransitService.class.getName(),
100                            lockTimeout);
101                    if (lock == null) {
102                        LOG.info("This StatusTransitService instance"
103                                + " will not run since there is already an instance running");
104                    }
105                    else {
106                        LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName());
107                        // running coord jobs transit service
108                        coordTransit();
109                        // running bundle jobs transit service
110                        bundleTransit();
111    
112                        lastInstanceStartTime = curDate;
113                    }
114                }
115                catch (Exception ex) {
116                    LOG.warn("Exception happened during StatusTransitRunnable ", ex);
117                }
118                finally {
119                    // release lock;
120                    if (lock != null) {
121                        lock.release();
122                        LOG.info("Released lock for [{0}]", StatusTransitService.class.getName());
123                    }
124                }
125            }
126    
127            public List<BundleJobBean> removeDuplicates(List<BundleJobBean> pendingJobList) {
128                Set<BundleJobBean> s = new TreeSet<BundleJobBean>(new Comparator<BundleJobBean>() {
129                    @Override
130                    public int compare(BundleJobBean b1, BundleJobBean b2) {
131                        if (b1.getId().equals(b2.getId())) {
132                            return 0;
133                        }
134                        else
135                            return 1;
136                    }
137                });
138                s.addAll(pendingJobList);
139                return new ArrayList<BundleJobBean>(s);
140            }
141    
142            /**
143             * Aggregate bundle actions' status to bundle jobs
144             *
145             * @throws JPAExecutorException thrown if failed in db updates or retrievals
146             * @throws CommandException thrown if failed to run commands
147             */
148            private void bundleTransit() throws JPAExecutorException, CommandException {
149                List<BundleJobBean> pendingJobCheckList = null;
150    
151                if (lastInstanceStartTime == null) {
152                    LOG.info("Running bundle status service first instance");
153                    // this is the first instance, we need to check for all pending or running jobs;
154                    pendingJobCheckList = jpaService.execute(new BundleJobsGetRunningOrPendingJPAExecutor(limit));
155                }
156                else {
157                    LOG.info("Running bundle status service from last instance time =  "
158                            + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
159                    // this is not the first instance, we should only check jobs that have actions been
160                    // updated >= start time of last service run;
161                    List<BundleActionBean> actionList = jpaService
162                            .execute(new BundleActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
163                    Set<String> bundleIds = new HashSet<String>();
164                    for (BundleActionBean action : actionList) {
165                        bundleIds.add(action.getBundleId());
166                    }
167                    pendingJobCheckList = new ArrayList<BundleJobBean>();
168                    for (String bundleId : bundleIds.toArray(new String[bundleIds.size()])) {
169                        BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId));
170                        // Running bundle job might have pending false
171                        if (bundle.isPending() || bundle.getStatus().equals(Job.Status.RUNNING)
172                                || bundle.getStatus().equals(Job.Status.RUNNINGWITHERROR)
173                                || bundle.getStatus().equals(Job.Status.PAUSED)
174                                || bundle.getStatus().equals(Job.Status.PAUSEDWITHERROR)) {
175                            pendingJobCheckList.add(bundle);
176                        }
177                    }
178                }
179                aggregateBundleJobsStatus(pendingJobCheckList);
180            }
181    
182            private void aggregateBundleJobsStatus(List<BundleJobBean> bundleLists) throws JPAExecutorException,
183                    CommandException {
184                if (bundleLists != null) {
185                        for (BundleJobBean bundleJob : bundleLists) {
186                            try {
187                                String jobId = bundleJob.getId();
188                                Job.Status[] bundleStatus = new Job.Status[1];
189                                bundleStatus[0] = bundleJob.getStatus();
190                                List<BundleActionBean> bundleActions = jpaService.execute(new BundleActionsGetStatusPendingJPAExecutor(
191                                        jobId));
192                                HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>();
193                                boolean foundPending = false;
194                                for (BundleActionBean bAction : bundleActions) {
195                                        int counter = 0;
196                                        if (bundleActionStatus.containsKey(bAction.getStatus())) {
197                                            counter = bundleActionStatus.get(bAction.getStatus()) + 1;
198                                        }
199                                        else {
200                                            ++counter;
201                                        }
202                                        bundleActionStatus.put(bAction.getStatus(), counter);
203                                        if (bAction.getCoordId() == null
204                                                && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED)) {
205                                            (new BundleKillXCommand(jobId)).call();
206                                            LOG.info("Bundle job ["+ jobId
207                                                            + "] has been killed since one of its coordinator job failed submission.");
208                                        }
209    
210                                     if (bAction.isPending()) {
211                                        foundPending = true;
212                                    }
213                                }
214    
215                                if (!foundPending && checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus)) {
216                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
217                                            + "' from '" + bundleJob.getStatus() + "'");
218                                    updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
219                                }
220                                else if (!foundPending && checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus)) {
221                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
222                                            + "' from '" + bundleJob.getStatus() + "'");
223                                    updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
224                                }
225                                else if (checkPausedStatus(bundleActionStatus, bundleActions, bundleStatus)) {
226                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
227                                            + "' from '" + bundleJob.getStatus() + "'");
228                                    updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
229                                }
230                                else if (checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus, foundPending)) {
231                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
232                                            + "' from '" + bundleJob.getStatus() + "'");
233                                    updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
234                                }
235                                else if (checkRunningStatus(bundleActionStatus, bundleActions, bundleStatus)) {
236                                    LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
237                                            + "' from '" + bundleJob.getStatus() + "'");
238                                    updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
239                                }
240                            }
241                            catch (Exception ex) {
242                                LOG.error("Exception happened during aggregate bundle job's status, job = "
243                                        + bundleJob.getId(), ex);
244                            }
245                        }
246                    }
247    
248            }
249    
250            private void aggregateCoordJobsStatus(List<CoordinatorJobBean> CoordList) throws JPAExecutorException,
251                    CommandException {
252                if (CoordList != null) {
253                    Configuration conf = Services.get().getConf();
254                    boolean backwardSupportForCoordStatus = conf.getBoolean(CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
255    
256                    for (CoordinatorJobBean coordJob : CoordList) {
257                        try {
258                            // if namespace 0.1 is used and backward support is true, then ignore this coord job
259                            if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
260                                    && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
261                                continue;
262                            }
263                            String jobId = coordJob.getId();
264                            Job.Status[] coordStatus = new Job.Status[1];
265                            coordStatus[0] = coordJob.getStatus();
266                            //Get count of Coordinator actions with pending true
267                            boolean isPending = false;
268                            int count = jpaService.execute(new CoordJobGetPendingActionsCountJPAExecutor(jobId));
269                            if (count > 0) {
270                                 isPending = true;
271                            }
272                            // Get status of Coordinator actions
273                            List<CoordinatorAction.Status> coordActionStatusList = jpaService
274                                    .execute(new CoordJobGetActionsStatusJPAExecutor(jobId));
275                            HashMap<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>();
276    
277                            for (CoordinatorAction.Status status : coordActionStatusList) {
278                                int counter = 0;
279                                if (coordActionStatus.containsKey(status)) {
280                                    counter = coordActionStatus.get(status) + 1;
281                                }
282                                else {
283                                    ++counter;
284                                }
285                                coordActionStatus.put(status, counter);
286                            }
287    
288                            int nonPendingCoordActionsCount = coordActionStatusList.size();
289                            boolean isDoneMaterialization = coordJob.isDoneMaterialization();
290                            if ((isDoneMaterialization || coordStatus[0] == Job.Status.FAILED || coordStatus[0] == Job.Status.KILLED)
291                                    && checkCoordTerminalStatus(coordActionStatus, nonPendingCoordActionsCount,
292                                            coordStatus, isDoneMaterialization)) {
293                                LOG.info("Set coordinator job [" + jobId + "] status to '" + coordStatus[0].toString()
294                                        + "' from '" + coordJob.getStatus() + "'");
295                                updateCoordJob(isPending, coordJob, coordStatus[0]);
296                            }
297                            else if (checkCoordPausedStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
298                                LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
299                                        + "' from '" + coordJob.getStatus() + "'");
300                                updateCoordJob(isPending, coordJob, coordStatus[0]);
301                            }
302                            else if(checkCoordSuspendStatus( coordActionStatus, nonPendingCoordActionsCount, coordStatus, coordJob.isDoneMaterialization(), isPending)) {
303                                LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
304                                        + "' from '" + coordJob.getStatus() + "'");
305                                updateCoordJob(isPending, coordJob, coordStatus[0]);
306                            }
307                            else if (checkCoordRunningStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
308                                LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
309                                        + "' from '" + coordJob.getStatus() + "'");
310                                updateCoordJob(isPending, coordJob, coordStatus[0]);
311                            }
312                            else {
313                                checkCoordPending(isPending, coordJob, true);
314                            }
315                        }
316                        catch (Exception ex) {
317                            LOG.error("Exception happened during aggregate coordinator job's status, job = "
318                                    + coordJob.getId(), ex);
319                        }
320                    }
321    
322                }
323            }
324    
325            private boolean checkTerminalStatus(HashMap<Job.Status, Integer> bundleActionStatus,
326                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
327                boolean ret = false;
328                int totalValuesSucceed = 0;
329                if (bundleActionStatus.containsKey(Job.Status.SUCCEEDED)) {
330                    totalValuesSucceed = bundleActionStatus.get(Job.Status.SUCCEEDED);
331                }
332                int totalValuesFailed = 0;
333                if (bundleActionStatus.containsKey(Job.Status.FAILED)) {
334                    totalValuesFailed = bundleActionStatus.get(Job.Status.FAILED);
335                }
336                int totalValuesKilled = 0;
337                if (bundleActionStatus.containsKey(Job.Status.KILLED)) {
338                    totalValuesKilled = bundleActionStatus.get(Job.Status.KILLED);
339                }
340    
341                int totalValuesDoneWithError = 0;
342                if (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)) {
343                    totalValuesDoneWithError = bundleActionStatus.get(Job.Status.DONEWITHERROR);
344                }
345    
346                if (bundleActions.size() == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesDoneWithError)) {
347                    // If all the bundle actions are succeeded then bundle job should be succeeded.
348                    if (bundleActions.size() == totalValuesSucceed) {
349                        bundleStatus[0] = Job.Status.SUCCEEDED;
350                        ret = true;
351                    }
352                    else if (bundleActions.size() == totalValuesKilled) {
353                        // If all the bundle actions are KILLED then bundle job should be KILLED.
354                        bundleStatus[0] = Job.Status.KILLED;
355                        ret = true;
356                    }
357                    else if (bundleActions.size() == totalValuesFailed) {
358                        // If all the bundle actions are FAILED then bundle job should be FAILED.
359                        bundleStatus[0] = Job.Status.FAILED;
360                        ret = true;
361                    }
362                    else {
363                        bundleStatus[0] = Job.Status.DONEWITHERROR;
364                        ret = true;
365                    }
366                }
367                return ret;
368            }
369    
370            private boolean checkCoordTerminalStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
371                    int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization) {
372                boolean ret = false;
373                int totalValuesSucceed = 0;
374                if (coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED)) {
375                    totalValuesSucceed = coordActionStatus.get(CoordinatorAction.Status.SUCCEEDED);
376                }
377                int totalValuesFailed = 0;
378                if (coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)) {
379                    totalValuesFailed = coordActionStatus.get(CoordinatorAction.Status.FAILED);
380                }
381                int totalValuesKilled = 0;
382                if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)) {
383                    totalValuesKilled = coordActionStatus.get(CoordinatorAction.Status.KILLED);
384                }
385    
386                int totalValuesTimeOut = 0;
387                if (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
388                    totalValuesTimeOut = coordActionStatus.get(CoordinatorAction.Status.TIMEDOUT);
389                }
390    
391                if (coordActionsCount == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut)) {
392                    // If all the coordinator actions are succeeded then coordinator job should be succeeded.
393                    if (coordActionsCount == totalValuesSucceed && isDoneMaterialization) {
394                        coordStatus[0] = Job.Status.SUCCEEDED;
395                        ret = true;
396                    }
397                    else if (coordActionsCount == totalValuesKilled) {
398                        // If all the coordinator actions are KILLED then coordinator job should be KILLED.
399                        coordStatus[0] = Job.Status.KILLED;
400                        ret = true;
401                    }
402                    else if (coordActionsCount == totalValuesFailed) {
403                        // If all the coordinator actions are FAILED then coordinator job should be FAILED.
404                        coordStatus[0] = Job.Status.FAILED;
405                        ret = true;
406                    }
407                    else {
408                        coordStatus[0] = Job.Status.DONEWITHERROR;
409                        ret = true;
410                    }
411                }
412                return ret;
413            }
414    
415            private boolean checkPrepStatus(HashMap<Job.Status, Integer> bundleActionStatus,
416                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
417                boolean ret = false;
418                if (bundleActionStatus.containsKey(Job.Status.PREP)) {
419                    // If all the bundle actions are PREP then bundle job should be RUNNING.
420                    if (bundleActions.size() > bundleActionStatus.get(Job.Status.PREP)) {
421                        bundleStatus[0] = Job.Status.RUNNING;
422                        ret = true;
423                    }
424                }
425                return ret;
426            }
427    
428            private boolean checkPausedStatus(HashMap<Job.Status, Integer> bundleActionStatus,
429                    List<BundleActionBean> bundleActions, Job.Status[] bundleJobStatus) {
430                boolean ret = false;
431    
432                // TODO - When bottom up cmds are allowed to change the status of parent job,
433                // if none of the bundle actions are in paused or pausedwitherror, the function should return
434                // false
435    
436                // top down
437                // If the bundle job is PAUSED or PAUSEDINERROR and no children are in error
438                // state, then job should be PAUSED otherwise it should be pausedwitherror
439                if (bundleJobStatus[0] == Job.Status.PAUSED || bundleJobStatus[0] == Job.Status.PAUSEDWITHERROR) {
440                    if (bundleActionStatus.containsKey(Job.Status.KILLED)
441                            || bundleActionStatus.containsKey(Job.Status.FAILED)
442                            || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
443                            || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
444                            || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)
445                            || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
446                        bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
447                    }
448                    else {
449                        bundleJobStatus[0] = Job.Status.PAUSED;
450                    }
451                    ret = true;
452                }
453    
454                // bottom up; check the status of parent through their children
455                else if (bundleActionStatus.containsKey(Job.Status.PAUSED)
456                        && (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED))) {
457                    bundleJobStatus[0] = Job.Status.PAUSED;
458                    ret = true;
459                }
460                else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
461                    int pausedActions = bundleActionStatus.containsKey(Job.Status.PAUSED) ? bundleActionStatus
462                            .get(Job.Status.PAUSED) : 0;
463                    if (bundleActions.size() == pausedActions + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR)) {
464                        bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
465                        ret = true;
466                    }
467                }
468                else {
469                    ret = false;
470                }
471                return ret;
472            }
473    
474    
475            private boolean checkSuspendStatus(HashMap<Job.Status, Integer> bundleActionStatus,
476                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus, boolean isPending) {
477                boolean ret = false;
478    
479                // TODO - When bottom up cmds are allowed to change the status of parent job,
480                // if none of the bundle actions are in suspended or suspendedwitherror, the function should return
481                // false
482    
483                // top down
484                // if job is suspended
485                if (bundleStatus[0] == Job.Status.SUSPENDED
486                        || bundleStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
487                    if (bundleActionStatus.containsKey(Job.Status.KILLED)
488                            || bundleActionStatus.containsKey(Job.Status.FAILED)
489                            || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
490                            || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
491                            || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
492                        bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
493                    }
494                    else {
495                        bundleStatus[0] = Job.Status.SUSPENDED;
496                    }
497                    ret =true;
498                }
499    
500                // bottom up
501                // Update status of parent from the status of its children
502                else if (!isPending && bundleActionStatus.containsKey(Job.Status.SUSPENDED)
503                        || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)) {
504                    int succeededActions = bundleActionStatus.containsKey(Job.Status.SUCCEEDED) ? bundleActionStatus
505                            .get(Job.Status.SUCCEEDED) : 0;
506                    int killedActions = bundleActionStatus.containsKey(Job.Status.KILLED) ? bundleActionStatus
507                            .get(Job.Status.KILLED) : 0;
508                    int failedActions = bundleActionStatus.containsKey(Job.Status.FAILED) ? bundleActionStatus
509                            .get(Job.Status.FAILED) : 0;
510                    int doneWithErrorActions = bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) ? bundleActionStatus
511                            .get(Job.Status.DONEWITHERROR) : 0;
512    
513                    if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions) {
514                        bundleStatus[0] = Job.Status.SUSPENDED;
515                        ret = true;
516                    }
517                    else if (bundleActions.size()  == bundleActionStatus.get(Job.Status.SUSPENDEDWITHERROR)
518                            + bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions + killedActions + failedActions + doneWithErrorActions) {
519                        bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
520                        ret = true;
521                    }
522                }
523                return ret;
524    
525            }
526    
527            private boolean checkCoordPausedStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
528                    int coordActionsCount, Job.Status[] coordStatus){
529                boolean ret = false;
530                if (coordStatus[0].equals(Job.Status.PAUSED) || coordStatus[0].equals(Job.Status.PAUSEDWITHERROR)) {
531                    if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
532                            || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
533                            || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
534                        coordStatus[0] = Job.Status.PAUSEDWITHERROR;
535                    }
536                    else {
537                        coordStatus[0] = Job.Status.PAUSED;
538                    }
539                    ret = true;
540                }
541                return ret;
542            }
543            private boolean checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
544                    int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization, boolean isPending) {
545                boolean ret = false;
546    
547                // TODO - When bottom up cmds are allowed to change the status of parent job
548                //if none of the coord actions are in suspended or suspendedwitherror and materialization done is false
549                //,then the function should return
550                // false
551    
552                // top down
553                // check for children only when parent is suspended
554                if (coordStatus[0] == Job.Status.SUSPENDED || coordStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
555    
556                    if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
557                            || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
558                            || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
559                        coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
560                    }
561                    else {
562                        coordStatus[0] = Job.Status.SUSPENDED;
563                    }
564                    ret = true;
565                }
566                // bottom up
567                // look for children to check the parent's status only if materialization is
568                // done and all actions are non-pending
569                else if (isDoneMaterialization && !isPending && coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
570                    int succeededActions = coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED) ? coordActionStatus
571                           .get(CoordinatorAction.Status.SUCCEEDED) : 0;
572                    int killedActions = coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) ? coordActionStatus
573                            .get(CoordinatorAction.Status.KILLED) : 0;
574                    int failedActions = coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) ? coordActionStatus
575                            .get(CoordinatorAction.Status.FAILED) : 0;
576                    int timedoutActions = coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT) ? coordActionStatus
577                            .get(CoordinatorAction.Status.TIMEDOUT) : 0;
578    
579                    if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED) + succeededActions) {
580                        coordStatus[0] = Job.Status.SUSPENDED;
581                        ret = true;
582                    }
583                    else if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)
584                            + succeededActions + killedActions + failedActions + timedoutActions) {
585                        coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
586                        ret = true;
587                    }
588                }
589                return ret;
590            }
591    
592            private boolean checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
593                    int coordActionsCount, Job.Status[] coordStatus) {
594                boolean ret = false;
595                if (coordStatus[0] != Job.Status.PREP) {
596                    if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
597                            || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
598                            || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
599                        coordStatus[0] = Job.Status.RUNNINGWITHERROR;
600                    }
601                    else {
602                        coordStatus[0] = Job.Status.RUNNING;
603                    }
604                    ret = true;
605                }
606                return ret;
607            }
608    
609            private boolean checkRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus,
610                    List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
611                boolean ret = false;
612                if (bundleStatus[0] != Job.Status.PREP) {
613                    if (bundleActionStatus.containsKey(Job.Status.FAILED)
614                            || bundleActionStatus.containsKey(Job.Status.KILLED)
615                            || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
616                            || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)) {
617                        bundleStatus[0] = Job.Status.RUNNINGWITHERROR;
618                    }
619                    else {
620                        bundleStatus[0] = Job.Status.RUNNING;
621                    }
622                    ret = true;
623                }
624                return ret;
625    
626            }
627    
628            private void updateBundleJob(boolean isPending, BundleJobBean bundleJob, Job.Status bundleStatus)
629                    throws JPAExecutorException {
630                String jobId = bundleJob.getId();
631                // Update the Bundle Job
632                // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
633                // PAUSEDWITHERROR is not supported
634                bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(bundleStatus));
635                if (isPending) {
636                    bundleJob.setPending();
637                    LOG.info("Bundle job [" + jobId + "] Pending set to TRUE");
638                }
639                else {
640                    bundleJob.resetPending();
641                    LOG.info("Bundle job [" + jobId + "] Pending set to FALSE");
642                }
643                jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
644            }
645    
646            private void updateCoordJob(boolean isPending, CoordinatorJobBean coordJob, Job.Status coordStatus)
647                    throws JPAExecutorException, CommandException {
648                Job.Status prevStatus = coordJob.getStatus();
649                // Update the Coord Job
650                if (coordJob.getStatus() == Job.Status.SUCCEEDED || coordJob.getStatus() == Job.Status.FAILED
651                        || coordJob.getStatus() == Job.Status.KILLED || coordJob.getStatus() == Job.Status.DONEWITHERROR) {
652                    if (coordStatus == Job.Status.SUSPENDED || coordStatus == Job.Status.SUSPENDEDWITHERROR) {
653                        LOG.info("Coord Job [" + coordJob.getId()
654                                + "] status to "+ coordStatus +" can not be updated as its already in Terminal state");
655                        return;
656                    }
657                }
658    
659                checkCoordPending(isPending, coordJob, false);
660                // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and PAUSEDWITHERROR is
661                // not supported
662                coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(coordStatus));
663                // Backward support when coordinator namespace is 0.1
664                coordJob.setStatus(StatusUtils.getStatus(coordJob));
665                coordJob.setLastModifiedTime(new Date());
666                jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
667                // update bundle action only when status changes in coord job
668                if (coordJob.getBundleId() != null) {
669                    if (!prevStatus.equals(coordJob.getStatus())) {
670                        BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
671                        bundleStatusUpdate.call();
672                    }
673                }
674            }
675    
676            private void checkCoordPending(boolean isPending, CoordinatorJobBean coordJob, boolean saveToDB) throws JPAExecutorException {
677                // Checking the coordinator pending should be updated or not
678                if (isPending) {
679                    coordJob.setPending();
680                    LOG.info("Coord job [" + coordJob.getId() + "] Pending set to TRUE");
681                }
682                else {
683                    coordJob.resetPending();
684                    LOG.info("Coord job [" + coordJob.getId() + "] Pending set to FALSE");
685                }
686    
687                if (saveToDB) {
688                    jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
689                }
690            }
691    
692            /**
693             * Aggregate coordinator actions' status to coordinator jobs
694             *
695             * @throws JPAExecutorException thrown if failed in db updates or retrievals
696             * @throws CommandException thrown if failed to run commands
697             */
698            private void coordTransit() throws JPAExecutorException, CommandException {
699                List<CoordinatorJobBean> pendingJobCheckList = null;
700                if (lastInstanceStartTime == null) {
701                    LOG.info("Running coordinator status service first instance");
702                    // this is the first instance, we need to check for all pending jobs;
703                    pendingJobCheckList = jpaService.execute(new CoordJobsGetPendingJPAExecutor(limit));
704                }
705                else {
706                    LOG.info("Running coordinator status service from last instance time =  "
707                            + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
708                    // this is not the first instance, we should only check jobs
709                    // that have actions been
710                    // updated >= start time of last service run;
711                    List<String> coordJobIdList = jpaService
712                            .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
713                    Set<String> coordIds = new HashSet<String>();
714                    for (String coordJobId : coordJobIdList) {
715                        coordIds.add(coordJobId);
716                    }
717                    pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
718                    for (String coordId : coordIds.toArray(new String[coordIds.size()])) {
719                        CoordinatorJobBean coordJob;
720                        try{
721                            coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordId));
722                        }
723                        catch (JPAExecutorException jpaee) {
724                            if (jpaee.getErrorCode().equals(ErrorCode.E0604)) {
725                                LOG.warn("Exception happened during StatusTransitRunnable; Coordinator Job doesn't exist", jpaee);
726                                continue;
727                            } else {
728                                throw jpaee;
729                            }
730                        }
731                        // Running coord job might have pending false
732                        Job.Status coordJobStatus = coordJob.getStatus();
733                        if (coordJob.isPending() || coordJobStatus.equals(Job.Status.PAUSED)
734                                || coordJobStatus.equals(Job.Status.RUNNING)
735                                || coordJobStatus.equals(Job.Status.RUNNINGWITHERROR)
736                                || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
737                            pendingJobCheckList.add(coordJob);
738                        }
739                    }
740                }
741                aggregateCoordJobsStatus(pendingJobCheckList);
742            }
743        }
744    
745        /**
746         * Initializes the {@link StatusTransitService}.
747         *
748         * @param services services instance.
749         */
750        @Override
751        public void init(Services services) {
752            Configuration conf = services.getConf();
753            Runnable stateTransitRunnable = new StatusTransitRunnable();
754            services.get(SchedulerService.class).schedule(stateTransitRunnable, 10,
755                    conf.getInt(CONF_STATUSTRANSIT_INTERVAL, 60), SchedulerService.Unit.SEC);
756        }
757    
758        /**
759         * Destroy the StateTransit Jobs Service.
760         */
761        @Override
762        public void destroy() {
763        }
764    
765        /**
766         * Return the public interface for the purge jobs service.
767         *
768         * @return {@link StatusTransitService}.
769         */
770        @Override
771        public Class<? extends Service> getInterface() {
772            return StatusTransitService.class;
773        }
774    }
775