This project has retired. For details, please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.util.Date;
023 import java.util.List;
024
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.hadoop.fs.Path;
027 import org.apache.oozie.CoordinatorActionBean;
028 import org.apache.oozie.CoordinatorJobBean;
029 import org.apache.oozie.ErrorCode;
030 import org.apache.oozie.client.CoordinatorAction;
031 import org.apache.oozie.client.Job;
032 import org.apache.oozie.client.OozieClient;
033 import org.apache.oozie.command.CommandException;
034 import org.apache.oozie.command.PreconditionException;
035 import org.apache.oozie.coord.CoordELEvaluator;
036 import org.apache.oozie.coord.CoordELFunctions;
037 import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
038 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
039 import org.apache.oozie.executor.jpa.JPAExecutorException;
040 import org.apache.oozie.service.HadoopAccessorException;
041 import org.apache.oozie.service.HadoopAccessorService;
042 import org.apache.oozie.service.JPAService;
043 import org.apache.oozie.service.Service;
044 import org.apache.oozie.service.Services;
045 import org.apache.oozie.util.DateUtils;
046 import org.apache.oozie.util.ELEvaluator;
047 import org.apache.oozie.util.Instrumentation;
048 import org.apache.oozie.util.LogUtils;
049 import org.apache.oozie.util.ParamChecker;
050 import org.apache.oozie.util.StatusUtils;
051 import org.apache.oozie.util.XConfiguration;
052 import org.apache.oozie.util.XmlUtils;
053 import org.jdom.Element;
054
055 /**
056 * The command to check if an action's data input paths exist in the file system.
057 */
058 public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
059
060 private final String actionId;
061 /**
062 * Property name of command re-queue interval for coordinator action input check in
063 * milliseconds.
064 */
065 public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX
066 + "coord.input.check.requeue.interval";
067 /**
068 * Default re-queue interval in ms. It is applied when no value defined in
069 * the oozie configuration.
070 */
071 private final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 60000; // 1 minute
072 private CoordinatorActionBean coordAction = null;
073 private CoordinatorJobBean coordJob = null;
074 private JPAService jpaService = null;
075 private String jobId = null;
076
077 public CoordActionInputCheckXCommand(String actionId, String jobId) {
078 super("coord_action_input", "coord_action_input", 1);
079 this.actionId = ParamChecker.notEmpty(actionId, "actionId");
080 this.jobId = jobId;
081 }
082
083 /* (non-Javadoc)
084 * @see org.apache.oozie.command.XCommand#execute()
085 */
086 @Override
087 protected Void execute() throws CommandException {
088 LOG.info("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state.");
089
090 // this action should only get processed if current time > nominal time;
091 // otherwise, requeue this action for delay execution;
092 Date nominalTime = coordAction.getNominalTime();
093 Date currentTime = new Date();
094 if (nominalTime.compareTo(currentTime) > 0) {
095 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), Math.max((nominalTime.getTime() - currentTime
096 .getTime()), getCoordInputCheckRequeueInterval()));
097 // update lastModifiedTime
098 coordAction.setLastModifiedTime(new Date());
099 try {
100 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor(coordAction));
101 }
102 catch (JPAExecutorException e) {
103 throw new CommandException(e);
104 }
105 LOG.info("[" + actionId
106 + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current="
107 + currentTime + ", nominal=" + nominalTime);
108
109 return null;
110 }
111
112 StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
113 Instrumentation.Cron cron = new Instrumentation.Cron();
114 try {
115 Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
116 cron.start();
117 StringBuilder existList = new StringBuilder();
118 StringBuilder nonExistList = new StringBuilder();
119 StringBuilder nonResolvedList = new StringBuilder();
120 CoordCommandUtils.getResolvedList(coordAction.getMissingDependencies(), nonExistList, nonResolvedList);
121
122 LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + nonExistList.toString() + " "
123 + nonResolvedList.toString());
124 boolean status = checkInput(actionXml, existList, nonExistList, actionConf);
125 coordAction.setLastModifiedTime(currentTime);
126 coordAction.setActionXml(actionXml.toString());
127 if (nonResolvedList.length() > 0 && status == false) {
128 nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
129 }
130 coordAction.setMissingDependencies(nonExistList.toString());
131 if (status == true) {
132 coordAction.setStatus(CoordinatorAction.Status.READY);
133 // pass jobID to the CoordActionReadyXCommand
134 queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
135 }
136 else {
137 long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
138 .getCreatedTime().getTime()))
139 / (60 * 1000);
140 int timeOut = coordAction.getTimeOut();
141 if ((timeOut >= 0) && (waitingTime > timeOut)) {
142 queue(new CoordActionTimeOutXCommand(coordAction), 100);
143 }
144 else {
145 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), getCoordInputCheckRequeueInterval());
146 }
147 }
148 coordAction.setLastModifiedTime(new Date());
149 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor(coordAction));
150 }
151 catch (Exception e) {
152 throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
153 }
154 cron.stop();
155
156 return null;
157 }
158
159 /**
160 * This function reads the value of re-queue interval for coordinator input
161 * check command from the Oozie configuration provided by Configuration
162 * Service. If nothing defined in the configuration, it uses the code
163 * specified default value.
164 *
165 * @return re-queue interval in ms
166 */
167 public long getCoordInputCheckRequeueInterval() {
168 long requeueInterval = Services.get().getConf().getLong(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL,
169 DEFAULT_COMMAND_REQUEUE_INTERVAL);
170 return requeueInterval;
171 }
172
173 /**
174 * To check the list of input paths if all of them exist
175 *
176 * @param actionXml action xml
177 * @param existList the list of existed paths
178 * @param nonExistList the list of non existed paths
179 * @param conf action configuration
180 * @return true if all input paths are existed
181 * @throws Exception thrown of unable to check input path
182 */
183 protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
184 Configuration conf) throws Exception {
185 Element eAction = XmlUtils.parseXml(actionXml.toString());
186 boolean allExist = checkResolvedUris(eAction, existList, nonExistList, conf);
187 if (allExist) {
188 LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
189 allExist = checkUnresolvedInstances(eAction, conf);
190 }
191 if (allExist == true) {
192 materializeDataProperties(eAction, conf);
193 actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString());
194 }
195 return allExist;
196 }
197
198 /**
199 * Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list
200 * of files that will be needed.
201 *
202 * @param eAction action element
203 * @param conf action configuration
204 * @throws Exception thrown if failed to resolve data properties
205 * @update modify 'Action' element with appropriate list of files.
206 */
207 @SuppressWarnings("unchecked")
208 private void materializeDataProperties(Element eAction, Configuration conf) throws Exception {
209 ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, conf, actionId);
210 Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow",
211 eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
212 if (configElem != null) {
213 for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
214 resolveTagContents("value", propElem, eval);
215 }
216 }
217 }
218
219 /**
220 * To resolve property value which contains el functions
221 *
222 * @param tagName tag name
223 * @param elem the child element of "property" element
224 * @param eval el functions evaluator
225 * @throws Exception thrown if unable to resolve tag value
226 */
227 private void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception {
228 if (elem == null) {
229 return;
230 }
231 Element tagElem = elem.getChild(tagName, elem.getNamespace());
232 if (tagElem != null) {
233 String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText());
234 tagElem.removeContent();
235 tagElem.addContent(updated);
236 }
237 else {
238 LOG.warn(" Value NOT FOUND " + tagName);
239 }
240 }
241
242 /**
243 * Check if any unsolved paths under data output. Resolve the unresolved data input paths.
244 *
245 * @param eAction action element
246 * @param actionConf action configuration
247 * @return true if successful to resolve input and output paths
248 * @throws Exception thrown if failed to resolve data input and output paths
249 */
250 @SuppressWarnings("unchecked")
251 private boolean checkUnresolvedInstances(Element eAction, Configuration actionConf) throws Exception {
252 String strAction = XmlUtils.prettyPrint(eAction).toString();
253 Date nominalTime = DateUtils.parseDateUTC(eAction.getAttributeValue("action-nominal-time"));
254 String actualTimeStr = eAction.getAttributeValue("action-actual-time");
255 Date actualTime = null;
256 if (actualTimeStr == null) {
257 LOG.debug("Unable to get action-actual-time from action xml, this job is submitted " +
258 "from previous version. Assign current date to actual time, action = " + actionId);
259 actualTime = new Date();
260 } else {
261 actualTime = DateUtils.parseDateUTC(actualTimeStr);
262 }
263
264 StringBuffer resultedXml = new StringBuffer();
265
266 boolean ret;
267 Element inputList = eAction.getChild("input-events", eAction.getNamespace());
268 if (inputList != null) {
269 ret = materializeUnresolvedEvent(inputList.getChildren("data-in", eAction.getNamespace()), nominalTime,
270 actualTime, actionConf);
271 if (ret == false) {
272 resultedXml.append(strAction);
273 return false;
274 }
275 }
276
277 // Using latest() or future() in output-event is not intuitive.
278 // We need to make sure, this assumption is correct.
279 Element outputList = eAction.getChild("output-events", eAction.getNamespace());
280 if (outputList != null) {
281 for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
282 if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) != null) {
283 throw new CommandException(ErrorCode.E1006, "coord:latest()/future()",
284 " not permitted in output-event ");
285 }
286 }
287 }
288 return true;
289 }
290
291 /**
292 * Resolve the list of data input paths
293 *
294 * @param eDataEvents the list of data input elements
295 * @param nominalTime action nominal time
296 * @param actualTime current time
297 * @param conf action configuration
298 * @return true if all unresolved URIs can be resolved
299 * @throws Exception thrown if failed to resolve data input paths
300 */
301 @SuppressWarnings("unchecked")
302 private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime,
303 Configuration conf) throws Exception {
304 for (Element dEvent : eDataEvents) {
305 if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) == null) {
306 continue;
307 }
308 ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf);
309 String uresolvedInstance = dEvent.getChild("unresolved-instances", dEvent.getNamespace()).getTextTrim();
310 String unresolvedList[] = uresolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR);
311 StringBuffer resolvedTmp = new StringBuffer();
312 for (int i = 0; i < unresolvedList.length; i++) {
313 String ret = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]);
314 Boolean isResolved = (Boolean) eval.getVariable("is_resolved");
315 if (isResolved == false) {
316 LOG.info("[" + actionId + "]::Cannot resolve: " + ret);
317 return false;
318 }
319 if (resolvedTmp.length() > 0) {
320 resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR);
321 }
322 resolvedTmp.append((String) eval.getVariable("resolved_path"));
323 }
324 if (resolvedTmp.length() > 0) {
325 if (dEvent.getChild("uris", dEvent.getNamespace()) != null) {
326 resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append(
327 dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim());
328 dEvent.removeChild("uris", dEvent.getNamespace());
329 }
330 Element uriInstance = new Element("uris", dEvent.getNamespace());
331 uriInstance.addContent(resolvedTmp.toString());
332 dEvent.getContent().add(1, uriInstance);
333 }
334 dEvent.removeChild("unresolved-instances", dEvent.getNamespace());
335 }
336
337 return true;
338 }
339
340 /**
341 * Check all resolved URIs existence
342 *
343 * @param eAction action element
344 * @param existList the list of existed paths
345 * @param nonExistList the list of paths to check existence
346 * @param conf action configuration
347 * @return true if all nonExistList paths exist
348 * @throws IOException thrown if unable to access the path
349 */
350 private boolean checkResolvedUris(Element eAction, StringBuilder existList, StringBuilder nonExistList,
351 Configuration conf) throws IOException {
352 LOG.info("[" + actionId + "]::ActionInputCheck:: In checkResolvedUris...");
353 Element inputList = eAction.getChild("input-events", eAction.getNamespace());
354 if (inputList != null) {
355 if (nonExistList.length() > 0) {
356 checkListOfPaths(existList, nonExistList, conf);
357 }
358 return nonExistList.length() == 0;
359 }
360 return true;
361 }
362
363 /**
364 * Check a list of non existed paths and add to exist list if it exists
365 *
366 * @param existList the list of existed paths
367 * @param nonExistList the list of paths to check existence
368 * @param conf action configuration
369 * @return true if all nonExistList paths exist
370 * @throws IOException thrown if unable to access the path
371 */
372 private boolean checkListOfPaths(StringBuilder existList, StringBuilder nonExistList, Configuration conf)
373 throws IOException {
374
375 String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
376 if (uriList[0] != null) {
377 LOG.info("[" + actionId + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] + " is Missing.");
378 }
379
380 nonExistList.delete(0, nonExistList.length());
381 boolean allExists = true;
382 String existSeparator = "", nonExistSeparator = "";
383 for (int i = 0; i < uriList.length; i++) {
384 if (allExists) {
385 allExists = pathExists(uriList[i], conf);
386 LOG.info("[" + actionId + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" + allExists);
387 }
388 if (allExists) {
389 existList.append(existSeparator).append(uriList[i]);
390 existSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
391 }
392 else {
393 nonExistList.append(nonExistSeparator).append(uriList[i]);
394 nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
395 }
396 }
397 return allExists;
398 }
399
400 /**
401 * Check if given path exists
402 *
403 * @param sPath uri path
404 * @param actionConf action configuration
405 * @return true if path exists
406 * @throws IOException thrown if unable to access the path
407 */
408 private boolean pathExists(String sPath, Configuration actionConf) throws IOException {
409 LOG.debug("checking for the file " + sPath);
410 Path path = new Path(sPath);
411 String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
412 try {
413 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
414 Configuration fsConf = has.createJobConf(path.toUri().getAuthority());
415 return has.createFileSystem(user, path.toUri(), fsConf).exists(path);
416 }
417 catch (HadoopAccessorException e) {
418 throw new IOException(e);
419 }
420 }
421
422 /**
423 * The function create a list of URIs separated by "," using the instances time stamp and URI-template
424 *
425 * @param event : <data-in> event
426 * @param instances : List of time stamp seprated by ","
427 * @param unresolvedInstances : list of instance with latest/future function
428 * @return : list of URIs separated by ",".
429 * @throws Exception thrown if failed to create URIs from unresolvedInstances
430 */
431 @SuppressWarnings("unused")
432 private String createURIs(Element event, String instances, StringBuilder unresolvedInstances) throws Exception {
433 if (instances == null || instances.length() == 0) {
434 return "";
435 }
436 String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
437 StringBuilder uris = new StringBuilder();
438
439 for (int i = 0; i < instanceList.length; i++) {
440 int funcType = CoordCommandUtils.getFuncType(instanceList[i]);
441 if (funcType == CoordCommandUtils.LATEST || funcType == CoordCommandUtils.FUTURE) {
442 if (unresolvedInstances.length() > 0) {
443 unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR);
444 }
445 unresolvedInstances.append(instanceList[i]);
446 continue;
447 }
448 ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]);
449 if (uris.length() > 0) {
450 uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
451 }
452 uris.append(CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()).getChild(
453 "uri-template", event.getNamespace()).getTextTrim()));
454 }
455 return uris.toString();
456 }
457
458 /* (non-Javadoc)
459 * @see org.apache.oozie.command.XCommand#getEntityKey()
460 */
461 @Override
462 public String getEntityKey() {
463 return this.jobId;
464 }
465
466 /* (non-Javadoc)
467 * @see org.apache.oozie.command.XCommand#isLockRequired()
468 */
469 @Override
470 protected boolean isLockRequired() {
471 return true;
472 }
473
474 /* (non-Javadoc)
475 * @see org.apache.oozie.command.XCommand#eagerLoadState()
476 */
477 @Override
478 protected void eagerLoadState() throws CommandException {
479 loadState();
480 }
481
482 /* (non-Javadoc)
483 * @see org.apache.oozie.command.XCommand#loadState()
484 */
485 @Override
486 protected void loadState() throws CommandException {
487 if (jpaService == null) {
488 jpaService = Services.get().get(JPAService.class);
489 }
490 try {
491 coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
492 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
493 }
494 catch (JPAExecutorException je) {
495 throw new CommandException(je);
496 }
497 LogUtils.setLogInfo(coordAction, logInfo);
498 }
499
500 /* (non-Javadoc)
501 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
502 */
503 @Override
504 protected void verifyPrecondition() throws CommandException, PreconditionException {
505 if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
506 throw new PreconditionException(ErrorCode.E1100, "[" + actionId
507 + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state="
508 + coordAction.getStatus());
509 }
510
511 // if eligible to do action input check when running with backward support is true
512 if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
513 return;
514 }
515
516 if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.PAUSED
517 && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
518 throw new PreconditionException(
519 ErrorCode.E1100, "["+ actionId + "]::CoordActionInputCheck:: Ignoring action." +
520 " Coordinator job is not in RUNNING/PAUSED/PAUSEDWITHERROR state, but state="
521 + coordJob.getStatus());
522 }
523 }
524
525 /* (non-Javadoc)
526 * @see org.apache.oozie.command.XCommand#getKey()
527 */
528 @Override
529 public String getKey(){
530 return getName() + "_" + actionId;
531 }
532
533 }