/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.coord;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.SLAEventBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.SLAEvent.Status;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionGetForExternalIdJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.db.SLADbOperations;

org.apache.oozie.executor.jpa.JPAExecutorException; 043 044 public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> { 045 private WorkflowJobBean workflow; 046 private CoordinatorActionBean coordAction = null; 047 private JPAService jpaService = null; 048 private int maxRetries = 1; 049 private List<JsonBean> updateList = new ArrayList<JsonBean>(); 050 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 051 052 public CoordActionUpdateXCommand(WorkflowJobBean workflow) { 053 super("coord-action-update", "coord-action-update", 1); 054 this.workflow = workflow; 055 } 056 057 public CoordActionUpdateXCommand(WorkflowJobBean workflow, int maxRetries) { 058 super("coord-action-update", "coord-action-update", 1); 059 this.workflow = workflow; 060 this.maxRetries = maxRetries; 061 } 062 063 @Override 064 protected Void execute() throws CommandException { 065 try { 066 LOG.debug("STARTED CoordActionUpdateXCommand for wfId=" + workflow.getId()); 067 068 Status slaStatus = null; 069 CoordinatorAction.Status preCoordStatus = coordAction.getStatus(); 070 if (workflow.getStatus() == WorkflowJob.Status.SUCCEEDED) { 071 coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED); 072 coordAction.setPending(0); 073 slaStatus = Status.SUCCEEDED; 074 } 075 else if (workflow.getStatus() == WorkflowJob.Status.FAILED) { 076 coordAction.setStatus(CoordinatorAction.Status.FAILED); 077 coordAction.setPending(0); 078 slaStatus = Status.FAILED; 079 } 080 else if (workflow.getStatus() == WorkflowJob.Status.KILLED) { 081 coordAction.setStatus(CoordinatorAction.Status.KILLED); 082 coordAction.setPending(0); 083 slaStatus = Status.KILLED; 084 } 085 else if (workflow.getStatus() == WorkflowJob.Status.SUSPENDED) { 086 coordAction.setStatus(CoordinatorAction.Status.SUSPENDED); 087 coordAction.decrementAndGetPending(); 088 } 089 else if (workflow.getStatus() == WorkflowJob.Status.RUNNING) { 090 // resume workflow job and update coord action accordingly 091 
coordAction.setStatus(CoordinatorAction.Status.RUNNING); 092 coordAction.decrementAndGetPending(); 093 } 094 else { 095 LOG.warn("Unexpected workflow " + workflow.getId() + " STATUS " + workflow.getStatus()); 096 // update lastModifiedTime 097 coordAction.setLastModifiedTime(new Date()); 098 updateList.add(coordAction); 099 jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null)); 100 // TODO - Uncomment this when bottom up rerun can change terminal state 101 /* CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId())); 102 if (!coordJob.isPending()) { 103 coordJob.setPending(); 104 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 105 }*/ 106 return null; 107 } 108 109 LOG.info("Updating Coordintaor action id :" + coordAction.getId() + " status from " + preCoordStatus 110 + " to " + coordAction.getStatus() + ", pending = " + coordAction.getPending()); 111 112 coordAction.setLastModifiedTime(new Date()); 113 updateList.add(coordAction); 114 // TODO - Uncomment this when bottom up rerun can change terminal state 115 /*CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId())); 116 if (!coordJob.isPending()) { 117 coordJob.setPending(); 118 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 119 LOG.info("Updating Coordinator job "+ coordJob.getId() + "pending to true"); 120 }*/ 121 if (slaStatus != null) { 122 SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus, 123 SlaAppType.COORDINATOR_ACTION, LOG); 124 if(slaEvent != null) { 125 insertList.add(slaEvent); 126 } 127 } 128 if (workflow.getStatus() != WorkflowJob.Status.SUSPENDED 129 && workflow.getStatus() != WorkflowJob.Status.RUNNING) { 130 queue(new CoordActionReadyXCommand(coordAction.getJobId())); 131 } 132 133 jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList)); 134 135 
LOG.debug("ENDED CoordActionUpdateXCommand for wfId=" + workflow.getId()); 136 } 137 catch (XException ex) { 138 LOG.warn("CoordActionUpdate Failed ", ex.getMessage()); 139 throw new CommandException(ex); 140 } 141 return null; 142 } 143 144 /* (non-Javadoc) 145 * @see org.apache.oozie.command.XCommand#getEntityKey() 146 */ 147 @Override 148 public String getEntityKey() { 149 return coordAction.getJobId(); 150 } 151 152 /* (non-Javadoc) 153 * @see org.apache.oozie.command.XCommand#isLockRequired() 154 */ 155 @Override 156 protected boolean isLockRequired() { 157 return true; 158 } 159 160 /* (non-Javadoc) 161 * @see org.apache.oozie.command.XCommand#eagerLoadState() 162 */ 163 @Override 164 protected void eagerLoadState() throws CommandException { 165 jpaService = Services.get().get(JPAService.class); 166 if (jpaService == null) { 167 throw new CommandException(ErrorCode.E0610); 168 } 169 170 int retries = 0; 171 while (retries++ < maxRetries) { 172 try { 173 coordAction = jpaService.execute(new CoordActionGetForExternalIdJPAExecutor(workflow.getId())); 174 if (coordAction != null) { 175 break; 176 } 177 178 if (retries < maxRetries) { 179 Thread.sleep(500); 180 } 181 } 182 catch (JPAExecutorException je) { 183 LOG.warn("Could not load coord action {0}", je.getMessage(), je); 184 } 185 catch (InterruptedException ex) { 186 LOG.warn("Retry to load coord action is interrupted {0}", ex.getMessage(), ex); 187 } 188 } 189 190 if (coordAction != null) { 191 LogUtils.setLogInfo(coordAction, logInfo); 192 } 193 } 194 195 /* (non-Javadoc) 196 * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition() 197 */ 198 @Override 199 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 200 if (coordAction == null) { 201 throw new PreconditionException(ErrorCode.E1100, ", coord action is null"); 202 } 203 } 204 205 /* (non-Javadoc) 206 * @see org.apache.oozie.command.XCommand#loadState() 207 */ 208 @Override 209 protected void loadState() 
throws CommandException { 210 } 211 212 /* (non-Javadoc) 213 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 214 */ 215 @Override 216 protected void verifyPrecondition() throws CommandException, PreconditionException { 217 218 // if coord action is RUNNING and pending false and workflow is RUNNING, this doesn't need to be updated. 219 if (workflow.getStatus() == WorkflowJob.Status.RUNNING 220 && coordAction.getStatus() == CoordinatorAction.Status.RUNNING && !coordAction.isPending()) { 221 // update lastModifiedTime 222 coordAction.setLastModifiedTime(new Date()); 223 try { 224 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor(coordAction)); 225 } 226 catch (JPAExecutorException je) { 227 throw new CommandException(je); 228 } 229 throw new PreconditionException(ErrorCode.E1100, ", workflow is RUNNING and coordinator action is RUNNING and pending false"); 230 } 231 } 232 233 }