001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.bundle; 020 021import java.util.Date; 022 023import org.apache.oozie.BundleActionBean; 024import org.apache.oozie.CoordinatorJobBean; 025import org.apache.oozie.ErrorCode; 026import org.apache.oozie.XException; 027import org.apache.oozie.client.CoordinatorJob; 028import org.apache.oozie.client.Job; 029import org.apache.oozie.command.CommandException; 030import org.apache.oozie.command.PreconditionException; 031import org.apache.oozie.command.StatusUpdateXCommand; 032import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor; 033import org.apache.oozie.executor.jpa.BundleActionQueryExecutor; 034import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; 035import org.apache.oozie.executor.jpa.JPAExecutorException; 036import org.apache.oozie.service.JPAService; 037import org.apache.oozie.service.Services; 038 039/** 040 * The command to update Bundle status 041 */ 042public class BundleStatusUpdateXCommand extends StatusUpdateXCommand { 043 private final CoordinatorJobBean coordjob; 044 private JPAService jpaService = null; 045 private BundleActionBean bundleaction; 046 private final Job.Status prevStatus; 047 private final boolean ignorePending; 048 049 /** 050 * The constructor for class {@link BundleStatusUpdateXCommand} 051 * 052 * @param coordjob coordinator job bean 053 * @param prevStatus coordinator job old status 054 */ 055 public BundleStatusUpdateXCommand(CoordinatorJobBean coordjob, CoordinatorJob.Status prevStatus) { 056 this(coordjob, prevStatus, false); 057 } 058 059 public BundleStatusUpdateXCommand(CoordinatorJobBean coordjob, CoordinatorJob.Status prevStatus, boolean ignorePending) { 060 super("BundleStatusUpdate", "BundleStatusUpdate", 1); 061 this.coordjob = coordjob; 062 this.prevStatus = prevStatus; 063 this.ignorePending = ignorePending; 064 } 065 066 @Override 067 protected Void execute() throws CommandException { 068 try { 069 LOG.debug("STARTED BundleStatusUpdateXCommand with bundle id : " + coordjob.getBundleId() 070 + " coord job ID: " + coordjob.getId() + " coord Status " + coordjob.getStatus()); 071 Job.Status coordCurrentStatus = coordjob.getStatus(); 072 // The status of bundle action should not be updated if the bundle action is in terminal state 073 // and coord Id is null. For e.g if Bundleaction is killed and coord Id is null, then the status of bundle 074 // should not be changed. 075 if (bundleaction.getCoordId() != null 076 || !bundleaction.isTerminalStatus() 077 || (bundleaction.getCoordId() != null && bundleaction.isTerminalStatus() && coordjob 078 .isTerminalStatus())) { 079 bundleaction.setStatus(coordCurrentStatus); 080 } 081 if (bundleaction.isPending() && !ignorePending) { 082 bundleaction.decrementAndGetPending(); 083 } 084 // TODO - Uncomment this when bottom up rerun can change terminal state 085 /*BundleJobBean bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleaction.getBundleId())); 086 if (!bundleJob.isPending()) { 087 bundleJob.setPending(); 088 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob)); 089 LOG.info("Updated bundle job [{0}] pending to true", bundleaction.getBundleId()); 090 }*/ 091 092 bundleaction.setLastModifiedTime(new Date()); 093 bundleaction.setCoordId(coordjob.getId()); 094 BundleActionQueryExecutor.getInstance().executeUpdate( 095 BundleActionQuery.UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID, bundleaction); 096 if (bundleaction.getCoordId() != null) { 097 LOG.info( 098 "Updated bundle action [{0}] from prev status [{1}] to current coord status [{2}], " + 099 "and new bundle action pending [{3}]", 100 bundleaction.getBundleActionId(), bundleaction.getStatus(), coordCurrentStatus, 101 bundleaction.getPending()); 102 } 103 else { 104 LOG.info("Updated Bundle action [{0}], status = [{1}], pending = [{2}]", bundleaction.getBundleActionId(), 105 bundleaction.getStatus(), bundleaction.getPending()); 106 } 107 LOG.debug("ENDED BundleStatusUpdateXCommand with bundle id : " + coordjob.getBundleId() + " coord job ID: " 108 + coordjob.getId() + " coord Status " + coordjob.getStatus()); 109 } 110 catch (Exception ex) { 111 throw new CommandException(ErrorCode.E1309, bundleaction.getBundleId(), bundleaction.getCoordName()); 112 } 113 return null; 114 } 115 116 @Override 117 public String getEntityKey() { 118 return coordjob.getBundleId(); 119 } 120 121 @Override 122 protected boolean isLockRequired() { 123 return true; 124 } 125 126 @Override 127 protected void loadState() throws CommandException { 128 try { 129 if (jpaService == null) { 130 jpaService = Services.get().get(JPAService.class); 131 } 132 133 if (jpaService != null) { 134 this.bundleaction = jpaService.execute(new BundleActionGetJPAExecutor(coordjob.getBundleId(), coordjob 135 .getAppName())); 136 } 137 else { 138 throw new CommandException(ErrorCode.E0610); 139 } 140 } 141 catch (XException ex) { 142 throw new CommandException(ex); 143 } 144 } 145 146 @Override 147 protected void verifyPrecondition() throws CommandException, PreconditionException { 148 if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0 && bundleaction.getCoordId()!=null) { 149 // pending should be decremented only if status of coord job and bundle action is same 150 // e.g if bundle is killed and coord job is running, then pending should not be false 151 // to allow recovery service to pick and kill the coord job 152 if (bundleaction.isTerminalStatus() && coordjob.isTerminalStatus()) { 153 LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}], " 154 + "but coord job is currently in terminal state = [{3}]", 155 bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(), 156 coordjob.getStatus()); 157 return; 158 } 159 if (bundleaction.isPending() && coordjob.getStatus().equals(bundleaction.getStatus()) && !ignorePending) { 160 bundleaction.decrementAndGetPending(); 161 } 162 bundleaction.setLastModifiedTime(new Date()); 163 try { 164 BundleActionQueryExecutor.getInstance().executeUpdate( 165 BundleActionQuery.UPDATE_BUNDLE_ACTION_PENDING_MODTIME, bundleaction); 166 } 167 catch (JPAExecutorException je) { 168 throw new CommandException(je); 169 } 170 LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}] and current coord" 171 + " status [{3}], decrement pending so new pending = [{4}]", bundleaction.getBundleActionId(), 172 bundleaction.getStatusStr(), prevStatus.toString(), coordjob.getStatusStr(), 173 bundleaction.getPending()); 174 throw new PreconditionException(ErrorCode.E1308, bundleaction.getStatusStr(), prevStatus.toString()); 175 } 176 else if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0) { 177 LOG.info( 178 "Bundle action [{0}] status [{1}] is different from prev coord status [{2}], " + 179 "pending = [{3}] and bundle not yet updated with coord-id", 180 bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(), 181 bundleaction.getPending()); 182 } 183 } 184 185}