001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.bundle; 020 021import java.util.Date; 022import java.util.HashMap; 023import java.util.List; 024 025import org.apache.oozie.BundleActionBean; 026import org.apache.oozie.BundleJobBean; 027import org.apache.oozie.ErrorCode; 028import org.apache.oozie.client.Job; 029import org.apache.oozie.client.Job.Status; 030import org.apache.oozie.command.CommandException; 031import org.apache.oozie.command.PreconditionException; 032import org.apache.oozie.command.StatusTransitXCommand; 033import org.apache.oozie.executor.jpa.BundleActionQueryExecutor; 034import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; 035import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; 036import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery; 037import org.apache.oozie.executor.jpa.JPAExecutorException; 038import org.apache.oozie.util.LogUtils; 039import org.apache.oozie.util.StatusUtils; 040 041/** 042 * BundleStatusTransitXCommand update job's status according to its child actions' status. If all child actions' pending 043 * flag equals 0 (job done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's 044 * status to SUCCEEDED. 045 */ 046public class BundleStatusTransitXCommand extends StatusTransitXCommand { 047 048 private String jobId; 049 private List<BundleActionBean> bundleActions; 050 private BundleJobBean bundleJob; 051 private boolean foundPending; 052 private HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>(); 053 054 public BundleStatusTransitXCommand(String id) { 055 super("bundle_status_transit", "bundle_status_transit", 0); 056 this.jobId = id; 057 } 058 059 @Override 060 public String getEntityKey() { 061 return jobId; 062 } 063 064 @Override 065 protected void loadState() throws CommandException { 066 try { 067 bundleJob = BundleJobQueryExecutor.getInstance().get( 068 BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME, jobId); 069 070 bundleActions = BundleActionQueryExecutor.getInstance().getList( 071 BundleActionQuery.GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE, jobId); 072 for (BundleActionBean bAction : bundleActions) { 073 int counter = 0; 074 if (bundleActionStatus.containsKey(bAction.getStatus())) { 075 counter = getActionStatusCount(bAction.getStatus()) + 1; 076 } 077 else { 078 ++counter; 079 } 080 bundleActionStatus.put(bAction.getStatus(), counter); 081 if (bAction.getCoordId() == null 082 && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED) ) { 083 new BundleKillXCommand(jobId).call(); 084 bundleJob = BundleJobQueryExecutor.getInstance().get( 085 BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME, jobId); 086 bundleJob.setStatus(Job.Status.FAILED); 087 bundleJob.setLastModifiedTime(new Date()); 088 BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS, 089 bundleJob); 090 } 091 092 if (bAction.isPending()) { 093 LOG.debug(bAction + " has pending flag set"); 094 foundPending = true; 095 } 096 } 097 LogUtils.setLogInfo(bundleJob); 098 } 099 catch (JPAExecutorException e) { 100 throw new CommandException(ErrorCode.E1322, e); 101 } 102 } 103 104 @Override 105 protected Job.Status getJobStatus() throws CommandException { 106 Job.Status jobStatus = super.getJobStatus(); 107 if (jobStatus == null) { 108 if (isPrepRunningState()) { 109 return getPrepRunningStatus(); 110 } 111 } 112 113 return jobStatus; 114 } 115 116 @Override 117 protected boolean isTerminalState() { 118 return !foundPending 119 && bundleActions.size() == getActionStatusCount(Job.Status.SUCCEEDED) 120 + getActionStatusCount(Job.Status.FAILED) + getActionStatusCount(Job.Status.KILLED) 121 + getActionStatusCount(Job.Status.DONEWITHERROR); 122 } 123 124 @Override 125 protected Job.Status getTerminalStatus() { 126 127 // If all bundle action is done and bundle is killed, then don't change the status. 128 if (bundleJob.getStatus().equals(Job.Status.KILLED)) { 129 return Job.Status.KILLED; 130 131 } 132 // If all the bundle actions are succeeded then bundle job should be succeeded. 133 if (bundleActions.size() == getActionStatusCount(Job.Status.SUCCEEDED)) { 134 return Job.Status.SUCCEEDED; 135 136 } 137 else if (bundleActions.size() == getActionStatusCount(Job.Status.KILLED)) { 138 // If all the bundle actions are KILLED then bundle job should be KILLED. 139 return Job.Status.KILLED; 140 } 141 else if (bundleActions.size() == getActionStatusCount(Job.Status.FAILED)) { 142 // If all the bundle actions are FAILED then bundle job should be FAILED. 143 return Job.Status.FAILED; 144 } 145 else { 146 return Job.Status.DONEWITHERROR; 147 148 } 149 } 150 151 @Override 152 protected boolean isPausedState() { 153 //If bundle is paused then timestamp will be set. 154 //If bundleJob.getPauseTime() is not set, that means that status has to be computed from bottom-up. 155 if (bundleJob.getStatus() == Job.Status.PAUSED || bundleJob.getStatus() == Job.Status.PAUSEDWITHERROR 156 && bundleJob.getPauseTime() != null) { 157 return true; 158 } 159 else { 160 return getBottomUpPauseStatus() != null; 161 } 162 163 } 164 165 @Override 166 protected Job.Status getPausedState() { 167 if (bundleJob.getStatus() == Job.Status.PAUSED || bundleJob.getStatus() == Job.Status.PAUSEDWITHERROR) { 168 if (hasTerminatedActions() || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR) 169 || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR) 170 || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) { 171 return Job.Status.PAUSEDWITHERROR; 172 } 173 else { 174 return Job.Status.PAUSED; 175 } 176 } 177 return getBottomUpPauseStatus(); 178 179 } 180 181 @Override 182 protected boolean isSuspendedState() { 183 //If bundle is suspended then timestamp will be set. 184 //If bundleJob.getSuspendedTimestamp() is not set, that means that status has to be computed from bottom-up. 185 if ((bundleJob.getStatus() == Job.Status.SUSPENDED || bundleJob.getStatus() == Job.Status.SUSPENDEDWITHERROR) 186 && bundleJob.getSuspendedTimestamp() != null) { 187 return true; 188 } 189 190 return getBottomUpSuspendedState() != null; 191 192 } 193 194 @Override 195 protected Job.Status getSuspendedStatus() { 196 if (bundleJob.getStatus() == Job.Status.SUSPENDED || bundleJob.getStatus() == Job.Status.SUSPENDEDWITHERROR) { 197 if (hasTerminatedActions() || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR) 198 || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) { 199 return Job.Status.SUSPENDEDWITHERROR; 200 } 201 else { 202 return Job.Status.SUSPENDED; 203 } 204 205 } 206 return getBottomUpSuspendedState(); 207 208 } 209 210 @Override 211 protected boolean isRunningState() { 212 return true; 213 } 214 215 @Override 216 protected Status getRunningState() { 217 if (bundleJob.getStatus() != Job.Status.PREP) { 218 return getRunningStatus(bundleActionStatus); 219 } 220 else 221 return null; 222 } 223 224 @Override 225 protected void updateJobStatus(Job.Status bundleStatus) throws JPAExecutorException { 226 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus + "' from '" + bundleJob.getStatus() + "'"); 227 228 String jobId = bundleJob.getId(); 229 // Update the Bundle Job 230 // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and 231 // PAUSEDWITHERROR is not supported 232 bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(bundleStatus)); 233 bundleJob.setLastModifiedTime(new Date()); 234 if (foundPending) { 235 bundleJob.setPending(); 236 LOG.info("Bundle job [" + jobId + "] Pending set to TRUE"); 237 } 238 else { 239 bundleJob.resetPending(); 240 LOG.info("Bundle job [" + jobId + "] Pending set to FALSE"); 241 } 242 BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME, 243 bundleJob); 244 } 245 246 /** 247 * bottom up; check the status of parent through their children. 248 * 249 * @return the bottom up pause status 250 */ 251 private Job.Status getBottomUpPauseStatus() { 252 253 if (bundleActionStatus.containsKey(Job.Status.PAUSED) 254 && bundleActions.size() == getActionStatusCount(Job.Status.PAUSED)) { 255 return Job.Status.PAUSED; 256 257 } 258 else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR) 259 && bundleActions.size() == getActionStatusCount(Job.Status.PAUSED) 260 + getActionStatusCount(Job.Status.PAUSEDWITHERROR)) { 261 return Job.Status.PAUSEDWITHERROR; 262 } 263 264 return null; 265 } 266 267 /** 268 * Bottom up update status of parent from the status of its children. 269 * 270 * @return the bottom up suspended state 271 */ 272 private Job.Status getBottomUpSuspendedState() { 273 274 if (!foundPending && bundleActionStatus.containsKey(Job.Status.SUSPENDED) 275 || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)) { 276 277 if (bundleActions.size() == getActionStatusCount(Job.Status.SUSPENDED) 278 + getActionStatusCount(Job.Status.SUCCEEDED)) { 279 return Job.Status.SUSPENDED; 280 } 281 else if (bundleActions.size() == getActionStatusCount(Job.Status.SUSPENDEDWITHERROR) 282 + getActionStatusCount(Job.Status.SUSPENDED) + getActionStatusCount(Job.Status.SUCCEEDED) 283 + getActionStatusCount(Job.Status.KILLED) + getActionStatusCount(Job.Status.FAILED) 284 + getActionStatusCount(Job.Status.DONEWITHERROR)) { 285 return Job.Status.SUSPENDEDWITHERROR; 286 287 } 288 } 289 return null; 290 } 291 292 private boolean hasTerminatedActions() { 293 return bundleActionStatus.containsKey(Job.Status.KILLED) || bundleActionStatus.containsKey(Job.Status.FAILED) 294 || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR); 295 296 } 297 298 private boolean isPrepRunningState() { 299 return !foundPending && bundleActionStatus.containsKey(Job.Status.PREP) 300 && bundleActions.size() > getActionStatusCount(Job.Status.PREP); 301 } 302 303 private Status getPrepRunningStatus() { 304 return getRunningStatus(bundleActionStatus); 305 306 } 307 308 private int getActionStatusCount(final Job.Status status) { 309 310 if (bundleActionStatus.containsKey(status)) { 311 return bundleActionStatus.get(status); 312 } 313 else { 314 return 0; 315 } 316 } 317 318 private Job.Status getRunningStatus(HashMap<Job.Status, Integer> actionStatus) { 319 if (actionStatus.containsKey(Job.Status.FAILED) || actionStatus.containsKey(Job.Status.KILLED) 320 || actionStatus.containsKey(Job.Status.DONEWITHERROR) 321 || actionStatus.containsKey(Job.Status.RUNNINGWITHERROR)) { 322 return Job.Status.RUNNINGWITHERROR; 323 } 324 else { 325 return Job.Status.RUNNING; 326 } 327 } 328 329 @Override 330 protected void verifyPrecondition() throws CommandException, PreconditionException { 331 } 332 333}