001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.sql.Timestamp; 021 import java.util.ArrayList; 022 import java.util.Date; 023 import java.util.List; 024 025 import org.apache.oozie.CoordinatorActionBean; 026 import org.apache.oozie.CoordinatorJobBean; 027 import org.apache.oozie.ErrorCode; 028 import org.apache.oozie.SLAEventBean; 029 import org.apache.oozie.WorkflowJobBean; 030 import org.apache.oozie.XException; 031 import org.apache.oozie.service.EventHandlerService; 032 import org.apache.oozie.service.JPAService; 033 import org.apache.oozie.service.Services; 034 import org.apache.oozie.util.InstrumentUtils; 035 import org.apache.oozie.util.LogUtils; 036 import org.apache.oozie.util.ParamChecker; 037 import org.apache.oozie.util.db.SLADbOperations; 038 import org.apache.oozie.client.CoordinatorAction; 039 import org.apache.oozie.client.WorkflowJob; 040 import org.apache.oozie.client.SLAEvent.SlaAppType; 041 import org.apache.oozie.client.SLAEvent.Status; 042 import org.apache.oozie.client.rest.JsonBean; 043 import org.apache.oozie.command.CommandException; 044 import org.apache.oozie.command.PreconditionException; 045 import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor; 046 import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor; 047 import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserAppnameJPAExecutor; 048 import org.apache.oozie.executor.jpa.WorkflowActionGetForSLAJPAExecutor; 049 import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor; 050 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 051 052 /** 053 * The command checks workflow status for coordinator action. 054 */ 055 @SuppressWarnings("deprecation") 056 public class CoordActionCheckXCommand extends CoordinatorXCommand<Void> { 057 private String actionId; 058 private int actionCheckDelay; 059 private CoordinatorActionBean coordAction = null; 060 private CoordinatorJobBean coordJob; 061 private WorkflowJobBean workflowJob; 062 private JPAService jpaService = null; 063 private List<JsonBean> updateList = new ArrayList<JsonBean>(); 064 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 065 066 public CoordActionCheckXCommand(String actionId, int actionCheckDelay) { 067 super("coord_action_check", "coord_action_check", 0); 068 this.actionId = ParamChecker.notEmpty(actionId, "actionId"); 069 this.actionCheckDelay = actionCheckDelay; 070 } 071 072 /* (non-Javadoc) 073 * @see org.apache.oozie.command.XCommand#execute() 074 */ 075 @Override 076 protected Void execute() throws CommandException { 077 try { 078 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 079 WorkflowJobBean wf = jpaService.execute(new WorkflowJobGetJPAExecutor(coordAction.getExternalId())); 080 Status slaStatus = null; 081 CoordinatorAction.Status initialStatus = coordAction.getStatus(); 082 083 if (wf.getStatus() == WorkflowJob.Status.SUCCEEDED) { 084 coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED); 085 // set pending to false as the status is SUCCEEDED 086 coordAction.setPending(0); 087 slaStatus = Status.SUCCEEDED; 088 } 089 else { 090 if (wf.getStatus() == WorkflowJob.Status.FAILED) { 091 coordAction.setStatus(CoordinatorAction.Status.FAILED); 092 slaStatus = Status.FAILED; 093 // set pending to false as the status is FAILED 094 coordAction.setPending(0); 095 } 096 else { 097 if (wf.getStatus() == WorkflowJob.Status.KILLED) { 098 coordAction.setStatus(CoordinatorAction.Status.KILLED); 099 slaStatus = Status.KILLED; 100 // set pending to false as the status is KILLED 101 coordAction.setPending(0); 102 } 103 else { 104 LOG.warn("Unexpected workflow " + wf.getId() + " STATUS " + wf.getStatus()); 105 coordAction.setLastModifiedTime(new Date()); 106 updateList.add(coordAction); 107 jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null)); 108 return null; 109 } 110 } 111 } 112 113 LOG.debug("Updating Coordintaor actionId :" + coordAction.getId() + "status to =" 114 + coordAction.getStatus()); 115 coordAction.setLastModifiedTime(new Date()); 116 updateList.add(coordAction); 117 118 if (slaStatus != null) { 119 SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus, 120 SlaAppType.COORDINATOR_ACTION, LOG); 121 if(slaEvent != null) { 122 insertList.add(slaEvent); 123 } 124 } 125 126 jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList)); 127 CoordinatorAction.Status endStatus = coordAction.getStatus(); 128 if (endStatus != initialStatus && EventHandlerService.isEnabled()) { 129 generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), workflowJob.getStartTime()); 130 } 131 } 132 catch (XException ex) { 133 LOG.warn("CoordActionCheckCommand Failed ", ex); 134 throw new CommandException(ex); 135 } 136 return null; 137 } 138 139 /* (non-Javadoc) 140 * @see org.apache.oozie.command.XCommand#getEntityKey() 141 */ 142 @Override 143 public String getEntityKey() { 144 return actionId; 145 } 146 147 @Override 148 public String getKey() { 149 return getName() + "_" + actionId; 150 } 151 152 /* (non-Javadoc) 153 * @see org.apache.oozie.command.XCommand#isLockRequired() 154 */ 155 @Override 156 protected boolean isLockRequired() { 157 return true; 158 } 159 160 /* (non-Javadoc) 161 * @see org.apache.oozie.command.XCommand#loadState() 162 */ 163 @Override 164 protected void loadState() throws CommandException { 165 try { 166 jpaService = Services.get().get(JPAService.class); 167 168 if (jpaService != null) { 169 coordAction = jpaService.execute(new CoordActionGetForCheckJPAExecutor(actionId)); 170 coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor( 171 coordAction.getJobId())); 172 workflowJob = jpaService.execute (new WorkflowJobGetForSLAJPAExecutor(coordAction.getExternalId())); 173 LogUtils.setLogInfo(coordAction, logInfo); 174 } 175 else { 176 throw new CommandException(ErrorCode.E0610); 177 } 178 } 179 catch (XException ex) { 180 throw new CommandException(ex); 181 } 182 } 183 184 /* (non-Javadoc) 185 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 186 */ 187 @Override 188 protected void verifyPrecondition() throws CommandException, PreconditionException { 189 // if the action has been updated, quit this command 190 Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000); 191 Timestamp cactionLmt = coordAction.getLastModifiedTimestamp(); 192 if (cactionLmt.after(actionCheckTs)) { 193 throw new PreconditionException(ErrorCode.E1100, "The coord action :" + actionId 194 + " has been udated. Ignore CoordActionCheckCommand!"); 195 } 196 if (coordAction.getStatus().equals(CoordinatorAction.Status.SUCCEEDED) 197 || coordAction.getStatus().equals(CoordinatorAction.Status.FAILED) 198 || coordAction.getStatus().equals(CoordinatorAction.Status.KILLED)) { 199 throw new PreconditionException(ErrorCode.E1100, "The coord action [" + actionId + "] must not have status " 200 + CoordinatorAction.Status.SUCCEEDED.name() + ", " + CoordinatorAction.Status.FAILED.name() 201 + ", or " + CoordinatorAction.Status.KILLED.name() + " but has status [" + coordAction.getStatus().name() 202 + "]"); 203 } 204 } 205 }