/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.Comparator;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.Job;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.bundle.BundleKillXCommand;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.executor.jpa.BundleActionsGetByLastModifiedTimeJPAExecutor;
import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
import org.apache.oozie.executor.jpa.BundleActionsGetStatusPendingJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.MemoryLocks;
import org.apache.oozie.util.StatusUtils;
import org.apache.oozie.util.XLog;

/**
 * StatusTransitService is scheduled to run at the configured interval.
 * <p/>
 * It updates a job's status according to the status of its child actions: the job's pending flag follows the
 * pending flags of its children, and the job's status is derived from the mix of child statuses (for example,
 * if all child actions are succeeded, the job's status is set to SUCCEEDED).
 */
public class StatusTransitService implements Service {
    public static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService.";
    public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval";
    public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + "backward.support.for.coord.status";
    public static final String CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR = CONF_PREFIX + "backward.support.for.states.without.error";
    // Passed as the argument of the "get all" executors on the first run.
    private static int limit = -1;
    // Start time of the last run that completed both transits; null until the first such run.
    private static Date lastInstanceStartTime = null;
    private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class);

    /**
     * StatusTransitRunnable is the runnable which is scheduled to run at the configured interval.
     * <p/>
     * Each run takes the service-wide write lock, aggregates coordinator actions into coordinator jobs, then
     * bundle actions into bundle jobs, and finally records the start time of the run so that the next run only
     * looks at jobs whose actions were modified since then.
     */
    static class StatusTransitRunnable implements Runnable {
        private JPAService jpaService = null;
        private MemoryLocks.LockToken lock;

        public StatusTransitRunnable() {
            jpaService = Services.get().get(JPAService.class);
            if (jpaService == null) {
                LOG.error("Missing JPAService");
            }
        }

        /**
         * Runs one coordinator + bundle status aggregation pass under the service write lock.
         * <p/>
         * Any exception is logged and swallowed so that the scheduled task keeps being rescheduled; in that case
         * {@code lastInstanceStartTime} is not advanced.
         */
        @Override
        public void run() {
            // Forget any token from a previous run: if acquiring the lock below throws, the finally block
            // must not release a token that was already released.
            lock = null;
            try {
                Date curDate = new Date(); // records the start time of this service run;

                // first check if there is some other instance running;
                // NOTE(review): lockTimeout is not declared in this file - presumably inherited from Service.
                lock = Services.get().get(MemoryLocksService.class).getWriteLock(StatusTransitService.class.getName(),
                        lockTimeout);
                if (lock == null) {
                    LOG.info("This StatusTransitService instance"
                            + " will not run since there is already an instance running");
                }
                else {
                    LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName());
                    // running coord jobs transit service
                    coordTransit();
                    // running bundle jobs transit service
                    bundleTransit();

                    lastInstanceStartTime = curDate;
                }
            }
            catch (Exception ex) {
                LOG.warn("Exception happened during StatusTransitRunnable ", ex);
            }
            finally {
                // release lock;
                if (lock != null) {
                    lock.release();
                    lock = null;
                    LOG.info("Released lock for [{0}]", StatusTransitService.class.getName());
                }
            }
        }

        /**
         * Returns the given bundle jobs with duplicates (same job id) removed.
         * <p/>
         * The first occurrence of each id is kept and the relative order of the input is preserved.
         *
         * @param pendingJobList bundle jobs, possibly containing several beans with the same id
         * @return a new list holding one bean per distinct id
         */
        public List<BundleJobBean> removeDuplicates(List<BundleJobBean> pendingJobList) {
            Set<String> seenIds = new HashSet<String>();
            List<BundleJobBean> uniqueJobs = new ArrayList<BundleJobBean>();
            for (BundleJobBean job : pendingJobList) {
                // Set.add returns false when the id was already present
                if (seenIds.add(job.getId())) {
                    uniqueJobs.add(job);
                }
            }
            return uniqueJobs;
        }

        /**
         * Returns the number recorded for {@code key}, or 0 when the key is absent.
         */
        private static <K> int countOf(HashMap<K, Integer> counts, K key) {
            Integer count = counts.get(key);
            return count == null ? 0 : count;
        }

        /**
         * Adds one to the number recorded for {@code key}, starting from 0 when the key is absent.
         */
        private static <K> void increment(HashMap<K, Integer> counts, K key) {
            counts.put(key, countOf(counts, key) + 1);
        }

        /**
         * Aggregate bundle actions' status to bundle jobs
         *
         * @throws JPAExecutorException thrown if failed in db updates or retrievals
         * @throws CommandException thrown if failed to run commands
         */
        private void bundleTransit() throws JPAExecutorException, CommandException {
            List<BundleJobBean> pendingJobCheckList;

            if (lastInstanceStartTime == null) {
                LOG.info("Running bundle status service first instance");
                // this is the first instance, we need to check for all pending or running jobs;
                pendingJobCheckList = jpaService.execute(new BundleJobsGetRunningOrPendingJPAExecutor(limit));
            }
            else {
                LOG.info("Running bundle status service from last instance time =  "
                        + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
                // this is not the first instance, we should only check jobs that have actions been
                // updated >= start time of last service run;
                List<BundleActionBean> actionList = jpaService
                        .execute(new BundleActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
                Set<String> bundleIds = new HashSet<String>();
                for (BundleActionBean action : actionList) {
                    bundleIds.add(action.getBundleId());
                }
                pendingJobCheckList = new ArrayList<BundleJobBean>();
                for (String bundleId : bundleIds) {
                    BundleJobBean bundle;
                    try {
                        bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId));
                    }
                    catch (JPAExecutorException jpaee) {
                        // Same policy as coordTransit(): a job that no longer exists is skipped instead of
                        // aborting the whole pass; every other error is propagated unchanged.
                        if (jpaee.getErrorCode().equals(ErrorCode.E0604)) {
                            LOG.warn("Exception happened during StatusTransitRunnable; Bundle Job doesn't exist", jpaee);
                            continue;
                        }
                        throw jpaee;
                    }
                    // Running bundle job might have pending false
                    Job.Status bundleJobStatus = bundle.getStatus();
                    if (bundle.isPending() || bundleJobStatus.equals(Job.Status.RUNNING)
                            || bundleJobStatus.equals(Job.Status.RUNNINGWITHERROR)
                            || bundleJobStatus.equals(Job.Status.PAUSED)
                            || bundleJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
                        pendingJobCheckList.add(bundle);
                    }
                }
            }
            aggregateBundleJobsStatus(pendingJobCheckList);
        }

        /**
         * Derives and stores the status and pending flag of each given bundle job from its bundle actions.
         * <p/>
         * A failure on one job is logged and does not stop the processing of the remaining jobs.
         *
         * @param bundleLists bundle jobs to process; {@code null} is treated as "nothing to do"
         */
        private void aggregateBundleJobsStatus(List<BundleJobBean> bundleLists) throws JPAExecutorException,
                CommandException {
            if (bundleLists == null) {
                return;
            }
            for (BundleJobBean bundleJob : bundleLists) {
                try {
                    String jobId = bundleJob.getId();
                    // one-element array used as an in/out parameter of the check* methods
                    Job.Status[] bundleStatus = new Job.Status[1];
                    bundleStatus[0] = bundleJob.getStatus();
                    List<BundleActionBean> bundleActions = jpaService.execute(new BundleActionsGetStatusPendingJPAExecutor(
                            jobId));
                    HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>();
                    boolean foundPending = false;
                    for (BundleActionBean bAction : bundleActions) {
                        increment(bundleActionStatus, bAction.getStatus());
                        if (bAction.getCoordId() == null
                                && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED)) {
                            // NOTE(review): bundleJob is not re-read after the kill command, so the update
                            // further down works on the bean loaded before the kill - confirm this is intended.
                            (new BundleKillXCommand(jobId)).call();
                            LOG.info("Bundle job ["+ jobId
                                    + "] has been killed since one of its coordinator job failed submission.");
                        }

                        if (bAction.isPending()) {
                            foundPending = true;
                        }
                    }

                    // The checks are tried in this fixed order; the first one that applies wins and has
                    // written the new status into bundleStatus[0].
                    boolean statusResolved =
                            (!foundPending && checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus))
                            || (!foundPending && checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus))
                            || checkPausedStatus(bundleActionStatus, bundleActions, bundleStatus)
                            || checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus, foundPending)
                            || checkRunningStatus(bundleActionStatus, bundleActions, bundleStatus);
                    if (statusResolved) {
                        LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
                                + "' from '" + bundleJob.getStatus() + "'");
                        updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
                    }
                }
                catch (Exception ex) {
                    LOG.error("Exception happened during aggregate bundle job's status, job = "
                            + bundleJob.getId(), ex);
                }
            }
        }

        /**
         * Derives and stores the status and pending flag of each given coordinator job from its actions.
         * <p/>
         * A failure on one job is logged and does not stop the processing of the remaining jobs.
         *
         * @param CoordList coordinator jobs to process; {@code null} is treated as "nothing to do"
         */
        private void aggregateCoordJobsStatus(List<CoordinatorJobBean> CoordList) throws JPAExecutorException,
                CommandException {
            if (CoordList == null) {
                return;
            }
            Configuration conf = Services.get().getConf();
            boolean backwardSupportForCoordStatus = conf.getBoolean(CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);

            for (CoordinatorJobBean coordJob : CoordList) {
                try {
                    // if namespace 0.1 is used and backward support is true, then ignore this coord job
                    if (backwardSupportForCoordStatus && coordJob.getAppNamespace() != null
                            && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
                        continue;
                    }
                    String jobId = coordJob.getId();
                    // one-element array used as an in/out parameter of the checkCoord* methods
                    Job.Status[] coordStatus = new Job.Status[1];
                    coordStatus[0] = coordJob.getStatus();
                    // Get count of Coordinator actions with pending true
                    int pendingCount = jpaService.execute(new CoordJobGetPendingActionsCountJPAExecutor(jobId));
                    boolean isPending = pendingCount > 0;
                    // Get status of Coordinator actions
                    List<CoordinatorAction.Status> coordActionStatusList = jpaService
                            .execute(new CoordJobGetActionsStatusJPAExecutor(jobId));
                    HashMap<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>();
                    for (CoordinatorAction.Status status : coordActionStatusList) {
                        increment(coordActionStatus, status);
                    }

                    int nonPendingCoordActionsCount = coordActionStatusList.size();
                    boolean isDoneMaterialization = coordJob.isDoneMaterialization();
                    boolean mayBeTerminal = isDoneMaterialization || coordStatus[0] == Job.Status.FAILED
                            || coordStatus[0] == Job.Status.KILLED;

                    // The checks are tried in this fixed order; the first one that applies wins and has
                    // written the new status into coordStatus[0].
                    boolean statusResolved =
                            (mayBeTerminal && checkCoordTerminalStatus(coordActionStatus, nonPendingCoordActionsCount,
                                    coordStatus, isDoneMaterialization))
                            || checkCoordPausedStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)
                            || checkCoordSuspendStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus,
                                    coordJob.isDoneMaterialization(), isPending)
                            || checkCoordRunningStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus);
                    if (statusResolved) {
                        LOG.info("Set coordinator job [" + jobId + "] status to '" + coordStatus[0].toString()
                                + "' from '" + coordJob.getStatus() + "'");
                        updateCoordJob(isPending, coordJob, coordStatus[0]);
                    }
                    else {
                        // status unchanged: only the pending flag is refreshed and saved
                        checkCoordPending(isPending, coordJob, true);
                    }
                }
                catch (Exception ex) {
                    LOG.error("Exception happened during aggregate coordinator job's status, job = "
                            + coordJob.getId(), ex);
                }
            }
        }

        /**
         * Checks whether every bundle action is SUCCEEDED, FAILED, KILLED or DONEWITHERROR and, if so, writes the
         * resulting job status into {@code bundleStatus[0]}.
         *
         * @return true if {@code bundleStatus[0]} was set
         */
        private boolean checkTerminalStatus(HashMap<Job.Status, Integer> bundleActionStatus,
                List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
            int totalValuesSucceed = countOf(bundleActionStatus, Job.Status.SUCCEEDED);
            int totalValuesFailed = countOf(bundleActionStatus, Job.Status.FAILED);
            int totalValuesKilled = countOf(bundleActionStatus, Job.Status.KILLED);
            int totalValuesDoneWithError = countOf(bundleActionStatus, Job.Status.DONEWITHERROR);
            int totalActions = bundleActions.size();

            if (totalActions != (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesDoneWithError)) {
                return false;
            }
            if (totalActions == totalValuesSucceed) {
                // If all the bundle actions are succeeded then bundle job should be succeeded.
                bundleStatus[0] = Job.Status.SUCCEEDED;
            }
            else if (totalActions == totalValuesKilled) {
                // If all the bundle actions are KILLED then bundle job should be KILLED.
                bundleStatus[0] = Job.Status.KILLED;
            }
            else if (totalActions == totalValuesFailed) {
                // If all the bundle actions are FAILED then bundle job should be FAILED.
                bundleStatus[0] = Job.Status.FAILED;
            }
            else {
                // any other mix of finished actions
                bundleStatus[0] = Job.Status.DONEWITHERROR;
            }
            return true;
        }

        /**
         * Checks whether every coordinator action is SUCCEEDED, FAILED, KILLED or TIMEDOUT and, if so, writes the
         * resulting job status into {@code coordStatus[0]}.
         *
         * @return true if {@code coordStatus[0]} was set
         */
        private boolean checkCoordTerminalStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
                int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization) {
            int totalValuesSucceed = countOf(coordActionStatus, CoordinatorAction.Status.SUCCEEDED);
            int totalValuesFailed = countOf(coordActionStatus, CoordinatorAction.Status.FAILED);
            int totalValuesKilled = countOf(coordActionStatus, CoordinatorAction.Status.KILLED);
            int totalValuesTimeOut = countOf(coordActionStatus, CoordinatorAction.Status.TIMEDOUT);

            if (coordActionsCount != (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut)) {
                return false;
            }
            if (coordActionsCount == totalValuesSucceed && isDoneMaterialization) {
                // If all the coordinator actions are succeeded then coordinator job should be succeeded.
                coordStatus[0] = Job.Status.SUCCEEDED;
            }
            else if (coordActionsCount == totalValuesKilled) {
                // If all the coordinator actions are KILLED then coordinator job should be KILLED.
                coordStatus[0] = Job.Status.KILLED;
            }
            else if (coordActionsCount == totalValuesFailed) {
                // If all the coordinator actions are FAILED then coordinator job should be FAILED.
                coordStatus[0] = Job.Status.FAILED;
            }
            else {
                // any other mix of finished actions
                coordStatus[0] = Job.Status.DONEWITHERROR;
            }
            return true;
        }

        /**
         * Sets {@code bundleStatus[0]} to RUNNING when some, but not all, bundle actions are in PREP.
         *
         * @return true if {@code bundleStatus[0]} was set
         */
        private boolean checkPrepStatus(HashMap<Job.Status, Integer> bundleActionStatus,
                List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
            // At least one action is still PREP while at least one other has left PREP:
            // the bundle job is considered RUNNING.
            if (bundleActionStatus.containsKey(Job.Status.PREP)
                    && bundleActions.size() > bundleActionStatus.get(Job.Status.PREP)) {
                bundleStatus[0] = Job.Status.RUNNING;
                return true;
            }
            return false;
        }

        /**
         * Decides between PAUSED and PAUSEDWITHERROR for a bundle job.
         *
         * @return true if {@code bundleJobStatus[0]} was set
         */
        private boolean checkPausedStatus(HashMap<Job.Status, Integer> bundleActionStatus,
                List<BundleActionBean> bundleActions, Job.Status[] bundleJobStatus) {
            boolean ret = false;

            // TODO - When bottom up cmds are allowed to change the status of parent job,
            // if none of the bundle actions are in paused or pausedwitherror, the function should return
            // false

            // top down
            // If the bundle job is PAUSED or PAUSEDWITHERROR and no children are in error
            // state, then job should be PAUSED otherwise it should be PAUSEDWITHERROR
            if (bundleJobStatus[0] == Job.Status.PAUSED || bundleJobStatus[0] == Job.Status.PAUSEDWITHERROR) {
                boolean anyChildInError = bundleActionStatus.containsKey(Job.Status.KILLED)
                        || bundleActionStatus.containsKey(Job.Status.FAILED)
                        || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
                        || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
                        || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)
                        || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR);
                bundleJobStatus[0] = anyChildInError ? Job.Status.PAUSEDWITHERROR : Job.Status.PAUSED;
                ret = true;
            }
            // bottom up; check the status of parent through their children
            else if (bundleActionStatus.containsKey(Job.Status.PAUSED)
                    && (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED))) {
                // every child is PAUSED
                bundleJobStatus[0] = Job.Status.PAUSED;
                ret = true;
            }
            else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
                // every child is PAUSED or PAUSEDWITHERROR, at least one of them with error
                int pausedActions = countOf(bundleActionStatus, Job.Status.PAUSED);
                if (bundleActions.size() == pausedActions + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR)) {
                    bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
                    ret = true;
                }
            }
            return ret;
        }

        /**
         * Decides between SUSPENDED and SUSPENDEDWITHERROR for a bundle job.
         *
         * @return true if {@code bundleStatus[0]} was set
         */
        private boolean checkSuspendStatus(HashMap<Job.Status, Integer> bundleActionStatus,
                List<BundleActionBean> bundleActions, Job.Status[] bundleStatus, boolean isPending) {
            boolean ret = false;

            // TODO - When bottom up cmds are allowed to change the status of parent job,
            // if none of the bundle actions are in suspended or suspendedwitherror, the function should return
            // false

            // top down
            // if job is suspended
            if (bundleStatus[0] == Job.Status.SUSPENDED
                    || bundleStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
                boolean anyChildInError = bundleActionStatus.containsKey(Job.Status.KILLED)
                        || bundleActionStatus.containsKey(Job.Status.FAILED)
                        || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
                        || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
                        || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR);
                bundleStatus[0] = anyChildInError ? Job.Status.SUSPENDEDWITHERROR : Job.Status.SUSPENDED;
                ret = true;
            }
            // bottom up
            // Update status of parent from the status of its children
            // NOTE(review): the parentheses only spell out the precedence the expression always had; the pending
            // check therefore does not guard the SUSPENDEDWITHERROR case. Confirm whether
            // "!isPending && (SUSPENDED || SUSPENDEDWITHERROR)" was intended.
            else if ((!isPending && bundleActionStatus.containsKey(Job.Status.SUSPENDED))
                    || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)) {
                // Either status may be absent here, so every count is read null-safely.
                int suspendedActions = countOf(bundleActionStatus, Job.Status.SUSPENDED);
                int suspendedWithErrorActions = countOf(bundleActionStatus, Job.Status.SUSPENDEDWITHERROR);
                int succeededActions = countOf(bundleActionStatus, Job.Status.SUCCEEDED);
                int killedActions = countOf(bundleActionStatus, Job.Status.KILLED);
                int failedActions = countOf(bundleActionStatus, Job.Status.FAILED);
                int doneWithErrorActions = countOf(bundleActionStatus, Job.Status.DONEWITHERROR);

                if (bundleActions.size() == suspendedActions + succeededActions) {
                    bundleStatus[0] = Job.Status.SUSPENDED;
                    ret = true;
                }
                else if (bundleActions.size() == suspendedWithErrorActions + suspendedActions + succeededActions
                        + killedActions + failedActions + doneWithErrorActions) {
                    bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
                    ret = true;
                }
            }
            return ret;
        }

        /**
         * For a coordinator job that is already PAUSED or PAUSEDWITHERROR, picks PAUSEDWITHERROR when any action
         * is KILLED, FAILED or TIMEDOUT, and PAUSED otherwise.
         *
         * @return true if {@code coordStatus[0]} was set
         */
        private boolean checkCoordPausedStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
                int coordActionsCount, Job.Status[] coordStatus) {
            if (!coordStatus[0].equals(Job.Status.PAUSED) && !coordStatus[0].equals(Job.Status.PAUSEDWITHERROR)) {
                return false;
            }
            boolean anyActionInError = coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
                    || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
                    || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT);
            coordStatus[0] = anyActionInError ? Job.Status.PAUSEDWITHERROR : Job.Status.PAUSED;
            return true;
        }

        /**
         * Decides between SUSPENDED and SUSPENDEDWITHERROR for a coordinator job.
         *
         * @return true if {@code coordStatus[0]} was set
         */
        private boolean checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
                int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization, boolean isPending) {
            boolean ret = false;

            // TODO - When bottom up cmds are allowed to change the status of parent job
            // if none of the coord actions are in suspended or suspendedwitherror and materialization done is
            // false, then the function should return false

            // top down
            // check for children only when parent is suspended
            if (coordStatus[0] == Job.Status.SUSPENDED || coordStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
                boolean anyActionInError = coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
                        || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
                        || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT);
                coordStatus[0] = anyActionInError ? Job.Status.SUSPENDEDWITHERROR : Job.Status.SUSPENDED;
                ret = true;
            }
            // bottom up
            // look for children to check the parent's status only if materialization is
            // done and all actions are non-pending
            else if (isDoneMaterialization && !isPending
                    && coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
                int suspendedActions = countOf(coordActionStatus, CoordinatorAction.Status.SUSPENDED);
                int succeededActions = countOf(coordActionStatus, CoordinatorAction.Status.SUCCEEDED);
                int killedActions = countOf(coordActionStatus, CoordinatorAction.Status.KILLED);
                int failedActions = countOf(coordActionStatus, CoordinatorAction.Status.FAILED);
                int timedoutActions = countOf(coordActionStatus, CoordinatorAction.Status.TIMEDOUT);

                if (coordActionsCount == suspendedActions + succeededActions) {
                    coordStatus[0] = Job.Status.SUSPENDED;
                    ret = true;
                }
                else if (coordActionsCount == suspendedActions + succeededActions + killedActions + failedActions
                        + timedoutActions) {
                    coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
                    ret = true;
                }
            }
            return ret;
        }

        /**
         * For a coordinator job that is not in PREP, picks RUNNINGWITHERROR when any action is KILLED, FAILED or
         * TIMEDOUT, and RUNNING otherwise.
         *
         * @return true if {@code coordStatus[0]} was set
         */
        private boolean checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
                int coordActionsCount, Job.Status[] coordStatus) {
            if (coordStatus[0] == Job.Status.PREP) {
                return false;
            }
            boolean anyActionInError = coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
                    || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
                    || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT);
            coordStatus[0] = anyActionInError ? Job.Status.RUNNINGWITHERROR : Job.Status.RUNNING;
            return true;
        }

        /**
         * For a bundle job that is not in PREP, picks RUNNINGWITHERROR when any bundle action is FAILED, KILLED,
         * DONEWITHERROR or RUNNINGWITHERROR, and RUNNING otherwise.
         *
         * @return true if {@code bundleStatus[0]} was set
         */
        private boolean checkRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus,
                List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
            if (bundleStatus[0] == Job.Status.PREP) {
                return false;
            }
            boolean anyActionInError = bundleActionStatus.containsKey(Job.Status.FAILED)
                    || bundleActionStatus.containsKey(Job.Status.KILLED)
                    || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
                    || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR);
            bundleStatus[0] = anyActionInError ? Job.Status.RUNNINGWITHERROR : Job.Status.RUNNING;
            return true;
        }

        /**
         * Applies the new status and pending flag to the bundle job bean and saves it.
         *
         * @param isPending whether the job's pending flag must be set (true) or reset (false)
         * @param bundleJob bean to update
         * @param bundleStatus new status, before backward-support mapping
         * @throws JPAExecutorException thrown if the db update fails
         */
        private void updateBundleJob(boolean isPending, BundleJobBean bundleJob, Job.Status bundleStatus)
                throws JPAExecutorException {
            String jobId = bundleJob.getId();
            // Update the Bundle Job
            // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
            // PAUSEDWITHERROR is not supported
            bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(bundleStatus));
            if (isPending) {
                bundleJob.setPending();
                LOG.info("Bundle job [" + jobId + "] Pending set to TRUE");
            }
            else {
                bundleJob.resetPending();
                LOG.info("Bundle job [" + jobId + "] Pending set to FALSE");
            }
            jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
        }

        /**
         * Applies the new status and pending flag to the coordinator job bean, saves it and, when the job belongs
         * to a bundle and its status actually changed, runs a bundle status update command.
         * <p/>
         * A job that is already SUCCEEDED, FAILED, KILLED or DONEWITHERROR is never moved to SUSPENDED or
         * SUSPENDEDWITHERROR; in that case nothing is changed.
         *
         * @param isPending whether the job's pending flag must be set (true) or reset (false)
         * @param coordJob bean to update
         * @param coordStatus new status, before backward-support mapping
         * @throws JPAExecutorException thrown if the db update fails
         * @throws CommandException thrown if the bundle status update command fails
         */
        private void updateCoordJob(boolean isPending, CoordinatorJobBean coordJob, Job.Status coordStatus)
                throws JPAExecutorException, CommandException {
            Job.Status prevStatus = coordJob.getStatus();
            // Update the Coord Job
            boolean alreadyTerminal = prevStatus == Job.Status.SUCCEEDED || prevStatus == Job.Status.FAILED
                    || prevStatus == Job.Status.KILLED || prevStatus == Job.Status.DONEWITHERROR;
            boolean targetIsSuspended = coordStatus == Job.Status.SUSPENDED
                    || coordStatus == Job.Status.SUSPENDEDWITHERROR;
            if (alreadyTerminal && targetIsSuspended) {
                LOG.info("Coord Job [" + coordJob.getId()
                        + "] status to "+ coordStatus +" can not be updated as its already in Terminal state");
                return;
            }

            checkCoordPending(isPending, coordJob, false);
            // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and PAUSEDWITHERROR is
            // not supported
            coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(coordStatus));
            // Backward support when coordinator namespace is 0.1
            coordJob.setStatus(StatusUtils.getStatus(coordJob));
            coordJob.setLastModifiedTime(new Date());
            jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
            // update bundle action only when status changes in coord job
            if (coordJob.getBundleId() != null && !prevStatus.equals(coordJob.getStatus())) {
                BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
                bundleStatusUpdate.call();
            }
        }

        /**
         * Sets or resets the pending flag of the coordinator job bean and optionally saves the bean.
         *
         * @param isPending whether the pending flag must be set (true) or reset (false)
         * @param coordJob bean to update
         * @param saveToDB true to save the bean right away
         * @throws JPAExecutorException thrown if the db update fails
         */
        private void checkCoordPending(boolean isPending, CoordinatorJobBean coordJob, boolean saveToDB)
                throws JPAExecutorException {
            // Checking the coordinator pending should be updated or not
            if (isPending) {
                coordJob.setPending();
                LOG.info("Coord job [" + coordJob.getId() + "] Pending set to TRUE");
            }
            else {
                coordJob.resetPending();
                LOG.info("Coord job [" + coordJob.getId() + "] Pending set to FALSE");
            }

            if (saveToDB) {
                jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
            }
        }

        /**
         * Aggregate coordinator actions' status to coordinator jobs
         *
         * @throws JPAExecutorException thrown if failed in db updates or retrievals
         * @throws CommandException thrown if failed to run commands
         */
        private void coordTransit() throws JPAExecutorException, CommandException {
            List<CoordinatorJobBean> pendingJobCheckList;
            if (lastInstanceStartTime == null) {
                LOG.info("Running coordinator status service first instance");
                // this is the first instance, we need to check for all pending jobs;
                pendingJobCheckList = jpaService.execute(new CoordJobsGetPendingJPAExecutor(limit));
            }
            else {
                LOG.info("Running coordinator status service from last instance time =  "
                        + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
                // this is not the first instance, we should only check jobs
                // that have actions been
                // updated >= start time of last service run;
                List<String> coordJobIdList = jpaService
                        .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
                // collapse repeated job ids
                Set<String> coordIds = new HashSet<String>();
                coordIds.addAll(coordJobIdList);
                pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
                for (String coordId : coordIds) {
                    CoordinatorJobBean coordJob;
                    try {
                        coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordId));
                    }
                    catch (JPAExecutorException jpaee) {
                        // a job that no longer exists is skipped; every other error is propagated
                        if (jpaee.getErrorCode().equals(ErrorCode.E0604)) {
                            LOG.warn("Exception happened during StatusTransitRunnable; Coordinator Job doesn't exist", jpaee);
                            continue;
                        }
                        throw jpaee;
                    }
                    // Running coord job might have pending false
                    Job.Status coordJobStatus = coordJob.getStatus();
                    if (coordJob.isPending() || coordJobStatus.equals(Job.Status.PAUSED)
                            || coordJobStatus.equals(Job.Status.RUNNING)
                            || coordJobStatus.equals(Job.Status.RUNNINGWITHERROR)
                            || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
                        pendingJobCheckList.add(coordJob);
                    }
                }
            }
            aggregateCoordJobsStatus(pendingJobCheckList);
        }
    }

    /**
     * Initializes the {@link StatusTransitService}.
     * <p/>
     * Schedules a {@link StatusTransitRunnable} with the scheduler service, using the interval configured under
     * {@link #CONF_STATUSTRANSIT_INTERVAL} (60 when not configured), in seconds.
     *
     * @param services services instance.
     */
    @Override
    public void init(Services services) {
        Configuration conf = services.getConf();
        Runnable stateTransitRunnable = new StatusTransitRunnable();
        services.get(SchedulerService.class).schedule(stateTransitRunnable, 10,
                conf.getInt(CONF_STATUSTRANSIT_INTERVAL, 60), SchedulerService.Unit.SEC);
    }

    /**
     * Destroy the status transit service. Nothing is done here.
     */
    @Override
    public void destroy() {
    }

    /**
     * Return the public interface of the status transit service.
     *
     * @return {@link StatusTransitService}.
     */
    @Override
    public Class<? extends Service> getInterface() {
        return StatusTransitService.class;
    }
}