001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.coord; 020 021import java.util.ArrayList; 022import java.util.Date; 023import java.util.List; 024 025import org.apache.oozie.CoordinatorActionBean; 026import org.apache.oozie.CoordinatorJobBean; 027import org.apache.oozie.ErrorCode; 028import org.apache.oozie.SLAEventBean; 029import org.apache.oozie.WorkflowJobBean; 030import org.apache.oozie.XException; 031import org.apache.oozie.service.EventHandlerService; 032import org.apache.oozie.service.JPAService; 033import org.apache.oozie.service.Services; 034import org.apache.oozie.util.LogUtils; 035import org.apache.oozie.util.db.SLADbOperations; 036import org.apache.oozie.client.CoordinatorAction; 037import org.apache.oozie.client.WorkflowJob; 038import org.apache.oozie.client.SLAEvent.SlaAppType; 039import org.apache.oozie.client.SLAEvent.Status; 040import org.apache.oozie.client.rest.JsonBean; 041import org.apache.oozie.command.CommandException; 042import org.apache.oozie.command.PreconditionException; 043import org.apache.oozie.executor.jpa.BatchQueryExecutor; 044import org.apache.oozie.executor.jpa.CoordActionGetForExternalIdJPAExecutor; 045import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; 046import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserAppnameJPAExecutor; 047import org.apache.oozie.executor.jpa.JPAExecutorException; 048import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 049import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 050 051@SuppressWarnings("deprecation") 052public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> { 053 private WorkflowJobBean workflow; 054 private CoordinatorActionBean coordAction = null; 055 private CoordinatorJobBean coordJob; 056 private JPAService jpaService = null; 057 private int maxRetries = 1; 058 private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 059 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 060 061 public CoordActionUpdateXCommand(WorkflowJobBean workflow) { 062 super("coord-action-update", "coord-action-update", 1); 063 this.workflow = workflow; 064 } 065 066 public CoordActionUpdateXCommand(WorkflowJobBean workflow, int maxRetries) { 067 super("coord-action-update", "coord-action-update", 1); 068 this.workflow = workflow; 069 this.maxRetries = maxRetries; 070 } 071 072 @Override 073 protected void setLogInfo() { 074 LogUtils.setLogInfo(workflow.getId()); 075 } 076 077 @Override 078 protected Void execute() throws CommandException { 079 try { 080 LOG.debug("STARTED CoordActionUpdateXCommand for wfId=[{0}]", workflow.getId()); 081 final CoordinatorAction.Status formerCoordinatorStatus = coordAction.getStatus(); 082 final int formerCoordinatorPending = coordAction.getPending(); 083 Status slaStatus = null; 084 if (workflow.getStatus() == WorkflowJob.Status.SUCCEEDED) { 085 coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED); 086 coordAction.setPending(0); 087 slaStatus = Status.SUCCEEDED; 088 } 089 else if (workflow.getStatus() == WorkflowJob.Status.FAILED) { 090 coordAction.setStatus(CoordinatorAction.Status.FAILED); 091 coordAction.setPending(0); 092 slaStatus = Status.FAILED; 093 } 094 else if (workflow.getStatus() == WorkflowJob.Status.KILLED) { 095 coordAction.setStatus(CoordinatorAction.Status.KILLED); 096 coordAction.setPending(0); 097 slaStatus = Status.KILLED; 098 } 099 else if (workflow.getStatus() == WorkflowJob.Status.SUSPENDED) { 100 coordAction.setStatus(CoordinatorAction.Status.SUSPENDED); 101 coordAction.decrementAndGetPending(); 102 } 103 else if (workflow.getStatus() == WorkflowJob.Status.RUNNING || 104 workflow.getStatus() == WorkflowJob.Status.PREP) { 105 // resume workflow job and update coord action accordingly 106 coordAction.setStatus(CoordinatorAction.Status.RUNNING); 107 coordAction.decrementAndGetPending(); 108 } 109 else { 110 LOG.warn("Unexpected workflow [{0}] STATUS [{1}]", workflow.getId(), workflow.getStatus()); 111 // update lastModifiedTime 112 coordAction.setLastModifiedTime(new Date()); 113 CoordActionQueryExecutor.getInstance().executeUpdate( 114 CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, coordAction); 115 // TODO - Uncomment this when bottom up rerun can change terminal state 116 /* CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId())); 117 if (!coordJob.isPending()) { 118 coordJob.setPending(); 119 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 120 }*/ 121 return null; 122 } 123 124 LOG.info("Updating Coordinator action id: [{0}] status [{1}] to [{2}] pending [{3}] to [{4}]", 125 coordAction.getId(), formerCoordinatorStatus, coordAction.getStatus(), 126 formerCoordinatorPending, coordAction.getPending()); 127 128 coordAction.setLastModifiedTime(new Date()); 129 updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, 130 coordAction)); 131 // TODO - Uncomment this when bottom up rerun can change terminal state 132 /*CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId())); 133 if (!coordJob.isPending()) { 134 coordJob.setPending(); 135 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 136 LOG.info("Updating Coordinator job "+ coordJob.getId() + "pending to true"); 137 }*/ 138 if (slaStatus != null) { 139 SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), 140 coordAction.getId(), slaStatus, SlaAppType.COORDINATOR_ACTION, LOG); 141 if (slaEvent != null) { 142 insertList.add(slaEvent); 143 } 144 } 145 if (workflow.getStatus() != WorkflowJob.Status.SUSPENDED 146 && workflow.getStatus() != WorkflowJob.Status.RUNNING) { 147 queue(new CoordActionReadyXCommand(coordAction.getJobId())); 148 } 149 150 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null); 151 if (EventHandlerService.isEnabled()) { 152 generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), workflow.getStartTime()); 153 } 154 155 LOG.debug("ENDED CoordActionUpdateXCommand for wfId= [{0}]", workflow.getId()); 156 } 157 catch (XException ex) { 158 LOG.warn("CoordActionUpdate Failed [{0}]", ex.getMessage()); 159 throw new CommandException(ex); 160 } 161 return null; 162 } 163 164 /* (non-Javadoc) 165 * @see org.apache.oozie.command.XCommand#getEntityKey() 166 */ 167 @Override 168 public String getEntityKey() { 169 return workflow.getParentId().substring(0, workflow.getParentId().indexOf("@")); 170 } 171 172 /* (non-Javadoc) 173 * @see org.apache.oozie.command.XCommand#isLockRequired() 174 */ 175 @Override 176 protected boolean isLockRequired() { 177 return true; 178 } 179 180 @Override 181 protected void loadState() throws CommandException { 182 jpaService = Services.get().get(JPAService.class); 183 int retries = 0; 184 while (retries++ < maxRetries) { 185 try { 186 coordAction = jpaService.execute(new CoordActionGetForExternalIdJPAExecutor(workflow.getId())); 187 if (coordAction != null) { 188 coordJob = jpaService 189 .execute(new CoordinatorJobGetForUserAppnameJPAExecutor(coordAction.getJobId())); 190 LogUtils.setLogInfo(coordAction); 191 break; 192 } 193 if (retries < maxRetries) { 194 Thread.sleep(500); 195 } 196 } 197 catch (JPAExecutorException je) { 198 LOG.warn("Could not load coord action {0}", je.getMessage(), je); 199 } 200 catch (InterruptedException ex) { 201 LOG.warn("Retry to load coord action is interrupted {0}", ex.getMessage(), ex); 202 } 203 } 204 } 205 206 @Override 207 protected void verifyPrecondition() throws CommandException, PreconditionException { 208 // if coord action status == workflow, and pending false then coord action is already updated 209 if (((workflow.getStatus() == WorkflowJob.Status.SUCCEEDED 210 && coordAction.getStatus() == CoordinatorAction.Status.SUCCEEDED) || 211 (workflow.getStatus() == WorkflowJob.Status.FAILED 212 && coordAction.getStatus() == CoordinatorAction.Status.FAILED) || 213 (workflow.getStatus() == WorkflowJob.Status.KILLED 214 && coordAction.getStatus() == CoordinatorAction.Status.KILLED) || 215 (workflow.getStatus() == WorkflowJob.Status.SUSPENDED 216 && coordAction.getStatus() == CoordinatorAction.Status.SUSPENDED) || 217 (workflow.getStatus() == WorkflowJob.Status.RUNNING 218 && coordAction.getStatus() == CoordinatorAction.Status.RUNNING) 219 ) && !coordAction.isPending()) { 220 try { 221 CoordActionQueryExecutor.getInstance().executeUpdate( 222 CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, coordAction); 223 } catch (JPAExecutorException je) { 224 throw new CommandException(je); 225 } 226 throw new PreconditionException(ErrorCode.E1100, ", workflow is " + workflow.getStatus() + 227 " and coordinator action is " + coordAction.getStatus() + " and pending false"); 228 } 229 } 230 231}