001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.coord; 020 021import java.io.IOException; 022import java.io.StringReader; 023import java.util.Calendar; 024import java.util.Date; 025import java.util.List; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.security.AccessControlException; 028import org.apache.oozie.CoordinatorActionBean; 029import org.apache.oozie.CoordinatorJobBean; 030import org.apache.oozie.ErrorCode; 031import org.apache.oozie.client.CoordinatorAction; 032import org.apache.oozie.client.Job; 033import org.apache.oozie.command.CommandException; 034import org.apache.oozie.command.PreconditionException; 035import org.apache.oozie.coord.CoordELEvaluator; 036import org.apache.oozie.coord.CoordELFunctions; 037import org.apache.oozie.coord.ElException; 038import org.apache.oozie.coord.input.dependency.CoordInputDependency; 039import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor; 040import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; 041import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 042import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 043import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 044import org.apache.oozie.executor.jpa.JPAExecutorException; 045import org.apache.oozie.service.CallableQueueService; 046import org.apache.oozie.service.ConfigurationService; 047import org.apache.oozie.service.EventHandlerService; 048import org.apache.oozie.service.JPAService; 049import org.apache.oozie.service.Service; 050import org.apache.oozie.service.Services; 051import org.apache.oozie.util.DateUtils; 052import org.apache.oozie.util.ELEvaluator; 053import org.apache.oozie.util.LogUtils; 054import org.apache.oozie.util.ParamChecker; 055import org.apache.oozie.util.StatusUtils; 056import org.apache.oozie.util.XConfiguration; 057import org.apache.oozie.util.XLog; 058import org.apache.oozie.util.XmlUtils; 059import org.jdom.Element; 060 061/** 062 * The command to check if an action's data input paths exist in the file system. 063 */ 064public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> { 065 066 public static final String COORD_EXECUTION_NONE_TOLERANCE = "oozie.coord.execution.none.tolerance"; 067 068 private final String actionId; 069 /** 070 * Property name of command re-queue interval for coordinator action input check in 071 * milliseconds. 072 */ 073 public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX 074 + "coord.input.check.requeue.interval"; 075 public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL_ADDITIONAL_DELAY = Service.CONF_PREFIX 076 + "coord.input.check.requeue.interval.additional.delay"; 077 private CoordinatorActionBean coordAction = null; 078 private CoordinatorJobBean coordJob = null; 079 private JPAService jpaService = null; 080 private String jobId = null; 081 public CoordActionInputCheckXCommand(String actionId, String jobId) { 082 super("coord_action_input", "coord_action_input", 1); 083 this.actionId = ParamChecker.notEmpty(actionId, "actionId"); 084 this.jobId = jobId; 085 } 086 087 @Override 088 protected void setLogInfo() { 089 LogUtils.setLogInfo(actionId); 090 } 091 092 @Override 093 protected Void execute() throws CommandException { 094 LOG.debug("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state."); 095 096 // this action should only get processed if current time > nominal time; 097 // otherwise, requeue this action for delay execution; 098 Date nominalTime = coordAction.getNominalTime(); 099 Date currentTime = new Date(); 100 if (nominalTime.compareTo(currentTime) > 0) { 101 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), nominalTime.getTime() 102 - currentTime.getTime()); 103 updateCoordAction(coordAction, false); 104 LOG.info("[" + actionId 105 + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current=" 106 + DateUtils.formatDateOozieTZ(currentTime) + ", nominal=" + DateUtils.formatDateOozieTZ(nominalTime)); 107 108 return null; 109 } 110 111 StringBuilder actionXml = new StringBuilder(coordAction.getActionXml()); 112 boolean isChangeInDependency = false; 113 try { 114 Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf())); 115 Date now = new Date(); 116 if (coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.LAST_ONLY)) { 117 Date nextNominalTime = CoordCommandUtils.computeNextNominalTime(coordJob, coordAction); 118 if (nextNominalTime != null) { 119 // If the current time is after the next action's nominal time, then we've passed the window where this action 120 // should be started; so set it to SKIPPED 121 if (now.after(nextNominalTime)) { 122 LOG.info("LAST_ONLY execution: Preparing to skip action [{0}] because the current time [{1}] is later than " 123 + "the nominal time [{2}] of the next action]", coordAction.getId(), 124 DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime)); 125 queue(new CoordActionSkipXCommand(coordAction, coordJob.getUser(), coordJob.getAppName())); 126 return null; 127 } else { 128 LOG.debug("LAST_ONLY execution: Not skipping action [{0}] because the current time [{1}] is earlier than " 129 + "the nominal time [{2}] of the next action]", coordAction.getId(), 130 DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime)); 131 } 132 } 133 } 134 else if (coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.NONE)) { 135 // If the current time is after the nominal time of this action plus some tolerance, 136 // then we've passed the window where this action should be started; so set it to SKIPPED 137 Calendar cal = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone())); 138 cal.setTime(nominalTime); 139 int tolerance = ConfigurationService.getInt(COORD_EXECUTION_NONE_TOLERANCE); 140 cal.add(Calendar.MINUTE, tolerance); 141 if (now.after(cal.getTime())) { 142 LOG.info("NONE execution: Preparing to skip action [{0}] because the current time [{1}] is more than [{2}]" 143 + " minutes later than the nominal time [{3}] of the current action]", coordAction.getId(), 144 DateUtils.formatDateOozieTZ(now), tolerance, DateUtils.formatDateOozieTZ(nominalTime)); 145 queue(new CoordActionSkipXCommand(coordAction, coordJob.getUser(), coordJob.getAppName())); 146 return null; 147 } else { 148 LOG.debug("NONE execution: Not skipping action [{0}] because the current time [{1}] is earlier than [{2}]" 149 + " minutes later than the nominal time [{3}] of the current action]", coordAction.getId(), 150 DateUtils.formatDateOozieTZ(now), tolerance, DateUtils.formatDateOozieTZ(coordAction.getNominalTime())); 151 } 152 } 153 154 StringBuilder existList = new StringBuilder(); 155 StringBuilder nonExistList = new StringBuilder(); 156 CoordInputDependency coordPullInputDependency = coordAction.getPullInputDependencies(); 157 CoordInputDependency coordPushInputDependency = coordAction.getPushInputDependencies(); 158 159 160 String missingDependencies = coordPullInputDependency.getMissingDependencies(); 161 StringBuilder nonResolvedList = new StringBuilder(); 162 163 CoordCommandUtils.getResolvedList(missingDependencies, nonExistList, nonResolvedList); 164 String firstMissingDependency = ""; 165 // For clarity regarding which is the missing dependency in synchronous order 166 // instead of printing entire list, some of which, may be available 167 if (nonExistList.length() > 0) { 168 firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0]; 169 } 170 LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + firstMissingDependency + " " 171 + nonResolvedList.toString()); 172 173 174 boolean status = checkResolvedInput(actionXml, existList, nonExistList, actionConf); 175 boolean isPushDependenciesMet = coordPushInputDependency.isDependencyMet(); 176 if (status && nonResolvedList.length() > 0) { 177 status = (isPushDependenciesMet) ? checkUnResolvedInput(actionXml, actionConf) : false; 178 } 179 coordAction.setLastModifiedTime(currentTime); 180 coordAction.setActionXml(actionXml.toString()); 181 182 isChangeInDependency = isChangeInDependency(nonExistList, missingDependencies, nonResolvedList, status); 183 184 if (status && isPushDependenciesMet) { 185 moveCoordActionToReady(actionXml, actionConf, coordPullInputDependency, coordPushInputDependency); 186 } 187 else if (!isTimeout(currentTime)) { 188 if (!status) { 189 long addtionalDelay = isChangeInDependency ? 0 190 : ConfigurationService.getInt(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL_ADDITIONAL_DELAY) 191 * 1000L; 192 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 193 addtionalDelay + getCoordInputCheckRequeueInterval()); 194 } 195 updateCoordAction(coordAction, isChangeInDependency); 196 } 197 else { 198 if (isPushDependenciesMet) { 199 queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName())); 200 } 201 else { 202 // Let CoordPushDependencyCheckXCommand queue the timeout 203 queue(new CoordPushDependencyCheckXCommand(coordAction.getId())); 204 } 205 updateCoordAction(coordAction, isChangeInDependency); 206 } 207 } 208 catch (AccessControlException e) { 209 LOG.error("Permission error in ActionInputCheck", e); 210 if (isTimeout(currentTime)) { 211 LOG.debug("Queueing timeout command"); 212 Services.get().get(CallableQueueService.class) 213 .queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName())); 214 } 215 else { 216 // Requeue InputCheckCommand for permission denied error with longer interval 217 Services.get() 218 .get(CallableQueueService.class) 219 .queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 220 2 * getCoordInputCheckRequeueInterval()); 221 } 222 updateCoordAction(coordAction, isChangeInDependency); 223 } 224 catch (Exception e) { 225 if (isTimeout(currentTime)) { 226 LOG.debug("Queueing timeout command"); 227 // XCommand.queue() will not work when there is a Exception 228 Services.get().get(CallableQueueService.class) 229 .queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName())); 230 } 231 updateCoordAction(coordAction, isChangeInDependency); 232 throw new CommandException(ErrorCode.E1021, e.getMessage(), e); 233 } 234 return null; 235 } 236 237 private boolean isChangeInDependency(StringBuilder nonExistList, String missingDependencies, 238 StringBuilder nonResolvedList, boolean status) throws IOException { 239 if (nonResolvedList.length() > 0 && status == false) { 240 nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList); 241 } 242 return coordAction.getPullInputDependencies().isChangeInDependency(nonExistList, missingDependencies, 243 nonResolvedList, status); 244 } 245 246 static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId) 247 throws Exception { 248 return resolveCoordConfiguration(actionXml, actionConf, actionId, null, null); 249 } 250 251 static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId, 252 CoordInputDependency pullDependencies, CoordInputDependency pushDependencies) throws Exception { 253 Element eAction = XmlUtils.parseXml(actionXml.toString()); 254 ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, actionConf, actionId, pullDependencies, 255 pushDependencies); 256 materializeDataProperties(eAction, actionConf, eval); 257 return XmlUtils.prettyPrint(eAction).toString(); 258 } 259 260 private boolean isTimeout(Date currentTime) { 261 long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction 262 .getCreatedTime().getTime())) 263 / (60 * 1000); 264 int timeOut = coordAction.getTimeOut(); 265 return (timeOut >= 0) && (waitingTime > timeOut); 266 } 267 268 private void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency) 269 throws CommandException { 270 coordAction.setLastModifiedTime(new Date()); 271 if (jpaService != null) { 272 try { 273 if (isChangeInDependency) { 274 coordAction.setMissingDependencies(coordAction.getPullInputDependencies().serialize()); 275 CoordActionQueryExecutor.getInstance().executeUpdate( 276 CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK, coordAction); 277 if (EventHandlerService.isEnabled() && coordAction.getStatus() != CoordinatorAction.Status.READY) { 278 // since event is not to be generated unless action 279 // RUNNING via StartX 280 generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null); 281 } 282 } 283 else { 284 CoordActionQueryExecutor.getInstance().executeUpdate( 285 CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, coordAction); 286 } 287 } 288 catch (Exception jex) { 289 throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex); 290 } 291 } 292 } 293 /** 294 * This function reads the value of re-queue interval for coordinator input 295 * check command from the Oozie configuration provided by Configuration 296 * Service. If nothing defined in the configuration, it uses the code 297 * specified default value. 298 * 299 * @return re-queue interval in ms 300 */ 301 public long getCoordInputCheckRequeueInterval() { 302 long requeueInterval = ConfigurationService.getLong(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL); 303 return requeueInterval; 304 } 305 306 /** 307 * To check the list of input paths if all of them exist 308 * 309 * @param actionXml action xml 310 * @param existList the list of existed paths 311 * @param nonExistList the list of non existed paths 312 * @param conf action configuration 313 * @return true if all input paths are existed 314 * @throws Exception thrown of unable to check input path 315 */ 316 protected boolean checkResolvedInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList, 317 Configuration conf) throws Exception { 318 return coordAction.getPullInputDependencies().checkPullMissingDependencies(coordAction, existList, 319 nonExistList); 320 } 321 322 /** 323 * Check un resolved input. 324 * 325 * @param coordAction the coord action 326 * @param actionXml the action xml 327 * @param conf the conf 328 * @return true, if successful 329 * @throws Exception the exception 330 */ 331 protected boolean checkUnResolvedInput(CoordinatorActionBean coordAction, StringBuilder actionXml, 332 Configuration conf) throws Exception { 333 Element eAction = XmlUtils.parseXml(actionXml.toString()); 334 LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future"); 335 boolean allExist = checkUnresolvedInstances(coordAction, eAction, conf); 336 if (allExist) { 337 actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString()); 338 } 339 return allExist; 340 } 341 342 /** 343 * Check un resolved input. 344 * 345 * @param actionXml the action xml 346 * @param conf the conf 347 * @return true, if successful 348 * @throws Exception the exception 349 */ 350 protected boolean checkUnResolvedInput(StringBuilder actionXml, Configuration conf) throws Exception { 351 return checkUnResolvedInput(coordAction, actionXml, conf); 352 } 353 354 /** 355 * Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list 356 * of files that will be needed. 357 * 358 * @param eAction action element 359 * @param conf action configuration 360 * @throws Exception thrown if failed to resolve data properties 361 * @update modify 'Action' element with appropriate list of files. 362 */ 363 @SuppressWarnings("unchecked") 364 static void materializeDataProperties(Element eAction, Configuration conf, ELEvaluator eval) throws Exception { 365 Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow", 366 eAction.getNamespace()).getChild("configuration", eAction.getNamespace()); 367 if (configElem != null) { 368 for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) { 369 resolveTagContents("value", propElem, eval); 370 } 371 } 372 } 373 374 /** 375 * To resolve property value which contains el functions 376 * 377 * @param tagName tag name 378 * @param elem the child element of "property" element 379 * @param eval el functions evaluator 380 * @throws Exception thrown if unable to resolve tag value 381 */ 382 private static void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception { 383 if (elem == null) { 384 return; 385 } 386 Element tagElem = elem.getChild(tagName, elem.getNamespace()); 387 if (tagElem != null) { 388 String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText()); 389 tagElem.removeContent(); 390 tagElem.addContent(updated); 391 } 392 else { 393 XLog.getLog(CoordActionInputCheckXCommand.class).warn(" Value NOT FOUND " + tagName); 394 } 395 } 396 397 /** 398 * Check if any unsolved paths under data output. Resolve the unresolved data input paths. 399 * 400 * @param eAction action element 401 * @param actionConf action configuration 402 * @return true if successful to resolve input and output paths 403 * @throws Exception thrown if failed to resolve data input and output paths 404 */ 405 @SuppressWarnings("unchecked") 406 private boolean checkUnresolvedInstances(CoordinatorActionBean coordAction, Element eAction, 407 Configuration actionConf) throws Exception { 408 409 boolean ret = coordAction.getPullInputDependencies().checkUnresolved(coordAction, eAction); 410 411 // Using latest() or future() in output-event is not intuitive. 412 // We need to make sure, this assumption is correct. 413 Element outputList = eAction.getChild("output-events", eAction.getNamespace()); 414 if (outputList != null) { 415 for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) { 416 if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace()) != null) { 417 throw new CommandException(ErrorCode.E1006, "coord:latest()/future()", 418 " not permitted in output-event "); 419 } 420 } 421 } 422 return ret; 423 } 424 425 /** 426 * Resolves coordinator configuration and moves CoordAction to READY state 427 * 428 * @param actionXml 429 * @param actionConf 430 * @param coordPullInputDependency 431 * @param coordPushInputDependency 432 * @throws Exception 433 */ 434 private void moveCoordActionToReady(StringBuilder actionXml, Configuration actionConf, 435 CoordInputDependency coordPullInputDependency, CoordInputDependency coordPushInputDependency) 436 throws Exception { 437 String newActionXml = null; 438 try { 439 newActionXml = resolveCoordConfiguration(actionXml, actionConf, actionId, coordPullInputDependency, 440 coordPushInputDependency); 441 } 442 catch (ElException e) { 443 coordAction.setStatus(CoordinatorAction.Status.FAILED); 444 updateCoordAction(coordAction, true); 445 throw e; 446 } 447 actionXml.replace(0, actionXml.length(), newActionXml); 448 coordAction.setActionXml(actionXml.toString()); 449 coordAction.setStatus(CoordinatorAction.Status.READY); 450 updateCoordAction(coordAction, true); 451 new CoordActionReadyXCommand(coordAction.getJobId()).call(); 452 } 453 454 /** 455 * getting the error code of the coord action. (used mainly for unit testing) 456 */ 457 protected String getCoordActionErrorCode() { 458 if (coordAction != null) { 459 return coordAction.getErrorCode(); 460 } 461 return null; 462 } 463 464 /** 465 * getting the error message of the coord action. (used mainly for unit testing) 466 */ 467 protected String getCoordActionErrorMsg() { 468 if (coordAction != null) { 469 return coordAction.getErrorMessage(); 470 } 471 return null; 472 } 473 474 @Override 475 public String getEntityKey() { 476 return this.jobId; 477 } 478 479 @Override 480 protected boolean isLockRequired() { 481 return true; 482 } 483 484 @Override 485 protected void loadState() throws CommandException { 486 if (jpaService == null) { 487 jpaService = Services.get().get(JPAService.class); 488 } 489 try { 490 coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId)); 491 coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_INPUT_CHECK, 492 coordAction.getJobId()); 493 } 494 catch (JPAExecutorException je) { 495 throw new CommandException(je); 496 } 497 LogUtils.setLogInfo(coordAction); 498 } 499 500 @Override 501 protected void verifyPrecondition() throws CommandException, PreconditionException { 502 if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) { 503 throw new PreconditionException(ErrorCode.E1100, "[" + actionId 504 + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state=" 505 + coordAction.getStatus()); 506 } 507 508 // if eligible to do action input check when running with backward support is true 509 if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) { 510 return; 511 } 512 513 if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR && coordJob.getStatus() != Job.Status.PAUSED 514 && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) { 515 throw new PreconditionException( 516 ErrorCode.E1100, "["+ actionId + "]::CoordActionInputCheck:: Ignoring action." + 517 " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state=" 518 + coordJob.getStatus()); 519 } 520 } 521 522 @Override 523 public String getKey(){ 524 return getName() + "_" + actionId; 525 } 526 527}