This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import org.apache.oozie.CoordinatorActionBean;
021 import org.apache.oozie.WorkflowJobBean;
022 import org.apache.oozie.XException;
023 import org.apache.oozie.store.CoordinatorStore;
024 import org.apache.oozie.store.StoreException;
025 import org.apache.oozie.util.XLog;
026 import org.apache.oozie.util.db.SLADbOperations;
027 import org.apache.oozie.client.CoordinatorAction;
028 import org.apache.oozie.client.WorkflowJob;
029 import org.apache.oozie.client.SLAEvent.SlaAppType;
030 import org.apache.oozie.client.SLAEvent.Status;
031 import org.apache.oozie.command.CommandException;
032
033 public class CoordActionUpdateCommand extends CoordinatorCommand<Void> {
034 private final XLog log = XLog.getLog(getClass());
035 private WorkflowJobBean workflow;
036 private CoordinatorActionBean caction = null;
037
038 public CoordActionUpdateCommand(WorkflowJobBean workflow) {
039 super("coord-action-update", "coord-action-update", 1, XLog.OPS);
040 this.workflow = workflow;
041 }
042
043 @Override
044 protected Void call(CoordinatorStore cstore) throws StoreException, CommandException {
045 try {
046 if (workflow.getStatus() == WorkflowJob.Status.RUNNING
047 || workflow.getStatus() == WorkflowJob.Status.SUSPENDED) {
048 //update lastModifiedTime
049 cstore.updateCoordinatorAction(caction);
050 return null;
051 }
052 // CoordinatorActionBean caction =
053 // cstore.getCoordinatorActionForExternalId(workflow.getId());
054 Status slaStatus = null;
055 if (caction != null) {
056 if (workflow.getStatus() == WorkflowJob.Status.SUCCEEDED) {
057 caction.setStatus(CoordinatorAction.Status.SUCCEEDED);
058 slaStatus = Status.SUCCEEDED;
059 }
060 else {
061 if (workflow.getStatus() == WorkflowJob.Status.FAILED) {
062 caction.setStatus(CoordinatorAction.Status.FAILED);
063 slaStatus = Status.FAILED;
064 }
065 else {
066 if (workflow.getStatus() == WorkflowJob.Status.KILLED) {
067 caction.setStatus(CoordinatorAction.Status.KILLED);
068 slaStatus = Status.KILLED;
069 }
070 else {
071 log.warn(
072 "Unexpected workflow " + workflow.getId() + " STATUS " + workflow.getStatus());
073 //update lastModifiedTime
074 cstore.updateCoordinatorAction(caction);
075 return null;
076 }
077 }
078 }
079
080 log.info(
081 "Updating Coordintaor id :" + caction.getId() + "status to =" + caction.getStatus());
082 cstore.updateCoordinatorAction(caction);
083 if (slaStatus != null) {
084 SLADbOperations.writeStausEvent(caction.getSlaXml(), caction.getId(), cstore, slaStatus,
085 SlaAppType.COORDINATOR_ACTION);
086 }
087 queueCallable(new CoordActionReadyCommand(caction.getJobId()));
088 }
089 }
090 catch (XException ex) {
091 log.warn("CoordActionUpdate Failed ", ex.getMessage());
092 throw new CommandException(ex);
093 }
094 return null;
095 }
096
097 @Override
098 protected Void execute(CoordinatorStore store) throws StoreException, CommandException {
099 log.info("STARTED CoordActionUpdateCommand for wfId=" + workflow.getId());
100 caction = store.getCoordinatorActionForExternalId(workflow.getId());
101 if (caction == null) {
102 log.info("ENDED CoordActionUpdateCommand for wfId=" + workflow.getId() + ", coord action is null");
103 return null;
104 }
105 setLogInfo(caction);
106 String jobId = caction.getJobId();
107 try {
108 if (lock(jobId)) {
109 call(store);
110 }
111 else {
112 queueCallable(new CoordActionUpdateCommand(workflow), LOCK_FAILURE_REQUEUE_INTERVAL);
113 log.warn("CoordActionUpdateCommand lock was not acquired - failed JobId=" + jobId + ", wfId="
114 + workflow.getId() + ". Requeing the same.");
115 }
116 }
117 catch (InterruptedException e) {
118 queueCallable(new CoordActionUpdateCommand(workflow), LOCK_FAILURE_REQUEUE_INTERVAL);
119 log.warn("CoordActionUpdateCommand lock acquiring failed with exception " + e.getMessage() + " for jobId="
120 + jobId + ", wfId=" + workflow.getId() + ". Requeing the same.");
121 }
122 finally {
123 log.info("ENDED CoordActionUpdateCommand for wfId=" + workflow.getId() + ", jobId=" + jobId);
124 }
125 return null;
126 }
127 }