/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.command.coord;

import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
import java.util.Date;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.coord.input.dependency.CoordInputDependency;
import org.apache.oozie.dependency.ActionDependency;
import org.apache.oozie.dependency.DependencyChecker;
import org.apache.oozie.dependency.URIHandler;
import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.PartitionDependencyManagerService;
import org.apache.oozie.service.RecoveryService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.StatusUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.DateUtils;

/**
 * Command that checks the push-based missing input dependencies of a coordinator action.
 * <p>
 * The command only runs for actions in WAITING state (see {@link #verifyPrecondition()}). Depending on the
 * outcome of the check it moves the action to READY, queues a timeout command, or re-queues itself.
 */
public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> {
    protected String actionId;
    protected JPAService jpaService = null;
    protected CoordinatorActionBean coordAction = null;
    protected CoordinatorJobBean coordJob = null;

    /**
     * Property name of command re-queue interval for coordinator push check in
     * milliseconds.
     */
    public static final String CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX
            + "coord.push.check.requeue.interval";
    private boolean registerForNotification;
    private boolean removeAvailDependencies;

    /**
     * Create a command that does not register for notifications and unregisters the dependencies found
     * available.
     *
     * @param actionId id of the coordinator action to check
     */
    public CoordPushDependencyCheckXCommand(String actionId) {
        this(actionId, false, true);
    }

    /**
     * Create a command where available dependencies are unregistered only when the command does not
     * register for notifications.
     *
     * @param actionId id of the coordinator action to check
     * @param registerForNotification whether the still missing dependencies are registered for notifications
     */
    public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification) {
        this(actionId, registerForNotification, !registerForNotification);
    }

    /**
     * Create a command with explicit notification handling.
     *
     * @param actionId id of the coordinator action to check
     * @param registerForNotification whether the still missing dependencies are registered for notifications
     * @param removeAvailDependencies whether the dependencies found available are unregistered from notifications
     */
    public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification,
            boolean removeAvailDependencies) {
        super("coord_push_dep_check", "coord_push_dep_check", 0);
        this.actionId = actionId;
        this.registerForNotification = registerForNotification;
        this.removeAvailDependencies = removeAvailDependencies;
    }

    /**
     * Constructor that lets the caller choose the command name and type. Both notification flags keep
     * their default value of {@code false}.
     *
     * @param actionName value used as both name and type of the command
     * @param actionId id of the coordinator action to check
     */
    protected CoordPushDependencyCheckXCommand(String actionName, String actionId) {
        super(actionName, actionName, 0);
        this.actionId = actionId;
    }

    @Override
    protected void setLogInfo() {
        LogUtils.setLogInfo(actionId);
    }

    /**
     * Check the push missing dependencies of the action and act on the result.
     * <ul>
     * <li>If the nominal time is still in the future, re-queue the check for the nominal time and return.</li>
     * <li>If there are no push missing dependencies, log and return.</li>
     * <li>Otherwise check the dependencies; when all are met call
     * {@link #onAllPushDependenciesAvailable(boolean)}, else queue either a timeout command or another
     * check after {@link #getCoordPushCheckRequeueInterval()}.</li>
     * </ul>
     *
     * @return always {@code null}
     * @throws CommandException with {@link ErrorCode#E1021} if anything fails while checking the dependencies
     */
    @Override
    protected Void execute() throws CommandException {
        // this action should only get processed if current time > nominal time;
        // otherwise, requeue this action for delay execution;
        Date nominalTime = coordAction.getNominalTime();
        Date currentTime = new Date();
        if (nominalTime.compareTo(currentTime) > 0) {
            queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), true),
                    nominalTime.getTime() - currentTime.getTime());
            updateCoordAction(coordAction, false);
            LOG.info("[" + actionId
                    + "]::CoordPushDependency:: nominal Time is newer than current time, so requeue and wait. Current="
                    + DateUtils.formatDateOozieTZ(currentTime) + ", nominal="
                    + DateUtils.formatDateOozieTZ(nominalTime));
            return null;
        }

        CoordInputDependency coordPushInputDependency = coordAction.getPushInputDependencies();
        CoordInputDependency coordPullInputDependency = coordAction.getPullInputDependencies();
        List<String> missingDependenciesArray = coordPushInputDependency.getMissingDependenciesAsList();
        if (missingDependenciesArray.isEmpty()) {
            LOG.info("Nothing to check. Empty push missing dependency");
            return null;
        }

        LOG.info("First Push missing dependency is [{0}] ", missingDependenciesArray.get(0));
        LOG.trace("Push missing dependencies are [{0}] ", missingDependenciesArray);
        if (registerForNotification) {
            LOG.debug("Register for notifications is true");
        }

        try {
            Configuration actionConf = null;
            try {
                actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
            }
            catch (IOException e) {
                // Note: this exception is caught by the enclosing catch block and re-thrown as E1021
                throw new CommandException(ErrorCode.E1307, e.getMessage(), e);
            }

            boolean isChangeInDependency = true;
            boolean timeout = false;
            ActionDependency actionDependency = coordPushInputDependency.checkPushMissingDependencies(coordAction,
                    registerForNotification);
            // Check all dependencies during materialization to avoid registering in the cache.
            // But check only first missing one afterwards similar to
            // CoordActionInputCheckXCommand for efficiency. listPartitions is costly.
            if (actionDependency.getMissingDependencies().size() == missingDependenciesArray.size()) {
                isChangeInDependency = false;
            }
            else {
                String stillMissingDeps = DependencyChecker.dependenciesAsString(actionDependency
                        .getMissingDependencies());
                coordPushInputDependency.setMissingDependencies(stillMissingDeps);
            }

            if (coordPushInputDependency.isDependencyMet()) {
                // All push-based dependencies are available
                onAllPushDependenciesAvailable(coordPullInputDependency.isDependencyMet());
            }
            else {
                // Checking for timeout
                timeout = isTimeout();
                if (timeout) {
                    queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
                }
                else {
                    queue(new CoordPushDependencyCheckXCommand(coordAction.getId()),
                            getCoordPushCheckRequeueInterval());
                }
            }

            updateCoordAction(coordAction, isChangeInDependency || coordPushInputDependency.isDependencyMet());
            if (registerForNotification) {
                registerForNotification(coordPushInputDependency.getMissingDependenciesAsList(), actionConf);
            }
            if (removeAvailDependencies) {
                unregisterAvailableDependencies(actionDependency.getAvailableDependencies());
            }
            if (timeout) {
                unregisterMissingDependencies(coordPushInputDependency.getMissingDependenciesAsList(), actionId);
            }
        }
        catch (Exception e) {
            final CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
            if (isTimeout()) {
                LOG.debug("Queueing timeout command");
                // XCommand.queue() will not work when there is a Exception
                callableQueueService.queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(),
                        coordJob.getAppName()));
                unregisterMissingDependencies(missingDependenciesArray, actionId);
            }
            else if (!coordPullInputDependency.getMissingDependenciesAsList().isEmpty()) {
                // Queue again on exception as RecoveryService will not queue this again with
                // the action being updated regularly by CoordActionInputCheckXCommand
                callableQueueService.queue(new CoordPushDependencyCheckXCommand(coordAction.getId(),
                        registerForNotification, removeAvailDependencies),
                        Services.get().getConf().getInt(RecoveryService.CONF_COORD_OLDER_THAN, 600) * 1000);
            }
            throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
        }
        return null;
    }

    /**
     * Return the re-queue interval for coord push dependency check.
     *
     * @return the value configured for {@link #CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL}, in milliseconds
     */
    public long getCoordPushCheckRequeueInterval() {
        return ConfigurationService.getLong(CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL);
    }

    /**
     * Returns true if timeout period has been reached.
     * <p>
     * The waiting time is measured in minutes from the later of the action nominal time and the action
     * created time. A negative action timeout never times out.
     *
     * @return true if it is time for timeout else false
     */
    protected boolean isTimeout() {
        long waitingSince = Math.max(coordAction.getNominalTime().getTime(), coordAction.getCreatedTime().getTime());
        long waitingTime = (new Date().getTime() - waitingSince) / (60 * 1000);
        int timeOut = coordAction.getTimeOut();
        return (timeOut >= 0) && (waitingTime > timeOut);
    }

    /**
     * Handle the case where all push dependencies of the action are available.
     * <p>
     * Removes the action from the {@link PartitionDependencyManagerService} and marks the push dependencies
     * as met. If the pull dependencies are met too and the nominal time has been reached, the action XML is
     * resolved, the action is set to READY and a {@link CoordActionReadyXCommand} is queued.
     *
     * @param isPullDependencyMeet whether the pull dependencies of the action are met
     * @throws CommandException if the action configuration cannot be resolved
     */
    protected void onAllPushDependenciesAvailable(boolean isPullDependencyMeet) throws CommandException {
        Services.get().get(PartitionDependencyManagerService.class)
                .removeCoordActionWithDependenciesAvailable(coordAction.getId());
        if (isPullDependencyMeet) {
            Date nominalTime = coordAction.getNominalTime();
            Date currentTime = new Date();
            // The action should become READY only if current time > nominal time;
            // CoordActionInputCheckXCommand will take care of moving it to READY when it is nominal time.
            if (nominalTime.compareTo(currentTime) > 0) {
                LOG.info("[" + actionId + "]::ActionInputCheck:: nominal Time is newer than current time. Current="
                        + DateUtils.formatDateOozieTZ(currentTime) + ", nominal="
                        + DateUtils.formatDateOozieTZ(nominalTime));
            }
            else {
                String actionXml = resolveCoordConfiguration();
                coordAction.setActionXml(actionXml);
                coordAction.setStatus(CoordinatorAction.Status.READY);
                // pass jobID to the CoordActionReadyXCommand
                queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
            }
        }
        else if (isTimeout()) {
            // If it is timeout and all push dependencies are available but still some unresolved
            // missing dependencies queue CoordActionInputCheckXCommand now. Else it will have to
            // wait till RecoveryService kicks in
            queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()));
        }
        coordAction.getPushInputDependencies().setDependencyMet(true);
    }

    /**
     * Resolve the action XML of the action through
     * {@code CoordActionInputCheckXCommand.resolveCoordConfiguration}, using the action run configuration and
     * its pull and push input dependencies.
     *
     * @return the resolved action XML
     * @throws CommandException with {@link ErrorCode#E1021} wrapping any failure during resolution
     */
    private String resolveCoordConfiguration() throws CommandException {
        try {
            Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
            StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
            String newActionXml = CoordActionInputCheckXCommand.resolveCoordConfiguration(actionXml, actionConf,
                    actionId, coordAction.getPullInputDependencies(), coordAction.getPushInputDependencies());
            actionXml.replace(0, actionXml.length(), newActionXml);
            return actionXml.toString();
        }
        catch (Exception e) {
            throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
        }
    }

    /**
     * Set the last modified time of the action and, if a JPA service has been loaded, persist the action.
     *
     * @param coordAction the action to update
     * @param isChangeInDependency {@code true} to serialize and persist the push missing dependencies,
     *        {@code false} to persist only the modified date
     * @throws CommandException with {@link ErrorCode#E1021} if the update or the serialization fails
     */
    protected void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency)
            throws CommandException {
        coordAction.setLastModifiedTime(new Date());
        if (jpaService != null) {
            try {
                if (isChangeInDependency) {
                    coordAction.setPushMissingDependencies(coordAction.getPushInputDependencies().serialize());
                    CoordActionQueryExecutor.getInstance().executeUpdate(
                            CoordActionQuery.UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK, coordAction);
                    if (EventHandlerService.isEnabled()
                            && coordAction.getStatus() != CoordinatorAction.Status.READY) {
                        // since event is not to be generated unless action
                        // RUNNING via StartX
                        generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
                    }
                }
                else {
                    CoordActionQueryExecutor.getInstance().executeUpdate(
                            CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, coordAction);
                }
            }
            catch (JPAExecutorException jex) {
                throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
            }
            catch (IOException ioe) {
                throw new CommandException(ErrorCode.E1021, ioe.getMessage(), ioe);
            }
        }
    }

    /**
     * Register every missing dependency URI for notifications with its URI handler. A failure on one
     * dependency is logged as a warning and does not stop the remaining registrations.
     *
     * @param missingDeps the missing dependency URIs
     * @param actionConf the action configuration passed to the URI handlers
     */
    private void registerForNotification(List<String> missingDeps, Configuration actionConf) {
        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
        // NOTE(review): the default value passed here is the property key itself - confirm this is intended
        String user = actionConf.get(OozieClient.USER_NAME, OozieClient.USER_NAME);
        for (String missingDep : missingDeps) {
            try {
                URI missingURI = new URI(missingDep);
                URIHandler handler = uriService.getURIHandler(missingURI);
                handler.registerForNotification(missingURI, actionConf, user, actionId);
                LOG.debug("Registered uri [{0}] for notifications", missingURI);
            }
            catch (Exception e) {
                LOG.warn("Exception while registering uri [{0}] for notifications", missingDep, e);
            }
        }
    }

    /**
     * Unregister every available dependency URI of this action from notifications. A failure on one
     * dependency is logged as a warning and does not stop the remaining ones.
     *
     * @param availableDeps the available dependency URIs
     */
    private void unregisterAvailableDependencies(List<String> availableDeps) {
        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
        for (String availableDep : availableDeps) {
            try {
                URI availableURI = new URI(availableDep);
                URIHandler handler = uriService.getURIHandler(availableURI);
                if (handler.unregisterFromNotification(availableURI, actionId)) {
                    LOG.debug("Successfully unregistered uri [{0}] from notifications", availableURI);
                }
                else {
                    LOG.warn("Unable to unregister uri [{0}] from notifications", availableURI);
                }
            }
            catch (Exception e) {
                LOG.warn("Exception while unregistering uri [{0}] from notifications", availableDep, e);
            }
        }
    }

    /**
     * Unregister every missing dependency URI of the given action from notifications. A failure on one
     * dependency is logged as a warning and does not stop the remaining ones.
     *
     * @param missingDeps the missing dependency URIs
     * @param actionId id of the coordinator action the dependencies were registered for
     */
    public static void unregisterMissingDependencies(List<String> missingDeps, String actionId) {
        final XLog LOG = XLog.getLog(CoordPushDependencyCheckXCommand.class);
        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
        for (String missingDep : missingDeps) {
            try {
                URI missingURI = new URI(missingDep);
                URIHandler handler = uriService.getURIHandler(missingURI);
                if (handler.unregisterFromNotification(missingURI, actionId)) {
                    LOG.debug("Successfully unregistered uri [{0}] from notifications", missingURI);
                }
                else {
                    LOG.warn("Unable to unregister uri [{0}] from notifications", missingURI);
                }
            }
            catch (Exception e) {
                LOG.warn("Exception while unregistering uri [{0}] from notifications", missingDep, e);
            }
        }
    }

    /**
     * Return the part of the action id that precedes the first {@code '@'}.
     *
     * @return the action id prefix before {@code '@'}, or the whole action id if it contains no {@code '@'}
     */
    @Override
    public String getEntityKey() {
        int separator = actionId.indexOf("@");
        // Fall back to the full id instead of failing with StringIndexOutOfBoundsException
        return separator < 0 ? actionId : actionId.substring(0, separator);
    }

    @Override
    public String getKey() {
        return getName() + "_" + actionId;
    }

    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /**
     * Load the coordinator action and its coordinator job through the JPA service.
     *
     * @throws CommandException with {@link ErrorCode#E0605} if no action is returned for the action id, or
     *         wrapping the {@link JPAExecutorException} raised while loading
     */
    @Override
    protected void loadState() throws CommandException {
        jpaService = Services.get().get(JPAService.class);
        try {
            coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
            if (coordAction != null) {
                coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
                LogUtils.setLogInfo(coordAction);
            }
            else {
                throw new CommandException(ErrorCode.E0605, actionId);
            }
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }
    }

    /**
     * Verify that the action is in WAITING state and that the coordinator job is eligible for the check.
     *
     * @throws CommandException declared by the overridden method
     * @throws PreconditionException with {@link ErrorCode#E1100} if the action is not WAITING, or if the job
     *         is neither eligible per {@code StatusUtils.getStatusForCoordActionInputCheck} nor in
     *         RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
            throw new PreconditionException(ErrorCode.E1100, "[" + actionId
                    + "]::CoordPushDependencyCheck:: Ignoring action. Should be in WAITING state, but state="
                    + coordAction.getStatus());
        }

        // if eligible to do action input check when running with backward
        // support is true
        if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
            return;
        }

        if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR
                && coordJob.getStatus() != Job.Status.PAUSED && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
            throw new PreconditionException(ErrorCode.E1100, "[" + actionId
                    + "]::CoordPushDependencyCheck:: Ignoring action."
                    + " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
                    + coordJob.getStatus());
        }
    }

}