001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.sql.Timestamp; 021 022 import org.apache.oozie.CoordinatorActionBean; 023 import org.apache.oozie.WorkflowJobBean; 024 import org.apache.oozie.XException; 025 import org.apache.oozie.service.Services; 026 import org.apache.oozie.service.StoreService; 027 import org.apache.oozie.store.CoordinatorStore; 028 import org.apache.oozie.store.StoreException; 029 import org.apache.oozie.store.WorkflowStore; 030 import org.apache.oozie.util.XLog; 031 import org.apache.oozie.util.db.SLADbOperations; 032 import org.apache.oozie.client.CoordinatorAction; 033 import org.apache.oozie.client.WorkflowJob; 034 import org.apache.oozie.client.SLAEvent.SlaAppType; 035 import org.apache.oozie.client.SLAEvent.Status; 036 import org.apache.oozie.command.CommandException; 037 038 public class CoordActionCheckCommand extends CoordinatorCommand<Void> { 039 private String actionId; 040 private int actionCheckDelay; 041 private final XLog log = XLog.getLog(getClass()); 042 private CoordinatorActionBean coordAction = null; 043 044 public CoordActionCheckCommand(String actionId, int actionCheckDelay) { 045 super("coord_action_check", "coord_action_check", 0, XLog.OPS); 046 this.actionId = actionId; 047 this.actionCheckDelay = actionCheckDelay; 048 } 049 050 protected Void call(CoordinatorStore cstore) throws StoreException, CommandException { 051 try { 052 //if the action has been updated, quit this command 053 Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000); 054 Timestamp cactionLmt = coordAction.getLastModifiedTimestamp(); 055 if (cactionLmt.after(actionCheckTs)) { 056 log.info("The coord action :" + actionId + " has been udated. Ignore CoordActionCheckCommand!"); 057 return null; 058 } 059 if (coordAction.getStatus().equals(CoordinatorAction.Status.SUCCEEDED) 060 || coordAction.getStatus().equals(CoordinatorAction.Status.FAILED) 061 || coordAction.getStatus().equals(CoordinatorAction.Status.KILLED)) { 062 // do nothing 063 } 064 else { 065 incrJobCounter(1); 066 WorkflowStore wstore = Services.get().get(StoreService.class).getStore(WorkflowStore.class, cstore); 067 WorkflowJobBean wf = wstore.getWorkflow(coordAction.getExternalId(), false); 068 069 Status slaStatus = null; 070 071 if (wf.getStatus() == WorkflowJob.Status.SUCCEEDED) { 072 coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED); 073 slaStatus = Status.SUCCEEDED; 074 } 075 else { 076 if (wf.getStatus() == WorkflowJob.Status.FAILED) { 077 coordAction.setStatus(CoordinatorAction.Status.FAILED); 078 slaStatus = Status.FAILED; 079 } 080 else { 081 if (wf.getStatus() == WorkflowJob.Status.KILLED) { 082 coordAction.setStatus(CoordinatorAction.Status.KILLED); 083 slaStatus = Status.KILLED; 084 } 085 else { 086 log.warn("Unexpected workflow " + wf.getId() + " STATUS " + wf.getStatus()); 087 cstore.updateCoordinatorAction(coordAction); 088 return null; 089 } 090 } 091 } 092 093 log.debug("Updating Coordintaor actionId :" + coordAction.getId() + "status to =" + coordAction.getStatus()); 094 cstore.updateCoordinatorAction(coordAction); 095 if (slaStatus != null) { 096 SLADbOperations.writeStausEvent(coordAction.getSlaXml(), coordAction.getId(), cstore, slaStatus, 097 SlaAppType.COORDINATOR_ACTION); 098 } 099 } 100 101 } 102 catch (XException ex) { 103 log.warn("CoordActionCheckCommand Failed ", ex); 104 throw new CommandException(ex); 105 } 106 return null; 107 } 108 109 @Override 110 protected Void execute(CoordinatorStore store) throws StoreException, CommandException { 111 log.info("STARTED CoordActionCheckCommand for actionId = " + actionId); 112 try { 113 coordAction = store.getEntityManager().find(CoordinatorActionBean.class, actionId); 114 setLogInfo(coordAction); 115 if (lock(coordAction.getJobId())) { 116 call(store); 117 } 118 else { 119 queueCallable(new CoordActionCheckCommand(actionId, actionCheckDelay), LOCK_FAILURE_REQUEUE_INTERVAL); 120 log.warn("CoordActionCheckCommand lock was not acquired - failed jobId=" + coordAction.getJobId() 121 + ", actionId=" + actionId + ". Requeing the same."); 122 } 123 } 124 catch (InterruptedException e) { 125 queueCallable(new CoordActionCheckCommand(actionId, actionCheckDelay), LOCK_FAILURE_REQUEUE_INTERVAL); 126 log.warn("CoordActionCheckCommand lock acquiring failed with exception " + e.getMessage() + " for jobId=" 127 + coordAction.getJobId() + ", actionId=" + actionId + " Requeing the same."); 128 } 129 finally { 130 log.info("ENDED CoordActionCheckCommand for actionId:" + actionId); 131 } 132 return null; 133 } 134 }