001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.io.IOException; 021 import java.io.StringReader; 022 import java.util.Date; 023 import java.util.List; 024 025 import org.apache.hadoop.conf.Configuration; 026 import org.apache.hadoop.fs.Path; 027 import org.apache.oozie.CoordinatorActionBean; 028 import org.apache.oozie.CoordinatorJobBean; 029 import org.apache.oozie.ErrorCode; 030 import org.apache.oozie.client.CoordinatorAction; 031 import org.apache.oozie.client.Job; 032 import org.apache.oozie.client.OozieClient; 033 import org.apache.oozie.command.CommandException; 034 import org.apache.oozie.command.PreconditionException; 035 import org.apache.oozie.coord.CoordELEvaluator; 036 import org.apache.oozie.coord.CoordELFunctions; 037 import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor; 038 import org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor; 039 import org.apache.oozie.executor.jpa.CoordActionUpdateForModifiedTimeJPAExecutor; 040 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 041 import org.apache.oozie.executor.jpa.JPAExecutorException; 042 import org.apache.oozie.service.HadoopAccessorException; 043 import org.apache.oozie.service.HadoopAccessorService; 044 import org.apache.oozie.service.JPAService; 045 import org.apache.oozie.service.Service; 046 import org.apache.oozie.service.Services; 047 import org.apache.oozie.util.DateUtils; 048 import org.apache.oozie.util.ELEvaluator; 049 import org.apache.oozie.util.Instrumentation; 050 import org.apache.oozie.util.LogUtils; 051 import org.apache.oozie.util.ParamChecker; 052 import org.apache.oozie.util.StatusUtils; 053 import org.apache.oozie.util.XConfiguration; 054 import org.apache.oozie.util.XmlUtils; 055 import org.jdom.Element; 056 057 /** 058 * The command to check if an action's data input paths exist in the file system. 059 */ 060 public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> { 061 062 private final String actionId; 063 /** 064 * Property name of command re-queue interval for coordinator action input check in 065 * milliseconds. 066 */ 067 public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX 068 + "coord.input.check.requeue.interval"; 069 /** 070 * Default re-queue interval in ms. It is applied when no value defined in 071 * the oozie configuration. 072 */ 073 private final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 60000; // 1 minute 074 private CoordinatorActionBean coordAction = null; 075 private CoordinatorJobBean coordJob = null; 076 private JPAService jpaService = null; 077 private String jobId = null; 078 079 public CoordActionInputCheckXCommand(String actionId, String jobId) { 080 super("coord_action_input", "coord_action_input", 1); 081 this.actionId = ParamChecker.notEmpty(actionId, "actionId"); 082 this.jobId = jobId; 083 } 084 085 /* (non-Javadoc) 086 * @see org.apache.oozie.command.XCommand#execute() 087 */ 088 @Override 089 protected Void execute() throws CommandException { 090 LOG.info("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state."); 091 092 // this action should only get processed if current time > nominal time; 093 // otherwise, requeue this action for delay execution; 094 Date nominalTime = coordAction.getNominalTime(); 095 Date currentTime = new Date(); 096 if (nominalTime.compareTo(currentTime) > 0) { 097 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), Math.max((nominalTime.getTime() - currentTime 098 .getTime()), getCoordInputCheckRequeueInterval())); 099 // update lastModifiedTime 100 coordAction.setLastModifiedTime(new Date()); 101 try { 102 jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(coordAction)); 103 } 104 catch (JPAExecutorException e) { 105 throw new CommandException(e); 106 } 107 LOG.info("[" + actionId 108 + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current=" 109 + currentTime + ", nominal=" + nominalTime); 110 111 return null; 112 } 113 114 StringBuilder actionXml = new StringBuilder(coordAction.getActionXml()); 115 Instrumentation.Cron cron = new Instrumentation.Cron(); 116 boolean isChangeInDependency = false; 117 try { 118 Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf())); 119 cron.start(); 120 StringBuilder existList = new StringBuilder(); 121 StringBuilder nonExistList = new StringBuilder(); 122 StringBuilder nonResolvedList = new StringBuilder(); 123 String firstMissingDependency = ""; 124 String missingDeps = coordAction.getMissingDependencies(); 125 CoordCommandUtils.getResolvedList(missingDeps, nonExistList, nonResolvedList); 126 127 // For clarity regarding which is the missing dependency in synchronous order 128 // instead of printing entire list, some of which, may be available 129 if(nonExistList.length() > 0) { 130 firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0]; 131 } 132 LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + firstMissingDependency + " " 133 + nonResolvedList.toString()); 134 // Updating the list of data dependencies that are available and those that are yet not 135 boolean status = checkInput(actionXml, existList, nonExistList, actionConf); 136 coordAction.setLastModifiedTime(currentTime); 137 coordAction.setActionXml(actionXml.toString()); 138 if (nonResolvedList.length() > 0 && status == false) { 139 nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList); 140 } 141 String nonExistListStr = nonExistList.toString(); 142 if (!nonExistListStr.equals(missingDeps) || missingDeps.isEmpty()) { 143 // missingDeps empty means action should become READY 144 isChangeInDependency = true; 145 coordAction.setMissingDependencies(nonExistListStr); 146 } 147 if (status == true) { 148 coordAction.setStatus(CoordinatorAction.Status.READY); 149 // pass jobID to the CoordActionReadyXCommand 150 queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100); 151 } 152 else { 153 long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction 154 .getCreatedTime().getTime())) 155 / (60 * 1000); 156 int timeOut = coordAction.getTimeOut(); 157 if ((timeOut >= 0) && (waitingTime > timeOut)) { 158 queue(new CoordActionTimeOutXCommand(coordAction), 100); 159 } 160 else { 161 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), getCoordInputCheckRequeueInterval()); 162 } 163 } 164 } 165 catch (Exception e) { 166 throw new CommandException(ErrorCode.E1021, e.getMessage(), e); 167 } 168 finally { 169 coordAction.setLastModifiedTime(new Date()); 170 cron.stop(); 171 if(jpaService != null) { 172 try { 173 if (isChangeInDependency) { 174 jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(coordAction)); 175 } 176 else { 177 jpaService.execute(new CoordActionUpdateForModifiedTimeJPAExecutor(coordAction)); 178 } 179 } 180 catch(JPAExecutorException jex) { 181 throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex); 182 } 183 } 184 } 185 return null; 186 } 187 188 /** 189 * This function reads the value of re-queue interval for coordinator input 190 * check command from the Oozie configuration provided by Configuration 191 * Service. If nothing defined in the configuration, it uses the code 192 * specified default value. 193 * 194 * @return re-queue interval in ms 195 */ 196 public long getCoordInputCheckRequeueInterval() { 197 long requeueInterval = Services.get().getConf().getLong(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL, 198 DEFAULT_COMMAND_REQUEUE_INTERVAL); 199 return requeueInterval; 200 } 201 202 /** 203 * To check the list of input paths if all of them exist 204 * 205 * @param actionXml action xml 206 * @param existList the list of existed paths 207 * @param nonExistList the list of non existed paths 208 * @param conf action configuration 209 * @return true if all input paths are existed 210 * @throws Exception thrown of unable to check input path 211 */ 212 protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList, 213 Configuration conf) throws Exception { 214 Element eAction = XmlUtils.parseXml(actionXml.toString()); 215 boolean allExist = checkResolvedUris(eAction, existList, nonExistList, conf); 216 if (allExist) { 217 LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future"); 218 allExist = checkUnresolvedInstances(eAction, conf); 219 } 220 if (allExist == true) { 221 materializeDataProperties(eAction, conf); 222 actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString()); 223 } 224 return allExist; 225 } 226 227 /** 228 * Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list 229 * of files that will be needed. 230 * 231 * @param eAction action element 232 * @param conf action configuration 233 * @throws Exception thrown if failed to resolve data properties 234 * @update modify 'Action' element with appropriate list of files. 235 */ 236 @SuppressWarnings("unchecked") 237 private void materializeDataProperties(Element eAction, Configuration conf) throws Exception { 238 ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, conf, actionId); 239 Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow", 240 eAction.getNamespace()).getChild("configuration", eAction.getNamespace()); 241 if (configElem != null) { 242 for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) { 243 resolveTagContents("value", propElem, eval); 244 } 245 } 246 } 247 248 /** 249 * To resolve property value which contains el functions 250 * 251 * @param tagName tag name 252 * @param elem the child element of "property" element 253 * @param eval el functions evaluator 254 * @throws Exception thrown if unable to resolve tag value 255 */ 256 private void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception { 257 if (elem == null) { 258 return; 259 } 260 Element tagElem = elem.getChild(tagName, elem.getNamespace()); 261 if (tagElem != null) { 262 String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText()); 263 tagElem.removeContent(); 264 tagElem.addContent(updated); 265 } 266 else { 267 LOG.warn(" Value NOT FOUND " + tagName); 268 } 269 } 270 271 /** 272 * Check if any unsolved paths under data output. Resolve the unresolved data input paths. 273 * 274 * @param eAction action element 275 * @param actionConf action configuration 276 * @return true if successful to resolve input and output paths 277 * @throws Exception thrown if failed to resolve data input and output paths 278 */ 279 @SuppressWarnings("unchecked") 280 private boolean checkUnresolvedInstances(Element eAction, Configuration actionConf) throws Exception { 281 String strAction = XmlUtils.prettyPrint(eAction).toString(); 282 Date nominalTime = DateUtils.parseDateOozieTZ(eAction.getAttributeValue("action-nominal-time")); 283 String actualTimeStr = eAction.getAttributeValue("action-actual-time"); 284 Date actualTime = null; 285 if (actualTimeStr == null) { 286 LOG.debug("Unable to get action-actual-time from action xml, this job is submitted " + 287 "from previous version. Assign current date to actual time, action = " + actionId); 288 actualTime = new Date(); 289 } else { 290 actualTime = DateUtils.parseDateOozieTZ(actualTimeStr); 291 } 292 293 StringBuffer resultedXml = new StringBuffer(); 294 295 boolean ret; 296 Element inputList = eAction.getChild("input-events", eAction.getNamespace()); 297 if (inputList != null) { 298 ret = materializeUnresolvedEvent(inputList.getChildren("data-in", eAction.getNamespace()), nominalTime, 299 actualTime, actionConf); 300 if (ret == false) { 301 resultedXml.append(strAction); 302 return false; 303 } 304 } 305 306 // Using latest() or future() in output-event is not intuitive. 307 // We need to make sure, this assumption is correct. 308 Element outputList = eAction.getChild("output-events", eAction.getNamespace()); 309 if (outputList != null) { 310 for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) { 311 if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) != null) { 312 throw new CommandException(ErrorCode.E1006, "coord:latest()/future()", 313 " not permitted in output-event "); 314 } 315 } 316 } 317 return true; 318 } 319 320 /** 321 * Resolve the list of data input paths 322 * 323 * @param eDataEvents the list of data input elements 324 * @param nominalTime action nominal time 325 * @param actualTime current time 326 * @param conf action configuration 327 * @return true if all unresolved URIs can be resolved 328 * @throws Exception thrown if failed to resolve data input paths 329 */ 330 @SuppressWarnings("unchecked") 331 private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime, 332 Configuration conf) throws Exception { 333 for (Element dEvent : eDataEvents) { 334 if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) == null) { 335 continue; 336 } 337 ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf); 338 String uresolvedInstance = dEvent.getChild("unresolved-instances", dEvent.getNamespace()).getTextTrim(); 339 String unresolvedList[] = uresolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR); 340 StringBuffer resolvedTmp = new StringBuffer(); 341 for (int i = 0; i < unresolvedList.length; i++) { 342 String ret = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]); 343 Boolean isResolved = (Boolean) eval.getVariable("is_resolved"); 344 if (isResolved == false) { 345 LOG.info("[" + actionId + "]::Cannot resolve: " + ret); 346 return false; 347 } 348 if (resolvedTmp.length() > 0) { 349 resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR); 350 } 351 resolvedTmp.append((String) eval.getVariable("resolved_path")); 352 } 353 if (resolvedTmp.length() > 0) { 354 if (dEvent.getChild("uris", dEvent.getNamespace()) != null) { 355 resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append( 356 dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim()); 357 dEvent.removeChild("uris", dEvent.getNamespace()); 358 } 359 Element uriInstance = new Element("uris", dEvent.getNamespace()); 360 uriInstance.addContent(resolvedTmp.toString()); 361 dEvent.getContent().add(1, uriInstance); 362 } 363 dEvent.removeChild("unresolved-instances", dEvent.getNamespace()); 364 } 365 366 return true; 367 } 368 369 /** 370 * Check all resolved URIs existence 371 * 372 * @param eAction action element 373 * @param existList the list of existed paths 374 * @param nonExistList the list of paths to check existence 375 * @param conf action configuration 376 * @return true if all nonExistList paths exist 377 * @throws IOException thrown if unable to access the path 378 */ 379 private boolean checkResolvedUris(Element eAction, StringBuilder existList, StringBuilder nonExistList, 380 Configuration conf) throws IOException { 381 LOG.info("[" + actionId + "]::ActionInputCheck:: In checkResolvedUris..."); 382 Element inputList = eAction.getChild("input-events", eAction.getNamespace()); 383 if (inputList != null) { 384 if (nonExistList.length() > 0) { 385 checkListOfPaths(existList, nonExistList, conf); 386 } 387 return nonExistList.length() == 0; 388 } 389 return true; 390 } 391 392 /** 393 * Check a list of non existed paths and add to exist list if it exists 394 * 395 * @param existList the list of existed paths 396 * @param nonExistList the list of paths to check existence 397 * @param conf action configuration 398 * @return true if all nonExistList paths exist 399 * @throws IOException thrown if unable to access the path 400 */ 401 private boolean checkListOfPaths(StringBuilder existList, StringBuilder nonExistList, Configuration conf) 402 throws IOException { 403 404 String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR); 405 if (uriList[0] != null) { 406 LOG.info("[" + actionId + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] + " is Missing."); 407 } 408 409 nonExistList.delete(0, nonExistList.length()); 410 boolean allExists = true; 411 String existSeparator = "", nonExistSeparator = ""; 412 for (int i = 0; i < uriList.length; i++) { 413 if (allExists) { 414 allExists = pathExists(uriList[i], conf); 415 LOG.info("[" + actionId + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" + allExists); 416 } 417 if (allExists) { 418 existList.append(existSeparator).append(uriList[i]); 419 existSeparator = CoordELFunctions.INSTANCE_SEPARATOR; 420 } 421 else { 422 nonExistList.append(nonExistSeparator).append(uriList[i]); 423 nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR; 424 } 425 } 426 return allExists; 427 } 428 429 /** 430 * Check if given path exists 431 * 432 * @param sPath uri path 433 * @param actionConf action configuration 434 * @return true if path exists 435 * @throws IOException thrown if unable to access the path 436 */ 437 protected boolean pathExists(String sPath, Configuration actionConf) throws IOException { 438 LOG.debug("checking for the file " + sPath); 439 Path path = new Path(sPath); 440 String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 441 try { 442 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 443 Configuration fsConf = has.createJobConf(path.toUri().getAuthority()); 444 return has.createFileSystem(user, path.toUri(), fsConf).exists(path); 445 } 446 catch (HadoopAccessorException e) { 447 coordAction.setErrorCode(e.getErrorCode().toString()); 448 coordAction.setErrorMessage(e.getMessage()); 449 throw new IOException(e); 450 } 451 } 452 453 /** 454 * The function create a list of URIs separated by "," using the instances time stamp and URI-template 455 * 456 * @param event : <data-in> event 457 * @param instances : List of time stamp seprated by "," 458 * @param unresolvedInstances : list of instance with latest/future function 459 * @return : list of URIs separated by ",". 460 * @throws Exception thrown if failed to create URIs from unresolvedInstances 461 */ 462 @SuppressWarnings("unused") 463 private String createURIs(Element event, String instances, StringBuilder unresolvedInstances) throws Exception { 464 if (instances == null || instances.length() == 0) { 465 return ""; 466 } 467 String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR); 468 StringBuilder uris = new StringBuilder(); 469 470 for (int i = 0; i < instanceList.length; i++) { 471 int funcType = CoordCommandUtils.getFuncType(instanceList[i]); 472 if (funcType == CoordCommandUtils.LATEST || funcType == CoordCommandUtils.FUTURE) { 473 if (unresolvedInstances.length() > 0) { 474 unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR); 475 } 476 unresolvedInstances.append(instanceList[i]); 477 continue; 478 } 479 ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]); 480 if (uris.length() > 0) { 481 uris.append(CoordELFunctions.INSTANCE_SEPARATOR); 482 } 483 uris.append(CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()).getChild( 484 "uri-template", event.getNamespace()).getTextTrim())); 485 } 486 return uris.toString(); 487 } 488 489 /** 490 * getting the error code of the coord action. (used mainly for unit testing) 491 */ 492 protected String getCoordActionErrorCode() { 493 if (coordAction != null) { 494 return coordAction.getErrorCode(); 495 } 496 return null; 497 } 498 499 /** 500 * getting the error message of the coord action. (used mainly for unit testing) 501 */ 502 protected String getCoordActionErrorMsg() { 503 if (coordAction != null) { 504 return coordAction.getErrorMessage(); 505 } 506 return null; 507 } 508 509 /* (non-Javadoc) 510 * @see org.apache.oozie.command.XCommand#getEntityKey() 511 */ 512 @Override 513 public String getEntityKey() { 514 return this.jobId; 515 } 516 517 /* (non-Javadoc) 518 * @see org.apache.oozie.command.XCommand#isLockRequired() 519 */ 520 @Override 521 protected boolean isLockRequired() { 522 return true; 523 } 524 525 /* (non-Javadoc) 526 * @see org.apache.oozie.command.XCommand#eagerLoadState() 527 */ 528 // TODO - why loadState() is being called from eagerLoadState(); 529 @Override 530 protected void eagerLoadState() throws CommandException { 531 loadState(); 532 } 533 534 /* (non-Javadoc) 535 * @see org.apache.oozie.command.XCommand#loadState() 536 */ 537 @Override 538 protected void loadState() throws CommandException { 539 if (jpaService == null) { 540 jpaService = Services.get().get(JPAService.class); 541 } 542 try { 543 coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId)); 544 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId())); 545 } 546 catch (JPAExecutorException je) { 547 throw new CommandException(je); 548 } 549 LogUtils.setLogInfo(coordAction, logInfo); 550 } 551 552 /* (non-Javadoc) 553 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 554 */ 555 @Override 556 protected void verifyPrecondition() throws CommandException, PreconditionException { 557 if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) { 558 throw new PreconditionException(ErrorCode.E1100, "[" + actionId 559 + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state=" 560 + coordAction.getStatus()); 561 } 562 563 // if eligible to do action input check when running with backward support is true 564 if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) { 565 return; 566 } 567 568 if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR && coordJob.getStatus() != Job.Status.PAUSED 569 && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) { 570 throw new PreconditionException( 571 ErrorCode.E1100, "["+ actionId + "]::CoordActionInputCheck:: Ignoring action." + 572 " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state=" 573 + coordJob.getStatus()); 574 } 575 } 576 577 /* (non-Javadoc) 578 * @see org.apache.oozie.command.XCommand#getKey() 579 */ 580 @Override 581 public String getKey(){ 582 return getName() + "_" + actionId; 583 } 584 585 }