This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.sql.Timestamp;
021
022 import org.apache.oozie.CoordinatorActionBean;
023 import org.apache.oozie.WorkflowJobBean;
024 import org.apache.oozie.XException;
025 import org.apache.oozie.service.Services;
026 import org.apache.oozie.service.StoreService;
027 import org.apache.oozie.store.CoordinatorStore;
028 import org.apache.oozie.store.StoreException;
029 import org.apache.oozie.store.WorkflowStore;
030 import org.apache.oozie.util.XLog;
031 import org.apache.oozie.util.db.SLADbOperations;
032 import org.apache.oozie.client.CoordinatorAction;
033 import org.apache.oozie.client.WorkflowJob;
034 import org.apache.oozie.client.SLAEvent.SlaAppType;
035 import org.apache.oozie.client.SLAEvent.Status;
036 import org.apache.oozie.command.CommandException;
037
038 public class CoordActionCheckCommand extends CoordinatorCommand<Void> {
039 private String actionId;
040 private int actionCheckDelay;
041 private final XLog log = XLog.getLog(getClass());
042 private CoordinatorActionBean coordAction = null;
043
044 public CoordActionCheckCommand(String actionId, int actionCheckDelay) {
045 super("coord_action_check", "coord_action_check", 0, XLog.OPS);
046 this.actionId = actionId;
047 this.actionCheckDelay = actionCheckDelay;
048 }
049
050 protected Void call(CoordinatorStore cstore) throws StoreException, CommandException {
051 try {
052 //if the action has been updated, quit this command
053 Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000);
054 Timestamp cactionLmt = coordAction.getLastModifiedTimestamp();
055 if (cactionLmt.after(actionCheckTs)) {
056 log.info("The coord action :" + actionId + " has been udated. Ignore CoordActionCheckCommand!");
057 return null;
058 }
059 if (coordAction.getStatus().equals(CoordinatorAction.Status.SUCCEEDED)
060 || coordAction.getStatus().equals(CoordinatorAction.Status.FAILED)
061 || coordAction.getStatus().equals(CoordinatorAction.Status.KILLED)) {
062 // do nothing
063 }
064 else {
065 incrJobCounter(1);
066 WorkflowStore wstore = Services.get().get(StoreService.class).getStore(WorkflowStore.class, cstore);
067 WorkflowJobBean wf = wstore.getWorkflow(coordAction.getExternalId(), false);
068
069 Status slaStatus = null;
070
071 if (wf.getStatus() == WorkflowJob.Status.SUCCEEDED) {
072 coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED);
073 slaStatus = Status.SUCCEEDED;
074 }
075 else {
076 if (wf.getStatus() == WorkflowJob.Status.FAILED) {
077 coordAction.setStatus(CoordinatorAction.Status.FAILED);
078 slaStatus = Status.FAILED;
079 }
080 else {
081 if (wf.getStatus() == WorkflowJob.Status.KILLED) {
082 coordAction.setStatus(CoordinatorAction.Status.KILLED);
083 slaStatus = Status.KILLED;
084 }
085 else {
086 log.warn("Unexpected workflow " + wf.getId() + " STATUS " + wf.getStatus());
087 cstore.updateCoordinatorAction(coordAction);
088 return null;
089 }
090 }
091 }
092
093 log.debug("Updating Coordintaor actionId :" + coordAction.getId() + "status to =" + coordAction.getStatus());
094 cstore.updateCoordinatorAction(coordAction);
095 if (slaStatus != null) {
096 SLADbOperations.writeStausEvent(coordAction.getSlaXml(), coordAction.getId(), cstore, slaStatus,
097 SlaAppType.COORDINATOR_ACTION);
098 }
099 }
100
101 }
102 catch (XException ex) {
103 log.warn("CoordActionCheckCommand Failed ", ex);
104 throw new CommandException(ex);
105 }
106 return null;
107 }
108
109 @Override
110 protected Void execute(CoordinatorStore store) throws StoreException, CommandException {
111 log.info("STARTED CoordActionCheckCommand for actionId = " + actionId);
112 try {
113 coordAction = store.getEntityManager().find(CoordinatorActionBean.class, actionId);
114 setLogInfo(coordAction);
115 if (lock(coordAction.getJobId())) {
116 call(store);
117 }
118 else {
119 queueCallable(new CoordActionCheckCommand(actionId, actionCheckDelay), LOCK_FAILURE_REQUEUE_INTERVAL);
120 log.warn("CoordActionCheckCommand lock was not acquired - failed jobId=" + coordAction.getJobId()
121 + ", actionId=" + actionId + ". Requeing the same.");
122 }
123 }
124 catch (InterruptedException e) {
125 queueCallable(new CoordActionCheckCommand(actionId, actionCheckDelay), LOCK_FAILURE_REQUEUE_INTERVAL);
126 log.warn("CoordActionCheckCommand lock acquiring failed with exception " + e.getMessage() + " for jobId="
127 + coordAction.getJobId() + ", actionId=" + actionId + " Requeing the same.");
128 }
129 finally {
130 log.info("ENDED CoordActionCheckCommand for actionId:" + actionId);
131 }
132 return null;
133 }
134 }