This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.io.StringReader;
021 import java.util.Date;
022 import java.util.List;
023
024 import org.apache.hadoop.conf.Configuration;
025 import org.apache.oozie.CoordinatorActionBean;
026 import org.apache.oozie.ErrorCode;
027 import org.apache.oozie.client.CoordinatorAction;
028 import org.apache.oozie.command.CommandException;
029 import org.apache.oozie.coord.CoordELEvaluator;
030 import org.apache.oozie.coord.CoordELFunctions;
031 import org.apache.oozie.coord.CoordUtils;
032 import org.apache.oozie.coord.CoordinatorJobException;
033 import org.apache.oozie.coord.SyncCoordAction;
034 import org.apache.oozie.coord.TimeUnit;
035 import org.apache.oozie.service.Services;
036 import org.apache.oozie.service.UUIDService;
037 import org.apache.oozie.util.DateUtils;
038 import org.apache.oozie.util.ELEvaluator;
039 import org.apache.oozie.util.XConfiguration;
040 import org.apache.oozie.util.XmlUtils;
041 import org.jdom.Element;
042
043 public class CoordCommandUtils {
044 public static int CURRENT = 0;
045 public static int LATEST = 1;
046 public static int FUTURE = 2;
047 public static int UNEXPECTED = -1;
048 public static final String RESOLVED_UNRESOLVED_SEPARATOR = ";";
049
050 /**
051 * parse a function like coord:latest(n)/future() and return the 'n'.
052 * <p/>
053 * @param function
054 * @param event
055 * @param appInst
056 * @param conf
057 * @param restArg
058 * @return int instanceNumber
059 * @throws Exception
060 */
061 public static int getInstanceNumber(String function, Element event, SyncCoordAction appInst, Configuration conf,
062 StringBuilder restArg) throws Exception {
063 ELEvaluator eval = CoordELEvaluator
064 .createInstancesELEvaluator("coord-action-create-inst", event, appInst, conf);
065 String newFunc = CoordELFunctions.evalAndWrap(eval, function);
066 int funcType = getFuncType(newFunc);
067 if (funcType == CURRENT || funcType == LATEST) {
068 return parseOneArg(newFunc);
069 }
070 else {
071 return parseMoreArgs(newFunc, restArg);
072 }
073 }
074
075 private static int parseOneArg(String funcName) throws Exception {
076 int firstPos = funcName.indexOf("(");
077 int lastPos = funcName.lastIndexOf(")");
078 if (firstPos >= 0 && lastPos > firstPos) {
079 String tmp = funcName.substring(firstPos + 1, lastPos).trim();
080 if (tmp.length() > 0) {
081 return (int) Double.parseDouble(tmp);
082 }
083 }
084 throw new RuntimeException("Unformatted function :" + funcName);
085 }
086
087 private static int parseMoreArgs(String funcName, StringBuilder restArg) throws Exception {
088 int firstPos = funcName.indexOf("(");
089 int secondPos = funcName.lastIndexOf(",");
090 int lastPos = funcName.lastIndexOf(")");
091 if (firstPos >= 0 && secondPos > firstPos) {
092 String tmp = funcName.substring(firstPos + 1, secondPos).trim();
093 if (tmp.length() > 0) {
094 restArg.append(funcName.substring(secondPos + 1, lastPos).trim());
095 return (int) Double.parseDouble(tmp);
096 }
097 }
098 throw new RuntimeException("Unformatted function :" + funcName);
099 }
100
101 /**
102 * @param EL function name
103 * @return type of EL function
104 */
105 public static int getFuncType(String function) {
106 if (function.indexOf("current") >= 0) {
107 return CURRENT;
108 }
109 else if (function.indexOf("latest") >= 0) {
110 return LATEST;
111 }
112 else if (function.indexOf("future") >= 0) {
113 return FUTURE;
114 }
115 return UNEXPECTED;
116 // throw new RuntimeException("Unexpected instance name "+ function);
117 }
118
119 /**
120 * @param startInst: EL function name
121 * @param endInst: EL function name
122 * @throws CommandException if both are not the same function
123 */
124 public static void checkIfBothSameType(String startInst, String endInst) throws CommandException {
125 if (getFuncType(startInst) != getFuncType(endInst)) {
126 throw new CommandException(ErrorCode.E1010,
127 " start-instance and end-instance both should be either latest or current or future\n"
128 + " start " + startInst + " and end " + endInst);
129 }
130 }
131
132 /**
133 * Resolve list of <instance> </instance> tags.
134 *
135 * @param event
136 * @param instances
137 * @param actionInst
138 * @param conf
139 * @param eval: ELEvalautor
140 * @throws Exception
141 */
142 public static void resolveInstances(Element event, StringBuilder instances, SyncCoordAction actionInst,
143 Configuration conf, ELEvaluator eval) throws Exception {
144 for (Element eInstance : (List<Element>) event.getChildren("instance", event.getNamespace())) {
145 if (instances.length() > 0) {
146 instances.append(CoordELFunctions.INSTANCE_SEPARATOR);
147 }
148 instances.append(materializeInstance(event, eInstance.getTextTrim(), actionInst, conf, eval));
149 }
150 event.removeChildren("instance", event.getNamespace());
151 }
152
153 /**
154 * Resolve <start-instance> <end-insatnce> tag. Don't resolve any
155 * latest()/future()
156 *
157 * @param event
158 * @param instances
159 * @param appInst
160 * @param conf
161 * @param eval: ELEvalautor
162 * @throws Exception
163 */
164 public static void resolveInstanceRange(Element event, StringBuilder instances, SyncCoordAction appInst,
165 Configuration conf, ELEvaluator eval) throws Exception {
166 Element eStartInst = event.getChild("start-instance", event.getNamespace());
167 Element eEndInst = event.getChild("end-instance", event.getNamespace());
168 if (eStartInst != null && eEndInst != null) {
169 String strStart = eStartInst.getTextTrim();
170 String strEnd = eEndInst.getTextTrim();
171 checkIfBothSameType(strStart, strEnd);
172 StringBuilder restArg = new StringBuilder(); // To store rest
173 // arguments for
174 // future
175 // function
176 int startIndex = getInstanceNumber(strStart, event, appInst, conf, restArg);
177 restArg.delete(0, restArg.length());
178 int endIndex = getInstanceNumber(strEnd, event, appInst, conf, restArg);
179 if (startIndex > endIndex) {
180 throw new CommandException(ErrorCode.E1010,
181 " start-instance should be equal or earlier than the end-instance \n"
182 + XmlUtils.prettyPrint(event));
183 }
184 int funcType = getFuncType(strStart);
185 if (funcType == CURRENT) {
186 // Everything could be resolved NOW. no latest() ELs
187 for (int i = endIndex; i >= startIndex; i--) {
188 String matInstance = materializeInstance(event, "${coord:current(" + i + ")}", appInst, conf, eval);
189 if (matInstance == null || matInstance.length() == 0) {
190 // Earlier than dataset's initial instance
191 break;
192 }
193 if (instances.length() > 0) {
194 instances.append(CoordELFunctions.INSTANCE_SEPARATOR);
195 }
196 instances.append(matInstance);
197 }
198 }
199 else { // latest(n)/future() EL is present
200 for (; startIndex <= endIndex; startIndex++) {
201 if (instances.length() > 0) {
202 instances.append(CoordELFunctions.INSTANCE_SEPARATOR);
203 }
204 if (funcType == LATEST) {
205 instances.append("${coord:latest(" + startIndex + ")}");
206 }
207 else { // For future
208 instances.append("${coord:future(" + startIndex + ",'" + restArg + "')}");
209 }
210 }
211 }
212 // Remove start-instance and end-instances
213 event.removeChild("start-instance", event.getNamespace());
214 event.removeChild("end-instance", event.getNamespace());
215 }
216 }
217
218 /**
219 * Materialize one instance like current(-2)
220 *
221 * @param event : <data-in>
222 * @param expr : instance like current(-1)
223 * @param appInst : application specific info
224 * @param conf
225 * @param evalInst :ELEvaluator
226 * @return materialized date string
227 * @throws Exception
228 */
229 public static String materializeInstance(Element event, String expr, SyncCoordAction appInst, Configuration conf,
230 ELEvaluator evalInst) throws Exception {
231 if (event == null) {
232 return null;
233 }
234 // ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event,
235 // appInst, conf);
236 return CoordELFunctions.evalAndWrap(evalInst, expr);
237 }
238
239 /**
240 * Create two new tags with <uris> and <unresolved-instances>.
241 *
242 * @param event
243 * @param instances
244 * @param dependencyList
245 * @throws Exception
246 */
247 public static void separateResolvedAndUnresolved(Element event, StringBuilder instances, StringBuffer dependencyList)
248 throws Exception {
249 StringBuilder unresolvedInstances = new StringBuilder();
250 StringBuilder urisWithDoneFlag = new StringBuilder();
251 String uris = createEarlyURIs(event, instances.toString(), unresolvedInstances, urisWithDoneFlag);
252 if (uris.length() > 0) {
253 Element uriInstance = new Element("uris", event.getNamespace());
254 uriInstance.addContent(uris);
255 event.getContent().add(1, uriInstance);
256 if (dependencyList.length() > 0) {
257 dependencyList.append(CoordELFunctions.INSTANCE_SEPARATOR);
258 }
259 dependencyList.append(urisWithDoneFlag);
260 }
261 if (unresolvedInstances.length() > 0) {
262 Element elemInstance = new Element("unresolved-instances", event.getNamespace());
263 elemInstance.addContent(unresolvedInstances.toString());
264 event.getContent().add(1, elemInstance);
265 }
266 }
267
268 /**
269 * The function create a list of URIs separated by "," using the instances
270 * time stamp and URI-template
271 *
272 * @param event : <data-in> event
273 * @param instances : List of time stamp separated by ","
274 * @param unresolvedInstances : list of instance with latest function
275 * @param urisWithDoneFlag : list of URIs with the done flag appended
276 * @return : list of URIs separated by ";" as a string.
277 * @throws Exception
278 */
279 public static String createEarlyURIs(Element event, String instances, StringBuilder unresolvedInstances,
280 StringBuilder urisWithDoneFlag) throws Exception {
281 if (instances == null || instances.length() == 0) {
282 return "";
283 }
284 String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
285 StringBuilder uris = new StringBuilder();
286
287 Element doneFlagElement = event.getChild("dataset", event.getNamespace()).getChild("done-flag",
288 event.getNamespace());
289 String doneFlag = CoordUtils.getDoneFlag(doneFlagElement);
290
291 for (int i = 0; i < instanceList.length; i++) {
292 if(instanceList[i].trim().length() == 0) {
293 continue;
294 }
295 int funcType = getFuncType(instanceList[i]);
296 if (funcType == LATEST || funcType == FUTURE) {
297 if (unresolvedInstances.length() > 0) {
298 unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR);
299 }
300 unresolvedInstances.append(instanceList[i]);
301 continue;
302 }
303 ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]);
304 if (uris.length() > 0) {
305 uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
306 urisWithDoneFlag.append(CoordELFunctions.INSTANCE_SEPARATOR);
307 }
308
309 String uriPath = CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace())
310 .getChild("uri-template", event.getNamespace()).getTextTrim());
311 uris.append(uriPath);
312 if (doneFlag.length() > 0) {
313 uriPath += "/" + doneFlag;
314 }
315 urisWithDoneFlag.append(uriPath);
316 }
317 return uris.toString();
318 }
319
320 /**
321 * @param eSla
322 * @param nominalTime
323 * @param conf
324 * @return boolean to determine whether the SLA element is present or not
325 * @throws CoordinatorJobException
326 */
327 public static boolean materializeSLA(Element eSla, Date nominalTime, Configuration conf)
328 throws CoordinatorJobException {
329 if (eSla == null) {
330 // eAppXml.getNamespace("sla"));
331 return false;
332 }
333 try {
334 ELEvaluator evalSla = CoordELEvaluator.createSLAEvaluator(nominalTime, conf);
335 List<Element> elemList = eSla.getChildren();
336 for (Element elem : elemList) {
337 String updated;
338 try {
339 updated = CoordELFunctions.evalAndWrap(evalSla, elem.getText().trim());
340 }
341 catch (Exception e) {
342 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
343 }
344 elem.removeContent();
345 elem.addContent(updated);
346 }
347 }
348 catch (Exception e) {
349 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
350 }
351 return true;
352 }
353
354 /**
355 * Materialize one instance for specific nominal time. It includes: 1.
356 * Materialize data events (i.e. <data-in> and <data-out>) 2. Materialize
357 * data properties (i.e dataIn(<DS>) and dataOut(<DS>) 3. remove 'start' and
358 * 'end' tag 4. Add 'instance_number' and 'nominal-time' tag
359 *
360 * @param jobId coordinator job id
361 * @param dryrun true if it is dryrun
362 * @param eAction frequency unexploded-job
363 * @param nominalTime materialization time
364 * @param actualTime action actual time
365 * @param instanceCount instance numbers
366 * @param conf job configuration
367 * @param actionBean CoordinatorActionBean to materialize
368 * @return one materialized action for specific nominal time
369 * @throws Exception
370 */
371 @SuppressWarnings("unchecked")
372 public static String materializeOneInstance(String jobId, boolean dryrun, Element eAction, Date nominalTime,
373 Date actualTime, int instanceCount, Configuration conf, CoordinatorActionBean actionBean) throws Exception {
374 String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, instanceCount + "");
375 SyncCoordAction appInst = new SyncCoordAction();
376 appInst.setActionId(actionId);
377 appInst.setName(eAction.getAttributeValue("name"));
378 appInst.setNominalTime(nominalTime);
379 appInst.setActualTime(actualTime);
380 int frequency = Integer.parseInt(eAction.getAttributeValue("frequency"));
381 appInst.setFrequency(frequency);
382 appInst.setTimeUnit(TimeUnit.valueOf(eAction.getAttributeValue("freq_timeunit")));
383 appInst.setTimeZone(DateUtils.getTimeZone(eAction.getAttributeValue("timezone")));
384 appInst.setEndOfDuration(TimeUnit.valueOf(eAction.getAttributeValue("end_of_duration")));
385
386 StringBuffer dependencyList = new StringBuffer();
387
388 Element inputList = eAction.getChild("input-events", eAction.getNamespace());
389 List<Element> dataInList = null;
390 if (inputList != null) {
391 dataInList = inputList.getChildren("data-in", eAction.getNamespace());
392 materializeDataEvents(dataInList, appInst, conf, dependencyList);
393 }
394
395 Element outputList = eAction.getChild("output-events", eAction.getNamespace());
396 List<Element> dataOutList = null;
397 if (outputList != null) {
398 dataOutList = outputList.getChildren("data-out", eAction.getNamespace());
399 StringBuffer tmp = new StringBuffer();
400 // no dependency checks
401 materializeDataEvents(dataOutList, appInst, conf, tmp);
402 }
403
404 eAction.removeAttribute("start");
405 eAction.removeAttribute("end");
406 eAction.setAttribute("instance-number", Integer.toString(instanceCount));
407 eAction.setAttribute("action-nominal-time", DateUtils.formatDateUTC(nominalTime));
408 eAction.setAttribute("action-actual-time", DateUtils.formatDateUTC(actualTime));
409
410 boolean isSla = CoordCommandUtils.materializeSLA(eAction.getChild("action", eAction.getNamespace()).getChild(
411 "info", eAction.getNamespace("sla")), nominalTime, conf);
412
413 // Setting up action bean
414 actionBean.setCreatedConf(XmlUtils.prettyPrint(conf).toString());
415 actionBean.setRunConf(XmlUtils.prettyPrint(conf).toString());
416 actionBean.setCreatedTime(actualTime);
417 actionBean.setJobId(jobId);
418 actionBean.setId(actionId);
419 actionBean.setLastModifiedTime(new Date());
420 actionBean.setStatus(CoordinatorAction.Status.WAITING);
421 actionBean.setActionNumber(instanceCount);
422 actionBean.setMissingDependencies(dependencyList.toString());
423 actionBean.setNominalTime(nominalTime);
424 if (isSla == true) {
425 actionBean.setSlaXml(XmlUtils.prettyPrint(
426 eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")))
427 .toString());
428 }
429
430 // actionBean.setTrackerUri(trackerUri);//TOOD:
431 // actionBean.setConsoleUrl(consoleUrl); //TODO:
432 // actionBean.setType(type);//TODO:
433 // actionBean.setErrorInfo(errorCode, errorMessage); //TODO:
434 // actionBean.setExternalStatus(externalStatus);//TODO
435 if (!dryrun) {
436 return XmlUtils.prettyPrint(eAction).toString();
437 }
438 else {
439 String action = XmlUtils.prettyPrint(eAction).toString();
440 CoordActionInputCheckXCommand coordActionInput = new CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId());
441 StringBuilder actionXml = new StringBuilder(action);
442 StringBuilder existList = new StringBuilder();
443 StringBuilder nonExistList = new StringBuilder();
444 StringBuilder nonResolvedList = new StringBuilder();
445 getResolvedList(actionBean.getMissingDependencies(), nonExistList, nonResolvedList);
446 Configuration actionConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
447 coordActionInput.checkInput(actionXml, existList, nonExistList, actionConf);
448 return actionXml.toString();
449 }
450 }
451
452 /**
453 * Materialize all <input-events>/<data-in> or <output-events>/<data-out>
454 * tags Create uris for resolved instances. Create unresolved instance for
455 * latest()/future().
456 *
457 * @param events
458 * @param appInst
459 * @param conf
460 * @throws Exception
461 */
462 public static void materializeDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf,
463 StringBuffer dependencyList) throws Exception {
464
465 if (events == null) {
466 return;
467 }
468 StringBuffer unresolvedList = new StringBuffer();
469 for (Element event : events) {
470 StringBuilder instances = new StringBuilder();
471 ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event, appInst, conf);
472 // Handle list of instance tag
473 resolveInstances(event, instances, appInst, conf, eval);
474 // Handle start-instance and end-instance
475 resolveInstanceRange(event, instances, appInst, conf, eval);
476 // Separate out the unresolved instances
477 separateResolvedAndUnresolved(event, instances, dependencyList);
478 String tmpUnresolved = event.getChildTextTrim("unresolved-instances", event.getNamespace());
479 if (tmpUnresolved != null) {
480 if (unresolvedList.length() > 0) {
481 unresolvedList.append(CoordELFunctions.INSTANCE_SEPARATOR);
482 }
483 unresolvedList.append(tmpUnresolved);
484 }
485 }
486 if (unresolvedList.length() > 0) {
487 dependencyList.append(RESOLVED_UNRESOLVED_SEPARATOR);
488 dependencyList.append(unresolvedList);
489 }
490 return;
491 }
492
493 /**
494 * Get resolved string from missDepList
495 *
496 * @param missDepList
497 * @param resolved
498 * @param unresolved
499 * @return resolved string
500 */
501 public static String getResolvedList(String missDepList, StringBuilder resolved, StringBuilder unresolved) {
502 if (missDepList != null) {
503 int index = missDepList.indexOf(RESOLVED_UNRESOLVED_SEPARATOR);
504 if (index < 0) {
505 resolved.append(missDepList);
506 }
507 else {
508 resolved.append(missDepList.substring(0, index));
509 unresolved.append(missDepList.substring(index + 1));
510 }
511 }
512 return resolved.toString();
513 }
514
515 }