001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command.coord; 019 020import java.sql.Timestamp; 021import java.util.ArrayList; 022import java.util.Date; 023import java.util.HashMap; 024import java.util.List; 025import java.util.Map; 026 027import org.apache.oozie.CoordinatorActionBean; 028import org.apache.oozie.CoordinatorJobBean; 029import org.apache.oozie.ErrorCode; 030import org.apache.oozie.SLAEventBean; 031import org.apache.oozie.WorkflowJobBean; 032import org.apache.oozie.XException; 033import org.apache.oozie.service.EventHandlerService; 034import org.apache.oozie.service.JPAService; 035import org.apache.oozie.service.Services; 036import org.apache.oozie.util.InstrumentUtils; 037import org.apache.oozie.util.LogUtils; 038import org.apache.oozie.util.ParamChecker; 039import org.apache.oozie.util.db.SLADbOperations; 040import org.apache.oozie.client.CoordinatorAction; 041import org.apache.oozie.client.WorkflowJob; 042import org.apache.oozie.client.SLAEvent.SlaAppType; 043import org.apache.oozie.client.SLAEvent.Status; 044import org.apache.oozie.client.rest.JsonBean; 045import org.apache.oozie.command.CommandException; 046import org.apache.oozie.command.PreconditionException; 047import org.apache.oozie.executor.jpa.BatchQueryExecutor; 048import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor; 049import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; 050import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserAppnameJPAExecutor; 051import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor; 052import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 053import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 054 055/** 056 * The command checks workflow status for coordinator action. 057 */ 058@SuppressWarnings("deprecation") 059public class CoordActionCheckXCommand extends CoordinatorXCommand<Void> { 060 private String actionId; 061 private int actionCheckDelay; 062 private CoordinatorActionBean coordAction = null; 063 private CoordinatorJobBean coordJob; 064 private WorkflowJobBean workflowJob; 065 private JPAService jpaService = null; 066 private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 067 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 068 069 public CoordActionCheckXCommand(String actionId, int actionCheckDelay) { 070 super("coord_action_check", "coord_action_check", 0); 071 this.actionId = ParamChecker.notEmpty(actionId, "actionId"); 072 this.actionCheckDelay = actionCheckDelay; 073 } 074 075 /* (non-Javadoc) 076 * @see org.apache.oozie.command.XCommand#execute() 077 */ 078 @Override 079 protected Void execute() throws CommandException { 080 try { 081 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 082 Status slaStatus = null; 083 CoordinatorAction.Status initialStatus = coordAction.getStatus(); 084 085 if (workflowJob.getStatus() == WorkflowJob.Status.SUCCEEDED) { 086 coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED); 087 // set pending to false as the status is SUCCEEDED 088 coordAction.setPending(0); 089 slaStatus = Status.SUCCEEDED; 090 } 091 else { 092 if (workflowJob.getStatus() == WorkflowJob.Status.FAILED) { 093 coordAction.setStatus(CoordinatorAction.Status.FAILED); 094 slaStatus = Status.FAILED; 095 // set pending to false as the status is FAILED 096 coordAction.setPending(0); 097 } 098 else { 099 if (workflowJob.getStatus() == WorkflowJob.Status.KILLED) { 100 coordAction.setStatus(CoordinatorAction.Status.KILLED); 101 slaStatus = Status.KILLED; 102 // set pending to false as the status is KILLED 103 coordAction.setPending(0); 104 } 105 else { 106 LOG.warn("Unexpected workflow " + workflowJob.getId() + " STATUS " + workflowJob.getStatus()); 107 coordAction.setLastModifiedTime(new Date()); 108 CoordActionQueryExecutor.getInstance().executeUpdate( 109 CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, 110 coordAction); 111 return null; 112 } 113 } 114 } 115 116 LOG.debug("Updating Coordinator actionId :" + coordAction.getId() + "status to =" 117 + coordAction.getStatus()); 118 coordAction.setLastModifiedTime(new Date()); 119 updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, 120 coordAction)); 121 122 if (slaStatus != null) { 123 SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus, 124 SlaAppType.COORDINATOR_ACTION, LOG); 125 if(slaEvent != null) { 126 insertList.add(slaEvent); 127 } 128 } 129 130 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null); 131 CoordinatorAction.Status endStatus = coordAction.getStatus(); 132 if (endStatus != initialStatus && EventHandlerService.isEnabled()) { 133 generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), workflowJob.getStartTime()); 134 } 135 } 136 catch (XException ex) { 137 LOG.warn("CoordActionCheckCommand Failed ", ex); 138 throw new CommandException(ex); 139 } 140 return null; 141 } 142 143 /* (non-Javadoc) 144 * @see org.apache.oozie.command.XCommand#getEntityKey() 145 */ 146 @Override 147 public String getEntityKey() { 148 return actionId; 149 } 150 151 @Override 152 public String getKey() { 153 return getName() + "_" + actionId; 154 } 155 156 /* (non-Javadoc) 157 * @see org.apache.oozie.command.XCommand#isLockRequired() 158 */ 159 @Override 160 protected boolean isLockRequired() { 161 return true; 162 } 163 164 /* (non-Javadoc) 165 * @see org.apache.oozie.command.XCommand#loadState() 166 */ 167 @Override 168 protected void loadState() throws CommandException { 169 try { 170 jpaService = Services.get().get(JPAService.class); 171 172 if (jpaService != null) { 173 coordAction = jpaService.execute(new CoordActionGetForCheckJPAExecutor(actionId)); 174 coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor( 175 coordAction.getJobId())); 176 workflowJob = jpaService.execute (new WorkflowJobGetForSLAJPAExecutor(coordAction.getExternalId())); 177 LogUtils.setLogInfo(coordAction, logInfo); 178 } 179 else { 180 throw new CommandException(ErrorCode.E0610); 181 } 182 } 183 catch (XException ex) { 184 throw new CommandException(ex); 185 } 186 } 187 188 /* (non-Javadoc) 189 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 190 */ 191 @Override 192 protected void verifyPrecondition() throws CommandException, PreconditionException { 193 // if the action has been updated, quit this command 194 Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000); 195 Timestamp cactionLmt = coordAction.getLastModifiedTimestamp(); 196 if (cactionLmt.after(actionCheckTs)) { 197 throw new PreconditionException(ErrorCode.E1100, "The coord action :" + actionId 198 + " has been udated. Ignore CoordActionCheckCommand!"); 199 } 200 if (coordAction.getStatus().equals(CoordinatorAction.Status.SUCCEEDED) 201 || coordAction.getStatus().equals(CoordinatorAction.Status.FAILED) 202 || coordAction.getStatus().equals(CoordinatorAction.Status.KILLED)) { 203 throw new PreconditionException(ErrorCode.E1100, "The coord action [" + actionId + "] must not have status " 204 + CoordinatorAction.Status.SUCCEEDED.name() + ", " + CoordinatorAction.Status.FAILED.name() 205 + ", or " + CoordinatorAction.Status.KILLED.name() + " but has status [" + coordAction.getStatus().name() 206 + "]"); 207 } 208 } 209}