001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command.bundle; 019 020import java.util.Date; 021 022import org.apache.oozie.BundleActionBean; 023import org.apache.oozie.CoordinatorJobBean; 024import org.apache.oozie.ErrorCode; 025import org.apache.oozie.XException; 026import org.apache.oozie.client.CoordinatorJob; 027import org.apache.oozie.client.Job; 028import org.apache.oozie.command.CommandException; 029import org.apache.oozie.command.PreconditionException; 030import org.apache.oozie.command.StatusUpdateXCommand; 031import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor; 032import org.apache.oozie.executor.jpa.BundleActionQueryExecutor; 033import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; 034import org.apache.oozie.executor.jpa.JPAExecutorException; 035import org.apache.oozie.service.JPAService; 036import org.apache.oozie.service.Services; 037 038/** 039 * The command to update Bundle status 040 */ 041public class BundleStatusUpdateXCommand extends StatusUpdateXCommand { 042 private final CoordinatorJobBean coordjob; 043 private JPAService jpaService = null; 044 private BundleActionBean bundleaction; 045 private final Job.Status prevStatus; 046 private final boolean ignorePending; 047 048 /** 049 * The constructor for class {@link BundleStatusUpdateXCommand} 050 * 051 * @param coordjob coordinator job bean 052 * @param prevStatus coordinator job old status 053 */ 054 public BundleStatusUpdateXCommand(CoordinatorJobBean coordjob, CoordinatorJob.Status prevStatus) { 055 this(coordjob, prevStatus, false); 056 } 057 058 public BundleStatusUpdateXCommand(CoordinatorJobBean coordjob, CoordinatorJob.Status prevStatus, boolean ignorePending) { 059 super("BundleStatusUpdate", "BundleStatusUpdate", 1); 060 this.coordjob = coordjob; 061 this.prevStatus = prevStatus; 062 this.ignorePending = ignorePending; 063 } 064 065 @Override 066 protected Void execute() throws CommandException { 067 try { 068 LOG.debug("STARTED BundleStatusUpdateXCommand with bundle id : " + coordjob.getBundleId() 069 + " coord job ID: " + coordjob.getId() + " coord Status " + coordjob.getStatus()); 070 Job.Status coordCurrentStatus = coordjob.getStatus(); 071 // The status of bundle action should not be updated if the bundle action is in terminal state 072 // and coord Id is null. For e.g if Bundleaction is killed and coord Id is null, then the status of bundle 073 // should not be changed. 074 if (bundleaction.getCoordId() != null 075 || !bundleaction.isTerminalStatus() 076 || (bundleaction.getCoordId() != null && bundleaction.isTerminalStatus() && coordjob 077 .isTerminalStatus())) { 078 bundleaction.setStatus(coordCurrentStatus); 079 } 080 if (bundleaction.isPending() && !ignorePending) { 081 bundleaction.decrementAndGetPending(); 082 } 083 // TODO - Uncomment this when bottom up rerun can change terminal state 084 /*BundleJobBean bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleaction.getBundleId())); 085 if (!bundleJob.isPending()) { 086 bundleJob.setPending(); 087 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob)); 088 LOG.info("Updated bundle job [{0}] pending to true", bundleaction.getBundleId()); 089 }*/ 090 091 bundleaction.setLastModifiedTime(new Date()); 092 bundleaction.setCoordId(coordjob.getId()); 093 BundleActionQueryExecutor.getInstance().executeUpdate( 094 BundleActionQuery.UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID, bundleaction); 095 if (bundleaction.getCoordId() != null) { 096 LOG.info( 097 "Updated bundle action [{0}] from prev status [{1}] to current coord status [{2}], " + 098 "and new bundle action pending [{3}]", 099 bundleaction.getBundleActionId(), bundleaction.getStatus(), coordCurrentStatus, 100 bundleaction.getPending()); 101 } 102 else { 103 LOG.info("Updated Bundle action [{0}], status = [{1}], pending = [{2}]", bundleaction.getBundleActionId(), 104 bundleaction.getStatus(), bundleaction.getPending()); 105 } 106 LOG.debug("ENDED BundleStatusUpdateXCommand with bundle id : " + coordjob.getBundleId() + " coord job ID: " 107 + coordjob.getId() + " coord Status " + coordjob.getStatus()); 108 } 109 catch (Exception ex) { 110 throw new CommandException(ErrorCode.E1309, bundleaction.getBundleId(), bundleaction.getCoordName()); 111 } 112 return null; 113 } 114 115 @Override 116 public String getEntityKey() { 117 return coordjob.getBundleId(); 118 } 119 120 @Override 121 protected boolean isLockRequired() { 122 return true; 123 } 124 125 @Override 126 protected void loadState() throws CommandException { 127 try { 128 if (jpaService == null) { 129 jpaService = Services.get().get(JPAService.class); 130 } 131 132 if (jpaService != null) { 133 this.bundleaction = jpaService.execute(new BundleActionGetJPAExecutor(coordjob.getBundleId(), coordjob 134 .getAppName())); 135 } 136 else { 137 throw new CommandException(ErrorCode.E0610); 138 } 139 } 140 catch (XException ex) { 141 throw new CommandException(ex); 142 } 143 } 144 145 @Override 146 protected void verifyPrecondition() throws CommandException, PreconditionException { 147 if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0 && bundleaction.getCoordId()!=null) { 148 // pending should be decremented only if status of coord job and bundle action is same 149 // e.g if bundle is killed and coord job is running, then pending should not be false 150 // to allow recovery service to pick and kill the coord job 151 if (bundleaction.isTerminalStatus() && coordjob.isTerminalStatus()) { 152 LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}], " 153 + "but coord job is currently in terminal state = [{3}]", 154 bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(), 155 coordjob.getStatus()); 156 return; 157 } 158 if (bundleaction.isPending() && coordjob.getStatus().equals(bundleaction.getStatus()) && !ignorePending) { 159 bundleaction.decrementAndGetPending(); 160 } 161 bundleaction.setLastModifiedTime(new Date()); 162 try { 163 BundleActionQueryExecutor.getInstance().executeUpdate( 164 BundleActionQuery.UPDATE_BUNDLE_ACTION_PENDING_MODTIME, bundleaction); 165 } 166 catch (JPAExecutorException je) { 167 throw new CommandException(je); 168 } 169 LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}] and current coord" 170 + " status [{3}], decrement pending so new pending = [{4}]", bundleaction.getBundleActionId(), 171 bundleaction.getStatusStr(), prevStatus.toString(), coordjob.getStatusStr(), 172 bundleaction.getPending()); 173 throw new PreconditionException(ErrorCode.E1308, bundleaction.getStatusStr(), prevStatus.toString()); 174 } 175 else if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0) { 176 LOG.info( 177 "Bundle action [{0}] status [{1}] is different from prev coord status [{2}], " + 178 "pending = [{3}] and bundle not yet updated with coord-id", 179 bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(), 180 bundleaction.getPending()); 181 } 182 } 183 184}