001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import java.util.ArrayList; 021 import java.util.Date; 022 import java.util.HashMap; 023 import java.util.HashSet; 024 import java.util.List; 025 import java.util.Set; 026 027 import org.apache.hadoop.conf.Configuration; 028 import org.apache.oozie.BundleActionBean; 029 import org.apache.oozie.BundleJobBean; 030 import org.apache.oozie.CoordinatorActionBean; 031 import org.apache.oozie.CoordinatorJobBean; 032 import org.apache.oozie.ErrorCode; 033 import org.apache.oozie.client.CoordinatorAction; 034 import org.apache.oozie.client.Job; 035 import org.apache.oozie.command.CommandException; 036 import org.apache.oozie.command.bundle.BundleKillXCommand; 037 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 038 import org.apache.oozie.executor.jpa.BundleActionsGetByLastModifiedTimeJPAExecutor; 039 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor; 040 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; 041 import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor; 042 import org.apache.oozie.executor.jpa.BundleJobsGetPendingJPAExecutor; 043 import org.apache.oozie.executor.jpa.BundleJobsGetRunningJPAExecutor; 044 import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor; 045 import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusByPendingFalseJPAExecutor; 046 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 047 import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor; 048 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor; 049 import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor; 050 import org.apache.oozie.executor.jpa.JPAExecutorException; 051 import org.apache.oozie.util.DateUtils; 052 import org.apache.oozie.util.MemoryLocks; 053 import org.apache.oozie.util.StatusUtils; 054 import org.apache.oozie.util.XLog; 055 056 /** 057 * StateTransitService is scheduled to run at the configured interval. 058 * <p/> 059 * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0 (job 060 * done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to 061 * SUCCEEDED. 062 */ 063 public class StatusTransitService implements Service { 064 public static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService."; 065 public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval"; 066 public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + "backward.support.for.coord.status"; 067 private static int limit = -1; 068 private static Date lastInstanceStartTime = null; 069 private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class); 070 071 /** 072 * StateTransitRunnable is the runnable which is scheduled to run at the configured interval. 073 * <p/> 074 * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0 075 * (job done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to 076 * SUCCEEDED. 077 */ 078 static class StatusTransitRunnable implements Runnable { 079 private JPAService jpaService = null; 080 private MemoryLocks.LockToken lock; 081 082 public StatusTransitRunnable() { 083 jpaService = Services.get().get(JPAService.class); 084 if (jpaService == null) { 085 LOG.error("Missing JPAService"); 086 } 087 } 088 089 @Override 090 public void run() { 091 try { 092 Date curDate = new Date(); // records the start time of this service run; 093 094 // first check if there is some other instance running; 095 lock = Services.get().get(MemoryLocksService.class).getWriteLock(StatusTransitService.class.getName(), 096 lockTimeout); 097 if (lock == null) { 098 LOG.info("This StatusTransitService instance" 099 + " will not run since there is already an instance running"); 100 } 101 else { 102 LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName()); 103 // running coord jobs transit service 104 coordTransit(); 105 // running bundle jobs transit service 106 bundleTransit(); 107 108 lastInstanceStartTime = curDate; 109 } 110 } 111 catch (Exception ex) { 112 LOG.warn("Exception happened during StatusTransitRunnable ", ex); 113 } 114 finally { 115 // release lock; 116 if (lock != null) { 117 lock.release(); 118 LOG.info("Released lock for [{0}]", StatusTransitService.class.getName()); 119 } 120 } 121 } 122 123 /** 124 * Aggregate bundle actions' status to bundle jobs 125 * 126 * @throws JPAExecutorException thrown if failed in db updates or retrievals 127 * @throws CommandException thrown if failed to run commands 128 */ 129 private void bundleTransit() throws JPAExecutorException, CommandException { 130 List<BundleJobBean> pendingJobCheckList = null; 131 List<BundleJobBean> runningJobCheckList = null; 132 List<List<BundleJobBean>> bundleLists = new ArrayList<List<BundleJobBean>>(); 133 if (lastInstanceStartTime == null) { 134 LOG.info("Running bundle status service first instance"); 135 // this is the first instance, we need to check for all pending jobs; 136 pendingJobCheckList = jpaService.execute(new BundleJobsGetPendingJPAExecutor(limit)); 137 runningJobCheckList = jpaService.execute(new BundleJobsGetRunningJPAExecutor(limit)); 138 bundleLists.add(pendingJobCheckList); 139 bundleLists.add(runningJobCheckList); 140 } 141 else { 142 LOG.info("Running bundle status service from last instance time = " 143 + DateUtils.convertDateToString(lastInstanceStartTime)); 144 // this is not the first instance, we should only check jobs that have actions been 145 // updated >= start time of last service run; 146 List<BundleActionBean> actionList = jpaService 147 .execute(new BundleActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime)); 148 Set<String> bundleIds = new HashSet<String>(); 149 for (BundleActionBean action : actionList) { 150 bundleIds.add(action.getBundleId()); 151 } 152 pendingJobCheckList = new ArrayList<BundleJobBean>(); 153 for (String bundleId : bundleIds.toArray(new String[bundleIds.size()])) { 154 BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); 155 // Running bundle job might have pending false 156 if (bundle.isPending() || bundle.getStatus().equals(Job.Status.RUNNING)) { 157 pendingJobCheckList.add(bundle); 158 } 159 } 160 runningJobCheckList = pendingJobCheckList; 161 bundleLists.add(pendingJobCheckList); 162 } 163 aggregateBundleJobsStatus(bundleLists); 164 } 165 166 private void aggregateBundleJobsStatus(List<List<BundleJobBean>> bundleLists) throws JPAExecutorException, 167 CommandException { 168 if (bundleLists != null) { 169 for (List<BundleJobBean> listBundleBean : bundleLists) { 170 for (BundleJobBean bundleJob : listBundleBean) { 171 try { 172 String jobId = bundleJob.getId(); 173 Job.Status[] bundleStatus = new Job.Status[1]; 174 bundleStatus[0] = bundleJob.getStatus(); 175 List<BundleActionBean> bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor( 176 jobId)); 177 HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>(); 178 boolean foundPending = false; 179 for (BundleActionBean bAction : bundleActions) { 180 if (!bAction.isPending()) { 181 int counter = 0; 182 if (bundleActionStatus.containsKey(bAction.getStatus())) { 183 counter = bundleActionStatus.get(bAction.getStatus()) + 1; 184 } 185 else { 186 ++counter; 187 } 188 bundleActionStatus.put(bAction.getStatus(), counter); 189 if (bAction.getCoordId() == null 190 && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED)) { 191 (new BundleKillXCommand(jobId)).call(); 192 LOG.info("Bundle job ["+ jobId 193 + "] has been killed since one of its coordinator job failed submission."); 194 } 195 } 196 else { 197 foundPending = true; 198 break; 199 } 200 } 201 202 if (foundPending) { 203 continue; 204 } 205 206 if (checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus)) { 207 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString() 208 + "' from '" + bundleJob.getStatus() + "'"); 209 updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]); 210 } 211 else if (checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus)) { 212 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString() 213 + "' from '" + bundleJob.getStatus() + "'"); 214 updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]); 215 } 216 else if (checkPausedStatus(bundleActionStatus, bundleActions, bundleStatus)) { 217 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString() 218 + "' from '" + bundleJob.getStatus() + "'"); 219 updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]); 220 } 221 else if (checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus)) { 222 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString() 223 + "' from '" + bundleJob.getStatus() + "'"); 224 updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]); 225 } 226 else if (checkRunningStatus(bundleActionStatus, bundleActions, bundleStatus)) { 227 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString() 228 + "' from '" + bundleJob.getStatus() + "'"); 229 updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]); 230 } 231 } 232 catch (Exception ex) { 233 LOG.error("Exception happened during aggregate bundle job's status, job = " 234 + bundleJob.getId(), ex); 235 } 236 } 237 } 238 } 239 } 240 241 private void aggregateCoordJobsStatus(List<CoordinatorJobBean> CoordList) throws JPAExecutorException, 242 CommandException { 243 if (CoordList != null) { 244 Configuration conf = Services.get().getConf(); 245 boolean backwardSupportForCoordStatus = conf.getBoolean(CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false); 246 247 for (CoordinatorJobBean coordJob : CoordList) { 248 try { 249 // if namespace 0.1 is used and backward support is true, then ignore this coord job 250 if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null 251 && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) { 252 continue; 253 } 254 String jobId = coordJob.getId(); 255 Job.Status[] coordStatus = new Job.Status[1]; 256 coordStatus[0] = coordJob.getStatus(); 257 //Get count of Coordinator actions with pending true 258 int count = jpaService.execute(new CoordJobGetPendingActionsCountJPAExecutor(jobId)); 259 if (count > 0) { 260 continue; 261 } 262 // Code below this is executed only when none of the Coordinator actions are pending 263 // Get status of Coordinator actions with pending false 264 List<CoordinatorAction.Status> coordActionStatusList = jpaService 265 .execute(new CoordJobGetActionsStatusByPendingFalseJPAExecutor(jobId)); 266 HashMap<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>(); 267 268 for (CoordinatorAction.Status status : coordActionStatusList) { 269 int counter = 0; 270 if (coordActionStatus.containsKey(status)) { 271 counter = coordActionStatus.get(status) + 1; 272 } 273 else { 274 ++counter; 275 } 276 coordActionStatus.put(status, counter); 277 } 278 279 int nonPendingCoordActionsCount = coordActionStatusList.size(); 280 if (coordJob.isDoneMaterialization() 281 && checkCoordTerminalStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) { 282 LOG.info("Set coordinator job [" + jobId + "] status to '" + coordStatus[0].toString() 283 + "' from '" + coordJob.getStatus() + "'"); 284 updateCoordJob(coordActionStatus, nonPendingCoordActionsCount, coordJob, coordStatus[0]); 285 } 286 else if (coordJob.isDoneMaterialization() 287 && checkCoordSuspendStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) { 288 LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString() 289 + "' from '" + coordJob.getStatus() + "'"); 290 updateCoordJob(coordActionStatus, nonPendingCoordActionsCount, coordJob, coordStatus[0]); 291 } 292 else if (checkCoordRunningStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) { 293 LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString() 294 + "' from '" + coordJob.getStatus() + "'"); 295 updateCoordJob(coordActionStatus, nonPendingCoordActionsCount, coordJob, coordStatus[0]); 296 } 297 // checking pending flag for job when user killed or suspended the job 298 else { 299 checkCoordPending(coordActionStatus, nonPendingCoordActionsCount, coordJob, true); 300 } 301 } 302 catch (Exception ex) { 303 LOG.error("Exception happened during aggregate coordinator job's status, job = " 304 + coordJob.getId(), ex); 305 } 306 } 307 308 } 309 } 310 311 private boolean checkTerminalStatus(HashMap<Job.Status, Integer> bundleActionStatus, 312 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) { 313 boolean ret = false; 314 int totalValuesSucceed = 0; 315 if (bundleActionStatus.containsKey(Job.Status.SUCCEEDED)) { 316 totalValuesSucceed = bundleActionStatus.get(Job.Status.SUCCEEDED); 317 } 318 int totalValuesFailed = 0; 319 if (bundleActionStatus.containsKey(Job.Status.FAILED)) { 320 totalValuesFailed = bundleActionStatus.get(Job.Status.FAILED); 321 } 322 int totalValuesKilled = 0; 323 if (bundleActionStatus.containsKey(Job.Status.KILLED)) { 324 totalValuesKilled = bundleActionStatus.get(Job.Status.KILLED); 325 } 326 327 int totalValuesDoneWithError = 0; 328 if (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)) { 329 totalValuesDoneWithError = bundleActionStatus.get(Job.Status.DONEWITHERROR); 330 } 331 332 if (bundleActions.size() == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesDoneWithError)) { 333 // If all the bundle actions are succeeded then bundle job should be succeeded. 334 if (bundleActions.size() == totalValuesSucceed) { 335 bundleStatus[0] = Job.Status.SUCCEEDED; 336 ret = true; 337 } 338 else if (bundleActions.size() == totalValuesKilled) { 339 // If all the bundle actions are KILLED then bundle job should be KILLED. 340 bundleStatus[0] = Job.Status.KILLED; 341 ret = true; 342 } 343 else if (bundleActions.size() == totalValuesFailed) { 344 // If all the bundle actions are FAILED then bundle job should be FAILED. 345 bundleStatus[0] = Job.Status.FAILED; 346 ret = true; 347 } 348 else { 349 bundleStatus[0] = Job.Status.DONEWITHERROR; 350 ret = true; 351 } 352 } 353 return ret; 354 } 355 356 private boolean checkCoordTerminalStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus, 357 int coordActionsCount, Job.Status[] coordStatus) { 358 boolean ret = false; 359 int totalValuesSucceed = 0; 360 if (coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED)) { 361 totalValuesSucceed = coordActionStatus.get(CoordinatorAction.Status.SUCCEEDED); 362 } 363 int totalValuesFailed = 0; 364 if (coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)) { 365 totalValuesFailed = coordActionStatus.get(CoordinatorAction.Status.FAILED); 366 } 367 int totalValuesKilled = 0; 368 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)) { 369 totalValuesKilled = coordActionStatus.get(CoordinatorAction.Status.KILLED); 370 } 371 372 int totalValuesTimeOut = 0; 373 if (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) { 374 totalValuesTimeOut = coordActionStatus.get(CoordinatorAction.Status.TIMEDOUT); 375 } 376 377 if (coordActionsCount == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut)) { 378 // If all the coordinator actions are succeeded then coordinator job should be succeeded. 379 if (coordActionsCount == totalValuesSucceed) { 380 coordStatus[0] = Job.Status.SUCCEEDED; 381 ret = true; 382 } 383 else if (coordActionsCount == totalValuesKilled) { 384 // If all the coordinator actions are KILLED then coordinator job should be KILLED. 385 coordStatus[0] = Job.Status.KILLED; 386 ret = true; 387 } 388 else if (coordActionsCount == totalValuesFailed) { 389 // If all the coordinator actions are FAILED then coordinator job should be FAILED. 390 coordStatus[0] = Job.Status.FAILED; 391 ret = true; 392 } 393 else { 394 coordStatus[0] = Job.Status.DONEWITHERROR; 395 ret = true; 396 } 397 } 398 return ret; 399 } 400 401 private boolean checkPrepStatus(HashMap<Job.Status, Integer> bundleActionStatus, 402 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) { 403 boolean ret = false; 404 if (bundleActionStatus.containsKey(Job.Status.PREP)) { 405 // If all the bundle actions are PREP then bundle job should be RUNNING. 406 if (bundleActions.size() > bundleActionStatus.get(Job.Status.PREP)) { 407 bundleStatus[0] = Job.Status.RUNNING; 408 ret = true; 409 } 410 } 411 return ret; 412 } 413 414 private boolean checkPausedStatus(HashMap<Job.Status, Integer> bundleActionStatus, 415 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) { 416 boolean ret = false; 417 if (bundleActionStatus.containsKey(Job.Status.PAUSED)) { 418 if (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED)) { 419 bundleStatus[0] = Job.Status.PAUSED; 420 ret = true; 421 } 422 else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR) 423 && (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED) 424 + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR))) { 425 // bundleStatus = Job.Status.PAUSEDWITHERROR; 426 // We need to change this to PAUSEDWITHERROR in future when we add this to coordinator 427 bundleStatus[0] = Job.Status.PAUSED; 428 ret = true; 429 } 430 } 431 return ret; 432 } 433 434 private boolean checkSuspendStatus(HashMap<Job.Status, Integer> bundleActionStatus, 435 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) { 436 boolean ret = false; 437 if (bundleActionStatus.containsKey(Job.Status.SUSPENDED)) { 438 if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED)) { 439 bundleStatus[0] = Job.Status.SUSPENDED; 440 ret = true; 441 } 442 else if (bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR) 443 && (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED) 444 + bundleActionStatus.get(Job.Status.SUSPENDEDWITHERROR))) { 445 // bundleStatus = Job.Status.SUSPENDEDWITHERROR; 446 // We need to change this to SUSPENDEDWITHERROR in future when we add this to coordinator 447 bundleStatus[0] = Job.Status.SUSPENDED; 448 ret = true; 449 } 450 } 451 return ret; 452 } 453 454 private boolean checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus, 455 int coordActionsCount, Job.Status[] coordStatus) { 456 boolean ret = false; 457 if (coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) { 458 if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)) { 459 coordStatus[0] = Job.Status.SUSPENDED; 460 ret = true; 461 } 462 } 463 return ret; 464 } 465 466 private boolean checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus, 467 int coordActionsCount, Job.Status[] coordStatus) { 468 boolean ret = false; 469 if (coordActionStatus.containsKey(CoordinatorAction.Status.RUNNING)) { 470 // If all the bundle actions are succeeded then bundle job should be succeeded. 471 if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.RUNNING)) { 472 coordStatus[0] = Job.Status.RUNNING; 473 ret = true; 474 } 475 else if (coordActionStatus.get(CoordinatorAction.Status.RUNNING) > 0) { 476 if ((coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) && coordActionStatus.get(CoordinatorAction.Status.FAILED) > 0) 477 || (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) && coordActionStatus 478 .get(CoordinatorAction.Status.KILLED) > 0) 479 || (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT) && coordActionStatus 480 .get(CoordinatorAction.Status.TIMEDOUT) > 0)) { 481 // coordStatus = Job.Status.RUNNINGWITHERROR; 482 // We need to change this to RUNNINGWIHERROR in future when we add this to coordinator 483 coordStatus[0] = Job.Status.RUNNING; 484 ret = true; 485 } 486 } 487 } 488 return ret; 489 } 490 491 private boolean checkRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus, 492 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) { 493 boolean ret = false; 494 if (bundleActionStatus.containsKey(Job.Status.RUNNING)) { 495 // If all the bundle actions are succeeded then bundle job should be succeeded. 496 if (bundleActions.size() == bundleActionStatus.get(Job.Status.RUNNING)) { 497 bundleStatus[0] = Job.Status.RUNNING; 498 ret = true; 499 } 500 else if (bundleActionStatus.get(Job.Status.RUNNING) > 0) { 501 if ((bundleActionStatus.containsKey(Job.Status.FAILED) && bundleActionStatus.get(Job.Status.FAILED) > 0) 502 || (bundleActionStatus.containsKey(Job.Status.KILLED) && bundleActionStatus 503 .get(Job.Status.KILLED) > 0) 504 || (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) && bundleActionStatus 505 .get(Job.Status.DONEWITHERROR) > 0) 506 || (bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR) && bundleActionStatus 507 .get(Job.Status.RUNNINGWITHERROR) > 0)) { 508 // bundleStatus = Job.Status.RUNNINGWITHERROR; 509 // We need to change this to RUNNINGWIHERROR in future when we add this to coordinator 510 bundleStatus[0] = Job.Status.RUNNING; 511 ret = true; 512 } 513 } 514 } 515 return ret; 516 } 517 518 private void updateBundleJob(HashMap<Job.Status, Integer> bundleActionStatus, 519 List<BundleActionBean> bundleActions, BundleJobBean bundleJob, Job.Status bundleStatus) 520 throws JPAExecutorException { 521 String jobId = bundleJob.getId(); 522 boolean pendingBundleJob = bundleJob.isPending(); 523 // Checking the bundle pending should be updated or not 524 int totalNonPendingActions = 0; 525 for (Job.Status js : bundleActionStatus.keySet()) { 526 totalNonPendingActions += bundleActionStatus.get(js); 527 } 528 529 if (totalNonPendingActions == bundleActions.size()) { 530 pendingBundleJob = false; 531 } 532 533 // Update the Bundle Job 534 bundleJob.setStatus(bundleStatus); 535 if (pendingBundleJob) { 536 bundleJob.setPending(); 537 LOG.info("Bundle job [" + jobId + "] Pending set to TRUE"); 538 } 539 else { 540 bundleJob.resetPending(); 541 LOG.info("Bundle job [" + jobId + "] Pending set to FALSE"); 542 } 543 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob)); 544 } 545 546 private void updateCoordJob(HashMap<CoordinatorAction.Status, Integer> coordActionStatus, 547 int coordActionsCount, CoordinatorJobBean coordJob, Job.Status coordStatus) 548 throws JPAExecutorException, CommandException { 549 Job.Status prevStatus = coordJob.getStatus(); 550 // Update the Coord Job 551 if (coordJob.getStatus() == Job.Status.SUCCEEDED || coordJob.getStatus() == Job.Status.FAILED 552 || coordJob.getStatus() == Job.Status.KILLED || coordJob.getStatus() == Job.Status.DONEWITHERROR) { 553 if (coordStatus == Job.Status.SUSPENDED) { 554 LOG.info("Coord Job [" + coordJob.getId() 555 + "] status can not be updated as its already in Terminal state"); 556 return; 557 } 558 } 559 560 checkCoordPending(coordActionStatus, coordActionsCount, coordJob, false); 561 coordJob.setStatus(coordStatus); 562 coordJob.setStatus(StatusUtils.getStatus(coordJob)); 563 coordJob.setLastModifiedTime(new Date()); 564 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 565 // update bundle action only when status changes in coord job 566 if (coordJob.getBundleId() != null) { 567 if (!prevStatus.equals(coordJob.getStatus())) { 568 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 569 bundleStatusUpdate.call(); 570 } 571 } 572 } 573 574 private void checkCoordPending(HashMap<CoordinatorAction.Status, Integer> coordActionStatus, 575 int coordActionsCount, CoordinatorJobBean coordJob, boolean saveToDB) throws JPAExecutorException { 576 boolean pendingCoordJob = coordJob.isPending(); 577 // Checking the coordinator pending should be updated or not 578 int totalNonPendingActions = 0; 579 for (CoordinatorAction.Status js : coordActionStatus.keySet()) { 580 totalNonPendingActions += coordActionStatus.get(js); 581 } 582 583 if (totalNonPendingActions == coordActionsCount) { 584 pendingCoordJob = false; 585 } 586 587 if (pendingCoordJob) { 588 coordJob.setPending(); 589 LOG.info("Coord job [" + coordJob.getId() + "] Pending set to TRUE"); 590 } 591 else { 592 coordJob.resetPending(); 593 LOG.info("Coord job [" + coordJob.getId() + "] Pending set to FALSE"); 594 } 595 596 if (saveToDB) { 597 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 598 } 599 } 600 601 /** 602 * Aggregate coordinator actions' status to coordinator jobs 603 * 604 * @throws JPAExecutorException thrown if failed in db updates or retrievals 605 * @throws CommandException thrown if failed to run commands 606 */ 607 private void coordTransit() throws JPAExecutorException, CommandException { 608 List<CoordinatorJobBean> pendingJobCheckList = null; 609 if (lastInstanceStartTime == null) { 610 LOG.info("Running coordinator status service first instance"); 611 // this is the first instance, we need to check for all pending jobs; 612 pendingJobCheckList = jpaService.execute(new CoordJobsGetPendingJPAExecutor(limit)); 613 } 614 else { 615 LOG.info("Running coordinator status service from last instance time = " 616 + DateUtils.convertDateToString(lastInstanceStartTime)); 617 // this is not the first instance, we should only check jobs 618 // that have actions been 619 // updated >= start time of last service run; 620 List<String> coordJobIdList = jpaService 621 .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime)); 622 Set<String> coordIds = new HashSet<String>(); 623 for (String coordJobId : coordJobIdList) { 624 coordIds.add(coordJobId); 625 } 626 pendingJobCheckList = new ArrayList<CoordinatorJobBean>(); 627 for (String coordId : coordIds.toArray(new String[coordIds.size()])) { 628 CoordinatorJobBean coordJob; 629 try{ 630 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordId)); 631 } 632 catch (JPAExecutorException jpaee) { 633 if (jpaee.getErrorCode().equals(ErrorCode.E0604)) { 634 LOG.warn("Exception happened during StatusTransitRunnable; Coordinator Job doesn't exist", jpaee); 635 continue; 636 } else { 637 throw jpaee; 638 } 639 } 640 // Running coord job might have pending false 641 if (coordJob.isPending() || coordJob.getStatus().equals(Job.Status.RUNNING)) { 642 pendingJobCheckList.add(coordJob); 643 } 644 } 645 } 646 aggregateCoordJobsStatus(pendingJobCheckList); 647 } 648 } 649 650 /** 651 * Initializes the {@link StatusTransitService}. 652 * 653 * @param services services instance. 654 */ 655 @Override 656 public void init(Services services) { 657 Configuration conf = services.getConf(); 658 Runnable stateTransitRunnable = new StatusTransitRunnable(); 659 services.get(SchedulerService.class).schedule(stateTransitRunnable, 10, 660 conf.getInt(CONF_STATUSTRANSIT_INTERVAL, 60), SchedulerService.Unit.SEC); 661 } 662 663 /** 664 * Destroy the StateTransit Jobs Service. 665 */ 666 @Override 667 public void destroy() { 668 } 669 670 /** 671 * Return the public interface for the purge jobs service. 672 * 673 * @return {@link StatusTransitService}. 674 */ 675 @Override 676 public Class<? extends Service> getInterface() { 677 return StatusTransitService.class; 678 } 679 } 680