/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.coord;

import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.ParseException;
import java.util.Calendar;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.coord.CoordELEvaluator;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.coord.TimeUnit;
import org.apache.oozie.dependency.URIHandler;
import org.apache.oozie.dependency.URIHandlerException;
import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.StatusUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;

/**
 * The command to check if an action's data input paths exist in the file system.
 * <p>
 * When every dependency is available the action is moved to READY; otherwise the command
 * re-queues itself, or queues a timeout / skip command depending on the elapsed time and the
 * job's execution order.
 */
public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {

    private final String actionId;
    /**
     * Property name of command re-queue interval for coordinator action input check in
     * milliseconds.
     */
    public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX
            + "coord.input.check.requeue.interval";
    /**
     * Default re-queue interval in ms. It is applied when no value is defined in
     * the oozie configuration.
     */
    private static final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 60000; // 1 minute
    private CoordinatorActionBean coordAction = null;
    private CoordinatorJobBean coordJob = null;
    private JPAService jpaService = null;
    private String jobId = null;

    /**
     * Creates an input-check command for one coordinator action.
     *
     * @param actionId id of the coordinator action to check; must not be empty
     * @param jobId id of the owning coordinator job; returned by {@link #getEntityKey()}
     */
    public CoordActionInputCheckXCommand(String actionId, String jobId) {
        super("coord_action_input", "coord_action_input", 1);
        this.actionId = ParamChecker.notEmpty(actionId, "actionId");
        this.jobId = jobId;
    }

    /**
     * Computes the nominal time of the next action.
     * Based on CoordMaterializeTransitionXCommand#materializeActions
     *
     * @return the nominal time of the next action, or {@code null} if that time would be after
     *         the job's end time (i.e. the current action is the last one)
     * @throws ParseException declared for the cron-frequency computation
     */
    private Date computeNextNominalTime() throws ParseException {
        Date nextNominalTime;
        boolean isCronFrequency = false;
        int freq = -1;
        try {
            freq = Integer.parseInt(coordJob.getFrequency());
        }
        catch (NumberFormatException e) {
            // A frequency that is not a plain integer is handled as a cron expression
            isCronFrequency = true;
        }

        if (isCronFrequency) {
            nextNominalTime = CoordCommandUtils.getNextValidActionTimeForCronFrequency(coordAction.getNominalTime(),
                    coordJob);
        }
        else {
            Calendar nextNominalTimeCal = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
            nextNominalTimeCal.setTime(coordAction.getNominalTime());
            TimeUnit freqTU = TimeUnit.valueOf(coordJob.getTimeUnitStr());
            nextNominalTimeCal.add(freqTU.getCalendarUnit(), freq);
            nextNominalTime = nextNominalTimeCal.getTime();
        }

        // If the next nominal time is after the job's end time, then this is the last action, so return null
        if (nextNominalTime.after(coordJob.getEndTime())) {
            nextNominalTime = null;
        }
        return nextNominalTime;
    }

    /**
     * Runs the input check for the loaded action.
     * <ol>
     * <li>If the nominal time is still in the future, re-queue and return.</li>
     * <li>For LAST_ONLY / NONE execution order, queue a skip command when the action's window has passed.</li>
     * <li>Check the resolved missing dependencies, then (only if those are all met and there are no push
     * dependencies) the unresolved latest/future instances.</li>
     * <li>Mark the action READY, re-queue the check, or queue a timeout, and persist the action.</li>
     * </ol>
     *
     * @return always {@code null}
     * @throws CommandException with {@link ErrorCode#E1021} wrapping any failure raised during the check
     */
    @Override
    protected Void execute() throws CommandException {
        LOG.debug("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state.");

        // this action should only get processed if current time > nominal time;
        // otherwise, requeue this action for delay execution;
        Date nominalTime = coordAction.getNominalTime();
        Date currentTime = new Date();
        if (nominalTime.compareTo(currentTime) > 0) {
            queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
                    Math.max((nominalTime.getTime() - currentTime.getTime()), getCoordInputCheckRequeueInterval()));
            updateCoordAction(coordAction, false);
            LOG.info("[" + actionId
                    + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current="
                    + DateUtils.formatDateOozieTZ(currentTime) + ", nominal=" + DateUtils.formatDateOozieTZ(nominalTime));

            return null;
        }

        StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
        boolean isChangeInDependency = false;
        try {
            Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
            Date now = new Date();
            if (coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.LAST_ONLY)) {
                Date nextNominalTime = computeNextNominalTime();
                if (nextNominalTime != null) {
                    // If the current time is after the next action's nominal time, then we've passed the window where this
                    // action should be started; so set it to SKIPPED
                    if (now.after(nextNominalTime)) {
                        LOG.info("LAST_ONLY execution: Preparing to skip action [{0}] because the current time [{1}] is later than "
                                + "the nominal time [{2}] of the next action]", coordAction.getId(),
                                DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
                        queue(new CoordActionSkipXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
                        return null;
                    }
                    else {
                        LOG.debug("LAST_ONLY execution: Not skipping action [{0}] because the current time [{1}] is earlier than "
                                + "the nominal time [{2}] of the next action]", coordAction.getId(),
                                DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
                    }
                }
            }
            else if (coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.NONE)) {
                // If the current time is after the nominal time of this action plus some tolerance,
                // then we've passed the window where this action
                // should be started; so set it to SKIPPED
                Calendar cal = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
                cal.setTime(nominalTime);
                cal.add(Calendar.MINUTE, Services.get().getConf().getInt("oozie.coord.execution.none.tolerance", 1));
                Date nominalTimeWithTolerance = cal.getTime();
                if (now.after(nominalTimeWithTolerance)) {
                    LOG.info("NONE execution: Preparing to skip action [{0}] because the current time [{1}] is later than "
                            + "the nominal time [{2}] of the current action]", coordAction.getId(),
                            DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nominalTimeWithTolerance));
                    queue(new CoordActionSkipXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
                    return null;
                }
                else {
                    LOG.debug("NONE execution: Not skipping action [{0}] because the current time [{1}] is earlier than "
                            + "the nominal time [{2}] of the current action]", coordAction.getId(),
                            DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(coordAction.getNominalTime()));
                }
            }

            StringBuilder existList = new StringBuilder();
            StringBuilder nonExistList = new StringBuilder();
            StringBuilder nonResolvedList = new StringBuilder();
            String firstMissingDependency = "";
            String missingDeps = coordAction.getMissingDependencies();
            CoordCommandUtils.getResolvedList(missingDeps, nonExistList, nonResolvedList);

            // For clarity regarding which is the missing dependency in synchronous order
            // instead of printing entire list, some of which, may be available
            if (nonExistList.length() > 0) {
                firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0];
            }
            LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + firstMissingDependency + " "
                    + nonResolvedList.toString());
            // Updating the list of data dependencies that are available and those that are yet not
            boolean status = checkInput(actionXml, existList, nonExistList, actionConf);
            String pushDeps = coordAction.getPushMissingDependencies();
            boolean noPushDeps = pushDeps == null || pushDeps.length() == 0;
            // Resolve latest/future only when all current missingDependencies and
            // pushMissingDependencies are met
            if (status && nonResolvedList.length() > 0) {
                status = noPushDeps ? checkUnResolvedInput(actionXml, actionConf) : false;
            }
            coordAction.setLastModifiedTime(currentTime);
            coordAction.setActionXml(actionXml.toString());
            if (nonResolvedList.length() > 0 && !status) {
                nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
            }
            String nonExistListStr = nonExistList.toString();
            if (!nonExistListStr.equals(missingDeps) || missingDeps.isEmpty()) {
                // missingDeps null or empty means action should become READY
                isChangeInDependency = true;
                coordAction.setMissingDependencies(nonExistListStr);
            }
            if (status && noPushDeps) {
                String newActionXml = resolveCoordConfiguration(actionXml, actionConf, actionId);
                actionXml.replace(0, actionXml.length(), newActionXml);
                coordAction.setActionXml(actionXml.toString());
                coordAction.setStatus(CoordinatorAction.Status.READY);
                updateCoordAction(coordAction, true);
                new CoordActionReadyXCommand(coordAction.getJobId()).call(getEntityKey());
            }
            else if (!isTimeout(currentTime)) {
                if (!status) {
                    queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
                            getCoordInputCheckRequeueInterval());
                }
                updateCoordAction(coordAction, isChangeInDependency);
            }
            else {
                // The previous condition here was
                //   !nonExistListStr.isEmpty() && pushDeps == null || pushDeps.length() == 0
                // which, because && binds tighter than ||, dereferenced a null pushDeps whenever the
                // missing list was empty; that NPE landed in the catch block below, which queued the
                // timeout anyway. Testing "no push dependencies" null-safely gives the same outcome
                // for every input on which the old expression did not throw, without the exception.
                if (noPushDeps) {
                    queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
                }
                else {
                    // Let CoordPushDependencyCheckXCommand queue the timeout
                    queue(new CoordPushDependencyCheckXCommand(coordAction.getId()));
                }
                updateCoordAction(coordAction, isChangeInDependency);
            }
        }
        catch (Exception e) {
            if (isTimeout(currentTime)) {
                LOG.debug("Queueing timeout command");
                // XCommand.queue() will not work when there is a Exception
                Services.get().get(CallableQueueService.class)
                        .queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
            }
            updateCoordAction(coordAction, isChangeInDependency);
            throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
        }
        return null;
    }

    /**
     * Parses the action XML, evaluates the EL expressions found in the workflow configuration
     * property values, and returns the pretty-printed result.
     *
     * @param actionXml action xml
     * @param actionConf action configuration
     * @param actionId id of the action, passed to the data evaluator
     * @return the action xml with data properties materialized
     * @throws Exception thrown if the xml cannot be parsed or a property cannot be resolved
     */
    static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId)
            throws Exception {
        Element eAction = XmlUtils.parseXml(actionXml.toString());
        ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, actionConf, actionId);
        materializeDataProperties(eAction, actionConf, eval);
        return XmlUtils.prettyPrint(eAction).toString();
    }

    /**
     * Tells whether the action has waited longer than its configured timeout.
     * The waiting time is measured in whole minutes from the later of the action's nominal time
     * and created time. A negative timeout never expires.
     *
     * @param currentTime the time to measure against
     * @return true if the timeout is non-negative and has been exceeded
     */
    private boolean isTimeout(Date currentTime) {
        long waitStart = Math.max(coordAction.getNominalTime().getTime(), coordAction.getCreatedTime().getTime());
        long waitingTime = (currentTime.getTime() - waitStart) / (60 * 1000);
        int timeOut = coordAction.getTimeOut();
        return (timeOut >= 0) && (waitingTime > timeOut);
    }

    /**
     * Persists the action. Does nothing beyond stamping the last-modified time when no JPA
     * service has been loaded.
     *
     * @param coordAction the action bean to store
     * @param isChangeInDependency true to run the full input-check update (and generate an event when
     *        events are enabled and the action is not READY); false to update only the modified date
     * @throws CommandException with {@link ErrorCode#E1021} if the update fails
     */
    private void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency)
            throws CommandException {
        coordAction.setLastModifiedTime(new Date());
        if (jpaService != null) {
            try {
                if (isChangeInDependency) {
                    CoordActionQueryExecutor.getInstance().executeUpdate(
                            CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK, coordAction);
                    if (EventHandlerService.isEnabled() && coordAction.getStatus() != CoordinatorAction.Status.READY) {
                        // since event is not to be generated unless action
                        // RUNNING via StartX
                        generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
                    }
                }
                else {
                    CoordActionQueryExecutor.getInstance().executeUpdate(
                            CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, coordAction);
                }
            }
            catch (JPAExecutorException jex) {
                throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
            }
        }
    }

    /**
     * This function reads the value of re-queue interval for coordinator input
     * check command from the Oozie configuration provided by Configuration
     * Service. If nothing defined in the configuration, it uses the code
     * specified default value.
     *
     * @return re-queue interval in ms
     */
    public long getCoordInputCheckRequeueInterval() {
        return Services.get().getConf().getLong(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL,
                DEFAULT_COMMAND_REQUEUE_INTERVAL);
    }

    /**
     * Checks whether all of the listed input paths exist.
     *
     * @param actionXml action xml
     * @param existList receives the paths found to exist
     * @param nonExistList on entry the paths to check; on return the paths still missing
     * @param conf action configuration
     * @return true if all input paths exist
     * @throws Exception thrown if unable to check input path
     */
    protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
            Configuration conf) throws Exception {
        Element eAction = XmlUtils.parseXml(actionXml.toString());
        return checkResolvedUris(eAction, existList, nonExistList, conf);
    }

    /**
     * Tries to resolve the latest/future instances of the action. When all of them resolve,
     * {@code actionXml} is overwritten with the updated xml; otherwise it is left untouched.
     *
     * @param actionXml action xml, updated in place on success
     * @param conf action configuration
     * @return true if every unresolved instance could be resolved
     * @throws Exception thrown if the xml cannot be parsed or resolution fails
     */
    protected boolean checkUnResolvedInput(StringBuilder actionXml, Configuration conf) throws Exception {
        Element eAction = XmlUtils.parseXml(actionXml.toString());
        LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
        boolean allExist = checkUnresolvedInstances(eAction, conf);
        if (allExist) {
            actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString());
        }
        return allExist;
    }

    /**
     * Materializes the data properties defined in the {@code <action>} tag by evaluating the
     * {@code value} of every {@code property} under action/workflow/configuration. The
     * {@code eAction} element is modified in place.
     *
     * @param eAction action element
     * @param conf action configuration
     * @param eval el functions evaluator used to resolve the property values
     * @throws Exception thrown if failed to resolve data properties
     */
    @SuppressWarnings("unchecked")
    static void materializeDataProperties(Element eAction, Configuration conf, ELEvaluator eval) throws Exception {
        Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow",
                eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
        if (configElem != null) {
            for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
                resolveTagContents("value", propElem, eval);
            }
        }
    }

    /**
     * Resolves a property value which contains el functions, replacing the tag's content with the
     * evaluated text. A missing tag is logged as a warning; a null element is ignored.
     *
     * @param tagName tag name
     * @param elem the "property" element whose child tag is resolved
     * @param eval el functions evaluator
     * @throws Exception thrown if unable to resolve tag value
     */
    private static void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception {
        if (elem == null) {
            return;
        }
        Element tagElem = elem.getChild(tagName, elem.getNamespace());
        if (tagElem != null) {
            String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText());
            tagElem.removeContent();
            tagElem.addContent(updated);
        }
        else {
            XLog.getLog(CoordActionInputCheckXCommand.class).warn(" Value NOT FOUND " + tagName);
        }
    }

    /**
     * Resolves the unresolved data input paths, then verifies that no data output still carries an
     * unresolved instance.
     *
     * @param eAction action element
     * @param actionConf action configuration
     * @return true if all input instances were resolved; false as soon as one cannot be resolved
     * @throws Exception thrown if failed to resolve data input paths, or (as a
     *         {@link CommandException} with {@link ErrorCode#E1006}) if an output event uses latest()/future()
     */
    @SuppressWarnings("unchecked")
    private boolean checkUnresolvedInstances(Element eAction, Configuration actionConf) throws Exception {
        Date nominalTime = DateUtils.parseDateOozieTZ(eAction.getAttributeValue("action-nominal-time"));
        String actualTimeStr = eAction.getAttributeValue("action-actual-time");
        Date actualTime;
        if (actualTimeStr == null) {
            LOG.debug("Unable to get action-actual-time from action xml, this job is submitted " +
                    "from previous version. Assign current date to actual time, action = " + actionId);
            actualTime = new Date();
        }
        else {
            actualTime = DateUtils.parseDateOozieTZ(actualTimeStr);
        }

        Element inputList = eAction.getChild("input-events", eAction.getNamespace());
        if (inputList != null) {
            boolean resolved = materializeUnresolvedEvent(inputList.getChildren("data-in", eAction.getNamespace()),
                    nominalTime, actualTime, actionConf);
            if (!resolved) {
                return false;
            }
        }

        // Using latest() or future() in output-event is not intuitive.
        // We need to make sure, this assumption is correct.
        Element outputList = eAction.getChild("output-events", eAction.getNamespace());
        if (outputList != null) {
            for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
                if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()) != null) {
                    throw new CommandException(ErrorCode.E1006, "coord:latest()/future()",
                            " not permitted in output-event ");
                }
            }
        }
        return true;
    }

    /**
     * Resolves the list of data input paths. For each event that has an unresolved-instances tag,
     * every instance is evaluated; the resolved paths are placed in front of any existing
     * {@code uris} content and the unresolved tag is removed.
     *
     * @param eDataEvents the list of data input elements
     * @param nominalTime action nominal time
     * @param actualTime action actual time (the current time when the xml carries none)
     * @param conf action configuration
     * @return true if all unresolved URIs can be resolved
     * @throws Exception thrown if failed to resolve data input paths
     */
    @SuppressWarnings("unchecked")
    private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime,
            Configuration conf) throws Exception {
        for (Element dEvent : eDataEvents) {
            Element unresolvedElem = dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace());
            if (unresolvedElem == null) {
                continue;
            }
            ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf);
            String unresolvedInstance = unresolvedElem.getTextTrim();
            String[] unresolvedList = unresolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR);
            StringBuilder resolvedTmp = new StringBuilder();
            for (int i = 0; i < unresolvedList.length; i++) {
                String ret = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]);
                Boolean isResolved = (Boolean) eval.getVariable("is_resolved");
                if (isResolved == false) {
                    LOG.info("[" + actionId + "]::Cannot resolve: " + ret);
                    return false;
                }
                if (resolvedTmp.length() > 0) {
                    resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR);
                }
                resolvedTmp.append((String) eval.getVariable("resolved_path"));
            }
            if (resolvedTmp.length() > 0) {
                Element existingUris = dEvent.getChild("uris", dEvent.getNamespace());
                if (existingUris != null) {
                    resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append(existingUris.getTextTrim());
                    dEvent.removeChild("uris", dEvent.getNamespace());
                }
                Element uriInstance = new Element("uris", dEvent.getNamespace());
                uriInstance.addContent(resolvedTmp.toString());
                dEvent.getContent().add(1, uriInstance);
            }
            dEvent.removeChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace());
        }

        return true;
    }

    /**
     * Checks the existence of all resolved URIs. An action without an {@code input-events}
     * element has nothing to check and yields true.
     *
     * @param eAction action element
     * @param existList receives the paths found to exist
     * @param nonExistList the list of paths to check existence; on return the paths still missing
     * @param conf action configuration
     * @return true if all nonExistList paths exist
     * @throws IOException thrown if unable to access the path
     */
    private boolean checkResolvedUris(Element eAction, StringBuilder existList, StringBuilder nonExistList,
            Configuration conf) throws IOException {
        Element inputList = eAction.getChild("input-events", eAction.getNamespace());
        if (inputList != null) {
            if (nonExistList.length() > 0) {
                checkListOfPaths(existList, nonExistList, conf);
            }
            return nonExistList.length() == 0;
        }
        return true;
    }

    /**
     * Checks a list of missing paths in order and moves the ones that exist to the exist list.
     * Checking stops at the first missing path: that path and everything after it are kept in
     * {@code nonExistList} without being probed.
     *
     * @param existList receives the paths found to exist
     * @param nonExistList the list of paths to check existence; rewritten to the paths still missing
     * @param conf action configuration
     * @return true if all nonExistList paths exist
     * @throws IOException thrown if unable to access the path
     */
    private boolean checkListOfPaths(StringBuilder existList, StringBuilder nonExistList, Configuration conf)
            throws IOException {

        String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
        // split() never yields null elements but can yield an empty array (input made only of
        // separators), so guard on length rather than on null.
        if (uriList.length > 0) {
            LOG.info("[" + actionId + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] + " is Missing.");
        }

        nonExistList.delete(0, nonExistList.length());
        boolean allExists = true;
        String existSeparator = "", nonExistSeparator = "";
        String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
        for (int i = 0; i < uriList.length; i++) {
            if (allExists) {
                allExists = pathExists(uriList[i], conf, user);
                LOG.info("[" + actionId + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" + allExists);
            }
            if (allExists) {
                existList.append(existSeparator).append(uriList[i]);
                existSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
            }
            else {
                nonExistList.append(nonExistSeparator).append(uriList[i]);
                nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
            }
        }
        return allExists;
    }

    /**
     * Checks if the given path exists, using the URI handler registered for its URI.
     * On failure the error code and message are recorded on the action before rethrowing.
     *
     * @param sPath uri path
     * @param actionConf action configuration
     * @param user user name passed to the URI handler
     * @return true if path exists
     * @throws IOException thrown if unable to access the path or the path is not a valid URI
     */
    protected boolean pathExists(String sPath, Configuration actionConf, String user) throws IOException {
        LOG.debug("checking for the file " + sPath);
        try {
            URI uri = new URI(sPath);
            URIHandlerService service = Services.get().get(URIHandlerService.class);
            URIHandler handler = service.getURIHandler(uri);
            return handler.exists(uri, actionConf, user);
        }
        catch (URIHandlerException e) {
            coordAction.setErrorCode(e.getErrorCode().toString());
            coordAction.setErrorMessage(e.getMessage());
            throw new IOException(e);
        }
        catch (URISyntaxException e) {
            coordAction.setErrorCode(ErrorCode.E0906.toString());
            coordAction.setErrorMessage(e.getMessage());
            throw new IOException(e);
        }
    }

    /**
     * Creates a separator-delimited list of URIs from the instance time stamps and the dataset's
     * uri-template. Instances that use a latest/future function are not expanded; they are
     * appended to {@code unresolvedInstances} instead.
     *
     * @param event : {@code <data-in>} event
     * @param instances : list of time stamps, separated by the instance separator
     * @param unresolvedInstances : receives the instances with latest/future function
     * @return : list of URIs, separated by the instance separator; empty if there are no instances
     * @throws Exception thrown if failed to create URIs from the instances
     */
    @SuppressWarnings("unused")
    private String createURIs(Element event, String instances, StringBuilder unresolvedInstances) throws Exception {
        if (instances == null || instances.length() == 0) {
            return "";
        }
        String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
        StringBuilder uris = new StringBuilder();

        for (int i = 0; i < instanceList.length; i++) {
            int funcType = CoordCommandUtils.getFuncType(instanceList[i]);
            if (funcType == CoordCommandUtils.LATEST || funcType == CoordCommandUtils.FUTURE) {
                if (unresolvedInstances.length() > 0) {
                    unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR);
                }
                unresolvedInstances.append(instanceList[i]);
                continue;
            }
            ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]);
            if (uris.length() > 0) {
                uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
            }
            uris.append(CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()).getChild(
                    "uri-template", event.getNamespace()).getTextTrim()));
        }
        return uris.toString();
    }

    /**
     * Gets the error code of the coord action (used mainly for unit testing).
     *
     * @return the action's error code, or {@code null} if no action has been loaded
     */
    protected String getCoordActionErrorCode() {
        if (coordAction != null) {
            return coordAction.getErrorCode();
        }
        return null;
    }

    /**
     * Gets the error message of the coord action (used mainly for unit testing).
     *
     * @return the action's error message, or {@code null} if no action has been loaded
     */
    protected String getCoordActionErrorMsg() {
        if (coordAction != null) {
            return coordAction.getErrorMessage();
        }
        return null;
    }

    /**
     * @return the coordinator job id given to the constructor
     */
    @Override
    public String getEntityKey() {
        return this.jobId;
    }

    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /**
     * Loads the coordinator action and its job from the store and sets up the log info.
     *
     * @throws CommandException if either bean cannot be loaded
     */
    @Override
    protected void loadState() throws CommandException {
        if (jpaService == null) {
            jpaService = Services.get().get(JPAService.class);
        }
        try {
            coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
            coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_INPUT_CHECK,
                    coordAction.getJobId());
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }
        LogUtils.setLogInfo(coordAction, logInfo);
    }

    /**
     * Verifies that the action is WAITING and that the job is in a state that allows input checks.
     *
     * @throws CommandException declared by the overridden method
     * @throws PreconditionException with {@link ErrorCode#E1100} if the action is not WAITING, or the job is
     *         neither accepted by the backward-support status check nor in
     *         RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
            throw new PreconditionException(ErrorCode.E1100, "[" + actionId
                    + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state="
                    + coordAction.getStatus());
        }

        // if eligible to do action input check when running with backward support is true
        if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
            return;
        }

        if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR
                && coordJob.getStatus() != Job.Status.PAUSED
                && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
            throw new PreconditionException(
                    ErrorCode.E1100, "["+ actionId + "]::CoordActionInputCheck:: Ignoring action." +
                    " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
                    + coordJob.getStatus());
        }
    }

    /**
     * @return the command name followed by {@code "_"} and the action id
     */
    @Override
    public String getKey(){
        return getName() + "_" + actionId;
    }

}