/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.coord;

import java.io.IOException;
import java.io.StringReader;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.coord.CoordELEvaluator;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.StatusUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;

/**
 * The command to check if an action's data input paths exist in the file system.
 * <p>
 * When every dependency is available the action is moved to READY and a
 * {@link CoordActionReadyXCommand} is queued; otherwise the command either re-queues itself or, once the
 * action's timeout has elapsed, queues a {@link CoordActionTimeOutXCommand}.
 */
public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {

    /**
     * Property name of command re-queue interval for coordinator action input check in
     * milliseconds.
     */
    public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX
            + "coord.input.check.requeue.interval";

    /**
     * Default re-queue interval in ms. It is applied when no value is defined in
     * the oozie configuration.
     */
    private static final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 60000; // 1 minute

    private final String actionId;
    private final String jobId;
    private CoordinatorActionBean coordAction = null;
    private CoordinatorJobBean coordJob = null;
    private JPAService jpaService = null;

    /**
     * Create an input check command for one coordinator action.
     *
     * @param actionId id of the coordinator action to check, must not be empty
     * @param jobId id of the owning coordinator job; used as the entity (lock) key
     */
    public CoordActionInputCheckXCommand(String actionId, String jobId) {
        super("coord_action_input", "coord_action_input", 1);
        this.actionId = ParamChecker.notEmpty(actionId, "actionId");
        this.jobId = jobId;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#execute()
     */
    @Override
    protected Void execute() throws CommandException {
        LOG.info("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state.");

        // this action should only get processed if current time > nominal time;
        // otherwise, requeue this action for delay execution;
        Date nominalTime = coordAction.getNominalTime();
        Date currentTime = new Date();
        if (nominalTime.compareTo(currentTime) > 0) {
            long delay = Math.max(nominalTime.getTime() - currentTime.getTime(), getCoordInputCheckRequeueInterval());
            queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), delay);
            // update lastModifiedTime
            coordAction.setLastModifiedTime(new Date());
            try {
                jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(coordAction));
            }
            catch (JPAExecutorException e) {
                throw new CommandException(e);
            }
            LOG.info("[" + actionId
                    + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current="
                    + currentTime + ", nominal=" + nominalTime);

            return null;
        }

        StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
        Instrumentation.Cron cron = new Instrumentation.Cron();
        // Started before the try block so that the finally clause always stops a running timer.
        cron.start();
        try {
            Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
            StringBuilder existList = new StringBuilder();
            StringBuilder nonExistList = new StringBuilder();
            StringBuilder nonResolvedList = new StringBuilder();
            String firstMissingDependency = "";
            CoordCommandUtils.getResolvedList(coordAction.getMissingDependencies(), nonExistList, nonResolvedList);

            // For clarity regarding which is the missing dependency in synchronous order
            // instead of printing entire list, some of which, may be available
            if (nonExistList.length() > 0) {
                firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0];
            }
            LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + firstMissingDependency + " "
                    + nonResolvedList.toString());
            // Updating the list of data dependencies that are available and those that are yet not
            boolean status = checkInput(actionXml, existList, nonExistList, actionConf);
            coordAction.setLastModifiedTime(currentTime);
            coordAction.setActionXml(actionXml.toString());
            if (nonResolvedList.length() > 0 && !status) {
                nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
            }
            coordAction.setMissingDependencies(nonExistList.toString());
            if (status) {
                coordAction.setStatus(CoordinatorAction.Status.READY);
                // pass jobID to the CoordActionReadyXCommand
                queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
            }
            else {
                // Minutes waited so far, measured from the later of nominal time and creation time.
                long waitStart = Math.max(coordAction.getNominalTime().getTime(), coordAction.getCreatedTime()
                        .getTime());
                long waitingTime = (currentTime.getTime() - waitStart) / (60 * 1000);
                int timeOut = coordAction.getTimeOut();
                // A negative timeout never expires.
                if ((timeOut >= 0) && (waitingTime > timeOut)) {
                    queue(new CoordActionTimeOutXCommand(coordAction), 100);
                }
                else {
                    queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
                            getCoordInputCheckRequeueInterval());
                }
            }
            coordAction.setLastModifiedTime(new Date());
            jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(coordAction));
        }
        catch (Exception e) {
            throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
        }
        finally {
            // Stop the timer on the failure path as well.
            cron.stop();
        }

        return null;
    }

    /**
     * This function reads the value of re-queue interval for coordinator input
     * check command from the Oozie configuration provided by Configuration
     * Service. If nothing defined in the configuration, it uses the code
     * specified default value.
     *
     * @return re-queue interval in ms
     */
    public long getCoordInputCheckRequeueInterval() {
        return Services.get().getConf().getLong(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL,
                DEFAULT_COMMAND_REQUEUE_INTERVAL);
    }

    /**
     * To check the list of input paths if all of them exist.
     * <p>
     * When everything is available, the data properties are materialized and {@code actionXml} is
     * overwritten in place with the updated action XML.
     *
     * @param actionXml action xml; rewritten when all inputs exist
     * @param existList the list of existing paths (appended to)
     * @param nonExistList the list of non-existing paths (rewritten to what is still missing)
     * @param conf action configuration
     * @return true if all input paths exist
     * @throws Exception thrown if unable to check input path
     */
    protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
            Configuration conf) throws Exception {
        Element eAction = XmlUtils.parseXml(actionXml.toString());
        boolean allExist = checkResolvedUris(eAction, existList, nonExistList, conf);
        if (allExist) {
            LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
            allExist = checkUnresolvedInstances(eAction, conf);
        }
        if (allExist) {
            materializeDataProperties(eAction, conf);
            actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString());
        }
        return allExist;
    }

    /**
     * Materialize data properties defined in the {@code <action>} tag. It includes {@code dataIn(<DS>)} and
     * {@code dataOut(<DS>)}; it creates a list of files that will be needed.
     * <p>
     * The {@code value} of every workflow configuration property under the action element is replaced
     * in place with its evaluated form.
     *
     * @param eAction action element
     * @param conf action configuration
     * @throws Exception thrown if failed to resolve data properties
     */
    @SuppressWarnings("unchecked")
    private void materializeDataProperties(Element eAction, Configuration conf) throws Exception {
        ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, conf, actionId);
        Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow",
                eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
        if (configElem != null) {
            for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
                resolveTagContents("value", propElem, eval);
            }
        }
    }

    /**
     * To resolve property value which contains el functions.
     *
     * @param tagName tag name
     * @param elem the "property" element whose child {@code tagName} is resolved; ignored if null
     * @param eval el functions evaluator
     * @throws Exception thrown if unable to resolve tag value
     */
    private void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception {
        if (elem == null) {
            return;
        }
        Element tagElem = elem.getChild(tagName, elem.getNamespace());
        if (tagElem != null) {
            String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText());
            tagElem.removeContent();
            tagElem.addContent(updated);
        }
        else {
            LOG.warn(" Value NOT FOUND " + tagName);
        }
    }

    /**
     * Check if any unresolved paths exist under data output. Resolve the unresolved data input paths.
     *
     * @param eAction action element
     * @param actionConf action configuration
     * @return true if successful to resolve input and output paths
     * @throws Exception thrown if failed to resolve data input and output paths
     */
    @SuppressWarnings("unchecked")
    private boolean checkUnresolvedInstances(Element eAction, Configuration actionConf) throws Exception {
        Date nominalTime = DateUtils.parseDateOozieTZ(eAction.getAttributeValue("action-nominal-time"));
        String actualTimeStr = eAction.getAttributeValue("action-actual-time");
        Date actualTime;
        if (actualTimeStr == null) {
            LOG.debug("Unable to get action-actual-time from action xml, this job is submitted " +
                    "from previous version. Assign current date to actual time, action = " + actionId);
            actualTime = new Date();
        }
        else {
            actualTime = DateUtils.parseDateOozieTZ(actualTimeStr);
        }

        Element inputList = eAction.getChild("input-events", eAction.getNamespace());
        if (inputList != null) {
            boolean resolved = materializeUnresolvedEvent(inputList.getChildren("data-in", eAction.getNamespace()),
                    nominalTime, actualTime, actionConf);
            if (!resolved) {
                return false;
            }
        }

        // Using latest() or future() in output-event is not intuitive.
        // We need to make sure, this assumption is correct.
        Element outputList = eAction.getChild("output-events", eAction.getNamespace());
        if (outputList != null) {
            for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
                if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) != null) {
                    throw new CommandException(ErrorCode.E1006, "coord:latest()/future()",
                            " not permitted in output-event ");
                }
            }
        }
        return true;
    }

    /**
     * Resolve the list of data input paths.
     * <p>
     * For each event carrying an {@code unresolved-instances} child, every instance is evaluated; the
     * resolved paths are merged in front of any existing {@code uris} content and the
     * {@code unresolved-instances} child is removed.
     *
     * @param eDataEvents the list of data input elements
     * @param nominalTime action nominal time
     * @param actualTime current time
     * @param conf action configuration
     * @return true if all unresolved URIs can be resolved
     * @throws Exception thrown if failed to resolve data input paths
     */
    @SuppressWarnings("unchecked")
    private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime,
            Configuration conf) throws Exception {
        for (Element dEvent : eDataEvents) {
            Element unresolvedElem = dEvent.getChild("unresolved-instances", dEvent.getNamespace());
            if (unresolvedElem == null) {
                continue;
            }
            ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf);
            String[] unresolvedList = unresolvedElem.getTextTrim().split(CoordELFunctions.INSTANCE_SEPARATOR);
            StringBuilder resolvedTmp = new StringBuilder();
            for (String unresolved : unresolvedList) {
                String ret = CoordELFunctions.evalAndWrap(eval, unresolved);
                // NOTE(review): assumes the evaluation always sets "is_resolved"; a missing variable
                // would fail on unboxing here - confirm against the EL functions.
                Boolean isResolved = (Boolean) eval.getVariable("is_resolved");
                if (!isResolved) {
                    LOG.info("[" + actionId + "]::Cannot resolve: " + ret);
                    return false;
                }
                if (resolvedTmp.length() > 0) {
                    resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR);
                }
                resolvedTmp.append((String) eval.getVariable("resolved_path"));
            }
            if (resolvedTmp.length() > 0) {
                Element existingUris = dEvent.getChild("uris", dEvent.getNamespace());
                if (existingUris != null) {
                    resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append(existingUris.getTextTrim());
                    dEvent.removeChild("uris", dEvent.getNamespace());
                }
                Element uriInstance = new Element("uris", dEvent.getNamespace());
                uriInstance.addContent(resolvedTmp.toString());
                dEvent.getContent().add(1, uriInstance);
            }
            dEvent.removeChild("unresolved-instances", dEvent.getNamespace());
        }

        return true;
    }

    /**
     * Check all resolved URIs existence.
     *
     * @param eAction action element
     * @param existList the list of existing paths
     * @param nonExistList the list of paths to check existence
     * @param conf action configuration
     * @return true if all nonExistList paths exist, or if the action has no input events
     * @throws IOException thrown if unable to access the path
     */
    private boolean checkResolvedUris(Element eAction, StringBuilder existList, StringBuilder nonExistList,
            Configuration conf) throws IOException {
        LOG.info("[" + actionId + "]::ActionInputCheck:: In checkResolvedUris...");
        Element inputList = eAction.getChild("input-events", eAction.getNamespace());
        if (inputList == null) {
            return true;
        }
        if (nonExistList.length() > 0) {
            checkListOfPaths(existList, nonExistList, conf);
        }
        return nonExistList.length() == 0;
    }

    /**
     * Check a list of non-existing paths and add each to the exist list if it exists.
     * <p>
     * Paths are checked in order; after the first missing path the remaining ones are not probed and are
     * kept in {@code nonExistList} as still missing.
     *
     * @param existList the list of existing paths
     * @param nonExistList the list of paths to check existence; rewritten to the still-missing paths
     * @param conf action configuration
     * @return true if all nonExistList paths exist
     * @throws IOException thrown if unable to access the path
     */
    private boolean checkListOfPaths(StringBuilder existList, StringBuilder nonExistList, Configuration conf)
            throws IOException {

        String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
        // split() drops trailing empty strings, so a list made only of separators yields an empty array.
        if (uriList.length > 0) {
            LOG.info("[" + actionId + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] + " is Missing.");
        }

        nonExistList.delete(0, nonExistList.length());
        boolean allExists = true;
        String existSeparator = "";
        String nonExistSeparator = "";
        for (String uri : uriList) {
            if (allExists) {
                allExists = pathExists(uri, conf);
                LOG.info("[" + actionId + "]::ActionInputCheck:: File:" + uri + ", Exists? :" + allExists);
            }
            if (allExists) {
                existList.append(existSeparator).append(uri);
                existSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
            }
            else {
                nonExistList.append(nonExistSeparator).append(uri);
                nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
            }
        }
        return allExists;
    }

    /**
     * Check if given path exists.
     *
     * @param sPath uri path
     * @param actionConf action configuration; must carry a non-empty {@link OozieClient#USER_NAME}
     * @return true if path exists
     * @throws IOException thrown if unable to access the path
     */
    private boolean pathExists(String sPath, Configuration actionConf) throws IOException {
        LOG.debug("checking for the file " + sPath);
        Path path = new Path(sPath);
        String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
        try {
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            Configuration fsConf = has.createJobConf(path.toUri().getAuthority());
            return has.createFileSystem(user, path.toUri(), fsConf).exists(path);
        }
        catch (HadoopAccessorException e) {
            throw new IOException(e);
        }
    }

    /**
     * The function creates a list of URIs separated by "," using the instances time stamp and URI-template.
     *
     * @param event : {@code <data-in>} event
     * @param instances : List of time stamps separated by ","
     * @param unresolvedInstances : list of instances with latest/future function (appended to)
     * @return : list of URIs separated by ",".
     * @throws Exception thrown if failed to create URIs from unresolvedInstances
     */
    @SuppressWarnings("unused")
    private String createURIs(Element event, String instances, StringBuilder unresolvedInstances) throws Exception {
        if (instances == null || instances.length() == 0) {
            return "";
        }
        String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
        StringBuilder uris = new StringBuilder();

        for (String instance : instanceList) {
            int funcType = CoordCommandUtils.getFuncType(instance);
            if (funcType == CoordCommandUtils.LATEST || funcType == CoordCommandUtils.FUTURE) {
                // latest()/future() instances cannot be turned into a URI yet; collect them separately.
                if (unresolvedInstances.length() > 0) {
                    unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR);
                }
                unresolvedInstances.append(instance);
                continue;
            }
            ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instance);
            if (uris.length() > 0) {
                uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
            }
            uris.append(CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()).getChild(
                    "uri-template", event.getNamespace()).getTextTrim()));
        }
        return uris.toString();
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    public String getEntityKey() {
        return this.jobId;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#eagerLoadState()
     */
    @Override
    protected void eagerLoadState() throws CommandException {
        loadState();
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#loadState()
     */
    @Override
    protected void loadState() throws CommandException {
        if (jpaService == null) {
            jpaService = Services.get().get(JPAService.class);
        }
        try {
            coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
            coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }
        LogUtils.setLogInfo(coordAction, logInfo);
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
            throw new PreconditionException(ErrorCode.E1100, "[" + actionId
                    + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state="
                    + coordAction.getStatus());
        }

        // if eligible to do action input check when running with backward support is true
        if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
            return;
        }

        if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR
                && coordJob.getStatus() != Job.Status.PAUSED && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
            throw new PreconditionException(
                    ErrorCode.E1100, "[" + actionId + "]::CoordActionInputCheck:: Ignoring action." +
                    " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
                    + coordJob.getStatus());
        }
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#getKey()
     */
    @Override
    public String getKey() {
        return getName() + "_" + actionId;
    }

}