This project has retired. For details, please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.util.Date;
023 import java.util.List;
024
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.hadoop.fs.Path;
027 import org.apache.oozie.CoordinatorActionBean;
028 import org.apache.oozie.CoordinatorJobBean;
029 import org.apache.oozie.ErrorCode;
030 import org.apache.oozie.client.CoordinatorAction;
031 import org.apache.oozie.client.Job;
032 import org.apache.oozie.client.OozieClient;
033 import org.apache.oozie.command.CommandException;
034 import org.apache.oozie.command.PreconditionException;
035 import org.apache.oozie.coord.CoordELEvaluator;
036 import org.apache.oozie.coord.CoordELFunctions;
037 import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
038 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
039 import org.apache.oozie.executor.jpa.JPAExecutorException;
040 import org.apache.oozie.service.HadoopAccessorException;
041 import org.apache.oozie.service.HadoopAccessorService;
042 import org.apache.oozie.service.JPAService;
043 import org.apache.oozie.service.Service;
044 import org.apache.oozie.service.Services;
045 import org.apache.oozie.util.DateUtils;
046 import org.apache.oozie.util.ELEvaluator;
047 import org.apache.oozie.util.Instrumentation;
048 import org.apache.oozie.util.LogUtils;
049 import org.apache.oozie.util.ParamChecker;
050 import org.apache.oozie.util.StatusUtils;
051 import org.apache.oozie.util.XConfiguration;
052 import org.apache.oozie.util.XmlUtils;
053 import org.jdom.Element;
054
055 /**
056 * The command to check if an action's data input paths exist in the file system.
057 */
058 public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
059
060 private final String actionId;
061 /**
062 * Property name of command re-queue interval for coordinator action input check in
063 * milliseconds.
064 */
065 public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX
066 + "coord.input.check.requeue.interval";
067 /**
068 * Default re-queue interval in ms. It is applied when no value defined in
069 * the oozie configuration.
070 */
071 private final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 60000; // 1 minute
072 private CoordinatorActionBean coordAction = null;
073 private CoordinatorJobBean coordJob = null;
074 private JPAService jpaService = null;
075 private String jobId = null;
076
077 public CoordActionInputCheckXCommand(String actionId, String jobId) {
078 super("coord_action_input", "coord_action_input", 1);
079 this.actionId = ParamChecker.notEmpty(actionId, "actionId");
080 this.jobId = jobId;
081 }
082
083 /* (non-Javadoc)
084 * @see org.apache.oozie.command.XCommand#execute()
085 */
086 @Override
087 protected Void execute() throws CommandException {
088 LOG.info("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state.");
089
090 // this action should only get processed if current time > nominal time;
091 // otherwise, requeue this action for delay execution;
092 Date nominalTime = coordAction.getNominalTime();
093 Date currentTime = new Date();
094 if (nominalTime.compareTo(currentTime) > 0) {
095 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), Math.max((nominalTime.getTime() - currentTime
096 .getTime()), getCoordInputCheckRequeueInterval()));
097 // update lastModifiedTime
098 coordAction.setLastModifiedTime(new Date());
099 try {
100 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor(coordAction));
101 }
102 catch (JPAExecutorException e) {
103 throw new CommandException(e);
104 }
105 LOG.info("[" + actionId
106 + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current="
107 + currentTime + ", nominal=" + nominalTime);
108
109 return null;
110 }
111
112 StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
113 Instrumentation.Cron cron = new Instrumentation.Cron();
114 try {
115 Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
116 cron.start();
117 StringBuilder existList = new StringBuilder();
118 StringBuilder nonExistList = new StringBuilder();
119 StringBuilder nonResolvedList = new StringBuilder();
120 String firstMissingDependency = "";
121 CoordCommandUtils.getResolvedList(coordAction.getMissingDependencies(), nonExistList, nonResolvedList);
122
123 // For clarity regarding which is the missing dependency in synchronous order
124 // instead of printing entire list, some of which, may be available
125 if(nonExistList.length() > 0) {
126 firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0];
127 }
128 LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + firstMissingDependency + " "
129 + nonResolvedList.toString());
130 // Updating the list of data dependencies that are available and those that are yet not
131 boolean status = checkInput(actionXml, existList, nonExistList, actionConf);
132 coordAction.setLastModifiedTime(currentTime);
133 coordAction.setActionXml(actionXml.toString());
134 if (nonResolvedList.length() > 0 && status == false) {
135 nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
136 }
137 coordAction.setMissingDependencies(nonExistList.toString());
138 if (status == true) {
139 coordAction.setStatus(CoordinatorAction.Status.READY);
140 // pass jobID to the CoordActionReadyXCommand
141 queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
142 }
143 else {
144 long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
145 .getCreatedTime().getTime()))
146 / (60 * 1000);
147 int timeOut = coordAction.getTimeOut();
148 if ((timeOut >= 0) && (waitingTime > timeOut)) {
149 queue(new CoordActionTimeOutXCommand(coordAction), 100);
150 }
151 else {
152 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), getCoordInputCheckRequeueInterval());
153 }
154 }
155 coordAction.setLastModifiedTime(new Date());
156 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor(coordAction));
157 }
158 catch (Exception e) {
159 throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
160 }
161 cron.stop();
162
163 return null;
164 }
165
166 /**
167 * This function reads the value of re-queue interval for coordinator input
168 * check command from the Oozie configuration provided by Configuration
169 * Service. If nothing defined in the configuration, it uses the code
170 * specified default value.
171 *
172 * @return re-queue interval in ms
173 */
174 public long getCoordInputCheckRequeueInterval() {
175 long requeueInterval = Services.get().getConf().getLong(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL,
176 DEFAULT_COMMAND_REQUEUE_INTERVAL);
177 return requeueInterval;
178 }
179
180 /**
181 * To check the list of input paths if all of them exist
182 *
183 * @param actionXml action xml
184 * @param existList the list of existed paths
185 * @param nonExistList the list of non existed paths
186 * @param conf action configuration
187 * @return true if all input paths are existed
188 * @throws Exception thrown of unable to check input path
189 */
190 protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
191 Configuration conf) throws Exception {
192 Element eAction = XmlUtils.parseXml(actionXml.toString());
193 boolean allExist = checkResolvedUris(eAction, existList, nonExistList, conf);
194 if (allExist) {
195 LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
196 allExist = checkUnresolvedInstances(eAction, conf);
197 }
198 if (allExist == true) {
199 materializeDataProperties(eAction, conf);
200 actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString());
201 }
202 return allExist;
203 }
204
205 /**
206 * Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list
207 * of files that will be needed.
208 *
209 * @param eAction action element
210 * @param conf action configuration
211 * @throws Exception thrown if failed to resolve data properties
212 * @update modify 'Action' element with appropriate list of files.
213 */
214 @SuppressWarnings("unchecked")
215 private void materializeDataProperties(Element eAction, Configuration conf) throws Exception {
216 ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, conf, actionId);
217 Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow",
218 eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
219 if (configElem != null) {
220 for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
221 resolveTagContents("value", propElem, eval);
222 }
223 }
224 }
225
226 /**
227 * To resolve property value which contains el functions
228 *
229 * @param tagName tag name
230 * @param elem the child element of "property" element
231 * @param eval el functions evaluator
232 * @throws Exception thrown if unable to resolve tag value
233 */
234 private void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception {
235 if (elem == null) {
236 return;
237 }
238 Element tagElem = elem.getChild(tagName, elem.getNamespace());
239 if (tagElem != null) {
240 String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText());
241 tagElem.removeContent();
242 tagElem.addContent(updated);
243 }
244 else {
245 LOG.warn(" Value NOT FOUND " + tagName);
246 }
247 }
248
249 /**
250 * Check if any unsolved paths under data output. Resolve the unresolved data input paths.
251 *
252 * @param eAction action element
253 * @param actionConf action configuration
254 * @return true if successful to resolve input and output paths
255 * @throws Exception thrown if failed to resolve data input and output paths
256 */
257 @SuppressWarnings("unchecked")
258 private boolean checkUnresolvedInstances(Element eAction, Configuration actionConf) throws Exception {
259 String strAction = XmlUtils.prettyPrint(eAction).toString();
260 Date nominalTime = DateUtils.parseDateOozieTZ(eAction.getAttributeValue("action-nominal-time"));
261 String actualTimeStr = eAction.getAttributeValue("action-actual-time");
262 Date actualTime = null;
263 if (actualTimeStr == null) {
264 LOG.debug("Unable to get action-actual-time from action xml, this job is submitted " +
265 "from previous version. Assign current date to actual time, action = " + actionId);
266 actualTime = new Date();
267 } else {
268 actualTime = DateUtils.parseDateOozieTZ(actualTimeStr);
269 }
270
271 StringBuffer resultedXml = new StringBuffer();
272
273 boolean ret;
274 Element inputList = eAction.getChild("input-events", eAction.getNamespace());
275 if (inputList != null) {
276 ret = materializeUnresolvedEvent(inputList.getChildren("data-in", eAction.getNamespace()), nominalTime,
277 actualTime, actionConf);
278 if (ret == false) {
279 resultedXml.append(strAction);
280 return false;
281 }
282 }
283
284 // Using latest() or future() in output-event is not intuitive.
285 // We need to make sure, this assumption is correct.
286 Element outputList = eAction.getChild("output-events", eAction.getNamespace());
287 if (outputList != null) {
288 for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
289 if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) != null) {
290 throw new CommandException(ErrorCode.E1006, "coord:latest()/future()",
291 " not permitted in output-event ");
292 }
293 }
294 }
295 return true;
296 }
297
298 /**
299 * Resolve the list of data input paths
300 *
301 * @param eDataEvents the list of data input elements
302 * @param nominalTime action nominal time
303 * @param actualTime current time
304 * @param conf action configuration
305 * @return true if all unresolved URIs can be resolved
306 * @throws Exception thrown if failed to resolve data input paths
307 */
308 @SuppressWarnings("unchecked")
309 private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime,
310 Configuration conf) throws Exception {
311 for (Element dEvent : eDataEvents) {
312 if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) == null) {
313 continue;
314 }
315 ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf);
316 String uresolvedInstance = dEvent.getChild("unresolved-instances", dEvent.getNamespace()).getTextTrim();
317 String unresolvedList[] = uresolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR);
318 StringBuffer resolvedTmp = new StringBuffer();
319 for (int i = 0; i < unresolvedList.length; i++) {
320 String ret = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]);
321 Boolean isResolved = (Boolean) eval.getVariable("is_resolved");
322 if (isResolved == false) {
323 LOG.info("[" + actionId + "]::Cannot resolve: " + ret);
324 return false;
325 }
326 if (resolvedTmp.length() > 0) {
327 resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR);
328 }
329 resolvedTmp.append((String) eval.getVariable("resolved_path"));
330 }
331 if (resolvedTmp.length() > 0) {
332 if (dEvent.getChild("uris", dEvent.getNamespace()) != null) {
333 resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append(
334 dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim());
335 dEvent.removeChild("uris", dEvent.getNamespace());
336 }
337 Element uriInstance = new Element("uris", dEvent.getNamespace());
338 uriInstance.addContent(resolvedTmp.toString());
339 dEvent.getContent().add(1, uriInstance);
340 }
341 dEvent.removeChild("unresolved-instances", dEvent.getNamespace());
342 }
343
344 return true;
345 }
346
347 /**
348 * Check all resolved URIs existence
349 *
350 * @param eAction action element
351 * @param existList the list of existed paths
352 * @param nonExistList the list of paths to check existence
353 * @param conf action configuration
354 * @return true if all nonExistList paths exist
355 * @throws IOException thrown if unable to access the path
356 */
357 private boolean checkResolvedUris(Element eAction, StringBuilder existList, StringBuilder nonExistList,
358 Configuration conf) throws IOException {
359 LOG.info("[" + actionId + "]::ActionInputCheck:: In checkResolvedUris...");
360 Element inputList = eAction.getChild("input-events", eAction.getNamespace());
361 if (inputList != null) {
362 if (nonExistList.length() > 0) {
363 checkListOfPaths(existList, nonExistList, conf);
364 }
365 return nonExistList.length() == 0;
366 }
367 return true;
368 }
369
370 /**
371 * Check a list of non existed paths and add to exist list if it exists
372 *
373 * @param existList the list of existed paths
374 * @param nonExistList the list of paths to check existence
375 * @param conf action configuration
376 * @return true if all nonExistList paths exist
377 * @throws IOException thrown if unable to access the path
378 */
379 private boolean checkListOfPaths(StringBuilder existList, StringBuilder nonExistList, Configuration conf)
380 throws IOException {
381
382 String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
383 if (uriList[0] != null) {
384 LOG.info("[" + actionId + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] + " is Missing.");
385 }
386
387 nonExistList.delete(0, nonExistList.length());
388 boolean allExists = true;
389 String existSeparator = "", nonExistSeparator = "";
390 for (int i = 0; i < uriList.length; i++) {
391 if (allExists) {
392 allExists = pathExists(uriList[i], conf);
393 LOG.info("[" + actionId + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" + allExists);
394 }
395 if (allExists) {
396 existList.append(existSeparator).append(uriList[i]);
397 existSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
398 }
399 else {
400 nonExistList.append(nonExistSeparator).append(uriList[i]);
401 nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
402 }
403 }
404 return allExists;
405 }
406
407 /**
408 * Check if given path exists
409 *
410 * @param sPath uri path
411 * @param actionConf action configuration
412 * @return true if path exists
413 * @throws IOException thrown if unable to access the path
414 */
415 private boolean pathExists(String sPath, Configuration actionConf) throws IOException {
416 LOG.debug("checking for the file " + sPath);
417 Path path = new Path(sPath);
418 String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
419 try {
420 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
421 Configuration fsConf = has.createJobConf(path.toUri().getAuthority());
422 return has.createFileSystem(user, path.toUri(), fsConf).exists(path);
423 }
424 catch (HadoopAccessorException e) {
425 throw new IOException(e);
426 }
427 }
428
429 /**
430 * The function create a list of URIs separated by "," using the instances time stamp and URI-template
431 *
432 * @param event : <data-in> event
433 * @param instances : List of time stamp seprated by ","
434 * @param unresolvedInstances : list of instance with latest/future function
435 * @return : list of URIs separated by ",".
436 * @throws Exception thrown if failed to create URIs from unresolvedInstances
437 */
438 @SuppressWarnings("unused")
439 private String createURIs(Element event, String instances, StringBuilder unresolvedInstances) throws Exception {
440 if (instances == null || instances.length() == 0) {
441 return "";
442 }
443 String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
444 StringBuilder uris = new StringBuilder();
445
446 for (int i = 0; i < instanceList.length; i++) {
447 int funcType = CoordCommandUtils.getFuncType(instanceList[i]);
448 if (funcType == CoordCommandUtils.LATEST || funcType == CoordCommandUtils.FUTURE) {
449 if (unresolvedInstances.length() > 0) {
450 unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR);
451 }
452 unresolvedInstances.append(instanceList[i]);
453 continue;
454 }
455 ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]);
456 if (uris.length() > 0) {
457 uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
458 }
459 uris.append(CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()).getChild(
460 "uri-template", event.getNamespace()).getTextTrim()));
461 }
462 return uris.toString();
463 }
464
465 /* (non-Javadoc)
466 * @see org.apache.oozie.command.XCommand#getEntityKey()
467 */
468 @Override
469 public String getEntityKey() {
470 return this.jobId;
471 }
472
473 /* (non-Javadoc)
474 * @see org.apache.oozie.command.XCommand#isLockRequired()
475 */
476 @Override
477 protected boolean isLockRequired() {
478 return true;
479 }
480
481 /* (non-Javadoc)
482 * @see org.apache.oozie.command.XCommand#eagerLoadState()
483 */
484 @Override
485 protected void eagerLoadState() throws CommandException {
486 loadState();
487 }
488
489 /* (non-Javadoc)
490 * @see org.apache.oozie.command.XCommand#loadState()
491 */
492 @Override
493 protected void loadState() throws CommandException {
494 if (jpaService == null) {
495 jpaService = Services.get().get(JPAService.class);
496 }
497 try {
498 coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
499 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
500 }
501 catch (JPAExecutorException je) {
502 throw new CommandException(je);
503 }
504 LogUtils.setLogInfo(coordAction, logInfo);
505 }
506
507 /* (non-Javadoc)
508 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
509 */
510 @Override
511 protected void verifyPrecondition() throws CommandException, PreconditionException {
512 if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
513 throw new PreconditionException(ErrorCode.E1100, "[" + actionId
514 + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state="
515 + coordAction.getStatus());
516 }
517
518 // if eligible to do action input check when running with backward support is true
519 if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
520 return;
521 }
522
523 if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR && coordJob.getStatus() != Job.Status.PAUSED
524 && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
525 throw new PreconditionException(
526 ErrorCode.E1100, "["+ actionId + "]::CoordActionInputCheck:: Ignoring action." +
527 " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
528 + coordJob.getStatus());
529 }
530 }
531
532 /* (non-Javadoc)
533 * @see org.apache.oozie.command.XCommand#getKey()
534 */
535 @Override
536 public String getKey(){
537 return getName() + "_" + actionId;
538 }
539
540 }