001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import java.util.ArrayList; 021 import java.util.Date; 022 import java.util.HashMap; 023 import java.util.HashSet; 024 import java.util.List; 025 import java.util.Set; 026 027 import org.apache.hadoop.conf.Configuration; 028 import org.apache.oozie.BundleActionBean; 029 import org.apache.oozie.BundleJobBean; 030 import org.apache.oozie.CoordinatorActionBean; 031 import org.apache.oozie.CoordinatorJobBean; 032 import org.apache.oozie.client.CoordinatorAction; 033 import org.apache.oozie.client.Job; 034 import org.apache.oozie.command.CommandException; 035 import org.apache.oozie.command.bundle.BundleKillXCommand; 036 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 037 import org.apache.oozie.executor.jpa.BundleActionsGetByLastModifiedTimeJPAExecutor; 038 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor; 039 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; 040 import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor; 041 import org.apache.oozie.executor.jpa.BundleJobsGetPendingJPAExecutor; 042 import org.apache.oozie.executor.jpa.BundleJobsGetRunningJPAExecutor; 043 import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor; 044 import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor; 045 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 046 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor; 047 import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor; 048 import org.apache.oozie.executor.jpa.JPAExecutorException; 049 import org.apache.oozie.util.DateUtils; 050 import org.apache.oozie.util.MemoryLocks; 051 import org.apache.oozie.util.StatusUtils; 052 import org.apache.oozie.util.XLog; 053 054 /** 055 * StateTransitService is scheduled to run at the configured interval. 056 * <p/> 057 * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0 (job 058 * done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to 059 * SUCCEEDED. 060 */ 061 public class StatusTransitService implements Service { 062 public static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService."; 063 public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval"; 064 public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + "backward.support.for.coord.status"; 065 private static int limit = -1; 066 private static Date lastInstanceStartTime = null; 067 private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class); 068 069 /** 070 * StateTransitRunnable is the runnable which is scheduled to run at the configured interval. 071 * <p/> 072 * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0 073 * (job done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to 074 * SUCCEEDED. 075 */ 076 static class StatusTransitRunnable implements Runnable { 077 private JPAService jpaService = null; 078 private MemoryLocks.LockToken lock; 079 080 public StatusTransitRunnable() { 081 jpaService = Services.get().get(JPAService.class); 082 if (jpaService == null) { 083 LOG.error("Missing JPAService"); 084 } 085 } 086 087 public void run() { 088 try { 089 Date curDate = new Date(); // records the start time of this service run; 090 091 // first check if there is some other instance running; 092 lock = Services.get().get(MemoryLocksService.class).getWriteLock(StatusTransitService.class.getName(), 093 lockTimeout); 094 if (lock == null) { 095 LOG.info("This StatusTransitService instance" 096 + " will not run since there is already an instance running"); 097 } 098 else { 099 LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName()); 100 // running coord jobs transit service 101 coordTransit(); 102 // running bundle jobs transit service 103 bundleTransit(); 104 105 lastInstanceStartTime = curDate; 106 } 107 } 108 catch (Exception ex) { 109 LOG.warn("Exception happened during StatusTransitRunnable ", ex); 110 } 111 finally { 112 // release lock; 113 if (lock != null) { 114 lock.release(); 115 LOG.info("Released lock for [{0}]", StatusTransitService.class.getName()); 116 } 117 } 118 } 119 120 /** 121 * Aggregate bundle actions' status to bundle jobs 122 * 123 * @throws JPAExecutorException thrown if failed in db updates or retrievals 124 * @throws CommandException thrown if failed to run commands 125 */ 126 private void bundleTransit() throws JPAExecutorException, CommandException { 127 List<BundleJobBean> pendingJobCheckList = null; 128 List<BundleJobBean> runningJobCheckList = null; 129 List<List<BundleJobBean>> bundleLists = new ArrayList<List<BundleJobBean>>(); 130 if (lastInstanceStartTime == null) { 131 LOG.info("Running bundle status service first instance"); 132 // this is the first instance, we need to check for all pending jobs; 133 pendingJobCheckList = jpaService.execute(new BundleJobsGetPendingJPAExecutor(limit)); 134 runningJobCheckList = jpaService.execute(new BundleJobsGetRunningJPAExecutor(limit)); 135 bundleLists.add(pendingJobCheckList); 136 bundleLists.add(runningJobCheckList); 137 } 138 else { 139 LOG.info("Running bundle status service from last instance time = " 140 + DateUtils.convertDateToString(lastInstanceStartTime)); 141 // this is not the first instance, we should only check jobs that have actions been 142 // updated >= start time of last service run; 143 List<BundleActionBean> actionList = jpaService 144 .execute(new BundleActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime)); 145 Set<String> bundleIds = new HashSet<String>(); 146 for (BundleActionBean action : actionList) { 147 bundleIds.add(action.getBundleId()); 148 } 149 pendingJobCheckList = new ArrayList<BundleJobBean>(); 150 for (String bundleId : bundleIds.toArray(new String[bundleIds.size()])) { 151 BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); 152 // Running bundle job might have pending false 153 if (bundle.isPending() || bundle.getStatus().equals(Job.Status.RUNNING)) { 154 pendingJobCheckList.add(bundle); 155 } 156 } 157 runningJobCheckList = pendingJobCheckList; 158 bundleLists.add(pendingJobCheckList); 159 } 160 aggregateBundleJobsStatus(bundleLists); 161 } 162 163 private void aggregateBundleJobsStatus(List<List<BundleJobBean>> bundleLists) throws JPAExecutorException, 164 CommandException { 165 if (bundleLists != null) { 166 for (List<BundleJobBean> listBundleBean : bundleLists) { 167 for (BundleJobBean bundleJob : listBundleBean) { 168 try { 169 String jobId = bundleJob.getId(); 170 Job.Status[] bundleStatus = new Job.Status[1]; 171 bundleStatus[0] = bundleJob.getStatus(); 172 List<BundleActionBean> bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor( 173 jobId)); 174 HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>(); 175 boolean foundPending = false; 176 for (BundleActionBean bAction : bundleActions) { 177 if (!bAction.isPending()) { 178 int counter = 0; 179 if (bundleActionStatus.containsKey(bAction.getStatus())) { 180 counter = bundleActionStatus.get(bAction.getStatus()) + 1; 181 } 182 else { 183 ++counter; 184 } 185 bundleActionStatus.put(bAction.getStatus(), counter); 186 if (bAction.getCoordId() == null 187 && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED)) { 188 (new BundleKillXCommand(jobId)).call(); 189 LOG.info("Bundle job ["+ jobId 190 + "] has been killed since one of its coordinator job failed submission."); 191 } 192 } 193 else { 194 foundPending = true; 195 break; 196 } 197 } 198 199 if (foundPending) { 200 continue; 201 } 202 203 if (checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus)) { 204 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString() 205 + "' from '" + bundleJob.getStatus() + "'"); 206 updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]); 207 } 208 else if (checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus)) { 209 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString() 210 + "' from '" + bundleJob.getStatus() + "'"); 211 updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]); 212 } 213 else if (checkPausedStatus(bundleActionStatus, bundleActions, bundleStatus)) { 214 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString() 215 + "' from '" + bundleJob.getStatus() + "'"); 216 updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]); 217 } 218 else if (checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus)) { 219 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString() 220 + "' from '" + bundleJob.getStatus() + "'"); 221 updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]); 222 } 223 else if (checkRunningStatus(bundleActionStatus, bundleActions, bundleStatus)) { 224 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString() 225 + "' from '" + bundleJob.getStatus() + "'"); 226 updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]); 227 } 228 } 229 catch (Exception ex) { 230 LOG.error("Exception happened during aggregate bundle job's status, job = " 231 + bundleJob.getId(), ex); 232 } 233 } 234 } 235 } 236 } 237 238 private void aggregateCoordJobsStatus(List<CoordinatorJobBean> CoordList) throws JPAExecutorException, 239 CommandException { 240 if (CoordList != null) { 241 Configuration conf = Services.get().getConf(); 242 boolean backwardSupportForCoordStatus = conf.getBoolean(CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false); 243 244 for (CoordinatorJobBean coordJob : CoordList) { 245 try { 246 // if namespace 0.1 is used and backward support is true, then ignore this coord job 247 if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null 248 && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) { 249 continue; 250 } 251 String jobId = coordJob.getId(); 252 Job.Status[] coordStatus = new Job.Status[1]; 253 coordStatus[0] = coordJob.getStatus(); 254 List<CoordinatorActionBean> coordActions = jpaService 255 .execute(new CoordJobGetActionsJPAExecutor(jobId)); 256 HashMap<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>(); 257 boolean foundPending = false; 258 for (CoordinatorActionBean cAction : coordActions) { 259 if (!cAction.isPending()) { 260 int counter = 0; 261 if (coordActionStatus.containsKey(cAction.getStatus())) { 262 counter = coordActionStatus.get(cAction.getStatus()) + 1; 263 } 264 else { 265 ++counter; 266 } 267 coordActionStatus.put(cAction.getStatus(), counter); 268 } 269 else { 270 foundPending = true; 271 break; 272 } 273 } 274 275 if (foundPending) { 276 continue; 277 } 278 279 if (coordJob.isDoneMaterialization() 280 && checkCoordTerminalStatus(coordActionStatus, coordActions, coordStatus)) { 281 LOG.info("Set coordinator job [" + jobId + "] status to '" + coordStatus[0].toString() 282 + "' from '" + coordJob.getStatus() + "'"); 283 updateCoordJob(coordActionStatus, coordActions, coordJob, coordStatus[0]); 284 } 285 else if (coordJob.isDoneMaterialization() 286 && checkCoordSuspendStatus(coordActionStatus, coordActions, coordStatus)) { 287 LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString() 288 + "' from '" + coordJob.getStatus() + "'"); 289 updateCoordJob(coordActionStatus, coordActions, coordJob, coordStatus[0]); 290 } 291 else if (checkCoordRunningStatus(coordActionStatus, coordActions, coordStatus)) { 292 LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString() 293 + "' from '" + coordJob.getStatus() + "'"); 294 updateCoordJob(coordActionStatus, coordActions, coordJob, coordStatus[0]); 295 } 296 // checking pending flag for job when user killed or suspended the job 297 else { 298 checkCoordPending(coordActionStatus, coordActions, coordJob, true); 299 } 300 } 301 catch (Exception ex) { 302 LOG.error("Exception happened during aggregate coordinator job's status, job = " 303 + coordJob.getId(), ex); 304 } 305 } 306 307 } 308 } 309 310 private boolean checkTerminalStatus(HashMap<Job.Status, Integer> bundleActionStatus, 311 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) { 312 boolean ret = false; 313 int totalValuesSucceed = 0; 314 if (bundleActionStatus.containsKey(Job.Status.SUCCEEDED)) { 315 totalValuesSucceed = bundleActionStatus.get(Job.Status.SUCCEEDED); 316 } 317 int totalValuesFailed = 0; 318 if (bundleActionStatus.containsKey(Job.Status.FAILED)) { 319 totalValuesFailed = bundleActionStatus.get(Job.Status.FAILED); 320 } 321 int totalValuesKilled = 0; 322 if (bundleActionStatus.containsKey(Job.Status.KILLED)) { 323 totalValuesKilled = bundleActionStatus.get(Job.Status.KILLED); 324 } 325 326 int totalValuesDoneWithError = 0; 327 if (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)) { 328 totalValuesDoneWithError = bundleActionStatus.get(Job.Status.DONEWITHERROR); 329 } 330 331 if (bundleActions.size() == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesDoneWithError)) { 332 // If all the bundle actions are succeeded then bundle job should be succeeded. 333 if (bundleActions.size() == totalValuesSucceed) { 334 bundleStatus[0] = Job.Status.SUCCEEDED; 335 ret = true; 336 } 337 else if (bundleActions.size() == totalValuesKilled) { 338 // If all the bundle actions are KILLED then bundle job should be KILLED. 339 bundleStatus[0] = Job.Status.KILLED; 340 ret = true; 341 } 342 else if (bundleActions.size() == totalValuesFailed) { 343 // If all the bundle actions are FAILED then bundle job should be FAILED. 344 bundleStatus[0] = Job.Status.FAILED; 345 ret = true; 346 } 347 else { 348 bundleStatus[0] = Job.Status.DONEWITHERROR; 349 ret = true; 350 } 351 } 352 return ret; 353 } 354 355 private boolean checkCoordTerminalStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus, 356 List<CoordinatorActionBean> coordActions, Job.Status[] coordStatus) { 357 boolean ret = false; 358 int totalValuesSucceed = 0; 359 if (coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED)) { 360 totalValuesSucceed = coordActionStatus.get(CoordinatorAction.Status.SUCCEEDED); 361 } 362 int totalValuesFailed = 0; 363 if (coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)) { 364 totalValuesFailed = coordActionStatus.get(CoordinatorAction.Status.FAILED); 365 } 366 int totalValuesKilled = 0; 367 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)) { 368 totalValuesKilled = coordActionStatus.get(CoordinatorAction.Status.KILLED); 369 } 370 371 int totalValuesTimeOut = 0; 372 if (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) { 373 totalValuesTimeOut = coordActionStatus.get(CoordinatorAction.Status.TIMEDOUT); 374 } 375 376 if (coordActions.size() == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut)) { 377 // If all the coordinator actions are succeeded then coordinator job should be succeeded. 378 if (coordActions.size() == totalValuesSucceed) { 379 coordStatus[0] = Job.Status.SUCCEEDED; 380 ret = true; 381 } 382 else if (coordActions.size() == totalValuesKilled) { 383 // If all the coordinator actions are KILLED then coordinator job should be KILLED. 384 coordStatus[0] = Job.Status.KILLED; 385 ret = true; 386 } 387 else if (coordActions.size() == totalValuesFailed) { 388 // If all the coordinator actions are FAILED then coordinator job should be FAILED. 389 coordStatus[0] = Job.Status.FAILED; 390 ret = true; 391 } 392 else { 393 coordStatus[0] = Job.Status.DONEWITHERROR; 394 ret = true; 395 } 396 } 397 return ret; 398 } 399 400 private boolean checkPrepStatus(HashMap<Job.Status, Integer> bundleActionStatus, 401 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) { 402 boolean ret = false; 403 if (bundleActionStatus.containsKey(Job.Status.PREP)) { 404 // If all the bundle actions are PREP then bundle job should be RUNNING. 405 if (bundleActions.size() > bundleActionStatus.get(Job.Status.PREP)) { 406 bundleStatus[0] = Job.Status.RUNNING; 407 ret = true; 408 } 409 } 410 return ret; 411 } 412 413 private boolean checkPausedStatus(HashMap<Job.Status, Integer> bundleActionStatus, 414 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) { 415 boolean ret = false; 416 if (bundleActionStatus.containsKey(Job.Status.PAUSED)) { 417 if (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED)) { 418 bundleStatus[0] = Job.Status.PAUSED; 419 ret = true; 420 } 421 else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR) 422 && (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED) 423 + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR))) { 424 // bundleStatus = Job.Status.PAUSEDWITHERROR; 425 // We need to change this to PAUSEDWITHERROR in future when we add this to coordinator 426 bundleStatus[0] = Job.Status.PAUSED; 427 ret = true; 428 } 429 } 430 return ret; 431 } 432 433 private boolean checkSuspendStatus(HashMap<Job.Status, Integer> bundleActionStatus, 434 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) { 435 boolean ret = false; 436 if (bundleActionStatus.containsKey(Job.Status.SUSPENDED)) { 437 if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED)) { 438 bundleStatus[0] = Job.Status.SUSPENDED; 439 ret = true; 440 } 441 else if (bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR) 442 && (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED) 443 + bundleActionStatus.get(Job.Status.SUSPENDEDWITHERROR))) { 444 // bundleStatus = Job.Status.SUSPENDEDWITHERROR; 445 // We need to change this to SUSPENDEDWITHERROR in future when we add this to coordinator 446 bundleStatus[0] = Job.Status.SUSPENDED; 447 ret = true; 448 } 449 } 450 return ret; 451 } 452 453 private boolean checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus, 454 List<CoordinatorActionBean> coordActions, Job.Status[] coordStatus) { 455 boolean ret = false; 456 if (coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) { 457 if (coordActions.size() == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)) { 458 coordStatus[0] = Job.Status.SUSPENDED; 459 ret = true; 460 } 461 } 462 return ret; 463 } 464 465 private boolean checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus, 466 List<CoordinatorActionBean> coordActions, Job.Status[] coordStatus) { 467 boolean ret = false; 468 if (coordActionStatus.containsKey(CoordinatorAction.Status.RUNNING)) { 469 // If all the bundle actions are succeeded then bundle job should be succeeded. 470 if (coordActions.size() == coordActionStatus.get(CoordinatorAction.Status.RUNNING)) { 471 coordStatus[0] = Job.Status.RUNNING; 472 ret = true; 473 } 474 else if (coordActionStatus.get(CoordinatorAction.Status.RUNNING) > 0) { 475 if ((coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) && coordActionStatus.get(CoordinatorAction.Status.FAILED) > 0) 476 || (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) && coordActionStatus 477 .get(CoordinatorAction.Status.KILLED) > 0) 478 || (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT) && coordActionStatus 479 .get(CoordinatorAction.Status.TIMEDOUT) > 0)) { 480 // coordStatus = Job.Status.RUNNINGWITHERROR; 481 // We need to change this to RUNNINGWIHERROR in future when we add this to coordinator 482 coordStatus[0] = Job.Status.RUNNING; 483 ret = true; 484 } 485 } 486 } 487 return ret; 488 } 489 490 private boolean checkRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus, 491 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) { 492 boolean ret = false; 493 if (bundleActionStatus.containsKey(Job.Status.RUNNING)) { 494 // If all the bundle actions are succeeded then bundle job should be succeeded. 495 if (bundleActions.size() == bundleActionStatus.get(Job.Status.RUNNING)) { 496 bundleStatus[0] = Job.Status.RUNNING; 497 ret = true; 498 } 499 else if (bundleActionStatus.get(Job.Status.RUNNING) > 0) { 500 if ((bundleActionStatus.containsKey(Job.Status.FAILED) && bundleActionStatus.get(Job.Status.FAILED) > 0) 501 || (bundleActionStatus.containsKey(Job.Status.KILLED) && bundleActionStatus 502 .get(Job.Status.KILLED) > 0) 503 || (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) && bundleActionStatus 504 .get(Job.Status.DONEWITHERROR) > 0) 505 || (bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR) && bundleActionStatus 506 .get(Job.Status.RUNNINGWITHERROR) > 0)) { 507 // bundleStatus = Job.Status.RUNNINGWITHERROR; 508 // We need to change this to RUNNINGWIHERROR in future when we add this to coordinator 509 bundleStatus[0] = Job.Status.RUNNING; 510 ret = true; 511 } 512 } 513 } 514 return ret; 515 } 516 517 private void updateBundleJob(HashMap<Job.Status, Integer> bundleActionStatus, 518 List<BundleActionBean> bundleActions, BundleJobBean bundleJob, Job.Status bundleStatus) 519 throws JPAExecutorException { 520 String jobId = bundleJob.getId(); 521 boolean pendingBundleJob = bundleJob.isPending(); 522 // Checking the bundle pending should be updated or not 523 int totalNonPendingActions = 0; 524 for (Job.Status js : bundleActionStatus.keySet()) { 525 totalNonPendingActions += bundleActionStatus.get(js); 526 } 527 528 if (totalNonPendingActions == bundleActions.size()) { 529 pendingBundleJob = false; 530 } 531 532 // Update the Bundle Job 533 bundleJob.setStatus(bundleStatus); 534 if (pendingBundleJob) { 535 bundleJob.setPending(); 536 LOG.info("Bundle job [" + jobId + "] Pending set to TRUE"); 537 } 538 else { 539 bundleJob.resetPending(); 540 LOG.info("Bundle job [" + jobId + "] Pending set to FALSE"); 541 } 542 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob)); 543 } 544 545 private void updateCoordJob(HashMap<CoordinatorAction.Status, Integer> coordActionStatus, 546 List<CoordinatorActionBean> coordActions, CoordinatorJobBean coordJob, Job.Status coordStatus) 547 throws JPAExecutorException, CommandException { 548 Job.Status prevStatus = coordJob.getStatus(); 549 // Update the Coord Job 550 if (coordJob.getStatus() == Job.Status.SUCCEEDED || coordJob.getStatus() == Job.Status.FAILED 551 || coordJob.getStatus() == Job.Status.KILLED || coordJob.getStatus() == Job.Status.DONEWITHERROR) { 552 if (coordStatus == Job.Status.SUSPENDED) { 553 LOG.info("Coord Job [" + coordJob.getId() 554 + "] status can not be updated as its already in Terminal state"); 555 return; 556 } 557 } 558 559 checkCoordPending(coordActionStatus, coordActions, coordJob, false); 560 coordJob.setStatus(coordStatus); 561 coordJob.setStatus(StatusUtils.getStatus(coordJob)); 562 coordJob.setLastModifiedTime(new Date()); 563 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 564 // update bundle action only when status changes in coord job 565 if (coordJob.getBundleId() != null) { 566 if (!prevStatus.equals(coordJob.getStatus())) { 567 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 568 bundleStatusUpdate.call(); 569 } 570 } 571 } 572 573 private void checkCoordPending(HashMap<CoordinatorAction.Status, Integer> coordActionStatus, 574 List<CoordinatorActionBean> coordActions, CoordinatorJobBean coordJob, boolean saveToDB) throws JPAExecutorException { 575 boolean pendingCoordJob = coordJob.isPending(); 576 // Checking the coordinator pending should be updated or not 577 int totalNonPendingActions = 0; 578 for (CoordinatorAction.Status js : coordActionStatus.keySet()) { 579 totalNonPendingActions += coordActionStatus.get(js); 580 } 581 582 if (totalNonPendingActions == coordActions.size()) { 583 pendingCoordJob = false; 584 } 585 586 if (pendingCoordJob) { 587 coordJob.setPending(); 588 LOG.info("Coord job [" + coordJob.getId() + "] Pending set to TRUE"); 589 } 590 else { 591 coordJob.resetPending(); 592 LOG.info("Coord job [" + coordJob.getId() + "] Pending set to FALSE"); 593 } 594 595 if (saveToDB) { 596 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 597 } 598 } 599 600 /** 601 * Aggregate coordinator actions' status to coordinator jobs 602 * 603 * @throws JPAExecutorException thrown if failed in db updates or retrievals 604 * @throws CommandException thrown if failed to run commands 605 */ 606 private void coordTransit() throws JPAExecutorException, CommandException { 607 List<CoordinatorJobBean> pendingJobCheckList = null; 608 if (lastInstanceStartTime == null) { 609 LOG.info("Running coordinator status service first instance"); 610 // this is the first instance, we need to check for all pending jobs; 611 pendingJobCheckList = jpaService.execute(new CoordJobsGetPendingJPAExecutor(limit)); 612 } 613 else { 614 LOG.info("Running coordinator status service from last instance time = " 615 + DateUtils.convertDateToString(lastInstanceStartTime)); 616 // this is not the first instance, we should only check jobs that have actions been 617 // updated >= start time of last service run; 618 List<CoordinatorActionBean> actionList = jpaService 619 .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime)); 620 Set<String> coordIds = new HashSet<String>(); 621 for (CoordinatorActionBean action : actionList) { 622 coordIds.add(action.getJobId()); 623 } 624 pendingJobCheckList = new ArrayList<CoordinatorJobBean>(); 625 for (String coordId : coordIds.toArray(new String[coordIds.size()])) { 626 CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordId)); 627 // Running coord job might have pending false 628 if (coordJob.isPending() || coordJob.getStatus().equals(Job.Status.RUNNING)) { 629 pendingJobCheckList.add(coordJob); 630 } 631 } 632 } 633 aggregateCoordJobsStatus(pendingJobCheckList); 634 } 635 } 636 637 /** 638 * Initializes the {@link StatusTransitService}. 639 * 640 * @param services services instance. 641 */ 642 @Override 643 public void init(Services services) { 644 Configuration conf = services.getConf(); 645 Runnable stateTransitRunnable = new StatusTransitRunnable(); 646 services.get(SchedulerService.class).schedule(stateTransitRunnable, 10, 647 conf.getInt(CONF_STATUSTRANSIT_INTERVAL, 60), SchedulerService.Unit.SEC); 648 } 649 650 /** 651 * Destroy the StateTransit Jobs Service. 652 */ 653 @Override 654 public void destroy() { 655 } 656 657 /** 658 * Return the public interface for the purge jobs service. 659 * 660 * @return {@link StatusTransitService}. 661 */ 662 @Override 663 public Class<? extends Service> getInterface() { 664 return StatusTransitService.class; 665 } 666 }