001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command.coord; 019 020import java.util.ArrayList; 021import java.util.Date; 022import java.util.List; 023 024import org.apache.oozie.CoordinatorActionBean; 025import org.apache.oozie.CoordinatorJobBean; 026import org.apache.oozie.ErrorCode; 027import org.apache.oozie.SLAEventBean; 028import org.apache.oozie.WorkflowJobBean; 029import org.apache.oozie.XException; 030import org.apache.oozie.service.EventHandlerService; 031import org.apache.oozie.service.JPAService; 032import org.apache.oozie.service.Services; 033import org.apache.oozie.util.LogUtils; 034import org.apache.oozie.util.db.SLADbOperations; 035import org.apache.oozie.client.CoordinatorAction; 036import org.apache.oozie.client.WorkflowJob; 037import org.apache.oozie.client.SLAEvent.SlaAppType; 038import org.apache.oozie.client.SLAEvent.Status; 039import org.apache.oozie.client.rest.JsonBean; 040import org.apache.oozie.command.CommandException; 041import org.apache.oozie.command.PreconditionException; 042import org.apache.oozie.executor.jpa.BatchQueryExecutor; 043import org.apache.oozie.executor.jpa.CoordActionGetForExternalIdJPAExecutor; 044import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; 045import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserAppnameJPAExecutor; 046import org.apache.oozie.executor.jpa.JPAExecutorException; 047import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 048import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 049 050@SuppressWarnings("deprecation") 051public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> { 052 private WorkflowJobBean workflow; 053 private CoordinatorActionBean coordAction = null; 054 private CoordinatorJobBean coordJob; 055 private JPAService jpaService = null; 056 private int maxRetries = 1; 057 private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 058 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 059 060 public CoordActionUpdateXCommand(WorkflowJobBean workflow) { 061 super("coord-action-update", "coord-action-update", 1); 062 this.workflow = workflow; 063 } 064 065 public CoordActionUpdateXCommand(WorkflowJobBean workflow, int maxRetries) { 066 super("coord-action-update", "coord-action-update", 1); 067 this.workflow = workflow; 068 this.maxRetries = maxRetries; 069 } 070 071 @Override 072 protected Void execute() throws CommandException { 073 try { 074 LOG.debug("STARTED CoordActionUpdateXCommand for wfId=" + workflow.getId()); 075 Status slaStatus = null; 076 if (workflow.getStatus() == WorkflowJob.Status.SUCCEEDED) { 077 coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED); 078 coordAction.setPending(0); 079 slaStatus = Status.SUCCEEDED; 080 } 081 else if (workflow.getStatus() == WorkflowJob.Status.FAILED) { 082 coordAction.setStatus(CoordinatorAction.Status.FAILED); 083 coordAction.setPending(0); 084 slaStatus = Status.FAILED; 085 } 086 else if (workflow.getStatus() == WorkflowJob.Status.KILLED) { 087 coordAction.setStatus(CoordinatorAction.Status.KILLED); 088 coordAction.setPending(0); 089 slaStatus = Status.KILLED; 090 } 091 else if (workflow.getStatus() == WorkflowJob.Status.SUSPENDED) { 092 coordAction.setStatus(CoordinatorAction.Status.SUSPENDED); 093 coordAction.decrementAndGetPending(); 094 } 095 else if (workflow.getStatus() == WorkflowJob.Status.RUNNING) { 096 // resume workflow job and update coord action accordingly 097 coordAction.setStatus(CoordinatorAction.Status.RUNNING); 098 coordAction.decrementAndGetPending(); 099 } 100 else { 101 LOG.warn("Unexpected workflow " + workflow.getId() + " STATUS " + workflow.getStatus()); 102 // update lastModifiedTime 103 coordAction.setLastModifiedTime(new Date()); 104 CoordActionQueryExecutor.getInstance().executeUpdate( 105 CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, coordAction); 106 // TODO - Uncomment this when bottom up rerun can change terminal state 107 /* CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId())); 108 if (!coordJob.isPending()) { 109 coordJob.setPending(); 110 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 111 }*/ 112 return null; 113 } 114 115 LOG.info("Updating Coordintaor action id :" + coordAction.getId() + " status " 116 + " to " + coordAction.getStatus() + ", pending = " + coordAction.getPending()); 117 118 coordAction.setLastModifiedTime(new Date()); 119 updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, 120 coordAction)); 121 // TODO - Uncomment this when bottom up rerun can change terminal state 122 /*CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId())); 123 if (!coordJob.isPending()) { 124 coordJob.setPending(); 125 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 126 LOG.info("Updating Coordinator job "+ coordJob.getId() + "pending to true"); 127 }*/ 128 if (slaStatus != null) { 129 SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus, 130 SlaAppType.COORDINATOR_ACTION, LOG); 131 if(slaEvent != null) { 132 insertList.add(slaEvent); 133 } 134 } 135 if (workflow.getStatus() != WorkflowJob.Status.SUSPENDED 136 && workflow.getStatus() != WorkflowJob.Status.RUNNING) { 137 queue(new CoordActionReadyXCommand(coordAction.getJobId())); 138 } 139 140 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null); 141 if (EventHandlerService.isEnabled()) { 142 generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), workflow.getStartTime()); 143 } 144 145 LOG.debug("ENDED CoordActionUpdateXCommand for wfId=" + workflow.getId()); 146 } 147 catch (XException ex) { 148 LOG.warn("CoordActionUpdate Failed ", ex.getMessage()); 149 throw new CommandException(ex); 150 } 151 return null; 152 } 153 154 /* (non-Javadoc) 155 * @see org.apache.oozie.command.XCommand#getEntityKey() 156 */ 157 @Override 158 public String getEntityKey() { 159 return workflow.getParentId().substring(0, workflow.getParentId().indexOf("@")); 160 } 161 162 /* (non-Javadoc) 163 * @see org.apache.oozie.command.XCommand#isLockRequired() 164 */ 165 @Override 166 protected boolean isLockRequired() { 167 return true; 168 } 169 170 @Override 171 protected void loadState() throws CommandException { 172 jpaService = Services.get().get(JPAService.class); 173 int retries = 0; 174 while (retries++ < maxRetries) { 175 try { 176 coordAction = jpaService.execute(new CoordActionGetForExternalIdJPAExecutor(workflow.getId())); 177 if (coordAction != null) { 178 coordJob = jpaService 179 .execute(new CoordinatorJobGetForUserAppnameJPAExecutor(coordAction.getJobId())); 180 LogUtils.setLogInfo(coordAction, logInfo); 181 break; 182 } 183 if (retries < maxRetries) { 184 Thread.sleep(500); 185 } 186 } 187 catch (JPAExecutorException je) { 188 LOG.warn("Could not load coord action {0}", je.getMessage(), je); 189 } 190 catch (InterruptedException ex) { 191 LOG.warn("Retry to load coord action is interrupted {0}", ex.getMessage(), ex); 192 } 193 } 194 } 195 196 @Override 197 protected void verifyPrecondition() throws CommandException, PreconditionException { 198 199 // if coord action is RUNNING and pending false and workflow is RUNNING, this doesn't need to be updated. 200 if (workflow.getStatus() == WorkflowJob.Status.RUNNING 201 && coordAction.getStatus() == CoordinatorAction.Status.RUNNING && !coordAction.isPending()) { 202 try { 203 CoordActionQueryExecutor.getInstance().executeUpdate( 204 CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, coordAction); 205 } 206 catch (JPAExecutorException je) { 207 throw new CommandException(je); 208 } 209 throw new PreconditionException(ErrorCode.E1100, ", workflow is RUNNING and coordinator action is RUNNING and pending false"); 210 } 211 } 212 213}