/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.bundle;

import java.util.Date;

import org.apache.oozie.BundleActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.StatusUpdateXCommand;
import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;

/**
 * The command to update Bundle status.
 * <p>
 * Given a coordinator job and the status that job had before its latest transition, this command
 * loads the bundle action identified by the coordinator's bundle id and application name, copies
 * the coordinator's current status onto it, decrements the pending counter when the action is
 * pending, and persists the result.
 */
public class BundleStatusUpdateXCommand extends StatusUpdateXCommand {
    /** Coordinator job whose current status is propagated to the bundle action. */
    private final CoordinatorJobBean coordjob;
    /** Persistence service; resolved from {@link Services} in {@link #loadState()} if still unset. */
    private JPAService jpaService = null;
    /** Bundle action loaded in {@link #loadState()}. */
    private BundleActionBean bundleaction;
    /** Status the coordinator job had before the change; compared with the bundle action status. */
    private final Job.Status prevStatus;

    /**
     * The constructor for class {@link BundleStatusUpdateXCommand}
     *
     * @param coordjob coordinator job bean
     * @param prevStatus coordinator job old status
     */
    public BundleStatusUpdateXCommand(CoordinatorJobBean coordjob, CoordinatorJob.Status prevStatus) {
        super("BundleStatusUpdate", "BundleStatusUpdate", 1);
        this.coordjob = coordjob;
        this.prevStatus = prevStatus;
    }

    /**
     * Copies the coordinator job's current status and id onto the loaded bundle action, decrements
     * the pending counter if the action is pending, stamps the modification time and persists the
     * bundle action.
     *
     * @return always {@code null}
     * @throws CommandException with {@link ErrorCode#E1309} if any step fails; the original
     *         exception is attached as the cause
     * @see org.apache.oozie.command.XCommand#execute()
     */
    @Override
    protected Void execute() throws CommandException {
        try {
            LOG.debug("STARTED BundleStatusUpdateXCommand with bubdle id : " + coordjob.getBundleId()
                    + " coord job ID: " + coordjob.getId() + " coord Status " + coordjob.getStatus());
            Job.Status coordCurrentStatus = coordjob.getStatus();
            // Remember the status before overwriting it so the log line can show the transition.
            Job.Status bundleActionStatus = bundleaction.getStatus();
            bundleaction.setStatus(coordCurrentStatus);

            if (bundleaction.isPending()) {
                bundleaction.decrementAndGetPending();
            }
            bundleaction.setLastModifiedTime(new Date());
            bundleaction.setCoordId(coordjob.getId());
            jpaService.execute(new BundleActionUpdateJPAExecutor(bundleaction));
            LOG.info(
                    "Updated bundle action [{0}] from prev status [{1}] to current coord status [{2}], and new bundle action pending [{3}]",
                    bundleaction.getBundleActionId(), bundleActionStatus, coordCurrentStatus,
                    bundleaction.getPending());

            LOG.debug("ENDED BundleStatusUpdateXCommand with bubdle id : " + coordjob.getBundleId()
                    + " coord job ID: " + coordjob.getId() + " coord Status " + coordjob.getStatus());
        }
        catch (Exception ex) {
            // Pass the caught exception as the trailing argument so it is kept as the cause
            // instead of being silently dropped; the E1309 message parameters are unchanged.
            throw new CommandException(ErrorCode.E1309, bundleaction.getBundleId(),
                    bundleaction.getCoordName(), ex);
        }
        return null;
    }

    /**
     * Returns the id of the loaded bundle action as this command's entity key.
     *
     * @return the bundle action id
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    public String getEntityKey() {
        return this.bundleaction.getBundleActionId();
    }

    /**
     * This command always requires a lock.
     *
     * @return {@code true}
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /**
     * Delegates to {@link #loadState()}.
     *
     * @throws CommandException if the state cannot be loaded
     * @see org.apache.oozie.command.XCommand#eagerLoadState()
     */
    @Override
    protected void eagerLoadState() throws CommandException {
        loadState();
    }

    /**
     * Resolves the {@link JPAService} if it is not set yet and loads the bundle action for the
     * coordinator job's bundle id and application name.
     *
     * @throws CommandException with {@link ErrorCode#E0610} if no {@link JPAService} is available,
     *         or wrapping any {@link XException} raised while loading
     * @see org.apache.oozie.command.XCommand#loadState()
     */
    @Override
    protected void loadState() throws CommandException {
        try {
            if (jpaService == null) {
                jpaService = Services.get().get(JPAService.class);
            }

            if (jpaService != null) {
                this.bundleaction = jpaService.execute(new BundleActionGetJPAExecutor(coordjob.getBundleId(),
                        coordjob.getAppName()));
            }
            else {
                throw new CommandException(ErrorCode.E0610);
            }
        }
        catch (XException ex) {
            throw new CommandException(ex);
        }
    }

    /**
     * Verifies that the bundle action's status (compared case-insensitively) equals the
     * coordinator's previous status. On mismatch the status is left untouched, but the pending
     * counter is still decremented (when pending) and the bundle action persisted before the
     * precondition failure is raised.
     *
     * @throws CommandException if persisting the decremented pending counter fails
     * @throws PreconditionException with {@link ErrorCode#E1308} if the statuses differ
     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0) {
            // Previous status are not matched with bundle action status
            // So that's the error and we should not be updating the Bundle Action status
            // however we need to decrement the pending flag.
            if (bundleaction.isPending()) {
                bundleaction.decrementAndGetPending();
            }
            bundleaction.setLastModifiedTime(new Date());
            try {
                jpaService.execute(new BundleActionUpdateJPAExecutor(bundleaction));
            }
            catch (JPAExecutorException je) {
                throw new CommandException(je);
            }
            LOG.info(
                    "Bundle action [{0}] status [{1}] is different from prev coord status [{2}], decrement pending so new pending = [{3}]",
                    bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(),
                    bundleaction.getPending());
            throw new PreconditionException(ErrorCode.E1308, bundleaction.getStatusStr(), prevStatus.toString());
        }
    }

}