001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.io.IOException; 021 import java.io.StringReader; 022 import java.net.URI; 023 import java.util.Arrays; 024 import java.util.Date; 025 import java.util.List; 026 import org.apache.hadoop.conf.Configuration; 027 import org.apache.oozie.CoordinatorActionBean; 028 import org.apache.oozie.CoordinatorJobBean; 029 import org.apache.oozie.ErrorCode; 030 import org.apache.oozie.XException; 031 import org.apache.oozie.client.CoordinatorAction; 032 import org.apache.oozie.client.Job; 033 import org.apache.oozie.client.OozieClient; 034 import org.apache.oozie.command.CommandException; 035 import org.apache.oozie.command.PreconditionException; 036 import org.apache.oozie.dependency.DependencyChecker; 037 import org.apache.oozie.dependency.ActionDependency; 038 import org.apache.oozie.dependency.URIHandler; 039 import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor; 040 import org.apache.oozie.executor.jpa.CoordActionUpdateForModifiedTimeJPAExecutor; 041 import org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor; 042 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 043 import org.apache.oozie.executor.jpa.JPAExecutorException; 044 import org.apache.oozie.service.CallableQueueService; 045 import org.apache.oozie.service.EventHandlerService; 046 import org.apache.oozie.service.JPAService; 047 import org.apache.oozie.service.RecoveryService; 048 import org.apache.oozie.service.Service; 049 import org.apache.oozie.service.Services; 050 import org.apache.oozie.service.URIHandlerService; 051 import org.apache.oozie.util.LogUtils; 052 import org.apache.oozie.util.StatusUtils; 053 import org.apache.oozie.util.XConfiguration; 054 import org.apache.oozie.util.XLog; 055 056 public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> { 057 protected String actionId; 058 protected JPAService jpaService = null; 059 protected CoordinatorActionBean coordAction = null; 060 protected CoordinatorJobBean coordJob = null; 061 062 /** 063 * Property name of command re-queue interval for coordinator push check in 064 * milliseconds. 065 */ 066 public static final String CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX 067 + "coord.push.check.requeue.interval"; 068 /** 069 * Default re-queue interval in ms. It is applied when no value defined in 070 * the oozie configuration. 071 */ 072 private final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 600000; 073 private boolean registerForNotification; 074 private boolean removeAvailDependencies; 075 076 public CoordPushDependencyCheckXCommand(String actionId) { 077 this(actionId, false, true); 078 } 079 080 public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification) { 081 this(actionId, registerForNotification, !registerForNotification); 082 } 083 084 public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification, 085 boolean removeAvailDependencies) { 086 super("coord_push_dep_check", "coord_push_dep_check", 0); 087 this.actionId = actionId; 088 this.registerForNotification = registerForNotification; 089 this.removeAvailDependencies = removeAvailDependencies; 090 } 091 092 protected CoordPushDependencyCheckXCommand(String actionName, String actionId) { 093 super(actionName, actionName, 0); 094 this.actionId = actionId; 095 } 096 097 @Override 098 protected Void execute() throws CommandException { 099 String pushMissingDeps = coordAction.getPushMissingDependencies(); 100 if (pushMissingDeps == null || pushMissingDeps.length() == 0) { 101 LOG.info("Nothing to check. Empty push missing dependency"); 102 } 103 else { 104 String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps); 105 LOG.info("First Push missing dependency is [{0}] ", missingDepsArray[0]); 106 LOG.trace("Push missing dependencies are [{0}] ", pushMissingDeps); 107 if (registerForNotification) { 108 LOG.debug("Register for notifications is true"); 109 } 110 111 try { 112 Configuration actionConf = null; 113 try { 114 actionConf = new XConfiguration(new StringReader(coordAction.getRunConf())); 115 } 116 catch (IOException e) { 117 throw new CommandException(ErrorCode.E1307, e.getMessage(), e); 118 } 119 120 // Check all dependencies during materialization to avoid registering in the cache. 121 // But check only first missing one afterwards similar to 122 // CoordActionInputCheckXCommand for efficiency. listPartitions is costly. 123 ActionDependency actionDep = DependencyChecker.checkForAvailability(missingDepsArray, actionConf, 124 !registerForNotification); 125 126 boolean isChangeInDependency = true; 127 boolean timeout = false; 128 if (actionDep.getMissingDependencies().size() == 0) { 129 // All push-based dependencies are available 130 onAllPushDependenciesAvailable(); 131 } 132 else { 133 if (actionDep.getMissingDependencies().size() == missingDepsArray.length) { 134 isChangeInDependency = false; 135 } 136 else { 137 String stillMissingDeps = DependencyChecker.dependenciesAsString(actionDep 138 .getMissingDependencies()); 139 coordAction.setPushMissingDependencies(stillMissingDeps); 140 } 141 // Checking for timeout 142 timeout = isTimeout(); 143 if (timeout) { 144 queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName())); 145 } 146 else { 147 queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 148 getCoordPushCheckRequeueInterval()); 149 } 150 } 151 152 updateCoordAction(coordAction, isChangeInDependency); 153 if (registerForNotification) { 154 registerForNotification(actionDep.getMissingDependencies(), actionConf); 155 } 156 if (removeAvailDependencies) { 157 unregisterAvailableDependencies(actionDep.getAvailableDependencies()); 158 } 159 if (timeout) { 160 unregisterMissingDependencies(actionDep.getMissingDependencies(), actionId); 161 } 162 } 163 catch (Exception e) { 164 final CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class); 165 if (isTimeout()) { 166 LOG.debug("Queueing timeout command"); 167 // XCommand.queue() will not work when there is a Exception 168 callableQueueService.queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName())); 169 unregisterMissingDependencies(Arrays.asList(missingDepsArray), actionId); 170 } 171 else if (coordAction.getMissingDependencies() != null 172 && coordAction.getMissingDependencies().length() > 0) { 173 // Queue again on exception as RecoveryService will not queue this again with 174 // the action being updated regularly by CoordActionInputCheckXCommand 175 callableQueueService.queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), 176 registerForNotification, removeAvailDependencies), 177 Services.get().getConf().getInt(RecoveryService.CONF_COORD_OLDER_THAN, 600) * 1000); 178 } 179 throw new CommandException(ErrorCode.E1021, e.getMessage(), e); 180 } 181 } 182 return null; 183 } 184 185 /** 186 * Return the re-queue interval for coord push dependency check 187 * @return 188 */ 189 public long getCoordPushCheckRequeueInterval() { 190 long requeueInterval = Services.get().getConf().getLong(CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL, 191 DEFAULT_COMMAND_REQUEUE_INTERVAL); 192 return requeueInterval; 193 } 194 195 /** 196 * Returns true if timeout period has been reached 197 * 198 * @return true if it is time for timeout else false 199 */ 200 protected boolean isTimeout() { 201 long waitingTime = (new Date().getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction 202 .getCreatedTime().getTime())) 203 / (60 * 1000); 204 int timeOut = coordAction.getTimeOut(); 205 return (timeOut >= 0) && (waitingTime > timeOut); 206 } 207 208 protected void onAllPushDependenciesAvailable() throws CommandException { 209 coordAction.setPushMissingDependencies(""); 210 if (coordAction.getMissingDependencies() == null || coordAction.getMissingDependencies().length() == 0) { 211 Date nominalTime = coordAction.getNominalTime(); 212 Date currentTime = new Date(); 213 // The action should become READY only if current time > nominal time; 214 // CoordActionInputCheckXCommand will take care of moving it to READY when it is nominal time. 215 if (nominalTime.compareTo(currentTime) > 0) { 216 LOG.info("[" + actionId + "]::ActionInputCheck:: nominal Time is newer than current time. Current=" 217 + currentTime + ", nominal=" + nominalTime); 218 } 219 else { 220 String actionXml = resolveCoordConfiguration(); 221 coordAction.setActionXml(actionXml); 222 coordAction.setStatus(CoordinatorAction.Status.READY); 223 // pass jobID to the CoordActionReadyXCommand 224 queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100); 225 } 226 } 227 else if (isTimeout()) { 228 // If it is timeout and all push dependencies are available but still some unresolved 229 // missing dependencies queue CoordActionInputCheckXCommand now. Else it will have to 230 // wait till RecoveryService kicks in 231 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId())); 232 } 233 } 234 235 private String resolveCoordConfiguration() throws CommandException { 236 try { 237 Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf())); 238 StringBuilder actionXml = new StringBuilder(coordAction.getActionXml()); 239 String newActionXml = CoordActionInputCheckXCommand.resolveCoordConfiguration(actionXml, actionConf, 240 actionId); 241 actionXml.replace(0, actionXml.length(), newActionXml); 242 return actionXml.toString(); 243 } 244 catch (Exception e) { 245 throw new CommandException(ErrorCode.E1021, e.getMessage(), e); 246 } 247 } 248 249 protected void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency) 250 throws CommandException { 251 coordAction.setLastModifiedTime(new Date()); 252 if (jpaService != null) { 253 try { 254 if (isChangeInDependency) { 255 jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(coordAction)); 256 if (EventHandlerService.isEnabled() 257 && coordAction.getStatus() != CoordinatorAction.Status.READY) { 258 //since event is not to be generated unless action RUNNING via StartX 259 generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null); 260 } 261 } 262 else { 263 jpaService.execute(new CoordActionUpdateForModifiedTimeJPAExecutor(coordAction)); 264 } 265 } 266 catch (JPAExecutorException jex) { 267 throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex); 268 } 269 } 270 } 271 272 private void registerForNotification(List<String> missingDeps, Configuration actionConf) { 273 URIHandlerService uriService = Services.get().get(URIHandlerService.class); 274 String user = actionConf.get(OozieClient.USER_NAME, OozieClient.USER_NAME); 275 for (String missingDep : missingDeps) { 276 try { 277 URI missingURI = new URI(missingDep); 278 URIHandler handler = uriService.getURIHandler(missingURI); 279 handler.registerForNotification(missingURI, actionConf, user, actionId); 280 LOG.debug("Registered uri [{0}] for notifications", missingURI); 281 } 282 catch (Exception e) { 283 LOG.warn("Exception while registering uri [{0}] for notifications", missingDep, e); 284 } 285 } 286 } 287 288 private void unregisterAvailableDependencies(List<String> availableDeps) { 289 URIHandlerService uriService = Services.get().get(URIHandlerService.class); 290 for (String availableDep : availableDeps) { 291 try { 292 URI availableURI = new URI(availableDep); 293 URIHandler handler = uriService.getURIHandler(availableURI); 294 if (handler.unregisterFromNotification(availableURI, actionId)) { 295 LOG.debug("Successfully unregistered uri [{0}] from notifications", availableURI); 296 } 297 else { 298 LOG.warn("Unable to unregister uri [{0}] from notifications", availableURI); 299 } 300 } 301 catch (Exception e) { 302 LOG.warn("Exception while unregistering uri [{0}] from notifications", availableDep, e); 303 } 304 } 305 } 306 307 public static void unregisterMissingDependencies(List<String> missingDeps, String actionId) { 308 final XLog LOG = XLog.getLog(CoordPushDependencyCheckXCommand.class); 309 URIHandlerService uriService = Services.get().get(URIHandlerService.class); 310 for (String missingDep : missingDeps) { 311 try { 312 URI missingURI = new URI(missingDep); 313 URIHandler handler = uriService.getURIHandler(missingURI); 314 if (handler.unregisterFromNotification(missingURI, actionId)) { 315 LOG.debug("Successfully unregistered uri [{0}] from notifications", missingURI); 316 } 317 else { 318 LOG.warn("Unable to unregister uri [{0}] from notifications", missingURI); 319 } 320 } 321 catch (Exception e) { 322 LOG.warn("Exception while unregistering uri [{0}] from notifications", missingDep, e); 323 } 324 } 325 } 326 327 @Override 328 public String getEntityKey() { 329 return coordAction.getJobId(); 330 } 331 332 @Override 333 public String getKey(){ 334 return getName() + "_" + actionId; 335 } 336 337 @Override 338 protected boolean isLockRequired() { 339 return true; 340 } 341 342 @Override 343 protected void eagerLoadState() throws CommandException { 344 try { 345 jpaService = Services.get().get(JPAService.class); 346 347 if (jpaService != null) { 348 coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId)); 349 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId())); 350 LogUtils.setLogInfo(coordAction, logInfo); 351 } 352 else { 353 throw new CommandException(ErrorCode.E0610); 354 } 355 } 356 catch (XException ex) { 357 throw new CommandException(ex); 358 } 359 } 360 361 @Override 362 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 363 if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) { 364 throw new PreconditionException(ErrorCode.E1100, "[" + actionId 365 + "]::CoordPushDependencyCheck:: Ignoring action. Should be in WAITING state, but state=" 366 + coordAction.getStatus()); 367 } 368 369 // if eligible to do action input check when running with backward 370 // support is true 371 if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) { 372 return; 373 } 374 375 if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR 376 && coordJob.getStatus() != Job.Status.PAUSED && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) { 377 throw new PreconditionException(ErrorCode.E1100, "[" + actionId 378 + "]::CoordPushDependencyCheck:: Ignoring action." 379 + " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state=" 380 + coordJob.getStatus()); 381 } 382 } 383 384 @Override 385 protected void loadState() throws CommandException { 386 eagerLoadState(); 387 } 388 389 @Override 390 protected void verifyPrecondition() throws CommandException, PreconditionException { 391 eagerVerifyPrecondition(); 392 } 393 394 }