This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import java.util.ArrayList;
021 import java.util.Date;
022 import java.util.HashMap;
023 import java.util.HashSet;
024 import java.util.List;
025 import java.util.Set;
026 import java.util.TreeSet;
027 import java.util.Comparator;
028
029 import org.apache.hadoop.conf.Configuration;
030 import org.apache.oozie.BundleActionBean;
031 import org.apache.oozie.BundleJobBean;
032 import org.apache.oozie.CoordinatorJobBean;
033 import org.apache.oozie.ErrorCode;
034 import org.apache.oozie.client.CoordinatorAction;
035 import org.apache.oozie.client.Job;
036 import org.apache.oozie.command.CommandException;
037 import org.apache.oozie.command.bundle.BundleKillXCommand;
038 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
039 import org.apache.oozie.executor.jpa.BundleActionsGetByLastModifiedTimeJPAExecutor;
040 import org.apache.oozie.executor.jpa.BundleActionsGetStatusPendingJPAExecutor;
041 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
042 import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
043 import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor;
044 import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor;
045 import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor;
046 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
047 import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
048 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
049 import org.apache.oozie.executor.jpa.CoordJobsGetChangedJPAExecutor;
050 import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
051 import org.apache.oozie.executor.jpa.JPAExecutorException;
052 import org.apache.oozie.util.DateUtils;
053 import org.apache.oozie.util.MemoryLocks;
054 import org.apache.oozie.util.StatusUtils;
055 import org.apache.oozie.util.XLog;
056
057 /**
058 * StateTransitService is scheduled to run at the configured interval.
059 * <p/>
060 * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0 (job
061 * done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to
062 * SUCCEEDED.
063 */
064 public class StatusTransitService implements Service {
065 public static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService.";
066 public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval";
067 public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + "backward.support.for.coord.status";
068 public static final String CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR = CONF_PREFIX + "backward.support.for.states.without.error";
069 private static int limit = -1;
070 private static Date lastInstanceStartTime = null;
071 private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class);
072
073 /**
074 * StateTransitRunnable is the runnable which is scheduled to run at the configured interval.
075 * <p/>
076 * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0
077 * (job done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to
078 * SUCCEEDED.
079 */
080 public static class StatusTransitRunnable implements Runnable {
081 private JPAService jpaService = null;
082 private MemoryLocks.LockToken lock;
083
084 public StatusTransitRunnable() {
085 jpaService = Services.get().get(JPAService.class);
086 if (jpaService == null) {
087 LOG.error("Missing JPAService");
088 }
089 }
090
091 @Override
092 public void run() {
093 try {
094 Date curDate = new Date(); // records the start time of this service run;
095
096 // first check if there is some other instance running;
097 lock = Services.get().get(MemoryLocksService.class).getWriteLock(StatusTransitService.class.getName(),
098 lockTimeout);
099 if (lock == null) {
100 LOG.info("This StatusTransitService instance"
101 + " will not run since there is already an instance running");
102 }
103 else {
104 LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName());
105 // running coord jobs transit service
106 coordTransit();
107 // running bundle jobs transit service
108 bundleTransit();
109
110 lastInstanceStartTime = curDate;
111 }
112 }
113 catch (Exception ex) {
114 LOG.warn("Exception happened during StatusTransitRunnable ", ex);
115 }
116 finally {
117 // release lock;
118 if (lock != null) {
119 lock.release();
120 LOG.info("Released lock for [{0}]", StatusTransitService.class.getName());
121 }
122 }
123 }
124
125 public List<BundleJobBean> removeDuplicates(List<BundleJobBean> pendingJobList) {
126 Set<BundleJobBean> s = new TreeSet<BundleJobBean>(new Comparator<BundleJobBean>() {
127 @Override
128 public int compare(BundleJobBean b1, BundleJobBean b2) {
129 if (b1.getId().equals(b2.getId())) {
130 return 0;
131 }
132 else
133 return 1;
134 }
135 });
136 s.addAll(pendingJobList);
137 return new ArrayList<BundleJobBean>(s);
138 }
139
140 /**
141 * Aggregate bundle actions' status to bundle jobs
142 *
143 * @throws JPAExecutorException thrown if failed in db updates or retrievals
144 * @throws CommandException thrown if failed to run commands
145 */
146 private void bundleTransit() throws JPAExecutorException, CommandException {
147 List<BundleJobBean> pendingJobCheckList = null;
148
149 if (lastInstanceStartTime == null) {
150 LOG.info("Running bundle status service first instance");
151 // this is the first instance, we need to check for all pending or running jobs;
152 pendingJobCheckList = jpaService.execute(new BundleJobsGetRunningOrPendingJPAExecutor(limit));
153 }
154 else {
155 LOG.info("Running bundle status service from last instance time = "
156 + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
157 // this is not the first instance, we should only check jobs that have actions been
158 // updated >= start time of last service run;
159 List<BundleActionBean> actionList = jpaService
160 .execute(new BundleActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
161 Set<String> bundleIds = new HashSet<String>();
162 for (BundleActionBean action : actionList) {
163 bundleIds.add(action.getBundleId());
164 }
165 pendingJobCheckList = new ArrayList<BundleJobBean>();
166 for (String bundleId : bundleIds.toArray(new String[bundleIds.size()])) {
167 BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId));
168 // Running bundle job might have pending false
169 if (bundle.isPending() || bundle.getStatus().equals(Job.Status.RUNNING)
170 || bundle.getStatus().equals(Job.Status.RUNNINGWITHERROR)
171 || bundle.getStatus().equals(Job.Status.PAUSED)
172 || bundle.getStatus().equals(Job.Status.PAUSEDWITHERROR)) {
173 pendingJobCheckList.add(bundle);
174 }
175 }
176 }
177 aggregateBundleJobsStatus(pendingJobCheckList);
178 }
179
180 private void aggregateBundleJobsStatus(List<BundleJobBean> bundleLists) throws JPAExecutorException,
181 CommandException {
182 if (bundleLists != null) {
183 for (BundleJobBean bundleJob : bundleLists) {
184 try {
185 String jobId = bundleJob.getId();
186 Job.Status[] bundleStatus = new Job.Status[1];
187 bundleStatus[0] = bundleJob.getStatus();
188 List<BundleActionBean> bundleActions = jpaService.execute(new BundleActionsGetStatusPendingJPAExecutor(
189 jobId));
190 HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>();
191 boolean foundPending = false;
192 for (BundleActionBean bAction : bundleActions) {
193 int counter = 0;
194 if (bundleActionStatus.containsKey(bAction.getStatus())) {
195 counter = bundleActionStatus.get(bAction.getStatus()) + 1;
196 }
197 else {
198 ++counter;
199 }
200 bundleActionStatus.put(bAction.getStatus(), counter);
201 if (bAction.getCoordId() == null
202 && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED)) {
203 (new BundleKillXCommand(jobId)).call();
204 LOG.info("Bundle job ["+ jobId
205 + "] has been killed since one of its coordinator job failed submission.");
206 }
207
208 if (bAction.isPending()) {
209 foundPending = true;
210 }
211 }
212
213 if (!foundPending && checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus)) {
214 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
215 + "' from '" + bundleJob.getStatus() + "'");
216 updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
217 }
218 else if (!foundPending && checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus)) {
219 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
220 + "' from '" + bundleJob.getStatus() + "'");
221 updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
222 }
223 else if (checkPausedStatus(bundleActionStatus, bundleActions, bundleStatus)) {
224 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
225 + "' from '" + bundleJob.getStatus() + "'");
226 updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
227 }
228 else if (checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus, foundPending)) {
229 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
230 + "' from '" + bundleJob.getStatus() + "'");
231 updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
232 }
233 else if (checkRunningStatus(bundleActionStatus, bundleActions, bundleStatus)) {
234 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
235 + "' from '" + bundleJob.getStatus() + "'");
236 updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
237 }
238 }
239 catch (Exception ex) {
240 LOG.error("Exception happened during aggregate bundle job's status, job = "
241 + bundleJob.getId(), ex);
242 }
243 }
244 }
245
246 }
247
248 private void aggregateCoordJobsStatus(List<CoordinatorJobBean> CoordList) throws JPAExecutorException,
249 CommandException {
250 if (CoordList != null) {
251 Configuration conf = Services.get().getConf();
252 boolean backwardSupportForCoordStatus = conf.getBoolean(CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
253
254 for (CoordinatorJobBean coordJob : CoordList) {
255 try {
256 // if namespace 0.1 is used and backward support is true, then ignore this coord job
257 if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
258 && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
259 continue;
260 }
261 String jobId = coordJob.getId();
262 Job.Status[] coordStatus = new Job.Status[1];
263 coordStatus[0] = coordJob.getStatus();
264 //Get count of Coordinator actions with pending true
265 boolean isPending = false;
266 int count = jpaService.execute(new CoordJobGetPendingActionsCountJPAExecutor(jobId));
267 if (count > 0) {
268 isPending = true;
269 }
270 // Get status of Coordinator actions
271 List<CoordinatorAction.Status> coordActionStatusList = jpaService
272 .execute(new CoordJobGetActionsStatusJPAExecutor(jobId));
273 HashMap<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>();
274
275 for (CoordinatorAction.Status status : coordActionStatusList) {
276 int counter = 0;
277 if (coordActionStatus.containsKey(status)) {
278 counter = coordActionStatus.get(status) + 1;
279 }
280 else {
281 ++counter;
282 }
283 coordActionStatus.put(status, counter);
284 }
285
286 int nonPendingCoordActionsCount = coordActionStatusList.size();
287 boolean isDoneMaterialization = coordJob.isDoneMaterialization();
288 if ((isDoneMaterialization || coordStatus[0] == Job.Status.FAILED || coordStatus[0] == Job.Status.KILLED)
289 && checkCoordTerminalStatus(coordActionStatus, nonPendingCoordActionsCount,
290 coordStatus, isDoneMaterialization)) {
291 LOG.info("Set coordinator job [" + jobId + "] status to '" + coordStatus[0].toString()
292 + "' from '" + coordJob.getStatus() + "'");
293 updateCoordJob(isPending, coordJob, coordStatus[0]);
294 }
295 else if (checkCoordPausedStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
296 LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
297 + "' from '" + coordJob.getStatus() + "'");
298 updateCoordJob(isPending, coordJob, coordStatus[0]);
299 }
300 else if(checkCoordSuspendStatus( coordActionStatus, nonPendingCoordActionsCount, coordStatus, coordJob.isDoneMaterialization(), isPending)) {
301 LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
302 + "' from '" + coordJob.getStatus() + "'");
303 updateCoordJob(isPending, coordJob, coordStatus[0]);
304 }
305 else if (checkCoordRunningStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
306 LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
307 + "' from '" + coordJob.getStatus() + "'");
308 updateCoordJob(isPending, coordJob, coordStatus[0]);
309 }
310 else {
311 checkCoordPending(isPending, coordJob, true);
312 }
313 }
314 catch (Exception ex) {
315 LOG.error("Exception happened during aggregate coordinator job's status, job = "
316 + coordJob.getId(), ex);
317 }
318 }
319
320 }
321 }
322
323 private boolean checkTerminalStatus(HashMap<Job.Status, Integer> bundleActionStatus,
324 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
325 boolean ret = false;
326 int totalValuesSucceed = 0;
327 if (bundleActionStatus.containsKey(Job.Status.SUCCEEDED)) {
328 totalValuesSucceed = bundleActionStatus.get(Job.Status.SUCCEEDED);
329 }
330 int totalValuesFailed = 0;
331 if (bundleActionStatus.containsKey(Job.Status.FAILED)) {
332 totalValuesFailed = bundleActionStatus.get(Job.Status.FAILED);
333 }
334 int totalValuesKilled = 0;
335 if (bundleActionStatus.containsKey(Job.Status.KILLED)) {
336 totalValuesKilled = bundleActionStatus.get(Job.Status.KILLED);
337 }
338
339 int totalValuesDoneWithError = 0;
340 if (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)) {
341 totalValuesDoneWithError = bundleActionStatus.get(Job.Status.DONEWITHERROR);
342 }
343
344 if (bundleActions.size() == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesDoneWithError)) {
345 // If all the bundle actions are succeeded then bundle job should be succeeded.
346 if (bundleActions.size() == totalValuesSucceed) {
347 bundleStatus[0] = Job.Status.SUCCEEDED;
348 ret = true;
349 }
350 else if (bundleActions.size() == totalValuesKilled) {
351 // If all the bundle actions are KILLED then bundle job should be KILLED.
352 bundleStatus[0] = Job.Status.KILLED;
353 ret = true;
354 }
355 else if (bundleActions.size() == totalValuesFailed) {
356 // If all the bundle actions are FAILED then bundle job should be FAILED.
357 bundleStatus[0] = Job.Status.FAILED;
358 ret = true;
359 }
360 else {
361 bundleStatus[0] = Job.Status.DONEWITHERROR;
362 ret = true;
363 }
364 }
365 return ret;
366 }
367
368 private boolean checkCoordTerminalStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
369 int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization) {
370 boolean ret = false;
371 int totalValuesSucceed = 0;
372 if (coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED)) {
373 totalValuesSucceed = coordActionStatus.get(CoordinatorAction.Status.SUCCEEDED);
374 }
375 int totalValuesFailed = 0;
376 if (coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)) {
377 totalValuesFailed = coordActionStatus.get(CoordinatorAction.Status.FAILED);
378 }
379 int totalValuesKilled = 0;
380 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)) {
381 totalValuesKilled = coordActionStatus.get(CoordinatorAction.Status.KILLED);
382 }
383
384 int totalValuesTimeOut = 0;
385 if (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
386 totalValuesTimeOut = coordActionStatus.get(CoordinatorAction.Status.TIMEDOUT);
387 }
388
389 if (coordActionsCount == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut)) {
390 // If all the coordinator actions are succeeded then coordinator job should be succeeded.
391 if (coordActionsCount == totalValuesSucceed && isDoneMaterialization) {
392 coordStatus[0] = Job.Status.SUCCEEDED;
393 ret = true;
394 }
395 else if (coordActionsCount == totalValuesKilled) {
396 // If all the coordinator actions are KILLED then coordinator job should be KILLED.
397 coordStatus[0] = Job.Status.KILLED;
398 ret = true;
399 }
400 else if (coordActionsCount == totalValuesFailed) {
401 // If all the coordinator actions are FAILED then coordinator job should be FAILED.
402 coordStatus[0] = Job.Status.FAILED;
403 ret = true;
404 }
405 else {
406 coordStatus[0] = Job.Status.DONEWITHERROR;
407 ret = true;
408 }
409 }
410 return ret;
411 }
412
413 private boolean checkPrepStatus(HashMap<Job.Status, Integer> bundleActionStatus,
414 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
415 boolean ret = false;
416 if (bundleActionStatus.containsKey(Job.Status.PREP)) {
417 // If all the bundle actions are PREP then bundle job should be RUNNING.
418 if (bundleActions.size() > bundleActionStatus.get(Job.Status.PREP)) {
419 bundleStatus[0] = Job.Status.RUNNING;
420 ret = true;
421 }
422 }
423 return ret;
424 }
425
426 private boolean checkPausedStatus(HashMap<Job.Status, Integer> bundleActionStatus,
427 List<BundleActionBean> bundleActions, Job.Status[] bundleJobStatus) {
428 boolean ret = false;
429
430 // TODO - When bottom up cmds are allowed to change the status of parent job,
431 // if none of the bundle actions are in paused or pausedwitherror, the function should return
432 // false
433
434 // top down
435 // If the bundle job is PAUSED or PAUSEDINERROR and no children are in error
436 // state, then job should be PAUSED otherwise it should be pausedwitherror
437 if (bundleJobStatus[0] == Job.Status.PAUSED || bundleJobStatus[0] == Job.Status.PAUSEDWITHERROR) {
438 if (bundleActionStatus.containsKey(Job.Status.KILLED)
439 || bundleActionStatus.containsKey(Job.Status.FAILED)
440 || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
441 || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
442 || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)
443 || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
444 bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
445 }
446 else {
447 bundleJobStatus[0] = Job.Status.PAUSED;
448 }
449 ret = true;
450 }
451
452 // bottom up; check the status of parent through their children
453 else if (bundleActionStatus.containsKey(Job.Status.PAUSED)
454 && (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED))) {
455 bundleJobStatus[0] = Job.Status.PAUSED;
456 ret = true;
457 }
458 else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
459 int pausedActions = bundleActionStatus.containsKey(Job.Status.PAUSED) ? bundleActionStatus
460 .get(Job.Status.PAUSED) : 0;
461 if (bundleActions.size() == pausedActions + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR)) {
462 bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
463 ret = true;
464 }
465 }
466 else {
467 ret = false;
468 }
469 return ret;
470 }
471
472
473 private boolean checkSuspendStatus(HashMap<Job.Status, Integer> bundleActionStatus,
474 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus, boolean isPending) {
475 boolean ret = false;
476
477 // TODO - When bottom up cmds are allowed to change the status of parent job,
478 // if none of the bundle actions are in suspended or suspendedwitherror, the function should return
479 // false
480
481 // top down
482 // if job is suspended
483 if (bundleStatus[0] == Job.Status.SUSPENDED
484 || bundleStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
485 if (bundleActionStatus.containsKey(Job.Status.KILLED)
486 || bundleActionStatus.containsKey(Job.Status.FAILED)
487 || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
488 || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
489 || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
490 bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
491 }
492 else {
493 bundleStatus[0] = Job.Status.SUSPENDED;
494 }
495 ret =true;
496 }
497
498 // bottom up
499 // Update status of parent from the status of its children
500 else if (!isPending && bundleActionStatus.containsKey(Job.Status.SUSPENDED)
501 || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)) {
502 int succeededActions = bundleActionStatus.containsKey(Job.Status.SUCCEEDED) ? bundleActionStatus
503 .get(Job.Status.SUCCEEDED) : 0;
504 int killedActions = bundleActionStatus.containsKey(Job.Status.KILLED) ? bundleActionStatus
505 .get(Job.Status.KILLED) : 0;
506 int failedActions = bundleActionStatus.containsKey(Job.Status.FAILED) ? bundleActionStatus
507 .get(Job.Status.FAILED) : 0;
508 int doneWithErrorActions = bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) ? bundleActionStatus
509 .get(Job.Status.DONEWITHERROR) : 0;
510
511 if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions) {
512 bundleStatus[0] = Job.Status.SUSPENDED;
513 ret = true;
514 }
515 else if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDEDWITHERROR)
516 + bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions + killedActions + failedActions + doneWithErrorActions) {
517 bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
518 ret = true;
519 }
520 }
521 return ret;
522
523 }
524
525 private boolean checkCoordPausedStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
526 int coordActionsCount, Job.Status[] coordStatus){
527 boolean ret = false;
528 if (coordStatus[0].equals(Job.Status.PAUSED) || coordStatus[0].equals(Job.Status.PAUSEDWITHERROR)) {
529 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
530 || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
531 || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
532 coordStatus[0] = Job.Status.PAUSEDWITHERROR;
533 }
534 else {
535 coordStatus[0] = Job.Status.PAUSED;
536 }
537 ret = true;
538 }
539 return ret;
540 }
541 private boolean checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
542 int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization, boolean isPending) {
543 boolean ret = false;
544
545 // TODO - When bottom up cmds are allowed to change the status of parent job
546 //if none of the coord actions are in suspended or suspendedwitherror and materialization done is false
547 //,then the function should return
548 // false
549
550 // top down
551 // check for children only when parent is suspended
552 if (coordStatus[0] == Job.Status.SUSPENDED || coordStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
553
554 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
555 || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
556 || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
557 coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
558 }
559 else {
560 coordStatus[0] = Job.Status.SUSPENDED;
561 }
562 ret = true;
563 }
564 // bottom up
565 // look for children to check the parent's status only if materialization is
566 // done and all actions are non-pending
567 else if (isDoneMaterialization && !isPending && coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
568 int succeededActions = coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED) ? coordActionStatus
569 .get(CoordinatorAction.Status.SUCCEEDED) : 0;
570 int killedActions = coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) ? coordActionStatus
571 .get(CoordinatorAction.Status.KILLED) : 0;
572 int failedActions = coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) ? coordActionStatus
573 .get(CoordinatorAction.Status.FAILED) : 0;
574 int timedoutActions = coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT) ? coordActionStatus
575 .get(CoordinatorAction.Status.TIMEDOUT) : 0;
576
577 if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED) + succeededActions) {
578 coordStatus[0] = Job.Status.SUSPENDED;
579 ret = true;
580 }
581 else if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)
582 + succeededActions + killedActions + failedActions + timedoutActions) {
583 coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
584 ret = true;
585 }
586 }
587 return ret;
588 }
589
590 private boolean checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
591 int coordActionsCount, Job.Status[] coordStatus) {
592 boolean ret = false;
593 if (coordStatus[0] != Job.Status.PREP) {
594 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
595 || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
596 || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
597 coordStatus[0] = Job.Status.RUNNINGWITHERROR;
598 }
599 else {
600 coordStatus[0] = Job.Status.RUNNING;
601 }
602 ret = true;
603 }
604 return ret;
605 }
606
607 private boolean checkRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus,
608 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
609 boolean ret = false;
610 if (bundleStatus[0] != Job.Status.PREP) {
611 if (bundleActionStatus.containsKey(Job.Status.FAILED)
612 || bundleActionStatus.containsKey(Job.Status.KILLED)
613 || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
614 || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)) {
615 bundleStatus[0] = Job.Status.RUNNINGWITHERROR;
616 }
617 else {
618 bundleStatus[0] = Job.Status.RUNNING;
619 }
620 ret = true;
621 }
622 return ret;
623
624 }
625
626 private void updateBundleJob(boolean isPending, BundleJobBean bundleJob, Job.Status bundleStatus)
627 throws JPAExecutorException {
628 String jobId = bundleJob.getId();
629 // Update the Bundle Job
630 // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
631 // PAUSEDWITHERROR is not supported
632 bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(bundleStatus));
633 if (isPending) {
634 bundleJob.setPending();
635 LOG.info("Bundle job [" + jobId + "] Pending set to TRUE");
636 }
637 else {
638 bundleJob.resetPending();
639 LOG.info("Bundle job [" + jobId + "] Pending set to FALSE");
640 }
641 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
642 }
643
644 private void updateCoordJob(boolean isPending, CoordinatorJobBean coordJob, Job.Status coordStatus)
645 throws JPAExecutorException, CommandException {
646 Job.Status prevStatus = coordJob.getStatus();
647 // Update the Coord Job
648 if (coordJob.getStatus() == Job.Status.SUCCEEDED || coordJob.getStatus() == Job.Status.FAILED
649 || coordJob.getStatus() == Job.Status.KILLED || coordJob.getStatus() == Job.Status.DONEWITHERROR) {
650 if (coordStatus == Job.Status.SUSPENDED || coordStatus == Job.Status.SUSPENDEDWITHERROR) {
651 LOG.info("Coord Job [" + coordJob.getId()
652 + "] status to "+ coordStatus +" can not be updated as its already in Terminal state");
653 return;
654 }
655 }
656
657 boolean isPendingStateChanged = checkCoordPending(isPending, coordJob, false);
658 // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and PAUSEDWITHERROR is
659 // not supported
660 coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(coordStatus));
661 // Backward support when coordinator namespace is 0.1
662 coordJob.setStatus(StatusUtils.getStatus(coordJob));
663 if (coordJob.getStatus() != prevStatus || isPendingStateChanged) {
664 LOG.debug("Updating coord job " + coordJob.getId());
665 coordJob.setLastModifiedTime(new Date());
666 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
667 }
668 // update bundle action only when status changes in coord job
669 if (coordJob.getBundleId() != null) {
670 if (!prevStatus.equals(coordJob.getStatus())) {
671 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
672 bundleStatusUpdate.call();
673 }
674 }
675 }
676
677 private boolean checkCoordPending(boolean isPending, CoordinatorJobBean coordJob, boolean saveToDB)
678 throws JPAExecutorException {
679 // Checking the coordinator pending should be updated or not
680 boolean prevPending = coordJob.isPending();
681 if (isPending) {
682 coordJob.setPending();
683 LOG.info("Coord job [" + coordJob.getId() + "] Pending set to TRUE");
684 }
685 else {
686 coordJob.resetPending();
687 LOG.info("Coord job [" + coordJob.getId() + "] Pending set to FALSE");
688 }
689 boolean hasChange = prevPending != coordJob.isPending();
690 if (saveToDB && hasChange) {
691 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
692 }
693 return hasChange;
694
695 }
696
697 /**
698 * Aggregate coordinator actions' status to coordinator jobs
699 *
700 * @throws JPAExecutorException thrown if failed in db updates or retrievals
701 * @throws CommandException thrown if failed to run commands
702 */
703 private void coordTransit() throws JPAExecutorException, CommandException {
704 List<CoordinatorJobBean> pendingJobCheckList = null;
705 if (lastInstanceStartTime == null) {
706 LOG.info("Running coordinator status service first instance");
707 // this is the first instance, we need to check for all pending jobs;
708 pendingJobCheckList = jpaService.execute(new CoordJobsGetPendingJPAExecutor(limit));
709 }
710 else {
711 LOG.info("Running coordinator status service from last instance time = "
712 + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
713 // this is not the first instance, we should only check jobs
714 // that have actions or jobs been
715 // updated >= start time of last service run;
716 List<String> coordJobIdList = jpaService
717 .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
718 Set<String> coordIds = new HashSet<String>();
719 for (String coordJobId : coordJobIdList) {
720 coordIds.add(coordJobId);
721 }
722 coordIds.addAll(jpaService.execute(new CoordJobsGetChangedJPAExecutor(lastInstanceStartTime)));
723
724 pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
725 for (String coordId : coordIds.toArray(new String[coordIds.size()])) {
726 CoordinatorJobBean coordJob;
727 try{
728 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordId));
729 }
730 catch (JPAExecutorException jpaee) {
731 if (jpaee.getErrorCode().equals(ErrorCode.E0604)) {
732 LOG.warn("Exception happened during StatusTransitRunnable; Coordinator Job doesn't exist", jpaee);
733 continue;
734 } else {
735 throw jpaee;
736 }
737 }
738 // Running coord job might have pending false
739 Job.Status coordJobStatus = coordJob.getStatus();
740 if (coordJob.isPending() || coordJobStatus.equals(Job.Status.PAUSED)
741 || coordJobStatus.equals(Job.Status.RUNNING)
742 || coordJobStatus.equals(Job.Status.RUNNINGWITHERROR)
743 || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
744 pendingJobCheckList.add(coordJob);
745 }
746 }
747 }
748 aggregateCoordJobsStatus(pendingJobCheckList);
749 }
750 }
751
752 /**
753 * Initializes the {@link StatusTransitService}.
754 *
755 * @param services services instance.
756 */
757 @Override
758 public void init(Services services) {
759 Configuration conf = services.getConf();
760 Runnable stateTransitRunnable = new StatusTransitRunnable();
761 services.get(SchedulerService.class).schedule(stateTransitRunnable, 10,
762 conf.getInt(CONF_STATUSTRANSIT_INTERVAL, 60), SchedulerService.Unit.SEC);
763 }
764
765 /**
766 * Destroy the StateTransit Jobs Service.
767 */
768 @Override
769 public void destroy() {
770 }
771
772 /**
773 * Return the public interface for the purge jobs service.
774 *
775 * @return {@link StatusTransitService}.
776 */
777 @Override
778 public Class<? extends Service> getInterface() {
779 return StatusTransitService.class;
780 }
781 }
782