This project has been retired. For details, please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import java.util.ArrayList;
021 import java.util.Arrays;
022 import java.util.Date;
023 import java.util.HashMap;
024 import java.util.HashSet;
025 import java.util.List;
026 import java.util.Set;
027 import java.util.TreeSet;
028 import java.util.Comparator;
029
030 import org.apache.hadoop.conf.Configuration;
031 import org.apache.oozie.BundleActionBean;
032 import org.apache.oozie.BundleJobBean;
033 import org.apache.oozie.CoordinatorActionBean;
034 import org.apache.oozie.CoordinatorJobBean;
035 import org.apache.oozie.ErrorCode;
036 import org.apache.oozie.client.CoordinatorAction;
037 import org.apache.oozie.client.Job;
038 import org.apache.oozie.command.CommandException;
039 import org.apache.oozie.command.bundle.BundleKillXCommand;
040 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
041 import org.apache.oozie.executor.jpa.BundleActionsGetByLastModifiedTimeJPAExecutor;
042 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
043 import org.apache.oozie.executor.jpa.BundleActionsGetStatusPendingJPAExecutor;
044 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
045 import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
046 import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor;
047 import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor;
048 import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor;
049 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
050 import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
051 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
052 import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
053 import org.apache.oozie.executor.jpa.JPAExecutorException;
054 import org.apache.oozie.util.DateUtils;
055 import org.apache.oozie.util.MemoryLocks;
056 import org.apache.oozie.util.StatusUtils;
057 import org.apache.oozie.util.XLog;
058
059 /**
060 * StateTransitService is scheduled to run at the configured interval.
061 * <p/>
062 * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0 (job
063 * done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to
064 * SUCCEEDED.
065 */
066 public class StatusTransitService implements Service {
067 public static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService.";
068 public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval";
069 public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + "backward.support.for.coord.status";
070 public static final String CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR = CONF_PREFIX + "backward.support.for.states.without.error";
071 private static int limit = -1;
072 private static Date lastInstanceStartTime = null;
073 private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class);
074
075 /**
076 * StateTransitRunnable is the runnable which is scheduled to run at the configured interval.
077 * <p/>
078 * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0
079 * (job done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to
080 * SUCCEEDED.
081 */
082 static class StatusTransitRunnable implements Runnable {
083 private JPAService jpaService = null;
084 private MemoryLocks.LockToken lock;
085
086 public StatusTransitRunnable() {
087 jpaService = Services.get().get(JPAService.class);
088 if (jpaService == null) {
089 LOG.error("Missing JPAService");
090 }
091 }
092
093 @Override
094 public void run() {
095 try {
096 Date curDate = new Date(); // records the start time of this service run;
097
098 // first check if there is some other instance running;
099 lock = Services.get().get(MemoryLocksService.class).getWriteLock(StatusTransitService.class.getName(),
100 lockTimeout);
101 if (lock == null) {
102 LOG.info("This StatusTransitService instance"
103 + " will not run since there is already an instance running");
104 }
105 else {
106 LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName());
107 // running coord jobs transit service
108 coordTransit();
109 // running bundle jobs transit service
110 bundleTransit();
111
112 lastInstanceStartTime = curDate;
113 }
114 }
115 catch (Exception ex) {
116 LOG.warn("Exception happened during StatusTransitRunnable ", ex);
117 }
118 finally {
119 // release lock;
120 if (lock != null) {
121 lock.release();
122 LOG.info("Released lock for [{0}]", StatusTransitService.class.getName());
123 }
124 }
125 }
126
127 public List<BundleJobBean> removeDuplicates(List<BundleJobBean> pendingJobList) {
128 Set<BundleJobBean> s = new TreeSet<BundleJobBean>(new Comparator<BundleJobBean>() {
129 @Override
130 public int compare(BundleJobBean b1, BundleJobBean b2) {
131 if (b1.getId().equals(b2.getId())) {
132 return 0;
133 }
134 else
135 return 1;
136 }
137 });
138 s.addAll(pendingJobList);
139 return new ArrayList<BundleJobBean>(s);
140 }
141
142 /**
143 * Aggregate bundle actions' status to bundle jobs
144 *
145 * @throws JPAExecutorException thrown if failed in db updates or retrievals
146 * @throws CommandException thrown if failed to run commands
147 */
148 private void bundleTransit() throws JPAExecutorException, CommandException {
149 List<BundleJobBean> pendingJobCheckList = null;
150
151 if (lastInstanceStartTime == null) {
152 LOG.info("Running bundle status service first instance");
153 // this is the first instance, we need to check for all pending or running jobs;
154 pendingJobCheckList = jpaService.execute(new BundleJobsGetRunningOrPendingJPAExecutor(limit));
155 }
156 else {
157 LOG.info("Running bundle status service from last instance time = "
158 + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
159 // this is not the first instance, we should only check jobs that have actions been
160 // updated >= start time of last service run;
161 List<BundleActionBean> actionList = jpaService
162 .execute(new BundleActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
163 Set<String> bundleIds = new HashSet<String>();
164 for (BundleActionBean action : actionList) {
165 bundleIds.add(action.getBundleId());
166 }
167 pendingJobCheckList = new ArrayList<BundleJobBean>();
168 for (String bundleId : bundleIds.toArray(new String[bundleIds.size()])) {
169 BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId));
170 // Running bundle job might have pending false
171 if (bundle.isPending() || bundle.getStatus().equals(Job.Status.RUNNING)
172 || bundle.getStatus().equals(Job.Status.RUNNINGWITHERROR)
173 || bundle.getStatus().equals(Job.Status.PAUSED)
174 || bundle.getStatus().equals(Job.Status.PAUSEDWITHERROR)) {
175 pendingJobCheckList.add(bundle);
176 }
177 }
178 }
179 aggregateBundleJobsStatus(pendingJobCheckList);
180 }
181
182 private void aggregateBundleJobsStatus(List<BundleJobBean> bundleLists) throws JPAExecutorException,
183 CommandException {
184 if (bundleLists != null) {
185 for (BundleJobBean bundleJob : bundleLists) {
186 try {
187 String jobId = bundleJob.getId();
188 Job.Status[] bundleStatus = new Job.Status[1];
189 bundleStatus[0] = bundleJob.getStatus();
190 List<BundleActionBean> bundleActions = jpaService.execute(new BundleActionsGetStatusPendingJPAExecutor(
191 jobId));
192 HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>();
193 boolean foundPending = false;
194 for (BundleActionBean bAction : bundleActions) {
195 int counter = 0;
196 if (bundleActionStatus.containsKey(bAction.getStatus())) {
197 counter = bundleActionStatus.get(bAction.getStatus()) + 1;
198 }
199 else {
200 ++counter;
201 }
202 bundleActionStatus.put(bAction.getStatus(), counter);
203 if (bAction.getCoordId() == null
204 && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED)) {
205 (new BundleKillXCommand(jobId)).call();
206 LOG.info("Bundle job ["+ jobId
207 + "] has been killed since one of its coordinator job failed submission.");
208 }
209
210 if (bAction.isPending()) {
211 foundPending = true;
212 }
213 }
214
215 if (!foundPending && checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus)) {
216 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
217 + "' from '" + bundleJob.getStatus() + "'");
218 updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
219 }
220 else if (!foundPending && checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus)) {
221 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
222 + "' from '" + bundleJob.getStatus() + "'");
223 updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
224 }
225 else if (checkPausedStatus(bundleActionStatus, bundleActions, bundleStatus)) {
226 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
227 + "' from '" + bundleJob.getStatus() + "'");
228 updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
229 }
230 else if (checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus, foundPending)) {
231 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
232 + "' from '" + bundleJob.getStatus() + "'");
233 updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
234 }
235 else if (checkRunningStatus(bundleActionStatus, bundleActions, bundleStatus)) {
236 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
237 + "' from '" + bundleJob.getStatus() + "'");
238 updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
239 }
240 }
241 catch (Exception ex) {
242 LOG.error("Exception happened during aggregate bundle job's status, job = "
243 + bundleJob.getId(), ex);
244 }
245 }
246 }
247
248 }
249
250 private void aggregateCoordJobsStatus(List<CoordinatorJobBean> CoordList) throws JPAExecutorException,
251 CommandException {
252 if (CoordList != null) {
253 Configuration conf = Services.get().getConf();
254 boolean backwardSupportForCoordStatus = conf.getBoolean(CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
255
256 for (CoordinatorJobBean coordJob : CoordList) {
257 try {
258 // if namespace 0.1 is used and backward support is true, then ignore this coord job
259 if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
260 && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
261 continue;
262 }
263 String jobId = coordJob.getId();
264 Job.Status[] coordStatus = new Job.Status[1];
265 coordStatus[0] = coordJob.getStatus();
266 //Get count of Coordinator actions with pending true
267 boolean isPending = false;
268 int count = jpaService.execute(new CoordJobGetPendingActionsCountJPAExecutor(jobId));
269 if (count > 0) {
270 isPending = true;
271 }
272 // Get status of Coordinator actions
273 List<CoordinatorAction.Status> coordActionStatusList = jpaService
274 .execute(new CoordJobGetActionsStatusJPAExecutor(jobId));
275 HashMap<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>();
276
277 for (CoordinatorAction.Status status : coordActionStatusList) {
278 int counter = 0;
279 if (coordActionStatus.containsKey(status)) {
280 counter = coordActionStatus.get(status) + 1;
281 }
282 else {
283 ++counter;
284 }
285 coordActionStatus.put(status, counter);
286 }
287
288 int nonPendingCoordActionsCount = coordActionStatusList.size();
289 boolean isDoneMaterialization = coordJob.isDoneMaterialization();
290 if ((isDoneMaterialization || coordStatus[0] == Job.Status.FAILED || coordStatus[0] == Job.Status.KILLED)
291 && checkCoordTerminalStatus(coordActionStatus, nonPendingCoordActionsCount,
292 coordStatus, isDoneMaterialization)) {
293 LOG.info("Set coordinator job [" + jobId + "] status to '" + coordStatus[0].toString()
294 + "' from '" + coordJob.getStatus() + "'");
295 updateCoordJob(isPending, coordJob, coordStatus[0]);
296 }
297 else if (checkCoordPausedStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
298 LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
299 + "' from '" + coordJob.getStatus() + "'");
300 updateCoordJob(isPending, coordJob, coordStatus[0]);
301 }
302 else if(checkCoordSuspendStatus( coordActionStatus, nonPendingCoordActionsCount, coordStatus, coordJob.isDoneMaterialization(), isPending)) {
303 LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
304 + "' from '" + coordJob.getStatus() + "'");
305 updateCoordJob(isPending, coordJob, coordStatus[0]);
306 }
307 else if (checkCoordRunningStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
308 LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
309 + "' from '" + coordJob.getStatus() + "'");
310 updateCoordJob(isPending, coordJob, coordStatus[0]);
311 }
312 else {
313 checkCoordPending(isPending, coordJob, true);
314 }
315 }
316 catch (Exception ex) {
317 LOG.error("Exception happened during aggregate coordinator job's status, job = "
318 + coordJob.getId(), ex);
319 }
320 }
321
322 }
323 }
324
325 private boolean checkTerminalStatus(HashMap<Job.Status, Integer> bundleActionStatus,
326 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
327 boolean ret = false;
328 int totalValuesSucceed = 0;
329 if (bundleActionStatus.containsKey(Job.Status.SUCCEEDED)) {
330 totalValuesSucceed = bundleActionStatus.get(Job.Status.SUCCEEDED);
331 }
332 int totalValuesFailed = 0;
333 if (bundleActionStatus.containsKey(Job.Status.FAILED)) {
334 totalValuesFailed = bundleActionStatus.get(Job.Status.FAILED);
335 }
336 int totalValuesKilled = 0;
337 if (bundleActionStatus.containsKey(Job.Status.KILLED)) {
338 totalValuesKilled = bundleActionStatus.get(Job.Status.KILLED);
339 }
340
341 int totalValuesDoneWithError = 0;
342 if (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)) {
343 totalValuesDoneWithError = bundleActionStatus.get(Job.Status.DONEWITHERROR);
344 }
345
346 if (bundleActions.size() == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesDoneWithError)) {
347 // If all the bundle actions are succeeded then bundle job should be succeeded.
348 if (bundleActions.size() == totalValuesSucceed) {
349 bundleStatus[0] = Job.Status.SUCCEEDED;
350 ret = true;
351 }
352 else if (bundleActions.size() == totalValuesKilled) {
353 // If all the bundle actions are KILLED then bundle job should be KILLED.
354 bundleStatus[0] = Job.Status.KILLED;
355 ret = true;
356 }
357 else if (bundleActions.size() == totalValuesFailed) {
358 // If all the bundle actions are FAILED then bundle job should be FAILED.
359 bundleStatus[0] = Job.Status.FAILED;
360 ret = true;
361 }
362 else {
363 bundleStatus[0] = Job.Status.DONEWITHERROR;
364 ret = true;
365 }
366 }
367 return ret;
368 }
369
370 private boolean checkCoordTerminalStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
371 int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization) {
372 boolean ret = false;
373 int totalValuesSucceed = 0;
374 if (coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED)) {
375 totalValuesSucceed = coordActionStatus.get(CoordinatorAction.Status.SUCCEEDED);
376 }
377 int totalValuesFailed = 0;
378 if (coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)) {
379 totalValuesFailed = coordActionStatus.get(CoordinatorAction.Status.FAILED);
380 }
381 int totalValuesKilled = 0;
382 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)) {
383 totalValuesKilled = coordActionStatus.get(CoordinatorAction.Status.KILLED);
384 }
385
386 int totalValuesTimeOut = 0;
387 if (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
388 totalValuesTimeOut = coordActionStatus.get(CoordinatorAction.Status.TIMEDOUT);
389 }
390
391 if (coordActionsCount == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut)) {
392 // If all the coordinator actions are succeeded then coordinator job should be succeeded.
393 if (coordActionsCount == totalValuesSucceed && isDoneMaterialization) {
394 coordStatus[0] = Job.Status.SUCCEEDED;
395 ret = true;
396 }
397 else if (coordActionsCount == totalValuesKilled) {
398 // If all the coordinator actions are KILLED then coordinator job should be KILLED.
399 coordStatus[0] = Job.Status.KILLED;
400 ret = true;
401 }
402 else if (coordActionsCount == totalValuesFailed) {
403 // If all the coordinator actions are FAILED then coordinator job should be FAILED.
404 coordStatus[0] = Job.Status.FAILED;
405 ret = true;
406 }
407 else {
408 coordStatus[0] = Job.Status.DONEWITHERROR;
409 ret = true;
410 }
411 }
412 return ret;
413 }
414
415 private boolean checkPrepStatus(HashMap<Job.Status, Integer> bundleActionStatus,
416 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
417 boolean ret = false;
418 if (bundleActionStatus.containsKey(Job.Status.PREP)) {
419 // If all the bundle actions are PREP then bundle job should be RUNNING.
420 if (bundleActions.size() > bundleActionStatus.get(Job.Status.PREP)) {
421 bundleStatus[0] = Job.Status.RUNNING;
422 ret = true;
423 }
424 }
425 return ret;
426 }
427
428 private boolean checkPausedStatus(HashMap<Job.Status, Integer> bundleActionStatus,
429 List<BundleActionBean> bundleActions, Job.Status[] bundleJobStatus) {
430 boolean ret = false;
431
432 // TODO - When bottom up cmds are allowed to change the status of parent job,
433 // if none of the bundle actions are in paused or pausedwitherror, the function should return
434 // false
435
436 // top down
437 // If the bundle job is PAUSED or PAUSEDINERROR and no children are in error
438 // state, then job should be PAUSED otherwise it should be pausedwitherror
439 if (bundleJobStatus[0] == Job.Status.PAUSED || bundleJobStatus[0] == Job.Status.PAUSEDWITHERROR) {
440 if (bundleActionStatus.containsKey(Job.Status.KILLED)
441 || bundleActionStatus.containsKey(Job.Status.FAILED)
442 || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
443 || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
444 || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)
445 || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
446 bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
447 }
448 else {
449 bundleJobStatus[0] = Job.Status.PAUSED;
450 }
451 ret = true;
452 }
453
454 // bottom up; check the status of parent through their children
455 else if (bundleActionStatus.containsKey(Job.Status.PAUSED)
456 && (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED))) {
457 bundleJobStatus[0] = Job.Status.PAUSED;
458 ret = true;
459 }
460 else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
461 int pausedActions = bundleActionStatus.containsKey(Job.Status.PAUSED) ? bundleActionStatus
462 .get(Job.Status.PAUSED) : 0;
463 if (bundleActions.size() == pausedActions + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR)) {
464 bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
465 ret = true;
466 }
467 }
468 else {
469 ret = false;
470 }
471 return ret;
472 }
473
474
475 private boolean checkSuspendStatus(HashMap<Job.Status, Integer> bundleActionStatus,
476 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus, boolean isPending) {
477 boolean ret = false;
478
479 // TODO - When bottom up cmds are allowed to change the status of parent job,
480 // if none of the bundle actions are in suspended or suspendedwitherror, the function should return
481 // false
482
483 // top down
484 // if job is suspended
485 if (bundleStatus[0] == Job.Status.SUSPENDED
486 || bundleStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
487 if (bundleActionStatus.containsKey(Job.Status.KILLED)
488 || bundleActionStatus.containsKey(Job.Status.FAILED)
489 || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
490 || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
491 || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
492 bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
493 }
494 else {
495 bundleStatus[0] = Job.Status.SUSPENDED;
496 }
497 ret =true;
498 }
499
500 // bottom up
501 // Update status of parent from the status of its children
502 else if (!isPending && bundleActionStatus.containsKey(Job.Status.SUSPENDED)
503 || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)) {
504 int succeededActions = bundleActionStatus.containsKey(Job.Status.SUCCEEDED) ? bundleActionStatus
505 .get(Job.Status.SUCCEEDED) : 0;
506 int killedActions = bundleActionStatus.containsKey(Job.Status.KILLED) ? bundleActionStatus
507 .get(Job.Status.KILLED) : 0;
508 int failedActions = bundleActionStatus.containsKey(Job.Status.FAILED) ? bundleActionStatus
509 .get(Job.Status.FAILED) : 0;
510 int doneWithErrorActions = bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) ? bundleActionStatus
511 .get(Job.Status.DONEWITHERROR) : 0;
512
513 if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions) {
514 bundleStatus[0] = Job.Status.SUSPENDED;
515 ret = true;
516 }
517 else if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDEDWITHERROR)
518 + bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions + killedActions + failedActions + doneWithErrorActions) {
519 bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
520 ret = true;
521 }
522 }
523 return ret;
524
525 }
526
527 private boolean checkCoordPausedStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
528 int coordActionsCount, Job.Status[] coordStatus){
529 boolean ret = false;
530 if (coordStatus[0].equals(Job.Status.PAUSED) || coordStatus[0].equals(Job.Status.PAUSEDWITHERROR)) {
531 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
532 || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
533 || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
534 coordStatus[0] = Job.Status.PAUSEDWITHERROR;
535 }
536 else {
537 coordStatus[0] = Job.Status.PAUSED;
538 }
539 ret = true;
540 }
541 return ret;
542 }
543 private boolean checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
544 int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization, boolean isPending) {
545 boolean ret = false;
546
547 // TODO - When bottom up cmds are allowed to change the status of parent job
548 //if none of the coord actions are in suspended or suspendedwitherror and materialization done is false
549 //,then the function should return
550 // false
551
552 // top down
553 // check for children only when parent is suspended
554 if (coordStatus[0] == Job.Status.SUSPENDED || coordStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
555
556 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
557 || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
558 || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
559 coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
560 }
561 else {
562 coordStatus[0] = Job.Status.SUSPENDED;
563 }
564 ret = true;
565 }
566 // bottom up
567 // look for children to check the parent's status only if materialization is
568 // done and all actions are non-pending
569 else if (isDoneMaterialization && !isPending && coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
570 int succeededActions = coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED) ? coordActionStatus
571 .get(CoordinatorAction.Status.SUCCEEDED) : 0;
572 int killedActions = coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) ? coordActionStatus
573 .get(CoordinatorAction.Status.KILLED) : 0;
574 int failedActions = coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) ? coordActionStatus
575 .get(CoordinatorAction.Status.FAILED) : 0;
576 int timedoutActions = coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT) ? coordActionStatus
577 .get(CoordinatorAction.Status.TIMEDOUT) : 0;
578
579 if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED) + succeededActions) {
580 coordStatus[0] = Job.Status.SUSPENDED;
581 ret = true;
582 }
583 else if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)
584 + succeededActions + killedActions + failedActions + timedoutActions) {
585 coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
586 ret = true;
587 }
588 }
589 return ret;
590 }
591
592 private boolean checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
593 int coordActionsCount, Job.Status[] coordStatus) {
594 boolean ret = false;
595 if (coordStatus[0] != Job.Status.PREP) {
596 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
597 || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
598 || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
599 coordStatus[0] = Job.Status.RUNNINGWITHERROR;
600 }
601 else {
602 coordStatus[0] = Job.Status.RUNNING;
603 }
604 ret = true;
605 }
606 return ret;
607 }
608
609 private boolean checkRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus,
610 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
611 boolean ret = false;
612 if (bundleStatus[0] != Job.Status.PREP) {
613 if (bundleActionStatus.containsKey(Job.Status.FAILED)
614 || bundleActionStatus.containsKey(Job.Status.KILLED)
615 || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
616 || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)) {
617 bundleStatus[0] = Job.Status.RUNNINGWITHERROR;
618 }
619 else {
620 bundleStatus[0] = Job.Status.RUNNING;
621 }
622 ret = true;
623 }
624 return ret;
625
626 }
627
628 private void updateBundleJob(boolean isPending, BundleJobBean bundleJob, Job.Status bundleStatus)
629 throws JPAExecutorException {
630 String jobId = bundleJob.getId();
631 // Update the Bundle Job
632 // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
633 // PAUSEDWITHERROR is not supported
634 bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(bundleStatus));
635 if (isPending) {
636 bundleJob.setPending();
637 LOG.info("Bundle job [" + jobId + "] Pending set to TRUE");
638 }
639 else {
640 bundleJob.resetPending();
641 LOG.info("Bundle job [" + jobId + "] Pending set to FALSE");
642 }
643 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
644 }
645
646 private void updateCoordJob(boolean isPending, CoordinatorJobBean coordJob, Job.Status coordStatus)
647 throws JPAExecutorException, CommandException {
648 Job.Status prevStatus = coordJob.getStatus();
649 // Update the Coord Job
650 if (coordJob.getStatus() == Job.Status.SUCCEEDED || coordJob.getStatus() == Job.Status.FAILED
651 || coordJob.getStatus() == Job.Status.KILLED || coordJob.getStatus() == Job.Status.DONEWITHERROR) {
652 if (coordStatus == Job.Status.SUSPENDED || coordStatus == Job.Status.SUSPENDEDWITHERROR) {
653 LOG.info("Coord Job [" + coordJob.getId()
654 + "] status to "+ coordStatus +" can not be updated as its already in Terminal state");
655 return;
656 }
657 }
658
659 checkCoordPending(isPending, coordJob, false);
660 // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and PAUSEDWITHERROR is
661 // not supported
662 coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(coordStatus));
663 // Backward support when coordinator namespace is 0.1
664 coordJob.setStatus(StatusUtils.getStatus(coordJob));
665 coordJob.setLastModifiedTime(new Date());
666 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
667 // update bundle action only when status changes in coord job
668 if (coordJob.getBundleId() != null) {
669 if (!prevStatus.equals(coordJob.getStatus())) {
670 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
671 bundleStatusUpdate.call();
672 }
673 }
674 }
675
676 private void checkCoordPending(boolean isPending, CoordinatorJobBean coordJob, boolean saveToDB) throws JPAExecutorException {
677 // Checking the coordinator pending should be updated or not
678 if (isPending) {
679 coordJob.setPending();
680 LOG.info("Coord job [" + coordJob.getId() + "] Pending set to TRUE");
681 }
682 else {
683 coordJob.resetPending();
684 LOG.info("Coord job [" + coordJob.getId() + "] Pending set to FALSE");
685 }
686
687 if (saveToDB) {
688 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
689 }
690 }
691
692 /**
693 * Aggregate coordinator actions' status to coordinator jobs
694 *
695 * @throws JPAExecutorException thrown if failed in db updates or retrievals
696 * @throws CommandException thrown if failed to run commands
697 */
698 private void coordTransit() throws JPAExecutorException, CommandException {
699 List<CoordinatorJobBean> pendingJobCheckList = null;
700 if (lastInstanceStartTime == null) {
701 LOG.info("Running coordinator status service first instance");
702 // this is the first instance, we need to check for all pending jobs;
703 pendingJobCheckList = jpaService.execute(new CoordJobsGetPendingJPAExecutor(limit));
704 }
705 else {
706 LOG.info("Running coordinator status service from last instance time = "
707 + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
708 // this is not the first instance, we should only check jobs
709 // that have actions been
710 // updated >= start time of last service run;
711 List<String> coordJobIdList = jpaService
712 .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
713 Set<String> coordIds = new HashSet<String>();
714 for (String coordJobId : coordJobIdList) {
715 coordIds.add(coordJobId);
716 }
717 pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
718 for (String coordId : coordIds.toArray(new String[coordIds.size()])) {
719 CoordinatorJobBean coordJob;
720 try{
721 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordId));
722 }
723 catch (JPAExecutorException jpaee) {
724 if (jpaee.getErrorCode().equals(ErrorCode.E0604)) {
725 LOG.warn("Exception happened during StatusTransitRunnable; Coordinator Job doesn't exist", jpaee);
726 continue;
727 } else {
728 throw jpaee;
729 }
730 }
731 // Running coord job might have pending false
732 Job.Status coordJobStatus = coordJob.getStatus();
733 if (coordJob.isPending() || coordJobStatus.equals(Job.Status.PAUSED)
734 || coordJobStatus.equals(Job.Status.RUNNING)
735 || coordJobStatus.equals(Job.Status.RUNNINGWITHERROR)
736 || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
737 pendingJobCheckList.add(coordJob);
738 }
739 }
740 }
741 aggregateCoordJobsStatus(pendingJobCheckList);
742 }
743 }
744
745 /**
746 * Initializes the {@link StatusTransitService}.
747 *
748 * @param services services instance.
749 */
750 @Override
751 public void init(Services services) {
752 Configuration conf = services.getConf();
753 Runnable stateTransitRunnable = new StatusTransitRunnable();
754 services.get(SchedulerService.class).schedule(stateTransitRunnable, 10,
755 conf.getInt(CONF_STATUSTRANSIT_INTERVAL, 60), SchedulerService.Unit.SEC);
756 }
757
758 /**
759 * Destroy the StateTransit Jobs Service.
760 */
761 @Override
762 public void destroy() {
763 }
764
765 /**
766 * Return the public interface for the purge jobs service.
767 *
768 * @return {@link StatusTransitService}.
769 */
770 @Override
771 public Class<? extends Service> getInterface() {
772 return StatusTransitService.class;
773 }
774 }
775