001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command.coord; 019 020import java.io.IOException; 021import java.io.StringReader; 022import java.net.URI; 023import java.util.Arrays; 024import java.util.Date; 025import java.util.List; 026 027import org.apache.hadoop.conf.Configuration; 028import org.apache.oozie.CoordinatorActionBean; 029import org.apache.oozie.CoordinatorJobBean; 030import org.apache.oozie.ErrorCode; 031import org.apache.oozie.client.CoordinatorAction; 032import org.apache.oozie.client.Job; 033import org.apache.oozie.client.OozieClient; 034import org.apache.oozie.command.CommandException; 035import org.apache.oozie.command.PreconditionException; 036import org.apache.oozie.dependency.DependencyChecker; 037import org.apache.oozie.dependency.ActionDependency; 038import org.apache.oozie.dependency.URIHandler; 039import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor; 040import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; 041import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 042import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 043import org.apache.oozie.executor.jpa.JPAExecutorException; 044import org.apache.oozie.service.CallableQueueService; 045import org.apache.oozie.service.EventHandlerService; 046import org.apache.oozie.service.JPAService; 047import org.apache.oozie.service.PartitionDependencyManagerService; 048import org.apache.oozie.service.RecoveryService; 049import org.apache.oozie.service.Service; 050import org.apache.oozie.service.Services; 051import org.apache.oozie.service.URIHandlerService; 052import org.apache.oozie.util.LogUtils; 053import org.apache.oozie.util.StatusUtils; 054import org.apache.oozie.util.XConfiguration; 055import org.apache.oozie.util.XLog; 056import org.apache.oozie.util.DateUtils; 057 058public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> { 059 protected String actionId; 060 protected JPAService jpaService = null; 061 protected CoordinatorActionBean coordAction = null; 062 protected CoordinatorJobBean coordJob = null; 063 064 /** 065 * Property name of command re-queue interval for coordinator push check in 066 * milliseconds. 067 */ 068 public static final String CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX 069 + "coord.push.check.requeue.interval"; 070 /** 071 * Default re-queue interval in ms. It is applied when no value defined in 072 * the oozie configuration. 073 */ 074 private final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 600000; 075 private boolean registerForNotification; 076 private boolean removeAvailDependencies; 077 078 public CoordPushDependencyCheckXCommand(String actionId) { 079 this(actionId, false, true); 080 } 081 082 public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification) { 083 this(actionId, registerForNotification, !registerForNotification); 084 } 085 086 public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification, 087 boolean removeAvailDependencies) { 088 super("coord_push_dep_check", "coord_push_dep_check", 0); 089 this.actionId = actionId; 090 this.registerForNotification = registerForNotification; 091 this.removeAvailDependencies = removeAvailDependencies; 092 } 093 094 protected CoordPushDependencyCheckXCommand(String actionName, String actionId) { 095 super(actionName, actionName, 0); 096 this.actionId = actionId; 097 } 098 099 @Override 100 protected Void execute() throws CommandException { 101 String pushMissingDeps = coordAction.getPushMissingDependencies(); 102 if (pushMissingDeps == null || pushMissingDeps.length() == 0) { 103 LOG.info("Nothing to check. Empty push missing dependency"); 104 } 105 else { 106 String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps); 107 LOG.info("First Push missing dependency is [{0}] ", missingDepsArray[0]); 108 LOG.trace("Push missing dependencies are [{0}] ", pushMissingDeps); 109 if (registerForNotification) { 110 LOG.debug("Register for notifications is true"); 111 } 112 113 try { 114 Configuration actionConf = null; 115 try { 116 actionConf = new XConfiguration(new StringReader(coordAction.getRunConf())); 117 } 118 catch (IOException e) { 119 throw new CommandException(ErrorCode.E1307, e.getMessage(), e); 120 } 121 122 // Check all dependencies during materialization to avoid registering in the cache. 123 // But check only first missing one afterwards similar to 124 // CoordActionInputCheckXCommand for efficiency. listPartitions is costly. 125 ActionDependency actionDep = DependencyChecker.checkForAvailability(missingDepsArray, actionConf, 126 !registerForNotification); 127 128 boolean isChangeInDependency = true; 129 boolean timeout = false; 130 if (actionDep.getMissingDependencies().size() == 0) { 131 // All push-based dependencies are available 132 onAllPushDependenciesAvailable(); 133 } 134 else { 135 if (actionDep.getMissingDependencies().size() == missingDepsArray.length) { 136 isChangeInDependency = false; 137 } 138 else { 139 String stillMissingDeps = DependencyChecker.dependenciesAsString(actionDep 140 .getMissingDependencies()); 141 coordAction.setPushMissingDependencies(stillMissingDeps); 142 } 143 // Checking for timeout 144 timeout = isTimeout(); 145 if (timeout) { 146 queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName())); 147 } 148 else { 149 queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 150 getCoordPushCheckRequeueInterval()); 151 } 152 } 153 154 updateCoordAction(coordAction, isChangeInDependency); 155 if (registerForNotification) { 156 registerForNotification(actionDep.getMissingDependencies(), actionConf); 157 } 158 if (removeAvailDependencies) { 159 unregisterAvailableDependencies(actionDep.getAvailableDependencies()); 160 } 161 if (timeout) { 162 unregisterMissingDependencies(actionDep.getMissingDependencies(), actionId); 163 } 164 } 165 catch (Exception e) { 166 final CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class); 167 if (isTimeout()) { 168 LOG.debug("Queueing timeout command"); 169 // XCommand.queue() will not work when there is a Exception 170 callableQueueService.queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName())); 171 unregisterMissingDependencies(Arrays.asList(missingDepsArray), actionId); 172 } 173 else if (coordAction.getMissingDependencies() != null 174 && coordAction.getMissingDependencies().length() > 0) { 175 // Queue again on exception as RecoveryService will not queue this again with 176 // the action being updated regularly by CoordActionInputCheckXCommand 177 callableQueueService.queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), 178 registerForNotification, removeAvailDependencies), 179 Services.get().getConf().getInt(RecoveryService.CONF_COORD_OLDER_THAN, 600) * 1000); 180 } 181 throw new CommandException(ErrorCode.E1021, e.getMessage(), e); 182 } 183 } 184 return null; 185 } 186 187 /** 188 * Return the re-queue interval for coord push dependency check 189 * @return 190 */ 191 public long getCoordPushCheckRequeueInterval() { 192 long requeueInterval = Services.get().getConf().getLong(CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL, 193 DEFAULT_COMMAND_REQUEUE_INTERVAL); 194 return requeueInterval; 195 } 196 197 /** 198 * Returns true if timeout period has been reached 199 * 200 * @return true if it is time for timeout else false 201 */ 202 protected boolean isTimeout() { 203 long waitingTime = (new Date().getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction 204 .getCreatedTime().getTime())) 205 / (60 * 1000); 206 int timeOut = coordAction.getTimeOut(); 207 return (timeOut >= 0) && (waitingTime > timeOut); 208 } 209 210 protected void onAllPushDependenciesAvailable() throws CommandException { 211 coordAction.setPushMissingDependencies(""); 212 Services.get().get(PartitionDependencyManagerService.class) 213 .removeCoordActionWithDependenciesAvailable(coordAction.getId()); 214 if (coordAction.getMissingDependencies() == null || coordAction.getMissingDependencies().length() == 0) { 215 Date nominalTime = coordAction.getNominalTime(); 216 Date currentTime = new Date(); 217 // The action should become READY only if current time > nominal time; 218 // CoordActionInputCheckXCommand will take care of moving it to READY when it is nominal time. 219 if (nominalTime.compareTo(currentTime) > 0) { 220 LOG.info("[" + actionId + "]::ActionInputCheck:: nominal Time is newer than current time. Current=" 221 + DateUtils.formatDateOozieTZ(currentTime) + ", nominal=" + DateUtils.formatDateOozieTZ(nominalTime)); 222 } 223 else { 224 String actionXml = resolveCoordConfiguration(); 225 coordAction.setActionXml(actionXml); 226 coordAction.setStatus(CoordinatorAction.Status.READY); 227 // pass jobID to the CoordActionReadyXCommand 228 queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100); 229 } 230 } 231 else if (isTimeout()) { 232 // If it is timeout and all push dependencies are available but still some unresolved 233 // missing dependencies queue CoordActionInputCheckXCommand now. Else it will have to 234 // wait till RecoveryService kicks in 235 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId())); 236 } 237 } 238 239 private String resolveCoordConfiguration() throws CommandException { 240 try { 241 Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf())); 242 StringBuilder actionXml = new StringBuilder(coordAction.getActionXml()); 243 String newActionXml = CoordActionInputCheckXCommand.resolveCoordConfiguration(actionXml, actionConf, 244 actionId); 245 actionXml.replace(0, actionXml.length(), newActionXml); 246 return actionXml.toString(); 247 } 248 catch (Exception e) { 249 throw new CommandException(ErrorCode.E1021, e.getMessage(), e); 250 } 251 } 252 253 protected void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency) 254 throws CommandException { 255 coordAction.setLastModifiedTime(new Date()); 256 if (jpaService != null) { 257 try { 258 if (isChangeInDependency) { 259 CoordActionQueryExecutor.getInstance().executeUpdate( 260 CoordActionQuery.UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK, coordAction); 261 if (EventHandlerService.isEnabled() && coordAction.getStatus() != CoordinatorAction.Status.READY) { 262 // since event is not to be generated unless action 263 // RUNNING via StartX 264 generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null); 265 } 266 } 267 else { 268 CoordActionQueryExecutor.getInstance().executeUpdate( 269 CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, coordAction); 270 } 271 } 272 catch (JPAExecutorException jex) { 273 throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex); 274 } 275 } 276 } 277 278 private void registerForNotification(List<String> missingDeps, Configuration actionConf) { 279 URIHandlerService uriService = Services.get().get(URIHandlerService.class); 280 String user = actionConf.get(OozieClient.USER_NAME, OozieClient.USER_NAME); 281 for (String missingDep : missingDeps) { 282 try { 283 URI missingURI = new URI(missingDep); 284 URIHandler handler = uriService.getURIHandler(missingURI); 285 handler.registerForNotification(missingURI, actionConf, user, actionId); 286 LOG.debug("Registered uri [{0}] for notifications", missingURI); 287 } 288 catch (Exception e) { 289 LOG.warn("Exception while registering uri [{0}] for notifications", missingDep, e); 290 } 291 } 292 } 293 294 private void unregisterAvailableDependencies(List<String> availableDeps) { 295 URIHandlerService uriService = Services.get().get(URIHandlerService.class); 296 for (String availableDep : availableDeps) { 297 try { 298 URI availableURI = new URI(availableDep); 299 URIHandler handler = uriService.getURIHandler(availableURI); 300 if (handler.unregisterFromNotification(availableURI, actionId)) { 301 LOG.debug("Successfully unregistered uri [{0}] from notifications", availableURI); 302 } 303 else { 304 LOG.warn("Unable to unregister uri [{0}] from notifications", availableURI); 305 } 306 } 307 catch (Exception e) { 308 LOG.warn("Exception while unregistering uri [{0}] from notifications", availableDep, e); 309 } 310 } 311 } 312 313 public static void unregisterMissingDependencies(List<String> missingDeps, String actionId) { 314 final XLog LOG = XLog.getLog(CoordPushDependencyCheckXCommand.class); 315 URIHandlerService uriService = Services.get().get(URIHandlerService.class); 316 for (String missingDep : missingDeps) { 317 try { 318 URI missingURI = new URI(missingDep); 319 URIHandler handler = uriService.getURIHandler(missingURI); 320 if (handler.unregisterFromNotification(missingURI, actionId)) { 321 LOG.debug("Successfully unregistered uri [{0}] from notifications", missingURI); 322 } 323 else { 324 LOG.warn("Unable to unregister uri [{0}] from notifications", missingURI); 325 } 326 } 327 catch (Exception e) { 328 LOG.warn("Exception while unregistering uri [{0}] from notifications", missingDep, e); 329 } 330 } 331 } 332 333 @Override 334 public String getEntityKey() { 335 return actionId.substring(0, actionId.indexOf("@")); 336 } 337 338 @Override 339 public String getKey(){ 340 return getName() + "_" + actionId; 341 } 342 343 @Override 344 protected boolean isLockRequired() { 345 return true; 346 } 347 348 @Override 349 protected void loadState() throws CommandException { 350 jpaService = Services.get().get(JPAService.class); 351 try { 352 coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId)); 353 if (coordAction != null) { 354 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId())); 355 LogUtils.setLogInfo(coordAction, logInfo); 356 } 357 else { 358 throw new CommandException(ErrorCode.E0605, actionId); 359 } 360 } 361 catch (JPAExecutorException je) { 362 throw new CommandException(je); 363 } 364 } 365 366 @Override 367 protected void verifyPrecondition() throws CommandException, PreconditionException { 368 if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) { 369 throw new PreconditionException(ErrorCode.E1100, "[" + actionId 370 + "]::CoordPushDependencyCheck:: Ignoring action. Should be in WAITING state, but state=" 371 + coordAction.getStatus()); 372 } 373 374 // if eligible to do action input check when running with backward 375 // support is true 376 if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) { 377 return; 378 } 379 380 if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR 381 && coordJob.getStatus() != Job.Status.PAUSED && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) { 382 throw new PreconditionException(ErrorCode.E1100, "[" + actionId 383 + "]::CoordPushDependencyCheck:: Ignoring action." 384 + " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state=" 385 + coordJob.getStatus()); 386 } 387 } 388 389}