This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.io.StringReader;
021 import java.util.Calendar;
022 import java.util.Date;
023 import java.util.List;
024
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.oozie.CoordinatorActionBean;
027 import org.apache.oozie.ErrorCode;
028 import org.apache.oozie.client.CoordinatorAction;
029 import org.apache.oozie.command.CommandException;
030 import org.apache.oozie.coord.CoordELEvaluator;
031 import org.apache.oozie.coord.CoordELFunctions;
032 import org.apache.oozie.coord.CoordUtils;
033 import org.apache.oozie.coord.CoordinatorJobException;
034 import org.apache.oozie.coord.SyncCoordAction;
035 import org.apache.oozie.coord.TimeUnit;
036 import org.apache.oozie.service.Services;
037 import org.apache.oozie.service.UUIDService;
038 import org.apache.oozie.util.DateUtils;
039 import org.apache.oozie.util.ELEvaluator;
040 import org.apache.oozie.util.XConfiguration;
041 import org.apache.oozie.util.XmlUtils;
042 import org.jdom.Element;
043
044 public class CoordCommandUtils {
045 public static int CURRENT = 0;
046 public static int LATEST = 1;
047 public static int FUTURE = 2;
048 public static int OFFSET = 3;
049 public static int UNEXPECTED = -1;
050 public static final String RESOLVED_UNRESOLVED_SEPARATOR = ";";
051
052 /**
053 * parse a function like coord:latest(n)/future() and return the 'n'.
054 * <p/>
055 * @param function
056 * @param event
057 * @param appInst
058 * @param conf
059 * @param restArg
060 * @return int instanceNumber
061 * @throws Exception
062 */
063 public static int getInstanceNumber(String function, Element event, SyncCoordAction appInst, Configuration conf,
064 StringBuilder restArg) throws Exception {
065 ELEvaluator eval = CoordELEvaluator
066 .createInstancesELEvaluator("coord-action-create-inst", event, appInst, conf);
067 String newFunc = CoordELFunctions.evalAndWrap(eval, function);
068 int funcType = getFuncType(newFunc);
069 if (funcType == CURRENT || funcType == LATEST) {
070 return parseOneArg(newFunc);
071 }
072 else {
073 return parseMoreArgs(newFunc, restArg);
074 }
075 }
076
077 private static int parseOneArg(String funcName) throws Exception {
078 int firstPos = funcName.indexOf("(");
079 int lastPos = funcName.lastIndexOf(")");
080 if (firstPos >= 0 && lastPos > firstPos) {
081 String tmp = funcName.substring(firstPos + 1, lastPos).trim();
082 if (tmp.length() > 0) {
083 return (int) Double.parseDouble(tmp);
084 }
085 }
086 throw new RuntimeException("Unformatted function :" + funcName);
087 }
088
089 private static int parseMoreArgs(String funcName, StringBuilder restArg) throws Exception {
090 int firstPos = funcName.indexOf("(");
091 int secondPos = funcName.lastIndexOf(",");
092 int lastPos = funcName.lastIndexOf(")");
093 if (firstPos >= 0 && secondPos > firstPos) {
094 String tmp = funcName.substring(firstPos + 1, secondPos).trim();
095 if (tmp.length() > 0) {
096 restArg.append(funcName.substring(secondPos + 1, lastPos).trim());
097 return (int) Double.parseDouble(tmp);
098 }
099 }
100 throw new RuntimeException("Unformatted function :" + funcName);
101 }
102
103 /**
104 * @param EL function name
105 * @return type of EL function
106 */
107 public static int getFuncType(String function) {
108 if (function.indexOf("current") >= 0) {
109 return CURRENT;
110 }
111 else if (function.indexOf("latest") >= 0) {
112 return LATEST;
113 }
114 else if (function.indexOf("future") >= 0) {
115 return FUTURE;
116 }
117 else if (function.indexOf("offset") >= 0) {
118 return OFFSET;
119 }
120 return UNEXPECTED;
121 // throw new RuntimeException("Unexpected instance name "+ function);
122 }
123
124 /**
125 * @param startInst: EL function name
126 * @param endInst: EL function name
127 * @throws CommandException if both are not the same function
128 */
129 public static void checkIfBothSameType(String startInst, String endInst) throws CommandException {
130 if (getFuncType(startInst) != getFuncType(endInst)) {
131 throw new CommandException(ErrorCode.E1010,
132 " start-instance and end-instance both should be either latest or current or future or offset\n"
133 + " start " + startInst + " and end " + endInst);
134 }
135 }
136
137 /**
138 * Resolve list of <instance> </instance> tags.
139 *
140 * @param event
141 * @param instances
142 * @param actionInst
143 * @param conf
144 * @param eval: ELEvalautor
145 * @throws Exception
146 */
147 public static void resolveInstances(Element event, StringBuilder instances, SyncCoordAction actionInst,
148 Configuration conf, ELEvaluator eval) throws Exception {
149 for (Element eInstance : (List<Element>) event.getChildren("instance", event.getNamespace())) {
150 if (instances.length() > 0) {
151 instances.append(CoordELFunctions.INSTANCE_SEPARATOR);
152 }
153 instances.append(materializeInstance(event, eInstance.getTextTrim(), actionInst, conf, eval));
154 }
155 event.removeChildren("instance", event.getNamespace());
156 }
157
158 /**
159 * Resolve <start-instance> <end-insatnce> tag. Don't resolve any
160 * latest()/future()
161 *
162 * @param event
163 * @param instances
164 * @param appInst
165 * @param conf
166 * @param eval: ELEvalautor
167 * @throws Exception
168 */
169 public static void resolveInstanceRange(Element event, StringBuilder instances, SyncCoordAction appInst,
170 Configuration conf, ELEvaluator eval) throws Exception {
171 Element eStartInst = event.getChild("start-instance", event.getNamespace());
172 Element eEndInst = event.getChild("end-instance", event.getNamespace());
173 if (eStartInst != null && eEndInst != null) {
174 String strStart = eStartInst.getTextTrim();
175 String strEnd = eEndInst.getTextTrim();
176 checkIfBothSameType(strStart, strEnd);
177 StringBuilder restArg = new StringBuilder(); // To store rest
178 // arguments for
179 // future
180 // function
181 int startIndex = getInstanceNumber(strStart, event, appInst, conf, restArg);
182 String startRestArg = restArg.toString();
183 restArg.delete(0, restArg.length());
184 int endIndex = getInstanceNumber(strEnd, event, appInst, conf, restArg);
185 String endRestArg = restArg.toString();
186 int funcType = getFuncType(strStart);
187 if (funcType == OFFSET) {
188 TimeUnit startU = TimeUnit.valueOf(startRestArg);
189 TimeUnit endU = TimeUnit.valueOf(endRestArg);
190 if (startU.getCalendarUnit() * startIndex > endU.getCalendarUnit() * endIndex) {
191 throw new CommandException(ErrorCode.E1010,
192 " start-instance should be equal or earlier than the end-instance \n"
193 + XmlUtils.prettyPrint(event));
194 }
195 Calendar startCal = CoordELFunctions.resolveOffsetRawTime(startIndex, startU, eval);
196 Calendar endCal = CoordELFunctions.resolveOffsetRawTime(endIndex, endU, eval);
197 if (startCal != null && endCal != null) {
198 List<Integer> expandedFreqs = CoordELFunctions.expandOffsetTimes(startCal, endCal, eval);
199 for (int i = expandedFreqs.size() - 1; i >= 0; i--) {
200 String matInstance = materializeInstance(event, "${coord:offset(" + expandedFreqs.get(i) + ", \"MINUTE\")}",
201 appInst, conf, eval);
202 if (matInstance == null || matInstance.length() == 0) {
203 // Earlier than dataset's initial instance
204 break;
205 }
206 if (instances.length() > 0) {
207 instances.append(CoordELFunctions.INSTANCE_SEPARATOR);
208 }
209 instances.append(matInstance);
210 }
211 }
212 }
213 else {
214 if (startIndex > endIndex) {
215 throw new CommandException(ErrorCode.E1010,
216 " start-instance should be equal or earlier than the end-instance \n"
217 + XmlUtils.prettyPrint(event));
218 }
219 if (funcType == CURRENT) {
220 // Everything could be resolved NOW. no latest() ELs
221 for (int i = endIndex; i >= startIndex; i--) {
222 String matInstance = materializeInstance(event, "${coord:current(" + i + ")}", appInst, conf, eval);
223 if (matInstance == null || matInstance.length() == 0) {
224 // Earlier than dataset's initial instance
225 break;
226 }
227 if (instances.length() > 0) {
228 instances.append(CoordELFunctions.INSTANCE_SEPARATOR);
229 }
230 instances.append(matInstance);
231 }
232 }
233 else { // latest(n)/future() EL is present
234 if (funcType == LATEST) {
235 instances.append("${coord:latestRange(").append(startIndex).append(",").append(endIndex)
236 .append(")}");
237 }
238 else if (funcType == FUTURE) {
239 instances.append("${coord:futureRange(").append(startIndex).append(",").append(endIndex)
240 .append(",'").append(endRestArg).append("')}");
241 }
242 }
243 }
244 // Remove start-instance and end-instances
245 event.removeChild("start-instance", event.getNamespace());
246 event.removeChild("end-instance", event.getNamespace());
247 }
248 }
249
250 /**
251 * Materialize one instance like current(-2)
252 *
253 * @param event : <data-in>
254 * @param expr : instance like current(-1)
255 * @param appInst : application specific info
256 * @param conf
257 * @param evalInst :ELEvaluator
258 * @return materialized date string
259 * @throws Exception
260 */
261 public static String materializeInstance(Element event, String expr, SyncCoordAction appInst, Configuration conf,
262 ELEvaluator evalInst) throws Exception {
263 if (event == null) {
264 return null;
265 }
266 // ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event,
267 // appInst, conf);
268 return CoordELFunctions.evalAndWrap(evalInst, expr);
269 }
270
271 /**
272 * Create two new tags with <uris> and <unresolved-instances>.
273 *
274 * @param event
275 * @param instances
276 * @param dependencyList
277 * @throws Exception
278 */
279 public static void separateResolvedAndUnresolved(Element event, StringBuilder instances, StringBuffer dependencyList)
280 throws Exception {
281 StringBuilder unresolvedInstances = new StringBuilder();
282 StringBuilder urisWithDoneFlag = new StringBuilder();
283 String uris = createEarlyURIs(event, instances.toString(), unresolvedInstances, urisWithDoneFlag);
284 if (uris.length() > 0) {
285 Element uriInstance = new Element("uris", event.getNamespace());
286 uriInstance.addContent(uris);
287 event.getContent().add(1, uriInstance);
288 if (dependencyList.length() > 0) {
289 dependencyList.append(CoordELFunctions.INSTANCE_SEPARATOR);
290 }
291 dependencyList.append(urisWithDoneFlag);
292 }
293 if (unresolvedInstances.length() > 0) {
294 Element elemInstance = new Element("unresolved-instances", event.getNamespace());
295 elemInstance.addContent(unresolvedInstances.toString());
296 event.getContent().add(1, elemInstance);
297 }
298 }
299
300 /**
301 * The function create a list of URIs separated by "," using the instances
302 * time stamp and URI-template
303 *
304 * @param event : <data-in> event
305 * @param instances : List of time stamp separated by ","
306 * @param unresolvedInstances : list of instance with latest function
307 * @param urisWithDoneFlag : list of URIs with the done flag appended
308 * @return : list of URIs separated by ";" as a string.
309 * @throws Exception
310 */
311 public static String createEarlyURIs(Element event, String instances, StringBuilder unresolvedInstances,
312 StringBuilder urisWithDoneFlag) throws Exception {
313 if (instances == null || instances.length() == 0) {
314 return "";
315 }
316 String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
317 StringBuilder uris = new StringBuilder();
318
319 Element doneFlagElement = event.getChild("dataset", event.getNamespace()).getChild("done-flag",
320 event.getNamespace());
321 String doneFlag = CoordUtils.getDoneFlag(doneFlagElement);
322
323 for (int i = 0; i < instanceList.length; i++) {
324 if(instanceList[i].trim().length() == 0) {
325 continue;
326 }
327 int funcType = getFuncType(instanceList[i]);
328 if (funcType == LATEST || funcType == FUTURE) {
329 if (unresolvedInstances.length() > 0) {
330 unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR);
331 }
332 unresolvedInstances.append(instanceList[i]);
333 continue;
334 }
335 ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]);
336 if (uris.length() > 0) {
337 uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
338 urisWithDoneFlag.append(CoordELFunctions.INSTANCE_SEPARATOR);
339 }
340
341 String uriPath = CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace())
342 .getChild("uri-template", event.getNamespace()).getTextTrim());
343 uris.append(uriPath);
344 if (doneFlag.length() > 0) {
345 uriPath += "/" + doneFlag;
346 }
347 urisWithDoneFlag.append(uriPath);
348 }
349 return uris.toString();
350 }
351
352 /**
353 * @param eSla
354 * @param nominalTime
355 * @param conf
356 * @return boolean to determine whether the SLA element is present or not
357 * @throws CoordinatorJobException
358 */
359 public static boolean materializeSLA(Element eSla, Date nominalTime, Configuration conf)
360 throws CoordinatorJobException {
361 if (eSla == null) {
362 // eAppXml.getNamespace("sla"));
363 return false;
364 }
365 try {
366 ELEvaluator evalSla = CoordELEvaluator.createSLAEvaluator(nominalTime, conf);
367 List<Element> elemList = eSla.getChildren();
368 for (Element elem : elemList) {
369 String updated;
370 try {
371 updated = CoordELFunctions.evalAndWrap(evalSla, elem.getText().trim());
372 }
373 catch (Exception e) {
374 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
375 }
376 elem.removeContent();
377 elem.addContent(updated);
378 }
379 }
380 catch (Exception e) {
381 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
382 }
383 return true;
384 }
385
386 /**
387 * Materialize one instance for specific nominal time. It includes: 1.
388 * Materialize data events (i.e. <data-in> and <data-out>) 2. Materialize
389 * data properties (i.e dataIn(<DS>) and dataOut(<DS>) 3. remove 'start' and
390 * 'end' tag 4. Add 'instance_number' and 'nominal-time' tag
391 *
392 * @param jobId coordinator job id
393 * @param dryrun true if it is dryrun
394 * @param eAction frequency unexploded-job
395 * @param nominalTime materialization time
396 * @param actualTime action actual time
397 * @param instanceCount instance numbers
398 * @param conf job configuration
399 * @param actionBean CoordinatorActionBean to materialize
400 * @return one materialized action for specific nominal time
401 * @throws Exception
402 */
403 @SuppressWarnings("unchecked")
404 public static String materializeOneInstance(String jobId, boolean dryrun, Element eAction, Date nominalTime,
405 Date actualTime, int instanceCount, Configuration conf, CoordinatorActionBean actionBean) throws Exception {
406 String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, instanceCount + "");
407 SyncCoordAction appInst = new SyncCoordAction();
408 appInst.setActionId(actionId);
409 appInst.setName(eAction.getAttributeValue("name"));
410 appInst.setNominalTime(nominalTime);
411 appInst.setActualTime(actualTime);
412 int frequency = Integer.parseInt(eAction.getAttributeValue("frequency"));
413 appInst.setFrequency(frequency);
414 appInst.setTimeUnit(TimeUnit.valueOf(eAction.getAttributeValue("freq_timeunit")));
415 appInst.setTimeZone(DateUtils.getTimeZone(eAction.getAttributeValue("timezone")));
416 appInst.setEndOfDuration(TimeUnit.valueOf(eAction.getAttributeValue("end_of_duration")));
417
418 StringBuffer dependencyList = new StringBuffer();
419
420 Element inputList = eAction.getChild("input-events", eAction.getNamespace());
421 List<Element> dataInList = null;
422 if (inputList != null) {
423 dataInList = inputList.getChildren("data-in", eAction.getNamespace());
424 materializeDataEvents(dataInList, appInst, conf, dependencyList);
425 }
426
427 Element outputList = eAction.getChild("output-events", eAction.getNamespace());
428 List<Element> dataOutList = null;
429 if (outputList != null) {
430 dataOutList = outputList.getChildren("data-out", eAction.getNamespace());
431 StringBuffer tmp = new StringBuffer();
432 // no dependency checks
433 materializeDataEvents(dataOutList, appInst, conf, tmp);
434 }
435
436 eAction.removeAttribute("start");
437 eAction.removeAttribute("end");
438 eAction.setAttribute("instance-number", Integer.toString(instanceCount));
439 eAction.setAttribute("action-nominal-time", DateUtils.formatDateOozieTZ(nominalTime));
440 eAction.setAttribute("action-actual-time", DateUtils.formatDateOozieTZ(actualTime));
441
442 boolean isSla = CoordCommandUtils.materializeSLA(eAction.getChild("action", eAction.getNamespace()).getChild(
443 "info", eAction.getNamespace("sla")), nominalTime, conf);
444
445 // Setting up action bean
446 actionBean.setCreatedConf(XmlUtils.prettyPrint(conf).toString());
447 actionBean.setRunConf(XmlUtils.prettyPrint(conf).toString());
448 actionBean.setCreatedTime(actualTime);
449 actionBean.setJobId(jobId);
450 actionBean.setId(actionId);
451 actionBean.setLastModifiedTime(new Date());
452 actionBean.setStatus(CoordinatorAction.Status.WAITING);
453 actionBean.setActionNumber(instanceCount);
454 actionBean.setMissingDependencies(dependencyList.toString());
455 actionBean.setNominalTime(nominalTime);
456 if (isSla == true) {
457 actionBean.setSlaXml(XmlUtils.prettyPrint(
458 eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")))
459 .toString());
460 }
461
462 // actionBean.setTrackerUri(trackerUri);//TOOD:
463 // actionBean.setConsoleUrl(consoleUrl); //TODO:
464 // actionBean.setType(type);//TODO:
465 // actionBean.setErrorInfo(errorCode, errorMessage); //TODO:
466 // actionBean.setExternalStatus(externalStatus);//TODO
467 if (!dryrun) {
468 return XmlUtils.prettyPrint(eAction).toString();
469 }
470 else {
471 String action = XmlUtils.prettyPrint(eAction).toString();
472 CoordActionInputCheckXCommand coordActionInput = new CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId());
473 StringBuilder actionXml = new StringBuilder(action);
474 StringBuilder existList = new StringBuilder();
475 StringBuilder nonExistList = new StringBuilder();
476 StringBuilder nonResolvedList = new StringBuilder();
477 getResolvedList(actionBean.getMissingDependencies(), nonExistList, nonResolvedList);
478 Configuration actionConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
479 coordActionInput.checkInput(actionXml, existList, nonExistList, actionConf);
480 return actionXml.toString();
481 }
482 }
483
484 /**
485 * Materialize all <input-events>/<data-in> or <output-events>/<data-out>
486 * tags Create uris for resolved instances. Create unresolved instance for
487 * latest()/future().
488 *
489 * @param events
490 * @param appInst
491 * @param conf
492 * @throws Exception
493 */
494 public static void materializeDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf,
495 StringBuffer dependencyList) throws Exception {
496
497 if (events == null) {
498 return;
499 }
500 StringBuffer unresolvedList = new StringBuffer();
501 for (Element event : events) {
502 StringBuilder instances = new StringBuilder();
503 ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event, appInst, conf);
504 // Handle list of instance tag
505 resolveInstances(event, instances, appInst, conf, eval);
506 // Handle start-instance and end-instance
507 resolveInstanceRange(event, instances, appInst, conf, eval);
508 // Separate out the unresolved instances
509 separateResolvedAndUnresolved(event, instances, dependencyList);
510 String tmpUnresolved = event.getChildTextTrim("unresolved-instances", event.getNamespace());
511 if (tmpUnresolved != null) {
512 if (unresolvedList.length() > 0) {
513 unresolvedList.append(CoordELFunctions.INSTANCE_SEPARATOR);
514 }
515 unresolvedList.append(tmpUnresolved);
516 }
517 }
518 if (unresolvedList.length() > 0) {
519 dependencyList.append(RESOLVED_UNRESOLVED_SEPARATOR);
520 dependencyList.append(unresolvedList);
521 }
522 return;
523 }
524
525 /**
526 * Get resolved string from missDepList
527 *
528 * @param missDepList
529 * @param resolved
530 * @param unresolved
531 * @return resolved string
532 */
533 public static String getResolvedList(String missDepList, StringBuilder resolved, StringBuilder unresolved) {
534 if (missDepList != null) {
535 int index = missDepList.indexOf(RESOLVED_UNRESOLVED_SEPARATOR);
536 if (index < 0) {
537 resolved.append(missDepList);
538 }
539 else {
540 resolved.append(missDepList.substring(0, index));
541 unresolved.append(missDepList.substring(index + 1));
542 }
543 }
544 return resolved.toString();
545 }
546
547 }