001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.coord; 020 021import java.sql.Timestamp; 022import java.util.ArrayList; 023import java.util.Date; 024import java.util.List; 025 026import org.apache.oozie.CoordinatorActionBean; 027import org.apache.oozie.CoordinatorJobBean; 028import org.apache.oozie.ErrorCode; 029import org.apache.oozie.SLAEventBean; 030import org.apache.oozie.WorkflowJobBean; 031import org.apache.oozie.XException; 032import org.apache.oozie.service.EventHandlerService; 033import org.apache.oozie.service.JPAService; 034import org.apache.oozie.service.Services; 035import org.apache.oozie.util.InstrumentUtils; 036import org.apache.oozie.util.LogUtils; 037import org.apache.oozie.util.ParamChecker; 038import org.apache.oozie.util.db.SLADbOperations; 039import org.apache.oozie.client.CoordinatorAction; 040import org.apache.oozie.client.WorkflowJob; 041import org.apache.oozie.client.SLAEvent.SlaAppType; 042import org.apache.oozie.client.SLAEvent.Status; 043import org.apache.oozie.client.rest.JsonBean; 044import org.apache.oozie.command.CommandException; 045import org.apache.oozie.command.PreconditionException; 046import org.apache.oozie.executor.jpa.BatchQueryExecutor; 047import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor; 048import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; 049import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserAppnameJPAExecutor; 050import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 051import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 052import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; 053import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; 054 055/** 056 * The command checks workflow status for coordinator action. 057 */ 058@SuppressWarnings("deprecation") 059public class CoordActionCheckXCommand extends CoordinatorXCommand<Void> { 060 private String actionId; 061 private int actionCheckDelay; 062 private CoordinatorActionBean coordAction = null; 063 private CoordinatorJobBean coordJob; 064 private WorkflowJobBean workflowJob; 065 private JPAService jpaService = null; 066 private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 067 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 068 069 public CoordActionCheckXCommand(String actionId, int actionCheckDelay) { 070 super("coord_action_check", "coord_action_check", 0); 071 this.actionId = ParamChecker.notEmpty(actionId, "actionId"); 072 this.actionCheckDelay = actionCheckDelay; 073 } 074 075 @Override 076 protected void setLogInfo() { 077 LogUtils.setLogInfo(actionId); 078 } 079 080 /* (non-Javadoc) 081 * @see org.apache.oozie.command.XCommand#execute() 082 */ 083 @Override 084 protected Void execute() throws CommandException { 085 try { 086 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 087 Status slaStatus = null; 088 CoordinatorAction.Status initialStatus = coordAction.getStatus(); 089 090 if (workflowJob.getStatus() == WorkflowJob.Status.SUCCEEDED) { 091 coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED); 092 // set pending to false as the status is SUCCEEDED 093 coordAction.setPending(0); 094 slaStatus = Status.SUCCEEDED; 095 } 096 else { 097 if (workflowJob.getStatus() == WorkflowJob.Status.FAILED) { 098 coordAction.setStatus(CoordinatorAction.Status.FAILED); 099 slaStatus = Status.FAILED; 100 // set pending to false as the status is FAILED 101 coordAction.setPending(0); 102 } 103 else { 104 if (workflowJob.getStatus() == WorkflowJob.Status.KILLED) { 105 coordAction.setStatus(CoordinatorAction.Status.KILLED); 106 slaStatus = Status.KILLED; 107 // set pending to false as the status is KILLED 108 coordAction.setPending(0); 109 } 110 else if (workflowJob.getStatus() == WorkflowJob.Status.SUSPENDED) { 111 coordAction.setStatus(CoordinatorAction.Status.SUSPENDED); 112 slaStatus = Status.FAILED; 113 // set pending to false as the status is SUSPENDED 114 coordAction.setPending(0); 115 } 116 else { 117 LOG.warn("Unexpected workflow " + workflowJob.getId() + " STATUS " + workflowJob.getStatus()); 118 coordAction.setLastModifiedTime(new Date()); 119 CoordActionQueryExecutor.getInstance().executeUpdate( 120 CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, 121 coordAction); 122 return null; 123 } 124 } 125 } 126 127 LOG.debug("Updating Coordinator actionId :" + coordAction.getId() + "status to =" 128 + coordAction.getStatus()); 129 coordAction.setLastModifiedTime(new Date()); 130 updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, 131 coordAction)); 132 133 if (slaStatus != null) { 134 SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus, 135 SlaAppType.COORDINATOR_ACTION, LOG); 136 if(slaEvent != null) { 137 insertList.add(slaEvent); 138 } 139 } 140 141 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null); 142 CoordinatorAction.Status endStatus = coordAction.getStatus(); 143 if (endStatus != initialStatus && EventHandlerService.isEnabled()) { 144 generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), workflowJob.getStartTime()); 145 } 146 } 147 catch (XException ex) { 148 LOG.warn("CoordActionCheckCommand Failed ", ex); 149 throw new CommandException(ex); 150 } 151 return null; 152 } 153 154 /* (non-Javadoc) 155 * @see org.apache.oozie.command.XCommand#getEntityKey() 156 */ 157 @Override 158 public String getEntityKey() { 159 return actionId.substring(0, actionId.indexOf("@")); 160 } 161 162 @Override 163 public String getKey() { 164 return getName() + "_" + actionId; 165 } 166 167 /* (non-Javadoc) 168 * @see org.apache.oozie.command.XCommand#isLockRequired() 169 */ 170 @Override 171 protected boolean isLockRequired() { 172 return true; 173 } 174 175 /* (non-Javadoc) 176 * @see org.apache.oozie.command.XCommand#loadState() 177 */ 178 @Override 179 protected void loadState() throws CommandException { 180 try { 181 jpaService = Services.get().get(JPAService.class); 182 183 if (jpaService != null) { 184 coordAction = jpaService.execute(new CoordActionGetForCheckJPAExecutor(actionId)); 185 coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor( 186 coordAction.getJobId())); 187 workflowJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA, 188 coordAction.getExternalId()); 189 LogUtils.setLogInfo(coordAction); 190 } 191 else { 192 throw new CommandException(ErrorCode.E0610); 193 } 194 } 195 catch (XException ex) { 196 throw new CommandException(ex); 197 } 198 } 199 200 /* (non-Javadoc) 201 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 202 */ 203 @Override 204 protected void verifyPrecondition() throws CommandException, PreconditionException { 205 // if the action has been updated, quit this command 206 Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000); 207 Timestamp cactionLmt = coordAction.getLastModifiedTimestamp(); 208 if (cactionLmt.after(actionCheckTs)) { 209 throw new PreconditionException(ErrorCode.E1100, "The coord action :" + actionId 210 + " has been udated. Ignore CoordActionCheckCommand!"); 211 } 212 if (coordAction.getStatus().equals(CoordinatorAction.Status.SUCCEEDED) 213 || coordAction.getStatus().equals(CoordinatorAction.Status.FAILED) 214 || coordAction.getStatus().equals(CoordinatorAction.Status.KILLED)) { 215 throw new PreconditionException(ErrorCode.E1100, "The coord action [" + actionId + "] must not have status " 216 + CoordinatorAction.Status.SUCCEEDED.name() + ", " + CoordinatorAction.Status.FAILED.name() 217 + ", or " + CoordinatorAction.Status.KILLED.name() + " but has status [" + coordAction.getStatus().name() 218 + "]"); 219 } 220 } 221}