001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.coord; 020 021import java.io.IOException; 022import java.io.StringReader; 023import java.util.Calendar; 024import java.util.Date; 025import java.util.List; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.security.AccessControlException; 028import org.apache.oozie.CoordinatorActionBean; 029import org.apache.oozie.CoordinatorJobBean; 030import org.apache.oozie.ErrorCode; 031import org.apache.oozie.client.CoordinatorAction; 032import org.apache.oozie.client.Job; 033import org.apache.oozie.command.CommandException; 034import org.apache.oozie.command.PreconditionException; 035import org.apache.oozie.coord.CoordELEvaluator; 036import org.apache.oozie.coord.CoordELFunctions; 037import org.apache.oozie.coord.input.dependency.CoordInputDependency; 038import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor; 039import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; 040import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 041import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 042import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 043import org.apache.oozie.executor.jpa.JPAExecutorException; 044import org.apache.oozie.service.CallableQueueService; 045import org.apache.oozie.service.ConfigurationService; 046import org.apache.oozie.service.EventHandlerService; 047import org.apache.oozie.service.JPAService; 048import org.apache.oozie.service.Service; 049import org.apache.oozie.service.Services; 050import org.apache.oozie.util.DateUtils; 051import org.apache.oozie.util.ELEvaluator; 052import org.apache.oozie.util.LogUtils; 053import org.apache.oozie.util.ParamChecker; 054import org.apache.oozie.util.StatusUtils; 055import org.apache.oozie.util.XConfiguration; 056import org.apache.oozie.util.XLog; 057import org.apache.oozie.util.XmlUtils; 058import org.jdom.Element; 059 060/** 061 * The command to check if an action's data input paths exist in the file system. 062 */ 063public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> { 064 065 public static final String COORD_EXECUTION_NONE_TOLERANCE = "oozie.coord.execution.none.tolerance"; 066 067 private final String actionId; 068 /** 069 * Property name of command re-queue interval for coordinator action input check in 070 * milliseconds. 071 */ 072 public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX 073 + "coord.input.check.requeue.interval"; 074 public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL_ADDITIONAL_DELAY = Service.CONF_PREFIX 075 + "coord.input.check.requeue.interval.additional.delay"; 076 private CoordinatorActionBean coordAction = null; 077 private CoordinatorJobBean coordJob = null; 078 private JPAService jpaService = null; 079 private String jobId = null; 080 public CoordActionInputCheckXCommand(String actionId, String jobId) { 081 super("coord_action_input", "coord_action_input", 1); 082 this.actionId = ParamChecker.notEmpty(actionId, "actionId"); 083 this.jobId = jobId; 084 } 085 086 @Override 087 protected void setLogInfo() { 088 LogUtils.setLogInfo(actionId); 089 } 090 091 @Override 092 protected Void execute() throws CommandException { 093 LOG.debug("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state."); 094 095 // this action should only get processed if current time > nominal time; 096 // otherwise, requeue this action for delay execution; 097 Date nominalTime = coordAction.getNominalTime(); 098 Date currentTime = new Date(); 099 if (nominalTime.compareTo(currentTime) > 0) { 100 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), nominalTime.getTime() 101 - currentTime.getTime()); 102 updateCoordAction(coordAction, false); 103 LOG.info("[" + actionId 104 + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current=" 105 + DateUtils.formatDateOozieTZ(currentTime) + ", nominal=" + DateUtils.formatDateOozieTZ(nominalTime)); 106 107 return null; 108 } 109 110 StringBuilder actionXml = new StringBuilder(coordAction.getActionXml()); 111 boolean isChangeInDependency = false; 112 try { 113 Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf())); 114 Date now = new Date(); 115 if (coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.LAST_ONLY)) { 116 Date nextNominalTime = CoordCommandUtils.computeNextNominalTime(coordJob, coordAction); 117 if (nextNominalTime != null) { 118 // If the current time is after the next action's nominal time, then we've passed the window where this action 119 // should be started; so set it to SKIPPED 120 if (now.after(nextNominalTime)) { 121 LOG.info("LAST_ONLY execution: Preparing to skip action [{0}] because the current time [{1}] is later than " 122 + "the nominal time [{2}] of the next action]", coordAction.getId(), 123 DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime)); 124 queue(new CoordActionSkipXCommand(coordAction, coordJob.getUser(), coordJob.getAppName())); 125 return null; 126 } else { 127 LOG.debug("LAST_ONLY execution: Not skipping action [{0}] because the current time [{1}] is earlier than " 128 + "the nominal time [{2}] of the next action]", coordAction.getId(), 129 DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime)); 130 } 131 } 132 } 133 else if (coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.NONE)) { 134 // If the current time is after the nominal time of this action plus some tolerance, 135 // then we've passed the window where this action should be started; so set it to SKIPPED 136 Calendar cal = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone())); 137 cal.setTime(nominalTime); 138 int tolerance = ConfigurationService.getInt(COORD_EXECUTION_NONE_TOLERANCE); 139 cal.add(Calendar.MINUTE, tolerance); 140 if (now.after(cal.getTime())) { 141 LOG.info("NONE execution: Preparing to skip action [{0}] because the current time [{1}] is more than [{2}]" 142 + " minutes later than the nominal time [{3}] of the current action]", coordAction.getId(), 143 DateUtils.formatDateOozieTZ(now), tolerance, DateUtils.formatDateOozieTZ(nominalTime)); 144 queue(new CoordActionSkipXCommand(coordAction, coordJob.getUser(), coordJob.getAppName())); 145 return null; 146 } else { 147 LOG.debug("NONE execution: Not skipping action [{0}] because the current time [{1}] is earlier than [{2}]" 148 + " minutes later than the nominal time [{3}] of the current action]", coordAction.getId(), 149 DateUtils.formatDateOozieTZ(now), tolerance, DateUtils.formatDateOozieTZ(coordAction.getNominalTime())); 150 } 151 } 152 153 StringBuilder existList = new StringBuilder(); 154 StringBuilder nonExistList = new StringBuilder(); 155 CoordInputDependency coordPullInputDependency = coordAction.getPullInputDependencies(); 156 CoordInputDependency coordPushInputDependency = coordAction.getPushInputDependencies(); 157 158 159 String missingDependencies = coordPullInputDependency.getMissingDependencies(); 160 StringBuilder nonResolvedList = new StringBuilder(); 161 162 CoordCommandUtils.getResolvedList(missingDependencies, nonExistList, nonResolvedList); 163 String firstMissingDependency = ""; 164 // For clarity regarding which is the missing dependency in synchronous order 165 // instead of printing entire list, some of which, may be available 166 if (nonExistList.length() > 0) { 167 firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0]; 168 } 169 LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + firstMissingDependency + " " 170 + nonResolvedList.toString()); 171 172 173 boolean status = checkResolvedInput(actionXml, existList, nonExistList, actionConf); 174 String nonExistListStr = nonExistList.toString(); 175 boolean isPushDependenciesMet = coordPushInputDependency.isDependencyMet(); 176 if (status && nonResolvedList.length() > 0) { 177 status = (isPushDependenciesMet) ? checkUnResolvedInput(actionXml, actionConf) : false; 178 } 179 coordAction.setLastModifiedTime(currentTime); 180 coordAction.setActionXml(actionXml.toString()); 181 182 isChangeInDependency = isChangeInDependency(nonExistList, missingDependencies, nonResolvedList, status); 183 184 if (status && isPushDependenciesMet) { 185 String newActionXml = resolveCoordConfiguration(actionXml, actionConf, actionId, 186 coordPullInputDependency, coordPushInputDependency); 187 actionXml.replace(0, actionXml.length(), newActionXml); 188 coordAction.setActionXml(actionXml.toString()); 189 coordAction.setStatus(CoordinatorAction.Status.READY); 190 updateCoordAction(coordAction, true); 191 new CoordActionReadyXCommand(coordAction.getJobId()).call(); 192 } 193 else if (!isTimeout(currentTime)) { 194 if (!status) { 195 long addtionalDelay = isChangeInDependency ? 0 196 : ConfigurationService.getInt(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL_ADDITIONAL_DELAY) 197 * 1000L; 198 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 199 addtionalDelay + getCoordInputCheckRequeueInterval()); 200 } 201 updateCoordAction(coordAction, isChangeInDependency); 202 } 203 else { 204 if (!nonExistListStr.isEmpty() && isPushDependenciesMet) { 205 queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName())); 206 } 207 else { 208 // Let CoordPushDependencyCheckXCommand queue the timeout 209 queue(new CoordPushDependencyCheckXCommand(coordAction.getId())); 210 } 211 updateCoordAction(coordAction, isChangeInDependency); 212 } 213 } 214 catch (AccessControlException e) { 215 LOG.error("Permission error in ActionInputCheck", e); 216 if (isTimeout(currentTime)) { 217 LOG.debug("Queueing timeout command"); 218 Services.get().get(CallableQueueService.class) 219 .queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName())); 220 } 221 else { 222 // Requeue InputCheckCommand for permission denied error with longer interval 223 Services.get() 224 .get(CallableQueueService.class) 225 .queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 226 2 * getCoordInputCheckRequeueInterval()); 227 } 228 updateCoordAction(coordAction, isChangeInDependency); 229 } 230 catch (Exception e) { 231 if (isTimeout(currentTime)) { 232 LOG.debug("Queueing timeout command"); 233 // XCommand.queue() will not work when there is a Exception 234 Services.get().get(CallableQueueService.class) 235 .queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName())); 236 } 237 updateCoordAction(coordAction, isChangeInDependency); 238 throw new CommandException(ErrorCode.E1021, e.getMessage(), e); 239 } 240 return null; 241 } 242 243 private boolean isChangeInDependency(StringBuilder nonExistList, String missingDependencies, 244 StringBuilder nonResolvedList, boolean status) throws IOException { 245 if (nonResolvedList.length() > 0 && status == false) { 246 nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList); 247 } 248 return coordAction.getPullInputDependencies().isChangeInDependency(nonExistList, missingDependencies, 249 nonResolvedList, status); 250 } 251 252 static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId) 253 throws Exception { 254 return resolveCoordConfiguration(actionXml, actionConf, actionId, null, null); 255 } 256 257 static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId, 258 CoordInputDependency pullDependencies, CoordInputDependency pushDependencies) throws Exception { 259 Element eAction = XmlUtils.parseXml(actionXml.toString()); 260 ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, actionConf, actionId, pullDependencies, 261 pushDependencies); 262 materializeDataProperties(eAction, actionConf, eval); 263 return XmlUtils.prettyPrint(eAction).toString(); 264 } 265 266 private boolean isTimeout(Date currentTime) { 267 long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction 268 .getCreatedTime().getTime())) 269 / (60 * 1000); 270 int timeOut = coordAction.getTimeOut(); 271 return (timeOut >= 0) && (waitingTime > timeOut); 272 } 273 274 private void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency) 275 throws CommandException { 276 coordAction.setLastModifiedTime(new Date()); 277 if (jpaService != null) { 278 try { 279 if (isChangeInDependency) { 280 coordAction.setMissingDependencies(coordAction.getPullInputDependencies().serialize()); 281 CoordActionQueryExecutor.getInstance().executeUpdate( 282 CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK, coordAction); 283 if (EventHandlerService.isEnabled() && coordAction.getStatus() != CoordinatorAction.Status.READY) { 284 // since event is not to be generated unless action 285 // RUNNING via StartX 286 generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null); 287 } 288 } 289 else { 290 CoordActionQueryExecutor.getInstance().executeUpdate( 291 CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, coordAction); 292 } 293 } 294 catch (Exception jex) { 295 throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex); 296 } 297 } 298 } 299 /** 300 * This function reads the value of re-queue interval for coordinator input 301 * check command from the Oozie configuration provided by Configuration 302 * Service. If nothing defined in the configuration, it uses the code 303 * specified default value. 304 * 305 * @return re-queue interval in ms 306 */ 307 public long getCoordInputCheckRequeueInterval() { 308 long requeueInterval = ConfigurationService.getLong(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL); 309 return requeueInterval; 310 } 311 312 /** 313 * To check the list of input paths if all of them exist 314 * 315 * @param actionXml action xml 316 * @param existList the list of existed paths 317 * @param nonExistList the list of non existed paths 318 * @param conf action configuration 319 * @return true if all input paths are existed 320 * @throws Exception thrown of unable to check input path 321 */ 322 protected boolean checkResolvedInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList, 323 Configuration conf) throws Exception { 324 return coordAction.getPullInputDependencies().checkPullMissingDependencies(coordAction, existList, 325 nonExistList); 326 } 327 328 /** 329 * Check un resolved input. 330 * 331 * @param coordAction the coord action 332 * @param actionXml the action xml 333 * @param conf the conf 334 * @return true, if successful 335 * @throws Exception the exception 336 */ 337 protected boolean checkUnResolvedInput(CoordinatorActionBean coordAction, StringBuilder actionXml, 338 Configuration conf) throws Exception { 339 Element eAction = XmlUtils.parseXml(actionXml.toString()); 340 LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future"); 341 boolean allExist = checkUnresolvedInstances(coordAction, eAction, conf); 342 if (allExist) { 343 actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString()); 344 } 345 return allExist; 346 } 347 348 /** 349 * Check un resolved input. 350 * 351 * @param actionXml the action xml 352 * @param conf the conf 353 * @return true, if successful 354 * @throws Exception the exception 355 */ 356 protected boolean checkUnResolvedInput(StringBuilder actionXml, Configuration conf) throws Exception { 357 return checkUnResolvedInput(coordAction, actionXml, conf); 358 } 359 360 /** 361 * Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list 362 * of files that will be needed. 363 * 364 * @param eAction action element 365 * @param conf action configuration 366 * @throws Exception thrown if failed to resolve data properties 367 * @update modify 'Action' element with appropriate list of files. 368 */ 369 @SuppressWarnings("unchecked") 370 static void materializeDataProperties(Element eAction, Configuration conf, ELEvaluator eval) throws Exception { 371 Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow", 372 eAction.getNamespace()).getChild("configuration", eAction.getNamespace()); 373 if (configElem != null) { 374 for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) { 375 resolveTagContents("value", propElem, eval); 376 } 377 } 378 } 379 380 /** 381 * To resolve property value which contains el functions 382 * 383 * @param tagName tag name 384 * @param elem the child element of "property" element 385 * @param eval el functions evaluator 386 * @throws Exception thrown if unable to resolve tag value 387 */ 388 private static void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception { 389 if (elem == null) { 390 return; 391 } 392 Element tagElem = elem.getChild(tagName, elem.getNamespace()); 393 if (tagElem != null) { 394 String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText()); 395 tagElem.removeContent(); 396 tagElem.addContent(updated); 397 } 398 else { 399 XLog.getLog(CoordActionInputCheckXCommand.class).warn(" Value NOT FOUND " + tagName); 400 } 401 } 402 403 /** 404 * Check if any unsolved paths under data output. Resolve the unresolved data input paths. 405 * 406 * @param eAction action element 407 * @param actionConf action configuration 408 * @return true if successful to resolve input and output paths 409 * @throws Exception thrown if failed to resolve data input and output paths 410 */ 411 @SuppressWarnings("unchecked") 412 private boolean checkUnresolvedInstances(CoordinatorActionBean coordAction, Element eAction, 413 Configuration actionConf) throws Exception { 414 415 boolean ret = coordAction.getPullInputDependencies().checkUnresolved(coordAction, eAction); 416 417 // Using latest() or future() in output-event is not intuitive. 418 // We need to make sure, this assumption is correct. 419 Element outputList = eAction.getChild("output-events", eAction.getNamespace()); 420 if (outputList != null) { 421 for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) { 422 if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace()) != null) { 423 throw new CommandException(ErrorCode.E1006, "coord:latest()/future()", 424 " not permitted in output-event "); 425 } 426 } 427 } 428 return ret; 429 } 430 431 /** 432 * getting the error code of the coord action. (used mainly for unit testing) 433 */ 434 protected String getCoordActionErrorCode() { 435 if (coordAction != null) { 436 return coordAction.getErrorCode(); 437 } 438 return null; 439 } 440 441 /** 442 * getting the error message of the coord action. (used mainly for unit testing) 443 */ 444 protected String getCoordActionErrorMsg() { 445 if (coordAction != null) { 446 return coordAction.getErrorMessage(); 447 } 448 return null; 449 } 450 451 @Override 452 public String getEntityKey() { 453 return this.jobId; 454 } 455 456 @Override 457 protected boolean isLockRequired() { 458 return true; 459 } 460 461 @Override 462 protected void loadState() throws CommandException { 463 if (jpaService == null) { 464 jpaService = Services.get().get(JPAService.class); 465 } 466 try { 467 coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId)); 468 coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_INPUT_CHECK, 469 coordAction.getJobId()); 470 } 471 catch (JPAExecutorException je) { 472 throw new CommandException(je); 473 } 474 LogUtils.setLogInfo(coordAction); 475 } 476 477 @Override 478 protected void verifyPrecondition() throws CommandException, PreconditionException { 479 if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) { 480 throw new PreconditionException(ErrorCode.E1100, "[" + actionId 481 + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state=" 482 + coordAction.getStatus()); 483 } 484 485 // if eligible to do action input check when running with backward support is true 486 if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) { 487 return; 488 } 489 490 if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR && coordJob.getStatus() != Job.Status.PAUSED 491 && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) { 492 throw new PreconditionException( 493 ErrorCode.E1100, "["+ actionId + "]::CoordActionInputCheck:: Ignoring action." + 494 " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state=" 495 + coordJob.getStatus()); 496 } 497 } 498 499 @Override 500 public String getKey(){ 501 return getName() + "_" + actionId; 502 } 503 504}