001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command.coord; 019 020import java.io.StringReader; 021import java.net.URI; 022import java.text.ParseException; 023import java.util.TimeZone; 024import java.util.Map; 025import java.util.HashMap; 026import java.util.List; 027import java.util.Date; 028import java.util.Calendar; 029 030import org.apache.hadoop.conf.Configuration; 031import org.apache.oozie.CoordinatorActionBean; 032import org.apache.oozie.ErrorCode; 033import org.apache.oozie.client.CoordinatorAction; 034import org.apache.oozie.command.CommandException; 035import org.apache.oozie.coord.CoordELEvaluator; 036import org.apache.oozie.coord.CoordELFunctions; 037import org.apache.oozie.coord.CoordUtils; 038import org.apache.oozie.coord.CoordinatorJobException; 039import org.apache.oozie.coord.SyncCoordAction; 040import org.apache.oozie.coord.TimeUnit; 041import org.apache.oozie.dependency.ActionDependency; 042import org.apache.oozie.dependency.DependencyChecker; 043import org.apache.oozie.dependency.URIHandler; 044import org.apache.oozie.dependency.URIHandler.DependencyType; 045import org.apache.oozie.service.Services; 046import org.apache.oozie.service.URIHandlerService; 047import org.apache.oozie.service.UUIDService; 048import org.apache.oozie.util.DateUtils; 049import org.apache.oozie.util.ELEvaluator; 050import org.apache.oozie.util.XConfiguration; 051import org.apache.oozie.util.XmlUtils; 052import org.jdom.Element; 053import org.quartz.CronExpression; 054import org.apache.commons.lang.StringUtils; 055import org.apache.oozie.CoordinatorJobBean; 056 057public class CoordCommandUtils { 058 public static int CURRENT = 0; 059 public static int LATEST = 1; 060 public static int FUTURE = 2; 061 public static int OFFSET = 3; 062 public static int ABSOLUTE = 4; 063 public static int UNEXPECTED = -1; 064 public static final String RESOLVED_UNRESOLVED_SEPARATOR = "!!"; 065 public static final String UNRESOLVED_INST_TAG = "unresolved-instances"; 066 067 /** 068 * parse a function like coord:latest(n)/future() and return the 'n'. 069 * <p/> 070 * 071 * @param function 072 * @param restArg 073 * @return int instanceNumber 074 * @throws Exception 075 */ 076 public static int getInstanceNumber(String function, StringBuilder restArg) throws Exception { 077 int funcType = getFuncType(function); 078 if (funcType == ABSOLUTE) { 079 return ABSOLUTE; 080 } 081 if (funcType == CURRENT || funcType == LATEST) { 082 return parseOneArg(function); 083 } 084 else { 085 return parseMoreArgs(function, restArg); 086 } 087 } 088 089 /** 090 * Evaluates function for coord-action-create-inst tag 091 * @param event 092 * @param appInst 093 * @param conf 094 * @param function 095 * @return evaluation result 096 * @throws Exception 097 */ 098 private static String evaluateInstanceFunction(Element event, SyncCoordAction appInst, Configuration conf, 099 String function) throws Exception { 100 ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator("coord-action-create-inst", event, appInst, conf); 101 return CoordELFunctions.evalAndWrap(eval, function); 102 } 103 104 public static int parseOneArg(String funcName) throws Exception { 105 int firstPos = funcName.indexOf("("); 106 int lastPos = funcName.lastIndexOf(")"); 107 if (firstPos >= 0 && lastPos > firstPos) { 108 String tmp = funcName.substring(firstPos + 1, lastPos).trim(); 109 if (tmp.length() > 0) { 110 return (int) Double.parseDouble(tmp); 111 } 112 } 113 throw new RuntimeException("Unformatted function :" + funcName); 114 } 115 116 public static String parseOneStringArg(String funcName) throws Exception { 117 int firstPos = funcName.indexOf("("); 118 int lastPos = funcName.lastIndexOf(")"); 119 if (firstPos >= 0 && lastPos > firstPos) { 120 return funcName.substring(firstPos + 1, lastPos).trim(); 121 } 122 throw new RuntimeException("Unformatted function :" + funcName); 123 } 124 125 private static int parseMoreArgs(String funcName, StringBuilder restArg) throws Exception { 126 int firstPos = funcName.indexOf("("); 127 int secondPos = funcName.lastIndexOf(","); 128 int lastPos = funcName.lastIndexOf(")"); 129 if (firstPos >= 0 && secondPos > firstPos) { 130 String tmp = funcName.substring(firstPos + 1, secondPos).trim(); 131 if (tmp.length() > 0) { 132 restArg.append(funcName.substring(secondPos + 1, lastPos).trim()); 133 return (int) Double.parseDouble(tmp); 134 } 135 } 136 throw new RuntimeException("Unformatted function :" + funcName); 137 } 138 139 /** 140 * @param EL function name 141 * @return type of EL function 142 */ 143 public static int getFuncType(String function) { 144 if (function.indexOf("current") >= 0) { 145 return CURRENT; 146 } 147 else if (function.indexOf("latest") >= 0) { 148 return LATEST; 149 } 150 else if (function.indexOf("future") >= 0) { 151 return FUTURE; 152 } 153 else if (function.indexOf("offset") >= 0) { 154 return OFFSET; 155 } 156 else if (function.indexOf("absolute") >= 0) { 157 return ABSOLUTE; 158 } 159 return UNEXPECTED; 160 // throw new RuntimeException("Unexpected instance name "+ function); 161 } 162 163 /** 164 * @param startInst: EL function name 165 * @param endInst: EL function name 166 * @throws CommandException if both are not the same function 167 */ 168 public static void checkIfBothSameType(String startInst, String endInst) throws CommandException { 169 if (getFuncType(startInst) != getFuncType(endInst)) { 170 if (getFuncType(startInst) == ABSOLUTE) { 171 if (getFuncType(endInst) != CURRENT) { 172 throw new CommandException(ErrorCode.E1010, 173 "Only start-instance as absolute and end-instance as current is supported." + " start = " 174 + startInst + " end = " + endInst); 175 } 176 } 177 else { 178 throw new CommandException(ErrorCode.E1010, 179 " start-instance and end-instance both should be either latest or current or future or offset\n" 180 + " start " + startInst + " and end " + endInst); 181 } 182 } 183 } 184 185 186 /** 187 * Resolve list of <instance> </instance> tags. 188 * 189 * @param event 190 * @param instances 191 * @param actionInst 192 * @param conf 193 * @param eval: ELEvalautor 194 * @throws Exception 195 */ 196 public static void resolveInstances(Element event, StringBuilder instances, SyncCoordAction actionInst, 197 Configuration conf, ELEvaluator eval) throws Exception { 198 for (Element eInstance : (List<Element>) event.getChildren("instance", event.getNamespace())) { 199 200 if (instances.length() > 0) { 201 instances.append(CoordELFunctions.INSTANCE_SEPARATOR); 202 } 203 instances.append(materializeInstance(event, eInstance.getTextTrim(), actionInst, conf, eval)); 204 } 205 event.removeChildren("instance", event.getNamespace()); 206 } 207 208 /** 209 * Resolve <start-instance> <end-insatnce> tag. Don't resolve any 210 * latest()/future() 211 * 212 * @param event 213 * @param instances 214 * @param appInst 215 * @param conf 216 * @param eval: ELEvalautor 217 * @throws Exception 218 */ 219 public static void resolveInstanceRange(Element event, StringBuilder instances, SyncCoordAction appInst, 220 Configuration conf, ELEvaluator eval) throws Exception { 221 Element eStartInst = event.getChild("start-instance", event.getNamespace()); 222 Element eEndInst = event.getChild("end-instance", event.getNamespace()); 223 if (eStartInst != null && eEndInst != null) { 224 String strStart = evaluateInstanceFunction(event, appInst, conf, eStartInst.getTextTrim()); 225 String strEnd = evaluateInstanceFunction(event, appInst, conf, eEndInst.getTextTrim()); 226 checkIfBothSameType(strStart, strEnd); 227 StringBuilder restArg = new StringBuilder(); // To store rest 228 // arguments for 229 // future 230 // function 231 int startIndex = getInstanceNumber(strStart, restArg); 232 String startRestArg = restArg.toString(); 233 restArg.delete(0, restArg.length()); 234 int endIndex = getInstanceNumber(strEnd, restArg); 235 String endRestArg = restArg.toString(); 236 int funcType = getFuncType(strStart); 237 238 if (funcType == ABSOLUTE) { 239 StringBuffer bf = new StringBuffer(); 240 bf.append("${coord:absoluteRange(\"").append(parseOneStringArg(strStart)) 241 .append("\",").append(endIndex).append(")}"); 242 String matInstance = materializeInstance(event, bf.toString(), appInst, conf, eval); 243 if (matInstance != null && !matInstance.isEmpty()) { 244 if (instances.length() > 0) { 245 instances.append(CoordELFunctions.INSTANCE_SEPARATOR); 246 } 247 instances.append(matInstance); 248 } 249 } 250 else { 251 if (funcType == OFFSET) { 252 TimeUnit startU = TimeUnit.valueOf(startRestArg); 253 TimeUnit endU = TimeUnit.valueOf(endRestArg); 254 if (startU.getCalendarUnit() * startIndex > endU.getCalendarUnit() * endIndex) { 255 throw new CommandException(ErrorCode.E1010, 256 " start-instance should be equal or earlier than the end-instance \n" 257 + XmlUtils.prettyPrint(event)); 258 } 259 Calendar startCal = CoordELFunctions.resolveOffsetRawTime(startIndex, startU, eval); 260 Calendar endCal = CoordELFunctions.resolveOffsetRawTime(endIndex, endU, eval); 261 if (startCal != null && endCal != null) { 262 List<Integer> expandedFreqs = CoordELFunctions.expandOffsetTimes(startCal, endCal, eval); 263 for (int i = expandedFreqs.size() - 1; i >= 0; i--) { 264 String matInstance = materializeInstance(event, "${coord:offset(" + expandedFreqs.get(i) 265 + ", \"MINUTE\")}", appInst, conf, eval); 266 if (matInstance == null || matInstance.length() == 0) { 267 // Earlier than dataset's initial instance 268 break; 269 } 270 if (instances.length() > 0) { 271 instances.append(CoordELFunctions.INSTANCE_SEPARATOR); 272 } 273 instances.append(matInstance); 274 } 275 } 276 } 277 else { 278 if (startIndex > endIndex) { 279 throw new CommandException(ErrorCode.E1010, 280 " start-instance should be equal or earlier than the end-instance \n" 281 + XmlUtils.prettyPrint(event)); 282 } 283 if (funcType == CURRENT) { 284 // Everything could be resolved NOW. no latest() ELs 285 String matInstance = materializeInstance(event, "${coord:currentRange(" + startIndex + "," 286 + endIndex + ")}", appInst, conf, eval); 287 if (matInstance != null && !matInstance.isEmpty()) { 288 if (instances.length() > 0) { 289 instances.append(CoordELFunctions.INSTANCE_SEPARATOR); 290 } 291 instances.append(matInstance); 292 } 293 } 294 295 else { // latest(n)/future() EL is present 296 if (funcType == LATEST) { 297 instances.append("${coord:latestRange(").append(startIndex).append(",").append(endIndex) 298 .append(")}"); 299 } 300 else if (funcType == FUTURE) { 301 instances.append("${coord:futureRange(").append(startIndex).append(",").append(endIndex) 302 .append(",'").append(endRestArg).append("')}"); 303 } 304 } 305 } 306 } 307 // Remove start-instance and end-instances 308 event.removeChild("start-instance", event.getNamespace()); 309 event.removeChild("end-instance", event.getNamespace()); 310 } 311 } 312 313 /** 314 * Materialize one instance like current(-2) 315 * 316 * @param event : <data-in> 317 * @param expr : instance like current(-1) 318 * @param appInst : application specific info 319 * @param conf 320 * @param evalInst :ELEvaluator 321 * @return materialized date string 322 * @throws Exception 323 */ 324 public static String materializeInstance(Element event, String expr, SyncCoordAction appInst, Configuration conf, 325 ELEvaluator evalInst) throws Exception { 326 if (event == null) { 327 return null; 328 } 329 // ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event, 330 // appInst, conf); 331 return CoordELFunctions.evalAndWrap(evalInst, expr); 332 } 333 334 /** 335 * Create two new tags with <uris> and <unresolved-instances>. 336 * 337 * @param event 338 * @param instances 339 * @throws Exception 340 */ 341 private static String separateResolvedAndUnresolved(Element event, StringBuilder instances) 342 throws Exception { 343 StringBuilder unresolvedInstances = new StringBuilder(); 344 StringBuilder urisWithDoneFlag = new StringBuilder(); 345 StringBuilder depList = new StringBuilder(); 346 String uris = createEarlyURIs(event, instances.toString(), unresolvedInstances, urisWithDoneFlag); 347 if (uris.length() > 0) { 348 Element uriInstance = new Element("uris", event.getNamespace()); 349 uriInstance.addContent(uris); 350 event.getContent().add(1, uriInstance); 351 if (depList.length() > 0) { 352 depList.append(CoordELFunctions.INSTANCE_SEPARATOR); 353 } 354 depList.append(urisWithDoneFlag); 355 } 356 if (unresolvedInstances.length() > 0) { 357 Element elemInstance = new Element(UNRESOLVED_INST_TAG, event.getNamespace()); 358 elemInstance.addContent(unresolvedInstances.toString()); 359 event.getContent().add(1, elemInstance); 360 } 361 return depList.toString(); 362 } 363 364 /** 365 * The function create a list of URIs separated by "," using the instances 366 * time stamp and URI-template 367 * 368 * @param event : <data-in> event 369 * @param instances : List of time stamp separated by "," 370 * @param unresolvedInstances : list of instance with latest function 371 * @param urisWithDoneFlag : list of URIs with the done flag appended 372 * @return : list of URIs separated by ";" as a string. 373 * @throws Exception 374 */ 375 public static String createEarlyURIs(Element event, String instances, StringBuilder unresolvedInstances, 376 StringBuilder urisWithDoneFlag) throws Exception { 377 if (instances == null || instances.length() == 0) { 378 return ""; 379 } 380 String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR); 381 StringBuilder uris = new StringBuilder(); 382 383 Element doneFlagElement = event.getChild("dataset", event.getNamespace()).getChild("done-flag", 384 event.getNamespace()); 385 URIHandlerService uriService = Services.get().get(URIHandlerService.class); 386 387 for (int i = 0; i < instanceList.length; i++) { 388 if (instanceList[i].trim().length() == 0) { 389 continue; 390 } 391 int funcType = getFuncType(instanceList[i]); 392 if (funcType == LATEST || funcType == FUTURE) { 393 if (unresolvedInstances.length() > 0) { 394 unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR); 395 } 396 unresolvedInstances.append(instanceList[i]); 397 continue; 398 } 399 ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]); 400 if (uris.length() > 0) { 401 uris.append(CoordELFunctions.INSTANCE_SEPARATOR); 402 urisWithDoneFlag.append(CoordELFunctions.INSTANCE_SEPARATOR); 403 } 404 405 String uriPath = CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()) 406 .getChild("uri-template", event.getNamespace()).getTextTrim()); 407 URIHandler uriHandler = uriService.getURIHandler(uriPath); 408 uriHandler.validate(uriPath); 409 uris.append(uriPath); 410 urisWithDoneFlag.append(uriHandler.getURIWithDoneFlag(uriPath, CoordUtils.getDoneFlag(doneFlagElement))); 411 } 412 return uris.toString(); 413 } 414 415 /** 416 * @param eAction 417 * @param coordAction 418 * @param conf 419 * @return boolean to determine whether the SLA element is present or not 420 * @throws CoordinatorJobException 421 */ 422 public static boolean materializeSLA(Element eAction, CoordinatorActionBean coordAction, Configuration conf) 423 throws CoordinatorJobException { 424 Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")); 425 if (eSla == null) { 426 // eAppXml.getNamespace("sla")); 427 return false; 428 } 429 try { 430 ELEvaluator evalSla = CoordELEvaluator.createSLAEvaluator(eAction, coordAction, conf); 431 List<Element> elemList = eSla.getChildren(); 432 for (Element elem : elemList) { 433 String updated; 434 try { 435 updated = CoordELFunctions.evalAndWrap(evalSla, elem.getText().trim()); 436 } 437 catch (Exception e) { 438 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e); 439 } 440 elem.removeContent(); 441 elem.addContent(updated); 442 } 443 } 444 catch (Exception e) { 445 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e); 446 } 447 return true; 448 } 449 450 /** 451 * Materialize one instance for specific nominal time. It includes: 1. 452 * Materialize data events (i.e. <data-in> and <data-out>) 2. Materialize 453 * data properties (i.e dataIn(<DS>) and dataOut(<DS>) 3. remove 'start' and 454 * 'end' tag 4. Add 'instance_number' and 'nominal-time' tag 455 * 456 * @param jobId coordinator job id 457 * @param dryrun true if it is dryrun 458 * @param eAction frequency unexploded-job 459 * @param nominalTime materialization time 460 * @param actualTime action actual time 461 * @param instanceCount instance numbers 462 * @param conf job configuration 463 * @param actionBean CoordinatorActionBean to materialize 464 * @return one materialized action for specific nominal time 465 * @throws Exception 466 */ 467 @SuppressWarnings("unchecked") 468 public static String materializeOneInstance(String jobId, boolean dryrun, Element eAction, Date nominalTime, 469 Date actualTime, int instanceCount, Configuration conf, CoordinatorActionBean actionBean) throws Exception { 470 String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, instanceCount + ""); 471 SyncCoordAction appInst = new SyncCoordAction(); 472 appInst.setActionId(actionId); 473 appInst.setName(eAction.getAttributeValue("name")); 474 appInst.setNominalTime(nominalTime); 475 appInst.setActualTime(actualTime); 476 String frequency = eAction.getAttributeValue("frequency"); 477 appInst.setFrequency(frequency); 478 appInst.setTimeUnit(TimeUnit.valueOf(eAction.getAttributeValue("freq_timeunit"))); 479 appInst.setTimeZone(DateUtils.getTimeZone(eAction.getAttributeValue("timezone"))); 480 appInst.setEndOfDuration(TimeUnit.valueOf(eAction.getAttributeValue("end_of_duration"))); 481 482 Map<String, StringBuilder> dependencyMap = null; 483 484 Element inputList = eAction.getChild("input-events", eAction.getNamespace()); 485 List<Element> dataInList = null; 486 if (inputList != null) { 487 dataInList = inputList.getChildren("data-in", eAction.getNamespace()); 488 dependencyMap = materializeDataEvents(dataInList, appInst, conf); 489 } 490 491 Element outputList = eAction.getChild("output-events", eAction.getNamespace()); 492 List<Element> dataOutList = null; 493 if (outputList != null) { 494 dataOutList = outputList.getChildren("data-out", eAction.getNamespace()); 495 materializeDataEvents(dataOutList, appInst, conf); 496 } 497 498 eAction.removeAttribute("start"); 499 eAction.removeAttribute("end"); 500 eAction.setAttribute("instance-number", Integer.toString(instanceCount)); 501 eAction.setAttribute("action-nominal-time", DateUtils.formatDateOozieTZ(nominalTime)); 502 eAction.setAttribute("action-actual-time", DateUtils.formatDateOozieTZ(actualTime)); 503 504 // Setting up action bean 505 actionBean.setCreatedConf(XmlUtils.prettyPrint(conf).toString()); 506 actionBean.setRunConf(XmlUtils.prettyPrint(conf).toString()); 507 actionBean.setCreatedTime(actualTime); 508 actionBean.setJobId(jobId); 509 actionBean.setId(actionId); 510 actionBean.setLastModifiedTime(new Date()); 511 actionBean.setStatus(CoordinatorAction.Status.WAITING); 512 actionBean.setActionNumber(instanceCount); 513 if (dependencyMap != null) { 514 StringBuilder sbPull = dependencyMap.get(DependencyType.PULL.name()); 515 if (sbPull != null) { 516 actionBean.setMissingDependencies(sbPull.toString()); 517 } 518 StringBuilder sbPush = dependencyMap.get(DependencyType.PUSH.name()); 519 if (sbPush != null) { 520 actionBean.setPushMissingDependencies(sbPush.toString()); 521 } 522 } 523 actionBean.setNominalTime(nominalTime); 524 boolean isSla = CoordCommandUtils.materializeSLA(eAction, actionBean, conf); 525 if (isSla == true) { 526 actionBean.setSlaXml(XmlUtils.prettyPrint( 527 eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"))) 528 .toString()); 529 } 530 531 // actionBean.setTrackerUri(trackerUri);//TOOD: 532 // actionBean.setConsoleUrl(consoleUrl); //TODO: 533 // actionBean.setType(type);//TODO: 534 // actionBean.setErrorInfo(errorCode, errorMessage); //TODO: 535 // actionBean.setExternalStatus(externalStatus);//TODO 536 if (!dryrun) { 537 return XmlUtils.prettyPrint(eAction).toString(); 538 } 539 else { 540 return dryRunCoord(eAction, actionBean); 541 } 542 } 543 544 /** 545 * @param eAction the actionXml related element 546 * @param actionBean the coordinator action bean 547 * @return 548 * @throws Exception 549 */ 550 static String dryRunCoord(Element eAction, CoordinatorActionBean actionBean) throws Exception { 551 String action = XmlUtils.prettyPrint(eAction).toString(); 552 StringBuilder actionXml = new StringBuilder(action); 553 Configuration actionConf = new XConfiguration(new StringReader(actionBean.getRunConf())); 554 555 boolean isPushDepAvailable = true; 556 if (actionBean.getPushMissingDependencies() != null) { 557 ActionDependency actionDep = DependencyChecker.checkForAvailability( 558 actionBean.getPushMissingDependencies(), actionConf, true); 559 if (actionDep.getMissingDependencies().size() != 0) { 560 isPushDepAvailable = false; 561 } 562 563 } 564 boolean isPullDepAvailable = true; 565 CoordActionInputCheckXCommand coordActionInput = new CoordActionInputCheckXCommand(actionBean.getId(), 566 actionBean.getJobId()); 567 if (actionBean.getMissingDependencies() != null) { 568 StringBuilder existList = new StringBuilder(); 569 StringBuilder nonExistList = new StringBuilder(); 570 StringBuilder nonResolvedList = new StringBuilder(); 571 getResolvedList(actionBean.getMissingDependencies(), nonExistList, nonResolvedList); 572 isPullDepAvailable = coordActionInput.checkInput(actionXml, existList, nonExistList, actionConf); 573 } 574 575 if (isPullDepAvailable && isPushDepAvailable) { 576 // Check for latest/future 577 boolean isLatestFutureDepAvailable = coordActionInput.checkUnResolvedInput(actionXml, actionConf); 578 if (isLatestFutureDepAvailable) { 579 String newActionXml = CoordActionInputCheckXCommand.resolveCoordConfiguration(actionXml, actionConf, 580 actionBean.getId()); 581 actionXml.replace(0, actionXml.length(), newActionXml); 582 } 583 } 584 585 return actionXml.toString(); 586 } 587 588 /** 589 * Materialize all <input-events>/<data-in> or <output-events>/<data-out> 590 * tags Create uris for resolved instances. Create unresolved instance for 591 * latest()/future(). 592 * 593 * @param events 594 * @param appInst 595 * @param conf 596 * @throws Exception 597 */ 598 public static Map<String, StringBuilder> materializeDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf 599 ) throws Exception { 600 601 if (events == null) { 602 return null; 603 } 604 StringBuilder unresolvedList = new StringBuilder(); 605 Map<String, StringBuilder> dependencyMap = new HashMap<String, StringBuilder>(); 606 URIHandlerService uriService = Services.get().get(URIHandlerService.class); 607 StringBuilder pullMissingDep = null; 608 StringBuilder pushMissingDep = null; 609 610 for (Element event : events) { 611 StringBuilder instances = new StringBuilder(); 612 ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event, appInst, conf); 613 // Handle list of instance tag 614 resolveInstances(event, instances, appInst, conf, eval); 615 // Handle start-instance and end-instance 616 resolveInstanceRange(event, instances, appInst, conf, eval); 617 // Separate out the unresolved instances 618 String resolvedList = separateResolvedAndUnresolved(event, instances); 619 if (!resolvedList.isEmpty()) { 620 Element uri = event.getChild("dataset", event.getNamespace()).getChild("uri-template", 621 event.getNamespace()); 622 String uriTemplate = uri.getText(); 623 URI baseURI = uriService.getAuthorityWithScheme(uriTemplate); 624 URIHandler handler = uriService.getURIHandler(baseURI); 625 if (handler.getDependencyType(baseURI).equals(DependencyType.PULL)) { 626 pullMissingDep = (pullMissingDep == null) ? new StringBuilder(resolvedList) : pullMissingDep.append( 627 CoordELFunctions.INSTANCE_SEPARATOR).append(resolvedList); 628 } 629 else { 630 pushMissingDep = (pushMissingDep == null) ? new StringBuilder(resolvedList) : pushMissingDep.append( 631 CoordELFunctions.INSTANCE_SEPARATOR).append(resolvedList); 632 } 633 } 634 635 String tmpUnresolved = event.getChildTextTrim(UNRESOLVED_INST_TAG, event.getNamespace()); 636 if (tmpUnresolved != null) { 637 if (unresolvedList.length() > 0) { 638 unresolvedList.append(CoordELFunctions.INSTANCE_SEPARATOR); 639 } 640 unresolvedList.append(tmpUnresolved); 641 } 642 } 643 if (unresolvedList.length() > 0) { 644 if (pullMissingDep == null) { 645 pullMissingDep = new StringBuilder(); 646 } 647 pullMissingDep.append(RESOLVED_UNRESOLVED_SEPARATOR).append(unresolvedList); 648 } 649 dependencyMap.put(DependencyType.PULL.name(), pullMissingDep); 650 dependencyMap.put(DependencyType.PUSH.name(), pushMissingDep); 651 return dependencyMap; 652 } 653 654 /** 655 * Get resolved string from missDepList 656 * 657 * @param missDepList 658 * @param resolved 659 * @param unresolved 660 * @return resolved string 661 */ 662 public static String getResolvedList(String missDepList, StringBuilder resolved, StringBuilder unresolved) { 663 if (missDepList != null) { 664 int index = missDepList.indexOf(RESOLVED_UNRESOLVED_SEPARATOR); 665 if (index < 0) { 666 resolved.append(missDepList); 667 } 668 else { 669 resolved.append(missDepList.substring(0, index)); 670 unresolved.append(missDepList.substring(index + RESOLVED_UNRESOLVED_SEPARATOR.length())); 671 } 672 } 673 return resolved.toString(); 674 } 675 676 /** 677 * Get the next action time after a given time 678 * 679 * @param targetDate 680 * @param coordJob 681 * @return the next valid action time 682 */ 683 public static Date getNextValidActionTimeForCronFrequency(Date targetDate, CoordinatorJobBean coordJob) throws ParseException { 684 685 String freq = coordJob.getFrequency(); 686 TimeZone tz = DateUtils.getOozieProcessingTimeZone(); 687 String[] cronArray = freq.split(" "); 688 Date nextTime = null; 689 690 // Current CronExpression doesn't support operations 691 // where both date of months and day of weeks are specified. 692 // As a result, we need to split this scenario into two cases 693 // and return the earlier time 694 if (!cronArray[2].trim().equals("?") && !cronArray[4].trim().equals("?")) { 695 696 // When any one of day of month or day of week fields is a wildcard 697 // we need to replace the wildcard with "?" 698 if (cronArray[2].trim().equals("*") || cronArray[4].trim().equals("*")) { 699 if (cronArray[2].trim().equals("*")) { 700 cronArray[2] = "?"; 701 } 702 else { 703 cronArray[4] = "?"; 704 } 705 freq= StringUtils.join(cronArray, " "); 706 707 // The cronExpression class takes second 708 // as the first field where oozie is operating on 709 // minute basis 710 CronExpression expr = new CronExpression("0 " + freq); 711 expr.setTimeZone(tz); 712 nextTime = expr.getNextValidTimeAfter(targetDate); 713 } 714 // If both fields are specified by non-wildcards, 715 // we need to split it into two expressions 716 else { 717 String[] cronArray1 = freq.split(" "); 718 String[] cronArray2 = freq.split(" "); 719 720 cronArray1[2] = "?"; 721 cronArray2[4] = "?"; 722 723 String freq1 = StringUtils.join(cronArray1, " "); 724 String freq2 = StringUtils.join(cronArray2, " "); 725 726 // The cronExpression class takes second 727 // as the first field where oozie is operating on 728 // minute basis 729 CronExpression expr1 = new CronExpression("0 " + freq1); 730 expr1.setTimeZone(tz); 731 CronExpression expr2 = new CronExpression("0 " + freq2); 732 expr2.setTimeZone(tz); 733 nextTime = expr1.getNextValidTimeAfter(targetDate); 734 Date nextTime2 = expr2.getNextValidTimeAfter(targetDate); 735 nextTime = nextTime.compareTo(nextTime2) < 0 ? nextTime: nextTime2; 736 } 737 } 738 else { 739 // The cronExpression class takes second 740 // as the first field where oozie is operating on 741 // minute basis 742 CronExpression expr = new CronExpression("0 " + freq); 743 expr.setTimeZone(tz); 744 nextTime = expr.getNextValidTimeAfter(targetDate); 745 } 746 747 return nextTime; 748 } 749 750}