001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.io.IOException; 021 import java.io.StringReader; 022 import java.util.Date; 023 import java.util.List; 024 025 import org.apache.hadoop.conf.Configuration; 026 import org.apache.hadoop.fs.Path; 027 import org.apache.oozie.CoordinatorActionBean; 028 import org.apache.oozie.CoordinatorJobBean; 029 import org.apache.oozie.ErrorCode; 030 import org.apache.oozie.client.CoordinatorAction; 031 import org.apache.oozie.client.Job; 032 import org.apache.oozie.client.OozieClient; 033 import org.apache.oozie.command.CommandException; 034 import org.apache.oozie.command.PreconditionException; 035 import org.apache.oozie.coord.CoordELEvaluator; 036 import org.apache.oozie.coord.CoordELFunctions; 037 import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor; 038 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 039 import org.apache.oozie.executor.jpa.JPAExecutorException; 040 import org.apache.oozie.service.HadoopAccessorException; 041 import org.apache.oozie.service.HadoopAccessorService; 042 import org.apache.oozie.service.JPAService; 043 import org.apache.oozie.service.Service; 044 import org.apache.oozie.service.Services; 045 import org.apache.oozie.util.DateUtils; 046 import org.apache.oozie.util.ELEvaluator; 047 import org.apache.oozie.util.Instrumentation; 048 import org.apache.oozie.util.LogUtils; 049 import org.apache.oozie.util.ParamChecker; 050 import org.apache.oozie.util.StatusUtils; 051 import org.apache.oozie.util.XConfiguration; 052 import org.apache.oozie.util.XmlUtils; 053 import org.jdom.Element; 054 055 /** 056 * The command to check if an action's data input paths exist in the file system. 057 */ 058 public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> { 059 060 private final String actionId; 061 /** 062 * Property name of command re-queue interval for coordinator action input check in 063 * milliseconds. 064 */ 065 public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX 066 + "coord.input.check.requeue.interval"; 067 /** 068 * Default re-queue interval in ms. It is applied when no value defined in 069 * the oozie configuration. 070 */ 071 private final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 60000; // 1 minute 072 private CoordinatorActionBean coordAction = null; 073 private CoordinatorJobBean coordJob = null; 074 private JPAService jpaService = null; 075 private String jobId = null; 076 077 public CoordActionInputCheckXCommand(String actionId, String jobId) { 078 super("coord_action_input", "coord_action_input", 1); 079 this.actionId = ParamChecker.notEmpty(actionId, "actionId"); 080 this.jobId = jobId; 081 } 082 083 /* (non-Javadoc) 084 * @see org.apache.oozie.command.XCommand#execute() 085 */ 086 @Override 087 protected Void execute() throws CommandException { 088 LOG.info("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state."); 089 090 // this action should only get processed if current time > nominal time; 091 // otherwise, requeue this action for delay execution; 092 Date nominalTime = coordAction.getNominalTime(); 093 Date currentTime = new Date(); 094 if (nominalTime.compareTo(currentTime) > 0) { 095 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), Math.max((nominalTime.getTime() - currentTime 096 .getTime()), getCoordInputCheckRequeueInterval())); 097 // update lastModifiedTime 098 coordAction.setLastModifiedTime(new Date()); 099 try { 100 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor(coordAction)); 101 } 102 catch (JPAExecutorException e) { 103 throw new CommandException(e); 104 } 105 LOG.info("[" + actionId 106 + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current=" 107 + currentTime + ", nominal=" + nominalTime); 108 109 return null; 110 } 111 112 StringBuilder actionXml = new StringBuilder(coordAction.getActionXml()); 113 Instrumentation.Cron cron = new Instrumentation.Cron(); 114 try { 115 Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf())); 116 cron.start(); 117 StringBuilder existList = new StringBuilder(); 118 StringBuilder nonExistList = new StringBuilder(); 119 StringBuilder nonResolvedList = new StringBuilder(); 120 CoordCommandUtils.getResolvedList(coordAction.getMissingDependencies(), nonExistList, nonResolvedList); 121 122 LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + nonExistList.toString() + " " 123 + nonResolvedList.toString()); 124 boolean status = checkInput(actionXml, existList, nonExistList, actionConf); 125 coordAction.setLastModifiedTime(currentTime); 126 coordAction.setActionXml(actionXml.toString()); 127 if (nonResolvedList.length() > 0 && status == false) { 128 nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList); 129 } 130 coordAction.setMissingDependencies(nonExistList.toString()); 131 if (status == true) { 132 coordAction.setStatus(CoordinatorAction.Status.READY); 133 // pass jobID to the CoordActionReadyXCommand 134 queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100); 135 } 136 else { 137 long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction 138 .getCreatedTime().getTime())) 139 / (60 * 1000); 140 int timeOut = coordAction.getTimeOut(); 141 if ((timeOut >= 0) && (waitingTime > timeOut)) { 142 queue(new CoordActionTimeOutXCommand(coordAction), 100); 143 } 144 else { 145 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), getCoordInputCheckRequeueInterval()); 146 } 147 } 148 coordAction.setLastModifiedTime(new Date()); 149 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor(coordAction)); 150 } 151 catch (Exception e) { 152 throw new CommandException(ErrorCode.E1021, e.getMessage(), e); 153 } 154 cron.stop(); 155 156 return null; 157 } 158 159 /** 160 * This function reads the value of re-queue interval for coordinator input 161 * check command from the Oozie configuration provided by Configuration 162 * Service. If nothing defined in the configuration, it uses the code 163 * specified default value. 164 * 165 * @return re-queue interval in ms 166 */ 167 public long getCoordInputCheckRequeueInterval() { 168 long requeueInterval = Services.get().getConf().getLong(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL, 169 DEFAULT_COMMAND_REQUEUE_INTERVAL); 170 return requeueInterval; 171 } 172 173 /** 174 * To check the list of input paths if all of them exist 175 * 176 * @param actionXml action xml 177 * @param existList the list of existed paths 178 * @param nonExistList the list of non existed paths 179 * @param conf action configuration 180 * @return true if all input paths are existed 181 * @throws Exception thrown of unable to check input path 182 */ 183 protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList, 184 Configuration conf) throws Exception { 185 Element eAction = XmlUtils.parseXml(actionXml.toString()); 186 boolean allExist = checkResolvedUris(eAction, existList, nonExistList, conf); 187 if (allExist) { 188 LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future"); 189 allExist = checkUnresolvedInstances(eAction, conf); 190 } 191 if (allExist == true) { 192 materializeDataProperties(eAction, conf); 193 actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString()); 194 } 195 return allExist; 196 } 197 198 /** 199 * Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list 200 * of files that will be needed. 201 * 202 * @param eAction action element 203 * @param conf action configuration 204 * @throws Exception thrown if failed to resolve data properties 205 * @update modify 'Action' element with appropriate list of files. 206 */ 207 @SuppressWarnings("unchecked") 208 private void materializeDataProperties(Element eAction, Configuration conf) throws Exception { 209 ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, conf, actionId); 210 Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow", 211 eAction.getNamespace()).getChild("configuration", eAction.getNamespace()); 212 if (configElem != null) { 213 for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) { 214 resolveTagContents("value", propElem, eval); 215 } 216 } 217 } 218 219 /** 220 * To resolve property value which contains el functions 221 * 222 * @param tagName tag name 223 * @param elem the child element of "property" element 224 * @param eval el functions evaluator 225 * @throws Exception thrown if unable to resolve tag value 226 */ 227 private void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception { 228 if (elem == null) { 229 return; 230 } 231 Element tagElem = elem.getChild(tagName, elem.getNamespace()); 232 if (tagElem != null) { 233 String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText()); 234 tagElem.removeContent(); 235 tagElem.addContent(updated); 236 } 237 else { 238 LOG.warn(" Value NOT FOUND " + tagName); 239 } 240 } 241 242 /** 243 * Check if any unsolved paths under data output. Resolve the unresolved data input paths. 244 * 245 * @param eAction action element 246 * @param actionConf action configuration 247 * @return true if successful to resolve input and output paths 248 * @throws Exception thrown if failed to resolve data input and output paths 249 */ 250 @SuppressWarnings("unchecked") 251 private boolean checkUnresolvedInstances(Element eAction, Configuration actionConf) throws Exception { 252 String strAction = XmlUtils.prettyPrint(eAction).toString(); 253 Date nominalTime = DateUtils.parseDateUTC(eAction.getAttributeValue("action-nominal-time")); 254 String actualTimeStr = eAction.getAttributeValue("action-actual-time"); 255 Date actualTime = null; 256 if (actualTimeStr == null) { 257 LOG.debug("Unable to get action-actual-time from action xml, this job is submitted " + 258 "from previous version. Assign current date to actual time, action = " + actionId); 259 actualTime = new Date(); 260 } else { 261 actualTime = DateUtils.parseDateUTC(actualTimeStr); 262 } 263 264 StringBuffer resultedXml = new StringBuffer(); 265 266 boolean ret; 267 Element inputList = eAction.getChild("input-events", eAction.getNamespace()); 268 if (inputList != null) { 269 ret = materializeUnresolvedEvent(inputList.getChildren("data-in", eAction.getNamespace()), nominalTime, 270 actualTime, actionConf); 271 if (ret == false) { 272 resultedXml.append(strAction); 273 return false; 274 } 275 } 276 277 // Using latest() or future() in output-event is not intuitive. 278 // We need to make sure, this assumption is correct. 279 Element outputList = eAction.getChild("output-events", eAction.getNamespace()); 280 if (outputList != null) { 281 for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) { 282 if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) != null) { 283 throw new CommandException(ErrorCode.E1006, "coord:latest()/future()", 284 " not permitted in output-event "); 285 } 286 } 287 } 288 return true; 289 } 290 291 /** 292 * Resolve the list of data input paths 293 * 294 * @param eDataEvents the list of data input elements 295 * @param nominalTime action nominal time 296 * @param actualTime current time 297 * @param conf action configuration 298 * @return true if all unresolved URIs can be resolved 299 * @throws Exception thrown if failed to resolve data input paths 300 */ 301 @SuppressWarnings("unchecked") 302 private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime, 303 Configuration conf) throws Exception { 304 for (Element dEvent : eDataEvents) { 305 if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) == null) { 306 continue; 307 } 308 ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf); 309 String uresolvedInstance = dEvent.getChild("unresolved-instances", dEvent.getNamespace()).getTextTrim(); 310 String unresolvedList[] = uresolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR); 311 StringBuffer resolvedTmp = new StringBuffer(); 312 for (int i = 0; i < unresolvedList.length; i++) { 313 String ret = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]); 314 Boolean isResolved = (Boolean) eval.getVariable("is_resolved"); 315 if (isResolved == false) { 316 LOG.info("[" + actionId + "]::Cannot resolve: " + ret); 317 return false; 318 } 319 if (resolvedTmp.length() > 0) { 320 resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR); 321 } 322 resolvedTmp.append((String) eval.getVariable("resolved_path")); 323 } 324 if (resolvedTmp.length() > 0) { 325 if (dEvent.getChild("uris", dEvent.getNamespace()) != null) { 326 resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append( 327 dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim()); 328 dEvent.removeChild("uris", dEvent.getNamespace()); 329 } 330 Element uriInstance = new Element("uris", dEvent.getNamespace()); 331 uriInstance.addContent(resolvedTmp.toString()); 332 dEvent.getContent().add(1, uriInstance); 333 } 334 dEvent.removeChild("unresolved-instances", dEvent.getNamespace()); 335 } 336 337 return true; 338 } 339 340 /** 341 * Check all resolved URIs existence 342 * 343 * @param eAction action element 344 * @param existList the list of existed paths 345 * @param nonExistList the list of paths to check existence 346 * @param conf action configuration 347 * @return true if all nonExistList paths exist 348 * @throws IOException thrown if unable to access the path 349 */ 350 private boolean checkResolvedUris(Element eAction, StringBuilder existList, StringBuilder nonExistList, 351 Configuration conf) throws IOException { 352 LOG.info("[" + actionId + "]::ActionInputCheck:: In checkResolvedUris..."); 353 Element inputList = eAction.getChild("input-events", eAction.getNamespace()); 354 if (inputList != null) { 355 if (nonExistList.length() > 0) { 356 checkListOfPaths(existList, nonExistList, conf); 357 } 358 return nonExistList.length() == 0; 359 } 360 return true; 361 } 362 363 /** 364 * Check a list of non existed paths and add to exist list if it exists 365 * 366 * @param existList the list of existed paths 367 * @param nonExistList the list of paths to check existence 368 * @param conf action configuration 369 * @return true if all nonExistList paths exist 370 * @throws IOException thrown if unable to access the path 371 */ 372 private boolean checkListOfPaths(StringBuilder existList, StringBuilder nonExistList, Configuration conf) 373 throws IOException { 374 375 String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR); 376 if (uriList[0] != null) { 377 LOG.info("[" + actionId + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] + " is Missing."); 378 } 379 380 nonExistList.delete(0, nonExistList.length()); 381 boolean allExists = true; 382 String existSeparator = "", nonExistSeparator = ""; 383 for (int i = 0; i < uriList.length; i++) { 384 if (allExists) { 385 allExists = pathExists(uriList[i], conf); 386 LOG.info("[" + actionId + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" + allExists); 387 } 388 if (allExists) { 389 existList.append(existSeparator).append(uriList[i]); 390 existSeparator = CoordELFunctions.INSTANCE_SEPARATOR; 391 } 392 else { 393 nonExistList.append(nonExistSeparator).append(uriList[i]); 394 nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR; 395 } 396 } 397 return allExists; 398 } 399 400 /** 401 * Check if given path exists 402 * 403 * @param sPath uri path 404 * @param actionConf action configuration 405 * @return true if path exists 406 * @throws IOException thrown if unable to access the path 407 */ 408 private boolean pathExists(String sPath, Configuration actionConf) throws IOException { 409 LOG.debug("checking for the file " + sPath); 410 Path path = new Path(sPath); 411 String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 412 try { 413 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 414 Configuration fsConf = has.createJobConf(path.toUri().getAuthority()); 415 return has.createFileSystem(user, path.toUri(), fsConf).exists(path); 416 } 417 catch (HadoopAccessorException e) { 418 throw new IOException(e); 419 } 420 } 421 422 /** 423 * The function create a list of URIs separated by "," using the instances time stamp and URI-template 424 * 425 * @param event : <data-in> event 426 * @param instances : List of time stamp seprated by "," 427 * @param unresolvedInstances : list of instance with latest/future function 428 * @return : list of URIs separated by ",". 429 * @throws Exception thrown if failed to create URIs from unresolvedInstances 430 */ 431 @SuppressWarnings("unused") 432 private String createURIs(Element event, String instances, StringBuilder unresolvedInstances) throws Exception { 433 if (instances == null || instances.length() == 0) { 434 return ""; 435 } 436 String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR); 437 StringBuilder uris = new StringBuilder(); 438 439 for (int i = 0; i < instanceList.length; i++) { 440 int funcType = CoordCommandUtils.getFuncType(instanceList[i]); 441 if (funcType == CoordCommandUtils.LATEST || funcType == CoordCommandUtils.FUTURE) { 442 if (unresolvedInstances.length() > 0) { 443 unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR); 444 } 445 unresolvedInstances.append(instanceList[i]); 446 continue; 447 } 448 ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]); 449 if (uris.length() > 0) { 450 uris.append(CoordELFunctions.INSTANCE_SEPARATOR); 451 } 452 uris.append(CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()).getChild( 453 "uri-template", event.getNamespace()).getTextTrim())); 454 } 455 return uris.toString(); 456 } 457 458 /* (non-Javadoc) 459 * @see org.apache.oozie.command.XCommand#getEntityKey() 460 */ 461 @Override 462 public String getEntityKey() { 463 return this.jobId; 464 } 465 466 /* (non-Javadoc) 467 * @see org.apache.oozie.command.XCommand#isLockRequired() 468 */ 469 @Override 470 protected boolean isLockRequired() { 471 return true; 472 } 473 474 /* (non-Javadoc) 475 * @see org.apache.oozie.command.XCommand#eagerLoadState() 476 */ 477 @Override 478 protected void eagerLoadState() throws CommandException { 479 loadState(); 480 } 481 482 /* (non-Javadoc) 483 * @see org.apache.oozie.command.XCommand#loadState() 484 */ 485 @Override 486 protected void loadState() throws CommandException { 487 if (jpaService == null) { 488 jpaService = Services.get().get(JPAService.class); 489 } 490 try { 491 coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId)); 492 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId())); 493 } 494 catch (JPAExecutorException je) { 495 throw new CommandException(je); 496 } 497 LogUtils.setLogInfo(coordAction, logInfo); 498 } 499 500 /* (non-Javadoc) 501 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 502 */ 503 @Override 504 protected void verifyPrecondition() throws CommandException, PreconditionException { 505 if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) { 506 throw new PreconditionException(ErrorCode.E1100, "[" + actionId 507 + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state=" 508 + coordAction.getStatus()); 509 } 510 511 // if eligible to do action input check when running with backward support is true 512 if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) { 513 return; 514 } 515 516 if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.PAUSED 517 && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) { 518 throw new PreconditionException( 519 ErrorCode.E1100, "["+ actionId + "]::CoordActionInputCheck:: Ignoring action." + 520 " Coordinator job is not in RUNNING/PAUSED/PAUSEDWITHERROR state, but state=" 521 + coordJob.getStatus()); 522 } 523 } 524 525 /* (non-Javadoc) 526 * @see org.apache.oozie.command.XCommand#getKey() 527 */ 528 @Override 529 public String getKey(){ 530 return getName() + "_" + actionId; 531 } 532 533 }