001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.io.StringReader; 021 import java.util.Date; 022 import java.util.List; 023 024 import org.apache.hadoop.conf.Configuration; 025 import org.apache.oozie.CoordinatorActionBean; 026 import org.apache.oozie.ErrorCode; 027 import org.apache.oozie.client.CoordinatorAction; 028 import org.apache.oozie.command.CommandException; 029 import org.apache.oozie.coord.CoordELEvaluator; 030 import org.apache.oozie.coord.CoordELFunctions; 031 import org.apache.oozie.coord.CoordUtils; 032 import org.apache.oozie.coord.CoordinatorJobException; 033 import org.apache.oozie.coord.SyncCoordAction; 034 import org.apache.oozie.coord.TimeUnit; 035 import org.apache.oozie.service.Services; 036 import org.apache.oozie.service.UUIDService; 037 import org.apache.oozie.util.DateUtils; 038 import org.apache.oozie.util.ELEvaluator; 039 import org.apache.oozie.util.XConfiguration; 040 import org.apache.oozie.util.XmlUtils; 041 import org.jdom.Element; 042 043 public class CoordCommandUtils { 044 public static int CURRENT = 0; 045 public static int LATEST = 1; 046 public static int FUTURE = 2; 047 public static int UNEXPECTED = -1; 048 public static final String RESOLVED_UNRESOLVED_SEPARATOR = ";"; 049 050 /** 051 * parse a function like coord:latest(n)/future() and return the 'n'. 052 * <p/> 053 * @param function 054 * @param event 055 * @param appInst 056 * @param conf 057 * @param restArg 058 * @return int instanceNumber 059 * @throws Exception 060 */ 061 public static int getInstanceNumber(String function, Element event, SyncCoordAction appInst, Configuration conf, 062 StringBuilder restArg) throws Exception { 063 ELEvaluator eval = CoordELEvaluator 064 .createInstancesELEvaluator("coord-action-create-inst", event, appInst, conf); 065 String newFunc = CoordELFunctions.evalAndWrap(eval, function); 066 int funcType = getFuncType(newFunc); 067 if (funcType == CURRENT || funcType == LATEST) { 068 return parseOneArg(newFunc); 069 } 070 else { 071 return parseMoreArgs(newFunc, restArg); 072 } 073 } 074 075 private static int parseOneArg(String funcName) throws Exception { 076 int firstPos = funcName.indexOf("("); 077 int lastPos = funcName.lastIndexOf(")"); 078 if (firstPos >= 0 && lastPos > firstPos) { 079 String tmp = funcName.substring(firstPos + 1, lastPos).trim(); 080 if (tmp.length() > 0) { 081 return (int) Double.parseDouble(tmp); 082 } 083 } 084 throw new RuntimeException("Unformatted function :" + funcName); 085 } 086 087 private static int parseMoreArgs(String funcName, StringBuilder restArg) throws Exception { 088 int firstPos = funcName.indexOf("("); 089 int secondPos = funcName.lastIndexOf(","); 090 int lastPos = funcName.lastIndexOf(")"); 091 if (firstPos >= 0 && secondPos > firstPos) { 092 String tmp = funcName.substring(firstPos + 1, secondPos).trim(); 093 if (tmp.length() > 0) { 094 restArg.append(funcName.substring(secondPos + 1, lastPos).trim()); 095 return (int) Double.parseDouble(tmp); 096 } 097 } 098 throw new RuntimeException("Unformatted function :" + funcName); 099 } 100 101 /** 102 * @param EL function name 103 * @return type of EL function 104 */ 105 public static int getFuncType(String function) { 106 if (function.indexOf("current") >= 0) { 107 return CURRENT; 108 } 109 else if (function.indexOf("latest") >= 0) { 110 return LATEST; 111 } 112 else if (function.indexOf("future") >= 0) { 113 return FUTURE; 114 } 115 return UNEXPECTED; 116 // throw new RuntimeException("Unexpected instance name "+ function); 117 } 118 119 /** 120 * @param startInst: EL function name 121 * @param endInst: EL function name 122 * @throws CommandException if both are not the same function 123 */ 124 public static void checkIfBothSameType(String startInst, String endInst) throws CommandException { 125 if (getFuncType(startInst) != getFuncType(endInst)) { 126 throw new CommandException(ErrorCode.E1010, 127 " start-instance and end-instance both should be either latest or current or future\n" 128 + " start " + startInst + " and end " + endInst); 129 } 130 } 131 132 /** 133 * Resolve list of <instance> </instance> tags. 134 * 135 * @param event 136 * @param instances 137 * @param actionInst 138 * @param conf 139 * @param eval: ELEvalautor 140 * @throws Exception 141 */ 142 public static void resolveInstances(Element event, StringBuilder instances, SyncCoordAction actionInst, 143 Configuration conf, ELEvaluator eval) throws Exception { 144 for (Element eInstance : (List<Element>) event.getChildren("instance", event.getNamespace())) { 145 if (instances.length() > 0) { 146 instances.append(CoordELFunctions.INSTANCE_SEPARATOR); 147 } 148 instances.append(materializeInstance(event, eInstance.getTextTrim(), actionInst, conf, eval)); 149 } 150 event.removeChildren("instance", event.getNamespace()); 151 } 152 153 /** 154 * Resolve <start-instance> <end-insatnce> tag. Don't resolve any 155 * latest()/future() 156 * 157 * @param event 158 * @param instances 159 * @param appInst 160 * @param conf 161 * @param eval: ELEvalautor 162 * @throws Exception 163 */ 164 public static void resolveInstanceRange(Element event, StringBuilder instances, SyncCoordAction appInst, 165 Configuration conf, ELEvaluator eval) throws Exception { 166 Element eStartInst = event.getChild("start-instance", event.getNamespace()); 167 Element eEndInst = event.getChild("end-instance", event.getNamespace()); 168 if (eStartInst != null && eEndInst != null) { 169 String strStart = eStartInst.getTextTrim(); 170 String strEnd = eEndInst.getTextTrim(); 171 checkIfBothSameType(strStart, strEnd); 172 StringBuilder restArg = new StringBuilder(); // To store rest 173 // arguments for 174 // future 175 // function 176 int startIndex = getInstanceNumber(strStart, event, appInst, conf, restArg); 177 restArg.delete(0, restArg.length()); 178 int endIndex = getInstanceNumber(strEnd, event, appInst, conf, restArg); 179 if (startIndex > endIndex) { 180 throw new CommandException(ErrorCode.E1010, 181 " start-instance should be equal or earlier than the end-instance \n" 182 + XmlUtils.prettyPrint(event)); 183 } 184 int funcType = getFuncType(strStart); 185 if (funcType == CURRENT) { 186 // Everything could be resolved NOW. no latest() ELs 187 for (int i = endIndex; i >= startIndex; i--) { 188 String matInstance = materializeInstance(event, "${coord:current(" + i + ")}", appInst, conf, eval); 189 if (matInstance == null || matInstance.length() == 0) { 190 // Earlier than dataset's initial instance 191 break; 192 } 193 if (instances.length() > 0) { 194 instances.append(CoordELFunctions.INSTANCE_SEPARATOR); 195 } 196 instances.append(matInstance); 197 } 198 } 199 else { // latest(n)/future() EL is present 200 for (; startIndex <= endIndex; startIndex++) { 201 if (instances.length() > 0) { 202 instances.append(CoordELFunctions.INSTANCE_SEPARATOR); 203 } 204 if (funcType == LATEST) { 205 instances.append("${coord:latest(" + startIndex + ")}"); 206 } 207 else { // For future 208 instances.append("${coord:future(" + startIndex + ",'" + restArg + "')}"); 209 } 210 } 211 } 212 // Remove start-instance and end-instances 213 event.removeChild("start-instance", event.getNamespace()); 214 event.removeChild("end-instance", event.getNamespace()); 215 } 216 } 217 218 /** 219 * Materialize one instance like current(-2) 220 * 221 * @param event : <data-in> 222 * @param expr : instance like current(-1) 223 * @param appInst : application specific info 224 * @param conf 225 * @param evalInst :ELEvaluator 226 * @return materialized date string 227 * @throws Exception 228 */ 229 public static String materializeInstance(Element event, String expr, SyncCoordAction appInst, Configuration conf, 230 ELEvaluator evalInst) throws Exception { 231 if (event == null) { 232 return null; 233 } 234 // ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event, 235 // appInst, conf); 236 return CoordELFunctions.evalAndWrap(evalInst, expr); 237 } 238 239 /** 240 * Create two new tags with <uris> and <unresolved-instances>. 241 * 242 * @param event 243 * @param instances 244 * @param dependencyList 245 * @throws Exception 246 */ 247 public static void separateResolvedAndUnresolved(Element event, StringBuilder instances, StringBuffer dependencyList) 248 throws Exception { 249 StringBuilder unresolvedInstances = new StringBuilder(); 250 StringBuilder urisWithDoneFlag = new StringBuilder(); 251 String uris = createEarlyURIs(event, instances.toString(), unresolvedInstances, urisWithDoneFlag); 252 if (uris.length() > 0) { 253 Element uriInstance = new Element("uris", event.getNamespace()); 254 uriInstance.addContent(uris); 255 event.getContent().add(1, uriInstance); 256 if (dependencyList.length() > 0) { 257 dependencyList.append(CoordELFunctions.INSTANCE_SEPARATOR); 258 } 259 dependencyList.append(urisWithDoneFlag); 260 } 261 if (unresolvedInstances.length() > 0) { 262 Element elemInstance = new Element("unresolved-instances", event.getNamespace()); 263 elemInstance.addContent(unresolvedInstances.toString()); 264 event.getContent().add(1, elemInstance); 265 } 266 } 267 268 /** 269 * The function create a list of URIs separated by "," using the instances 270 * time stamp and URI-template 271 * 272 * @param event : <data-in> event 273 * @param instances : List of time stamp separated by "," 274 * @param unresolvedInstances : list of instance with latest function 275 * @param urisWithDoneFlag : list of URIs with the done flag appended 276 * @return : list of URIs separated by ";" as a string. 277 * @throws Exception 278 */ 279 public static String createEarlyURIs(Element event, String instances, StringBuilder unresolvedInstances, 280 StringBuilder urisWithDoneFlag) throws Exception { 281 if (instances == null || instances.length() == 0) { 282 return ""; 283 } 284 String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR); 285 StringBuilder uris = new StringBuilder(); 286 287 Element doneFlagElement = event.getChild("dataset", event.getNamespace()).getChild("done-flag", 288 event.getNamespace()); 289 String doneFlag = CoordUtils.getDoneFlag(doneFlagElement); 290 291 for (int i = 0; i < instanceList.length; i++) { 292 if(instanceList[i].trim().length() == 0) { 293 continue; 294 } 295 int funcType = getFuncType(instanceList[i]); 296 if (funcType == LATEST || funcType == FUTURE) { 297 if (unresolvedInstances.length() > 0) { 298 unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR); 299 } 300 unresolvedInstances.append(instanceList[i]); 301 continue; 302 } 303 ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]); 304 if (uris.length() > 0) { 305 uris.append(CoordELFunctions.INSTANCE_SEPARATOR); 306 urisWithDoneFlag.append(CoordELFunctions.INSTANCE_SEPARATOR); 307 } 308 309 String uriPath = CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()) 310 .getChild("uri-template", event.getNamespace()).getTextTrim()); 311 uris.append(uriPath); 312 if (doneFlag.length() > 0) { 313 uriPath += "/" + doneFlag; 314 } 315 urisWithDoneFlag.append(uriPath); 316 } 317 return uris.toString(); 318 } 319 320 /** 321 * @param eSla 322 * @param nominalTime 323 * @param conf 324 * @return boolean to determine whether the SLA element is present or not 325 * @throws CoordinatorJobException 326 */ 327 public static boolean materializeSLA(Element eSla, Date nominalTime, Configuration conf) 328 throws CoordinatorJobException { 329 if (eSla == null) { 330 // eAppXml.getNamespace("sla")); 331 return false; 332 } 333 try { 334 ELEvaluator evalSla = CoordELEvaluator.createSLAEvaluator(nominalTime, conf); 335 List<Element> elemList = eSla.getChildren(); 336 for (Element elem : elemList) { 337 String updated; 338 try { 339 updated = CoordELFunctions.evalAndWrap(evalSla, elem.getText().trim()); 340 } 341 catch (Exception e) { 342 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e); 343 } 344 elem.removeContent(); 345 elem.addContent(updated); 346 } 347 } 348 catch (Exception e) { 349 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e); 350 } 351 return true; 352 } 353 354 /** 355 * Materialize one instance for specific nominal time. It includes: 1. 356 * Materialize data events (i.e. <data-in> and <data-out>) 2. Materialize 357 * data properties (i.e dataIn(<DS>) and dataOut(<DS>) 3. remove 'start' and 358 * 'end' tag 4. Add 'instance_number' and 'nominal-time' tag 359 * 360 * @param jobId coordinator job id 361 * @param dryrun true if it is dryrun 362 * @param eAction frequency unexploded-job 363 * @param nominalTime materialization time 364 * @param actualTime action actual time 365 * @param instanceCount instance numbers 366 * @param conf job configuration 367 * @param actionBean CoordinatorActionBean to materialize 368 * @return one materialized action for specific nominal time 369 * @throws Exception 370 */ 371 @SuppressWarnings("unchecked") 372 public static String materializeOneInstance(String jobId, boolean dryrun, Element eAction, Date nominalTime, 373 Date actualTime, int instanceCount, Configuration conf, CoordinatorActionBean actionBean) throws Exception { 374 String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, instanceCount + ""); 375 SyncCoordAction appInst = new SyncCoordAction(); 376 appInst.setActionId(actionId); 377 appInst.setName(eAction.getAttributeValue("name")); 378 appInst.setNominalTime(nominalTime); 379 appInst.setActualTime(actualTime); 380 int frequency = Integer.parseInt(eAction.getAttributeValue("frequency")); 381 appInst.setFrequency(frequency); 382 appInst.setTimeUnit(TimeUnit.valueOf(eAction.getAttributeValue("freq_timeunit"))); 383 appInst.setTimeZone(DateUtils.getTimeZone(eAction.getAttributeValue("timezone"))); 384 appInst.setEndOfDuration(TimeUnit.valueOf(eAction.getAttributeValue("end_of_duration"))); 385 386 StringBuffer dependencyList = new StringBuffer(); 387 388 Element inputList = eAction.getChild("input-events", eAction.getNamespace()); 389 List<Element> dataInList = null; 390 if (inputList != null) { 391 dataInList = inputList.getChildren("data-in", eAction.getNamespace()); 392 materializeDataEvents(dataInList, appInst, conf, dependencyList); 393 } 394 395 Element outputList = eAction.getChild("output-events", eAction.getNamespace()); 396 List<Element> dataOutList = null; 397 if (outputList != null) { 398 dataOutList = outputList.getChildren("data-out", eAction.getNamespace()); 399 StringBuffer tmp = new StringBuffer(); 400 // no dependency checks 401 materializeDataEvents(dataOutList, appInst, conf, tmp); 402 } 403 404 eAction.removeAttribute("start"); 405 eAction.removeAttribute("end"); 406 eAction.setAttribute("instance-number", Integer.toString(instanceCount)); 407 eAction.setAttribute("action-nominal-time", DateUtils.formatDateUTC(nominalTime)); 408 eAction.setAttribute("action-actual-time", DateUtils.formatDateUTC(actualTime)); 409 410 boolean isSla = CoordCommandUtils.materializeSLA(eAction.getChild("action", eAction.getNamespace()).getChild( 411 "info", eAction.getNamespace("sla")), nominalTime, conf); 412 413 // Setting up action bean 414 actionBean.setCreatedConf(XmlUtils.prettyPrint(conf).toString()); 415 actionBean.setRunConf(XmlUtils.prettyPrint(conf).toString()); 416 actionBean.setCreatedTime(actualTime); 417 actionBean.setJobId(jobId); 418 actionBean.setId(actionId); 419 actionBean.setLastModifiedTime(new Date()); 420 actionBean.setStatus(CoordinatorAction.Status.WAITING); 421 actionBean.setActionNumber(instanceCount); 422 actionBean.setMissingDependencies(dependencyList.toString()); 423 actionBean.setNominalTime(nominalTime); 424 if (isSla == true) { 425 actionBean.setSlaXml(XmlUtils.prettyPrint( 426 eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"))) 427 .toString()); 428 } 429 430 // actionBean.setTrackerUri(trackerUri);//TOOD: 431 // actionBean.setConsoleUrl(consoleUrl); //TODO: 432 // actionBean.setType(type);//TODO: 433 // actionBean.setErrorInfo(errorCode, errorMessage); //TODO: 434 // actionBean.setExternalStatus(externalStatus);//TODO 435 if (!dryrun) { 436 return XmlUtils.prettyPrint(eAction).toString(); 437 } 438 else { 439 String action = XmlUtils.prettyPrint(eAction).toString(); 440 CoordActionInputCheckXCommand coordActionInput = new CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId()); 441 StringBuilder actionXml = new StringBuilder(action); 442 StringBuilder existList = new StringBuilder(); 443 StringBuilder nonExistList = new StringBuilder(); 444 StringBuilder nonResolvedList = new StringBuilder(); 445 getResolvedList(actionBean.getMissingDependencies(), nonExistList, nonResolvedList); 446 Configuration actionConf = new XConfiguration(new StringReader(actionBean.getRunConf())); 447 coordActionInput.checkInput(actionXml, existList, nonExistList, actionConf); 448 return actionXml.toString(); 449 } 450 } 451 452 /** 453 * Materialize all <input-events>/<data-in> or <output-events>/<data-out> 454 * tags Create uris for resolved instances. Create unresolved instance for 455 * latest()/future(). 456 * 457 * @param events 458 * @param appInst 459 * @param conf 460 * @throws Exception 461 */ 462 public static void materializeDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf, 463 StringBuffer dependencyList) throws Exception { 464 465 if (events == null) { 466 return; 467 } 468 StringBuffer unresolvedList = new StringBuffer(); 469 for (Element event : events) { 470 StringBuilder instances = new StringBuilder(); 471 ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event, appInst, conf); 472 // Handle list of instance tag 473 resolveInstances(event, instances, appInst, conf, eval); 474 // Handle start-instance and end-instance 475 resolveInstanceRange(event, instances, appInst, conf, eval); 476 // Separate out the unresolved instances 477 separateResolvedAndUnresolved(event, instances, dependencyList); 478 String tmpUnresolved = event.getChildTextTrim("unresolved-instances", event.getNamespace()); 479 if (tmpUnresolved != null) { 480 if (unresolvedList.length() > 0) { 481 unresolvedList.append(CoordELFunctions.INSTANCE_SEPARATOR); 482 } 483 unresolvedList.append(tmpUnresolved); 484 } 485 } 486 if (unresolvedList.length() > 0) { 487 dependencyList.append(RESOLVED_UNRESOLVED_SEPARATOR); 488 dependencyList.append(unresolvedList); 489 } 490 return; 491 } 492 493 /** 494 * Get resolved string from missDepList 495 * 496 * @param missDepList 497 * @param resolved 498 * @param unresolved 499 * @return resolved string 500 */ 501 public static String getResolvedList(String missDepList, StringBuilder resolved, StringBuilder unresolved) { 502 if (missDepList != null) { 503 int index = missDepList.indexOf(RESOLVED_UNRESOLVED_SEPARATOR); 504 if (index < 0) { 505 resolved.append(missDepList); 506 } 507 else { 508 resolved.append(missDepList.substring(0, index)); 509 unresolved.append(missDepList.substring(index + 1)); 510 } 511 } 512 return resolved.toString(); 513 } 514 515 }