/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.command.coord;

import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.TimeZone;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.Date;
import java.util.Calendar;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.coord.CoordELEvaluator;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.coord.CoordUtils;
import org.apache.oozie.coord.CoordinatorJobException;
import org.apache.oozie.coord.SyncCoordAction;
import org.apache.oozie.coord.TimeUnit;
import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorUtil;
import org.apache.oozie.coord.input.dependency.CoordInputDependency;
import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluator;
import org.apache.oozie.coord.input.dependency.CoordInputDependencyFactory;
import org.apache.oozie.coord.input.dependency.CoordInputInstance;
import org.apache.oozie.dependency.ActionDependency;
import org.apache.oozie.dependency.DependencyChecker;
import org.apache.oozie.dependency.URIHandler;
import org.apache.oozie.dependency.URIHandler.DependencyType;
import org.apache.oozie.dependency.URIHandlerException;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Attribute;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.quartz.CronExpression;
import org.apache.commons.lang.StringUtils;
import org.apache.oozie.CoordinatorJobBean;

/**
 * Static helpers used while materializing coordinator actions: parsing of instance EL functions
 * (current/latest/future/offset/absolute), resolution of data-in/data-out instances into URIs,
 * SLA materialization, dry-run support and computation of the next action time.
 */
public class CoordCommandUtils {
    // Function-type codes returned by getFuncType(String).
    public static final int CURRENT = 0;
    public static final int LATEST = 1;
    public static final int FUTURE = 2;
    public static final int OFFSET = 3;
    public static final int ABSOLUTE = 4;
    public static final int UNEXPECTED = -1;

    public static final String RESOLVED_UNRESOLVED_SEPARATOR = "!!";
    public static final String UNRESOLVED_INSTANCES_TAG = "unresolved-instances";

    /**
     * Parse a function like coord:latest(n)/future() and return the 'n'.
     * <p>
     * For an absolute function the constant {@link #ABSOLUTE} is returned instead of a parsed
     * number. For current/latest only the single numeric argument is parsed; for every other
     * type the first argument is returned and the remaining argument is appended to
     * {@code restArg}.
     *
     * @param function the EL function text
     * @param restArg receives the trailing argument of multi-argument functions
     * @return int instanceNumber
     * @throws Exception if the function text cannot be parsed
     */
    public static int getInstanceNumber(String function, StringBuilder restArg) throws Exception {
        int funcType = getFuncType(function);
        if (funcType == ABSOLUTE) {
            return ABSOLUTE;
        }
        if (funcType == CURRENT || funcType == LATEST) {
            return parseOneArg(function);
        }
        return parseMoreArgs(function, restArg);
    }

    /**
     * Evaluates function for coord-action-create-inst tag
     *
     * @param event the data event element
     * @param appInst the action instance information
     * @param conf the job configuration
     * @param function the EL expression to evaluate
     * @return evaluation result
     * @throws Exception if the evaluation fails
     */
    private static String evaluateInstanceFunction(Element event, SyncCoordAction appInst, Configuration conf,
            String function) throws Exception {
        ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator("coord-action-create-inst", event, appInst,
                conf);
        return CoordELFunctions.evalAndWrap(eval, function);
    }

    /**
     * Parse the single numeric argument between the first "(" and the last ")" of a function.
     * The text is parsed as a double and truncated to an int.
     *
     * @param funcName the function text, e.g. current(-2)
     * @return the argument as an int
     * @throws Exception declared for callers; a {@link RuntimeException} is thrown when the
     *         parentheses are missing or enclose no text
     */
    public static int parseOneArg(String funcName) throws Exception {
        int firstPos = funcName.indexOf("(");
        int lastPos = funcName.lastIndexOf(")");
        if (firstPos >= 0 && lastPos > firstPos) {
            String tmp = funcName.substring(firstPos + 1, lastPos).trim();
            if (tmp.length() > 0) {
                return (int) Double.parseDouble(tmp);
            }
        }
        throw new RuntimeException("Unformatted function :" + funcName);
    }

    /**
     * Return the trimmed text between the first "(" and the last ")" of a function.
     *
     * @param funcName the function text
     * @return the raw argument text (may be empty)
     * @throws Exception declared for callers; a {@link RuntimeException} is thrown when the
     *         parentheses are missing
     */
    public static String parseOneStringArg(String funcName) throws Exception {
        int firstPos = funcName.indexOf("(");
        int lastPos = funcName.lastIndexOf(")");
        if (firstPos >= 0 && lastPos > firstPos) {
            return funcName.substring(firstPos + 1, lastPos).trim();
        }
        throw new RuntimeException("Unformatted function :" + funcName);
    }

    /**
     * Parse a function with more than one argument. The text between "(" and the last "," is
     * returned as an int; the text between the last "," and the last ")" is appended to
     * {@code restArg}.
     *
     * @param funcName the function text, e.g. future(1, 5)
     * @param restArg receives the trailing argument
     * @return the first argument as an int
     * @throws Exception declared for callers; a {@link RuntimeException} is thrown when the
     *         function is not of the form name(a, b)
     */
    private static int parseMoreArgs(String funcName, StringBuilder restArg) throws Exception {
        int firstPos = funcName.indexOf("(");
        int secondPos = funcName.lastIndexOf(",");
        int lastPos = funcName.lastIndexOf(")");
        // The closing parenthesis must follow the comma; otherwise substring() below would fail
        // with an index error instead of the descriptive "Unformatted function" error.
        if (firstPos >= 0 && secondPos > firstPos && lastPos > secondPos) {
            String tmp = funcName.substring(firstPos + 1, secondPos).trim();
            if (tmp.length() > 0) {
                restArg.append(funcName.substring(secondPos + 1, lastPos).trim());
                return (int) Double.parseDouble(tmp);
            }
        }
        throw new RuntimeException("Unformatted function :" + funcName);
    }

    /**
     * Classify an EL function by substring match, checked in the order
     * current, latest, future, offset, absolute.
     *
     * @param function EL function name
     * @return type of EL function, or {@link #UNEXPECTED} when none of the names occurs
     */
    public static int getFuncType(String function) {
        if (function.indexOf("current") >= 0) {
            return CURRENT;
        }
        else if (function.indexOf("latest") >= 0) {
            return LATEST;
        }
        else if (function.indexOf("future") >= 0) {
            return FUTURE;
        }
        else if (function.indexOf("offset") >= 0) {
            return OFFSET;
        }
        else if (function.indexOf("absolute") >= 0) {
            return ABSOLUTE;
        }
        return UNEXPECTED;
    }

    /**
     * Verify that start and end instance use the same EL function. The only mixed combination
     * accepted is an absolute start with a current end.
     *
     * @param startInst EL function name
     * @param endInst EL function name
     * @throws CommandException if both are not the same function
     */
    public static void checkIfBothSameType(String startInst, String endInst) throws CommandException {
        int startType = getFuncType(startInst);
        int endType = getFuncType(endInst);
        if (startType == endType) {
            return;
        }
        if (startType == ABSOLUTE) {
            if (endType != CURRENT) {
                throw new CommandException(ErrorCode.E1010,
                        "Only start-instance as absolute and end-instance as current is supported." + " start = "
                                + startInst + "  end = " + endInst);
            }
        }
        else {
            throw new CommandException(ErrorCode.E1010,
                    " start-instance and end-instance both should be either latest or current or future or offset\n"
                            + " start " + startInst + " and end " + endInst);
        }
    }

    /**
     * Append one entry to the separator-delimited instance list, inserting
     * {@code CoordELFunctions.INSTANCE_SEPARATOR} first when the list already has content.
     */
    private static void appendInstance(StringBuilder instances, String instance) {
        if (instances.length() > 0) {
            instances.append(CoordELFunctions.INSTANCE_SEPARATOR);
        }
        instances.append(instance);
    }

    /**
     * Resolve list of {@code <instance> </instance>} tags. Each instance is materialized and
     * appended to {@code instances}; the instance children are then removed from the event.
     *
     * @param event the data event element
     * @param instances receives the materialized instances
     * @param actionInst the action instance information
     * @param conf the job configuration
     * @param eval ELEvaluator used for materialization
     * @throws Exception if an instance cannot be evaluated
     */
    @SuppressWarnings("unchecked")
    public static void resolveInstances(Element event, StringBuilder instances, SyncCoordAction actionInst,
            Configuration conf, ELEvaluator eval) throws Exception {
        // JDOM 1.x returns a raw List here; its entries are Elements.
        for (Element eInstance : (List<Element>) event.getChildren("instance", event.getNamespace())) {
            appendInstance(instances, materializeInstance(event, eInstance.getTextTrim(), actionInst, conf, eval));
        }
        event.removeChildren("instance", event.getNamespace());
    }

    /**
     * Resolve {@code <start-instance>} / {@code <end-instance>} tags. Don't resolve any
     * latest()/future(): those are appended as latestRange/futureRange expressions to be
     * resolved later.
     * <p>
     * Nothing happens unless both tags are present. When they are, both tags are removed from
     * the event after processing.
     *
     * @param event the data event element
     * @param instances receives the materialized (or still unresolved) instances
     * @param appInst the action instance information
     * @param conf the job configuration
     * @param eval ELEvaluator used for materialization
     * @throws Exception if evaluation fails or the range is invalid
     */
    public static void resolveInstanceRange(Element event, StringBuilder instances, SyncCoordAction appInst,
            Configuration conf, ELEvaluator eval) throws Exception {
        Element eStartInst = event.getChild("start-instance", event.getNamespace());
        Element eEndInst = event.getChild("end-instance", event.getNamespace());
        if (eStartInst == null || eEndInst == null) {
            return;
        }
        String strStart = evaluateInstanceFunction(event, appInst, conf, eStartInst.getTextTrim());
        String strEnd = evaluateInstanceFunction(event, appInst, conf, eEndInst.getTextTrim());
        checkIfBothSameType(strStart, strEnd);
        // restArg collects the trailing argument of multi-argument functions (future/offset).
        StringBuilder restArg = new StringBuilder();
        int startIndex = getInstanceNumber(strStart, restArg);
        String startRestArg = restArg.toString();
        restArg.delete(0, restArg.length());
        int endIndex = getInstanceNumber(strEnd, restArg);
        String endRestArg = restArg.toString();
        int funcType = getFuncType(strStart);

        if (funcType == ABSOLUTE) {
            StringBuilder absoluteRange = new StringBuilder();
            absoluteRange.append("${coord:absoluteRange(\"").append(parseOneStringArg(strStart))
                    .append("\",").append(endIndex).append(")}");
            String matInstance = materializeInstance(event, absoluteRange.toString(), appInst, conf, eval);
            if (matInstance != null && !matInstance.isEmpty()) {
                appendInstance(instances, matInstance);
            }
        }
        else if (funcType == OFFSET) {
            TimeUnit startU = TimeUnit.valueOf(startRestArg);
            TimeUnit endU = TimeUnit.valueOf(endRestArg);
            // NOTE(review): this compares (calendar unit constant * index) of both ends; confirm that
            // it yields the intended ordering when start and end use different time units.
            if (startU.getCalendarUnit() * startIndex > endU.getCalendarUnit() * endIndex) {
                throw new CommandException(ErrorCode.E1010,
                        " start-instance should be equal or earlier than the end-instance \n"
                                + XmlUtils.prettyPrint(event));
            }
            Calendar startCal = CoordELFunctions.resolveOffsetRawTime(startIndex, startU, eval);
            Calendar endCal = CoordELFunctions.resolveOffsetRawTime(endIndex, endU, eval);
            if (startCal != null && endCal != null) {
                List<Integer> expandedFreqs = CoordELFunctions.expandOffsetTimes(startCal, endCal, eval);
                for (int i = expandedFreqs.size() - 1; i >= 0; i--) {
                    // We need to use the dataset time unit, because expandOffsetTimes expands the
                    // offsets into frequencies expressed in the dataset time unit.
                    String matInstance = materializeInstance(event, "${coord:offset(" + expandedFreqs.get(i)
                            + ", \"" + CoordELFunctions.getDSTimeUnit(eval) + "\")}", appInst, conf, eval);
                    if (matInstance == null || matInstance.length() == 0) {
                        // Earlier than dataset's initial instance
                        break;
                    }
                    appendInstance(instances, matInstance);
                }
            }
        }
        else {
            if (startIndex > endIndex) {
                throw new CommandException(ErrorCode.E1010,
                        " start-instance should be equal or earlier than the end-instance \n"
                                + XmlUtils.prettyPrint(event));
            }
            if (funcType == CURRENT) {
                // Everything could be resolved NOW. no latest() ELs
                String matInstance = materializeInstance(event, "${coord:currentRange(" + startIndex + ","
                        + endIndex + ")}", appInst, conf, eval);
                if (matInstance != null && !matInstance.isEmpty()) {
                    appendInstance(instances, matInstance);
                }
            }
            else if (funcType == LATEST) {
                // latest(n) EL is present: keep it unresolved. The separator keeps it apart from any
                // instance already in the list, consistent with the other branches.
                appendInstance(instances, "${coord:latestRange(" + startIndex + "," + endIndex + ")}");
            }
            else if (funcType == FUTURE) {
                appendInstance(instances, "${coord:futureRange(" + startIndex + "," + endIndex + ",'"
                        + endRestArg + "')}");
            }
        }
        // Remove start-instance and end-instances
        event.removeChild("start-instance", event.getNamespace());
        event.removeChild("end-instance", event.getNamespace());
    }

    /**
     * Materialize one instance like current(-2)
     *
     * @param event : {@code <data-in>}; when null nothing is evaluated
     * @param expr : instance like current(-1)
     * @param appInst : application specific info (not used by this method)
     * @param conf job configuration (not used by this method)
     * @param evalInst :ELEvaluator
     * @return materialized date string, or null when {@code event} is null
     * @throws Exception if the expression cannot be evaluated
     */
    public static String materializeInstance(Element event, String expr, SyncCoordAction appInst, Configuration conf,
            ELEvaluator evalInst) throws Exception {
        if (event == null) {
            return null;
        }
        return CoordELFunctions.evalAndWrap(evalInst, expr);
    }

    /**
     * Create two new tags with {@code <uris>} and {@code <unresolved-instances>}. Each tag is
     * inserted at content index 1 of the event, and only when it has content.
     *
     * @param event the data event element
     * @param instances the separator-delimited instance list
     * @return the resolved URIs with their done flag appended, or an empty string when no
     *         instance could be resolved
     * @throws Exception if URI creation fails
     */
    @SuppressWarnings("unchecked")
    private static String separateResolvedAndUnresolved(Element event, StringBuilder instances)
            throws Exception {
        StringBuilder unresolvedInstances = new StringBuilder();
        StringBuilder urisWithDoneFlag = new StringBuilder();
        String depList = "";
        String uris = createEarlyURIs(event, instances.toString(), unresolvedInstances, urisWithDoneFlag);
        if (uris.length() > 0) {
            Element uriInstance = new Element("uris", event.getNamespace());
            uriInstance.addContent(uris);
            event.getContent().add(1, uriInstance);
            depList = urisWithDoneFlag.toString();
        }
        if (unresolvedInstances.length() > 0) {
            Element elemInstance = new Element(UNRESOLVED_INSTANCES_TAG, event.getNamespace());
            elemInstance.addContent(unresolvedInstances.toString());
            event.getContent().add(1, elemInstance);
        }
        return depList;
    }

    /**
     * The function creates a list of URIs, joined with
     * {@code CoordELFunctions.INSTANCE_SEPARATOR}, using the instances time stamp and
     * URI-template. Instances that contain a latest or future function are not resolved; they
     * are collected in {@code unresolvedInstances} instead.
     *
     * @param event : {@code <data-in>} event
     * @param instances : list of time stamps joined with the instance separator
     * @param unresolvedInstances : receives the instances with latest/future function
     * @param urisWithDoneFlag : receives the URIs with the done flag appended
     * @return the list of URIs as one string; empty when {@code instances} is null or empty
     * @throws Exception if a URI cannot be evaluated or validated
     */
    public static String createEarlyURIs(Element event, String instances, StringBuilder unresolvedInstances,
            StringBuilder urisWithDoneFlag) throws Exception {
        if (instances == null || instances.length() == 0) {
            return "";
        }
        String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
        StringBuilder uris = new StringBuilder();

        Element dataset = event.getChild("dataset", event.getNamespace());
        Element doneFlagElement = dataset.getChild("done-flag", event.getNamespace());
        URIHandlerService uriService = Services.get().get(URIHandlerService.class);

        for (String instance : instanceList) {
            if (instance.trim().length() == 0) {
                continue;
            }
            int funcType = getFuncType(instance);
            if (funcType == LATEST || funcType == FUTURE) {
                appendInstance(unresolvedInstances, instance);
                continue;
            }
            ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instance);
            if (uris.length() > 0) {
                uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
                urisWithDoneFlag.append(CoordELFunctions.INSTANCE_SEPARATOR);
            }

            String uriPath = CoordELFunctions.evalAndWrap(eval,
                    dataset.getChild("uri-template", event.getNamespace()).getTextTrim());
            URIHandler uriHandler = uriService.getURIHandler(uriPath);
            uriHandler.validate(uriPath);
            uris.append(uriPath);
            urisWithDoneFlag.append(uriHandler.getURIWithDoneFlag(uriPath, CoordUtils.getDoneFlag(doneFlagElement)));
        }
        return uris.toString();
    }

    /**
     * Evaluate the EL expressions of every child of the action's SLA info element and replace
     * each child's content with the evaluated text.
     *
     * @param eAction the coordinator action element
     * @param coordAction the coordinator action bean
     * @param conf the job configuration
     * @return boolean to determine whether the SLA element is present or not
     * @throws CoordinatorJobException (E1004) if an SLA expression cannot be evaluated
     */
    @SuppressWarnings("unchecked")
    public static boolean materializeSLA(Element eAction, CoordinatorActionBean coordAction, Configuration conf)
            throws CoordinatorJobException {
        Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info",
                eAction.getNamespace("sla"));
        if (eSla == null) {
            return false;
        }
        try {
            ELEvaluator evalSla = CoordELEvaluator.createSLAEvaluator(eAction, coordAction, conf);
            List<Element> elemList = eSla.getChildren();
            for (Element elem : elemList) {
                String updated = CoordELFunctions.evalAndWrap(evalSla, elem.getText().trim());
                elem.removeContent();
                elem.addContent(updated);
            }
        }
        catch (CoordinatorJobException e) {
            // Already the right type; do not wrap it a second time.
            throw e;
        }
        catch (Exception e) {
            throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
        }
        return true;
    }

    /**
     * Materialize one instance for specific nominal time. It includes: 1.
     * Materialize data events (i.e. {@code <data-in>} and {@code <data-out>}) 2. Materialize
     * data properties (i.e dataIn(DS) and dataOut(DS)) 3. remove 'start' and
     * 'end' tag 4. Add 'instance_number' and 'nominal-time' tag
     *
     * @param jobId coordinator job id
     * @param dryrun true if it is dryrun
     * @param eAction frequency unexploded-job
     * @param nominalTime materialization time
     * @param actualTime action actual time
     * @param instanceCount instance numbers
     * @param conf job configuration
     * @param actionBean CoordinatorActionBean to materialize
     * @return one materialized action for specific nominal time
     * @throws Exception if any part of the materialization fails
     */
    @SuppressWarnings("unchecked")
    public static String materializeOneInstance(String jobId, boolean dryrun, Element eAction, Date nominalTime,
            Date actualTime, int instanceCount, Configuration conf, CoordinatorActionBean actionBean) throws Exception {
        String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, instanceCount + "");
        SyncCoordAction appInst = new SyncCoordAction();
        appInst.setActionId(actionId);
        appInst.setName(eAction.getAttributeValue("name"));
        appInst.setNominalTime(nominalTime);
        appInst.setActualTime(actualTime);
        String frequency = eAction.getAttributeValue("frequency");
        appInst.setFrequency(frequency);
        appInst.setTimeUnit(TimeUnit.valueOf(eAction.getAttributeValue("freq_timeunit")));
        appInst.setTimeZone(DateUtils.getTimeZone(eAction.getAttributeValue("timezone")));
        appInst.setEndOfDuration(TimeUnit.valueOf(eAction.getAttributeValue("end_of_duration")));

        boolean isInputLogicSpecified = CoordUtils.isInputLogicSpecified(eAction);

        Element inputList = eAction.getChild("input-events", eAction.getNamespace());
        List<Element> dataInList = null;
        if (inputList != null) {
            dataInList = inputList.getChildren("data-in", eAction.getNamespace());
            materializeInputDataEvents(dataInList, appInst, conf, actionBean, isInputLogicSpecified);
        }

        if (isInputLogicSpecified) {
            evaluateInputCheck(eAction.getChild(CoordInputLogicEvaluator.INPUT_LOGIC, eAction.getNamespace()),
                    CoordELEvaluator.createDataEvaluator(eAction, conf, actionId));
        }
        Element outputList = eAction.getChild("output-events", eAction.getNamespace());
        List<Element> dataOutList = null;
        if (outputList != null) {
            dataOutList = outputList.getChildren("data-out", eAction.getNamespace());
            materializeOutputDataEvents(dataOutList, appInst, conf);
        }

        eAction.removeAttribute("start");
        eAction.removeAttribute("end");
        eAction.setAttribute("instance-number", Integer.toString(instanceCount));
        eAction.setAttribute("action-nominal-time", DateUtils.formatDateOozieTZ(nominalTime));
        eAction.setAttribute("action-actual-time", DateUtils.formatDateOozieTZ(actualTime));

        // Setting up action bean
        actionBean.setCreatedConf(XmlUtils.prettyPrint(conf).toString());
        actionBean.setRunConf(XmlUtils.prettyPrint(conf).toString());
        actionBean.setCreatedTime(actualTime);
        actionBean.setJobId(jobId);
        actionBean.setId(actionId);
        actionBean.setLastModifiedTime(new Date());
        actionBean.setStatus(CoordinatorAction.Status.WAITING);
        actionBean.setActionNumber(instanceCount);
        actionBean.setNominalTime(nominalTime);
        boolean isSla = CoordCommandUtils.materializeSLA(eAction, actionBean, conf);
        if (isSla) {
            actionBean.setSlaXml(XmlUtils.prettyPrint(
                    eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")))
                    .toString());
        }

        // TODO: tracker URI, console URL, type, error info and external status are not set here.
        if (!dryrun) {
            return XmlUtils.prettyPrint(eAction).toString();
        }
        else {
            return dryRunCoord(eAction, actionBean);
        }
    }

    /**
     * Dry-run a materialized action: check its push and pull dependencies and, when all of them
     * are available, check the latest/future inputs and resolve the action configuration.
     *
     * @param eAction the actionXml related element
     * @param actionBean the coordinator action bean
     * @return the action XML; it is the resolved XML only when every dependency check succeeded
     * @throws Exception if a dependency check or the resolution fails
     */
    static String dryRunCoord(Element eAction, CoordinatorActionBean actionBean) throws Exception {
        String action = XmlUtils.prettyPrint(eAction).toString();
        StringBuilder actionXml = new StringBuilder(action);
        Configuration actionConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
        actionBean.setActionXml(action);

        if (CoordUtils.isInputLogicSpecified(eAction)) {
            new CoordInputLogicEvaluatorUtil(actionBean).validateInputLogic();
        }

        boolean isPushDepAvailable = true;
        String pushMissingDependencies = actionBean.getPushInputDependencies().getMissingDependencies();
        if (pushMissingDependencies != null) {
            ActionDependency actionDependencies = DependencyChecker.checkForAvailability(pushMissingDependencies,
                    actionConf, true);
            if (actionDependencies.getMissingDependencies().size() != 0) {
                isPushDepAvailable = false;
            }
        }

        boolean isPullDepAvailable = true;
        CoordActionInputCheckXCommand coordActionInput = new CoordActionInputCheckXCommand(actionBean.getId(),
                actionBean.getJobId());
        if (actionBean.getMissingDependencies() != null) {
            StringBuilder existList = new StringBuilder();
            StringBuilder nonExistList = new StringBuilder();
            StringBuilder nonResolvedList = new StringBuilder();
            getResolvedList(actionBean.getPullInputDependencies().getMissingDependencies(), nonExistList,
                    nonResolvedList);
            isPullDepAvailable = actionBean.getPullInputDependencies().checkPullMissingDependencies(actionBean,
                    existList, nonExistList);
        }

        if (isPullDepAvailable && isPushDepAvailable) {
            // Check for latest/future
            boolean isLatestFutureDepAvailable = coordActionInput.checkUnResolvedInput(actionBean, actionXml,
                    actionConf);
            if (isLatestFutureDepAvailable) {
                String newActionXml = CoordActionInputCheckXCommand.resolveCoordConfiguration(actionXml, actionConf,
                        actionBean.getId());
                actionXml.replace(0, actionXml.length(), newActionXml);
            }
        }

        return actionXml.toString();
    }

    /**
     * Materialize all {@code <output-events>/<data-out>} tags. Create uris for resolved
     * instances. Create unresolved instance for latest()/future().
     *
     * @param events the data-out elements; null is ignored
     * @param appInst the action instance information
     * @param conf the job configuration
     * @throws Exception if an event cannot be materialized
     */
    private static void materializeOutputDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf)
            throws Exception {
        if (events == null) {
            return;
        }

        for (Element event : events) {
            StringBuilder instances = new StringBuilder();
            ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event, appInst, conf);
            // Handle list of instance tag
            resolveInstances(event, instances, appInst, conf, eval);
            // Handle start-instance and end-instance
            resolveInstanceRange(event, instances, appInst, conf, eval);
            // Separate out the unresolved instances
            separateResolvedAndUnresolved(event, instances);
        }
    }

    /**
     * Recursively evaluate the "dataset", "name", "min" and "wait" attributes of every element
     * below {@code root}.
     *
     * @param root the input-logic element
     * @param evalInputLogic evaluator used for the attribute values
     * @throws Exception if an attribute cannot be evaluated
     */
    private static void evaluateInputCheck(Element root, ELEvaluator evalInputLogic) throws Exception {
        for (Object event : root.getChildren()) {
            Element inputElement = (Element) event;

            resolveAttribute("dataset", inputElement, evalInputLogic);
            resolveAttribute("name", inputElement, evalInputLogic);
            resolveAttribute("min", inputElement, evalInputLogic);
            resolveAttribute("wait", inputElement, evalInputLogic);
            if (!inputElement.getChildren().isEmpty()) {
                evaluateInputCheck(inputElement, evalInputLogic);
            }
        }
    }

    /**
     * Evaluate one attribute in place, if the element has it.
     *
     * @param attrName the attribute name
     * @param elem the element that may carry the attribute
     * @param eval evaluator used for the attribute value
     * @return the evaluated value, or null when the attribute is absent
     * @throws CoordinatorJobException (E1004) if the value cannot be evaluated
     */
    private static String resolveAttribute(String attrName, Element elem, ELEvaluator eval)
            throws CoordinatorJobException {
        Attribute attr = elem.getAttribute(attrName);
        String val = null;
        if (attr != null) {
            try {
                val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim());
            }
            catch (Exception e) {
                throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
            }
            attr.setValue(val);
        }
        return val;
    }

    /**
     * Materialize all {@code <input-events>/<data-in>} tags and record the resulting
     * dependencies on the action bean. Resolved instances are added to the pull or the push
     * dependency according to the dependency type reported by the URI handler; unresolved
     * (latest/future) instances are added to the pull dependency.
     *
     * @param events the data-in elements; null is ignored and leaves the bean untouched
     * @param appInst the action instance information
     * @param conf the job configuration
     * @param actionBean the bean that receives the dependencies
     * @param isInputLogicSpecified whether the action defines input logic
     * @throws Exception if an event cannot be materialized
     */
    public static void materializeInputDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf,
            CoordinatorActionBean actionBean, boolean isInputLogicSpecified) throws Exception {
        if (events == null) {
            return;
        }
        CoordInputDependency coordPullInputDependency = CoordInputDependencyFactory
                .createPullInputDependencies(isInputLogicSpecified);
        CoordInputDependency coordPushInputDependency = CoordInputDependencyFactory
                .createPushInputDependencies(isInputLogicSpecified);
        Map<String, String> unresolvedList = new HashMap<String, String>();

        URIHandlerService uriService = Services.get().get(URIHandlerService.class);

        for (Element event : events) {
            StringBuilder instances = new StringBuilder();
            ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event, appInst, conf);
            // Handle list of instance tag
            resolveInstances(event, instances, appInst, conf, eval);
            // Handle start-instance and end-instance
            resolveInstanceRange(event, instances, appInst, conf, eval);
            // Separate out the unresolved instances
            String resolvedList = separateResolvedAndUnresolved(event, instances);
            String name = event.getAttribute("name").getValue();

            if (!resolvedList.isEmpty()) {
                Element uri = event.getChild("dataset", event.getNamespace()).getChild("uri-template",
                        event.getNamespace());

                String uriTemplate = uri.getText();
                URI baseURI = uriService.getAuthorityWithScheme(uriTemplate);
                URIHandler handler = uriService.getURIHandler(baseURI);
                List<CoordInputInstance> inputInstanceList = new ArrayList<CoordInputInstance>();

                // The resolved list is joined with INSTANCE_SEPARATOR, so split on the same constant.
                for (String inputInstance : resolvedList.split(CoordELFunctions.INSTANCE_SEPARATOR)) {
                    inputInstanceList.add(new CoordInputInstance(inputInstance, false));
                }

                if (handler.getDependencyType(baseURI).equals(DependencyType.PULL)) {
                    coordPullInputDependency.addInputInstanceList(name, inputInstanceList);
                }
                else {
                    coordPushInputDependency.addInputInstanceList(name, inputInstanceList);
                }
            }

            String tmpUnresolved = event.getChildTextTrim(UNRESOLVED_INSTANCES_TAG, event.getNamespace());
            if (tmpUnresolved != null) {
                unresolvedList.put(name, tmpUnresolved);
            }
        }
        for (Map.Entry<String, String> unresolved : unresolvedList.entrySet()) {
            coordPullInputDependency.addUnResolvedList(unresolved.getKey(), unresolved.getValue());
        }
        actionBean.setPullInputDependencies(coordPullInputDependency);
        actionBean.setPushInputDependencies(coordPushInputDependency);
        actionBean.setMissingDependencies(coordPullInputDependency.serialize());
        actionBean.setPushMissingDependencies(coordPushInputDependency.serialize());
    }

    /**
     * Get resolved string from missDepList. The list is split at the first occurrence of
     * {@link #RESOLVED_UNRESOLVED_SEPARATOR}: the part before it is appended to
     * {@code resolved}, the part after it to {@code unresolved}. Without a separator the whole
     * list counts as resolved.
     *
     * @param missDepList the missing dependency list; null appends nothing
     * @param resolved receives the resolved part
     * @param unresolved receives the unresolved part
     * @return resolved string
     */
    public static String getResolvedList(String missDepList, StringBuilder resolved, StringBuilder unresolved) {
        if (missDepList != null) {
            int index = missDepList.indexOf(RESOLVED_UNRESOLVED_SEPARATOR);
            if (index < 0) {
                resolved.append(missDepList);
            }
            else {
                resolved.append(missDepList.substring(0, index));
                unresolved.append(missDepList.substring(index + RESOLVED_UNRESOLVED_SEPARATOR.length()));
            }
        }
        return resolved.toString();
    }

    /**
     * Evaluate one Oozie cron frequency against the target date.
     * The CronExpression class takes seconds as the first field whereas Oozie operates on
     * minute basis, so a "0" seconds field is prepended.
     */
    private static Date nextValidTimeAfter(String freq, TimeZone tz, Date targetDate) throws ParseException {
        CronExpression expr = new CronExpression("0 " + freq);
        expr.setTimeZone(tz);
        return expr.getNextValidTimeAfter(targetDate);
    }

    /**
     * Get the next action time after a given time
     * <p>
     * The frequency is split on single spaces; the fields at index 2 and 4 are handled as
     * day-of-month and day-of-week.
     *
     * @param targetDate the time after which the next action time is searched
     * @param coordJob the coordinator job that carries the cron frequency
     * @return the next valid action time, or null when the expression yields no further time
     * @throws ParseException if the frequency is not a valid cron expression
     */
    public static Date getNextValidActionTimeForCronFrequency(Date targetDate, CoordinatorJobBean coordJob)
            throws ParseException {

        String freq = coordJob.getFrequency();
        TimeZone tz = DateUtils.getOozieProcessingTimeZone();
        String[] cronArray = freq.split(" ");
        if (cronArray.length < 5) {
            // Report a malformed frequency as a parse error instead of an index error.
            throw new ParseException("Cron frequency should have at least 5 fields: " + freq, 0);
        }
        String dayOfMonth = cronArray[2].trim();
        String dayOfWeek = cronArray[4].trim();

        if ("?".equals(dayOfMonth) || "?".equals(dayOfWeek)) {
            return nextValidTimeAfter(freq, tz, targetDate);
        }

        // Current CronExpression doesn't support operations
        // where both date of months and day of weeks are specified.
        // As a result, we need to split this scenario into two cases
        // and return the earlier time
        if ("*".equals(dayOfMonth) || "*".equals(dayOfWeek)) {
            // When any one of day of month or day of week fields is a wildcard
            // we need to replace the wildcard with "?"
            if ("*".equals(dayOfMonth)) {
                cronArray[2] = "?";
            }
            else {
                cronArray[4] = "?";
            }
            return nextValidTimeAfter(StringUtils.join(cronArray, " "), tz, targetDate);
        }

        // If both fields are specified by non-wildcards,
        // we need to split it into two expressions
        String[] cronArray1 = cronArray.clone();
        String[] cronArray2 = cronArray.clone();
        cronArray1[2] = "?";
        cronArray2[4] = "?";

        Date nextTime1 = nextValidTimeAfter(StringUtils.join(cronArray1, " "), tz, targetDate);
        Date nextTime2 = nextValidTimeAfter(StringUtils.join(cronArray2, " "), tz, targetDate);
        // Either expression may have no further fire time (null); then the other one wins.
        if (nextTime1 == null) {
            return nextTime2;
        }
        if (nextTime2 == null) {
            return nextTime1;
        }
        return nextTime1.compareTo(nextTime2) < 0 ? nextTime1 : nextTime2;
    }

    /**
     * Computes the nominal time of the next action.
     * Based on CoordMaterializeTransitionXCommand#materializeActions
     *
     * The Coordinator Job needs to have the frequency, time unit, time zone, start time, end time, and job xml.
     * The Coordinator Action needs to have the nominal time and action number.
     *
     * @param coordJob The Coordinator Job
     * @param coordAction The Coordinator Action
     * @return the nominal time of the next action, or null when there is no next action (the
     *         computed time is after the job's end time, or a cron frequency has no further time)
     * @throws ParseException if a cron frequency cannot be parsed
     * @throws JDOMException if the job xml cannot be parsed
     */
    public static Date computeNextNominalTime(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction)
            throws ParseException, JDOMException {
        Date nextNominalTime;
        boolean isCronFrequency = false;
        int freq = -1;
        try {
            freq = Integer.parseInt(coordJob.getFrequency());
        }
        catch (NumberFormatException e) {
            // A non-numeric frequency is a cron expression.
            isCronFrequency = true;
        }

        if (isCronFrequency) {
            nextNominalTime = CoordCommandUtils.getNextValidActionTimeForCronFrequency(coordAction.getNominalTime(),
                    coordJob);
        }
        else {
            TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone());
            Calendar nextNominalTimeCal = Calendar.getInstance(appTz);
            nextNominalTimeCal.setTime(coordJob.getStartTimestamp());
            TimeUnit freqTU = TimeUnit.valueOf(coordJob.getTimeUnitStr());
            // Action Number is indexed by 1, so no need to +1 here
            nextNominalTimeCal.add(freqTU.getCalendarUnit(), coordAction.getActionNumber() * freq);
            String jobXml = coordJob.getJobXml();
            Element eJob = XmlUtils.parseXml(jobXml);
            TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration"));
            // Move to the End of duration, if needed.
            DateUtils.moveToEnd(nextNominalTimeCal, endOfFlag);
            nextNominalTime = nextNominalTimeCal.getTime();
        }

        // If the next nominal time is after the job's end time, then this is the last action, so return null.
        // A cron frequency without any further time is already null.
        if (nextNominalTime != null && nextNominalTime.after(coordJob.getEndTime())) {
            nextNominalTime = null;
        }
        return nextNominalTime;
    }

    /**
     * Check whether a path exists, using the URI handler registered for its URI.
     *
     * @param sPath the path, as a URI string
     * @param actionConf the action configuration
     * @param user the user on whose behalf the check is made
     * @return the result of the handler's existence check
     * @throws IOException propagated from the handler lookup or check
     * @throws URISyntaxException if {@code sPath} is not a valid URI
     * @throws URIHandlerException propagated from the URI handler
     */
    public static boolean pathExists(String sPath, Configuration actionConf, String user) throws IOException,
            URISyntaxException, URIHandlerException {
        URI uri = new URI(sPath);
        URIHandlerService service = Services.get().get(URIHandlerService.class);
        URIHandler handler = service.getURIHandler(uri);
        return handler.exists(uri, actionConf, user);
    }

    /**
     * Check whether a path exists, on behalf of the user named by
     * {@code OozieClient.USER_NAME} in the action configuration.
     *
     * @param sPath the path, as a URI string
     * @param actionConf the action configuration; the user name is validated with
     *        {@code ParamChecker.notEmpty}
     * @return the result of the handler's existence check
     * @throws IOException propagated from the handler lookup or check
     * @throws URISyntaxException if {@code sPath} is not a valid URI
     * @throws URIHandlerException propagated from the URI handler
     */
    public static boolean pathExists(String sPath, Configuration actionConf) throws IOException, URISyntaxException,
            URIHandlerException {
        String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
        return pathExists(sPath, actionConf, user);
    }

}