This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import java.util.ArrayList;
021 import java.util.Date;
022 import java.util.HashMap;
023 import java.util.HashSet;
024 import java.util.List;
025 import java.util.Set;
026
027 import org.apache.hadoop.conf.Configuration;
028 import org.apache.oozie.BundleActionBean;
029 import org.apache.oozie.BundleJobBean;
030 import org.apache.oozie.CoordinatorActionBean;
031 import org.apache.oozie.CoordinatorJobBean;
032 import org.apache.oozie.ErrorCode;
033 import org.apache.oozie.client.CoordinatorAction;
034 import org.apache.oozie.client.Job;
035 import org.apache.oozie.command.CommandException;
036 import org.apache.oozie.command.bundle.BundleKillXCommand;
037 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
038 import org.apache.oozie.executor.jpa.BundleActionsGetByLastModifiedTimeJPAExecutor;
039 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
040 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
041 import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
042 import org.apache.oozie.executor.jpa.BundleJobsGetPendingJPAExecutor;
043 import org.apache.oozie.executor.jpa.BundleJobsGetRunningJPAExecutor;
044 import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor;
045 import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusByPendingFalseJPAExecutor;
046 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
047 import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
048 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
049 import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
050 import org.apache.oozie.executor.jpa.JPAExecutorException;
051 import org.apache.oozie.util.DateUtils;
052 import org.apache.oozie.util.MemoryLocks;
053 import org.apache.oozie.util.StatusUtils;
054 import org.apache.oozie.util.XLog;
055
056 /**
057 * StateTransitService is scheduled to run at the configured interval.
058 * <p/>
059 * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0 (job
060 * done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to
061 * SUCCEEDED.
062 */
063 public class StatusTransitService implements Service {
064 public static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService.";
065 public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval";
066 public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + "backward.support.for.coord.status";
067 private static int limit = -1;
068 private static Date lastInstanceStartTime = null;
069 private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class);
070
071 /**
072 * StateTransitRunnable is the runnable which is scheduled to run at the configured interval.
073 * <p/>
074 * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0
075 * (job done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to
076 * SUCCEEDED.
077 */
078 static class StatusTransitRunnable implements Runnable {
079 private JPAService jpaService = null;
080 private MemoryLocks.LockToken lock;
081
082 public StatusTransitRunnable() {
083 jpaService = Services.get().get(JPAService.class);
084 if (jpaService == null) {
085 LOG.error("Missing JPAService");
086 }
087 }
088
089 @Override
090 public void run() {
091 try {
092 Date curDate = new Date(); // records the start time of this service run;
093
094 // first check if there is some other instance running;
095 lock = Services.get().get(MemoryLocksService.class).getWriteLock(StatusTransitService.class.getName(),
096 lockTimeout);
097 if (lock == null) {
098 LOG.info("This StatusTransitService instance"
099 + " will not run since there is already an instance running");
100 }
101 else {
102 LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName());
103 // running coord jobs transit service
104 coordTransit();
105 // running bundle jobs transit service
106 bundleTransit();
107
108 lastInstanceStartTime = curDate;
109 }
110 }
111 catch (Exception ex) {
112 LOG.warn("Exception happened during StatusTransitRunnable ", ex);
113 }
114 finally {
115 // release lock;
116 if (lock != null) {
117 lock.release();
118 LOG.info("Released lock for [{0}]", StatusTransitService.class.getName());
119 }
120 }
121 }
122
123 /**
124 * Aggregate bundle actions' status to bundle jobs
125 *
126 * @throws JPAExecutorException thrown if failed in db updates or retrievals
127 * @throws CommandException thrown if failed to run commands
128 */
129 private void bundleTransit() throws JPAExecutorException, CommandException {
130 List<BundleJobBean> pendingJobCheckList = null;
131 List<BundleJobBean> runningJobCheckList = null;
132 List<List<BundleJobBean>> bundleLists = new ArrayList<List<BundleJobBean>>();
133 if (lastInstanceStartTime == null) {
134 LOG.info("Running bundle status service first instance");
135 // this is the first instance, we need to check for all pending jobs;
136 pendingJobCheckList = jpaService.execute(new BundleJobsGetPendingJPAExecutor(limit));
137 runningJobCheckList = jpaService.execute(new BundleJobsGetRunningJPAExecutor(limit));
138 bundleLists.add(pendingJobCheckList);
139 bundleLists.add(runningJobCheckList);
140 }
141 else {
142 LOG.info("Running bundle status service from last instance time = "
143 + DateUtils.convertDateToString(lastInstanceStartTime));
144 // this is not the first instance, we should only check jobs that have actions been
145 // updated >= start time of last service run;
146 List<BundleActionBean> actionList = jpaService
147 .execute(new BundleActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
148 Set<String> bundleIds = new HashSet<String>();
149 for (BundleActionBean action : actionList) {
150 bundleIds.add(action.getBundleId());
151 }
152 pendingJobCheckList = new ArrayList<BundleJobBean>();
153 for (String bundleId : bundleIds.toArray(new String[bundleIds.size()])) {
154 BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId));
155 // Running bundle job might have pending false
156 if (bundle.isPending() || bundle.getStatus().equals(Job.Status.RUNNING)) {
157 pendingJobCheckList.add(bundle);
158 }
159 }
160 runningJobCheckList = pendingJobCheckList;
161 bundleLists.add(pendingJobCheckList);
162 }
163 aggregateBundleJobsStatus(bundleLists);
164 }
165
166 private void aggregateBundleJobsStatus(List<List<BundleJobBean>> bundleLists) throws JPAExecutorException,
167 CommandException {
168 if (bundleLists != null) {
169 for (List<BundleJobBean> listBundleBean : bundleLists) {
170 for (BundleJobBean bundleJob : listBundleBean) {
171 try {
172 String jobId = bundleJob.getId();
173 Job.Status[] bundleStatus = new Job.Status[1];
174 bundleStatus[0] = bundleJob.getStatus();
175 List<BundleActionBean> bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(
176 jobId));
177 HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>();
178 boolean foundPending = false;
179 for (BundleActionBean bAction : bundleActions) {
180 if (!bAction.isPending()) {
181 int counter = 0;
182 if (bundleActionStatus.containsKey(bAction.getStatus())) {
183 counter = bundleActionStatus.get(bAction.getStatus()) + 1;
184 }
185 else {
186 ++counter;
187 }
188 bundleActionStatus.put(bAction.getStatus(), counter);
189 if (bAction.getCoordId() == null
190 && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED)) {
191 (new BundleKillXCommand(jobId)).call();
192 LOG.info("Bundle job ["+ jobId
193 + "] has been killed since one of its coordinator job failed submission.");
194 }
195 }
196 else {
197 foundPending = true;
198 break;
199 }
200 }
201
202 if (foundPending) {
203 continue;
204 }
205
206 if (checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus)) {
207 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
208 + "' from '" + bundleJob.getStatus() + "'");
209 updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
210 }
211 else if (checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus)) {
212 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
213 + "' from '" + bundleJob.getStatus() + "'");
214 updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
215 }
216 else if (checkPausedStatus(bundleActionStatus, bundleActions, bundleStatus)) {
217 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
218 + "' from '" + bundleJob.getStatus() + "'");
219 updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
220 }
221 else if (checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus)) {
222 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
223 + "' from '" + bundleJob.getStatus() + "'");
224 updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
225 }
226 else if (checkRunningStatus(bundleActionStatus, bundleActions, bundleStatus)) {
227 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
228 + "' from '" + bundleJob.getStatus() + "'");
229 updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
230 }
231 }
232 catch (Exception ex) {
233 LOG.error("Exception happened during aggregate bundle job's status, job = "
234 + bundleJob.getId(), ex);
235 }
236 }
237 }
238 }
239 }
240
241 private void aggregateCoordJobsStatus(List<CoordinatorJobBean> CoordList) throws JPAExecutorException,
242 CommandException {
243 if (CoordList != null) {
244 Configuration conf = Services.get().getConf();
245 boolean backwardSupportForCoordStatus = conf.getBoolean(CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
246
247 for (CoordinatorJobBean coordJob : CoordList) {
248 try {
249 // if namespace 0.1 is used and backward support is true, then ignore this coord job
250 if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
251 && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
252 continue;
253 }
254 String jobId = coordJob.getId();
255 Job.Status[] coordStatus = new Job.Status[1];
256 coordStatus[0] = coordJob.getStatus();
257 //Get count of Coordinator actions with pending true
258 int count = jpaService.execute(new CoordJobGetPendingActionsCountJPAExecutor(jobId));
259 if (count > 0) {
260 continue;
261 }
262 // Code below this is executed only when none of the Coordinator actions are pending
263 // Get status of Coordinator actions with pending false
264 List<CoordinatorAction.Status> coordActionStatusList = jpaService
265 .execute(new CoordJobGetActionsStatusByPendingFalseJPAExecutor(jobId));
266 HashMap<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>();
267
268 for (CoordinatorAction.Status status : coordActionStatusList) {
269 int counter = 0;
270 if (coordActionStatus.containsKey(status)) {
271 counter = coordActionStatus.get(status) + 1;
272 }
273 else {
274 ++counter;
275 }
276 coordActionStatus.put(status, counter);
277 }
278
279 int nonPendingCoordActionsCount = coordActionStatusList.size();
280 if (coordJob.isDoneMaterialization()
281 && checkCoordTerminalStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
282 LOG.info("Set coordinator job [" + jobId + "] status to '" + coordStatus[0].toString()
283 + "' from '" + coordJob.getStatus() + "'");
284 updateCoordJob(coordActionStatus, nonPendingCoordActionsCount, coordJob, coordStatus[0]);
285 }
286 else if (coordJob.isDoneMaterialization()
287 && checkCoordSuspendStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
288 LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
289 + "' from '" + coordJob.getStatus() + "'");
290 updateCoordJob(coordActionStatus, nonPendingCoordActionsCount, coordJob, coordStatus[0]);
291 }
292 else if (checkCoordRunningStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
293 LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
294 + "' from '" + coordJob.getStatus() + "'");
295 updateCoordJob(coordActionStatus, nonPendingCoordActionsCount, coordJob, coordStatus[0]);
296 }
297 // checking pending flag for job when user killed or suspended the job
298 else {
299 checkCoordPending(coordActionStatus, nonPendingCoordActionsCount, coordJob, true);
300 }
301 }
302 catch (Exception ex) {
303 LOG.error("Exception happened during aggregate coordinator job's status, job = "
304 + coordJob.getId(), ex);
305 }
306 }
307
308 }
309 }
310
311 private boolean checkTerminalStatus(HashMap<Job.Status, Integer> bundleActionStatus,
312 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
313 boolean ret = false;
314 int totalValuesSucceed = 0;
315 if (bundleActionStatus.containsKey(Job.Status.SUCCEEDED)) {
316 totalValuesSucceed = bundleActionStatus.get(Job.Status.SUCCEEDED);
317 }
318 int totalValuesFailed = 0;
319 if (bundleActionStatus.containsKey(Job.Status.FAILED)) {
320 totalValuesFailed = bundleActionStatus.get(Job.Status.FAILED);
321 }
322 int totalValuesKilled = 0;
323 if (bundleActionStatus.containsKey(Job.Status.KILLED)) {
324 totalValuesKilled = bundleActionStatus.get(Job.Status.KILLED);
325 }
326
327 int totalValuesDoneWithError = 0;
328 if (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)) {
329 totalValuesDoneWithError = bundleActionStatus.get(Job.Status.DONEWITHERROR);
330 }
331
332 if (bundleActions.size() == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesDoneWithError)) {
333 // If all the bundle actions are succeeded then bundle job should be succeeded.
334 if (bundleActions.size() == totalValuesSucceed) {
335 bundleStatus[0] = Job.Status.SUCCEEDED;
336 ret = true;
337 }
338 else if (bundleActions.size() == totalValuesKilled) {
339 // If all the bundle actions are KILLED then bundle job should be KILLED.
340 bundleStatus[0] = Job.Status.KILLED;
341 ret = true;
342 }
343 else if (bundleActions.size() == totalValuesFailed) {
344 // If all the bundle actions are FAILED then bundle job should be FAILED.
345 bundleStatus[0] = Job.Status.FAILED;
346 ret = true;
347 }
348 else {
349 bundleStatus[0] = Job.Status.DONEWITHERROR;
350 ret = true;
351 }
352 }
353 return ret;
354 }
355
356 private boolean checkCoordTerminalStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
357 int coordActionsCount, Job.Status[] coordStatus) {
358 boolean ret = false;
359 int totalValuesSucceed = 0;
360 if (coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED)) {
361 totalValuesSucceed = coordActionStatus.get(CoordinatorAction.Status.SUCCEEDED);
362 }
363 int totalValuesFailed = 0;
364 if (coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)) {
365 totalValuesFailed = coordActionStatus.get(CoordinatorAction.Status.FAILED);
366 }
367 int totalValuesKilled = 0;
368 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)) {
369 totalValuesKilled = coordActionStatus.get(CoordinatorAction.Status.KILLED);
370 }
371
372 int totalValuesTimeOut = 0;
373 if (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
374 totalValuesTimeOut = coordActionStatus.get(CoordinatorAction.Status.TIMEDOUT);
375 }
376
377 if (coordActionsCount == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut)) {
378 // If all the coordinator actions are succeeded then coordinator job should be succeeded.
379 if (coordActionsCount == totalValuesSucceed) {
380 coordStatus[0] = Job.Status.SUCCEEDED;
381 ret = true;
382 }
383 else if (coordActionsCount == totalValuesKilled) {
384 // If all the coordinator actions are KILLED then coordinator job should be KILLED.
385 coordStatus[0] = Job.Status.KILLED;
386 ret = true;
387 }
388 else if (coordActionsCount == totalValuesFailed) {
389 // If all the coordinator actions are FAILED then coordinator job should be FAILED.
390 coordStatus[0] = Job.Status.FAILED;
391 ret = true;
392 }
393 else {
394 coordStatus[0] = Job.Status.DONEWITHERROR;
395 ret = true;
396 }
397 }
398 return ret;
399 }
400
401 private boolean checkPrepStatus(HashMap<Job.Status, Integer> bundleActionStatus,
402 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
403 boolean ret = false;
404 if (bundleActionStatus.containsKey(Job.Status.PREP)) {
405 // If all the bundle actions are PREP then bundle job should be RUNNING.
406 if (bundleActions.size() > bundleActionStatus.get(Job.Status.PREP)) {
407 bundleStatus[0] = Job.Status.RUNNING;
408 ret = true;
409 }
410 }
411 return ret;
412 }
413
414 private boolean checkPausedStatus(HashMap<Job.Status, Integer> bundleActionStatus,
415 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
416 boolean ret = false;
417 if (bundleActionStatus.containsKey(Job.Status.PAUSED)) {
418 if (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED)) {
419 bundleStatus[0] = Job.Status.PAUSED;
420 ret = true;
421 }
422 else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)
423 && (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED)
424 + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR))) {
425 // bundleStatus = Job.Status.PAUSEDWITHERROR;
426 // We need to change this to PAUSEDWITHERROR in future when we add this to coordinator
427 bundleStatus[0] = Job.Status.PAUSED;
428 ret = true;
429 }
430 }
431 return ret;
432 }
433
434 private boolean checkSuspendStatus(HashMap<Job.Status, Integer> bundleActionStatus,
435 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
436 boolean ret = false;
437 if (bundleActionStatus.containsKey(Job.Status.SUSPENDED)) {
438 if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED)) {
439 bundleStatus[0] = Job.Status.SUSPENDED;
440 ret = true;
441 }
442 else if (bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
443 && (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED)
444 + bundleActionStatus.get(Job.Status.SUSPENDEDWITHERROR))) {
445 // bundleStatus = Job.Status.SUSPENDEDWITHERROR;
446 // We need to change this to SUSPENDEDWITHERROR in future when we add this to coordinator
447 bundleStatus[0] = Job.Status.SUSPENDED;
448 ret = true;
449 }
450 }
451 return ret;
452 }
453
454 private boolean checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
455 int coordActionsCount, Job.Status[] coordStatus) {
456 boolean ret = false;
457 if (coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
458 if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)) {
459 coordStatus[0] = Job.Status.SUSPENDED;
460 ret = true;
461 }
462 }
463 return ret;
464 }
465
466 private boolean checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
467 int coordActionsCount, Job.Status[] coordStatus) {
468 boolean ret = false;
469 if (coordActionStatus.containsKey(CoordinatorAction.Status.RUNNING)) {
470 // If all the bundle actions are succeeded then bundle job should be succeeded.
471 if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.RUNNING)) {
472 coordStatus[0] = Job.Status.RUNNING;
473 ret = true;
474 }
475 else if (coordActionStatus.get(CoordinatorAction.Status.RUNNING) > 0) {
476 if ((coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) && coordActionStatus.get(CoordinatorAction.Status.FAILED) > 0)
477 || (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) && coordActionStatus
478 .get(CoordinatorAction.Status.KILLED) > 0)
479 || (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT) && coordActionStatus
480 .get(CoordinatorAction.Status.TIMEDOUT) > 0)) {
481 // coordStatus = Job.Status.RUNNINGWITHERROR;
482 // We need to change this to RUNNINGWIHERROR in future when we add this to coordinator
483 coordStatus[0] = Job.Status.RUNNING;
484 ret = true;
485 }
486 }
487 }
488 return ret;
489 }
490
491 private boolean checkRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus,
492 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
493 boolean ret = false;
494 if (bundleActionStatus.containsKey(Job.Status.RUNNING)) {
495 // If all the bundle actions are succeeded then bundle job should be succeeded.
496 if (bundleActions.size() == bundleActionStatus.get(Job.Status.RUNNING)) {
497 bundleStatus[0] = Job.Status.RUNNING;
498 ret = true;
499 }
500 else if (bundleActionStatus.get(Job.Status.RUNNING) > 0) {
501 if ((bundleActionStatus.containsKey(Job.Status.FAILED) && bundleActionStatus.get(Job.Status.FAILED) > 0)
502 || (bundleActionStatus.containsKey(Job.Status.KILLED) && bundleActionStatus
503 .get(Job.Status.KILLED) > 0)
504 || (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) && bundleActionStatus
505 .get(Job.Status.DONEWITHERROR) > 0)
506 || (bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR) && bundleActionStatus
507 .get(Job.Status.RUNNINGWITHERROR) > 0)) {
508 // bundleStatus = Job.Status.RUNNINGWITHERROR;
509 // We need to change this to RUNNINGWIHERROR in future when we add this to coordinator
510 bundleStatus[0] = Job.Status.RUNNING;
511 ret = true;
512 }
513 }
514 }
515 return ret;
516 }
517
518 private void updateBundleJob(HashMap<Job.Status, Integer> bundleActionStatus,
519 List<BundleActionBean> bundleActions, BundleJobBean bundleJob, Job.Status bundleStatus)
520 throws JPAExecutorException {
521 String jobId = bundleJob.getId();
522 boolean pendingBundleJob = bundleJob.isPending();
523 // Checking the bundle pending should be updated or not
524 int totalNonPendingActions = 0;
525 for (Job.Status js : bundleActionStatus.keySet()) {
526 totalNonPendingActions += bundleActionStatus.get(js);
527 }
528
529 if (totalNonPendingActions == bundleActions.size()) {
530 pendingBundleJob = false;
531 }
532
533 // Update the Bundle Job
534 bundleJob.setStatus(bundleStatus);
535 if (pendingBundleJob) {
536 bundleJob.setPending();
537 LOG.info("Bundle job [" + jobId + "] Pending set to TRUE");
538 }
539 else {
540 bundleJob.resetPending();
541 LOG.info("Bundle job [" + jobId + "] Pending set to FALSE");
542 }
543 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
544 }
545
546 private void updateCoordJob(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
547 int coordActionsCount, CoordinatorJobBean coordJob, Job.Status coordStatus)
548 throws JPAExecutorException, CommandException {
549 Job.Status prevStatus = coordJob.getStatus();
550 // Update the Coord Job
551 if (coordJob.getStatus() == Job.Status.SUCCEEDED || coordJob.getStatus() == Job.Status.FAILED
552 || coordJob.getStatus() == Job.Status.KILLED || coordJob.getStatus() == Job.Status.DONEWITHERROR) {
553 if (coordStatus == Job.Status.SUSPENDED) {
554 LOG.info("Coord Job [" + coordJob.getId()
555 + "] status can not be updated as its already in Terminal state");
556 return;
557 }
558 }
559
560 checkCoordPending(coordActionStatus, coordActionsCount, coordJob, false);
561 coordJob.setStatus(coordStatus);
562 coordJob.setStatus(StatusUtils.getStatus(coordJob));
563 coordJob.setLastModifiedTime(new Date());
564 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
565 // update bundle action only when status changes in coord job
566 if (coordJob.getBundleId() != null) {
567 if (!prevStatus.equals(coordJob.getStatus())) {
568 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
569 bundleStatusUpdate.call();
570 }
571 }
572 }
573
574 private void checkCoordPending(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
575 int coordActionsCount, CoordinatorJobBean coordJob, boolean saveToDB) throws JPAExecutorException {
576 boolean pendingCoordJob = coordJob.isPending();
577 // Checking the coordinator pending should be updated or not
578 int totalNonPendingActions = 0;
579 for (CoordinatorAction.Status js : coordActionStatus.keySet()) {
580 totalNonPendingActions += coordActionStatus.get(js);
581 }
582
583 if (totalNonPendingActions == coordActionsCount) {
584 pendingCoordJob = false;
585 }
586
587 if (pendingCoordJob) {
588 coordJob.setPending();
589 LOG.info("Coord job [" + coordJob.getId() + "] Pending set to TRUE");
590 }
591 else {
592 coordJob.resetPending();
593 LOG.info("Coord job [" + coordJob.getId() + "] Pending set to FALSE");
594 }
595
596 if (saveToDB) {
597 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
598 }
599 }
600
601 /**
602 * Aggregate coordinator actions' status to coordinator jobs
603 *
604 * @throws JPAExecutorException thrown if failed in db updates or retrievals
605 * @throws CommandException thrown if failed to run commands
606 */
607 private void coordTransit() throws JPAExecutorException, CommandException {
608 List<CoordinatorJobBean> pendingJobCheckList = null;
609 if (lastInstanceStartTime == null) {
610 LOG.info("Running coordinator status service first instance");
611 // this is the first instance, we need to check for all pending jobs;
612 pendingJobCheckList = jpaService.execute(new CoordJobsGetPendingJPAExecutor(limit));
613 }
614 else {
615 LOG.info("Running coordinator status service from last instance time = "
616 + DateUtils.convertDateToString(lastInstanceStartTime));
617 // this is not the first instance, we should only check jobs
618 // that have actions been
619 // updated >= start time of last service run;
620 List<String> coordJobIdList = jpaService
621 .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
622 Set<String> coordIds = new HashSet<String>();
623 for (String coordJobId : coordJobIdList) {
624 coordIds.add(coordJobId);
625 }
626 pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
627 for (String coordId : coordIds.toArray(new String[coordIds.size()])) {
628 CoordinatorJobBean coordJob;
629 try{
630 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordId));
631 }
632 catch (JPAExecutorException jpaee) {
633 if (jpaee.getErrorCode().equals(ErrorCode.E0604)) {
634 LOG.warn("Exception happened during StatusTransitRunnable; Coordinator Job doesn't exist", jpaee);
635 continue;
636 } else {
637 throw jpaee;
638 }
639 }
640 // Running coord job might have pending false
641 if (coordJob.isPending() || coordJob.getStatus().equals(Job.Status.RUNNING)) {
642 pendingJobCheckList.add(coordJob);
643 }
644 }
645 }
646 aggregateCoordJobsStatus(pendingJobCheckList);
647 }
648 }
649
650 /**
651 * Initializes the {@link StatusTransitService}.
652 *
653 * @param services services instance.
654 */
655 @Override
656 public void init(Services services) {
657 Configuration conf = services.getConf();
658 Runnable stateTransitRunnable = new StatusTransitRunnable();
659 services.get(SchedulerService.class).schedule(stateTransitRunnable, 10,
660 conf.getInt(CONF_STATUSTRANSIT_INTERVAL, 60), SchedulerService.Unit.SEC);
661 }
662
663 /**
664 * Destroy the StateTransit Jobs Service.
665 */
666 @Override
667 public void destroy() {
668 }
669
670 /**
671 * Return the public interface for the purge jobs service.
672 *
673 * @return {@link StatusTransitService}.
674 */
675 @Override
676 public Class<? extends Service> getInterface() {
677 return StatusTransitService.class;
678 }
679 }
680