001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.io.StringReader; 021 import java.util.Calendar; 022 import java.util.Date; 023 import java.util.List; 024 025 import org.apache.hadoop.conf.Configuration; 026 import org.apache.oozie.CoordinatorActionBean; 027 import org.apache.oozie.ErrorCode; 028 import org.apache.oozie.client.CoordinatorAction; 029 import org.apache.oozie.command.CommandException; 030 import org.apache.oozie.coord.CoordELEvaluator; 031 import org.apache.oozie.coord.CoordELFunctions; 032 import org.apache.oozie.coord.CoordUtils; 033 import org.apache.oozie.coord.CoordinatorJobException; 034 import org.apache.oozie.coord.SyncCoordAction; 035 import org.apache.oozie.coord.TimeUnit; 036 import org.apache.oozie.service.Services; 037 import org.apache.oozie.service.UUIDService; 038 import org.apache.oozie.util.DateUtils; 039 import org.apache.oozie.util.ELEvaluator; 040 import org.apache.oozie.util.XConfiguration; 041 import org.apache.oozie.util.XmlUtils; 042 import org.jdom.Element; 043 044 public class CoordCommandUtils { 045 public static int CURRENT = 0; 046 public static int LATEST = 1; 047 public static int FUTURE = 2; 048 public static int OFFSET = 3; 049 public static int UNEXPECTED = -1; 050 public static final String RESOLVED_UNRESOLVED_SEPARATOR = ";"; 051 052 /** 053 * parse a function like coord:latest(n)/future() and return the 'n'. 054 * <p/> 055 * @param function 056 * @param event 057 * @param appInst 058 * @param conf 059 * @param restArg 060 * @return int instanceNumber 061 * @throws Exception 062 */ 063 public static int getInstanceNumber(String function, Element event, SyncCoordAction appInst, Configuration conf, 064 StringBuilder restArg) throws Exception { 065 ELEvaluator eval = CoordELEvaluator 066 .createInstancesELEvaluator("coord-action-create-inst", event, appInst, conf); 067 String newFunc = CoordELFunctions.evalAndWrap(eval, function); 068 int funcType = getFuncType(newFunc); 069 if (funcType == CURRENT || funcType == LATEST) { 070 return parseOneArg(newFunc); 071 } 072 else { 073 return parseMoreArgs(newFunc, restArg); 074 } 075 } 076 077 private static int parseOneArg(String funcName) throws Exception { 078 int firstPos = funcName.indexOf("("); 079 int lastPos = funcName.lastIndexOf(")"); 080 if (firstPos >= 0 && lastPos > firstPos) { 081 String tmp = funcName.substring(firstPos + 1, lastPos).trim(); 082 if (tmp.length() > 0) { 083 return (int) Double.parseDouble(tmp); 084 } 085 } 086 throw new RuntimeException("Unformatted function :" + funcName); 087 } 088 089 private static int parseMoreArgs(String funcName, StringBuilder restArg) throws Exception { 090 int firstPos = funcName.indexOf("("); 091 int secondPos = funcName.lastIndexOf(","); 092 int lastPos = funcName.lastIndexOf(")"); 093 if (firstPos >= 0 && secondPos > firstPos) { 094 String tmp = funcName.substring(firstPos + 1, secondPos).trim(); 095 if (tmp.length() > 0) { 096 restArg.append(funcName.substring(secondPos + 1, lastPos).trim()); 097 return (int) Double.parseDouble(tmp); 098 } 099 } 100 throw new RuntimeException("Unformatted function :" + funcName); 101 } 102 103 /** 104 * @param EL function name 105 * @return type of EL function 106 */ 107 public static int getFuncType(String function) { 108 if (function.indexOf("current") >= 0) { 109 return CURRENT; 110 } 111 else if (function.indexOf("latest") >= 0) { 112 return LATEST; 113 } 114 else if (function.indexOf("future") >= 0) { 115 return FUTURE; 116 } 117 else if (function.indexOf("offset") >= 0) { 118 return OFFSET; 119 } 120 return UNEXPECTED; 121 // throw new RuntimeException("Unexpected instance name "+ function); 122 } 123 124 /** 125 * @param startInst: EL function name 126 * @param endInst: EL function name 127 * @throws CommandException if both are not the same function 128 */ 129 public static void checkIfBothSameType(String startInst, String endInst) throws CommandException { 130 if (getFuncType(startInst) != getFuncType(endInst)) { 131 throw new CommandException(ErrorCode.E1010, 132 " start-instance and end-instance both should be either latest or current or future or offset\n" 133 + " start " + startInst + " and end " + endInst); 134 } 135 } 136 137 /** 138 * Resolve list of <instance> </instance> tags. 139 * 140 * @param event 141 * @param instances 142 * @param actionInst 143 * @param conf 144 * @param eval: ELEvalautor 145 * @throws Exception 146 */ 147 public static void resolveInstances(Element event, StringBuilder instances, SyncCoordAction actionInst, 148 Configuration conf, ELEvaluator eval) throws Exception { 149 for (Element eInstance : (List<Element>) event.getChildren("instance", event.getNamespace())) { 150 if (instances.length() > 0) { 151 instances.append(CoordELFunctions.INSTANCE_SEPARATOR); 152 } 153 instances.append(materializeInstance(event, eInstance.getTextTrim(), actionInst, conf, eval)); 154 } 155 event.removeChildren("instance", event.getNamespace()); 156 } 157 158 /** 159 * Resolve <start-instance> <end-insatnce> tag. Don't resolve any 160 * latest()/future() 161 * 162 * @param event 163 * @param instances 164 * @param appInst 165 * @param conf 166 * @param eval: ELEvalautor 167 * @throws Exception 168 */ 169 public static void resolveInstanceRange(Element event, StringBuilder instances, SyncCoordAction appInst, 170 Configuration conf, ELEvaluator eval) throws Exception { 171 Element eStartInst = event.getChild("start-instance", event.getNamespace()); 172 Element eEndInst = event.getChild("end-instance", event.getNamespace()); 173 if (eStartInst != null && eEndInst != null) { 174 String strStart = eStartInst.getTextTrim(); 175 String strEnd = eEndInst.getTextTrim(); 176 checkIfBothSameType(strStart, strEnd); 177 StringBuilder restArg = new StringBuilder(); // To store rest 178 // arguments for 179 // future 180 // function 181 int startIndex = getInstanceNumber(strStart, event, appInst, conf, restArg); 182 String startRestArg = restArg.toString(); 183 restArg.delete(0, restArg.length()); 184 int endIndex = getInstanceNumber(strEnd, event, appInst, conf, restArg); 185 String endRestArg = restArg.toString(); 186 int funcType = getFuncType(strStart); 187 if (funcType == OFFSET) { 188 TimeUnit startU = TimeUnit.valueOf(startRestArg); 189 TimeUnit endU = TimeUnit.valueOf(endRestArg); 190 if (startU.getCalendarUnit() * startIndex > endU.getCalendarUnit() * endIndex) { 191 throw new CommandException(ErrorCode.E1010, 192 " start-instance should be equal or earlier than the end-instance \n" 193 + XmlUtils.prettyPrint(event)); 194 } 195 Calendar startCal = CoordELFunctions.resolveOffsetRawTime(startIndex, startU, eval); 196 Calendar endCal = CoordELFunctions.resolveOffsetRawTime(endIndex, endU, eval); 197 if (startCal != null && endCal != null) { 198 List<Integer> expandedFreqs = CoordELFunctions.expandOffsetTimes(startCal, endCal, eval); 199 for (int i = expandedFreqs.size() - 1; i >= 0; i--) { 200 String matInstance = materializeInstance(event, "${coord:offset(" + expandedFreqs.get(i) + ", \"MINUTE\")}", 201 appInst, conf, eval); 202 if (matInstance == null || matInstance.length() == 0) { 203 // Earlier than dataset's initial instance 204 break; 205 } 206 if (instances.length() > 0) { 207 instances.append(CoordELFunctions.INSTANCE_SEPARATOR); 208 } 209 instances.append(matInstance); 210 } 211 } 212 } 213 else { 214 if (startIndex > endIndex) { 215 throw new CommandException(ErrorCode.E1010, 216 " start-instance should be equal or earlier than the end-instance \n" 217 + XmlUtils.prettyPrint(event)); 218 } 219 if (funcType == CURRENT) { 220 // Everything could be resolved NOW. no latest() ELs 221 for (int i = endIndex; i >= startIndex; i--) { 222 String matInstance = materializeInstance(event, "${coord:current(" + i + ")}", appInst, conf, eval); 223 if (matInstance == null || matInstance.length() == 0) { 224 // Earlier than dataset's initial instance 225 break; 226 } 227 if (instances.length() > 0) { 228 instances.append(CoordELFunctions.INSTANCE_SEPARATOR); 229 } 230 instances.append(matInstance); 231 } 232 } 233 else { // latest(n)/future() EL is present 234 if (funcType == LATEST) { 235 instances.append("${coord:latestRange(").append(startIndex).append(",").append(endIndex) 236 .append(")}"); 237 } 238 else if (funcType == FUTURE) { 239 instances.append("${coord:futureRange(").append(startIndex).append(",").append(endIndex) 240 .append(",'").append(endRestArg).append("')}"); 241 } 242 } 243 } 244 // Remove start-instance and end-instances 245 event.removeChild("start-instance", event.getNamespace()); 246 event.removeChild("end-instance", event.getNamespace()); 247 } 248 } 249 250 /** 251 * Materialize one instance like current(-2) 252 * 253 * @param event : <data-in> 254 * @param expr : instance like current(-1) 255 * @param appInst : application specific info 256 * @param conf 257 * @param evalInst :ELEvaluator 258 * @return materialized date string 259 * @throws Exception 260 */ 261 public static String materializeInstance(Element event, String expr, SyncCoordAction appInst, Configuration conf, 262 ELEvaluator evalInst) throws Exception { 263 if (event == null) { 264 return null; 265 } 266 // ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event, 267 // appInst, conf); 268 return CoordELFunctions.evalAndWrap(evalInst, expr); 269 } 270 271 /** 272 * Create two new tags with <uris> and <unresolved-instances>. 273 * 274 * @param event 275 * @param instances 276 * @param dependencyList 277 * @throws Exception 278 */ 279 public static void separateResolvedAndUnresolved(Element event, StringBuilder instances, StringBuffer dependencyList) 280 throws Exception { 281 StringBuilder unresolvedInstances = new StringBuilder(); 282 StringBuilder urisWithDoneFlag = new StringBuilder(); 283 String uris = createEarlyURIs(event, instances.toString(), unresolvedInstances, urisWithDoneFlag); 284 if (uris.length() > 0) { 285 Element uriInstance = new Element("uris", event.getNamespace()); 286 uriInstance.addContent(uris); 287 event.getContent().add(1, uriInstance); 288 if (dependencyList.length() > 0) { 289 dependencyList.append(CoordELFunctions.INSTANCE_SEPARATOR); 290 } 291 dependencyList.append(urisWithDoneFlag); 292 } 293 if (unresolvedInstances.length() > 0) { 294 Element elemInstance = new Element("unresolved-instances", event.getNamespace()); 295 elemInstance.addContent(unresolvedInstances.toString()); 296 event.getContent().add(1, elemInstance); 297 } 298 } 299 300 /** 301 * The function create a list of URIs separated by "," using the instances 302 * time stamp and URI-template 303 * 304 * @param event : <data-in> event 305 * @param instances : List of time stamp separated by "," 306 * @param unresolvedInstances : list of instance with latest function 307 * @param urisWithDoneFlag : list of URIs with the done flag appended 308 * @return : list of URIs separated by ";" as a string. 309 * @throws Exception 310 */ 311 public static String createEarlyURIs(Element event, String instances, StringBuilder unresolvedInstances, 312 StringBuilder urisWithDoneFlag) throws Exception { 313 if (instances == null || instances.length() == 0) { 314 return ""; 315 } 316 String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR); 317 StringBuilder uris = new StringBuilder(); 318 319 Element doneFlagElement = event.getChild("dataset", event.getNamespace()).getChild("done-flag", 320 event.getNamespace()); 321 String doneFlag = CoordUtils.getDoneFlag(doneFlagElement); 322 323 for (int i = 0; i < instanceList.length; i++) { 324 if(instanceList[i].trim().length() == 0) { 325 continue; 326 } 327 int funcType = getFuncType(instanceList[i]); 328 if (funcType == LATEST || funcType == FUTURE) { 329 if (unresolvedInstances.length() > 0) { 330 unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR); 331 } 332 unresolvedInstances.append(instanceList[i]); 333 continue; 334 } 335 ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]); 336 if (uris.length() > 0) { 337 uris.append(CoordELFunctions.INSTANCE_SEPARATOR); 338 urisWithDoneFlag.append(CoordELFunctions.INSTANCE_SEPARATOR); 339 } 340 341 String uriPath = CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()) 342 .getChild("uri-template", event.getNamespace()).getTextTrim()); 343 uris.append(uriPath); 344 if (doneFlag.length() > 0) { 345 uriPath += "/" + doneFlag; 346 } 347 urisWithDoneFlag.append(uriPath); 348 } 349 return uris.toString(); 350 } 351 352 /** 353 * @param eSla 354 * @param nominalTime 355 * @param conf 356 * @return boolean to determine whether the SLA element is present or not 357 * @throws CoordinatorJobException 358 */ 359 public static boolean materializeSLA(Element eSla, Date nominalTime, Configuration conf) 360 throws CoordinatorJobException { 361 if (eSla == null) { 362 // eAppXml.getNamespace("sla")); 363 return false; 364 } 365 try { 366 ELEvaluator evalSla = CoordELEvaluator.createSLAEvaluator(nominalTime, conf); 367 List<Element> elemList = eSla.getChildren(); 368 for (Element elem : elemList) { 369 String updated; 370 try { 371 updated = CoordELFunctions.evalAndWrap(evalSla, elem.getText().trim()); 372 } 373 catch (Exception e) { 374 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e); 375 } 376 elem.removeContent(); 377 elem.addContent(updated); 378 } 379 } 380 catch (Exception e) { 381 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e); 382 } 383 return true; 384 } 385 386 /** 387 * Materialize one instance for specific nominal time. It includes: 1. 388 * Materialize data events (i.e. <data-in> and <data-out>) 2. Materialize 389 * data properties (i.e dataIn(<DS>) and dataOut(<DS>) 3. remove 'start' and 390 * 'end' tag 4. Add 'instance_number' and 'nominal-time' tag 391 * 392 * @param jobId coordinator job id 393 * @param dryrun true if it is dryrun 394 * @param eAction frequency unexploded-job 395 * @param nominalTime materialization time 396 * @param actualTime action actual time 397 * @param instanceCount instance numbers 398 * @param conf job configuration 399 * @param actionBean CoordinatorActionBean to materialize 400 * @return one materialized action for specific nominal time 401 * @throws Exception 402 */ 403 @SuppressWarnings("unchecked") 404 public static String materializeOneInstance(String jobId, boolean dryrun, Element eAction, Date nominalTime, 405 Date actualTime, int instanceCount, Configuration conf, CoordinatorActionBean actionBean) throws Exception { 406 String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, instanceCount + ""); 407 SyncCoordAction appInst = new SyncCoordAction(); 408 appInst.setActionId(actionId); 409 appInst.setName(eAction.getAttributeValue("name")); 410 appInst.setNominalTime(nominalTime); 411 appInst.setActualTime(actualTime); 412 int frequency = Integer.parseInt(eAction.getAttributeValue("frequency")); 413 appInst.setFrequency(frequency); 414 appInst.setTimeUnit(TimeUnit.valueOf(eAction.getAttributeValue("freq_timeunit"))); 415 appInst.setTimeZone(DateUtils.getTimeZone(eAction.getAttributeValue("timezone"))); 416 appInst.setEndOfDuration(TimeUnit.valueOf(eAction.getAttributeValue("end_of_duration"))); 417 418 StringBuffer dependencyList = new StringBuffer(); 419 420 Element inputList = eAction.getChild("input-events", eAction.getNamespace()); 421 List<Element> dataInList = null; 422 if (inputList != null) { 423 dataInList = inputList.getChildren("data-in", eAction.getNamespace()); 424 materializeDataEvents(dataInList, appInst, conf, dependencyList); 425 } 426 427 Element outputList = eAction.getChild("output-events", eAction.getNamespace()); 428 List<Element> dataOutList = null; 429 if (outputList != null) { 430 dataOutList = outputList.getChildren("data-out", eAction.getNamespace()); 431 StringBuffer tmp = new StringBuffer(); 432 // no dependency checks 433 materializeDataEvents(dataOutList, appInst, conf, tmp); 434 } 435 436 eAction.removeAttribute("start"); 437 eAction.removeAttribute("end"); 438 eAction.setAttribute("instance-number", Integer.toString(instanceCount)); 439 eAction.setAttribute("action-nominal-time", DateUtils.formatDateOozieTZ(nominalTime)); 440 eAction.setAttribute("action-actual-time", DateUtils.formatDateOozieTZ(actualTime)); 441 442 boolean isSla = CoordCommandUtils.materializeSLA(eAction.getChild("action", eAction.getNamespace()).getChild( 443 "info", eAction.getNamespace("sla")), nominalTime, conf); 444 445 // Setting up action bean 446 actionBean.setCreatedConf(XmlUtils.prettyPrint(conf).toString()); 447 actionBean.setRunConf(XmlUtils.prettyPrint(conf).toString()); 448 actionBean.setCreatedTime(actualTime); 449 actionBean.setJobId(jobId); 450 actionBean.setId(actionId); 451 actionBean.setLastModifiedTime(new Date()); 452 actionBean.setStatus(CoordinatorAction.Status.WAITING); 453 actionBean.setActionNumber(instanceCount); 454 actionBean.setMissingDependencies(dependencyList.toString()); 455 actionBean.setNominalTime(nominalTime); 456 if (isSla == true) { 457 actionBean.setSlaXml(XmlUtils.prettyPrint( 458 eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"))) 459 .toString()); 460 } 461 462 // actionBean.setTrackerUri(trackerUri);//TOOD: 463 // actionBean.setConsoleUrl(consoleUrl); //TODO: 464 // actionBean.setType(type);//TODO: 465 // actionBean.setErrorInfo(errorCode, errorMessage); //TODO: 466 // actionBean.setExternalStatus(externalStatus);//TODO 467 if (!dryrun) { 468 return XmlUtils.prettyPrint(eAction).toString(); 469 } 470 else { 471 String action = XmlUtils.prettyPrint(eAction).toString(); 472 CoordActionInputCheckXCommand coordActionInput = new CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId()); 473 StringBuilder actionXml = new StringBuilder(action); 474 StringBuilder existList = new StringBuilder(); 475 StringBuilder nonExistList = new StringBuilder(); 476 StringBuilder nonResolvedList = new StringBuilder(); 477 getResolvedList(actionBean.getMissingDependencies(), nonExistList, nonResolvedList); 478 Configuration actionConf = new XConfiguration(new StringReader(actionBean.getRunConf())); 479 coordActionInput.checkInput(actionXml, existList, nonExistList, actionConf); 480 return actionXml.toString(); 481 } 482 } 483 484 /** 485 * Materialize all <input-events>/<data-in> or <output-events>/<data-out> 486 * tags Create uris for resolved instances. Create unresolved instance for 487 * latest()/future(). 488 * 489 * @param events 490 * @param appInst 491 * @param conf 492 * @throws Exception 493 */ 494 public static void materializeDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf, 495 StringBuffer dependencyList) throws Exception { 496 497 if (events == null) { 498 return; 499 } 500 StringBuffer unresolvedList = new StringBuffer(); 501 for (Element event : events) { 502 StringBuilder instances = new StringBuilder(); 503 ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event, appInst, conf); 504 // Handle list of instance tag 505 resolveInstances(event, instances, appInst, conf, eval); 506 // Handle start-instance and end-instance 507 resolveInstanceRange(event, instances, appInst, conf, eval); 508 // Separate out the unresolved instances 509 separateResolvedAndUnresolved(event, instances, dependencyList); 510 String tmpUnresolved = event.getChildTextTrim("unresolved-instances", event.getNamespace()); 511 if (tmpUnresolved != null) { 512 if (unresolvedList.length() > 0) { 513 unresolvedList.append(CoordELFunctions.INSTANCE_SEPARATOR); 514 } 515 unresolvedList.append(tmpUnresolved); 516 } 517 } 518 if (unresolvedList.length() > 0) { 519 dependencyList.append(RESOLVED_UNRESOLVED_SEPARATOR); 520 dependencyList.append(unresolvedList); 521 } 522 return; 523 } 524 525 /** 526 * Get resolved string from missDepList 527 * 528 * @param missDepList 529 * @param resolved 530 * @param unresolved 531 * @return resolved string 532 */ 533 public static String getResolvedList(String missDepList, StringBuilder resolved, StringBuilder unresolved) { 534 if (missDepList != null) { 535 int index = missDepList.indexOf(RESOLVED_UNRESOLVED_SEPARATOR); 536 if (index < 0) { 537 resolved.append(missDepList); 538 } 539 else { 540 resolved.append(missDepList.substring(0, index)); 541 unresolved.append(missDepList.substring(index + 1)); 542 } 543 } 544 return resolved.toString(); 545 } 546 547 }