This project has retired. For details, please refer to its Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.util.Date;
023 import java.util.List;
024
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.hadoop.fs.Path;
027 import org.apache.oozie.CoordinatorActionBean;
028 import org.apache.oozie.CoordinatorJobBean;
029 import org.apache.oozie.ErrorCode;
030 import org.apache.oozie.client.CoordinatorAction;
031 import org.apache.oozie.client.Job;
032 import org.apache.oozie.client.OozieClient;
033 import org.apache.oozie.command.CommandException;
034 import org.apache.oozie.command.PreconditionException;
035 import org.apache.oozie.coord.CoordELEvaluator;
036 import org.apache.oozie.coord.CoordELFunctions;
037 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
038 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
039 import org.apache.oozie.executor.jpa.JPAExecutorException;
040 import org.apache.oozie.service.HadoopAccessorException;
041 import org.apache.oozie.service.HadoopAccessorService;
042 import org.apache.oozie.service.JPAService;
043 import org.apache.oozie.service.Services;
044 import org.apache.oozie.util.DateUtils;
045 import org.apache.oozie.util.ELEvaluator;
046 import org.apache.oozie.util.Instrumentation;
047 import org.apache.oozie.util.LogUtils;
048 import org.apache.oozie.util.ParamChecker;
049 import org.apache.oozie.util.StatusUtils;
050 import org.apache.oozie.util.XConfiguration;
051 import org.apache.oozie.util.XmlUtils;
052 import org.jdom.Element;
053
054 /**
055 * The command to check if an action's data input paths exist in the file system.
056 */
057 public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
058
059 private final String actionId;
060 private final int COMMAND_REQUEUE_INTERVAL = 60000; // 1 minute
061 private CoordinatorActionBean coordAction = null;
062 private CoordinatorJobBean coordJob = null;
063 private JPAService jpaService = null;
064
065 public CoordActionInputCheckXCommand(String actionId) {
066 super("coord_action_input", "coord_action_input", 1);
067 this.actionId = ParamChecker.notEmpty(actionId, "actionId");
068 }
069
070 /* (non-Javadoc)
071 * @see org.apache.oozie.command.XCommand#execute()
072 */
073 @Override
074 protected Void execute() throws CommandException {
075 LOG.info("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state.");
076
077 // this action should only get processed if current time > nominal time;
078 // otherwise, requeue this action for delay execution;
079 Date nominalTime = coordAction.getNominalTime();
080 Date currentTime = new Date();
081 if (nominalTime.compareTo(currentTime) > 0) {
082 queue(new CoordActionInputCheckXCommand(coordAction.getId()), Math.max(
083 (nominalTime.getTime() - currentTime.getTime()), COMMAND_REQUEUE_INTERVAL));
084 // update lastModifiedTime
085 coordAction.setLastModifiedTime(new Date());
086 try {
087 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor(coordAction));
088 }
089 catch (JPAExecutorException e) {
090 throw new CommandException(e);
091 }
092 LOG.info("[" + actionId
093 + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current="
094 + currentTime + ", nominal=" + nominalTime);
095
096 return null;
097 }
098
099 StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
100 Instrumentation.Cron cron = new Instrumentation.Cron();
101 try {
102 Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
103 cron.start();
104 StringBuilder existList = new StringBuilder();
105 StringBuilder nonExistList = new StringBuilder();
106 StringBuilder nonResolvedList = new StringBuilder();
107 CoordCommandUtils.getResolvedList(coordAction.getMissingDependencies(), nonExistList, nonResolvedList);
108
109 LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + nonExistList.toString() + " "
110 + nonResolvedList.toString());
111 boolean status = checkInput(actionXml, existList, nonExistList, actionConf);
112 coordAction.setLastModifiedTime(currentTime);
113 coordAction.setActionXml(actionXml.toString());
114 if (nonResolvedList.length() > 0 && status == false) {
115 nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
116 }
117 coordAction.setMissingDependencies(nonExistList.toString());
118 if (status == true) {
119 coordAction.setStatus(CoordinatorAction.Status.READY);
120 // pass jobID to the CoordActionReadyXCommand
121 queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
122 }
123 else {
124 long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
125 .getCreatedTime().getTime()))
126 / (60 * 1000);
127 int timeOut = coordAction.getTimeOut();
128 if ((timeOut >= 0) && (waitingTime > timeOut)) {
129 queue(new CoordActionTimeOutXCommand(coordAction), 100);
130 }
131 else {
132 queue(new CoordActionInputCheckXCommand(coordAction.getId()), COMMAND_REQUEUE_INTERVAL);
133 }
134 }
135 coordAction.setLastModifiedTime(new Date());
136 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor(coordAction));
137 }
138 catch (Exception e) {
139 throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
140 }
141 cron.stop();
142
143 return null;
144 }
145
146 /**
147 * To check the list of input paths if all of them exist
148 *
149 * @param actionXml action xml
150 * @param existList the list of existed paths
151 * @param nonExistList the list of non existed paths
152 * @param conf action configuration
153 * @return true if all input paths are existed
154 * @throws Exception thrown of unable to check input path
155 */
156 protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
157 Configuration conf) throws Exception {
158 Element eAction = XmlUtils.parseXml(actionXml.toString());
159 boolean allExist = checkResolvedUris(eAction, existList, nonExistList, conf);
160 if (allExist) {
161 LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
162 allExist = checkUnresolvedInstances(eAction, conf);
163 }
164 if (allExist == true) {
165 materializeDataProperties(eAction, conf);
166 actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString());
167 }
168 return allExist;
169 }
170
171 /**
172 * Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list
173 * of files that will be needed.
174 *
175 * @param eAction action element
176 * @param conf action configuration
177 * @throws Exception thrown if failed to resolve data properties
178 * @update modify 'Action' element with appropriate list of files.
179 */
180 @SuppressWarnings("unchecked")
181 private void materializeDataProperties(Element eAction, Configuration conf) throws Exception {
182 ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, conf, actionId);
183 Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow",
184 eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
185 if (configElem != null) {
186 for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
187 resolveTagContents("value", propElem, eval);
188 }
189 }
190 }
191
192 /**
193 * To resolve property value which contains el functions
194 *
195 * @param tagName tag name
196 * @param elem the child element of "property" element
197 * @param eval el functions evaluator
198 * @throws Exception thrown if unable to resolve tag value
199 */
200 private void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception {
201 if (elem == null) {
202 return;
203 }
204 Element tagElem = elem.getChild(tagName, elem.getNamespace());
205 if (tagElem != null) {
206 String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText());
207 tagElem.removeContent();
208 tagElem.addContent(updated);
209 }
210 else {
211 LOG.warn(" Value NOT FOUND " + tagName);
212 }
213 }
214
215 /**
216 * Check if any unsolved paths under data output. Resolve the unresolved data input paths.
217 *
218 * @param eAction action element
219 * @param actionConf action configuration
220 * @return true if successful to resolve input and output paths
221 * @throws Exception thrown if failed to resolve data input and output paths
222 */
223 @SuppressWarnings("unchecked")
224 private boolean checkUnresolvedInstances(Element eAction, Configuration actionConf) throws Exception {
225 String strAction = XmlUtils.prettyPrint(eAction).toString();
226 Date nominalTime = DateUtils.parseDateUTC(eAction.getAttributeValue("action-nominal-time"));
227 String actualTimeStr = eAction.getAttributeValue("action-actual-time");
228 Date actualTime = null;
229 if (actualTimeStr == null) {
230 LOG.debug("Unable to get action-actual-time from action xml, this job is submitted " +
231 "from previous version. Assign current date to actual time, action = " + actionId);
232 actualTime = new Date();
233 } else {
234 actualTime = DateUtils.parseDateUTC(actualTimeStr);
235 }
236
237 StringBuffer resultedXml = new StringBuffer();
238
239 boolean ret;
240 Element inputList = eAction.getChild("input-events", eAction.getNamespace());
241 if (inputList != null) {
242 ret = materializeUnresolvedEvent(inputList.getChildren("data-in", eAction.getNamespace()), nominalTime,
243 actualTime, actionConf);
244 if (ret == false) {
245 resultedXml.append(strAction);
246 return false;
247 }
248 }
249
250 // Using latest() or future() in output-event is not intuitive.
251 // We need to make sure, this assumption is correct.
252 Element outputList = eAction.getChild("output-events", eAction.getNamespace());
253 if (outputList != null) {
254 for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
255 if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) != null) {
256 throw new CommandException(ErrorCode.E1006, "coord:latest()/future()",
257 " not permitted in output-event ");
258 }
259 }
260 }
261 return true;
262 }
263
264 /**
265 * Resolve the list of data input paths
266 *
267 * @param eDataEvents the list of data input elements
268 * @param nominalTime action nominal time
269 * @param actualTime current time
270 * @param conf action configuration
271 * @return true if all unresolved URIs can be resolved
272 * @throws Exception thrown if failed to resolve data input paths
273 */
274 @SuppressWarnings("unchecked")
275 private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime,
276 Configuration conf) throws Exception {
277 for (Element dEvent : eDataEvents) {
278 if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) == null) {
279 continue;
280 }
281 ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf);
282 String uresolvedInstance = dEvent.getChild("unresolved-instances", dEvent.getNamespace()).getTextTrim();
283 String unresolvedList[] = uresolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR);
284 StringBuffer resolvedTmp = new StringBuffer();
285 for (int i = 0; i < unresolvedList.length; i++) {
286 String ret = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]);
287 Boolean isResolved = (Boolean) eval.getVariable("is_resolved");
288 if (isResolved == false) {
289 LOG.info("[" + actionId + "]::Cannot resolve: " + ret);
290 return false;
291 }
292 if (resolvedTmp.length() > 0) {
293 resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR);
294 }
295 resolvedTmp.append((String) eval.getVariable("resolved_path"));
296 }
297 if (resolvedTmp.length() > 0) {
298 if (dEvent.getChild("uris", dEvent.getNamespace()) != null) {
299 resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append(
300 dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim());
301 dEvent.removeChild("uris", dEvent.getNamespace());
302 }
303 Element uriInstance = new Element("uris", dEvent.getNamespace());
304 uriInstance.addContent(resolvedTmp.toString());
305 dEvent.getContent().add(1, uriInstance);
306 }
307 dEvent.removeChild("unresolved-instances", dEvent.getNamespace());
308 }
309
310 return true;
311 }
312
313 /**
314 * Check all resolved URIs existence
315 *
316 * @param eAction action element
317 * @param existList the list of existed paths
318 * @param nonExistList the list of paths to check existence
319 * @param conf action configuration
320 * @return true if all nonExistList paths exist
321 * @throws IOException thrown if unable to access the path
322 */
323 private boolean checkResolvedUris(Element eAction, StringBuilder existList, StringBuilder nonExistList,
324 Configuration conf) throws IOException {
325 LOG.info("[" + actionId + "]::ActionInputCheck:: In checkResolvedUris...");
326 Element inputList = eAction.getChild("input-events", eAction.getNamespace());
327 if (inputList != null) {
328 if (nonExistList.length() > 0) {
329 checkListOfPaths(existList, nonExistList, conf);
330 }
331 return nonExistList.length() == 0;
332 }
333 return true;
334 }
335
336 /**
337 * Check a list of non existed paths and add to exist list if it exists
338 *
339 * @param existList the list of existed paths
340 * @param nonExistList the list of paths to check existence
341 * @param conf action configuration
342 * @return true if all nonExistList paths exist
343 * @throws IOException thrown if unable to access the path
344 */
345 private boolean checkListOfPaths(StringBuilder existList, StringBuilder nonExistList, Configuration conf)
346 throws IOException {
347
348 String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
349 if (uriList[0] != null) {
350 LOG.info("[" + actionId + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] + " is Missing.");
351 }
352
353 nonExistList.delete(0, nonExistList.length());
354 boolean allExists = true;
355 String existSeparator = "", nonExistSeparator = "";
356 for (int i = 0; i < uriList.length; i++) {
357 if (allExists) {
358 allExists = pathExists(uriList[i], conf);
359 LOG.info("[" + actionId + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" + allExists);
360 }
361 if (allExists) {
362 existList.append(existSeparator).append(uriList[i]);
363 existSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
364 }
365 else {
366 nonExistList.append(nonExistSeparator).append(uriList[i]);
367 nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
368 }
369 }
370 return allExists;
371 }
372
373 /**
374 * Check if given path exists
375 *
376 * @param sPath uri path
377 * @param actionConf action configuration
378 * @return true if path exists
379 * @throws IOException thrown if unable to access the path
380 */
381 private boolean pathExists(String sPath, Configuration actionConf) throws IOException {
382 LOG.debug("checking for the file " + sPath);
383 Path path = new Path(sPath);
384 String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
385 String group = ParamChecker.notEmpty(actionConf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME);
386 try {
387 return Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, path.toUri(),
388 new Configuration()).exists(path);
389 }
390 catch (HadoopAccessorException e) {
391 throw new IOException(e);
392 }
393 }
394
395 /**
396 * The function create a list of URIs separated by "," using the instances time stamp and URI-template
397 *
398 * @param event : <data-in> event
399 * @param instances : List of time stamp seprated by ","
400 * @param unresolvedInstances : list of instance with latest/future function
401 * @return : list of URIs separated by ",".
402 * @throws Exception thrown if failed to create URIs from unresolvedInstances
403 */
404 @SuppressWarnings("unused")
405 private String createURIs(Element event, String instances, StringBuilder unresolvedInstances) throws Exception {
406 if (instances == null || instances.length() == 0) {
407 return "";
408 }
409 String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
410 StringBuilder uris = new StringBuilder();
411
412 for (int i = 0; i < instanceList.length; i++) {
413 int funcType = CoordCommandUtils.getFuncType(instanceList[i]);
414 if (funcType == CoordCommandUtils.LATEST || funcType == CoordCommandUtils.FUTURE) {
415 if (unresolvedInstances.length() > 0) {
416 unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR);
417 }
418 unresolvedInstances.append(instanceList[i]);
419 continue;
420 }
421 ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]);
422 if (uris.length() > 0) {
423 uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
424 }
425 uris.append(CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()).getChild(
426 "uri-template", event.getNamespace()).getTextTrim()));
427 }
428 return uris.toString();
429 }
430
431 /* (non-Javadoc)
432 * @see org.apache.oozie.command.XCommand#getEntityKey()
433 */
434 @Override
435 protected String getEntityKey() {
436 return coordAction.getJobId();
437 }
438
439 /* (non-Javadoc)
440 * @see org.apache.oozie.command.XCommand#isLockRequired()
441 */
442 @Override
443 protected boolean isLockRequired() {
444 return true;
445 }
446
447 /* (non-Javadoc)
448 * @see org.apache.oozie.command.XCommand#eagerLoadState()
449 */
450 @Override
451 protected void eagerLoadState() throws CommandException {
452 loadState();
453 }
454
455 /* (non-Javadoc)
456 * @see org.apache.oozie.command.XCommand#loadState()
457 */
458 @Override
459 protected void loadState() throws CommandException {
460 if (jpaService == null) {
461 jpaService = Services.get().get(JPAService.class);
462 }
463 try {
464 coordAction = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
465 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
466 }
467 catch (JPAExecutorException je) {
468 throw new CommandException(je);
469 }
470 LogUtils.setLogInfo(coordAction, logInfo);
471 }
472
473 /* (non-Javadoc)
474 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
475 */
476 @Override
477 protected void verifyPrecondition() throws CommandException, PreconditionException {
478 if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
479 throw new PreconditionException(ErrorCode.E1100, "[" + actionId
480 + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state="
481 + coordAction.getStatus());
482 }
483
484 // if eligible to do action input check when running with backward support is true
485 if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
486 return;
487 }
488
489 if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.PAUSED
490 && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
491 throw new PreconditionException(
492 ErrorCode.E1100, "["+ actionId + "]::CoordActionInputCheck:: Ignoring action." +
493 " Coordinator job is not in RUNNING/PAUSED/PAUSEDWITHERROR state, but state="
494 + coordJob.getStatus());
495 }
496 }
497
498 /* (non-Javadoc)
499 * @see org.apache.oozie.command.XCommand#getKey()
500 */
501 @Override
502 public String getKey(){
503 return getName() + "_" + actionId;
504 }
505
506 }