// This project has retired. For details, please refer to its Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import java.util.ArrayList;
021 import java.util.Date;
022 import java.util.HashMap;
023 import java.util.HashSet;
024 import java.util.List;
025 import java.util.Set;
026
027 import org.apache.hadoop.conf.Configuration;
028 import org.apache.oozie.BundleActionBean;
029 import org.apache.oozie.BundleJobBean;
030 import org.apache.oozie.CoordinatorActionBean;
031 import org.apache.oozie.CoordinatorJobBean;
032 import org.apache.oozie.client.CoordinatorAction;
033 import org.apache.oozie.client.Job;
034 import org.apache.oozie.command.CommandException;
035 import org.apache.oozie.command.bundle.BundleKillXCommand;
036 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
037 import org.apache.oozie.executor.jpa.BundleActionsGetByLastModifiedTimeJPAExecutor;
038 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
039 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
040 import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
041 import org.apache.oozie.executor.jpa.BundleJobsGetPendingJPAExecutor;
042 import org.apache.oozie.executor.jpa.BundleJobsGetRunningJPAExecutor;
043 import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor;
044 import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor;
045 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
046 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
047 import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
048 import org.apache.oozie.executor.jpa.JPAExecutorException;
049 import org.apache.oozie.util.DateUtils;
050 import org.apache.oozie.util.MemoryLocks;
051 import org.apache.oozie.util.StatusUtils;
052 import org.apache.oozie.util.XLog;
053
/**
 * StatusTransitService schedules a runnable that is run at the configured interval.
 * <p/>
 * The runnable updates a job's status according to the status of its child actions. If none of the child actions
 * is pending any more (job done), the job's pending flag is reset to 0. If all child actions have succeeded, the
 * job's status is set to SUCCEEDED.
 */
061 public class StatusTransitService implements Service {
062 public static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService.";
063 public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval";
064 public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + "backward.support.for.coord.status";
065 private static int limit = -1;
066 private static Date lastInstanceStartTime = null;
067 private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class);
068
/**
 * StatusTransitRunnable is the runnable which is scheduled to run at the configured interval.
 * <p/>
 * It updates a job's status according to the status of its child actions. If none of the child actions is
 * pending any more (job done), the job's pending flag is reset to 0. If all child actions have succeeded, the
 * job's status is set to SUCCEEDED.
 */
076 static class StatusTransitRunnable implements Runnable {
077 private JPAService jpaService = null;
078 private MemoryLocks.LockToken lock;
079
080 public StatusTransitRunnable() {
081 jpaService = Services.get().get(JPAService.class);
082 if (jpaService == null) {
083 LOG.error("Missing JPAService");
084 }
085 }
086
087 public void run() {
088 try {
089 Date curDate = new Date(); // records the start time of this service run;
090
091 // first check if there is some other instance running;
092 lock = Services.get().get(MemoryLocksService.class).getWriteLock(StatusTransitService.class.getName(),
093 lockTimeout);
094 if (lock == null) {
095 LOG.info("This StatusTransitService instance"
096 + " will not run since there is already an instance running");
097 }
098 else {
099 LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName());
100 // running coord jobs transit service
101 coordTransit();
102 // running bundle jobs transit service
103 bundleTransit();
104
105 lastInstanceStartTime = curDate;
106 }
107 }
108 catch (Exception ex) {
109 LOG.warn("Exception happened during StatusTransitRunnable ", ex);
110 }
111 finally {
112 // release lock;
113 if (lock != null) {
114 lock.release();
115 LOG.info("Released lock for [{0}]", StatusTransitService.class.getName());
116 }
117 }
118 }
119
120 /**
121 * Aggregate bundle actions' status to bundle jobs
122 *
123 * @throws JPAExecutorException thrown if failed in db updates or retrievals
124 * @throws CommandException thrown if failed to run commands
125 */
126 private void bundleTransit() throws JPAExecutorException, CommandException {
127 List<BundleJobBean> pendingJobCheckList = null;
128 List<BundleJobBean> runningJobCheckList = null;
129 List<List<BundleJobBean>> bundleLists = new ArrayList<List<BundleJobBean>>();
130 if (lastInstanceStartTime == null) {
131 LOG.info("Running bundle status service first instance");
132 // this is the first instance, we need to check for all pending jobs;
133 pendingJobCheckList = jpaService.execute(new BundleJobsGetPendingJPAExecutor(limit));
134 runningJobCheckList = jpaService.execute(new BundleJobsGetRunningJPAExecutor(limit));
135 bundleLists.add(pendingJobCheckList);
136 bundleLists.add(runningJobCheckList);
137 }
138 else {
139 LOG.info("Running bundle status service from last instance time = "
140 + DateUtils.convertDateToString(lastInstanceStartTime));
141 // this is not the first instance, we should only check jobs that have actions been
142 // updated >= start time of last service run;
143 List<BundleActionBean> actionList = jpaService
144 .execute(new BundleActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
145 Set<String> bundleIds = new HashSet<String>();
146 for (BundleActionBean action : actionList) {
147 bundleIds.add(action.getBundleId());
148 }
149 pendingJobCheckList = new ArrayList<BundleJobBean>();
150 for (String bundleId : bundleIds.toArray(new String[bundleIds.size()])) {
151 BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId));
152 // Running bundle job might have pending false
153 if (bundle.isPending() || bundle.getStatus().equals(Job.Status.RUNNING)) {
154 pendingJobCheckList.add(bundle);
155 }
156 }
157 runningJobCheckList = pendingJobCheckList;
158 bundleLists.add(pendingJobCheckList);
159 }
160 aggregateBundleJobsStatus(bundleLists);
161 }
162
163 private void aggregateBundleJobsStatus(List<List<BundleJobBean>> bundleLists) throws JPAExecutorException,
164 CommandException {
165 if (bundleLists != null) {
166 for (List<BundleJobBean> listBundleBean : bundleLists) {
167 for (BundleJobBean bundleJob : listBundleBean) {
168 try {
169 String jobId = bundleJob.getId();
170 Job.Status[] bundleStatus = new Job.Status[1];
171 bundleStatus[0] = bundleJob.getStatus();
172 List<BundleActionBean> bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(
173 jobId));
174 HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>();
175 boolean foundPending = false;
176 for (BundleActionBean bAction : bundleActions) {
177 if (!bAction.isPending()) {
178 int counter = 0;
179 if (bundleActionStatus.containsKey(bAction.getStatus())) {
180 counter = bundleActionStatus.get(bAction.getStatus()) + 1;
181 }
182 else {
183 ++counter;
184 }
185 bundleActionStatus.put(bAction.getStatus(), counter);
186 if (bAction.getCoordId() == null
187 && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED)) {
188 (new BundleKillXCommand(jobId)).call();
189 LOG.info("Bundle job ["+ jobId
190 + "] has been killed since one of its coordinator job failed submission.");
191 }
192 }
193 else {
194 foundPending = true;
195 break;
196 }
197 }
198
199 if (foundPending) {
200 continue;
201 }
202
203 if (checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus)) {
204 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
205 + "' from '" + bundleJob.getStatus() + "'");
206 updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
207 }
208 else if (checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus)) {
209 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
210 + "' from '" + bundleJob.getStatus() + "'");
211 updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
212 }
213 else if (checkPausedStatus(bundleActionStatus, bundleActions, bundleStatus)) {
214 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
215 + "' from '" + bundleJob.getStatus() + "'");
216 updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
217 }
218 else if (checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus)) {
219 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
220 + "' from '" + bundleJob.getStatus() + "'");
221 updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
222 }
223 else if (checkRunningStatus(bundleActionStatus, bundleActions, bundleStatus)) {
224 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
225 + "' from '" + bundleJob.getStatus() + "'");
226 updateBundleJob(bundleActionStatus, bundleActions, bundleJob, bundleStatus[0]);
227 }
228 }
229 catch (Exception ex) {
230 LOG.error("Exception happened during aggregate bundle job's status, job = "
231 + bundleJob.getId(), ex);
232 }
233 }
234 }
235 }
236 }
237
238 private void aggregateCoordJobsStatus(List<CoordinatorJobBean> CoordList) throws JPAExecutorException,
239 CommandException {
240 if (CoordList != null) {
241 Configuration conf = Services.get().getConf();
242 boolean backwardSupportForCoordStatus = conf.getBoolean(CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
243
244 for (CoordinatorJobBean coordJob : CoordList) {
245 try {
246 // if namespace 0.1 is used and backward support is true, then ignore this coord job
247 if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
248 && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
249 continue;
250 }
251 String jobId = coordJob.getId();
252 Job.Status[] coordStatus = new Job.Status[1];
253 coordStatus[0] = coordJob.getStatus();
254 List<CoordinatorActionBean> coordActions = jpaService
255 .execute(new CoordJobGetActionsJPAExecutor(jobId));
256 HashMap<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>();
257 boolean foundPending = false;
258 for (CoordinatorActionBean cAction : coordActions) {
259 if (!cAction.isPending()) {
260 int counter = 0;
261 if (coordActionStatus.containsKey(cAction.getStatus())) {
262 counter = coordActionStatus.get(cAction.getStatus()) + 1;
263 }
264 else {
265 ++counter;
266 }
267 coordActionStatus.put(cAction.getStatus(), counter);
268 }
269 else {
270 foundPending = true;
271 break;
272 }
273 }
274
275 if (foundPending) {
276 continue;
277 }
278
279 if (coordJob.isDoneMaterialization()
280 && checkCoordTerminalStatus(coordActionStatus, coordActions, coordStatus)) {
281 LOG.info("Set coordinator job [" + jobId + "] status to '" + coordStatus[0].toString()
282 + "' from '" + coordJob.getStatus() + "'");
283 updateCoordJob(coordActionStatus, coordActions, coordJob, coordStatus[0]);
284 }
285 else if (coordJob.isDoneMaterialization()
286 && checkCoordSuspendStatus(coordActionStatus, coordActions, coordStatus)) {
287 LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
288 + "' from '" + coordJob.getStatus() + "'");
289 updateCoordJob(coordActionStatus, coordActions, coordJob, coordStatus[0]);
290 }
291 else if (checkCoordRunningStatus(coordActionStatus, coordActions, coordStatus)) {
292 LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
293 + "' from '" + coordJob.getStatus() + "'");
294 updateCoordJob(coordActionStatus, coordActions, coordJob, coordStatus[0]);
295 }
296 // checking pending flag for job when user killed or suspended the job
297 else {
298 checkCoordPending(coordActionStatus, coordActions, coordJob, true);
299 }
300 }
301 catch (Exception ex) {
302 LOG.error("Exception happened during aggregate coordinator job's status, job = "
303 + coordJob.getId(), ex);
304 }
305 }
306
307 }
308 }
309
310 private boolean checkTerminalStatus(HashMap<Job.Status, Integer> bundleActionStatus,
311 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
312 boolean ret = false;
313 int totalValuesSucceed = 0;
314 if (bundleActionStatus.containsKey(Job.Status.SUCCEEDED)) {
315 totalValuesSucceed = bundleActionStatus.get(Job.Status.SUCCEEDED);
316 }
317 int totalValuesFailed = 0;
318 if (bundleActionStatus.containsKey(Job.Status.FAILED)) {
319 totalValuesFailed = bundleActionStatus.get(Job.Status.FAILED);
320 }
321 int totalValuesKilled = 0;
322 if (bundleActionStatus.containsKey(Job.Status.KILLED)) {
323 totalValuesKilled = bundleActionStatus.get(Job.Status.KILLED);
324 }
325
326 int totalValuesDoneWithError = 0;
327 if (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)) {
328 totalValuesDoneWithError = bundleActionStatus.get(Job.Status.DONEWITHERROR);
329 }
330
331 if (bundleActions.size() == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesDoneWithError)) {
332 // If all the bundle actions are succeeded then bundle job should be succeeded.
333 if (bundleActions.size() == totalValuesSucceed) {
334 bundleStatus[0] = Job.Status.SUCCEEDED;
335 ret = true;
336 }
337 else if (bundleActions.size() == totalValuesKilled) {
338 // If all the bundle actions are KILLED then bundle job should be KILLED.
339 bundleStatus[0] = Job.Status.KILLED;
340 ret = true;
341 }
342 else if (bundleActions.size() == totalValuesFailed) {
343 // If all the bundle actions are FAILED then bundle job should be FAILED.
344 bundleStatus[0] = Job.Status.FAILED;
345 ret = true;
346 }
347 else {
348 bundleStatus[0] = Job.Status.DONEWITHERROR;
349 ret = true;
350 }
351 }
352 return ret;
353 }
354
355 private boolean checkCoordTerminalStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
356 List<CoordinatorActionBean> coordActions, Job.Status[] coordStatus) {
357 boolean ret = false;
358 int totalValuesSucceed = 0;
359 if (coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED)) {
360 totalValuesSucceed = coordActionStatus.get(CoordinatorAction.Status.SUCCEEDED);
361 }
362 int totalValuesFailed = 0;
363 if (coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)) {
364 totalValuesFailed = coordActionStatus.get(CoordinatorAction.Status.FAILED);
365 }
366 int totalValuesKilled = 0;
367 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)) {
368 totalValuesKilled = coordActionStatus.get(CoordinatorAction.Status.KILLED);
369 }
370
371 int totalValuesTimeOut = 0;
372 if (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
373 totalValuesTimeOut = coordActionStatus.get(CoordinatorAction.Status.TIMEDOUT);
374 }
375
376 if (coordActions.size() == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut)) {
377 // If all the coordinator actions are succeeded then coordinator job should be succeeded.
378 if (coordActions.size() == totalValuesSucceed) {
379 coordStatus[0] = Job.Status.SUCCEEDED;
380 ret = true;
381 }
382 else if (coordActions.size() == totalValuesKilled) {
383 // If all the coordinator actions are KILLED then coordinator job should be KILLED.
384 coordStatus[0] = Job.Status.KILLED;
385 ret = true;
386 }
387 else if (coordActions.size() == totalValuesFailed) {
388 // If all the coordinator actions are FAILED then coordinator job should be FAILED.
389 coordStatus[0] = Job.Status.FAILED;
390 ret = true;
391 }
392 else {
393 coordStatus[0] = Job.Status.DONEWITHERROR;
394 ret = true;
395 }
396 }
397 return ret;
398 }
399
400 private boolean checkPrepStatus(HashMap<Job.Status, Integer> bundleActionStatus,
401 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
402 boolean ret = false;
403 if (bundleActionStatus.containsKey(Job.Status.PREP)) {
404 // If all the bundle actions are PREP then bundle job should be RUNNING.
405 if (bundleActions.size() > bundleActionStatus.get(Job.Status.PREP)) {
406 bundleStatus[0] = Job.Status.RUNNING;
407 ret = true;
408 }
409 }
410 return ret;
411 }
412
413 private boolean checkPausedStatus(HashMap<Job.Status, Integer> bundleActionStatus,
414 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
415 boolean ret = false;
416 if (bundleActionStatus.containsKey(Job.Status.PAUSED)) {
417 if (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED)) {
418 bundleStatus[0] = Job.Status.PAUSED;
419 ret = true;
420 }
421 else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)
422 && (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED)
423 + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR))) {
424 // bundleStatus = Job.Status.PAUSEDWITHERROR;
425 // We need to change this to PAUSEDWITHERROR in future when we add this to coordinator
426 bundleStatus[0] = Job.Status.PAUSED;
427 ret = true;
428 }
429 }
430 return ret;
431 }
432
433 private boolean checkSuspendStatus(HashMap<Job.Status, Integer> bundleActionStatus,
434 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
435 boolean ret = false;
436 if (bundleActionStatus.containsKey(Job.Status.SUSPENDED)) {
437 if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED)) {
438 bundleStatus[0] = Job.Status.SUSPENDED;
439 ret = true;
440 }
441 else if (bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
442 && (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED)
443 + bundleActionStatus.get(Job.Status.SUSPENDEDWITHERROR))) {
444 // bundleStatus = Job.Status.SUSPENDEDWITHERROR;
445 // We need to change this to SUSPENDEDWITHERROR in future when we add this to coordinator
446 bundleStatus[0] = Job.Status.SUSPENDED;
447 ret = true;
448 }
449 }
450 return ret;
451 }
452
453 private boolean checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
454 List<CoordinatorActionBean> coordActions, Job.Status[] coordStatus) {
455 boolean ret = false;
456 if (coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
457 if (coordActions.size() == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)) {
458 coordStatus[0] = Job.Status.SUSPENDED;
459 ret = true;
460 }
461 }
462 return ret;
463 }
464
465 private boolean checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
466 List<CoordinatorActionBean> coordActions, Job.Status[] coordStatus) {
467 boolean ret = false;
468 if (coordActionStatus.containsKey(CoordinatorAction.Status.RUNNING)) {
469 // If all the bundle actions are succeeded then bundle job should be succeeded.
470 if (coordActions.size() == coordActionStatus.get(CoordinatorAction.Status.RUNNING)) {
471 coordStatus[0] = Job.Status.RUNNING;
472 ret = true;
473 }
474 else if (coordActionStatus.get(CoordinatorAction.Status.RUNNING) > 0) {
475 if ((coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) && coordActionStatus.get(CoordinatorAction.Status.FAILED) > 0)
476 || (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) && coordActionStatus
477 .get(CoordinatorAction.Status.KILLED) > 0)
478 || (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT) && coordActionStatus
479 .get(CoordinatorAction.Status.TIMEDOUT) > 0)) {
480 // coordStatus = Job.Status.RUNNINGWITHERROR;
481 // We need to change this to RUNNINGWIHERROR in future when we add this to coordinator
482 coordStatus[0] = Job.Status.RUNNING;
483 ret = true;
484 }
485 }
486 }
487 return ret;
488 }
489
490 private boolean checkRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus,
491 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
492 boolean ret = false;
493 if (bundleActionStatus.containsKey(Job.Status.RUNNING)) {
494 // If all the bundle actions are succeeded then bundle job should be succeeded.
495 if (bundleActions.size() == bundleActionStatus.get(Job.Status.RUNNING)) {
496 bundleStatus[0] = Job.Status.RUNNING;
497 ret = true;
498 }
499 else if (bundleActionStatus.get(Job.Status.RUNNING) > 0) {
500 if ((bundleActionStatus.containsKey(Job.Status.FAILED) && bundleActionStatus.get(Job.Status.FAILED) > 0)
501 || (bundleActionStatus.containsKey(Job.Status.KILLED) && bundleActionStatus
502 .get(Job.Status.KILLED) > 0)
503 || (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) && bundleActionStatus
504 .get(Job.Status.DONEWITHERROR) > 0)
505 || (bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR) && bundleActionStatus
506 .get(Job.Status.RUNNINGWITHERROR) > 0)) {
507 // bundleStatus = Job.Status.RUNNINGWITHERROR;
508 // We need to change this to RUNNINGWIHERROR in future when we add this to coordinator
509 bundleStatus[0] = Job.Status.RUNNING;
510 ret = true;
511 }
512 }
513 }
514 return ret;
515 }
516
517 private void updateBundleJob(HashMap<Job.Status, Integer> bundleActionStatus,
518 List<BundleActionBean> bundleActions, BundleJobBean bundleJob, Job.Status bundleStatus)
519 throws JPAExecutorException {
520 String jobId = bundleJob.getId();
521 boolean pendingBundleJob = bundleJob.isPending();
522 // Checking the bundle pending should be updated or not
523 int totalNonPendingActions = 0;
524 for (Job.Status js : bundleActionStatus.keySet()) {
525 totalNonPendingActions += bundleActionStatus.get(js);
526 }
527
528 if (totalNonPendingActions == bundleActions.size()) {
529 pendingBundleJob = false;
530 }
531
532 // Update the Bundle Job
533 bundleJob.setStatus(bundleStatus);
534 if (pendingBundleJob) {
535 bundleJob.setPending();
536 LOG.info("Bundle job [" + jobId + "] Pending set to TRUE");
537 }
538 else {
539 bundleJob.resetPending();
540 LOG.info("Bundle job [" + jobId + "] Pending set to FALSE");
541 }
542 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
543 }
544
545 private void updateCoordJob(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
546 List<CoordinatorActionBean> coordActions, CoordinatorJobBean coordJob, Job.Status coordStatus)
547 throws JPAExecutorException, CommandException {
548 Job.Status prevStatus = coordJob.getStatus();
549 // Update the Coord Job
550 if (coordJob.getStatus() == Job.Status.SUCCEEDED || coordJob.getStatus() == Job.Status.FAILED
551 || coordJob.getStatus() == Job.Status.KILLED || coordJob.getStatus() == Job.Status.DONEWITHERROR) {
552 if (coordStatus == Job.Status.SUSPENDED) {
553 LOG.info("Coord Job [" + coordJob.getId()
554 + "] status can not be updated as its already in Terminal state");
555 return;
556 }
557 }
558
559 checkCoordPending(coordActionStatus, coordActions, coordJob, false);
560 coordJob.setStatus(coordStatus);
561 coordJob.setStatus(StatusUtils.getStatus(coordJob));
562 coordJob.setLastModifiedTime(new Date());
563 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
564 // update bundle action only when status changes in coord job
565 if (coordJob.getBundleId() != null) {
566 if (!prevStatus.equals(coordJob.getStatus())) {
567 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
568 bundleStatusUpdate.call();
569 }
570 }
571 }
572
573 private void checkCoordPending(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
574 List<CoordinatorActionBean> coordActions, CoordinatorJobBean coordJob, boolean saveToDB) throws JPAExecutorException {
575 boolean pendingCoordJob = coordJob.isPending();
576 // Checking the coordinator pending should be updated or not
577 int totalNonPendingActions = 0;
578 for (CoordinatorAction.Status js : coordActionStatus.keySet()) {
579 totalNonPendingActions += coordActionStatus.get(js);
580 }
581
582 if (totalNonPendingActions == coordActions.size()) {
583 pendingCoordJob = false;
584 }
585
586 if (pendingCoordJob) {
587 coordJob.setPending();
588 LOG.info("Coord job [" + coordJob.getId() + "] Pending set to TRUE");
589 }
590 else {
591 coordJob.resetPending();
592 LOG.info("Coord job [" + coordJob.getId() + "] Pending set to FALSE");
593 }
594
595 if (saveToDB) {
596 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
597 }
598 }
599
600 /**
601 * Aggregate coordinator actions' status to coordinator jobs
602 *
603 * @throws JPAExecutorException thrown if failed in db updates or retrievals
604 * @throws CommandException thrown if failed to run commands
605 */
606 private void coordTransit() throws JPAExecutorException, CommandException {
607 List<CoordinatorJobBean> pendingJobCheckList = null;
608 if (lastInstanceStartTime == null) {
609 LOG.info("Running coordinator status service first instance");
610 // this is the first instance, we need to check for all pending jobs;
611 pendingJobCheckList = jpaService.execute(new CoordJobsGetPendingJPAExecutor(limit));
612 }
613 else {
614 LOG.info("Running coordinator status service from last instance time = "
615 + DateUtils.convertDateToString(lastInstanceStartTime));
616 // this is not the first instance, we should only check jobs that have actions been
617 // updated >= start time of last service run;
618 List<CoordinatorActionBean> actionList = jpaService
619 .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
620 Set<String> coordIds = new HashSet<String>();
621 for (CoordinatorActionBean action : actionList) {
622 coordIds.add(action.getJobId());
623 }
624 pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
625 for (String coordId : coordIds.toArray(new String[coordIds.size()])) {
626 CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordId));
627 // Running coord job might have pending false
628 if (coordJob.isPending() || coordJob.getStatus().equals(Job.Status.RUNNING)) {
629 pendingJobCheckList.add(coordJob);
630 }
631 }
632 }
633 aggregateCoordJobsStatus(pendingJobCheckList);
634 }
635 }
636
637 /**
638 * Initializes the {@link StatusTransitService}.
639 *
640 * @param services services instance.
641 */
642 @Override
643 public void init(Services services) {
644 Configuration conf = services.getConf();
645 Runnable stateTransitRunnable = new StatusTransitRunnable();
646 services.get(SchedulerService.class).schedule(stateTransitRunnable, 10,
647 conf.getInt(CONF_STATUSTRANSIT_INTERVAL, 60), SchedulerService.Unit.SEC);
648 }
649
650 /**
651 * Destroy the StateTransit Jobs Service.
652 */
653 @Override
654 public void destroy() {
655 }
656
657 /**
658 * Return the public interface for the purge jobs service.
659 *
660 * @return {@link StatusTransitService}.
661 */
662 @Override
663 public Class<? extends Service> getInterface() {
664 return StatusTransitService.class;
665 }
666 }