This project has retired. For details, please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.util.Date;
023 import java.util.List;
024
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.hadoop.fs.Path;
027 import org.apache.oozie.CoordinatorActionBean;
028 import org.apache.oozie.ErrorCode;
029 import org.apache.oozie.client.CoordinatorAction;
030 import org.apache.oozie.client.OozieClient;
031 import org.apache.oozie.command.CommandException;
032 import org.apache.oozie.coord.CoordELEvaluator;
033 import org.apache.oozie.coord.CoordELFunctions;
034 import org.apache.oozie.service.HadoopAccessorException;
035 import org.apache.oozie.service.HadoopAccessorService;
036 import org.apache.oozie.service.Services;
037 import org.apache.oozie.store.CoordinatorStore;
038 import org.apache.oozie.store.StoreException;
039 import org.apache.oozie.util.DateUtils;
040 import org.apache.oozie.util.ELEvaluator;
041 import org.apache.oozie.util.Instrumentation;
042 import org.apache.oozie.util.ParamChecker;
043 import org.apache.oozie.util.XConfiguration;
044 import org.apache.oozie.util.XLog;
045 import org.apache.oozie.util.XmlUtils;
046 import org.jdom.Element;
047
048 public class CoordActionInputCheckCommand extends CoordinatorCommand<Void> {
049
050 private String actionId;
051 private final XLog log = XLog.getLog(getClass());
052 private int COMMAND_REQUEUE_INTERVAL = 60000; // 1 minute
053 private CoordinatorActionBean coordAction = null;
054
055 public CoordActionInputCheckCommand(String actionId) {
056 super("coord_action_input", "coord_action_input", 1, XLog.STD);
057 this.actionId = actionId;
058 }
059
060 @Override
061 protected Void call(CoordinatorStore store) throws StoreException, CommandException {
062 log.debug("After store.get() for action ID " + actionId + " : " + coordAction.getStatus());
063 // this action should only get processed if current time >
064 // materialization time
065 // otherwise, requeue this action after 30 seconds
066 Date nominalTime = coordAction.getNominalTime();
067 Date currentTime = new Date();
068 if (nominalTime.compareTo(currentTime) > 0) {
069 log.info("[" + actionId
070 + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current="
071 + currentTime + ", nominal=" + nominalTime);
072 queueCallable(new CoordActionInputCheckCommand(coordAction.getId()), Math.max(
073 (nominalTime.getTime() - currentTime.getTime()), COMMAND_REQUEUE_INTERVAL));
074 // update lastModifiedTime
075 store.updateCoordinatorAction(coordAction);
076 return null;
077 }
078 if (coordAction.getStatus() == CoordinatorActionBean.Status.WAITING) {
079 log.info("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state.");
080 StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());// job.getXml();
081 Instrumentation.Cron cron = new Instrumentation.Cron();
082 try {
083 Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
084 cron.start();
085 StringBuilder existList = new StringBuilder();
086 StringBuilder nonExistList = new StringBuilder();
087 StringBuilder nonResolvedList = new StringBuilder();
088 CoordCommandUtils.getResolvedList(coordAction.getMissingDependencies(), nonExistList, nonResolvedList);
089
090 String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
091 if (uriList.length > 0) {
092 log.info("[" + actionId + "]::ActionInputCheck:: Missing deps:" + uriList[0] + ", NonResolvedList:"
093 + nonResolvedList.toString());
094 } else {
095 log.info("[" + actionId + "]::ActionInputCheck:: No missing deps, NonResolvedList:"
096 + nonResolvedList.toString());
097 }
098 boolean status = checkInput(actionXml, existList, nonExistList, actionConf);
099 coordAction.setLastModifiedTime(currentTime);
100 coordAction.setActionXml(actionXml.toString());
101 if (nonResolvedList.length() > 0 && status == false) {
102 nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
103 }
104 coordAction.setMissingDependencies(nonExistList.toString());
105 if (status == true) {
106 coordAction.setStatus(CoordinatorAction.Status.READY);
107 // pass jobID to the ReadyCommand
108 queueCallable(new CoordActionReadyCommand(coordAction.getJobId()), 100);
109 }
110 else {
111 long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(),
112 coordAction.getCreatedTime().getTime())) / (60 * 1000);
113 int timeOut = coordAction.getTimeOut();
114 if ((timeOut >= 0) && (waitingTime > timeOut)) {
115 queueCallable(new CoordActionTimeOut(coordAction), 100);
116 coordAction.setStatus(CoordinatorAction.Status.TIMEDOUT);
117 }
118 else {
119 queueCallable(new CoordActionInputCheckCommand(coordAction.getId()), COMMAND_REQUEUE_INTERVAL);
120 }
121 }
122 store.updateCoordActionMin(coordAction);
123 }
124 catch (Exception e) {
125 log.warn(actionId + ": Exception occurs: " + e + " STORE is active " + store.isActive(), e);
126 throw new CommandException(ErrorCode.E1005, e.getMessage(), e);
127 }
128 cron.stop();
129 }
130 else {
131 log.info("[" + actionId + "]::ActionInputCheck:: Ignoring action. Should be in WAITING state, but state="
132 + coordAction.getStatus());
133 }
134 return null;
135 }
136
137 protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
138 Configuration conf) throws Exception {
139 Element eAction = XmlUtils.parseXml(actionXml.toString());
140 boolean allExist = checkResolvedUris(eAction, existList, nonExistList, conf);
141 if (allExist) {
142 log.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
143 allExist = checkUnresolvedInstances(eAction, conf);
144 }
145 if (allExist == true) {
146 materializeDataProperties(eAction, conf);
147 actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString());
148 }
149 return allExist;
150 }
151
152 /**
153 * Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list
154 * of files that will be needed.
155 *
156 * @param eAction
157 * @param conf
158 * @throws Exception
159 * @update modify 'Action' element with appropriate list of files.
160 */
161 private void materializeDataProperties(Element eAction, Configuration conf) throws Exception {
162 ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, conf, actionId);
163 Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow",
164 eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
165 if (configElem != null) {
166 for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
167 resolveTagContents("value", propElem, eval);
168 }
169 }
170 }
171
172 private void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception {
173 if (elem == null) {
174 return;
175 }
176 Element tagElem = elem.getChild(tagName, elem.getNamespace());
177 if (tagElem != null) {
178 String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText());
179 tagElem.removeContent();
180 tagElem.addContent(updated);
181 }
182 else {
183 log.warn(" Value NOT FOUND " + tagName);
184 }
185 }
186
187 private boolean checkUnresolvedInstances(Element eAction, Configuration actionConf)
188 throws Exception {
189 String strAction = XmlUtils.prettyPrint(eAction).toString();
190 Date nominalTime = DateUtils.parseDateUTC(eAction.getAttributeValue("action-nominal-time"));
191 String actualTimeStr = eAction.getAttributeValue("action-actual-time");
192 Date actualTime = null;
193 if (actualTimeStr == null) {
194 log.debug("Unable to get action-actual-time from action xml, this job is submitted " +
195 "from previous version. Assign current date to actual time, action = " + actionId);
196 actualTime = new Date();
197 } else {
198 actualTime = DateUtils.parseDateUTC(actualTimeStr);
199 }
200
201 StringBuffer resultedXml = new StringBuffer();
202
203 boolean ret;
204 Element inputList = eAction.getChild("input-events", eAction.getNamespace());
205 if (inputList != null) {
206 ret = materializeUnresolvedEvent(inputList.getChildren("data-in", eAction.getNamespace()), nominalTime,
207 actualTime, actionConf);
208 if (ret == false) {
209 resultedXml.append(strAction);
210 return false;
211 }
212 }
213
214 // Using latest() or future() in output-event is not intuitive.
215 // We need to make
216 // sure, this assumption is correct.
217 Element outputList = eAction.getChild("output-events", eAction.getNamespace());
218 if (outputList != null) {
219 for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
220 if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) != null) {
221 throw new CommandException(ErrorCode.E1006, "coord:latest()/future()",
222 " not permitted in output-event ");
223 }
224 }
225 /*
226 * ret = materializeUnresolvedEvent( (List<Element>)
227 * outputList.getChildren("data-out", eAction.getNamespace()),
228 * actualTime, nominalTime, actionConf); if (ret == false) {
229 * resultedXml.append(strAction); return false; }
230 */
231 }
232 return true;
233 }
234
235 private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime,
236 Configuration conf) throws Exception {
237 for (Element dEvent : eDataEvents) {
238 if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) == null) {
239 continue;
240 }
241 ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf);
242 String uresolvedInstance = dEvent.getChild("unresolved-instances", dEvent.getNamespace()).getTextTrim();
243 String unresolvedList[] = uresolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR);
244 StringBuffer resolvedTmp = new StringBuffer();
245 for (int i = 0; i < unresolvedList.length; i++) {
246 String ret = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]);
247 Boolean isResolved = (Boolean) eval.getVariable("is_resolved");
248 if (isResolved == false) {
249 log.info("[" + actionId + "]::Cannot resolve: " + ret);
250 return false;
251 }
252 if (resolvedTmp.length() > 0) {
253 resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR);
254 }
255 resolvedTmp.append((String) eval.getVariable("resolved_path"));
256 }
257 if (resolvedTmp.length() > 0) {
258 if (dEvent.getChild("uris", dEvent.getNamespace()) != null) {
259 resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append(
260 dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim());
261 dEvent.removeChild("uris", dEvent.getNamespace());
262 }
263 Element uriInstance = new Element("uris", dEvent.getNamespace());
264 uriInstance.addContent(resolvedTmp.toString());
265 dEvent.getContent().add(1, uriInstance);
266 }
267 dEvent.removeChild("unresolved-instances", dEvent.getNamespace());
268 }
269
270 return true;
271 }
272
273 private boolean checkResolvedUris(Element eAction, StringBuilder existList, StringBuilder nonExistList,
274 Configuration conf) throws IOException {
275
276 log.info("[" + actionId + "]::ActionInputCheck:: In checkResolvedUris...");
277 Element inputList = eAction.getChild("input-events", eAction.getNamespace());
278 if (inputList != null) {
279 // List<Element> eDataEvents = inputList.getChildren("data-in",
280 // eAction.getNamespace());
281 // for (Element event : eDataEvents) {
282 // Element uris = event.getChild("uris", event.getNamespace());
283 if (nonExistList.length() > 0) {
284 checkListOfPaths(existList, nonExistList, conf);
285 }
286 // }
287 return nonExistList.length() == 0;
288 }
289 return true;
290 }
291
292 private boolean checkListOfPaths(StringBuilder existList, StringBuilder nonExistList, Configuration conf)
293 throws IOException {
294
295 String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
296 if (uriList[0] != null) {
297 log.info("[" + actionId + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] + " is Missing.");
298 }
299
300 nonExistList.delete(0, nonExistList.length());
301 boolean allExists = true;
302 String existSeparator = "", nonExistSeparator = "";
303 for (int i = 0; i < uriList.length; i++) {
304 if (allExists) {
305 allExists = pathExists(uriList[i], conf);
306 log.info("[" + actionId + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" + allExists);
307 }
308 if (allExists) {
309 existList.append(existSeparator).append(uriList[i]);
310 existSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
311 }
312 else {
313 nonExistList.append(nonExistSeparator).append(uriList[i]);
314 nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
315 }
316 }
317 return allExists;
318 }
319
320 private boolean pathExists(String sPath, Configuration actionConf) throws IOException {
321 log.debug("checking for the file " + sPath);
322 Path path = new Path(sPath);
323 String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
324 String group = ParamChecker.notEmpty(actionConf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME);
325 try {
326 return Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, path.toUri(),
327 actionConf).exists(path);
328 }
329 catch (HadoopAccessorException e) {
330 throw new IOException(e);
331 }
332 }
333
334 /**
335 * The function create a list of URIs separated by "," using the instances time stamp and URI-template
336 *
337 * @param event : <data-in> event
338 * @param instances : List of time stamp seprated by ","
339 * @param unresolvedInstances : list of instance with latest/future function
340 * @return : list of URIs separated by ",".
341 * @throws Exception
342 */
343 private String createURIs(Element event, String instances, StringBuilder unresolvedInstances) throws Exception {
344 if (instances == null || instances.length() == 0) {
345 return "";
346 }
347 String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
348 StringBuilder uris = new StringBuilder();
349
350 for (int i = 0; i < instanceList.length; i++) {
351 int funcType = CoordCommandUtils.getFuncType(instanceList[i]);
352 if (funcType == CoordCommandUtils.LATEST || funcType == CoordCommandUtils.FUTURE) {
353 if (unresolvedInstances.length() > 0) {
354 unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR);
355 }
356 unresolvedInstances.append(instanceList[i]);
357 continue;
358 }
359 ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]);
360 // uris.append(eval.evaluate(event.getChild("dataset",
361 // event.getNamespace()).getChild("uri-template",
362 // event.getNamespace()).getTextTrim(), String.class));
363 if (uris.length() > 0) {
364 uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
365 }
366 uris.append(CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()).getChild(
367 "uri-template", event.getNamespace()).getTextTrim()));
368 }
369 return uris.toString();
370 }
371
372 @Override
373 protected Void execute(CoordinatorStore store) throws StoreException, CommandException {
374 log.info("STARTED CoordActionInputCheckCommand for actionid=" + actionId);
375 try {
376 coordAction = store.getEntityManager().find(CoordinatorActionBean.class, actionId);
377 setLogInfo(coordAction);
378 if (lock(coordAction.getJobId())) {
379 call(store);
380 }
381 else {
382 queueCallable(new CoordActionInputCheckCommand(actionId), LOCK_FAILURE_REQUEUE_INTERVAL);
383 log.warn("CoordActionInputCheckCommand lock was not acquired - failed jobId=" + coordAction.getJobId()
384 + ", actionId=" + actionId + ". Requeing the same.");
385 }
386 }
387 catch (InterruptedException e) {
388 queueCallable(new CoordActionInputCheckCommand(actionId), LOCK_FAILURE_REQUEUE_INTERVAL);
389 log.warn("CoordActionInputCheckCommand lock acquiring failed with exception " + e.getMessage()
390 + " for jobId=" + coordAction.getJobId() + ", actionId=" + actionId + " Requeing the same.");
391 }
392 finally {
393 log.info("ENDED CoordActionInputCheckCommand for actionid=" + actionId);
394 }
395 return null;
396 }
397
398 /* (non-Javadoc)
399 * @see org.apache.oozie.command.Command#getKey()
400 */
401 @Override
402 public String getKey(){
403 return getName() + "_" + actionId;
404 }
405
406 }