This project has been retired. For details, please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import java.util.ArrayList;
021 import java.util.Date;
022 import java.util.HashMap;
023 import java.util.HashSet;
024 import java.util.List;
025 import java.util.Set;
026 import java.util.TreeSet;
027 import java.util.Comparator;
028
029 import org.apache.hadoop.conf.Configuration;
030 import org.apache.oozie.BundleActionBean;
031 import org.apache.oozie.BundleJobBean;
032 import org.apache.oozie.CoordinatorJobBean;
033 import org.apache.oozie.ErrorCode;
034 import org.apache.oozie.client.CoordinatorAction;
035 import org.apache.oozie.client.Job;
036 import org.apache.oozie.command.CommandException;
037 import org.apache.oozie.command.bundle.BundleKillXCommand;
038 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
039 import org.apache.oozie.executor.jpa.BundleActionsGetByLastModifiedTimeJPAExecutor;
040 import org.apache.oozie.executor.jpa.BundleActionsGetStatusPendingJPAExecutor;
041 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
042 import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
043 import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor;
044 import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor;
045 import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor;
046 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
047 import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
048 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
049 import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
050 import org.apache.oozie.executor.jpa.JPAExecutorException;
051 import org.apache.oozie.util.DateUtils;
052 import org.apache.oozie.util.MemoryLocks;
053 import org.apache.oozie.util.StatusUtils;
054 import org.apache.oozie.util.XLog;
055
056 /**
057 * StateTransitService is scheduled to run at the configured interval.
058 * <p/>
059 * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0 (job
060 * done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to
061 * SUCCEEDED.
062 */
063 public class StatusTransitService implements Service {
064 public static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService.";
065 public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval";
066 public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + "backward.support.for.coord.status";
067 public static final String CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR = CONF_PREFIX + "backward.support.for.states.without.error";
068 private static int limit = -1;
069 private static Date lastInstanceStartTime = null;
070 private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class);
071
072 /**
073 * StateTransitRunnable is the runnable which is scheduled to run at the configured interval.
074 * <p/>
075 * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0
076 * (job done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to
077 * SUCCEEDED.
078 */
079 public static class StatusTransitRunnable implements Runnable {
080 private JPAService jpaService = null;
081 private MemoryLocks.LockToken lock;
082
083 public StatusTransitRunnable() {
084 jpaService = Services.get().get(JPAService.class);
085 if (jpaService == null) {
086 LOG.error("Missing JPAService");
087 }
088 }
089
090 @Override
091 public void run() {
092 try {
093 Date curDate = new Date(); // records the start time of this service run;
094
095 // first check if there is some other instance running;
096 lock = Services.get().get(MemoryLocksService.class).getWriteLock(StatusTransitService.class.getName(),
097 lockTimeout);
098 if (lock == null) {
099 LOG.info("This StatusTransitService instance"
100 + " will not run since there is already an instance running");
101 }
102 else {
103 LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName());
104 // running coord jobs transit service
105 coordTransit();
106 // running bundle jobs transit service
107 bundleTransit();
108
109 lastInstanceStartTime = curDate;
110 }
111 }
112 catch (Exception ex) {
113 LOG.warn("Exception happened during StatusTransitRunnable ", ex);
114 }
115 finally {
116 // release lock;
117 if (lock != null) {
118 lock.release();
119 LOG.info("Released lock for [{0}]", StatusTransitService.class.getName());
120 }
121 }
122 }
123
124 public List<BundleJobBean> removeDuplicates(List<BundleJobBean> pendingJobList) {
125 Set<BundleJobBean> s = new TreeSet<BundleJobBean>(new Comparator<BundleJobBean>() {
126 @Override
127 public int compare(BundleJobBean b1, BundleJobBean b2) {
128 if (b1.getId().equals(b2.getId())) {
129 return 0;
130 }
131 else
132 return 1;
133 }
134 });
135 s.addAll(pendingJobList);
136 return new ArrayList<BundleJobBean>(s);
137 }
138
139 /**
140 * Aggregate bundle actions' status to bundle jobs
141 *
142 * @throws JPAExecutorException thrown if failed in db updates or retrievals
143 * @throws CommandException thrown if failed to run commands
144 */
145 private void bundleTransit() throws JPAExecutorException, CommandException {
146 List<BundleJobBean> pendingJobCheckList = null;
147
148 if (lastInstanceStartTime == null) {
149 LOG.info("Running bundle status service first instance");
150 // this is the first instance, we need to check for all pending or running jobs;
151 pendingJobCheckList = jpaService.execute(new BundleJobsGetRunningOrPendingJPAExecutor(limit));
152 }
153 else {
154 LOG.info("Running bundle status service from last instance time = "
155 + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
156 // this is not the first instance, we should only check jobs that have actions been
157 // updated >= start time of last service run;
158 List<BundleActionBean> actionList = jpaService
159 .execute(new BundleActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
160 Set<String> bundleIds = new HashSet<String>();
161 for (BundleActionBean action : actionList) {
162 bundleIds.add(action.getBundleId());
163 }
164 pendingJobCheckList = new ArrayList<BundleJobBean>();
165 for (String bundleId : bundleIds.toArray(new String[bundleIds.size()])) {
166 BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId));
167 // Running bundle job might have pending false
168 if (bundle.isPending() || bundle.getStatus().equals(Job.Status.RUNNING)
169 || bundle.getStatus().equals(Job.Status.RUNNINGWITHERROR)
170 || bundle.getStatus().equals(Job.Status.PAUSED)
171 || bundle.getStatus().equals(Job.Status.PAUSEDWITHERROR)) {
172 pendingJobCheckList.add(bundle);
173 }
174 }
175 }
176 aggregateBundleJobsStatus(pendingJobCheckList);
177 }
178
179 private void aggregateBundleJobsStatus(List<BundleJobBean> bundleLists) throws JPAExecutorException,
180 CommandException {
181 if (bundleLists != null) {
182 for (BundleJobBean bundleJob : bundleLists) {
183 try {
184 String jobId = bundleJob.getId();
185 Job.Status[] bundleStatus = new Job.Status[1];
186 bundleStatus[0] = bundleJob.getStatus();
187 List<BundleActionBean> bundleActions = jpaService.execute(new BundleActionsGetStatusPendingJPAExecutor(
188 jobId));
189 HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>();
190 boolean foundPending = false;
191 for (BundleActionBean bAction : bundleActions) {
192 int counter = 0;
193 if (bundleActionStatus.containsKey(bAction.getStatus())) {
194 counter = bundleActionStatus.get(bAction.getStatus()) + 1;
195 }
196 else {
197 ++counter;
198 }
199 bundleActionStatus.put(bAction.getStatus(), counter);
200 if (bAction.getCoordId() == null
201 && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED)) {
202 (new BundleKillXCommand(jobId)).call();
203 LOG.info("Bundle job ["+ jobId
204 + "] has been killed since one of its coordinator job failed submission.");
205 }
206
207 if (bAction.isPending()) {
208 foundPending = true;
209 }
210 }
211
212 if (!foundPending && checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus)) {
213 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
214 + "' from '" + bundleJob.getStatus() + "'");
215 updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
216 }
217 else if (!foundPending && checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus)) {
218 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
219 + "' from '" + bundleJob.getStatus() + "'");
220 updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
221 }
222 else if (checkPausedStatus(bundleActionStatus, bundleActions, bundleStatus)) {
223 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
224 + "' from '" + bundleJob.getStatus() + "'");
225 updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
226 }
227 else if (checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus, foundPending)) {
228 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
229 + "' from '" + bundleJob.getStatus() + "'");
230 updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
231 }
232 else if (checkRunningStatus(bundleActionStatus, bundleActions, bundleStatus)) {
233 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
234 + "' from '" + bundleJob.getStatus() + "'");
235 updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
236 }
237 }
238 catch (Exception ex) {
239 LOG.error("Exception happened during aggregate bundle job's status, job = "
240 + bundleJob.getId(), ex);
241 }
242 }
243 }
244
245 }
246
247 private void aggregateCoordJobsStatus(List<CoordinatorJobBean> CoordList) throws JPAExecutorException,
248 CommandException {
249 if (CoordList != null) {
250 Configuration conf = Services.get().getConf();
251 boolean backwardSupportForCoordStatus = conf.getBoolean(CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
252
253 for (CoordinatorJobBean coordJob : CoordList) {
254 try {
255 // if namespace 0.1 is used and backward support is true, then ignore this coord job
256 if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
257 && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
258 continue;
259 }
260 String jobId = coordJob.getId();
261 Job.Status[] coordStatus = new Job.Status[1];
262 coordStatus[0] = coordJob.getStatus();
263 //Get count of Coordinator actions with pending true
264 boolean isPending = false;
265 int count = jpaService.execute(new CoordJobGetPendingActionsCountJPAExecutor(jobId));
266 if (count > 0) {
267 isPending = true;
268 }
269 // Get status of Coordinator actions
270 List<CoordinatorAction.Status> coordActionStatusList = jpaService
271 .execute(new CoordJobGetActionsStatusJPAExecutor(jobId));
272 HashMap<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>();
273
274 for (CoordinatorAction.Status status : coordActionStatusList) {
275 int counter = 0;
276 if (coordActionStatus.containsKey(status)) {
277 counter = coordActionStatus.get(status) + 1;
278 }
279 else {
280 ++counter;
281 }
282 coordActionStatus.put(status, counter);
283 }
284
285 int nonPendingCoordActionsCount = coordActionStatusList.size();
286 boolean isDoneMaterialization = coordJob.isDoneMaterialization();
287 if ((isDoneMaterialization || coordStatus[0] == Job.Status.FAILED || coordStatus[0] == Job.Status.KILLED)
288 && checkCoordTerminalStatus(coordActionStatus, nonPendingCoordActionsCount,
289 coordStatus, isDoneMaterialization)) {
290 LOG.info("Set coordinator job [" + jobId + "] status to '" + coordStatus[0].toString()
291 + "' from '" + coordJob.getStatus() + "'");
292 updateCoordJob(isPending, coordJob, coordStatus[0]);
293 }
294 else if (checkCoordPausedStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
295 LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
296 + "' from '" + coordJob.getStatus() + "'");
297 updateCoordJob(isPending, coordJob, coordStatus[0]);
298 }
299 else if(checkCoordSuspendStatus( coordActionStatus, nonPendingCoordActionsCount, coordStatus, coordJob.isDoneMaterialization(), isPending)) {
300 LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
301 + "' from '" + coordJob.getStatus() + "'");
302 updateCoordJob(isPending, coordJob, coordStatus[0]);
303 }
304 else if (checkCoordRunningStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
305 LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
306 + "' from '" + coordJob.getStatus() + "'");
307 updateCoordJob(isPending, coordJob, coordStatus[0]);
308 }
309 else {
310 checkCoordPending(isPending, coordJob, true);
311 }
312 }
313 catch (Exception ex) {
314 LOG.error("Exception happened during aggregate coordinator job's status, job = "
315 + coordJob.getId(), ex);
316 }
317 }
318
319 }
320 }
321
322 private boolean checkTerminalStatus(HashMap<Job.Status, Integer> bundleActionStatus,
323 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
324 boolean ret = false;
325 int totalValuesSucceed = 0;
326 if (bundleActionStatus.containsKey(Job.Status.SUCCEEDED)) {
327 totalValuesSucceed = bundleActionStatus.get(Job.Status.SUCCEEDED);
328 }
329 int totalValuesFailed = 0;
330 if (bundleActionStatus.containsKey(Job.Status.FAILED)) {
331 totalValuesFailed = bundleActionStatus.get(Job.Status.FAILED);
332 }
333 int totalValuesKilled = 0;
334 if (bundleActionStatus.containsKey(Job.Status.KILLED)) {
335 totalValuesKilled = bundleActionStatus.get(Job.Status.KILLED);
336 }
337
338 int totalValuesDoneWithError = 0;
339 if (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)) {
340 totalValuesDoneWithError = bundleActionStatus.get(Job.Status.DONEWITHERROR);
341 }
342
343 if (bundleActions.size() == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesDoneWithError)) {
344 // If all the bundle actions are succeeded then bundle job should be succeeded.
345 if (bundleActions.size() == totalValuesSucceed) {
346 bundleStatus[0] = Job.Status.SUCCEEDED;
347 ret = true;
348 }
349 else if (bundleActions.size() == totalValuesKilled) {
350 // If all the bundle actions are KILLED then bundle job should be KILLED.
351 bundleStatus[0] = Job.Status.KILLED;
352 ret = true;
353 }
354 else if (bundleActions.size() == totalValuesFailed) {
355 // If all the bundle actions are FAILED then bundle job should be FAILED.
356 bundleStatus[0] = Job.Status.FAILED;
357 ret = true;
358 }
359 else {
360 bundleStatus[0] = Job.Status.DONEWITHERROR;
361 ret = true;
362 }
363 }
364 return ret;
365 }
366
367 private boolean checkCoordTerminalStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
368 int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization) {
369 boolean ret = false;
370 int totalValuesSucceed = 0;
371 if (coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED)) {
372 totalValuesSucceed = coordActionStatus.get(CoordinatorAction.Status.SUCCEEDED);
373 }
374 int totalValuesFailed = 0;
375 if (coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)) {
376 totalValuesFailed = coordActionStatus.get(CoordinatorAction.Status.FAILED);
377 }
378 int totalValuesKilled = 0;
379 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)) {
380 totalValuesKilled = coordActionStatus.get(CoordinatorAction.Status.KILLED);
381 }
382
383 int totalValuesTimeOut = 0;
384 if (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
385 totalValuesTimeOut = coordActionStatus.get(CoordinatorAction.Status.TIMEDOUT);
386 }
387
388 if (coordActionsCount == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut)) {
389 // If all the coordinator actions are succeeded then coordinator job should be succeeded.
390 if (coordActionsCount == totalValuesSucceed && isDoneMaterialization) {
391 coordStatus[0] = Job.Status.SUCCEEDED;
392 ret = true;
393 }
394 else if (coordActionsCount == totalValuesKilled) {
395 // If all the coordinator actions are KILLED then coordinator job should be KILLED.
396 coordStatus[0] = Job.Status.KILLED;
397 ret = true;
398 }
399 else if (coordActionsCount == totalValuesFailed) {
400 // If all the coordinator actions are FAILED then coordinator job should be FAILED.
401 coordStatus[0] = Job.Status.FAILED;
402 ret = true;
403 }
404 else {
405 coordStatus[0] = Job.Status.DONEWITHERROR;
406 ret = true;
407 }
408 }
409 return ret;
410 }
411
412 private boolean checkPrepStatus(HashMap<Job.Status, Integer> bundleActionStatus,
413 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
414 boolean ret = false;
415 if (bundleActionStatus.containsKey(Job.Status.PREP)) {
416 // If all the bundle actions are PREP then bundle job should be RUNNING.
417 if (bundleActions.size() > bundleActionStatus.get(Job.Status.PREP)) {
418 bundleStatus[0] = Job.Status.RUNNING;
419 ret = true;
420 }
421 }
422 return ret;
423 }
424
425 private boolean checkPausedStatus(HashMap<Job.Status, Integer> bundleActionStatus,
426 List<BundleActionBean> bundleActions, Job.Status[] bundleJobStatus) {
427 boolean ret = false;
428
429 // TODO - When bottom up cmds are allowed to change the status of parent job,
430 // if none of the bundle actions are in paused or pausedwitherror, the function should return
431 // false
432
433 // top down
434 // If the bundle job is PAUSED or PAUSEDINERROR and no children are in error
435 // state, then job should be PAUSED otherwise it should be pausedwitherror
436 if (bundleJobStatus[0] == Job.Status.PAUSED || bundleJobStatus[0] == Job.Status.PAUSEDWITHERROR) {
437 if (bundleActionStatus.containsKey(Job.Status.KILLED)
438 || bundleActionStatus.containsKey(Job.Status.FAILED)
439 || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
440 || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
441 || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)
442 || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
443 bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
444 }
445 else {
446 bundleJobStatus[0] = Job.Status.PAUSED;
447 }
448 ret = true;
449 }
450
451 // bottom up; check the status of parent through their children
452 else if (bundleActionStatus.containsKey(Job.Status.PAUSED)
453 && (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED))) {
454 bundleJobStatus[0] = Job.Status.PAUSED;
455 ret = true;
456 }
457 else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
458 int pausedActions = bundleActionStatus.containsKey(Job.Status.PAUSED) ? bundleActionStatus
459 .get(Job.Status.PAUSED) : 0;
460 if (bundleActions.size() == pausedActions + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR)) {
461 bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
462 ret = true;
463 }
464 }
465 else {
466 ret = false;
467 }
468 return ret;
469 }
470
471
472 private boolean checkSuspendStatus(HashMap<Job.Status, Integer> bundleActionStatus,
473 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus, boolean isPending) {
474 boolean ret = false;
475
476 // TODO - When bottom up cmds are allowed to change the status of parent job,
477 // if none of the bundle actions are in suspended or suspendedwitherror, the function should return
478 // false
479
480 // top down
481 // if job is suspended
482 if (bundleStatus[0] == Job.Status.SUSPENDED
483 || bundleStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
484 if (bundleActionStatus.containsKey(Job.Status.KILLED)
485 || bundleActionStatus.containsKey(Job.Status.FAILED)
486 || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
487 || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
488 || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
489 bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
490 }
491 else {
492 bundleStatus[0] = Job.Status.SUSPENDED;
493 }
494 ret =true;
495 }
496
497 // bottom up
498 // Update status of parent from the status of its children
499 else if (!isPending && bundleActionStatus.containsKey(Job.Status.SUSPENDED)
500 || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)) {
501 int succeededActions = bundleActionStatus.containsKey(Job.Status.SUCCEEDED) ? bundleActionStatus
502 .get(Job.Status.SUCCEEDED) : 0;
503 int killedActions = bundleActionStatus.containsKey(Job.Status.KILLED) ? bundleActionStatus
504 .get(Job.Status.KILLED) : 0;
505 int failedActions = bundleActionStatus.containsKey(Job.Status.FAILED) ? bundleActionStatus
506 .get(Job.Status.FAILED) : 0;
507 int doneWithErrorActions = bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) ? bundleActionStatus
508 .get(Job.Status.DONEWITHERROR) : 0;
509
510 if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions) {
511 bundleStatus[0] = Job.Status.SUSPENDED;
512 ret = true;
513 }
514 else if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDEDWITHERROR)
515 + bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions + killedActions + failedActions + doneWithErrorActions) {
516 bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
517 ret = true;
518 }
519 }
520 return ret;
521
522 }
523
524 private boolean checkCoordPausedStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
525 int coordActionsCount, Job.Status[] coordStatus){
526 boolean ret = false;
527 if (coordStatus[0].equals(Job.Status.PAUSED) || coordStatus[0].equals(Job.Status.PAUSEDWITHERROR)) {
528 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
529 || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
530 || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
531 coordStatus[0] = Job.Status.PAUSEDWITHERROR;
532 }
533 else {
534 coordStatus[0] = Job.Status.PAUSED;
535 }
536 ret = true;
537 }
538 return ret;
539 }
540 private boolean checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
541 int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization, boolean isPending) {
542 boolean ret = false;
543
544 // TODO - When bottom up cmds are allowed to change the status of parent job
545 //if none of the coord actions are in suspended or suspendedwitherror and materialization done is false
546 //,then the function should return
547 // false
548
549 // top down
550 // check for children only when parent is suspended
551 if (coordStatus[0] == Job.Status.SUSPENDED || coordStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
552
553 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
554 || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
555 || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
556 coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
557 }
558 else {
559 coordStatus[0] = Job.Status.SUSPENDED;
560 }
561 ret = true;
562 }
563 // bottom up
564 // look for children to check the parent's status only if materialization is
565 // done and all actions are non-pending
566 else if (isDoneMaterialization && !isPending && coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
567 int succeededActions = coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED) ? coordActionStatus
568 .get(CoordinatorAction.Status.SUCCEEDED) : 0;
569 int killedActions = coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) ? coordActionStatus
570 .get(CoordinatorAction.Status.KILLED) : 0;
571 int failedActions = coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) ? coordActionStatus
572 .get(CoordinatorAction.Status.FAILED) : 0;
573 int timedoutActions = coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT) ? coordActionStatus
574 .get(CoordinatorAction.Status.TIMEDOUT) : 0;
575
576 if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED) + succeededActions) {
577 coordStatus[0] = Job.Status.SUSPENDED;
578 ret = true;
579 }
580 else if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)
581 + succeededActions + killedActions + failedActions + timedoutActions) {
582 coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
583 ret = true;
584 }
585 }
586 return ret;
587 }
588
589 private boolean checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
590 int coordActionsCount, Job.Status[] coordStatus) {
591 boolean ret = false;
592 if (coordStatus[0] != Job.Status.PREP) {
593 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
594 || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
595 || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
596 coordStatus[0] = Job.Status.RUNNINGWITHERROR;
597 }
598 else {
599 coordStatus[0] = Job.Status.RUNNING;
600 }
601 ret = true;
602 }
603 return ret;
604 }
605
606 private boolean checkRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus,
607 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
608 boolean ret = false;
609 if (bundleStatus[0] != Job.Status.PREP) {
610 if (bundleActionStatus.containsKey(Job.Status.FAILED)
611 || bundleActionStatus.containsKey(Job.Status.KILLED)
612 || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
613 || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)) {
614 bundleStatus[0] = Job.Status.RUNNINGWITHERROR;
615 }
616 else {
617 bundleStatus[0] = Job.Status.RUNNING;
618 }
619 ret = true;
620 }
621 return ret;
622
623 }
624
625 private void updateBundleJob(boolean isPending, BundleJobBean bundleJob, Job.Status bundleStatus)
626 throws JPAExecutorException {
627 String jobId = bundleJob.getId();
628 // Update the Bundle Job
629 // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
630 // PAUSEDWITHERROR is not supported
631 bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(bundleStatus));
632 if (isPending) {
633 bundleJob.setPending();
634 LOG.info("Bundle job [" + jobId + "] Pending set to TRUE");
635 }
636 else {
637 bundleJob.resetPending();
638 LOG.info("Bundle job [" + jobId + "] Pending set to FALSE");
639 }
640 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
641 }
642
643 private void updateCoordJob(boolean isPending, CoordinatorJobBean coordJob, Job.Status coordStatus)
644 throws JPAExecutorException, CommandException {
645 Job.Status prevStatus = coordJob.getStatus();
646 // Update the Coord Job
647 if (coordJob.getStatus() == Job.Status.SUCCEEDED || coordJob.getStatus() == Job.Status.FAILED
648 || coordJob.getStatus() == Job.Status.KILLED || coordJob.getStatus() == Job.Status.DONEWITHERROR) {
649 if (coordStatus == Job.Status.SUSPENDED || coordStatus == Job.Status.SUSPENDEDWITHERROR) {
650 LOG.info("Coord Job [" + coordJob.getId()
651 + "] status to "+ coordStatus +" can not be updated as its already in Terminal state");
652 return;
653 }
654 }
655
656 boolean isPendingStateChanged = checkCoordPending(isPending, coordJob, false);
657 // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and PAUSEDWITHERROR is
658 // not supported
659 coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(coordStatus));
660 // Backward support when coordinator namespace is 0.1
661 coordJob.setStatus(StatusUtils.getStatus(coordJob));
662 if (coordJob.getStatus() != prevStatus || isPendingStateChanged) {
663 LOG.debug("Updating coord job " + coordJob.getId());
664 coordJob.setLastModifiedTime(new Date());
665 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
666 }
667 // update bundle action only when status changes in coord job
668 if (coordJob.getBundleId() != null) {
669 if (!prevStatus.equals(coordJob.getStatus())) {
670 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
671 bundleStatusUpdate.call();
672 }
673 }
674 }
675
676 private boolean checkCoordPending(boolean isPending, CoordinatorJobBean coordJob, boolean saveToDB)
677 throws JPAExecutorException {
678 // Checking the coordinator pending should be updated or not
679 boolean prevPending = coordJob.isPending();
680 if (isPending) {
681 coordJob.setPending();
682 LOG.info("Coord job [" + coordJob.getId() + "] Pending set to TRUE");
683 }
684 else {
685 coordJob.resetPending();
686 LOG.info("Coord job [" + coordJob.getId() + "] Pending set to FALSE");
687 }
688 boolean hasChange = prevPending != coordJob.isPending();
689 if (saveToDB && hasChange) {
690 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
691 }
692 return hasChange;
693
694 }
695
696 /**
697 * Aggregate coordinator actions' status to coordinator jobs
698 *
699 * @throws JPAExecutorException thrown if failed in db updates or retrievals
700 * @throws CommandException thrown if failed to run commands
701 */
702 private void coordTransit() throws JPAExecutorException, CommandException {
703 List<CoordinatorJobBean> pendingJobCheckList = null;
704 if (lastInstanceStartTime == null) {
705 LOG.info("Running coordinator status service first instance");
706 // this is the first instance, we need to check for all pending jobs;
707 pendingJobCheckList = jpaService.execute(new CoordJobsGetPendingJPAExecutor(limit));
708 }
709 else {
710 LOG.info("Running coordinator status service from last instance time = "
711 + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
712 // this is not the first instance, we should only check jobs
713 // that have actions been
714 // updated >= start time of last service run;
715 List<String> coordJobIdList = jpaService
716 .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
717 Set<String> coordIds = new HashSet<String>();
718 for (String coordJobId : coordJobIdList) {
719 coordIds.add(coordJobId);
720 }
721 pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
722 for (String coordId : coordIds.toArray(new String[coordIds.size()])) {
723 CoordinatorJobBean coordJob;
724 try{
725 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordId));
726 }
727 catch (JPAExecutorException jpaee) {
728 if (jpaee.getErrorCode().equals(ErrorCode.E0604)) {
729 LOG.warn("Exception happened during StatusTransitRunnable; Coordinator Job doesn't exist", jpaee);
730 continue;
731 } else {
732 throw jpaee;
733 }
734 }
735 // Running coord job might have pending false
736 Job.Status coordJobStatus = coordJob.getStatus();
737 if (coordJob.isPending() || coordJobStatus.equals(Job.Status.PAUSED)
738 || coordJobStatus.equals(Job.Status.RUNNING)
739 || coordJobStatus.equals(Job.Status.RUNNINGWITHERROR)
740 || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
741 pendingJobCheckList.add(coordJob);
742 }
743 }
744 }
745 aggregateCoordJobsStatus(pendingJobCheckList);
746 }
747 }
748
749 /**
750 * Initializes the {@link StatusTransitService}.
751 *
752 * @param services services instance.
753 */
754 @Override
755 public void init(Services services) {
756 Configuration conf = services.getConf();
757 Runnable stateTransitRunnable = new StatusTransitRunnable();
758 services.get(SchedulerService.class).schedule(stateTransitRunnable, 10,
759 conf.getInt(CONF_STATUSTRANSIT_INTERVAL, 60), SchedulerService.Unit.SEC);
760 }
761
762 /**
763 * Destroy the StateTransit Jobs Service.
764 */
765 @Override
766 public void destroy() {
767 }
768
769 /**
770 * Return the public interface for the purge jobs service.
771 *
772 * @return {@link StatusTransitService}.
773 */
774 @Override
775 public Class<? extends Service> getInterface() {
776 return StatusTransitService.class;
777 }
778 }
779