001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.sql.Timestamp; 021 import java.util.ArrayList; 022 import java.util.Date; 023 import java.util.List; 024 025 import org.apache.oozie.CoordinatorActionBean; 026 import org.apache.oozie.ErrorCode; 027 import org.apache.oozie.SLAEventBean; 028 import org.apache.oozie.WorkflowJobBean; 029 import org.apache.oozie.XException; 030 import org.apache.oozie.service.JPAService; 031 import org.apache.oozie.service.Services; 032 import org.apache.oozie.util.InstrumentUtils; 033 import org.apache.oozie.util.LogUtils; 034 import org.apache.oozie.util.ParamChecker; 035 import org.apache.oozie.util.db.SLADbOperations; 036 import org.apache.oozie.client.CoordinatorAction; 037 import org.apache.oozie.client.WorkflowJob; 038 import org.apache.oozie.client.SLAEvent.SlaAppType; 039 import org.apache.oozie.client.SLAEvent.Status; 040 import org.apache.oozie.client.rest.JsonBean; 041 import org.apache.oozie.command.CommandException; 042 import org.apache.oozie.command.PreconditionException; 043 import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor; 044 import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor; 045 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 046 047 /** 048 * The command checks workflow status for coordinator action. 049 */ 050 public class CoordActionCheckXCommand extends CoordinatorXCommand<Void> { 051 private String actionId; 052 private int actionCheckDelay; 053 private CoordinatorActionBean coordAction = null; 054 private JPAService jpaService = null; 055 private List<JsonBean> updateList = new ArrayList<JsonBean>(); 056 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 057 058 public CoordActionCheckXCommand(String actionId, int actionCheckDelay) { 059 super("coord_action_check", "coord_action_check", 0); 060 this.actionId = ParamChecker.notEmpty(actionId, "actionId"); 061 this.actionCheckDelay = actionCheckDelay; 062 } 063 064 /* (non-Javadoc) 065 * @see org.apache.oozie.command.XCommand#execute() 066 */ 067 @Override 068 protected Void execute() throws CommandException { 069 try { 070 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 071 WorkflowJobBean wf = jpaService.execute(new WorkflowJobGetJPAExecutor(coordAction.getExternalId())); 072 073 Status slaStatus = null; 074 075 if (wf.getStatus() == WorkflowJob.Status.SUCCEEDED) { 076 coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED); 077 // set pending to false as the status is SUCCEEDED 078 coordAction.setPending(0); 079 slaStatus = Status.SUCCEEDED; 080 } 081 else { 082 if (wf.getStatus() == WorkflowJob.Status.FAILED) { 083 coordAction.setStatus(CoordinatorAction.Status.FAILED); 084 slaStatus = Status.FAILED; 085 // set pending to false as the status is FAILED 086 coordAction.setPending(0); 087 } 088 else { 089 if (wf.getStatus() == WorkflowJob.Status.KILLED) { 090 coordAction.setStatus(CoordinatorAction.Status.KILLED); 091 slaStatus = Status.KILLED; 092 // set pending to false as the status is KILLED 093 coordAction.setPending(0); 094 } 095 else { 096 LOG.warn("Unexpected workflow " + wf.getId() + " STATUS " + wf.getStatus()); 097 coordAction.setLastModifiedTime(new Date()); 098 updateList.add(coordAction); 099 jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null)); 100 return null; 101 } 102 } 103 } 104 105 LOG.debug("Updating Coordintaor actionId :" + coordAction.getId() + "status to =" 106 + coordAction.getStatus()); 107 coordAction.setLastModifiedTime(new Date()); 108 updateList.add(coordAction); 109 110 if (slaStatus != null) { 111 SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus, 112 SlaAppType.COORDINATOR_ACTION, LOG); 113 if(slaEvent != null) { 114 insertList.add(slaEvent); 115 } 116 } 117 118 jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList)); 119 } 120 catch (XException ex) { 121 LOG.warn("CoordActionCheckCommand Failed ", ex); 122 throw new CommandException(ex); 123 } 124 return null; 125 } 126 127 /* (non-Javadoc) 128 * @see org.apache.oozie.command.XCommand#getEntityKey() 129 */ 130 @Override 131 public String getEntityKey() { 132 return actionId; 133 } 134 135 /* (non-Javadoc) 136 * @see org.apache.oozie.command.XCommand#isLockRequired() 137 */ 138 @Override 139 protected boolean isLockRequired() { 140 return true; 141 } 142 143 /* (non-Javadoc) 144 * @see org.apache.oozie.command.XCommand#loadState() 145 */ 146 @Override 147 protected void loadState() throws CommandException { 148 try { 149 jpaService = Services.get().get(JPAService.class); 150 151 if (jpaService != null) { 152 coordAction = jpaService.execute(new CoordActionGetForCheckJPAExecutor(actionId)); 153 LogUtils.setLogInfo(coordAction, logInfo); 154 } 155 else { 156 throw new CommandException(ErrorCode.E0610); 157 } 158 } 159 catch (XException ex) { 160 throw new CommandException(ex); 161 } 162 } 163 164 /* (non-Javadoc) 165 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 166 */ 167 @Override 168 protected void verifyPrecondition() throws CommandException, PreconditionException { 169 // if the action has been updated, quit this command 170 Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000); 171 Timestamp cactionLmt = coordAction.getLastModifiedTimestamp(); 172 if (cactionLmt.after(actionCheckTs)) { 173 throw new PreconditionException(ErrorCode.E1100, "The coord action :" + actionId 174 + " has been udated. Ignore CoordActionCheckCommand!"); 175 } 176 if (coordAction.getStatus().equals(CoordinatorAction.Status.SUCCEEDED) 177 || coordAction.getStatus().equals(CoordinatorAction.Status.FAILED) 178 || coordAction.getStatus().equals(CoordinatorAction.Status.KILLED)) { 179 throw new PreconditionException(ErrorCode.E1100, "The coord action [" + actionId + "] must not have status " 180 + CoordinatorAction.Status.SUCCEEDED.name() + ", " + CoordinatorAction.Status.FAILED.name() 181 + ", or " + CoordinatorAction.Status.KILLED.name() + " but has status [" + coordAction.getStatus().name() 182 + "]"); 183 } 184 } 185 }