/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.Comparator;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.Job;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.bundle.BundleKillXCommand;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.executor.jpa.BundleActionsGetByLastModifiedTimeJPAExecutor;
import org.apache.oozie.executor.jpa.BundleActionsGetStatusPendingJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.MemoryLocks;
import org.apache.oozie.util.StatusUtils;
import org.apache.oozie.util.XLog;

/**
 * StatusTransitService is scheduled to run at the configured interval.
 * <p/>
 * It updates a job's status according to the status of its child actions. If none of the child actions is pending
 * (job done), the job's pending flag is reset. If all child actions are succeeded, the job's status is set to
 * SUCCEEDED.
 */
public class StatusTransitService implements Service {
    public static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService.";
    public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval";
    public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + "backward.support.for.coord.status";
    public static final String CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR = CONF_PREFIX + "backward.support.for.states.without.error";
    // Passed as the limit argument of the "first run" job queries.
    private static int limit = -1;
    // Start time of the last run that acquired the lock and completed; null until the first such run.
    private static Date lastInstanceStartTime = null;
    private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class);

    /**
     * StatusTransitRunnable is the runnable which is scheduled to run at the configured interval.
     * <p/>
     * It updates a job's status according to the status of its child actions. If none of the child actions is
     * pending (job done), the job's pending flag is reset. If all child actions are succeeded, the job's status is
     * set to SUCCEEDED.
     */
    public static class StatusTransitRunnable implements Runnable {
        private JPAService jpaService = null;
        private MemoryLocks.LockToken lock;

        public StatusTransitRunnable() {
            jpaService = Services.get().get(JPAService.class);
            if (jpaService == null) {
                LOG.error("Missing JPAService");
            }
        }

        /**
         * Runs one status-transit pass: acquires the service-wide write lock, aggregates coordinator jobs, then
         * bundle jobs, and records the start time of this run. Any exception is logged and swallowed so that the
         * scheduled task keeps running.
         */
        @Override
        public void run() {
            // The token is an instance field and this runnable is re-used across runs; clear it first so that a
            // failure while acquiring the lock cannot make the finally block release the previous run's token again.
            lock = null;
            try {
                Date curDate = new Date(); // records the start time of this service run;

                // first check if there is some other instance running;
                lock = Services.get().get(MemoryLocksService.class).getWriteLock(StatusTransitService.class.getName(),
                        lockTimeout);
                if (lock == null) {
                    LOG.info("This StatusTransitService instance"
                            + " will not run since there is already an instance running");
                }
                else {
                    LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName());
                    // running coord jobs transit service
                    coordTransit();
                    // running bundle jobs transit service
                    bundleTransit();

                    lastInstanceStartTime = curDate;
                }
            }
            catch (Exception ex) {
                LOG.warn("Exception happened during StatusTransitRunnable ", ex);
            }
            finally {
                // release lock;
                if (lock != null) {
                    lock.release();
                    LOG.info("Released lock for [{0}]", StatusTransitService.class.getName());
                }
            }
        }

        /**
         * Returns the given bundle jobs with duplicates (same job id) removed. The first occurrence of each id is
         * kept and the relative order of the input list is preserved.
         *
         * @param pendingJobList bundle jobs, possibly containing several beans with the same id
         * @return a new list holding one bean per distinct job id
         */
        public List<BundleJobBean> removeDuplicates(List<BundleJobBean> pendingJobList) {
            // A TreeSet with a comparator that only answers "equal" or "greater" violates the Comparator contract
            // and can miss duplicates once the tree rebalances, so track the ids that were already seen instead.
            Set<String> seenIds = new HashSet<String>();
            List<BundleJobBean> uniqueJobs = new ArrayList<BundleJobBean>();
            for (BundleJobBean job : pendingJobList) {
                if (seenIds.add(job.getId())) {
                    uniqueJobs.add(job);
                }
            }
            return uniqueJobs;
        }

        /**
         * Returns the number stored for the given key, or 0 when the key is absent.
         *
         * @param counts per-status counters
         * @param key status to look up
         * @return the counter value, 0 if there is none
         */
        private static <K> int countOf(HashMap<K, Integer> counts, K key) {
            Integer count = counts.get(key);
            return (count == null) ? 0 : count;
        }

        /**
         * Adds one to the counter stored for the given key, starting from 0 when the key is absent.
         *
         * @param counts per-status counters
         * @param key status whose counter is incremented
         */
        private static <K> void increment(HashMap<K, Integer> counts, K key) {
            counts.put(key, countOf(counts, key) + 1);
        }

        /**
         * Tells whether at least one coordinator action is KILLED, FAILED or TIMEDOUT.
         *
         * @param coordActionStatus number of coordinator actions per status
         * @return true if any of the three error statuses is present
         */
        private static boolean hasCoordErrorActions(HashMap<CoordinatorAction.Status, Integer> coordActionStatus) {
            return coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
                    || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
                    || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT);
        }

        /**
         * Aggregate bundle actions' status to bundle jobs
         *
         * @throws JPAExecutorException thrown if failed in db updates or retrievals
         * @throws CommandException thrown if failed to run commands
         */
        private void bundleTransit() throws JPAExecutorException, CommandException {
            List<BundleJobBean> pendingJobCheckList = null;

            if (lastInstanceStartTime == null) {
                LOG.info("Running bundle status service first instance");
                // this is the first instance, we need to check for all pending or running jobs;
                pendingJobCheckList = jpaService.execute(new BundleJobsGetRunningOrPendingJPAExecutor(limit));
            }
            else {
                LOG.info("Running bundle status service from last instance time =  "
                        + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
                // this is not the first instance, we should only check jobs that have actions been
                // updated >= start time of last service run;
                List<BundleActionBean> actionList = jpaService
                        .execute(new BundleActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
                Set<String> bundleIds = new HashSet<String>();
                for (BundleActionBean action : actionList) {
                    bundleIds.add(action.getBundleId());
                }
                pendingJobCheckList = new ArrayList<BundleJobBean>();
                for (String bundleId : bundleIds) {
                    BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId));
                    // Running bundle job might have pending false
                    if (bundle.isPending() || bundle.getStatus().equals(Job.Status.RUNNING)
                            || bundle.getStatus().equals(Job.Status.RUNNINGWITHERROR)
                            || bundle.getStatus().equals(Job.Status.PAUSED)
                            || bundle.getStatus().equals(Job.Status.PAUSEDWITHERROR)) {
                        pendingJobCheckList.add(bundle);
                    }
                }
            }
            aggregateBundleJobsStatus(pendingJobCheckList);
        }

        /**
         * Derives the status and pending flag of each given bundle job from its bundle actions and stores the
         * result. A failure on one job is logged and does not stop the processing of the remaining jobs.
         *
         * @param bundleLists bundle jobs to check; nothing is done when null
         * @throws JPAExecutorException declared for callers; per-job failures are caught and logged
         * @throws CommandException declared for callers; per-job failures are caught and logged
         */
        private void aggregateBundleJobsStatus(List<BundleJobBean> bundleLists) throws JPAExecutorException,
                CommandException {
            if (bundleLists == null) {
                return;
            }
            for (BundleJobBean bundleJob : bundleLists) {
                try {
                    String jobId = bundleJob.getId();
                    // one-element array so that the check methods can hand back the new status
                    Job.Status[] bundleStatus = new Job.Status[1];
                    bundleStatus[0] = bundleJob.getStatus();
                    List<BundleActionBean> bundleActions = jpaService.execute(new BundleActionsGetStatusPendingJPAExecutor(
                            jobId));
                    HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>();
                    boolean foundPending = false;
                    for (BundleActionBean bAction : bundleActions) {
                        increment(bundleActionStatus, bAction.getStatus());
                        // NOTE(review): the kill command is issued once per matching action and the aggregation
                        // below still runs afterwards.
                        if (bAction.getCoordId() == null
                                && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED)) {
                            (new BundleKillXCommand(jobId)).call();
                            LOG.info("Bundle job ["+ jobId
                                    + "] has been killed since one of its coordinator job failed submission.");
                        }

                        if (bAction.isPending()) {
                            foundPending = true;
                        }
                    }

                    // The checks are tried in this order and the first one that applies wins; each check writes
                    // bundleStatus[0] only when it returns true, so short-circuit evaluation is required here.
                    boolean statusResolved =
                            (!foundPending && checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus))
                            || (!foundPending && checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus))
                            || checkPausedStatus(bundleActionStatus, bundleActions, bundleStatus)
                            || checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus, foundPending)
                            || checkRunningStatus(bundleActionStatus, bundleActions, bundleStatus);
                    if (statusResolved) {
                        LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
                                + "' from '" + bundleJob.getStatus() + "'");
                        updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
                    }
                }
                catch (Exception ex) {
                    LOG.error("Exception happened during aggregate bundle job's status, job = "
                            + bundleJob.getId(), ex);
                }
            }
        }

        /**
         * Derives the status and pending flag of each given coordinator job from its coordinator actions and stores
         * the result. A failure on one job is logged and does not stop the processing of the remaining jobs.
         *
         * @param coordList coordinator jobs to check; nothing is done when null
         * @throws JPAExecutorException declared for callers; per-job failures are caught and logged
         * @throws CommandException declared for callers; per-job failures are caught and logged
         */
        private void aggregateCoordJobsStatus(List<CoordinatorJobBean> coordList) throws JPAExecutorException,
                CommandException {
            if (coordList == null) {
                return;
            }
            Configuration conf = Services.get().getConf();
            boolean backwardSupportForCoordStatus = conf.getBoolean(CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);

            for (CoordinatorJobBean coordJob : coordList) {
                try {
                    // if namespace 0.1 is used and backward support is true, then ignore this coord job
                    if (backwardSupportForCoordStatus && coordJob.getAppNamespace() != null
                            && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
                        continue;
                    }
                    String jobId = coordJob.getId();
                    // one-element array so that the check methods can hand back the new status
                    Job.Status[] coordStatus = new Job.Status[1];
                    coordStatus[0] = coordJob.getStatus();
                    // Get count of Coordinator actions with pending true
                    int pendingCount = jpaService.execute(new CoordJobGetPendingActionsCountJPAExecutor(jobId));
                    boolean isPending = pendingCount > 0;
                    // Get status of Coordinator actions
                    List<CoordinatorAction.Status> coordActionStatusList = jpaService
                            .execute(new CoordJobGetActionsStatusJPAExecutor(jobId));
                    HashMap<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>();
                    for (CoordinatorAction.Status status : coordActionStatusList) {
                        increment(coordActionStatus, status);
                    }

                    int coordActionsCount = coordActionStatusList.size();
                    boolean isDoneMaterialization = coordJob.isDoneMaterialization();
                    boolean terminalCheckAllowed = isDoneMaterialization || coordStatus[0] == Job.Status.FAILED
                            || coordStatus[0] == Job.Status.KILLED;

                    // The checks are tried in this order and the first one that applies wins; each check writes
                    // coordStatus[0] only when it returns true, so short-circuit evaluation is required here.
                    boolean statusResolved =
                            (terminalCheckAllowed && checkCoordTerminalStatus(coordActionStatus, coordActionsCount,
                                    coordStatus, isDoneMaterialization))
                            || checkCoordPausedStatus(coordActionStatus, coordActionsCount, coordStatus)
                            || checkCoordSuspendStatus(coordActionStatus, coordActionsCount, coordStatus,
                                    isDoneMaterialization, isPending)
                            || checkCoordRunningStatus(coordActionStatus, coordActionsCount, coordStatus);
                    if (statusResolved) {
                        LOG.info("Set coordinator job [" + jobId + "] status to '" + coordStatus[0].toString()
                                + "' from '" + coordJob.getStatus() + "'");
                        updateCoordJob(isPending, coordJob, coordStatus[0]);
                    }
                    else {
                        // no status change applies; only bring the pending flag up to date
                        checkCoordPending(isPending, coordJob, true);
                    }
                }
                catch (Exception ex) {
                    LOG.error("Exception happened during aggregate coordinator job's status, job = "
                            + coordJob.getId(), ex);
                }
            }
        }

        /**
         * Checks whether every bundle action is SUCCEEDED, FAILED, KILLED or DONEWITHERROR and, if so, derives the
         * terminal status of the bundle job.
         *
         * @param bundleActionStatus number of bundle actions per status
         * @param bundleActions all bundle actions of the job
         * @param bundleStatus one-element array; receives the new status when true is returned
         * @return true if a terminal status was written to bundleStatus[0]
         */
        private boolean checkTerminalStatus(HashMap<Job.Status, Integer> bundleActionStatus,
                List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
            int totalActions = bundleActions.size();
            int totalValuesSucceed = countOf(bundleActionStatus, Job.Status.SUCCEEDED);
            int totalValuesFailed = countOf(bundleActionStatus, Job.Status.FAILED);
            int totalValuesKilled = countOf(bundleActionStatus, Job.Status.KILLED);
            int totalValuesDoneWithError = countOf(bundleActionStatus, Job.Status.DONEWITHERROR);

            if (totalActions != (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesDoneWithError)) {
                return false;
            }
            if (totalActions == totalValuesSucceed) {
                // If all the bundle actions are succeeded then bundle job should be succeeded.
                bundleStatus[0] = Job.Status.SUCCEEDED;
            }
            else if (totalActions == totalValuesKilled) {
                // If all the bundle actions are KILLED then bundle job should be KILLED.
                bundleStatus[0] = Job.Status.KILLED;
            }
            else if (totalActions == totalValuesFailed) {
                // If all the bundle actions are FAILED then bundle job should be FAILED.
                bundleStatus[0] = Job.Status.FAILED;
            }
            else {
                // a mix of terminal statuses
                bundleStatus[0] = Job.Status.DONEWITHERROR;
            }
            return true;
        }

        /**
         * Checks whether every coordinator action is SUCCEEDED, FAILED, KILLED or TIMEDOUT and, if so, derives the
         * terminal status of the coordinator job.
         *
         * @param coordActionStatus number of coordinator actions per status
         * @param coordActionsCount total number of coordinator actions of the job
         * @param coordStatus one-element array; receives the new status when true is returned
         * @param isDoneMaterialization whether the job has finished materializing actions; SUCCEEDED is only
         *        chosen when this is true
         * @return true if a terminal status was written to coordStatus[0]
         */
        private boolean checkCoordTerminalStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
                int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization) {
            int totalValuesSucceed = countOf(coordActionStatus, CoordinatorAction.Status.SUCCEEDED);
            int totalValuesFailed = countOf(coordActionStatus, CoordinatorAction.Status.FAILED);
            int totalValuesKilled = countOf(coordActionStatus, CoordinatorAction.Status.KILLED);
            int totalValuesTimeOut = countOf(coordActionStatus, CoordinatorAction.Status.TIMEDOUT);

            if (coordActionsCount != (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut)) {
                return false;
            }
            if (coordActionsCount == totalValuesSucceed && isDoneMaterialization) {
                // If all the coordinator actions are succeeded then coordinator job should be succeeded.
                coordStatus[0] = Job.Status.SUCCEEDED;
            }
            else if (coordActionsCount == totalValuesKilled) {
                // If all the coordinator actions are KILLED then coordinator job should be KILLED.
                coordStatus[0] = Job.Status.KILLED;
            }
            else if (coordActionsCount == totalValuesFailed) {
                // If all the coordinator actions are FAILED then coordinator job should be FAILED.
                coordStatus[0] = Job.Status.FAILED;
            }
            else {
                // a mix of terminal statuses
                coordStatus[0] = Job.Status.DONEWITHERROR;
            }
            return true;
        }

        /**
         * Sets the bundle job to RUNNING when some, but not all, of its bundle actions are in PREP.
         *
         * @param bundleActionStatus number of bundle actions per status
         * @param bundleActions all bundle actions of the job
         * @param bundleStatus one-element array; receives RUNNING when true is returned
         * @return true if RUNNING was written to bundleStatus[0]
         */
        private boolean checkPrepStatus(HashMap<Job.Status, Integer> bundleActionStatus,
                List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
            // At least one action is in PREP and at least one action has already left PREP.
            if (bundleActionStatus.containsKey(Job.Status.PREP)
                    && bundleActions.size() > bundleActionStatus.get(Job.Status.PREP)) {
                bundleStatus[0] = Job.Status.RUNNING;
                return true;
            }
            return false;
        }

        /**
         * Decides between PAUSED and PAUSEDWITHERROR for a bundle job.
         *
         * @param bundleActionStatus number of bundle actions per status
         * @param bundleActions all bundle actions of the job
         * @param bundleJobStatus one-element array holding the current job status; receives the new status when
         *        true is returned
         * @return true if a paused status was written to bundleJobStatus[0]
         */
        private boolean checkPausedStatus(HashMap<Job.Status, Integer> bundleActionStatus,
                List<BundleActionBean> bundleActions, Job.Status[] bundleJobStatus) {
            // TODO - When bottom up cmds are allowed to change the status of parent job,
            // if none of the bundle actions are in paused or pausedwitherror, the function should return
            // false

            // top down
            // If the bundle job is PAUSED or PAUSEDWITHERROR and no children are in error
            // state, then job should be PAUSED otherwise it should be pausedwitherror
            if (bundleJobStatus[0] == Job.Status.PAUSED || bundleJobStatus[0] == Job.Status.PAUSEDWITHERROR) {
                boolean childInError = bundleActionStatus.containsKey(Job.Status.KILLED)
                        || bundleActionStatus.containsKey(Job.Status.FAILED)
                        || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
                        || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
                        || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)
                        || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR);
                bundleJobStatus[0] = childInError ? Job.Status.PAUSEDWITHERROR : Job.Status.PAUSED;
                return true;
            }

            // bottom up; check the status of parent through their children
            int pausedActions = countOf(bundleActionStatus, Job.Status.PAUSED);
            if (bundleActionStatus.containsKey(Job.Status.PAUSED) && bundleActions.size() == pausedActions) {
                bundleJobStatus[0] = Job.Status.PAUSED;
                return true;
            }
            if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)
                    && bundleActions.size() == pausedActions + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR)) {
                bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
                return true;
            }
            return false;
        }

        /**
         * Decides between SUSPENDED and SUSPENDEDWITHERROR for a bundle job.
         *
         * @param bundleActionStatus number of bundle actions per status
         * @param bundleActions all bundle actions of the job
         * @param bundleStatus one-element array holding the current job status; receives the new status when true
         *        is returned
         * @param isPending whether at least one bundle action is pending
         * @return true if a suspended status was written to bundleStatus[0]
         */
        private boolean checkSuspendStatus(HashMap<Job.Status, Integer> bundleActionStatus,
                List<BundleActionBean> bundleActions, Job.Status[] bundleStatus, boolean isPending) {
            // TODO - When bottom up cmds are allowed to change the status of parent job,
            // if none of the bundle actions are in suspended or suspendedwitherror, the function should return
            // false

            // top down
            // if job is suspended
            if (bundleStatus[0] == Job.Status.SUSPENDED || bundleStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
                boolean childInError = bundleActionStatus.containsKey(Job.Status.KILLED)
                        || bundleActionStatus.containsKey(Job.Status.FAILED)
                        || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
                        || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
                        || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR);
                bundleStatus[0] = childInError ? Job.Status.SUSPENDEDWITHERROR : Job.Status.SUSPENDED;
                return true;
            }

            // bottom up
            // Update status of parent from the status of its children
            // NOTE(review): the grouping below is the existing operator precedence made explicit; confirm whether
            // the pending check was also meant to guard the SUSPENDEDWITHERROR case.
            if ((!isPending && bundleActionStatus.containsKey(Job.Status.SUSPENDED))
                    || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)) {
                // Only one of SUSPENDED / SUSPENDEDWITHERROR is guaranteed to be present here, so every count is
                // read null-safely instead of unboxing a possibly missing map entry.
                int suspendedActions = countOf(bundleActionStatus, Job.Status.SUSPENDED);
                int suspendedWithErrorActions = countOf(bundleActionStatus, Job.Status.SUSPENDEDWITHERROR);
                int succeededActions = countOf(bundleActionStatus, Job.Status.SUCCEEDED);
                int killedActions = countOf(bundleActionStatus, Job.Status.KILLED);
                int failedActions = countOf(bundleActionStatus, Job.Status.FAILED);
                int doneWithErrorActions = countOf(bundleActionStatus, Job.Status.DONEWITHERROR);

                if (bundleActions.size() == suspendedActions + succeededActions) {
                    bundleStatus[0] = Job.Status.SUSPENDED;
                    return true;
                }
                if (bundleActions.size() == suspendedWithErrorActions + suspendedActions + succeededActions
                        + killedActions + failedActions + doneWithErrorActions) {
                    bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
                    return true;
                }
            }
            return false;
        }

        /**
         * Decides between PAUSED and PAUSEDWITHERROR for a coordinator job that is currently paused.
         *
         * @param coordActionStatus number of coordinator actions per status
         * @param coordActionsCount total number of coordinator actions of the job (not used by this check)
         * @param coordStatus one-element array holding the current job status; receives the new status when true
         *        is returned
         * @return true if the job is PAUSED or PAUSEDWITHERROR and coordStatus[0] was written
         */
        private boolean checkCoordPausedStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
                int coordActionsCount, Job.Status[] coordStatus){
            if (coordStatus[0].equals(Job.Status.PAUSED) || coordStatus[0].equals(Job.Status.PAUSEDWITHERROR)) {
                coordStatus[0] = hasCoordErrorActions(coordActionStatus) ? Job.Status.PAUSEDWITHERROR
                        : Job.Status.PAUSED;
                return true;
            }
            return false;
        }

        /**
         * Decides between SUSPENDED and SUSPENDEDWITHERROR for a coordinator job.
         *
         * @param coordActionStatus number of coordinator actions per status
         * @param coordActionsCount total number of coordinator actions of the job
         * @param coordStatus one-element array holding the current job status; receives the new status when true
         *        is returned
         * @param isDoneMaterialization whether the job has finished materializing actions
         * @param isPending whether at least one coordinator action is pending
         * @return true if a suspended status was written to coordStatus[0]
         */
        private boolean checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
                int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization, boolean isPending) {
            // TODO - When bottom up cmds are allowed to change the status of parent job
            // if none of the coord actions are in suspended or suspendedwitherror and materialization done is
            // false, then the function should return false

            // top down
            // check for children only when parent is suspended
            if (coordStatus[0] == Job.Status.SUSPENDED || coordStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
                coordStatus[0] = hasCoordErrorActions(coordActionStatus) ? Job.Status.SUSPENDEDWITHERROR
                        : Job.Status.SUSPENDED;
                return true;
            }

            // bottom up
            // look for children to check the parent's status only if materialization is
            // done and all actions are non-pending
            if (isDoneMaterialization && !isPending
                    && coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
                int suspendedActions = countOf(coordActionStatus, CoordinatorAction.Status.SUSPENDED);
                int succeededActions = countOf(coordActionStatus, CoordinatorAction.Status.SUCCEEDED);
                int killedActions = countOf(coordActionStatus, CoordinatorAction.Status.KILLED);
                int failedActions = countOf(coordActionStatus, CoordinatorAction.Status.FAILED);
                int timedoutActions = countOf(coordActionStatus, CoordinatorAction.Status.TIMEDOUT);

                if (coordActionsCount == suspendedActions + succeededActions) {
                    coordStatus[0] = Job.Status.SUSPENDED;
                    return true;
                }
                if (coordActionsCount == suspendedActions + succeededActions + killedActions + failedActions
                        + timedoutActions) {
                    coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
                    return true;
                }
            }
            return false;
        }

        /**
         * Decides between RUNNING and RUNNINGWITHERROR for a coordinator job that is not in PREP.
         *
         * @param coordActionStatus number of coordinator actions per status
         * @param coordActionsCount total number of coordinator actions of the job (not used by this check)
         * @param coordStatus one-element array holding the current job status; receives the new status when true
         *        is returned
         * @return true unless the job is in PREP
         */
        private boolean checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
                int coordActionsCount, Job.Status[] coordStatus) {
            if (coordStatus[0] == Job.Status.PREP) {
                return false;
            }
            coordStatus[0] = hasCoordErrorActions(coordActionStatus) ? Job.Status.RUNNINGWITHERROR
                    : Job.Status.RUNNING;
            return true;
        }

        /**
         * Decides between RUNNING and RUNNINGWITHERROR for a bundle job that is not in PREP.
         *
         * @param bundleActionStatus number of bundle actions per status
         * @param bundleActions all bundle actions of the job (not used by this check)
         * @param bundleStatus one-element array holding the current job status; receives the new status when true
         *        is returned
         * @return true unless the job is in PREP
         */
        private boolean checkRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus,
                List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
            if (bundleStatus[0] == Job.Status.PREP) {
                return false;
            }
            boolean childInError = bundleActionStatus.containsKey(Job.Status.FAILED)
                    || bundleActionStatus.containsKey(Job.Status.KILLED)
                    || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
                    || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR);
            bundleStatus[0] = childInError ? Job.Status.RUNNINGWITHERROR : Job.Status.RUNNING;
            return true;
        }

        /**
         * Applies the given status and pending flag to the bundle job and stores the job.
         *
         * @param isPending new value of the pending flag
         * @param bundleJob bundle job to update
         * @param bundleStatus new status, mapped through the backward-support conversion before it is set
         * @throws JPAExecutorException thrown if the db update fails
         */
        private void updateBundleJob(boolean isPending, BundleJobBean bundleJob, Job.Status bundleStatus)
                throws JPAExecutorException {
            String jobId = bundleJob.getId();
            // Update the Bundle Job
            // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
            // PAUSEDWITHERROR is not supported
            bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(bundleStatus));
            if (isPending) {
                bundleJob.setPending();
                LOG.info("Bundle job [" + jobId + "] Pending set to TRUE");
            }
            else {
                bundleJob.resetPending();
                LOG.info("Bundle job [" + jobId + "] Pending set to FALSE");
            }
            jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
        }

        /**
         * Applies the given status and pending flag to the coordinator job, stores the job when something changed
         * and, for a job that belongs to a bundle, propagates a status change to the bundle action.
         *
         * @param isPending new value of the pending flag
         * @param coordJob coordinator job to update
         * @param coordStatus new status, mapped through the backward-support conversions before it is set
         * @throws JPAExecutorException thrown if the db update fails
         * @throws CommandException thrown if the bundle status update command fails
         */
        private void updateCoordJob(boolean isPending, CoordinatorJobBean coordJob, Job.Status coordStatus)
                throws JPAExecutorException, CommandException {
            Job.Status prevStatus = coordJob.getStatus();
            // Update the Coord Job
            boolean alreadyTerminal = prevStatus == Job.Status.SUCCEEDED || prevStatus == Job.Status.FAILED
                    || prevStatus == Job.Status.KILLED || prevStatus == Job.Status.DONEWITHERROR;
            boolean toSuspended = coordStatus == Job.Status.SUSPENDED
                    || coordStatus == Job.Status.SUSPENDEDWITHERROR;
            if (alreadyTerminal && toSuspended) {
                // a job that already reached a terminal status must not go back to a suspended status
                LOG.info("Coord Job [" + coordJob.getId()
                        + "] status to "+ coordStatus +" can not be updated as its already in Terminal state");
                return;
            }

            boolean isPendingStateChanged = checkCoordPending(isPending, coordJob, false);
            // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and PAUSEDWITHERROR is
            // not supported
            coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(coordStatus));
            // Backward support when coordinator namespace is 0.1
            coordJob.setStatus(StatusUtils.getStatus(coordJob));
            if (coordJob.getStatus() != prevStatus || isPendingStateChanged) {
                LOG.debug("Updating coord job " + coordJob.getId());
                coordJob.setLastModifiedTime(new Date());
                jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
            }
            // update bundle action only when status changes in coord job
            if (coordJob.getBundleId() != null && !prevStatus.equals(coordJob.getStatus())) {
                BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
                bundleStatusUpdate.call();
            }
        }

        /**
         * Sets or resets the pending flag of the coordinator job and optionally stores the job when the flag
         * changed.
         *
         * @param isPending new value of the pending flag
         * @param coordJob coordinator job to update
         * @param saveToDB whether to store the job when the flag changed
         * @return true if the pending flag of the job changed
         * @throws JPAExecutorException thrown if the db update fails
         */
        private boolean checkCoordPending(boolean isPending, CoordinatorJobBean coordJob, boolean saveToDB)
                throws JPAExecutorException {
            // Checking the coordinator pending should be updated or not
            boolean prevPending = coordJob.isPending();
            if (isPending) {
                coordJob.setPending();
                LOG.info("Coord job [" + coordJob.getId() + "] Pending set to TRUE");
            }
            else {
                coordJob.resetPending();
                LOG.info("Coord job [" + coordJob.getId() + "] Pending set to FALSE");
            }
            boolean hasChange = prevPending != coordJob.isPending();
            if (saveToDB && hasChange) {
                jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
            }
            return hasChange;
        }

        /**
         * Aggregate coordinator actions' status to coordinator jobs
         *
         * @throws JPAExecutorException thrown if failed in db updates or retrievals
         * @throws CommandException thrown if failed to run commands
         */
        private void coordTransit() throws JPAExecutorException, CommandException {
            List<CoordinatorJobBean> pendingJobCheckList = null;
            if (lastInstanceStartTime == null) {
                LOG.info("Running coordinator status service first instance");
                // this is the first instance, we need to check for all pending jobs;
                pendingJobCheckList = jpaService.execute(new CoordJobsGetPendingJPAExecutor(limit));
            }
            else {
                LOG.info("Running coordinator status service from last instance time =  "
                        + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
                // this is not the first instance, we should only check jobs
                // that have actions been
                // updated >= start time of last service run;
                List<String> coordJobIdList = jpaService
                        .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
                // collapse repeated job ids so that every job is loaded once
                Set<String> coordIds = new HashSet<String>();
                for (String coordJobId : coordJobIdList) {
                    coordIds.add(coordJobId);
                }
                pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
                for (String coordId : coordIds) {
                    CoordinatorJobBean coordJob;
                    try {
                        coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordId));
                    }
                    catch (JPAExecutorException jpaee) {
                        if (jpaee.getErrorCode().equals(ErrorCode.E0604)) {
                            LOG.warn("Exception happened during StatusTransitRunnable; Coordinator Job doesn't exist", jpaee);
                            continue;
                        }
                        throw jpaee;
                    }
                    // Running coord job might have pending false
                    Job.Status coordJobStatus = coordJob.getStatus();
                    if (coordJob.isPending() || coordJobStatus.equals(Job.Status.PAUSED)
                            || coordJobStatus.equals(Job.Status.RUNNING)
                            || coordJobStatus.equals(Job.Status.RUNNINGWITHERROR)
                            || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
                        pendingJobCheckList.add(coordJob);
                    }
                }
            }
            aggregateCoordJobsStatus(pendingJobCheckList);
        }
    }

    /**
     * Initializes the {@link StatusTransitService}: schedules a {@link StatusTransitRunnable} with an initial delay
     * of 10 seconds and the configured interval (60 seconds by default).
     *
     * @param services services instance.
     */
    @Override
    public void init(Services services) {
        Configuration conf = services.getConf();
        Runnable stateTransitRunnable = new StatusTransitRunnable();
        services.get(SchedulerService.class).schedule(stateTransitRunnable, 10,
                conf.getInt(CONF_STATUSTRANSIT_INTERVAL, 60), SchedulerService.Unit.SEC);
    }

    /**
     * Destroy the StatusTransit Service.
     */
    @Override
    public void destroy() {
    }

    /**
     * Return the public interface for the status transit service.
     *
     * @return {@link StatusTransitService}.
     */
    @Override
    public Class<? extends Service> getInterface() {
        return StatusTransitService.class;
    }
}