001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.util.ArrayList; 021 import java.util.Date; 022 import java.util.List; 023 024 import org.apache.oozie.CoordinatorActionBean; 025 import org.apache.oozie.CoordinatorJobBean; 026 import org.apache.oozie.ErrorCode; 027 import org.apache.oozie.SLAEventBean; 028 import org.apache.oozie.WorkflowJobBean; 029 import org.apache.oozie.XException; 030 import org.apache.oozie.service.EventHandlerService; 031 import org.apache.oozie.service.JPAService; 032 import org.apache.oozie.service.Services; 033 import org.apache.oozie.util.LogUtils; 034 import org.apache.oozie.util.db.SLADbOperations; 035 import org.apache.oozie.client.CoordinatorAction; 036 import org.apache.oozie.client.WorkflowJob; 037 import org.apache.oozie.client.SLAEvent.SlaAppType; 038 import org.apache.oozie.client.SLAEvent.Status; 039 import org.apache.oozie.client.rest.JsonBean; 040 import org.apache.oozie.command.CommandException; 041 import org.apache.oozie.command.PreconditionException; 042 import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor; 043 import org.apache.oozie.executor.jpa.CoordActionGetForExternalIdJPAExecutor; 044 import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserAppnameJPAExecutor; 045 import org.apache.oozie.executor.jpa.JPAExecutorException; 046 047 @SuppressWarnings("deprecation") 048 public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> { 049 private WorkflowJobBean workflow; 050 private CoordinatorActionBean coordAction = null; 051 private CoordinatorJobBean coordJob; 052 private JPAService jpaService = null; 053 private int maxRetries = 1; 054 private List<JsonBean> updateList = new ArrayList<JsonBean>(); 055 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 056 057 public CoordActionUpdateXCommand(WorkflowJobBean workflow) { 058 super("coord-action-update", "coord-action-update", 1); 059 this.workflow = workflow; 060 } 061 062 public CoordActionUpdateXCommand(WorkflowJobBean workflow, int maxRetries) { 063 super("coord-action-update", "coord-action-update", 1); 064 this.workflow = workflow; 065 this.maxRetries = maxRetries; 066 } 067 068 @Override 069 protected Void execute() throws CommandException { 070 try { 071 LOG.debug("STARTED CoordActionUpdateXCommand for wfId=" + workflow.getId()); 072 Status slaStatus = null; 073 if (workflow.getStatus() == WorkflowJob.Status.SUCCEEDED) { 074 coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED); 075 coordAction.setPending(0); 076 slaStatus = Status.SUCCEEDED; 077 } 078 else if (workflow.getStatus() == WorkflowJob.Status.FAILED) { 079 coordAction.setStatus(CoordinatorAction.Status.FAILED); 080 coordAction.setPending(0); 081 slaStatus = Status.FAILED; 082 } 083 else if (workflow.getStatus() == WorkflowJob.Status.KILLED) { 084 coordAction.setStatus(CoordinatorAction.Status.KILLED); 085 coordAction.setPending(0); 086 slaStatus = Status.KILLED; 087 } 088 else if (workflow.getStatus() == WorkflowJob.Status.SUSPENDED) { 089 coordAction.setStatus(CoordinatorAction.Status.SUSPENDED); 090 coordAction.decrementAndGetPending(); 091 } 092 else if (workflow.getStatus() == WorkflowJob.Status.RUNNING) { 093 // resume workflow job and update coord action accordingly 094 coordAction.setStatus(CoordinatorAction.Status.RUNNING); 095 coordAction.decrementAndGetPending(); 096 } 097 else { 098 LOG.warn("Unexpected workflow " + workflow.getId() + " STATUS " + workflow.getStatus()); 099 // update lastModifiedTime 100 coordAction.setLastModifiedTime(new Date()); 101 updateList.add(coordAction); 102 jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null)); 103 // TODO - Uncomment this when bottom up rerun can change terminal state 104 /* CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId())); 105 if (!coordJob.isPending()) { 106 coordJob.setPending(); 107 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 108 }*/ 109 return null; 110 } 111 112 LOG.info("Updating Coordintaor action id :" + coordAction.getId() + " status " 113 + " to " + coordAction.getStatus() + ", pending = " + coordAction.getPending()); 114 115 coordAction.setLastModifiedTime(new Date()); 116 updateList.add(coordAction); 117 // TODO - Uncomment this when bottom up rerun can change terminal state 118 /*CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId())); 119 if (!coordJob.isPending()) { 120 coordJob.setPending(); 121 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 122 LOG.info("Updating Coordinator job "+ coordJob.getId() + "pending to true"); 123 }*/ 124 if (slaStatus != null) { 125 SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus, 126 SlaAppType.COORDINATOR_ACTION, LOG); 127 if(slaEvent != null) { 128 insertList.add(slaEvent); 129 } 130 } 131 if (workflow.getStatus() != WorkflowJob.Status.SUSPENDED 132 && workflow.getStatus() != WorkflowJob.Status.RUNNING) { 133 queue(new CoordActionReadyXCommand(coordAction.getJobId())); 134 } 135 136 jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList)); 137 if (EventHandlerService.isEnabled()) { 138 generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), workflow.getStartTime()); 139 } 140 141 LOG.debug("ENDED CoordActionUpdateXCommand for wfId=" + workflow.getId()); 142 } 143 catch (XException ex) { 144 LOG.warn("CoordActionUpdate Failed ", ex.getMessage()); 145 throw new CommandException(ex); 146 } 147 return null; 148 } 149 150 /* (non-Javadoc) 151 * @see org.apache.oozie.command.XCommand#getEntityKey() 152 */ 153 @Override 154 public String getEntityKey() { 155 return coordAction.getJobId(); 156 } 157 158 /* (non-Javadoc) 159 * @see org.apache.oozie.command.XCommand#isLockRequired() 160 */ 161 @Override 162 protected boolean isLockRequired() { 163 return true; 164 } 165 166 /* (non-Javadoc) 167 * @see org.apache.oozie.command.XCommand#eagerLoadState() 168 */ 169 @Override 170 protected void eagerLoadState() throws CommandException { 171 jpaService = Services.get().get(JPAService.class); 172 if (jpaService == null) { 173 throw new CommandException(ErrorCode.E0610); 174 } 175 176 int retries = 0; 177 while (retries++ < maxRetries) { 178 try { 179 coordAction = jpaService.execute(new CoordActionGetForExternalIdJPAExecutor(workflow.getId())); 180 if (coordAction != null) { 181 coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor( 182 coordAction.getJobId())); 183 break; 184 } 185 186 if (retries < maxRetries) { 187 Thread.sleep(500); 188 } 189 } 190 catch (JPAExecutorException je) { 191 LOG.warn("Could not load coord action {0}", je.getMessage(), je); 192 } 193 catch (InterruptedException ex) { 194 LOG.warn("Retry to load coord action is interrupted {0}", ex.getMessage(), ex); 195 } 196 } 197 198 if (coordAction != null) { 199 LogUtils.setLogInfo(coordAction, logInfo); 200 } 201 } 202 203 /* (non-Javadoc) 204 * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition() 205 */ 206 @Override 207 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 208 if (coordAction == null) { 209 throw new PreconditionException(ErrorCode.E1100, ", coord action is null"); 210 } 211 } 212 213 /* (non-Javadoc) 214 * @see org.apache.oozie.command.XCommand#loadState() 215 */ 216 @Override 217 protected void loadState() throws CommandException { 218 eagerLoadState(); 219 } 220 221 /* (non-Javadoc) 222 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 223 */ 224 @Override 225 protected void verifyPrecondition() throws CommandException, PreconditionException { 226 227 // if coord action is RUNNING and pending false and workflow is RUNNING, this doesn't need to be updated. 228 if (workflow.getStatus() == WorkflowJob.Status.RUNNING 229 && coordAction.getStatus() == CoordinatorAction.Status.RUNNING && !coordAction.isPending()) { 230 // update lastModifiedTime 231 coordAction.setLastModifiedTime(new Date()); 232 try { 233 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor(coordAction)); 234 } 235 catch (JPAExecutorException je) { 236 throw new CommandException(je); 237 } 238 throw new PreconditionException(ErrorCode.E1100, ", workflow is RUNNING and coordinator action is RUNNING and pending false"); 239 } 240 } 241 242 }