This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.util.Date;
023 import java.util.List;
024
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.hadoop.fs.Path;
027 import org.apache.oozie.CoordinatorActionBean;
028 import org.apache.oozie.CoordinatorJobBean;
029 import org.apache.oozie.ErrorCode;
030 import org.apache.oozie.client.CoordinatorAction;
031 import org.apache.oozie.client.Job;
032 import org.apache.oozie.client.OozieClient;
033 import org.apache.oozie.command.CommandException;
034 import org.apache.oozie.command.PreconditionException;
035 import org.apache.oozie.coord.CoordELEvaluator;
036 import org.apache.oozie.coord.CoordELFunctions;
037 import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
038 import org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor;
039 import org.apache.oozie.executor.jpa.CoordActionUpdateForModifiedTimeJPAExecutor;
040 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
041 import org.apache.oozie.executor.jpa.JPAExecutorException;
042 import org.apache.oozie.service.HadoopAccessorException;
043 import org.apache.oozie.service.HadoopAccessorService;
044 import org.apache.oozie.service.JPAService;
045 import org.apache.oozie.service.Service;
046 import org.apache.oozie.service.Services;
047 import org.apache.oozie.util.DateUtils;
048 import org.apache.oozie.util.ELEvaluator;
049 import org.apache.oozie.util.Instrumentation;
050 import org.apache.oozie.util.LogUtils;
051 import org.apache.oozie.util.ParamChecker;
052 import org.apache.oozie.util.StatusUtils;
053 import org.apache.oozie.util.XConfiguration;
054 import org.apache.oozie.util.XmlUtils;
055 import org.jdom.Element;
056
057 /**
058 * The command to check if an action's data input paths exist in the file system.
059 */
060 public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
061
062 private final String actionId;
063 /**
064 * Property name of command re-queue interval for coordinator action input check in
065 * milliseconds.
066 */
067 public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX
068 + "coord.input.check.requeue.interval";
069 /**
070 * Default re-queue interval in ms. It is applied when no value defined in
071 * the oozie configuration.
072 */
073 private final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 60000; // 1 minute
074 private CoordinatorActionBean coordAction = null;
075 private CoordinatorJobBean coordJob = null;
076 private JPAService jpaService = null;
077 private String jobId = null;
078
/**
 * Creates an input check command for one coordinator action.
 *
 * @param actionId id of the coordinator action; checked with ParamChecker.notEmpty
 * @param jobId id of the coordinator job the action belongs to; stored without validation
 */
public CoordActionInputCheckXCommand(String actionId, String jobId) {
    super("coord_action_input", "coord_action_input", 1);
    this.jobId = jobId;
    this.actionId = ParamChecker.notEmpty(actionId, "actionId");
}
084
085 /* (non-Javadoc)
086 * @see org.apache.oozie.command.XCommand#execute()
087 */
088 @Override
089 protected Void execute() throws CommandException {
090 LOG.info("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state.");
091
092 // this action should only get processed if current time > nominal time;
093 // otherwise, requeue this action for delay execution;
094 Date nominalTime = coordAction.getNominalTime();
095 Date currentTime = new Date();
096 if (nominalTime.compareTo(currentTime) > 0) {
097 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), Math.max((nominalTime.getTime() - currentTime
098 .getTime()), getCoordInputCheckRequeueInterval()));
099 // update lastModifiedTime
100 coordAction.setLastModifiedTime(new Date());
101 try {
102 jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(coordAction));
103 }
104 catch (JPAExecutorException e) {
105 throw new CommandException(e);
106 }
107 LOG.info("[" + actionId
108 + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current="
109 + currentTime + ", nominal=" + nominalTime);
110
111 return null;
112 }
113
114 StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
115 Instrumentation.Cron cron = new Instrumentation.Cron();
116 boolean isChangeInDependency = false;
117 try {
118 Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
119 cron.start();
120 StringBuilder existList = new StringBuilder();
121 StringBuilder nonExistList = new StringBuilder();
122 StringBuilder nonResolvedList = new StringBuilder();
123 String firstMissingDependency = "";
124 String missingDeps = coordAction.getMissingDependencies();
125 CoordCommandUtils.getResolvedList(missingDeps, nonExistList, nonResolvedList);
126
127 // For clarity regarding which is the missing dependency in synchronous order
128 // instead of printing entire list, some of which, may be available
129 if(nonExistList.length() > 0) {
130 firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0];
131 }
132 LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + firstMissingDependency + " "
133 + nonResolvedList.toString());
134 // Updating the list of data dependencies that are available and those that are yet not
135 boolean status = checkInput(actionXml, existList, nonExistList, actionConf);
136 coordAction.setLastModifiedTime(currentTime);
137 coordAction.setActionXml(actionXml.toString());
138 if (nonResolvedList.length() > 0 && status == false) {
139 nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
140 }
141 String nonExistListStr = nonExistList.toString();
142 if (!nonExistListStr.equals(missingDeps) || missingDeps.isEmpty()) {
143 // missingDeps empty means action should become READY
144 isChangeInDependency = true;
145 coordAction.setMissingDependencies(nonExistListStr);
146 }
147 if (status == true) {
148 coordAction.setStatus(CoordinatorAction.Status.READY);
149 // pass jobID to the CoordActionReadyXCommand
150 queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
151 }
152 else {
153 long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
154 .getCreatedTime().getTime()))
155 / (60 * 1000);
156 int timeOut = coordAction.getTimeOut();
157 if ((timeOut >= 0) && (waitingTime > timeOut)) {
158 queue(new CoordActionTimeOutXCommand(coordAction), 100);
159 }
160 else {
161 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), getCoordInputCheckRequeueInterval());
162 }
163 }
164 }
165 catch (Exception e) {
166 throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
167 }
168 finally {
169 coordAction.setLastModifiedTime(new Date());
170 cron.stop();
171 if(jpaService != null) {
172 try {
173 if (isChangeInDependency) {
174 jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(coordAction));
175 }
176 else {
177 jpaService.execute(new CoordActionUpdateForModifiedTimeJPAExecutor(coordAction));
178 }
179 }
180 catch(JPAExecutorException jex) {
181 throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
182 }
183 }
184 }
185 return null;
186 }
187
188 /**
189 * This function reads the value of re-queue interval for coordinator input
190 * check command from the Oozie configuration provided by Configuration
191 * Service. If nothing defined in the configuration, it uses the code
192 * specified default value.
193 *
194 * @return re-queue interval in ms
195 */
196 public long getCoordInputCheckRequeueInterval() {
197 long requeueInterval = Services.get().getConf().getLong(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL,
198 DEFAULT_COMMAND_REQUEUE_INTERVAL);
199 return requeueInterval;
200 }
201
202 /**
203 * To check the list of input paths if all of them exist
204 *
205 * @param actionXml action xml
206 * @param existList the list of existed paths
207 * @param nonExistList the list of non existed paths
208 * @param conf action configuration
209 * @return true if all input paths are existed
210 * @throws Exception thrown of unable to check input path
211 */
212 protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
213 Configuration conf) throws Exception {
214 Element eAction = XmlUtils.parseXml(actionXml.toString());
215 boolean allExist = checkResolvedUris(eAction, existList, nonExistList, conf);
216 if (allExist) {
217 LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
218 allExist = checkUnresolvedInstances(eAction, conf);
219 }
220 if (allExist == true) {
221 materializeDataProperties(eAction, conf);
222 actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString());
223 }
224 return allExist;
225 }
226
227 /**
228 * Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list
229 * of files that will be needed.
230 *
231 * @param eAction action element
232 * @param conf action configuration
233 * @throws Exception thrown if failed to resolve data properties
234 * @update modify 'Action' element with appropriate list of files.
235 */
236 @SuppressWarnings("unchecked")
237 private void materializeDataProperties(Element eAction, Configuration conf) throws Exception {
238 ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, conf, actionId);
239 Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow",
240 eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
241 if (configElem != null) {
242 for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
243 resolveTagContents("value", propElem, eval);
244 }
245 }
246 }
247
248 /**
249 * To resolve property value which contains el functions
250 *
251 * @param tagName tag name
252 * @param elem the child element of "property" element
253 * @param eval el functions evaluator
254 * @throws Exception thrown if unable to resolve tag value
255 */
256 private void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception {
257 if (elem == null) {
258 return;
259 }
260 Element tagElem = elem.getChild(tagName, elem.getNamespace());
261 if (tagElem != null) {
262 String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText());
263 tagElem.removeContent();
264 tagElem.addContent(updated);
265 }
266 else {
267 LOG.warn(" Value NOT FOUND " + tagName);
268 }
269 }
270
271 /**
272 * Check if any unsolved paths under data output. Resolve the unresolved data input paths.
273 *
274 * @param eAction action element
275 * @param actionConf action configuration
276 * @return true if successful to resolve input and output paths
277 * @throws Exception thrown if failed to resolve data input and output paths
278 */
279 @SuppressWarnings("unchecked")
280 private boolean checkUnresolvedInstances(Element eAction, Configuration actionConf) throws Exception {
281 String strAction = XmlUtils.prettyPrint(eAction).toString();
282 Date nominalTime = DateUtils.parseDateOozieTZ(eAction.getAttributeValue("action-nominal-time"));
283 String actualTimeStr = eAction.getAttributeValue("action-actual-time");
284 Date actualTime = null;
285 if (actualTimeStr == null) {
286 LOG.debug("Unable to get action-actual-time from action xml, this job is submitted " +
287 "from previous version. Assign current date to actual time, action = " + actionId);
288 actualTime = new Date();
289 } else {
290 actualTime = DateUtils.parseDateOozieTZ(actualTimeStr);
291 }
292
293 StringBuffer resultedXml = new StringBuffer();
294
295 boolean ret;
296 Element inputList = eAction.getChild("input-events", eAction.getNamespace());
297 if (inputList != null) {
298 ret = materializeUnresolvedEvent(inputList.getChildren("data-in", eAction.getNamespace()), nominalTime,
299 actualTime, actionConf);
300 if (ret == false) {
301 resultedXml.append(strAction);
302 return false;
303 }
304 }
305
306 // Using latest() or future() in output-event is not intuitive.
307 // We need to make sure, this assumption is correct.
308 Element outputList = eAction.getChild("output-events", eAction.getNamespace());
309 if (outputList != null) {
310 for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
311 if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) != null) {
312 throw new CommandException(ErrorCode.E1006, "coord:latest()/future()",
313 " not permitted in output-event ");
314 }
315 }
316 }
317 return true;
318 }
319
320 /**
321 * Resolve the list of data input paths
322 *
323 * @param eDataEvents the list of data input elements
324 * @param nominalTime action nominal time
325 * @param actualTime current time
326 * @param conf action configuration
327 * @return true if all unresolved URIs can be resolved
328 * @throws Exception thrown if failed to resolve data input paths
329 */
330 @SuppressWarnings("unchecked")
331 private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime,
332 Configuration conf) throws Exception {
333 for (Element dEvent : eDataEvents) {
334 if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) == null) {
335 continue;
336 }
337 ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf);
338 String uresolvedInstance = dEvent.getChild("unresolved-instances", dEvent.getNamespace()).getTextTrim();
339 String unresolvedList[] = uresolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR);
340 StringBuffer resolvedTmp = new StringBuffer();
341 for (int i = 0; i < unresolvedList.length; i++) {
342 String ret = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]);
343 Boolean isResolved = (Boolean) eval.getVariable("is_resolved");
344 if (isResolved == false) {
345 LOG.info("[" + actionId + "]::Cannot resolve: " + ret);
346 return false;
347 }
348 if (resolvedTmp.length() > 0) {
349 resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR);
350 }
351 resolvedTmp.append((String) eval.getVariable("resolved_path"));
352 }
353 if (resolvedTmp.length() > 0) {
354 if (dEvent.getChild("uris", dEvent.getNamespace()) != null) {
355 resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append(
356 dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim());
357 dEvent.removeChild("uris", dEvent.getNamespace());
358 }
359 Element uriInstance = new Element("uris", dEvent.getNamespace());
360 uriInstance.addContent(resolvedTmp.toString());
361 dEvent.getContent().add(1, uriInstance);
362 }
363 dEvent.removeChild("unresolved-instances", dEvent.getNamespace());
364 }
365
366 return true;
367 }
368
369 /**
370 * Check all resolved URIs existence
371 *
372 * @param eAction action element
373 * @param existList the list of existed paths
374 * @param nonExistList the list of paths to check existence
375 * @param conf action configuration
376 * @return true if all nonExistList paths exist
377 * @throws IOException thrown if unable to access the path
378 */
379 private boolean checkResolvedUris(Element eAction, StringBuilder existList, StringBuilder nonExistList,
380 Configuration conf) throws IOException {
381 LOG.info("[" + actionId + "]::ActionInputCheck:: In checkResolvedUris...");
382 Element inputList = eAction.getChild("input-events", eAction.getNamespace());
383 if (inputList != null) {
384 if (nonExistList.length() > 0) {
385 checkListOfPaths(existList, nonExistList, conf);
386 }
387 return nonExistList.length() == 0;
388 }
389 return true;
390 }
391
392 /**
393 * Check a list of non existed paths and add to exist list if it exists
394 *
395 * @param existList the list of existed paths
396 * @param nonExistList the list of paths to check existence
397 * @param conf action configuration
398 * @return true if all nonExistList paths exist
399 * @throws IOException thrown if unable to access the path
400 */
401 private boolean checkListOfPaths(StringBuilder existList, StringBuilder nonExistList, Configuration conf)
402 throws IOException {
403
404 String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
405 if (uriList[0] != null) {
406 LOG.info("[" + actionId + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] + " is Missing.");
407 }
408
409 nonExistList.delete(0, nonExistList.length());
410 boolean allExists = true;
411 String existSeparator = "", nonExistSeparator = "";
412 for (int i = 0; i < uriList.length; i++) {
413 if (allExists) {
414 allExists = pathExists(uriList[i], conf);
415 LOG.info("[" + actionId + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" + allExists);
416 }
417 if (allExists) {
418 existList.append(existSeparator).append(uriList[i]);
419 existSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
420 }
421 else {
422 nonExistList.append(nonExistSeparator).append(uriList[i]);
423 nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
424 }
425 }
426 return allExists;
427 }
428
429 /**
430 * Check if given path exists
431 *
432 * @param sPath uri path
433 * @param actionConf action configuration
434 * @return true if path exists
435 * @throws IOException thrown if unable to access the path
436 */
437 protected boolean pathExists(String sPath, Configuration actionConf) throws IOException {
438 LOG.debug("checking for the file " + sPath);
439 Path path = new Path(sPath);
440 String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
441 try {
442 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
443 Configuration fsConf = has.createJobConf(path.toUri().getAuthority());
444 return has.createFileSystem(user, path.toUri(), fsConf).exists(path);
445 }
446 catch (HadoopAccessorException e) {
447 coordAction.setErrorCode(e.getErrorCode().toString());
448 coordAction.setErrorMessage(e.getMessage());
449 throw new IOException(e);
450 }
451 }
452
453 /**
454 * The function create a list of URIs separated by "," using the instances time stamp and URI-template
455 *
456 * @param event : <data-in> event
457 * @param instances : List of time stamp seprated by ","
458 * @param unresolvedInstances : list of instance with latest/future function
459 * @return : list of URIs separated by ",".
460 * @throws Exception thrown if failed to create URIs from unresolvedInstances
461 */
462 @SuppressWarnings("unused")
463 private String createURIs(Element event, String instances, StringBuilder unresolvedInstances) throws Exception {
464 if (instances == null || instances.length() == 0) {
465 return "";
466 }
467 String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
468 StringBuilder uris = new StringBuilder();
469
470 for (int i = 0; i < instanceList.length; i++) {
471 int funcType = CoordCommandUtils.getFuncType(instanceList[i]);
472 if (funcType == CoordCommandUtils.LATEST || funcType == CoordCommandUtils.FUTURE) {
473 if (unresolvedInstances.length() > 0) {
474 unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR);
475 }
476 unresolvedInstances.append(instanceList[i]);
477 continue;
478 }
479 ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]);
480 if (uris.length() > 0) {
481 uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
482 }
483 uris.append(CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()).getChild(
484 "uri-template", event.getNamespace()).getTextTrim()));
485 }
486 return uris.toString();
487 }
488
489 /**
490 * getting the error code of the coord action. (used mainly for unit testing)
491 */
492 protected String getCoordActionErrorCode() {
493 if (coordAction != null) {
494 return coordAction.getErrorCode();
495 }
496 return null;
497 }
498
499 /**
500 * getting the error message of the coord action. (used mainly for unit testing)
501 */
502 protected String getCoordActionErrorMsg() {
503 if (coordAction != null) {
504 return coordAction.getErrorMessage();
505 }
506 return null;
507 }
508
509 /* (non-Javadoc)
510 * @see org.apache.oozie.command.XCommand#getEntityKey()
511 */
512 @Override
513 public String getEntityKey() {
514 return this.jobId;
515 }
516
517 /* (non-Javadoc)
518 * @see org.apache.oozie.command.XCommand#isLockRequired()
519 */
520 @Override
521 protected boolean isLockRequired() {
522 return true;
523 }
524
525 /* (non-Javadoc)
526 * @see org.apache.oozie.command.XCommand#eagerLoadState()
527 */
528 // TODO - why loadState() is being called from eagerLoadState();
529 @Override
530 protected void eagerLoadState() throws CommandException {
531 loadState();
532 }
533
534 /* (non-Javadoc)
535 * @see org.apache.oozie.command.XCommand#loadState()
536 */
537 @Override
538 protected void loadState() throws CommandException {
539 if (jpaService == null) {
540 jpaService = Services.get().get(JPAService.class);
541 }
542 try {
543 coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
544 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
545 }
546 catch (JPAExecutorException je) {
547 throw new CommandException(je);
548 }
549 LogUtils.setLogInfo(coordAction, logInfo);
550 }
551
552 /* (non-Javadoc)
553 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
554 */
555 @Override
556 protected void verifyPrecondition() throws CommandException, PreconditionException {
557 if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
558 throw new PreconditionException(ErrorCode.E1100, "[" + actionId
559 + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state="
560 + coordAction.getStatus());
561 }
562
563 // if eligible to do action input check when running with backward support is true
564 if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
565 return;
566 }
567
568 if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR && coordJob.getStatus() != Job.Status.PAUSED
569 && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
570 throw new PreconditionException(
571 ErrorCode.E1100, "["+ actionId + "]::CoordActionInputCheck:: Ignoring action." +
572 " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
573 + coordJob.getStatus());
574 }
575 }
576
577 /* (non-Javadoc)
578 * @see org.apache.oozie.command.XCommand#getKey()
579 */
580 @Override
581 public String getKey(){
582 return getName() + "_" + actionId;
583 }
584
585 }