001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.io.IOException; 021 import java.io.StringReader; 022 import java.net.URI; 023 import java.net.URISyntaxException; 024 import java.util.Date; 025 import java.util.List; 026 027 import org.apache.hadoop.conf.Configuration; 028 import org.apache.oozie.CoordinatorActionBean; 029 import org.apache.oozie.CoordinatorJobBean; 030 import org.apache.oozie.ErrorCode; 031 import org.apache.oozie.client.CoordinatorAction; 032 import org.apache.oozie.client.Job; 033 import org.apache.oozie.client.OozieClient; 034 import org.apache.oozie.command.CommandException; 035 import org.apache.oozie.command.PreconditionException; 036 import org.apache.oozie.coord.CoordELEvaluator; 037 import org.apache.oozie.coord.CoordELFunctions; 038 import org.apache.oozie.dependency.URIHandler; 039 import org.apache.oozie.dependency.URIHandlerException; 040 import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor; 041 import org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor; 042 import org.apache.oozie.executor.jpa.CoordActionUpdateForModifiedTimeJPAExecutor; 043 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 044 import org.apache.oozie.executor.jpa.JPAExecutorException; 045 import org.apache.oozie.service.CallableQueueService; 046 import org.apache.oozie.service.EventHandlerService; 047 import org.apache.oozie.service.JPAService; 048 import org.apache.oozie.service.Service; 049 import org.apache.oozie.service.Services; 050 import org.apache.oozie.service.URIHandlerService; 051 import org.apache.oozie.util.DateUtils; 052 import org.apache.oozie.util.ELEvaluator; 053 import org.apache.oozie.util.Instrumentation; 054 import org.apache.oozie.util.LogUtils; 055 import org.apache.oozie.util.ParamChecker; 056 import org.apache.oozie.util.StatusUtils; 057 import org.apache.oozie.util.XConfiguration; 058 import org.apache.oozie.util.XLog; 059 import org.apache.oozie.util.XmlUtils; 060 import org.jdom.Element; 061 062 /** 063 * The command to check if an action's data input paths exist in the file system. 064 */ 065 public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> { 066 067 private final String actionId; 068 /** 069 * Property name of command re-queue interval for coordinator action input check in 070 * milliseconds. 071 */ 072 public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX 073 + "coord.input.check.requeue.interval"; 074 /** 075 * Default re-queue interval in ms. It is applied when no value defined in 076 * the oozie configuration. 077 */ 078 private final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 60000; // 1 minute 079 private CoordinatorActionBean coordAction = null; 080 private CoordinatorJobBean coordJob = null; 081 private JPAService jpaService = null; 082 private String jobId = null; 083 084 public CoordActionInputCheckXCommand(String actionId, String jobId) { 085 super("coord_action_input", "coord_action_input", 1); 086 this.actionId = ParamChecker.notEmpty(actionId, "actionId"); 087 this.jobId = jobId; 088 } 089 090 @Override 091 protected Void execute() throws CommandException { 092 LOG.info("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state."); 093 094 // this action should only get processed if current time > nominal time; 095 // otherwise, requeue this action for delay execution; 096 Date nominalTime = coordAction.getNominalTime(); 097 Date currentTime = new Date(); 098 if (nominalTime.compareTo(currentTime) > 0) { 099 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), Math.max((nominalTime.getTime() - currentTime 100 .getTime()), getCoordInputCheckRequeueInterval())); 101 updateCoordAction(coordAction, false); 102 LOG.info("[" + actionId 103 + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current=" 104 + currentTime + ", nominal=" + nominalTime); 105 106 return null; 107 } 108 109 StringBuilder actionXml = new StringBuilder(coordAction.getActionXml()); 110 Instrumentation.Cron cron = new Instrumentation.Cron(); 111 boolean isChangeInDependency = false; 112 try { 113 Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf())); 114 cron.start(); 115 StringBuilder existList = new StringBuilder(); 116 StringBuilder nonExistList = new StringBuilder(); 117 StringBuilder nonResolvedList = new StringBuilder(); 118 String firstMissingDependency = ""; 119 String missingDeps = coordAction.getMissingDependencies(); 120 CoordCommandUtils.getResolvedList(missingDeps, nonExistList, nonResolvedList); 121 122 // For clarity regarding which is the missing dependency in synchronous order 123 // instead of printing entire list, some of which, may be available 124 if(nonExistList.length() > 0) { 125 firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0]; 126 } 127 LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + firstMissingDependency + " " 128 + nonResolvedList.toString()); 129 // Updating the list of data dependencies that are available and those that are yet not 130 boolean status = checkInput(actionXml, existList, nonExistList, actionConf); 131 String pushDeps = coordAction.getPushMissingDependencies(); 132 // Resolve latest/future only when all current missingDependencies and 133 // pushMissingDependencies are met 134 if (status && nonResolvedList.length() > 0) { 135 status = (pushDeps == null || pushDeps.length() == 0) ? checkUnResolvedInput(actionXml, actionConf) 136 : false; 137 } 138 coordAction.setLastModifiedTime(currentTime); 139 coordAction.setActionXml(actionXml.toString()); 140 if (nonResolvedList.length() > 0 && status == false) { 141 nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList); 142 } 143 String nonExistListStr = nonExistList.toString(); 144 if (!nonExistListStr.equals(missingDeps) || missingDeps.isEmpty()) { 145 // missingDeps null or empty means action should become READY 146 isChangeInDependency = true; 147 coordAction.setMissingDependencies(nonExistListStr); 148 } 149 if (status && (pushDeps == null || pushDeps.length() == 0)) { 150 String newActionXml = resolveCoordConfiguration(actionXml, actionConf, actionId); 151 actionXml.replace(0, actionXml.length(), newActionXml); 152 coordAction.setActionXml(actionXml.toString()); 153 coordAction.setStatus(CoordinatorAction.Status.READY); 154 // pass jobID to the CoordActionReadyXCommand 155 queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100); 156 } 157 else if (!isTimeout(currentTime)) { 158 if (status == false) { 159 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 160 getCoordInputCheckRequeueInterval()); 161 } 162 } 163 else { 164 if (!nonExistListStr.isEmpty() && pushDeps == null || pushDeps.length() == 0) { 165 queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName())); 166 } 167 else { 168 // Let CoordPushDependencyCheckXCommand queue the timeout 169 queue(new CoordPushDependencyCheckXCommand(coordAction.getId())); 170 } 171 } 172 } 173 catch (Exception e) { 174 if (isTimeout(currentTime)) { 175 LOG.debug("Queueing timeout command"); 176 // XCommand.queue() will not work when there is a Exception 177 Services.get().get(CallableQueueService.class) 178 .queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName())); 179 } 180 throw new CommandException(ErrorCode.E1021, e.getMessage(), e); 181 } 182 finally { 183 cron.stop(); 184 updateCoordAction(coordAction, isChangeInDependency); 185 } 186 return null; 187 } 188 189 190 static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId) throws Exception { 191 Element eAction = XmlUtils.parseXml(actionXml.toString()); 192 ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, actionConf, actionId); 193 materializeDataProperties(eAction, actionConf, eval); 194 return XmlUtils.prettyPrint(eAction).toString(); 195 } 196 197 private boolean isTimeout(Date currentTime) { 198 long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction 199 .getCreatedTime().getTime())) 200 / (60 * 1000); 201 int timeOut = coordAction.getTimeOut(); 202 return (timeOut >= 0) && (waitingTime > timeOut); 203 } 204 205 private void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency) 206 throws CommandException { 207 coordAction.setLastModifiedTime(new Date()); 208 if (jpaService != null) { 209 try { 210 if (isChangeInDependency) { 211 jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(coordAction)); 212 if (EventHandlerService.isEnabled() 213 && coordAction.getStatus() != CoordinatorAction.Status.READY) { 214 //since event is not to be generated unless action RUNNING via StartX 215 generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null); 216 } 217 } 218 else { 219 jpaService.execute(new CoordActionUpdateForModifiedTimeJPAExecutor(coordAction)); 220 } 221 } 222 catch (JPAExecutorException jex) { 223 throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex); 224 } 225 } 226 } 227 228 /** 229 * This function reads the value of re-queue interval for coordinator input 230 * check command from the Oozie configuration provided by Configuration 231 * Service. If nothing defined in the configuration, it uses the code 232 * specified default value. 233 * 234 * @return re-queue interval in ms 235 */ 236 public long getCoordInputCheckRequeueInterval() { 237 long requeueInterval = Services.get().getConf().getLong(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL, 238 DEFAULT_COMMAND_REQUEUE_INTERVAL); 239 return requeueInterval; 240 } 241 242 /** 243 * To check the list of input paths if all of them exist 244 * 245 * @param actionXml action xml 246 * @param existList the list of existed paths 247 * @param nonExistList the list of non existed paths 248 * @param conf action configuration 249 * @return true if all input paths are existed 250 * @throws Exception thrown of unable to check input path 251 */ 252 protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList, 253 Configuration conf) throws Exception { 254 Element eAction = XmlUtils.parseXml(actionXml.toString()); 255 return checkResolvedUris(eAction, existList, nonExistList, conf); 256 } 257 258 protected boolean checkUnResolvedInput(StringBuilder actionXml, Configuration conf) throws Exception { 259 Element eAction = XmlUtils.parseXml(actionXml.toString()); 260 LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future"); 261 boolean allExist = checkUnresolvedInstances(eAction, conf); 262 if (allExist) { 263 actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString()); 264 } 265 return allExist; 266 } 267 268 /** 269 * Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list 270 * of files that will be needed. 271 * 272 * @param eAction action element 273 * @param conf action configuration 274 * @throws Exception thrown if failed to resolve data properties 275 * @update modify 'Action' element with appropriate list of files. 276 */ 277 @SuppressWarnings("unchecked") 278 static void materializeDataProperties(Element eAction, Configuration conf, ELEvaluator eval) throws Exception { 279 Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow", 280 eAction.getNamespace()).getChild("configuration", eAction.getNamespace()); 281 if (configElem != null) { 282 for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) { 283 resolveTagContents("value", propElem, eval); 284 } 285 } 286 } 287 288 /** 289 * To resolve property value which contains el functions 290 * 291 * @param tagName tag name 292 * @param elem the child element of "property" element 293 * @param eval el functions evaluator 294 * @throws Exception thrown if unable to resolve tag value 295 */ 296 private static void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception { 297 if (elem == null) { 298 return; 299 } 300 Element tagElem = elem.getChild(tagName, elem.getNamespace()); 301 if (tagElem != null) { 302 String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText()); 303 tagElem.removeContent(); 304 tagElem.addContent(updated); 305 } 306 else { 307 XLog.getLog(CoordActionInputCheckXCommand.class).warn(" Value NOT FOUND " + tagName); 308 } 309 } 310 311 /** 312 * Check if any unsolved paths under data output. Resolve the unresolved data input paths. 313 * 314 * @param eAction action element 315 * @param actionConf action configuration 316 * @return true if successful to resolve input and output paths 317 * @throws Exception thrown if failed to resolve data input and output paths 318 */ 319 @SuppressWarnings("unchecked") 320 private boolean checkUnresolvedInstances(Element eAction, Configuration actionConf) throws Exception { 321 String strAction = XmlUtils.prettyPrint(eAction).toString(); 322 Date nominalTime = DateUtils.parseDateOozieTZ(eAction.getAttributeValue("action-nominal-time")); 323 String actualTimeStr = eAction.getAttributeValue("action-actual-time"); 324 Date actualTime = null; 325 if (actualTimeStr == null) { 326 LOG.debug("Unable to get action-actual-time from action xml, this job is submitted " + 327 "from previous version. Assign current date to actual time, action = " + actionId); 328 actualTime = new Date(); 329 } else { 330 actualTime = DateUtils.parseDateOozieTZ(actualTimeStr); 331 } 332 333 StringBuffer resultedXml = new StringBuffer(); 334 335 boolean ret; 336 Element inputList = eAction.getChild("input-events", eAction.getNamespace()); 337 if (inputList != null) { 338 ret = materializeUnresolvedEvent(inputList.getChildren("data-in", eAction.getNamespace()), nominalTime, 339 actualTime, actionConf); 340 if (ret == false) { 341 resultedXml.append(strAction); 342 return false; 343 } 344 } 345 346 // Using latest() or future() in output-event is not intuitive. 347 // We need to make sure, this assumption is correct. 348 Element outputList = eAction.getChild("output-events", eAction.getNamespace()); 349 if (outputList != null) { 350 for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) { 351 if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()) != null) { 352 throw new CommandException(ErrorCode.E1006, "coord:latest()/future()", 353 " not permitted in output-event "); 354 } 355 } 356 } 357 return true; 358 } 359 360 /** 361 * Resolve the list of data input paths 362 * 363 * @param eDataEvents the list of data input elements 364 * @param nominalTime action nominal time 365 * @param actualTime current time 366 * @param conf action configuration 367 * @return true if all unresolved URIs can be resolved 368 * @throws Exception thrown if failed to resolve data input paths 369 */ 370 @SuppressWarnings("unchecked") 371 private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime, 372 Configuration conf) throws Exception { 373 for (Element dEvent : eDataEvents) { 374 if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()) == null) { 375 continue; 376 } 377 ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf); 378 String uresolvedInstance = dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()).getTextTrim(); 379 String unresolvedList[] = uresolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR); 380 StringBuffer resolvedTmp = new StringBuffer(); 381 for (int i = 0; i < unresolvedList.length; i++) { 382 String ret = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]); 383 Boolean isResolved = (Boolean) eval.getVariable("is_resolved"); 384 if (isResolved == false) { 385 LOG.info("[" + actionId + "]::Cannot resolve: " + ret); 386 return false; 387 } 388 if (resolvedTmp.length() > 0) { 389 resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR); 390 } 391 resolvedTmp.append((String) eval.getVariable("resolved_path")); 392 } 393 if (resolvedTmp.length() > 0) { 394 if (dEvent.getChild("uris", dEvent.getNamespace()) != null) { 395 resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append( 396 dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim()); 397 dEvent.removeChild("uris", dEvent.getNamespace()); 398 } 399 Element uriInstance = new Element("uris", dEvent.getNamespace()); 400 uriInstance.addContent(resolvedTmp.toString()); 401 dEvent.getContent().add(1, uriInstance); 402 } 403 dEvent.removeChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()); 404 } 405 406 return true; 407 } 408 409 /** 410 * Check all resolved URIs existence 411 * 412 * @param eAction action element 413 * @param existList the list of existed paths 414 * @param nonExistList the list of paths to check existence 415 * @param conf action configuration 416 * @return true if all nonExistList paths exist 417 * @throws IOException thrown if unable to access the path 418 */ 419 private boolean checkResolvedUris(Element eAction, StringBuilder existList, StringBuilder nonExistList, 420 Configuration conf) throws IOException { 421 LOG.info("[" + actionId + "]::ActionInputCheck:: In checkResolvedUris..."); 422 Element inputList = eAction.getChild("input-events", eAction.getNamespace()); 423 if (inputList != null) { 424 if (nonExistList.length() > 0) { 425 checkListOfPaths(existList, nonExistList, conf); 426 } 427 return nonExistList.length() == 0; 428 } 429 return true; 430 } 431 432 /** 433 * Check a list of non existed paths and add to exist list if it exists 434 * 435 * @param existList the list of existed paths 436 * @param nonExistList the list of paths to check existence 437 * @param conf action configuration 438 * @return true if all nonExistList paths exist 439 * @throws IOException thrown if unable to access the path 440 */ 441 private boolean checkListOfPaths(StringBuilder existList, StringBuilder nonExistList, Configuration conf) 442 throws IOException { 443 444 String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR); 445 if (uriList[0] != null) { 446 LOG.info("[" + actionId + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] + " is Missing."); 447 } 448 449 nonExistList.delete(0, nonExistList.length()); 450 boolean allExists = true; 451 String existSeparator = "", nonExistSeparator = ""; 452 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 453 for (int i = 0; i < uriList.length; i++) { 454 if (allExists) { 455 allExists = pathExists(uriList[i], conf, user); 456 LOG.info("[" + actionId + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" + allExists); 457 } 458 if (allExists) { 459 existList.append(existSeparator).append(uriList[i]); 460 existSeparator = CoordELFunctions.INSTANCE_SEPARATOR; 461 } 462 else { 463 nonExistList.append(nonExistSeparator).append(uriList[i]); 464 nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR; 465 } 466 } 467 return allExists; 468 } 469 470 /** 471 * Check if given path exists 472 * 473 * @param sPath uri path 474 * @param actionConf action configuration 475 * @return true if path exists 476 * @throws IOException thrown if unable to access the path 477 */ 478 protected boolean pathExists(String sPath, Configuration actionConf, String user) throws IOException { 479 LOG.debug("checking for the file " + sPath); 480 try { 481 URI uri = new URI(sPath); 482 URIHandlerService service = Services.get().get(URIHandlerService.class); 483 URIHandler handler = service.getURIHandler(uri); 484 return handler.exists(uri, actionConf, user); 485 } 486 catch (URIHandlerException e) { 487 coordAction.setErrorCode(e.getErrorCode().toString()); 488 coordAction.setErrorMessage(e.getMessage()); 489 throw new IOException(e); 490 } catch (URISyntaxException e) { 491 coordAction.setErrorCode(ErrorCode.E0906.toString()); 492 coordAction.setErrorMessage(e.getMessage()); 493 throw new IOException(e); 494 } 495 } 496 497 /** 498 * The function create a list of URIs separated by "," using the instances time stamp and URI-template 499 * 500 * @param event : <data-in> event 501 * @param instances : List of time stamp seprated by "," 502 * @param unresolvedInstances : list of instance with latest/future function 503 * @return : list of URIs separated by ",". 504 * @throws Exception thrown if failed to create URIs from unresolvedInstances 505 */ 506 @SuppressWarnings("unused") 507 private String createURIs(Element event, String instances, StringBuilder unresolvedInstances) throws Exception { 508 if (instances == null || instances.length() == 0) { 509 return ""; 510 } 511 String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR); 512 StringBuilder uris = new StringBuilder(); 513 514 for (int i = 0; i < instanceList.length; i++) { 515 int funcType = CoordCommandUtils.getFuncType(instanceList[i]); 516 if (funcType == CoordCommandUtils.LATEST || funcType == CoordCommandUtils.FUTURE) { 517 if (unresolvedInstances.length() > 0) { 518 unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR); 519 } 520 unresolvedInstances.append(instanceList[i]); 521 continue; 522 } 523 ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]); 524 if (uris.length() > 0) { 525 uris.append(CoordELFunctions.INSTANCE_SEPARATOR); 526 } 527 uris.append(CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()).getChild( 528 "uri-template", event.getNamespace()).getTextTrim())); 529 } 530 return uris.toString(); 531 } 532 533 /** 534 * getting the error code of the coord action. (used mainly for unit testing) 535 */ 536 protected String getCoordActionErrorCode() { 537 if (coordAction != null) { 538 return coordAction.getErrorCode(); 539 } 540 return null; 541 } 542 543 /** 544 * getting the error message of the coord action. (used mainly for unit testing) 545 */ 546 protected String getCoordActionErrorMsg() { 547 if (coordAction != null) { 548 return coordAction.getErrorMessage(); 549 } 550 return null; 551 } 552 553 @Override 554 public String getEntityKey() { 555 return this.jobId; 556 } 557 558 @Override 559 protected boolean isLockRequired() { 560 return true; 561 } 562 563 @Override 564 protected void loadState() throws CommandException { 565 if (jpaService == null) { 566 jpaService = Services.get().get(JPAService.class); 567 } 568 try { 569 coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId)); 570 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId())); 571 } 572 catch (JPAExecutorException je) { 573 throw new CommandException(je); 574 } 575 LogUtils.setLogInfo(coordAction, logInfo); 576 } 577 578 @Override 579 protected void verifyPrecondition() throws CommandException, PreconditionException { 580 if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) { 581 throw new PreconditionException(ErrorCode.E1100, "[" + actionId 582 + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state=" 583 + coordAction.getStatus()); 584 } 585 586 // if eligible to do action input check when running with backward support is true 587 if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) { 588 return; 589 } 590 591 if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR && coordJob.getStatus() != Job.Status.PAUSED 592 && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) { 593 throw new PreconditionException( 594 ErrorCode.E1100, "["+ actionId + "]::CoordActionInputCheck:: Ignoring action." + 595 " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state=" 596 + coordJob.getStatus()); 597 } 598 } 599 600 @Override 601 public String getKey(){ 602 return getName() + "_" + actionId; 603 } 604 605 }