This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import java.util.ArrayList;
021 import java.util.Arrays;
022 import java.util.Date;
023 import java.util.HashMap;
024 import java.util.HashSet;
025 import java.util.List;
026 import java.util.Set;
027 import java.util.TreeSet;
028 import java.util.Comparator;
029
030 import org.apache.hadoop.conf.Configuration;
031 import org.apache.oozie.BundleActionBean;
032 import org.apache.oozie.BundleJobBean;
033 import org.apache.oozie.CoordinatorActionBean;
034 import org.apache.oozie.CoordinatorJobBean;
035 import org.apache.oozie.ErrorCode;
036 import org.apache.oozie.client.CoordinatorAction;
037 import org.apache.oozie.client.Job;
038 import org.apache.oozie.command.CommandException;
039 import org.apache.oozie.command.bundle.BundleKillXCommand;
040 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
041 import org.apache.oozie.executor.jpa.BundleActionsGetByLastModifiedTimeJPAExecutor;
042 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
043 import org.apache.oozie.executor.jpa.BundleActionsGetStatusPendingJPAExecutor;
044 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
045 import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
046 import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor;
047 import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor;
048 import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor;
049 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
050 import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
051 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
052 import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
053 import org.apache.oozie.executor.jpa.JPAExecutorException;
054 import org.apache.oozie.util.DateUtils;
055 import org.apache.oozie.util.MemoryLocks;
056 import org.apache.oozie.util.StatusUtils;
057 import org.apache.oozie.util.XLog;
058
059 /**
060 * StatusTransitService is scheduled to run at the configured interval.
061 * <p/>
062 * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0 (job
063 * done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to
064 * SUCCEEDED.
065 */
066 public class StatusTransitService implements Service {
067 public static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService.";
068 public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval";
069 public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + "backward.support.for.coord.status";
070 public static final String CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR = CONF_PREFIX + "backward.support.for.states.without.error";
071 private static int limit = -1;
072 private static Date lastInstanceStartTime = null;
073 private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class);
074
075 /**
076 * StatusTransitRunnable is the runnable which is scheduled to run at the configured interval.
077 * <p/>
078 * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0
079 * (job done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to
080 * SUCCEEDED.
081 */
082 static class StatusTransitRunnable implements Runnable {
083 private JPAService jpaService = null;
084 private MemoryLocks.LockToken lock;
085
086 public StatusTransitRunnable() {
087 jpaService = Services.get().get(JPAService.class);
088 if (jpaService == null) {
089 LOG.error("Missing JPAService");
090 }
091 }
092
093 @Override
094 public void run() {
095 try {
096 Date curDate = new Date(); // records the start time of this service run;
097
098 // first check if there is some other instance running;
099 lock = Services.get().get(MemoryLocksService.class).getWriteLock(StatusTransitService.class.getName(),
100 lockTimeout);
101 if (lock == null) {
102 LOG.info("This StatusTransitService instance"
103 + " will not run since there is already an instance running");
104 }
105 else {
106 LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName());
107 // running coord jobs transit service
108 coordTransit();
109 // running bundle jobs transit service
110 bundleTransit();
111
112 lastInstanceStartTime = curDate;
113 }
114 }
115 catch (Exception ex) {
116 LOG.warn("Exception happened during StatusTransitRunnable ", ex);
117 }
118 finally {
119 // release lock;
120 if (lock != null) {
121 lock.release();
122 LOG.info("Released lock for [{0}]", StatusTransitService.class.getName());
123 }
124 }
125 }
126
127 public List<BundleJobBean> removeDuplicates(List<BundleJobBean> pendingJobList) {
128 Set<BundleJobBean> s = new TreeSet<BundleJobBean>(new Comparator<BundleJobBean>() {
129 @Override
130 public int compare(BundleJobBean b1, BundleJobBean b2) {
131 if (b1.getId().equals(b2.getId())) {
132 return 0;
133 }
134 else
135 return 1;
136 }
137 });
138 s.addAll(pendingJobList);
139 return new ArrayList<BundleJobBean>(s);
140 }
141
142 /**
143 * Aggregate bundle actions' status to bundle jobs
144 *
145 * @throws JPAExecutorException thrown if failed in db updates or retrievals
146 * @throws CommandException thrown if failed to run commands
147 */
148 private void bundleTransit() throws JPAExecutorException, CommandException {
149 List<BundleJobBean> pendingJobCheckList = null;
150
151 if (lastInstanceStartTime == null) {
152 LOG.info("Running bundle status service first instance");
153 // this is the first instance, we need to check for all pending or running jobs;
154 pendingJobCheckList = jpaService.execute(new BundleJobsGetRunningOrPendingJPAExecutor(limit));
155 }
156 else {
157 LOG.info("Running bundle status service from last instance time = "
158 + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
159 // this is not the first instance, we should only check jobs that have actions been
160 // updated >= start time of last service run;
161 List<BundleActionBean> actionList = jpaService
162 .execute(new BundleActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
163 Set<String> bundleIds = new HashSet<String>();
164 for (BundleActionBean action : actionList) {
165 bundleIds.add(action.getBundleId());
166 }
167 pendingJobCheckList = new ArrayList<BundleJobBean>();
168 for (String bundleId : bundleIds.toArray(new String[bundleIds.size()])) {
169 BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId));
170 // Running bundle job might have pending false
171 if (bundle.isPending() || bundle.getStatus().equals(Job.Status.RUNNING)
172 || bundle.getStatus().equals(Job.Status.RUNNINGWITHERROR)
173 || bundle.getStatus().equals(Job.Status.PAUSED)
174 || bundle.getStatus().equals(Job.Status.PAUSEDWITHERROR)) {
175 pendingJobCheckList.add(bundle);
176 }
177 }
178 }
179 aggregateBundleJobsStatus(pendingJobCheckList);
180 }
181
182 private void aggregateBundleJobsStatus(List<BundleJobBean> bundleLists) throws JPAExecutorException,
183 CommandException {
184 if (bundleLists != null) {
185 for (BundleJobBean bundleJob : bundleLists) {
186 try {
187 String jobId = bundleJob.getId();
188 Job.Status[] bundleStatus = new Job.Status[1];
189 bundleStatus[0] = bundleJob.getStatus();
190 List<BundleActionBean> bundleActions = jpaService.execute(new BundleActionsGetStatusPendingJPAExecutor(
191 jobId));
192 HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>();
193 boolean foundPending = false;
194 for (BundleActionBean bAction : bundleActions) {
195 int counter = 0;
196 if (bundleActionStatus.containsKey(bAction.getStatus())) {
197 counter = bundleActionStatus.get(bAction.getStatus()) + 1;
198 }
199 else {
200 ++counter;
201 }
202 bundleActionStatus.put(bAction.getStatus(), counter);
203 if (bAction.getCoordId() == null
204 && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED)) {
205 (new BundleKillXCommand(jobId)).call();
206 LOG.info("Bundle job ["+ jobId
207 + "] has been killed since one of its coordinator job failed submission.");
208 }
209
210 if (bAction.isPending()) {
211 foundPending = true;
212 }
213 }
214
215 if (!foundPending && checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus)) {
216 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
217 + "' from '" + bundleJob.getStatus() + "'");
218 updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
219 }
220 else if (!foundPending && checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus)) {
221 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
222 + "' from '" + bundleJob.getStatus() + "'");
223 updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
224 }
225 else if (checkPausedStatus(bundleActionStatus, bundleActions, bundleStatus)) {
226 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
227 + "' from '" + bundleJob.getStatus() + "'");
228 updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
229 }
230 else if (checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus, foundPending)) {
231 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
232 + "' from '" + bundleJob.getStatus() + "'");
233 updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
234 }
235 else if (checkRunningStatus(bundleActionStatus, bundleActions, bundleStatus)) {
236 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
237 + "' from '" + bundleJob.getStatus() + "'");
238 updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
239 }
240 }
241 catch (Exception ex) {
242 LOG.error("Exception happened during aggregate bundle job's status, job = "
243 + bundleJob.getId(), ex);
244 }
245 }
246 }
247
248 }
249
250 private void aggregateCoordJobsStatus(List<CoordinatorJobBean> CoordList) throws JPAExecutorException,
251 CommandException {
252 if (CoordList != null) {
253 Configuration conf = Services.get().getConf();
254 boolean backwardSupportForCoordStatus = conf.getBoolean(CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
255
256 for (CoordinatorJobBean coordJob : CoordList) {
257 try {
258 // if namespace 0.1 is used and backward support is true, then ignore this coord job
259 if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
260 && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
261 continue;
262 }
263 String jobId = coordJob.getId();
264 Job.Status[] coordStatus = new Job.Status[1];
265 coordStatus[0] = coordJob.getStatus();
266 //Get count of Coordinator actions with pending true
267 boolean isPending = false;
268 int count = jpaService.execute(new CoordJobGetPendingActionsCountJPAExecutor(jobId));
269 if (count > 0) {
270 isPending = true;
271 }
272 // Get status of Coordinator actions
273 List<CoordinatorAction.Status> coordActionStatusList = jpaService
274 .execute(new CoordJobGetActionsStatusJPAExecutor(jobId));
275 HashMap<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>();
276
277 for (CoordinatorAction.Status status : coordActionStatusList) {
278 int counter = 0;
279 if (coordActionStatus.containsKey(status)) {
280 counter = coordActionStatus.get(status) + 1;
281 }
282 else {
283 ++counter;
284 }
285 coordActionStatus.put(status, counter);
286 }
287
288 int nonPendingCoordActionsCount = coordActionStatusList.size();
289
290 if ((coordJob.isDoneMaterialization() || coordStatus[0] == Job.Status.FAILED || coordStatus[0] == Job.Status.KILLED)
291 && checkCoordTerminalStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
292 LOG.info("Set coordinator job [" + jobId + "] status to '" + coordStatus[0].toString()
293 + "' from '" + coordJob.getStatus() + "'");
294 updateCoordJob(isPending, coordJob, coordStatus[0]);
295 }
296 else if (checkCoordPausedStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
297 LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
298 + "' from '" + coordJob.getStatus() + "'");
299 updateCoordJob(isPending, coordJob, coordStatus[0]);
300 }
301 else if(checkCoordSuspendStatus( coordActionStatus, nonPendingCoordActionsCount, coordStatus, coordJob.isDoneMaterialization(), isPending)) {
302 LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
303 + "' from '" + coordJob.getStatus() + "'");
304 updateCoordJob(isPending, coordJob, coordStatus[0]);
305 }
306 else if (checkCoordRunningStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
307 LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
308 + "' from '" + coordJob.getStatus() + "'");
309 updateCoordJob(isPending, coordJob, coordStatus[0]);
310 }
311 else {
312 checkCoordPending(isPending, coordJob, true);
313 }
314 }
315 catch (Exception ex) {
316 LOG.error("Exception happened during aggregate coordinator job's status, job = "
317 + coordJob.getId(), ex);
318 }
319 }
320
321 }
322 }
323
324 private boolean checkTerminalStatus(HashMap<Job.Status, Integer> bundleActionStatus,
325 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
326 boolean ret = false;
327 int totalValuesSucceed = 0;
328 if (bundleActionStatus.containsKey(Job.Status.SUCCEEDED)) {
329 totalValuesSucceed = bundleActionStatus.get(Job.Status.SUCCEEDED);
330 }
331 int totalValuesFailed = 0;
332 if (bundleActionStatus.containsKey(Job.Status.FAILED)) {
333 totalValuesFailed = bundleActionStatus.get(Job.Status.FAILED);
334 }
335 int totalValuesKilled = 0;
336 if (bundleActionStatus.containsKey(Job.Status.KILLED)) {
337 totalValuesKilled = bundleActionStatus.get(Job.Status.KILLED);
338 }
339
340 int totalValuesDoneWithError = 0;
341 if (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)) {
342 totalValuesDoneWithError = bundleActionStatus.get(Job.Status.DONEWITHERROR);
343 }
344
345 if (bundleActions.size() == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesDoneWithError)) {
346 // If all the bundle actions are succeeded then bundle job should be succeeded.
347 if (bundleActions.size() == totalValuesSucceed) {
348 bundleStatus[0] = Job.Status.SUCCEEDED;
349 ret = true;
350 }
351 else if (bundleActions.size() == totalValuesKilled) {
352 // If all the bundle actions are KILLED then bundle job should be KILLED.
353 bundleStatus[0] = Job.Status.KILLED;
354 ret = true;
355 }
356 else if (bundleActions.size() == totalValuesFailed) {
357 // If all the bundle actions are FAILED then bundle job should be FAILED.
358 bundleStatus[0] = Job.Status.FAILED;
359 ret = true;
360 }
361 else {
362 bundleStatus[0] = Job.Status.DONEWITHERROR;
363 ret = true;
364 }
365 }
366 return ret;
367 }
368
369 private boolean checkCoordTerminalStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
370 int coordActionsCount, Job.Status[] coordStatus) {
371 boolean ret = false;
372 int totalValuesSucceed = 0;
373 if (coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED)) {
374 totalValuesSucceed = coordActionStatus.get(CoordinatorAction.Status.SUCCEEDED);
375 }
376 int totalValuesFailed = 0;
377 if (coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)) {
378 totalValuesFailed = coordActionStatus.get(CoordinatorAction.Status.FAILED);
379 }
380 int totalValuesKilled = 0;
381 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)) {
382 totalValuesKilled = coordActionStatus.get(CoordinatorAction.Status.KILLED);
383 }
384
385 int totalValuesTimeOut = 0;
386 if (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
387 totalValuesTimeOut = coordActionStatus.get(CoordinatorAction.Status.TIMEDOUT);
388 }
389
390 if (coordActionsCount == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut)) {
391 // If all the coordinator actions are succeeded then coordinator job should be succeeded.
392 if (coordActionsCount == totalValuesSucceed) {
393 coordStatus[0] = Job.Status.SUCCEEDED;
394 ret = true;
395 }
396 else if (coordActionsCount == totalValuesKilled) {
397 // If all the coordinator actions are KILLED then coordinator job should be KILLED.
398 coordStatus[0] = Job.Status.KILLED;
399 ret = true;
400 }
401 else if (coordActionsCount == totalValuesFailed) {
402 // If all the coordinator actions are FAILED then coordinator job should be FAILED.
403 coordStatus[0] = Job.Status.FAILED;
404 ret = true;
405 }
406 else {
407 coordStatus[0] = Job.Status.DONEWITHERROR;
408 ret = true;
409 }
410 }
411 return ret;
412 }
413
414 private boolean checkPrepStatus(HashMap<Job.Status, Integer> bundleActionStatus,
415 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
416 boolean ret = false;
417 if (bundleActionStatus.containsKey(Job.Status.PREP)) {
418 // If all the bundle actions are PREP then bundle job should be RUNNING.
419 if (bundleActions.size() > bundleActionStatus.get(Job.Status.PREP)) {
420 bundleStatus[0] = Job.Status.RUNNING;
421 ret = true;
422 }
423 }
424 return ret;
425 }
426
427 private boolean checkPausedStatus(HashMap<Job.Status, Integer> bundleActionStatus,
428 List<BundleActionBean> bundleActions, Job.Status[] bundleJobStatus) {
429 boolean ret = false;
430
431 // TODO - When bottom up cmds are allowed to change the status of parent job,
432 // if none of the bundle actions are in paused or pausedwitherror, the function should return
433 // false
434
435 // top down
436 // If the bundle job is PAUSED or PAUSEDINERROR and no children are in error
437 // state, then job should be PAUSED otherwise it should be pausedwitherror
438 if (bundleJobStatus[0] == Job.Status.PAUSED || bundleJobStatus[0] == Job.Status.PAUSEDWITHERROR) {
439 if (bundleActionStatus.containsKey(Job.Status.KILLED)
440 || bundleActionStatus.containsKey(Job.Status.FAILED)
441 || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
442 || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
443 || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)
444 || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
445 bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
446 }
447 else {
448 bundleJobStatus[0] = Job.Status.PAUSED;
449 }
450 ret = true;
451 }
452
453 // bottom up; check the status of parent through their children
454 else if (bundleActionStatus.containsKey(Job.Status.PAUSED)
455 && (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED))) {
456 bundleJobStatus[0] = Job.Status.PAUSED;
457 ret = true;
458 }
459 else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
460 int pausedActions = bundleActionStatus.containsKey(Job.Status.PAUSED) ? bundleActionStatus
461 .get(Job.Status.PAUSED) : 0;
462 if (bundleActions.size() == pausedActions + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR)) {
463 bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
464 ret = true;
465 }
466 }
467 else {
468 ret = false;
469 }
470 return ret;
471 }
472
473
474 private boolean checkSuspendStatus(HashMap<Job.Status, Integer> bundleActionStatus,
475 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus, boolean isPending) {
476 boolean ret = false;
477
478 // TODO - When bottom up cmds are allowed to change the status of parent job,
479 // if none of the bundle actions are in suspended or suspendedwitherror, the function should return
480 // false
481
482 // top down
483 // if job is suspended
484 if (bundleStatus[0] == Job.Status.SUSPENDED
485 || bundleStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
486 if (bundleActionStatus.containsKey(Job.Status.KILLED)
487 || bundleActionStatus.containsKey(Job.Status.FAILED)
488 || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
489 || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
490 || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
491 bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
492 }
493 else {
494 bundleStatus[0] = Job.Status.SUSPENDED;
495 }
496 ret =true;
497 }
498
499 // bottom up
500 // Update status of parent from the status of its children
501 else if (!isPending && bundleActionStatus.containsKey(Job.Status.SUSPENDED)
502 || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)) {
503 int succeededActions = bundleActionStatus.containsKey(Job.Status.SUCCEEDED) ? bundleActionStatus
504 .get(Job.Status.SUCCEEDED) : 0;
505 int killedActions = bundleActionStatus.containsKey(Job.Status.KILLED) ? bundleActionStatus
506 .get(Job.Status.KILLED) : 0;
507 int failedActions = bundleActionStatus.containsKey(Job.Status.FAILED) ? bundleActionStatus
508 .get(Job.Status.FAILED) : 0;
509 int doneWithErrorActions = bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) ? bundleActionStatus
510 .get(Job.Status.DONEWITHERROR) : 0;
511
512 if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions) {
513 bundleStatus[0] = Job.Status.SUSPENDED;
514 ret = true;
515 }
516 else if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDEDWITHERROR)
517 + bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions + killedActions + failedActions + doneWithErrorActions) {
518 bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
519 ret = true;
520 }
521 }
522 return ret;
523
524 }
525
526 private boolean checkCoordPausedStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
527 int coordActionsCount, Job.Status[] coordStatus){
528 boolean ret = false;
529 if (coordStatus[0].equals(Job.Status.PAUSED) || coordStatus[0].equals(Job.Status.PAUSEDWITHERROR)) {
530 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
531 || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
532 || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
533 coordStatus[0] = Job.Status.PAUSEDWITHERROR;
534 }
535 else {
536 coordStatus[0] = Job.Status.PAUSED;
537 }
538 ret = true;
539 }
540 return ret;
541 }
542 private boolean checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
543 int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization, boolean isPending) {
544 boolean ret = false;
545
546 // TODO - When bottom up cmds are allowed to change the status of parent job
547 //if none of the coord actions are in suspended or suspendedwitherror and materialization done is false
548 //,then the function should return
549 // false
550
551 // top down
552 // check for children only when parent is suspended
553 if (coordStatus[0] == Job.Status.SUSPENDED || coordStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
554
555 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
556 || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
557 || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
558 coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
559 }
560 else {
561 coordStatus[0] = Job.Status.SUSPENDED;
562 }
563 ret = true;
564 }
565 // bottom up
566 // look for children to check the parent's status only if materialization is
567 // done and all actions are non-pending
568 else if (isDoneMaterialization && !isPending && coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
569 int succeededActions = coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED) ? coordActionStatus
570 .get(CoordinatorAction.Status.SUCCEEDED) : 0;
571 int killedActions = coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) ? coordActionStatus
572 .get(CoordinatorAction.Status.KILLED) : 0;
573 int failedActions = coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) ? coordActionStatus
574 .get(CoordinatorAction.Status.FAILED) : 0;
575 int timedoutActions = coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT) ? coordActionStatus
576 .get(CoordinatorAction.Status.TIMEDOUT) : 0;
577
578 if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED) + succeededActions) {
579 coordStatus[0] = Job.Status.SUSPENDED;
580 ret = true;
581 }
582 else if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)
583 + succeededActions + killedActions + failedActions + timedoutActions) {
584 coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
585 ret = true;
586 }
587 }
588 return ret;
589 }
590
591 private boolean checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
592 int coordActionsCount, Job.Status[] coordStatus) {
593 boolean ret = false;
594 if (coordStatus[0] != Job.Status.PREP) {
595 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
596 || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
597 || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
598 coordStatus[0] = Job.Status.RUNNINGWITHERROR;
599 }
600 else {
601 coordStatus[0] = Job.Status.RUNNING;
602 }
603 ret = true;
604 }
605 return ret;
606 }
607
608 private boolean checkRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus,
609 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
610 boolean ret = false;
611 if (bundleStatus[0] != Job.Status.PREP) {
612 if (bundleActionStatus.containsKey(Job.Status.FAILED)
613 || bundleActionStatus.containsKey(Job.Status.KILLED)
614 || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
615 || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)) {
616 bundleStatus[0] = Job.Status.RUNNINGWITHERROR;
617 }
618 else {
619 bundleStatus[0] = Job.Status.RUNNING;
620 }
621 ret = true;
622 }
623 return ret;
624
625 }
626
627 private void updateBundleJob(boolean isPending, BundleJobBean bundleJob, Job.Status bundleStatus)
628 throws JPAExecutorException {
629 String jobId = bundleJob.getId();
630 // Update the Bundle Job
631 // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
632 // PAUSEDWITHERROR is not supported
633 bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(bundleStatus));
634 if (isPending) {
635 bundleJob.setPending();
636 LOG.info("Bundle job [" + jobId + "] Pending set to TRUE");
637 }
638 else {
639 bundleJob.resetPending();
640 LOG.info("Bundle job [" + jobId + "] Pending set to FALSE");
641 }
642 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
643 }
644
645 private void updateCoordJob(boolean isPending, CoordinatorJobBean coordJob, Job.Status coordStatus)
646 throws JPAExecutorException, CommandException {
647 Job.Status prevStatus = coordJob.getStatus();
648 // Update the Coord Job
649 if (coordJob.getStatus() == Job.Status.SUCCEEDED || coordJob.getStatus() == Job.Status.FAILED
650 || coordJob.getStatus() == Job.Status.KILLED || coordJob.getStatus() == Job.Status.DONEWITHERROR) {
651 if (coordStatus == Job.Status.SUSPENDED || coordStatus == Job.Status.SUSPENDEDWITHERROR) {
652 LOG.info("Coord Job [" + coordJob.getId()
653 + "] status to "+ coordStatus +" can not be updated as its already in Terminal state");
654 return;
655 }
656 }
657
658 checkCoordPending(isPending, coordJob, false);
659 // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and PAUSEDWITHERROR is
660 // not supported
661 coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(coordStatus));
662 // Backward support when coordinator namespace is 0.1
663 coordJob.setStatus(StatusUtils.getStatus(coordJob));
664 coordJob.setLastModifiedTime(new Date());
665 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
666 // update bundle action only when status changes in coord job
667 if (coordJob.getBundleId() != null) {
668 if (!prevStatus.equals(coordJob.getStatus())) {
669 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
670 bundleStatusUpdate.call();
671 }
672 }
673 }
674
675 private void checkCoordPending(boolean isPending, CoordinatorJobBean coordJob, boolean saveToDB) throws JPAExecutorException {
676 // Checking the coordinator pending should be updated or not
677 if (isPending) {
678 coordJob.setPending();
679 LOG.info("Coord job [" + coordJob.getId() + "] Pending set to TRUE");
680 }
681 else {
682 coordJob.resetPending();
683 LOG.info("Coord job [" + coordJob.getId() + "] Pending set to FALSE");
684 }
685
686 if (saveToDB) {
687 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
688 }
689 }
690
691 /**
692 * Aggregate coordinator actions' status to coordinator jobs
693 *
694 * @throws JPAExecutorException thrown if failed in db updates or retrievals
695 * @throws CommandException thrown if failed to run commands
696 */
697 private void coordTransit() throws JPAExecutorException, CommandException {
698 List<CoordinatorJobBean> pendingJobCheckList = null;
699 if (lastInstanceStartTime == null) {
700 LOG.info("Running coordinator status service first instance");
701 // this is the first instance, we need to check for all pending jobs;
702 pendingJobCheckList = jpaService.execute(new CoordJobsGetPendingJPAExecutor(limit));
703 }
704 else {
705 LOG.info("Running coordinator status service from last instance time = "
706 + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
707 // this is not the first instance, we should only check jobs
708 // that have actions been
709 // updated >= start time of last service run;
710 List<String> coordJobIdList = jpaService
711 .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
712 Set<String> coordIds = new HashSet<String>();
713 for (String coordJobId : coordJobIdList) {
714 coordIds.add(coordJobId);
715 }
716 pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
717 for (String coordId : coordIds.toArray(new String[coordIds.size()])) {
718 CoordinatorJobBean coordJob;
719 try{
720 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordId));
721 }
722 catch (JPAExecutorException jpaee) {
723 if (jpaee.getErrorCode().equals(ErrorCode.E0604)) {
724 LOG.warn("Exception happened during StatusTransitRunnable; Coordinator Job doesn't exist", jpaee);
725 continue;
726 } else {
727 throw jpaee;
728 }
729 }
730 // Running coord job might have pending false
731 Job.Status coordJobStatus = coordJob.getStatus();
732 if (coordJob.isPending() || coordJobStatus.equals(Job.Status.PAUSED)
733 || coordJobStatus.equals(Job.Status.RUNNING)
734 || coordJobStatus.equals(Job.Status.RUNNINGWITHERROR)
735 || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
736 pendingJobCheckList.add(coordJob);
737 }
738 }
739 }
740 aggregateCoordJobsStatus(pendingJobCheckList);
741 }
742 }
743
744 /**
745 * Initializes the {@link StatusTransitService}.
746 *
747 * @param services services instance.
748 */
749 @Override
750 public void init(Services services) {
751 Configuration conf = services.getConf();
752 Runnable stateTransitRunnable = new StatusTransitRunnable();
753 services.get(SchedulerService.class).schedule(stateTransitRunnable, 10,
754 conf.getInt(CONF_STATUSTRANSIT_INTERVAL, 60), SchedulerService.Unit.SEC);
755 }
756
757 /**
758 * Destroy the StateTransit Jobs Service.
759 */
760 @Override
761 public void destroy() {
762 }
763
764 /**
765 * Return the public interface for the purge jobs service.
766 *
767 * @return {@link StatusTransitService}.
768 */
769 @Override
770 public Class<? extends Service> getInterface() {
771 return StatusTransitService.class;
772 }
773 }
774