/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.Comparator;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.Job;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.bundle.BundleKillXCommand;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.lock.LockToken;
import org.apache.oozie.util.StatusUtils;
import org.apache.oozie.util.XLog;

/**
 * StatusTransitService is scheduled to run at the configured interval.
 * <p>
 * It updates a job's status according to the status of its child actions. If no child action is pending, the
 * job's pending flag is reset. If all child actions have succeeded, the job's status is set to SUCCEEDED.
 */
public class StatusTransitService implements Service {
    public static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService.";
    public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval";
    public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + "backward.support.for.coord.status";
    public static final String CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR = CONF_PREFIX + "backward.support.for.states.without.error";
    private static int limit = -1;
    // Start time of the last run that completed both transits; null until the first such run.
    private static Date lastInstanceStartTime = null;
    private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class);

    /**
     * StatusTransitRunnable is the runnable which is scheduled to run at the configured interval.
     * <p>
     * It updates a job's status according to the status of its child actions. If no child action is pending, the
     * job's pending flag is reset. If all child actions have succeeded, the job's status is set to SUCCEEDED.
     */
    public static class StatusTransitRunnable implements Runnable {
        private JPAService jpaService = null;
        private LockToken lock;

        public StatusTransitRunnable() {
            jpaService = Services.get().get(JPAService.class);
            if (jpaService == null) {
                LOG.error("Missing JPAService");
            }
        }

        /**
         * Runs one pass: acquires the service-wide write lock, aggregates coordinator jobs, then bundle jobs, and
         * records the start time of this pass. Any exception is logged and swallowed so that a failing pass does
         * not propagate out of the runnable.
         */
        @Override
        public void run() {
            // Forget the token of the previous run; otherwise a failure inside getWriteLock() would make the
            // finally block release an already released token a second time.
            lock = null;
            try {
                Date curDate = new Date(); // records the start time of this service run;

                // first check if there is some other instance running;
                lock = Services.get().get(MemoryLocksService.class).getWriteLock(StatusTransitService.class.getName(),
                        lockTimeout);
                if (lock == null) {
                    LOG.info("This StatusTransitService instance"
                            + " will not run since there is already an instance running");
                }
                else {
                    LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName());
                    // running coord jobs transit service
                    coordTransit();
                    // running bundle jobs transit service
                    bundleTransit();

                    lastInstanceStartTime = curDate;
                }
            }
            catch (Exception ex) {
                LOG.warn("Exception happened during StatusTransitRunnable ", ex);
            }
            finally {
                // release lock;
                if (lock != null) {
                    lock.release();
                    LOG.info("Released lock for [{0}]", StatusTransitService.class.getName());
                }
            }
        }

        /**
         * Removes bundle jobs that share an id with an earlier element of the list.
         * <p>
         * The first occurrence of every id is kept and the relative order of the kept elements is the order of
         * the input list.
         *
         * @param pendingJobList bundle jobs, possibly containing several beans with the same id
         * @return a new list holding one bean per distinct job id
         */
        public List<BundleJobBean> removeDuplicates(List<BundleJobBean> pendingJobList) {
            Set<String> seenIds = new HashSet<String>();
            List<BundleJobBean> uniqueJobs = new ArrayList<BundleJobBean>();
            for (BundleJobBean job : pendingJobList) {
                // Set.add returns false when the id was already present
                if (seenIds.add(job.getId())) {
                    uniqueJobs.add(job);
                }
            }
            return uniqueJobs;
        }

        /**
         * Returns the number recorded for the given status, or 0 when the status has no entry.
         */
        private static <K> int countOf(HashMap<K, Integer> counts, K status) {
            Integer count = counts.get(status);
            return count == null ? 0 : count.intValue();
        }

        /**
         * Adds one to the number recorded for the given status.
         */
        private static <K> void increment(HashMap<K, Integer> counts, K status) {
            counts.put(status, countOf(counts, status) + 1);
        }

        /**
         * Aggregate bundle actions' status to bundle jobs
         *
         * @throws JPAExecutorException thrown if failed in db updates or retrievals
         * @throws CommandException thrown if failed to run commands
         */
        private void bundleTransit() throws JPAExecutorException, CommandException {
            List<BundleJobBean> pendingJobCheckList = null;

            if (lastInstanceStartTime == null) {
                LOG.info("Running bundle status service first instance");
                // this is the first instance, we need to check for all pending or running jobs;
                pendingJobCheckList = jpaService.execute(new BundleJobsGetRunningOrPendingJPAExecutor(limit));
            }
            else {
                LOG.info("Running bundle status service from last instance time =  "
                        + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
                // this is not the first instance, we should only check jobs that have actions been
                // updated >= start time of last service run;
                List<BundleActionBean> actionList = BundleActionQueryExecutor.getInstance().getList(
                        BundleActionQuery.GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME, lastInstanceStartTime);
                Set<String> bundleIds = new HashSet<String>();
                for (BundleActionBean action : actionList) {
                    bundleIds.add(action.getBundleId());
                }
                pendingJobCheckList = new ArrayList<BundleJobBean>();
                for (String bundleId : bundleIds) {
                    BundleJobBean bundle;
                    try {
                        bundle = BundleJobQueryExecutor.getInstance().get(
                                BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME, bundleId);
                    }
                    catch (JPAExecutorException jpaee) {
                        // Same policy as coordTransit(): a job that no longer exists is skipped instead of
                        // aborting the whole pass.
                        if (jpaee.getErrorCode().equals(ErrorCode.E0604)) {
                            LOG.warn("Exception happened during StatusTransitRunnable; Bundle Job doesn't exist", jpaee);
                            continue;
                        }
                        else {
                            throw jpaee;
                        }
                    }
                    // Running bundle job might have pending false
                    Job.Status bundleJobStatus = bundle.getStatus();
                    if (bundle.isPending() || bundleJobStatus.equals(Job.Status.RUNNING)
                            || bundleJobStatus.equals(Job.Status.RUNNINGWITHERROR)
                            || bundleJobStatus.equals(Job.Status.PAUSED)
                            || bundleJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
                        pendingJobCheckList.add(bundle);
                    }
                }
            }
            aggregateBundleJobsStatus(pendingJobCheckList);
        }

        /**
         * Derives the status of every given bundle job from its un-ignored bundle actions and persists it.
         * <p>
         * A failure for one job is logged and does not stop the processing of the remaining jobs.
         *
         * @param bundleLists bundle jobs to examine; {@code null} is treated as "nothing to do"
         */
        private void aggregateBundleJobsStatus(List<BundleJobBean> bundleLists) throws JPAExecutorException,
                CommandException {
            if (bundleLists == null) {
                return;
            }
            for (BundleJobBean bundleJob : bundleLists) {
                try {
                    String jobId = bundleJob.getId();
                    // one-element array used as an in/out parameter by the check* methods
                    Job.Status[] bundleStatus = new Job.Status[1];
                    bundleStatus[0] = bundleJob.getStatus();
                    List<BundleActionBean> bundleActions = BundleActionQueryExecutor.getInstance().getList(
                            BundleActionQuery.GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE, jobId);
                    HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>();
                    boolean foundPending = false;
                    for (BundleActionBean bAction : bundleActions) {
                        increment(bundleActionStatus, bAction.getStatus());
                        if (bAction.getCoordId() == null
                                && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED)) {
                            (new BundleKillXCommand(jobId)).call();
                            LOG.info("Bundle job ["+ jobId
                                    + "] has been killed since one of its coordinator job failed submission.");
                        }

                        if (bAction.isPending()) {
                            foundPending = true;
                        }
                    }

                    // The checks are tried in this order and only the first one that succeeds writes
                    // bundleStatus[0]; a check that returns false leaves it untouched.
                    boolean statusResolved =
                            (!foundPending && checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus))
                            || (!foundPending && checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus))
                            || checkPausedStatus(bundleActionStatus, bundleActions, bundleStatus)
                            || checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus, foundPending)
                            || checkRunningStatus(bundleActionStatus, bundleActions, bundleStatus);
                    if (statusResolved) {
                        LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
                                + "' from '" + bundleJob.getStatus() + "'");
                        updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
                    }
                }
                catch (Exception ex) {
                    LOG.error("Exception happened during aggregate bundle job's status, job = "
                            + bundleJob.getId(), ex);
                }
            }
        }

        /**
         * Derives the status of every given coordinator job from its actions and persists it.
         * <p>
         * A failure for one job is logged and does not stop the processing of the remaining jobs.
         *
         * @param CoordList coordinator jobs to examine; {@code null} is treated as "nothing to do"
         */
        private void aggregateCoordJobsStatus(List<CoordinatorJobBean> CoordList) throws JPAExecutorException,
                CommandException {
            if (CoordList == null) {
                return;
            }
            Configuration conf = Services.get().getConf();
            boolean backwardSupportForCoordStatus = conf.getBoolean(CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);

            for (CoordinatorJobBean coordJob : CoordList) {
                try {
                    // if namespace 0.1 is used and backward support is true, then ignore this coord job
                    if (backwardSupportForCoordStatus && coordJob.getAppNamespace() != null
                            && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
                        continue;
                    }
                    String jobId = coordJob.getId();
                    // one-element array used as an in/out parameter by the check* methods
                    Job.Status[] coordStatus = new Job.Status[1];
                    coordStatus[0] = coordJob.getStatus();
                    // Get count of Coordinator actions with pending true
                    int pendingCount = jpaService.execute(new CoordJobGetPendingActionsCountJPAExecutor(jobId));
                    boolean isPending = pendingCount > 0;
                    // Get status of Coordinator actions
                    List<CoordinatorAction.Status> coordActionStatusList = jpaService
                            .execute(new CoordJobGetActionsStatusJPAExecutor(jobId));
                    HashMap<CoordinatorAction.Status, Integer> coordActionStatus =
                            new HashMap<CoordinatorAction.Status, Integer>();
                    for (CoordinatorAction.Status status : coordActionStatusList) {
                        increment(coordActionStatus, status);
                    }

                    int nonPendingCoordActionsCount = coordActionStatusList.size();
                    boolean isDoneMaterialization = coordJob.isDoneMaterialization();
                    boolean mayBeTerminal = isDoneMaterialization || coordStatus[0] == Job.Status.FAILED
                            || coordStatus[0] == Job.Status.KILLED;

                    // The checks are tried in this order and only the first one that succeeds writes
                    // coordStatus[0]; a check that returns false leaves it untouched.
                    boolean statusResolved =
                            (mayBeTerminal && checkCoordTerminalStatus(coordActionStatus,
                                    nonPendingCoordActionsCount, coordStatus, isDoneMaterialization))
                            || checkCoordPausedStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)
                            || checkCoordSuspendStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus,
                                    isDoneMaterialization, isPending)
                            || checkCoordRunningStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus);
                    if (statusResolved) {
                        updateCoordJob(isPending, coordJob, coordStatus[0]);
                    }
                    else {
                        // no status change applies; still persist a changed pending flag
                        checkCoordPending(isPending, coordJob, true);
                    }
                }
                catch (Exception ex) {
                    LOG.error("Exception happened during aggregate coordinator job's status, job = "
                            + coordJob.getId(), ex);
                }
            }
        }

        /**
         * Decides whether a bundle has reached a terminal status, i.e. every bundle action is SUCCEEDED, FAILED,
         * KILLED or DONEWITHERROR.
         *
         * @param bundleActionStatus number of bundle actions per status
         * @param bundleActions the bundle actions that were counted
         * @param bundleStatus in: current bundle status; out: the terminal status when {@code true} is returned
         * @return {@code true} if {@code bundleStatus[0]} now holds the terminal status to store
         */
        private boolean checkTerminalStatus(HashMap<Job.Status, Integer> bundleActionStatus,
                List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
            int totalActions = bundleActions.size();
            int totalValuesSucceed = countOf(bundleActionStatus, Job.Status.SUCCEEDED);
            int totalValuesFailed = countOf(bundleActionStatus, Job.Status.FAILED);
            int totalValuesKilled = countOf(bundleActionStatus, Job.Status.KILLED);
            int totalValuesDoneWithError = countOf(bundleActionStatus, Job.Status.DONEWITHERROR);

            if (totalActions != (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesDoneWithError)) {
                return false;
            }
            // If all bundle action is done and bundle is killed, then don't change the status.
            if (bundleStatus[0].equals(Job.Status.KILLED)) {
                bundleStatus[0] = Job.Status.KILLED;
            }
            else if (totalActions == totalValuesSucceed) {
                // If all the bundle actions are succeeded then bundle job should be succeeded.
                bundleStatus[0] = Job.Status.SUCCEEDED;
            }
            else if (totalActions == totalValuesKilled) {
                // If all the bundle actions are KILLED then bundle job should be KILLED.
                bundleStatus[0] = Job.Status.KILLED;
            }
            else if (totalActions == totalValuesFailed) {
                // If all the bundle actions are FAILED then bundle job should be FAILED.
                bundleStatus[0] = Job.Status.FAILED;
            }
            else {
                bundleStatus[0] = Job.Status.DONEWITHERROR;
            }
            return true;
        }

        /**
         * Decides whether a coordinator has reached a terminal status, i.e. every counted action is SUCCEEDED,
         * FAILED, KILLED, TIMEDOUT or SKIPPED.
         *
         * @param coordActionStatus number of coordinator actions per status
         * @param coordActionsCount total number of counted coordinator actions
         * @param coordStatus in: current coordinator status; out: the terminal status when {@code true} is returned
         * @param isDoneMaterialization whether the coordinator has finished materializing actions
         * @return {@code true} if {@code coordStatus[0]} now holds the terminal status to store
         */
        private boolean checkCoordTerminalStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
                int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization) {
            int totalValuesSucceed = countOf(coordActionStatus, CoordinatorAction.Status.SUCCEEDED);
            int totalValuesFailed = countOf(coordActionStatus, CoordinatorAction.Status.FAILED);
            int totalValuesKilled = countOf(coordActionStatus, CoordinatorAction.Status.KILLED);
            int totalValuesTimeOut = countOf(coordActionStatus, CoordinatorAction.Status.TIMEDOUT);
            int totalValuesSkipped = countOf(coordActionStatus, CoordinatorAction.Status.SKIPPED);

            if (coordActionsCount !=
                    (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut + totalValuesSkipped)) {
                return false;
            }
            // If all coord action is done and coord is killed, then don't change the status.
            if (coordStatus[0].equals(Job.Status.KILLED)) {
                coordStatus[0] = Job.Status.KILLED;
            }
            else if (coordActionsCount == (totalValuesSucceed + totalValuesSkipped) && isDoneMaterialization) {
                // If all the coordinator actions are succeeded then coordinator job should be succeeded.
                coordStatus[0] = Job.Status.SUCCEEDED;
            }
            else if (coordActionsCount == totalValuesKilled) {
                // If all the coordinator actions are KILLED then coordinator job should be KILLED.
                coordStatus[0] = Job.Status.KILLED;
            }
            else if (coordActionsCount == totalValuesFailed) {
                // If all the coordinator actions are FAILED then coordinator job should be FAILED.
                coordStatus[0] = Job.Status.FAILED;
            }
            else {
                coordStatus[0] = Job.Status.DONEWITHERROR;
            }
            return true;
        }

        /**
         * Moves a bundle to a running status when some, but not all, of its actions are still in PREP.
         *
         * @param bundleActionStatus number of bundle actions per status
         * @param bundleActions the bundle actions that were counted
         * @param bundleStatus out: RUNNING or RUNNINGWITHERROR when {@code true} is returned
         * @return {@code true} if {@code bundleStatus[0]} was set
         */
        private boolean checkPrepStatus(HashMap<Job.Status, Integer> bundleActionStatus,
                List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
            boolean ret = false;
            if (bundleActionStatus.containsKey(Job.Status.PREP)) {
                // At least one action is in PREP and at least one has already left PREP.
                if (bundleActions.size() > countOf(bundleActionStatus, Job.Status.PREP)) {
                    bundleStatus[0] = getRunningStatus(bundleActionStatus);
                    ret = true;
                }
            }
            return ret;
        }

        /**
         * Decides whether a bundle should be PAUSED or PAUSEDWITHERROR.
         *
         * @param bundleActionStatus number of bundle actions per status
         * @param bundleActions the bundle actions that were counted
         * @param bundleJobStatus in: current bundle status; out: the paused status when {@code true} is returned
         * @return {@code true} if {@code bundleJobStatus[0]} now holds the status to store
         */
        private boolean checkPausedStatus(HashMap<Job.Status, Integer> bundleActionStatus,
                List<BundleActionBean> bundleActions, Job.Status[] bundleJobStatus) {
            boolean ret = false;

            // TODO - When bottom up cmds are allowed to change the status of parent job,
            // if none of the bundle actions are in paused or pausedwitherror, the function should return
            // false

            // top down
            // If the bundle job is PAUSED or PAUSEDWITHERROR and no children are in error
            // state, then job should be PAUSED otherwise it should be PAUSEDWITHERROR
            if (bundleJobStatus[0] == Job.Status.PAUSED || bundleJobStatus[0] == Job.Status.PAUSEDWITHERROR) {
                if (bundleActionStatus.containsKey(Job.Status.KILLED)
                        || bundleActionStatus.containsKey(Job.Status.FAILED)
                        || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
                        || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
                        || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)
                        || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
                    bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
                }
                else {
                    bundleJobStatus[0] = Job.Status.PAUSED;
                }
                ret = true;
            }
            // bottom up; check the status of parent through their children
            else if (bundleActionStatus.containsKey(Job.Status.PAUSED)
                    && (bundleActions.size() == countOf(bundleActionStatus, Job.Status.PAUSED))) {
                bundleJobStatus[0] = Job.Status.PAUSED;
                ret = true;
            }
            else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
                int pausedActions = countOf(bundleActionStatus, Job.Status.PAUSED);
                int pausedWithErrorActions = countOf(bundleActionStatus, Job.Status.PAUSEDWITHERROR);
                if (bundleActions.size() == pausedActions + pausedWithErrorActions) {
                    bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
                    ret = true;
                }
            }
            return ret;
        }

        /**
         * Decides whether a bundle should be SUSPENDED or SUSPENDEDWITHERROR.
         *
         * @param bundleActionStatus number of bundle actions per status
         * @param bundleActions the bundle actions that were counted
         * @param bundleStatus in: current bundle status; out: the suspended status when {@code true} is returned
         * @param isPending whether at least one bundle action is pending
         * @return {@code true} if {@code bundleStatus[0]} now holds the status to store
         */
        private boolean checkSuspendStatus(HashMap<Job.Status, Integer> bundleActionStatus,
                List<BundleActionBean> bundleActions, Job.Status[] bundleStatus, boolean isPending) {
            boolean ret = false;

            // TODO - When bottom up cmds are allowed to change the status of parent job,
            // if none of the bundle actions are in suspended or suspendedwitherror, the function should return
            // false

            // top down
            // if job is suspended
            if (bundleStatus[0] == Job.Status.SUSPENDED
                    || bundleStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
                if (bundleActionStatus.containsKey(Job.Status.KILLED)
                        || bundleActionStatus.containsKey(Job.Status.FAILED)
                        || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
                        || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
                        || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
                    bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
                }
                else {
                    bundleStatus[0] = Job.Status.SUSPENDED;
                }
                ret = true;
            }
            // bottom up
            // Update status of parent from the status of its children
            // NOTE(review): the grouping below is the one Java's precedence gave the original expression
            // (&& binds tighter than ||), so the pending check only guards the SUSPENDED case. Confirm
            // whether "!isPending" was meant to guard SUSPENDEDWITHERROR as well.
            else if ((!isPending && bundleActionStatus.containsKey(Job.Status.SUSPENDED))
                    || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)) {
                // Only one of the two suspended statuses may be present, so both are read null-safely.
                int suspendedActions = countOf(bundleActionStatus, Job.Status.SUSPENDED);
                int suspendedWithErrorActions = countOf(bundleActionStatus, Job.Status.SUSPENDEDWITHERROR);
                int succeededActions = countOf(bundleActionStatus, Job.Status.SUCCEEDED);
                int killedActions = countOf(bundleActionStatus, Job.Status.KILLED);
                int failedActions = countOf(bundleActionStatus, Job.Status.FAILED);
                int doneWithErrorActions = countOf(bundleActionStatus, Job.Status.DONEWITHERROR);

                if (bundleActions.size() == suspendedActions + succeededActions) {
                    bundleStatus[0] = Job.Status.SUSPENDED;
                    ret = true;
                }
                else if (bundleActions.size() == suspendedWithErrorActions + suspendedActions + succeededActions
                        + killedActions + failedActions + doneWithErrorActions) {
                    bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
                    ret = true;
                }
            }
            return ret;
        }

        /**
         * Keeps a paused coordinator paused, choosing PAUSEDWITHERROR when any action is KILLED, FAILED or
         * TIMEDOUT.
         *
         * @param coordActionStatus number of coordinator actions per status
         * @param coordActionsCount total number of counted coordinator actions (not used by this check)
         * @param coordStatus in: current coordinator status; out: the paused status when {@code true} is returned
         * @return {@code true} if the coordinator is currently PAUSED or PAUSEDWITHERROR
         */
        private boolean checkCoordPausedStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
                int coordActionsCount, Job.Status[] coordStatus) {
            boolean ret = false;
            if (coordStatus[0].equals(Job.Status.PAUSED) || coordStatus[0].equals(Job.Status.PAUSEDWITHERROR)) {
                if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
                        || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
                        || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
                    coordStatus[0] = Job.Status.PAUSEDWITHERROR;
                }
                else {
                    coordStatus[0] = Job.Status.PAUSED;
                }
                ret = true;
            }
            return ret;
        }

        /**
         * Decides whether a coordinator should be SUSPENDED or SUSPENDEDWITHERROR.
         *
         * @param coordActionStatus number of coordinator actions per status
         * @param coordActionsCount total number of counted coordinator actions
         * @param coordStatus in: current coordinator status; out: the suspended status when {@code true} is returned
         * @param isDoneMaterialization whether the coordinator has finished materializing actions
         * @param isPending whether at least one coordinator action is pending
         * @return {@code true} if {@code coordStatus[0]} now holds the status to store
         */
        private boolean checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
                int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization, boolean isPending) {
            boolean ret = false;

            // TODO - When bottom up cmds are allowed to change the status of parent job
            // if none of the coord actions are in suspended or suspendedwitherror and materialization done is
            // false, then the function should return false

            // top down
            // check for children only when parent is suspended
            if (coordStatus[0] == Job.Status.SUSPENDED || coordStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
                if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
                        || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
                        || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
                    coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
                }
                else {
                    coordStatus[0] = Job.Status.SUSPENDED;
                }
                ret = true;
            }
            // bottom up
            // look for children to check the parent's status only if materialization is
            // done and all actions are non-pending
            else if (isDoneMaterialization && !isPending
                    && coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
                int suspendedActions = countOf(coordActionStatus, CoordinatorAction.Status.SUSPENDED);
                int succeededActions = countOf(coordActionStatus, CoordinatorAction.Status.SUCCEEDED);
                int killedActions = countOf(coordActionStatus, CoordinatorAction.Status.KILLED);
                int failedActions = countOf(coordActionStatus, CoordinatorAction.Status.FAILED);
                int timedoutActions = countOf(coordActionStatus, CoordinatorAction.Status.TIMEDOUT);

                if (coordActionsCount == suspendedActions + succeededActions) {
                    coordStatus[0] = Job.Status.SUSPENDED;
                    ret = true;
                }
                else if (coordActionsCount == suspendedActions + succeededActions + killedActions + failedActions
                        + timedoutActions) {
                    coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
                    ret = true;
                }
            }
            return ret;
        }

        /**
         * Sets a coordinator that is not in a PREP-like status to RUNNING, or to RUNNINGWITHERROR when any action
         * is KILLED, FAILED or TIMEDOUT.
         *
         * @param coordActionStatus number of coordinator actions per status
         * @param coordActionsCount total number of counted coordinator actions (not used by this check)
         * @param coordStatus in: current coordinator status; out: the running status when {@code true} is returned
         * @return {@code true} unless the coordinator is PREP, PREPSUSPENDED or PREPPAUSED
         */
        private boolean checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
                int coordActionsCount, Job.Status[] coordStatus) {
            boolean ret = false;
            if (coordStatus[0] != Job.Status.PREP
                    && coordStatus[0] != Job.Status.PREPSUSPENDED
                    && coordStatus[0] != Job.Status.PREPPAUSED) {
                if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
                        || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
                        || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
                    coordStatus[0] = Job.Status.RUNNINGWITHERROR;
                }
                else {
                    coordStatus[0] = Job.Status.RUNNING;
                }
                ret = true;
            }
            return ret;
        }

        /**
         * Sets a bundle that is not in PREP to RUNNING or RUNNINGWITHERROR.
         *
         * @param bundleActionStatus number of bundle actions per status
         * @param bundleActions the bundle actions that were counted (not used by this check)
         * @param bundleStatus in: current bundle status; out: the running status when {@code true} is returned
         * @return {@code true} unless the bundle is in PREP
         */
        private boolean checkRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus,
                List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
            boolean ret = false;
            if (bundleStatus[0] != Job.Status.PREP) {
                bundleStatus[0] = getRunningStatus(bundleActionStatus);
                ret = true;
            }
            return ret;
        }

        /**
         * Returns RUNNINGWITHERROR when any bundle action is FAILED, KILLED, DONEWITHERROR or RUNNINGWITHERROR,
         * and RUNNING otherwise.
         */
        private Job.Status getRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus) {
            if (bundleActionStatus.containsKey(Job.Status.FAILED)
                    || bundleActionStatus.containsKey(Job.Status.KILLED)
                    || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
                    || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)) {
                return Job.Status.RUNNINGWITHERROR;
            }
            else {
                return Job.Status.RUNNING;
            }
        }

        /**
         * Stores the given status and pending flag on the bundle job and writes the job to the database.
         *
         * @param isPending pending flag to store
         * @param bundleJob bundle job to update
         * @param bundleStatus status to store (mapped through the backward-support conversion first)
         * @throws JPAExecutorException thrown if the database update fails
         */
        private void updateBundleJob(boolean isPending, BundleJobBean bundleJob, Job.Status bundleStatus)
                throws JPAExecutorException {
            String jobId = bundleJob.getId();
            // Update the Bundle Job
            // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
            // PAUSEDWITHERROR is not supported
            bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(bundleStatus));
            if (isPending) {
                bundleJob.setPending();
                LOG.info("Bundle job [" + jobId + "] Pending set to TRUE");
            }
            else {
                bundleJob.resetPending();
                LOG.info("Bundle job [" + jobId + "] Pending set to FALSE");
            }
            BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME,
                    bundleJob);
        }

        /**
         * Stores the given status and pending flag on the coordinator job, writes the job to the database when
         * either of them changed, and notifies the parent bundle when the status changed.
         * <p>
         * A job that is already SUCCEEDED, FAILED, KILLED or DONEWITHERROR is never moved to a suspended status.
         *
         * @param isPending pending flag to store
         * @param coordJob coordinator job to update
         * @param coordStatus status to store (mapped through the backward-support conversions first)
         * @throws JPAExecutorException thrown if the database update fails
         * @throws CommandException thrown if the bundle status update command fails
         */
        private void updateCoordJob(boolean isPending, CoordinatorJobBean coordJob, Job.Status coordStatus)
                throws JPAExecutorException, CommandException {
            Job.Status prevStatus = coordJob.getStatus();
            // Update the Coord Job
            boolean alreadyTerminal = prevStatus == Job.Status.SUCCEEDED || prevStatus == Job.Status.FAILED
                    || prevStatus == Job.Status.KILLED || prevStatus == Job.Status.DONEWITHERROR;
            if (alreadyTerminal
                    && (coordStatus == Job.Status.SUSPENDED || coordStatus == Job.Status.SUSPENDEDWITHERROR)) {
                LOG.info("Coord Job [" + coordJob.getId()
                        + "] status to "+ coordStatus +" can not be updated as its already in Terminal state");
                return;
            }

            boolean isPendingStateChanged = checkCoordPending(isPending, coordJob, false);
            // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and PAUSEDWITHERROR is
            // not supported
            coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(coordStatus));
            // Backward support when coordinator namespace is 0.1
            coordJob.setStatus(StatusUtils.getStatus(coordJob));
            if (coordJob.getStatus() != prevStatus || isPendingStateChanged) {
                LOG.info("Set coordinator job [" + coordJob.getId() + "] status to '" + coordJob.getStatus() + "' from '"
                        + prevStatus + "'");
                coordJob.setLastModifiedTime(new Date());
                CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME,
                        coordJob);
            }
            // update bundle action only when status changes in coord job
            if (coordJob.getBundleId() != null && !prevStatus.equals(coordJob.getStatus())) {
                BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
                bundleStatusUpdate.call();
            }
        }

        /**
         * Applies the pending flag to the coordinator job and reports whether it differs from the previous value.
         *
         * @param isPending pending flag to apply
         * @param coordJob coordinator job to update
         * @param saveToDB whether a changed flag should be written to the database immediately
         * @return {@code true} if the pending flag of the job changed
         * @throws JPAExecutorException thrown if the database update fails
         */
        private boolean checkCoordPending(boolean isPending, CoordinatorJobBean coordJob, boolean saveToDB)
                throws JPAExecutorException {
            // Checking the coordinator pending should be updated or not
            boolean prevPending = coordJob.isPending();
            if (isPending) {
                coordJob.setPending();
            }
            else {
                coordJob.resetPending();
            }
            boolean hasChange = prevPending != coordJob.isPending();
            if (saveToDB && hasChange) {
                LOG.info("Change coordinator job [" + coordJob.getId() + "] pending to '" + coordJob.isPending()
                        + "' from '" + prevPending + "'");
                CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME,
                        coordJob);
            }
            return hasChange;
        }

        /**
         * Aggregate coordinator actions' status to coordinator jobs
         *
         * @throws JPAExecutorException thrown if failed in db updates or retrievals
         * @throws CommandException thrown if failed to run commands
         */
        private void coordTransit() throws JPAExecutorException, CommandException {
            List<CoordinatorJobBean> pendingJobCheckList = null;
            if (lastInstanceStartTime == null) {
                LOG.info("Running coordinator status service first instance");
                // this is the first instance, we need to check for all pending jobs;
                pendingJobCheckList = jpaService.execute(new CoordJobsGetPendingJPAExecutor(limit));
            }
            else {
                LOG.info("Running coordinator status service from last instance time =  "
                        + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
                // this is not the first instance, we should only check jobs
                // that have actions or jobs been
                // updated >= start time of last service run;
                List<CoordinatorActionBean> actionsList = CoordActionQueryExecutor.getInstance().getList(
                        CoordActionQuery.GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME, lastInstanceStartTime);
                Set<String> coordIds = new HashSet<String>();
                for (CoordinatorActionBean action : actionsList) {
                    coordIds.add(action.getJobId());
                }

                pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
                for (String coordId : coordIds) {
                    CoordinatorJobBean coordJob;
                    try {
                        coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coordId);
                    }
                    catch (JPAExecutorException jpaee) {
                        if (jpaee.getErrorCode().equals(ErrorCode.E0604)) {
                            LOG.warn("Exception happened during StatusTransitRunnable; Coordinator Job doesn't exist", jpaee);
                            continue;
                        }
                        else {
                            throw jpaee;
                        }
                    }
                    // Running coord job might have pending false
                    Job.Status coordJobStatus = coordJob.getStatus();
                    if ((coordJob.isPending() || coordJobStatus.equals(Job.Status.PAUSED)
                            || coordJobStatus.equals(Job.Status.RUNNING)
                            || coordJobStatus.equals(Job.Status.RUNNINGWITHERROR)
                            || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR))
                            && !coordJobStatus.equals(Job.Status.IGNORED)) {
                        pendingJobCheckList.add(coordJob);
                    }
                }
                pendingJobCheckList.addAll(CoordJobQueryExecutor.getInstance().getList(
                        CoordJobQuery.GET_COORD_JOBS_CHANGED, lastInstanceStartTime));
            }
            aggregateCoordJobsStatus(pendingJobCheckList);
        }
    }

    /**
     * Initializes the {@link StatusTransitService}.
     * <p>
     * Schedules a {@link StatusTransitRunnable} with an initial delay of 10 seconds and the interval configured
     * under {@link #CONF_STATUSTRANSIT_INTERVAL} (60 seconds when not configured).
     *
     * @param services services instance.
     */
    @Override
    public void init(Services services) {
        Configuration conf = services.getConf();
        Runnable stateTransitRunnable = new StatusTransitRunnable();
        services.get(SchedulerService.class).schedule(stateTransitRunnable, 10,
                conf.getInt(CONF_STATUSTRANSIT_INTERVAL, 60), SchedulerService.Unit.SEC);
    }

    /**
     * Destroy the StatusTransit Service.
     */
    @Override
    public void destroy() {
    }

    /**
     * Return the public interface for the status transit service.
     *
     * @return {@link StatusTransitService}.
     */
    @Override
    public Class<? extends Service> getInterface() {
        return StatusTransitService.class;
    }
}