001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.util.Date; 021 022 import org.apache.oozie.CoordinatorActionBean; 023 import org.apache.oozie.ErrorCode; 024 import org.apache.oozie.WorkflowJobBean; 025 import org.apache.oozie.XException; 026 import org.apache.oozie.service.JPAService; 027 import org.apache.oozie.service.Services; 028 import org.apache.oozie.util.LogUtils; 029 import org.apache.oozie.util.db.SLADbOperations; 030 import org.apache.oozie.client.CoordinatorAction; 031 import org.apache.oozie.client.WorkflowJob; 032 import org.apache.oozie.client.SLAEvent.SlaAppType; 033 import org.apache.oozie.client.SLAEvent.Status; 034 import org.apache.oozie.command.CommandException; 035 import org.apache.oozie.command.PreconditionException; 036 import org.apache.oozie.executor.jpa.CoordActionGetForExternalIdJPAExecutor; 037 import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor; 038 import org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor; 039 import org.apache.oozie.executor.jpa.JPAExecutorException; 040 041 public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> { 042 private WorkflowJobBean workflow; 043 private CoordinatorActionBean coordAction = null; 044 private JPAService jpaService = null; 045 private int maxRetries = 1; 046 047 public CoordActionUpdateXCommand(WorkflowJobBean workflow) { 048 super("coord-action-update", "coord-action-update", 1); 049 this.workflow = workflow; 050 } 051 052 public CoordActionUpdateXCommand(WorkflowJobBean workflow, int maxRetries) { 053 super("coord-action-update", "coord-action-update", 1); 054 this.workflow = workflow; 055 this.maxRetries = maxRetries; 056 } 057 058 @Override 059 protected Void execute() throws CommandException { 060 try { 061 LOG.debug("STARTED CoordActionUpdateXCommand for wfId=" + workflow.getId()); 062 063 Status slaStatus = null; 064 CoordinatorAction.Status preCoordStatus = coordAction.getStatus(); 065 if (workflow.getStatus() == WorkflowJob.Status.SUCCEEDED) { 066 coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED); 067 coordAction.setPending(0); 068 slaStatus = Status.SUCCEEDED; 069 } 070 else if (workflow.getStatus() == WorkflowJob.Status.FAILED) { 071 coordAction.setStatus(CoordinatorAction.Status.FAILED); 072 coordAction.setPending(0); 073 slaStatus = Status.FAILED; 074 } 075 else if (workflow.getStatus() == WorkflowJob.Status.KILLED) { 076 coordAction.setStatus(CoordinatorAction.Status.KILLED); 077 coordAction.setPending(0); 078 slaStatus = Status.KILLED; 079 } 080 else if (workflow.getStatus() == WorkflowJob.Status.SUSPENDED) { 081 coordAction.setStatus(CoordinatorAction.Status.SUSPENDED); 082 coordAction.decrementAndGetPending(); 083 } 084 else if (workflow.getStatus() == WorkflowJob.Status.RUNNING) { 085 // resume workflow job and update coord action accordingly 086 coordAction.setStatus(CoordinatorAction.Status.RUNNING); 087 coordAction.decrementAndGetPending(); 088 } 089 else { 090 LOG.warn("Unexpected workflow " + workflow.getId() + " STATUS " + workflow.getStatus()); 091 // update lastModifiedTime 092 coordAction.setLastModifiedTime(new Date()); 093 jpaService.execute(new CoordActionUpdateStatusJPAExecutor(coordAction)); 094 095 return null; 096 } 097 098 LOG.info("Updating Coordintaor action id :" + coordAction.getId() + " status from " + preCoordStatus 099 + " to " + coordAction.getStatus() + ", pending = " + coordAction.getPending()); 100 101 coordAction.setLastModifiedTime(new Date()); 102 jpaService.execute(new CoordActionUpdateStatusJPAExecutor(coordAction)); 103 if (slaStatus != null) { 104 SLADbOperations.writeStausEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus, 105 SlaAppType.COORDINATOR_ACTION, LOG); 106 } 107 if (workflow.getStatus() != WorkflowJob.Status.SUSPENDED 108 && workflow.getStatus() != WorkflowJob.Status.RUNNING) { 109 queue(new CoordActionReadyXCommand(coordAction.getJobId())); 110 } 111 LOG.debug("ENDED CoordActionUpdateXCommand for wfId=" + workflow.getId()); 112 } 113 catch (XException ex) { 114 LOG.warn("CoordActionUpdate Failed ", ex.getMessage()); 115 throw new CommandException(ex); 116 } 117 return null; 118 } 119 120 /* (non-Javadoc) 121 * @see org.apache.oozie.command.XCommand#getEntityKey() 122 */ 123 @Override 124 public String getEntityKey() { 125 return coordAction.getJobId(); 126 } 127 128 /* (non-Javadoc) 129 * @see org.apache.oozie.command.XCommand#isLockRequired() 130 */ 131 @Override 132 protected boolean isLockRequired() { 133 return true; 134 } 135 136 /* (non-Javadoc) 137 * @see org.apache.oozie.command.XCommand#eagerLoadState() 138 */ 139 @Override 140 protected void eagerLoadState() throws CommandException { 141 jpaService = Services.get().get(JPAService.class); 142 if (jpaService == null) { 143 throw new CommandException(ErrorCode.E0610); 144 } 145 146 int retries = 0; 147 while (retries++ < maxRetries) { 148 try { 149 coordAction = jpaService.execute(new CoordActionGetForExternalIdJPAExecutor(workflow.getId())); 150 if (coordAction != null) { 151 break; 152 } 153 154 if (retries < maxRetries) { 155 Thread.sleep(500); 156 } 157 } 158 catch (JPAExecutorException je) { 159 LOG.warn("Could not load coord action {0}", je.getMessage(), je); 160 } 161 catch (InterruptedException ex) { 162 LOG.warn("Retry to load coord action is interrupted {0}", ex.getMessage(), ex); 163 } 164 } 165 166 if (coordAction != null) { 167 LogUtils.setLogInfo(coordAction, logInfo); 168 } 169 } 170 171 /* (non-Javadoc) 172 * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition() 173 */ 174 @Override 175 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 176 if (coordAction == null) { 177 throw new PreconditionException(ErrorCode.E1100, ", coord action is null"); 178 } 179 } 180 181 /* (non-Javadoc) 182 * @see org.apache.oozie.command.XCommand#loadState() 183 */ 184 @Override 185 protected void loadState() throws CommandException { 186 } 187 188 /* (non-Javadoc) 189 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 190 */ 191 @Override 192 protected void verifyPrecondition() throws CommandException, PreconditionException { 193 194 // if coord action is RUNNING and pending false and workflow is RUNNING, this doesn't need to be updated. 195 if (workflow.getStatus() == WorkflowJob.Status.RUNNING 196 && coordAction.getStatus() == CoordinatorAction.Status.RUNNING && !coordAction.isPending()) { 197 // update lastModifiedTime 198 coordAction.setLastModifiedTime(new Date()); 199 try { 200 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor(coordAction)); 201 } 202 catch (JPAExecutorException je) { 203 throw new CommandException(je); 204 } 205 throw new PreconditionException(ErrorCode.E1100, ", workflow is RUNNING and coordinator action is RUNNING and pending false"); 206 } 207 } 208 209 }