001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.bundle; 019 020 import java.util.Date; 021 022 import org.apache.oozie.BundleActionBean; 023 import org.apache.oozie.CoordinatorJobBean; 024 import org.apache.oozie.ErrorCode; 025 import org.apache.oozie.XException; 026 import org.apache.oozie.client.CoordinatorJob; 027 import org.apache.oozie.client.Job; 028 import org.apache.oozie.command.CommandException; 029 import org.apache.oozie.command.PreconditionException; 030 import org.apache.oozie.command.StatusUpdateXCommand; 031 import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor; 032 import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor; 033 import org.apache.oozie.executor.jpa.JPAExecutorException; 034 import org.apache.oozie.service.JPAService; 035 import org.apache.oozie.service.Services; 036 037 /** 038 * The command to update Bundle status 039 */ 040 public class BundleStatusUpdateXCommand extends StatusUpdateXCommand { 041 private final CoordinatorJobBean coordjob; 042 private JPAService jpaService = null; 043 private BundleActionBean bundleaction; 044 private final Job.Status prevStatus; 045 046 /** 047 * The constructor for class {@link BundleStatusUpdateXCommand} 048 * 049 * @param coordjob coordinator job bean 050 * @param prevStatus coordinator job old status 051 */ 052 public BundleStatusUpdateXCommand(CoordinatorJobBean coordjob, CoordinatorJob.Status prevStatus) { 053 super("BundleStatusUpdate", "BundleStatusUpdate", 1); 054 this.coordjob = coordjob; 055 this.prevStatus = prevStatus; 056 } 057 058 /* (non-Javadoc) 059 * @see org.apache.oozie.command.XCommand#execute() 060 */ 061 @Override 062 protected Void execute() throws CommandException { 063 try { 064 LOG.debug("STARTED BundleStatusUpdateXCommand with bundle id : " + coordjob.getBundleId() 065 + " coord job ID: " + coordjob.getId() + " coord Status " + coordjob.getStatus()); 066 Job.Status coordCurrentStatus = coordjob.getStatus(); 067 // The status of bundle action should not be updated if the bundle action is in terminal state 068 // and coord Id is null. For e.g if Bundleaction is killed and coord Id is null, then the status of bundle 069 // should not be changed. 070 if (bundleaction.getCoordId() != null || !bundleaction.isTerminalStatus()) { 071 bundleaction.setStatus(coordCurrentStatus); 072 } 073 if (bundleaction.isPending()) { 074 bundleaction.decrementAndGetPending(); 075 } 076 // TODO - Uncomment this when bottom up rerun can change terminal state 077 /*BundleJobBean bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleaction.getBundleId())); 078 if (!bundleJob.isPending()) { 079 bundleJob.setPending(); 080 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob)); 081 LOG.info("Updated bundle job [{0}] pending to true", bundleaction.getBundleId()); 082 }*/ 083 084 bundleaction.setLastModifiedTime(new Date()); 085 bundleaction.setCoordId(coordjob.getId()); 086 jpaService.execute(new BundleActionUpdateJPAExecutor(bundleaction)); 087 if (bundleaction.getCoordId() != null) { 088 LOG.info("Updated bundle action [{0}] from prev status [{1}] to current coord status [{2}], and new bundle action pending [{3}]", bundleaction 089 .getBundleActionId(), bundleaction.getStatus(), coordCurrentStatus, bundleaction.getPending()); 090 } 091 else { 092 LOG.info("Updated Bundle action [{0}], status = [{1}], pending = [{2}]", bundleaction.getBundleActionId(), 093 bundleaction.getStatus(), bundleaction.getPending()); 094 } 095 LOG.debug("ENDED BundleStatusUpdateXCommand with bundle id : " + coordjob.getBundleId() + " coord job ID: " 096 + coordjob.getId() + " coord Status " + coordjob.getStatus()); 097 } 098 catch (Exception ex) { 099 throw new CommandException(ErrorCode.E1309, bundleaction.getBundleId(), bundleaction.getCoordName()); 100 } 101 return null; 102 } 103 104 /* (non-Javadoc) 105 * @see org.apache.oozie.command.XCommand#getEntityKey() 106 */ 107 @Override 108 public String getEntityKey() { 109 return this.bundleaction.getBundleActionId(); 110 } 111 112 /* (non-Javadoc) 113 * @see org.apache.oozie.command.XCommand#isLockRequired() 114 */ 115 @Override 116 protected boolean isLockRequired() { 117 return true; 118 } 119 120 /* (non-Javadoc) 121 * @see org.apache.oozie.command.XCommand#eagerLoadState() 122 */ 123 @Override 124 protected void eagerLoadState() throws CommandException{ 125 loadState(); 126 } 127 128 /* (non-Javadoc) 129 * @see org.apache.oozie.command.XCommand#loadState() 130 */ 131 @Override 132 protected void loadState() throws CommandException { 133 try { 134 if (jpaService == null) { 135 jpaService = Services.get().get(JPAService.class); 136 } 137 138 if (jpaService != null) { 139 this.bundleaction = jpaService.execute(new BundleActionGetJPAExecutor(coordjob.getBundleId(), coordjob 140 .getAppName())); 141 } 142 else { 143 throw new CommandException(ErrorCode.E0610); 144 } 145 } 146 catch (XException ex) { 147 throw new CommandException(ex); 148 } 149 } 150 151 /* (non-Javadoc) 152 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 153 */ 154 @Override 155 protected void verifyPrecondition() throws CommandException, PreconditionException { 156 if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0 && bundleaction.getCoordId()!=null) { 157 // pending should be decremented only if status of coord job and bundle action is same 158 // e.g if bundle is killed and coord job is running, then pending should not be false 159 // to allow recovery service to pick and kill the coord job 160 if (bundleaction.isPending() && coordjob.getStatus().equals(bundleaction.getStatus())) { 161 bundleaction.decrementAndGetPending(); 162 } 163 bundleaction.setLastModifiedTime(new Date()); 164 try { 165 jpaService.execute(new BundleActionUpdateJPAExecutor(bundleaction)); 166 } 167 catch (JPAExecutorException je) { 168 throw new CommandException(je); 169 } 170 LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}], decrement pending so new pending = [{3}]", 171 bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(), 172 bundleaction.getPending()); 173 throw new PreconditionException(ErrorCode.E1308, bundleaction.getStatusStr(), prevStatus.toString()); 174 } 175 else if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0) { 176 LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}], pending = [{3}] and bundle not yet updated with coord-id", 177 bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(), 178 bundleaction.getPending()); 179 } 180 } 181 182 }