001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.bundle; 019 020 import java.util.Date; 021 022 import org.apache.oozie.BundleActionBean; 023 import org.apache.oozie.CoordinatorJobBean; 024 import org.apache.oozie.ErrorCode; 025 import org.apache.oozie.XException; 026 import org.apache.oozie.client.CoordinatorJob; 027 import org.apache.oozie.client.Job; 028 import org.apache.oozie.command.CommandException; 029 import org.apache.oozie.command.PreconditionException; 030 import org.apache.oozie.command.StatusUpdateXCommand; 031 import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor; 032 import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor; 033 import org.apache.oozie.executor.jpa.JPAExecutorException; 034 import org.apache.oozie.service.JPAService; 035 import org.apache.oozie.service.Services; 036 037 /** 038 * The command to update Bundle status 039 */ 040 public class BundleStatusUpdateXCommand extends StatusUpdateXCommand { 041 private final CoordinatorJobBean coordjob; 042 private JPAService jpaService = null; 043 private BundleActionBean bundleaction; 044 private final Job.Status prevStatus; 045 046 /** 047 * The constructor for class {@link BundleStatusUpdateXCommand} 048 * 049 * @param coordjob coordinator job bean 050 * @param prevStatus coordinator job old status 051 */ 052 public BundleStatusUpdateXCommand(CoordinatorJobBean coordjob, CoordinatorJob.Status prevStatus) { 053 super("BundleStatusUpdate", "BundleStatusUpdate", 1); 054 this.coordjob = coordjob; 055 this.prevStatus = prevStatus; 056 } 057 058 /* (non-Javadoc) 059 * @see org.apache.oozie.command.XCommand#execute() 060 */ 061 @Override 062 protected Void execute() throws CommandException { 063 try { 064 LOG.debug("STARTED BundleStatusUpdateXCommand with bundle id : " + coordjob.getBundleId() 065 + " coord job ID: " + coordjob.getId() + " coord Status " + coordjob.getStatus()); 066 Job.Status coordCurrentStatus = coordjob.getStatus(); 067 // The status of bundle action should not be updated if the bundle action is in terminal state 068 // TODO - change this once bottom up rerun is allowed to change the bundle action state 069 if (!bundleaction.isTerminalStatus()) { 070 bundleaction.setStatus(coordCurrentStatus); 071 } 072 if (bundleaction.isPending()) { 073 bundleaction.decrementAndGetPending(); 074 } 075 // TODO - Uncomment this when bottom up rerun can change terminal state 076 /*BundleJobBean bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleaction.getBundleId())); 077 if (!bundleJob.isPending()) { 078 bundleJob.setPending(); 079 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob)); 080 LOG.info("Updated bundle job [{0}] pending to true", bundleaction.getBundleId()); 081 }*/ 082 083 bundleaction.setLastModifiedTime(new Date()); 084 bundleaction.setCoordId(coordjob.getId()); 085 jpaService.execute(new BundleActionUpdateJPAExecutor(bundleaction)); 086 if (bundleaction.getCoordId() != null) { 087 LOG.info("Updated bundle action [{0}] from prev status [{1}] to current coord status [{2}], and new bundle action pending [{3}]", bundleaction 088 .getBundleActionId(), bundleaction.getStatus(), coordCurrentStatus, bundleaction.getPending()); 089 } 090 else { 091 LOG.info("Updated Bundle action [{0}], status = [{1}], pending = [{2}]", bundleaction.getBundleActionId(), 092 bundleaction.getStatus(), bundleaction.getPending()); 093 } 094 LOG.debug("ENDED BundleStatusUpdateXCommand with bundle id : " + coordjob.getBundleId() + " coord job ID: " 095 + coordjob.getId() + " coord Status " + coordjob.getStatus()); 096 } 097 catch (Exception ex) { 098 throw new CommandException(ErrorCode.E1309, bundleaction.getBundleId(), bundleaction.getCoordName()); 099 } 100 return null; 101 } 102 103 /* (non-Javadoc) 104 * @see org.apache.oozie.command.XCommand#getEntityKey() 105 */ 106 @Override 107 public String getEntityKey() { 108 return this.bundleaction.getBundleActionId(); 109 } 110 111 /* (non-Javadoc) 112 * @see org.apache.oozie.command.XCommand#isLockRequired() 113 */ 114 @Override 115 protected boolean isLockRequired() { 116 return true; 117 } 118 119 /* (non-Javadoc) 120 * @see org.apache.oozie.command.XCommand#eagerLoadState() 121 */ 122 @Override 123 protected void eagerLoadState() throws CommandException{ 124 loadState(); 125 } 126 127 /* (non-Javadoc) 128 * @see org.apache.oozie.command.XCommand#loadState() 129 */ 130 @Override 131 protected void loadState() throws CommandException { 132 try { 133 if (jpaService == null) { 134 jpaService = Services.get().get(JPAService.class); 135 } 136 137 if (jpaService != null) { 138 this.bundleaction = jpaService.execute(new BundleActionGetJPAExecutor(coordjob.getBundleId(), coordjob 139 .getAppName())); 140 } 141 else { 142 throw new CommandException(ErrorCode.E0610); 143 } 144 } 145 catch (XException ex) { 146 throw new CommandException(ex); 147 } 148 } 149 150 /* (non-Javadoc) 151 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 152 */ 153 @Override 154 protected void verifyPrecondition() throws CommandException, PreconditionException { 155 if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0 && bundleaction.getCoordId()!=null) { 156 // pending should be decremented only if status of coord job and bundle action is same 157 // e.g if bundle is killed and coord job is running, then pending should not be false 158 // to allow recovery service to pick and kill the coord job 159 if (bundleaction.isPending() && coordjob.getStatus().equals(bundleaction.getStatus())) { 160 bundleaction.decrementAndGetPending(); 161 } 162 bundleaction.setLastModifiedTime(new Date()); 163 try { 164 jpaService.execute(new BundleActionUpdateJPAExecutor(bundleaction)); 165 } 166 catch (JPAExecutorException je) { 167 throw new CommandException(je); 168 } 169 LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}], decrement pending so new pending = [{3}]", 170 bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(), 171 bundleaction.getPending()); 172 throw new PreconditionException(ErrorCode.E1308, bundleaction.getStatusStr(), prevStatus.toString()); 173 } 174 else if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0) { 175 LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}], pending = [{3}] and bundle not yet updated with coord-id", 176 bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(), 177 bundleaction.getPending()); 178 } 179 } 180 181 }