/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.command.coord;

import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.Job.Status;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.StatusTransitXCommand;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.StatusTransitService;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.StatusUtils;

/**
 * CoordStatusTransitXCommand updates a coord job's status according to its child actions' status. If all child
 * actions' pending flag equals 0 (job done), we reset the job's pending flag to 0. If all child actions are
 * succeeded, we set the job's status to SUCCEEDED.
 */
public class CoordStatusTransitXCommand extends StatusTransitXCommand {

    /** Id of the coordinator job whose status is being recomputed. */
    private final String jobId;

    /** Coordinator job bean, populated by {@link #loadState()}. */
    private CoordinatorJobBean coordJob;

    /** Number of actions returned by the GET_COORD_ACTIONS_STATUS_UNIGNORED query in {@link #loadState()}. */
    int coordActionCount;

    /** Per-status tally of the actions loaded in {@link #loadState()}; statuses with no action have no entry. */
    private final Map<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>();

    /** Set to true by {@link #loadState()} when the pending-count query returns a value greater than zero. */
    boolean isPending = false;

    /** Value of {@link StatusTransitService#CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS}; false when not configured. */
    final boolean backwardSupportForCoordStatus = Services.get().getConf()
            .getBoolean(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);

    /**
     * Creates the command for the given coordinator job.
     *
     * @param jobId id of the coordinator job
     */
    public CoordStatusTransitXCommand(String jobId) {
        super("coord_status_transit", "coord_status_transit", 0);
        this.jobId = jobId;
    }

    /**
     * @return the coordinator job id this command was created with
     */
    @Override
    public String getEntityKey() {
        return jobId;
    }

    /**
     * Loads the coordinator job, the status of its actions and the pending-action count, then builds the
     * per-status tally used by the other methods of this command.
     *
     * @throws CommandException with {@link ErrorCode#E1025} if any of the queries fails
     */
    @Override
    protected void loadState() throws CommandException {
        try {
            coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId);
            List<CoordinatorActionBean> coordActionStatusList = CoordActionQueryExecutor.getInstance().getList(
                    CoordActionQuery.GET_COORD_ACTIONS_STATUS_UNIGNORED, jobId);

            long count = (Long) CoordActionQueryExecutor.getInstance().getSingleValue(
                    CoordActionQuery.GET_COORD_ACTIONS_PENDING_COUNT, jobId);
            if (count > 0) {
                isPending = true;
            }

            for (CoordinatorAction coordAction : coordActionStatusList) {
                // getStatusCount() yields 0 for a status not seen yet, so a single expression covers both the
                // first and the subsequent occurrences.
                CoordinatorAction.Status status = coordAction.getStatus();
                coordActionStatus.put(status, getStatusCount(status) + 1);
            }
            coordActionCount = coordActionStatusList.size();
        }
        catch (JPAExecutorException jpae) {
            throw new CommandException(ErrorCode.E1025, jpae);
        }
        LogUtils.setLogInfo(this.coordJob);
    }

    /**
     * Rejects coordinator jobs that use the 0.1 namespace while backward support for coord status is enabled.
     *
     * @throws CommandException with {@link ErrorCode#E1025} when the job must be ignored
     * @throws PreconditionException declared by the overridden method; not thrown by this implementation
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        // if namespace 0.1 is used and backward support is true, then ignore this coord job
        if (backwardSupportForCoordStatus && coordJob.getAppNamespace() != null
                && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
            throw new CommandException(ErrorCode.E1025,
                    " Coord namespace is 0.1 and backward.support.for.coord.status is set");
        }

    }

    /**
     * @return the status computed by the superclass, or the job's current status when the superclass yields null
     * @throws CommandException propagated from the superclass
     */
    @Override
    protected Job.Status getJobStatus() throws CommandException {
        Job.Status jobStatus = super.getJobStatus();
        if (jobStatus == null) {
            jobStatus = coordJob.getStatus();
        }

        return jobStatus;
    }

    /**
     * @return true when the job is done materializing (or is already FAILED/KILLED) and every loaded action is
     *         SUCCEEDED, FAILED, KILLED, TIMEDOUT or SKIPPED
     */
    @Override
    protected boolean isTerminalState() {
        return (coordJob.isDoneMaterialization() || coordJob.getStatus() == Job.Status.FAILED ||
                coordJob.getStatus() == Job.Status.KILLED) && isCoordTerminalStatus(coordActionCount);
    }

    /**
     * Picks the terminal job status. Checks are applied in order: job already KILLED, all actions
     * SUCCEEDED/SKIPPED with materialization done, all actions KILLED, all actions FAILED; anything else is
     * DONEWITHERROR.
     *
     * @return the terminal status for the job
     */
    @Override
    protected Status getTerminalStatus() {

        // If all coord action is done and coord is killed, then don't change the status.
        if (coordJob.getStatus().equals(Job.Status.KILLED)) {
            return Job.Status.KILLED;

        }
        // If all the coordinator actions are succeeded then coordinator job should be succeeded.
        if (coordActionCount == (getStatusCount(CoordinatorAction.Status.SUCCEEDED)
                + getStatusCount(CoordinatorAction.Status.SKIPPED)) && coordJob.isDoneMaterialization()) {
            return Job.Status.SUCCEEDED;

        }
        else if (coordActionCount == getStatusCount(CoordinatorAction.Status.KILLED)) {
            // If all the coordinator actions are KILLED then coordinator job should be KILLED.
            return Job.Status.KILLED;

        }
        else if (coordActionCount == getStatusCount(CoordinatorAction.Status.FAILED)) {
            // If all the coordinator actions are FAILED then coordinator job should be FAILED.
            return Job.Status.FAILED;

        }
        else {
            return Job.Status.DONEWITHERROR;
        }
    }

    /**
     * @return true when the job's current status is PAUSED or PAUSEDWITHERROR
     */
    @Override
    protected boolean isPausedState() {
        return coordJob.getStatus().equals(Job.Status.PAUSED)
                || coordJob.getStatus().equals(Job.Status.PAUSEDWITHERROR);
    }

    /**
     * @return PAUSEDWITHERROR when at least one action is KILLED, FAILED or TIMEDOUT, otherwise PAUSED
     */
    @Override
    protected Status getPausedState() {
        return hasTerminatedActions() ? Job.Status.PAUSEDWITHERROR : Job.Status.PAUSED;
    }

    /**
     * @return true when the job is SUSPENDED, SUSPENDEDWITHERROR or PREPSUSPENDED, or when the actions alone
     *         imply a suspended state (see {@link #getBottomUpSuspendedState()})
     */
    @Override
    protected boolean isSuspendedState() {
        if (coordJob.getStatus() == Job.Status.SUSPENDED
                || coordJob.getStatus() == Job.Status.SUSPENDEDWITHERROR
                || coordJob.getStatus() == Job.Status.PREPSUSPENDED) {
            return true;
        }
        else {
            return getBottomUpSuspendedState() != null;
        }
    }

    /**
     * @return for a SUSPENDED/SUSPENDEDWITHERROR job, SUSPENDEDWITHERROR if any action is KILLED, FAILED or
     *         TIMEDOUT and SUSPENDED otherwise; PREPSUSPENDED for a PREPSUSPENDED job; otherwise the result of
     *         {@link #getBottomUpSuspendedState()}, which may be null
     */
    @Override
    protected Status getSuspendedStatus() {
        if (coordJob.getStatus() == Job.Status.SUSPENDED || coordJob.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
            return hasTerminatedActions() ? Job.Status.SUSPENDEDWITHERROR : Job.Status.SUSPENDED;
        }
        else if (coordJob.getStatus() == Job.Status.PREPSUSPENDED) {
            return Job.Status.PREPSUSPENDED;
        }
        else {
            return getBottomUpSuspendedState();
        }
    }

    /**
     * @return true unless the job's current status is PREP
     */
    @Override
    protected boolean isRunningState() {
        return coordJob.getStatus() != Job.Status.PREP;
    }

    /**
     * @return RUNNINGWITHERROR when at least one action is KILLED, FAILED or TIMEDOUT, otherwise RUNNING
     */
    @Override
    protected Status getRunningState() {
        return hasTerminatedActions() ? Job.Status.RUNNINGWITHERROR : Job.Status.RUNNING;
    }

    /**
     * Applies the pending flag and the new status to the coordinator job, persists the job when either changed,
     * and triggers a bundle status update when the job belongs to a bundle and its status changed.
     *
     * @param coordStatus the status computed for the job
     * @throws JPAExecutorException if persisting the job fails
     * @throws CommandException declared by the overridden method
     */
    @Override
    protected void updateJobStatus(Status coordStatus) throws JPAExecutorException, CommandException {
        final Job.Status prevStatus = coordJob.getStatus();

        boolean prevPending = coordJob.isPending();
        if (isPending) {
            coordJob.setPending();
        }
        else {
            coordJob.resetPending();
        }
        boolean isPendingStateChanged = prevPending != coordJob.isPending();

        // Update the Coord Job
        if (coordJob.isTerminalStatus()
                && (coordStatus == Job.Status.SUSPENDED || coordStatus == Job.Status.SUSPENDEDWITHERROR)) {
            LOG.info("Coord Job [" + coordJob.getId() + "] status to " + coordStatus
                    + " can not be updated as its already in Terminal state");
            if (isPendingStateChanged) {
                // Report the previous pending flag here, not the previous job status.
                LOG.info("Pending for job [" + coordJob.getId() + "] is changed to '" + coordJob.isPending()
                        + "' from '" + prevPending + "'");
                coordJob.setLastModifiedTime(new Date());
                CoordJobQueryExecutor.getInstance().executeUpdate(
                        CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME, coordJob);
            }
            return;

        }

        // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and PAUSEDWITHERROR is
        // not supported
        coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(coordStatus));
        // Backward support when coordinator namespace is 0.1
        coordJob.setStatus(StatusUtils.getStatus(coordJob));
        if (coordJob.getStatus() != prevStatus || isPendingStateChanged) {
            LOG.info("Set coordinator job [" + coordJob.getId() + "] status to '" + coordJob.getStatus() + "' from '"
                    + prevStatus + "'");
            coordJob.setLastModifiedTime(new Date());
            CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_MODTIME,
                    coordJob);
        }
        // update bundle action only when status changes in coord job
        if (coordJob.getBundleId() != null && !prevStatus.equals(coordJob.getStatus())) {
            new BundleStatusUpdateXCommand(coordJob, prevStatus).call();
        }
    }

    /**
     * Bottom up look for children to check the parent's status only if materialization is done and all actions are
     * non-pending.
     *
     * @return SUSPENDED when every action is SUSPENDED or SUCCEEDED, SUSPENDEDWITHERROR when every action is
     *         SUSPENDED, SUCCEEDED, KILLED, FAILED or TIMEDOUT, and null otherwise (including when no action is
     *         SUSPENDED)
     */
    protected Job.Status getBottomUpSuspendedState() {
        if (coordJob.isDoneMaterialization() && !isPending
                && coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {

            if (coordActionCount == getStatusCount(CoordinatorAction.Status.SUSPENDED)
                    + getStatusCount(CoordinatorAction.Status.SUCCEEDED)) {
                return Job.Status.SUSPENDED;

            }
            else if (coordActionCount == getStatusCount(CoordinatorAction.Status.SUSPENDED)
                    + getStatusCount(CoordinatorAction.Status.SUCCEEDED)
                    + getStatusCount(CoordinatorAction.Status.KILLED) + getStatusCount(CoordinatorAction.Status.FAILED)
                    + getStatusCount(CoordinatorAction.Status.TIMEDOUT)) {
                return Job.Status.SUSPENDEDWITHERROR;

            }
        }
        return null;
    }

    /**
     * @param coordActionsCount total number of actions to compare against
     * @return true when that total equals the number of SUCCEEDED, FAILED, KILLED, TIMEDOUT and SKIPPED actions
     */
    private boolean isCoordTerminalStatus(int coordActionsCount) {
        return coordActionsCount == getStatusCount(CoordinatorAction.Status.SUCCEEDED)
                + getStatusCount(CoordinatorAction.Status.FAILED) + getStatusCount(CoordinatorAction.Status.KILLED)
                + getStatusCount(CoordinatorAction.Status.TIMEDOUT) + getStatusCount(CoordinatorAction.Status.SKIPPED);

    }

    /**
     * @param status action status to look up
     * @return number of loaded actions in that status, or 0 when none was seen
     */
    private int getStatusCount(CoordinatorAction.Status status) {
        int statusCount = 0;
        if (coordActionStatus.containsKey(status)) {
            statusCount = coordActionStatus.get(status);
        }
        return statusCount;
    }

    /**
     * @return true when at least one loaded action is KILLED, FAILED or TIMEDOUT
     */
    private boolean hasTerminatedActions() {
        return coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
                || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
                || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT);
    }

}