001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.io.IOException; 021 import java.io.StringReader; 022 import java.util.Date; 023 import java.util.List; 024 025 import org.apache.hadoop.conf.Configuration; 026 import org.apache.hadoop.fs.Path; 027 import org.apache.oozie.CoordinatorActionBean; 028 import org.apache.oozie.CoordinatorJobBean; 029 import org.apache.oozie.ErrorCode; 030 import org.apache.oozie.client.CoordinatorAction; 031 import org.apache.oozie.client.Job; 032 import org.apache.oozie.client.OozieClient; 033 import org.apache.oozie.command.CommandException; 034 import org.apache.oozie.command.PreconditionException; 035 import org.apache.oozie.coord.CoordELEvaluator; 036 import org.apache.oozie.coord.CoordELFunctions; 037 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; 038 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 039 import org.apache.oozie.executor.jpa.JPAExecutorException; 040 import org.apache.oozie.service.HadoopAccessorException; 041 import org.apache.oozie.service.HadoopAccessorService; 042 import org.apache.oozie.service.JPAService; 043 import org.apache.oozie.service.Services; 044 import org.apache.oozie.util.DateUtils; 045 import org.apache.oozie.util.ELEvaluator; 046 import org.apache.oozie.util.Instrumentation; 047 import org.apache.oozie.util.LogUtils; 048 import org.apache.oozie.util.ParamChecker; 049 import org.apache.oozie.util.StatusUtils; 050 import org.apache.oozie.util.XConfiguration; 051 import org.apache.oozie.util.XmlUtils; 052 import org.jdom.Element; 053 054 /** 055 * The command to check if an action's data input paths exist in the file system. 056 */ 057 public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> { 058 059 private final String actionId; 060 private final int COMMAND_REQUEUE_INTERVAL = 60000; // 1 minute 061 private CoordinatorActionBean coordAction = null; 062 private CoordinatorJobBean coordJob = null; 063 private JPAService jpaService = null; 064 065 public CoordActionInputCheckXCommand(String actionId) { 066 super("coord_action_input", "coord_action_input", 1); 067 this.actionId = ParamChecker.notEmpty(actionId, "actionId"); 068 } 069 070 /* (non-Javadoc) 071 * @see org.apache.oozie.command.XCommand#execute() 072 */ 073 @Override 074 protected Void execute() throws CommandException { 075 LOG.info("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state."); 076 077 // this action should only get processed if current time > nominal time; 078 // otherwise, requeue this action for delay execution; 079 Date nominalTime = coordAction.getNominalTime(); 080 Date currentTime = new Date(); 081 if (nominalTime.compareTo(currentTime) > 0) { 082 queue(new CoordActionInputCheckXCommand(coordAction.getId()), Math.max( 083 (nominalTime.getTime() - currentTime.getTime()), COMMAND_REQUEUE_INTERVAL)); 084 // update lastModifiedTime 085 coordAction.setLastModifiedTime(new Date()); 086 try { 087 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor(coordAction)); 088 } 089 catch (JPAExecutorException e) { 090 throw new CommandException(e); 091 } 092 LOG.info("[" + actionId 093 + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current=" 094 + currentTime + ", nominal=" + nominalTime); 095 096 return null; 097 } 098 099 StringBuilder actionXml = new StringBuilder(coordAction.getActionXml()); 100 Instrumentation.Cron cron = new Instrumentation.Cron(); 101 try { 102 Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf())); 103 cron.start(); 104 StringBuilder existList = new StringBuilder(); 105 StringBuilder nonExistList = new StringBuilder(); 106 StringBuilder nonResolvedList = new StringBuilder(); 107 CoordCommandUtils.getResolvedList(coordAction.getMissingDependencies(), nonExistList, nonResolvedList); 108 109 LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + nonExistList.toString() + " " 110 + nonResolvedList.toString()); 111 boolean status = checkInput(actionXml, existList, nonExistList, actionConf); 112 coordAction.setLastModifiedTime(currentTime); 113 coordAction.setActionXml(actionXml.toString()); 114 if (nonResolvedList.length() > 0 && status == false) { 115 nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList); 116 } 117 coordAction.setMissingDependencies(nonExistList.toString()); 118 if (status == true) { 119 coordAction.setStatus(CoordinatorAction.Status.READY); 120 // pass jobID to the CoordActionReadyXCommand 121 queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100); 122 } 123 else { 124 long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction 125 .getCreatedTime().getTime())) 126 / (60 * 1000); 127 int timeOut = coordAction.getTimeOut(); 128 if ((timeOut >= 0) && (waitingTime > timeOut)) { 129 queue(new CoordActionTimeOutXCommand(coordAction), 100); 130 } 131 else { 132 queue(new CoordActionInputCheckXCommand(coordAction.getId()), COMMAND_REQUEUE_INTERVAL); 133 } 134 } 135 coordAction.setLastModifiedTime(new Date()); 136 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor(coordAction)); 137 } 138 catch (Exception e) { 139 throw new CommandException(ErrorCode.E1021, e.getMessage(), e); 140 } 141 cron.stop(); 142 143 return null; 144 } 145 146 /** 147 * To check the list of input paths if all of them exist 148 * 149 * @param actionXml action xml 150 * @param existList the list of existed paths 151 * @param nonExistList the list of non existed paths 152 * @param conf action configuration 153 * @return true if all input paths are existed 154 * @throws Exception thrown of unable to check input path 155 */ 156 protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList, 157 Configuration conf) throws Exception { 158 Element eAction = XmlUtils.parseXml(actionXml.toString()); 159 boolean allExist = checkResolvedUris(eAction, existList, nonExistList, conf); 160 if (allExist) { 161 LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future"); 162 allExist = checkUnresolvedInstances(eAction, conf); 163 } 164 if (allExist == true) { 165 materializeDataProperties(eAction, conf); 166 actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString()); 167 } 168 return allExist; 169 } 170 171 /** 172 * Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list 173 * of files that will be needed. 174 * 175 * @param eAction action element 176 * @param conf action configuration 177 * @throws Exception thrown if failed to resolve data properties 178 * @update modify 'Action' element with appropriate list of files. 179 */ 180 @SuppressWarnings("unchecked") 181 private void materializeDataProperties(Element eAction, Configuration conf) throws Exception { 182 ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, conf, actionId); 183 Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow", 184 eAction.getNamespace()).getChild("configuration", eAction.getNamespace()); 185 if (configElem != null) { 186 for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) { 187 resolveTagContents("value", propElem, eval); 188 } 189 } 190 } 191 192 /** 193 * To resolve property value which contains el functions 194 * 195 * @param tagName tag name 196 * @param elem the child element of "property" element 197 * @param eval el functions evaluator 198 * @throws Exception thrown if unable to resolve tag value 199 */ 200 private void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception { 201 if (elem == null) { 202 return; 203 } 204 Element tagElem = elem.getChild(tagName, elem.getNamespace()); 205 if (tagElem != null) { 206 String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText()); 207 tagElem.removeContent(); 208 tagElem.addContent(updated); 209 } 210 else { 211 LOG.warn(" Value NOT FOUND " + tagName); 212 } 213 } 214 215 /** 216 * Check if any unsolved paths under data output. Resolve the unresolved data input paths. 217 * 218 * @param eAction action element 219 * @param actionConf action configuration 220 * @return true if successful to resolve input and output paths 221 * @throws Exception thrown if failed to resolve data input and output paths 222 */ 223 @SuppressWarnings("unchecked") 224 private boolean checkUnresolvedInstances(Element eAction, Configuration actionConf) throws Exception { 225 String strAction = XmlUtils.prettyPrint(eAction).toString(); 226 Date nominalTime = DateUtils.parseDateUTC(eAction.getAttributeValue("action-nominal-time")); 227 String actualTimeStr = eAction.getAttributeValue("action-actual-time"); 228 Date actualTime = null; 229 if (actualTimeStr == null) { 230 LOG.debug("Unable to get action-actual-time from action xml, this job is submitted " + 231 "from previous version. Assign current date to actual time, action = " + actionId); 232 actualTime = new Date(); 233 } else { 234 actualTime = DateUtils.parseDateUTC(actualTimeStr); 235 } 236 237 StringBuffer resultedXml = new StringBuffer(); 238 239 boolean ret; 240 Element inputList = eAction.getChild("input-events", eAction.getNamespace()); 241 if (inputList != null) { 242 ret = materializeUnresolvedEvent(inputList.getChildren("data-in", eAction.getNamespace()), nominalTime, 243 actualTime, actionConf); 244 if (ret == false) { 245 resultedXml.append(strAction); 246 return false; 247 } 248 } 249 250 // Using latest() or future() in output-event is not intuitive. 251 // We need to make sure, this assumption is correct. 252 Element outputList = eAction.getChild("output-events", eAction.getNamespace()); 253 if (outputList != null) { 254 for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) { 255 if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) != null) { 256 throw new CommandException(ErrorCode.E1006, "coord:latest()/future()", 257 " not permitted in output-event "); 258 } 259 } 260 } 261 return true; 262 } 263 264 /** 265 * Resolve the list of data input paths 266 * 267 * @param eDataEvents the list of data input elements 268 * @param nominalTime action nominal time 269 * @param actualTime current time 270 * @param conf action configuration 271 * @return true if all unresolved URIs can be resolved 272 * @throws Exception thrown if failed to resolve data input paths 273 */ 274 @SuppressWarnings("unchecked") 275 private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime, 276 Configuration conf) throws Exception { 277 for (Element dEvent : eDataEvents) { 278 if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) == null) { 279 continue; 280 } 281 ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf); 282 String uresolvedInstance = dEvent.getChild("unresolved-instances", dEvent.getNamespace()).getTextTrim(); 283 String unresolvedList[] = uresolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR); 284 StringBuffer resolvedTmp = new StringBuffer(); 285 for (int i = 0; i < unresolvedList.length; i++) { 286 String ret = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]); 287 Boolean isResolved = (Boolean) eval.getVariable("is_resolved"); 288 if (isResolved == false) { 289 LOG.info("[" + actionId + "]::Cannot resolve: " + ret); 290 return false; 291 } 292 if (resolvedTmp.length() > 0) { 293 resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR); 294 } 295 resolvedTmp.append((String) eval.getVariable("resolved_path")); 296 } 297 if (resolvedTmp.length() > 0) { 298 if (dEvent.getChild("uris", dEvent.getNamespace()) != null) { 299 resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append( 300 dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim()); 301 dEvent.removeChild("uris", dEvent.getNamespace()); 302 } 303 Element uriInstance = new Element("uris", dEvent.getNamespace()); 304 uriInstance.addContent(resolvedTmp.toString()); 305 dEvent.getContent().add(1, uriInstance); 306 } 307 dEvent.removeChild("unresolved-instances", dEvent.getNamespace()); 308 } 309 310 return true; 311 } 312 313 /** 314 * Check all resolved URIs existence 315 * 316 * @param eAction action element 317 * @param existList the list of existed paths 318 * @param nonExistList the list of paths to check existence 319 * @param conf action configuration 320 * @return true if all nonExistList paths exist 321 * @throws IOException thrown if unable to access the path 322 */ 323 private boolean checkResolvedUris(Element eAction, StringBuilder existList, StringBuilder nonExistList, 324 Configuration conf) throws IOException { 325 LOG.info("[" + actionId + "]::ActionInputCheck:: In checkResolvedUris..."); 326 Element inputList = eAction.getChild("input-events", eAction.getNamespace()); 327 if (inputList != null) { 328 if (nonExistList.length() > 0) { 329 checkListOfPaths(existList, nonExistList, conf); 330 } 331 return nonExistList.length() == 0; 332 } 333 return true; 334 } 335 336 /** 337 * Check a list of non existed paths and add to exist list if it exists 338 * 339 * @param existList the list of existed paths 340 * @param nonExistList the list of paths to check existence 341 * @param conf action configuration 342 * @return true if all nonExistList paths exist 343 * @throws IOException thrown if unable to access the path 344 */ 345 private boolean checkListOfPaths(StringBuilder existList, StringBuilder nonExistList, Configuration conf) 346 throws IOException { 347 348 String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR); 349 if (uriList[0] != null) { 350 LOG.info("[" + actionId + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] + " is Missing."); 351 } 352 353 nonExistList.delete(0, nonExistList.length()); 354 boolean allExists = true; 355 String existSeparator = "", nonExistSeparator = ""; 356 for (int i = 0; i < uriList.length; i++) { 357 if (allExists) { 358 allExists = pathExists(uriList[i], conf); 359 LOG.info("[" + actionId + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" + allExists); 360 } 361 if (allExists) { 362 existList.append(existSeparator).append(uriList[i]); 363 existSeparator = CoordELFunctions.INSTANCE_SEPARATOR; 364 } 365 else { 366 nonExistList.append(nonExistSeparator).append(uriList[i]); 367 nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR; 368 } 369 } 370 return allExists; 371 } 372 373 /** 374 * Check if given path exists 375 * 376 * @param sPath uri path 377 * @param actionConf action configuration 378 * @return true if path exists 379 * @throws IOException thrown if unable to access the path 380 */ 381 private boolean pathExists(String sPath, Configuration actionConf) throws IOException { 382 LOG.debug("checking for the file " + sPath); 383 Path path = new Path(sPath); 384 String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 385 String group = ParamChecker.notEmpty(actionConf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME); 386 try { 387 return Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, path.toUri(), 388 new Configuration()).exists(path); 389 } 390 catch (HadoopAccessorException e) { 391 throw new IOException(e); 392 } 393 } 394 395 /** 396 * The function create a list of URIs separated by "," using the instances time stamp and URI-template 397 * 398 * @param event : <data-in> event 399 * @param instances : List of time stamp seprated by "," 400 * @param unresolvedInstances : list of instance with latest/future function 401 * @return : list of URIs separated by ",". 402 * @throws Exception thrown if failed to create URIs from unresolvedInstances 403 */ 404 @SuppressWarnings("unused") 405 private String createURIs(Element event, String instances, StringBuilder unresolvedInstances) throws Exception { 406 if (instances == null || instances.length() == 0) { 407 return ""; 408 } 409 String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR); 410 StringBuilder uris = new StringBuilder(); 411 412 for (int i = 0; i < instanceList.length; i++) { 413 int funcType = CoordCommandUtils.getFuncType(instanceList[i]); 414 if (funcType == CoordCommandUtils.LATEST || funcType == CoordCommandUtils.FUTURE) { 415 if (unresolvedInstances.length() > 0) { 416 unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR); 417 } 418 unresolvedInstances.append(instanceList[i]); 419 continue; 420 } 421 ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]); 422 if (uris.length() > 0) { 423 uris.append(CoordELFunctions.INSTANCE_SEPARATOR); 424 } 425 uris.append(CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()).getChild( 426 "uri-template", event.getNamespace()).getTextTrim())); 427 } 428 return uris.toString(); 429 } 430 431 /* (non-Javadoc) 432 * @see org.apache.oozie.command.XCommand#getEntityKey() 433 */ 434 @Override 435 protected String getEntityKey() { 436 return coordAction.getJobId(); 437 } 438 439 /* (non-Javadoc) 440 * @see org.apache.oozie.command.XCommand#isLockRequired() 441 */ 442 @Override 443 protected boolean isLockRequired() { 444 return true; 445 } 446 447 /* (non-Javadoc) 448 * @see org.apache.oozie.command.XCommand#eagerLoadState() 449 */ 450 @Override 451 protected void eagerLoadState() throws CommandException { 452 loadState(); 453 } 454 455 /* (non-Javadoc) 456 * @see org.apache.oozie.command.XCommand#loadState() 457 */ 458 @Override 459 protected void loadState() throws CommandException { 460 if (jpaService == null) { 461 jpaService = Services.get().get(JPAService.class); 462 } 463 try { 464 coordAction = jpaService.execute(new CoordActionGetJPAExecutor(actionId)); 465 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId())); 466 } 467 catch (JPAExecutorException je) { 468 throw new CommandException(je); 469 } 470 LogUtils.setLogInfo(coordAction, logInfo); 471 } 472 473 /* (non-Javadoc) 474 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 475 */ 476 @Override 477 protected void verifyPrecondition() throws CommandException, PreconditionException { 478 if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) { 479 throw new PreconditionException(ErrorCode.E1100, "[" + actionId 480 + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state=" 481 + coordAction.getStatus()); 482 } 483 484 // if eligible to do action input check when running with backward support is true 485 if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) { 486 return; 487 } 488 489 if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.PAUSED 490 && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) { 491 throw new PreconditionException( 492 ErrorCode.E1100, "["+ actionId + "]::CoordActionInputCheck:: Ignoring action." + 493 " Coordinator job is not in RUNNING/PAUSED/PAUSEDWITHERROR state, but state=" 494 + coordJob.getStatus()); 495 } 496 } 497 498 /* (non-Javadoc) 499 * @see org.apache.oozie.command.XCommand#getKey() 500 */ 501 @Override 502 public String getKey(){ 503 return getName() + "_" + actionId; 504 } 505 506 }