/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.coord;

import java.io.IOException;
import java.io.StringReader;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.coord.CoordELEvaluator;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.store.CoordinatorStore;
import org.apache.oozie.store.StoreException;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;

/**
 * Command that checks whether the input dependencies of a coordinator action are available.
 * <p>
 * For an action in {@code WAITING} state it re-checks the paths recorded as missing, resolves any
 * {@code unresolved-instances} of the input events and then either marks the action {@code READY},
 * marks it {@code TIMEDOUT}, or re-queues itself to check again later.
 */
public class CoordActionInputCheckCommand extends CoordinatorCommand<Void> {

    /** Delay, in milliseconds, before this command is re-queued to check the inputs again (1 minute). */
    private static final int COMMAND_REQUEUE_INTERVAL = 60000;

    private final String actionId;
    private final XLog log = XLog.getLog(getClass());
    /** The action being checked; loaded by {@link #execute(CoordinatorStore)} before {@code call} runs. */
    private CoordinatorActionBean coordAction = null;

    /**
     * Creates an input check command for one coordinator action.
     *
     * @param actionId id of the coordinator action whose inputs are to be checked
     */
    public CoordActionInputCheckCommand(String actionId) {
        super("coord_action_input", "coord_action_input", 1, XLog.STD);
        this.actionId = actionId;
    }

    /**
     * Performs the input check for the action previously loaded into {@code coordAction}.
     * <p>
     * If the nominal time of the action is still in the future the command is re-queued and nothing
     * else happens. Otherwise, when the action is {@code WAITING}, its missing dependencies are
     * re-checked and the action is updated in the store; actions in any other state are ignored.
     *
     * @param store coordinator store used to persist the updated action
     * @return always {@code null}
     * @throws StoreException declared by the command contract
     * @throws CommandException with {@link ErrorCode#E1005} if anything fails during the check
     */
    @Override
    protected Void call(CoordinatorStore store) throws StoreException, CommandException {
        log.debug("After store.get() for action ID " + actionId + " : " + coordAction.getStatus());
        // This action should only get processed once the current time has passed its nominal
        // (materialization) time; otherwise requeue it for the remaining time, but never sooner
        // than COMMAND_REQUEUE_INTERVAL.
        Date nominalTime = coordAction.getNominalTime();
        Date currentTime = new Date();
        if (nominalTime.compareTo(currentTime) > 0) {
            log.info("[" + actionId
                    + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current="
                    + currentTime + ", nominal=" + nominalTime);
            queueCallable(new CoordActionInputCheckCommand(coordAction.getId()), Math.max(
                    (nominalTime.getTime() - currentTime.getTime()), COMMAND_REQUEUE_INTERVAL));
            // Persist the action. NOTE(review): the original comment here read "update
            // lastModifiedTime"; presumably the store refreshes it - confirm.
            store.updateCoordinatorAction(coordAction);
            return null;
        }

        if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
            log.info("[" + actionId + "]::ActionInputCheck:: Ignoring action. Should be in WAITING state, but state="
                    + coordAction.getStatus());
            return null;
        }

        log.info("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state.");
        StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
        Instrumentation.Cron cron = new Instrumentation.Cron();
        try {
            Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
            cron.start();
            StringBuilder existList = new StringBuilder();
            StringBuilder nonExistList = new StringBuilder();
            StringBuilder nonResolvedList = new StringBuilder();
            // Fills nonExistList / nonResolvedList from the stored missing-dependencies string.
            CoordCommandUtils.getResolvedList(coordAction.getMissingDependencies(), nonExistList, nonResolvedList);

            String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
            if (uriList.length > 0) {
                log.info("[" + actionId + "]::ActionInputCheck:: Missing deps:" + uriList[0] + ", NonResolvedList:"
                        + nonResolvedList.toString());
            }
            else {
                log.info("[" + actionId + "]::ActionInputCheck:: No missing deps, NonResolvedList:"
                        + nonResolvedList.toString());
            }

            boolean status = checkInput(actionXml, existList, nonExistList, actionConf);
            coordAction.setLastModifiedTime(currentTime);
            coordAction.setActionXml(actionXml.toString());
            if (nonResolvedList.length() > 0 && !status) {
                // Inputs are not complete yet: keep the unresolved part in the stored dependency string.
                nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
            }
            coordAction.setMissingDependencies(nonExistList.toString());

            if (status) {
                coordAction.setStatus(CoordinatorAction.Status.READY);
                // pass jobID to the ReadyCommand
                queueCallable(new CoordActionReadyCommand(coordAction.getJobId()), 100);
            }
            else {
                // Minutes elapsed since the later of the nominal time and the creation time.
                long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(),
                        coordAction.getCreatedTime().getTime())) / (60 * 1000);
                int timeOut = coordAction.getTimeOut();
                // A negative timeout never expires.
                if ((timeOut >= 0) && (waitingTime > timeOut)) {
                    queueCallable(new CoordActionTimeOut(coordAction), 100);
                    coordAction.setStatus(CoordinatorAction.Status.TIMEDOUT);
                }
                else {
                    queueCallable(new CoordActionInputCheckCommand(coordAction.getId()), COMMAND_REQUEUE_INTERVAL);
                }
            }
            store.updateCoordActionMin(coordAction);
        }
        catch (Exception e) {
            log.warn(actionId + ": Exception occurs: " + e + " STORE is active " + store.isActive(), e);
            throw new CommandException(ErrorCode.E1005, e.getMessage(), e);
        }
        cron.stop();
        return null;
    }

    /**
     * Checks all inputs of the action and, when everything is available, materializes the data
     * properties and writes the updated XML back into {@code actionXml}.
     *
     * @param actionXml action XML; replaced in place with the updated XML when all inputs exist
     * @param existList receives the paths found to exist
     * @param nonExistList on entry the paths to check, on exit the paths still missing
     * @param conf action configuration
     * @return {@code true} if all resolved paths exist and all unresolved instances could be resolved
     * @throws Exception if parsing, evaluation or the file system check fails
     */
    protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
                                 Configuration conf) throws Exception {
        Element eAction = XmlUtils.parseXml(actionXml.toString());
        boolean allExist = checkResolvedUris(eAction, existList, nonExistList, conf);
        if (allExist) {
            log.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
            allExist = checkUnresolvedInstances(eAction, conf);
        }
        if (allExist) {
            materializeDataProperties(eAction, conf);
            actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString());
        }
        return allExist;
    }

    /**
     * Evaluates the {@code value} of every {@code property} found under
     * {@code action/workflow/configuration} and replaces its text with the evaluated result.
     * The given element is modified in place.
     *
     * @param eAction root element of the coordinator action XML
     * @param conf action configuration
     * @throws Exception if the evaluator cannot be created or an expression fails to evaluate
     */
    private void materializeDataProperties(Element eAction, Configuration conf) throws Exception {
        ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, conf, actionId);
        Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow",
                eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
        if (configElem != null) {
            for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
                resolveTagContents("value", propElem, eval);
            }
        }
    }

    /**
     * Replaces the text of the child {@code tagName} of {@code elem} with its evaluated form.
     * Does nothing when {@code elem} is {@code null}; logs a warning when the child is absent.
     *
     * @param tagName name of the child element to resolve
     * @param elem parent element, may be {@code null}
     * @param eval evaluator used to resolve the child's text
     * @throws Exception if the evaluation fails
     */
    private void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception {
        if (elem == null) {
            return;
        }
        Element tagElem = elem.getChild(tagName, elem.getNamespace());
        if (tagElem == null) {
            log.warn(" Value NOT FOUND " + tagName);
            return;
        }
        String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText());
        tagElem.removeContent();
        tagElem.addContent(updated);
    }

    /**
     * Tries to resolve the {@code unresolved-instances} of all input events of the action.
     *
     * @param eAction root element of the coordinator action XML
     * @param actionConf action configuration
     * @return {@code false} if at least one input instance cannot be resolved yet, {@code true} otherwise
     * @throws CommandException with {@link ErrorCode#E1006} if an output event carries unresolved instances
     * @throws Exception if date parsing or expression evaluation fails
     */
    private boolean checkUnresolvedInstances(Element eAction, Configuration actionConf)
            throws Exception {
        Date nominalTime = DateUtils.parseDateUTC(eAction.getAttributeValue("action-nominal-time"));
        String actualTimeStr = eAction.getAttributeValue("action-actual-time");
        Date actualTime;
        if (actualTimeStr == null) {
            log.debug("Unable to get action-actual-time from action xml, this job is submitted " +
                    "from previous version. Assign current date to actual time, action = " + actionId);
            actualTime = new Date();
        }
        else {
            actualTime = DateUtils.parseDateUTC(actualTimeStr);
        }

        Element inputList = eAction.getChild("input-events", eAction.getNamespace());
        if (inputList != null) {
            boolean resolved = materializeUnresolvedEvent(inputList.getChildren("data-in", eAction.getNamespace()),
                    nominalTime, actualTime, actionConf);
            if (!resolved) {
                return false;
            }
        }

        // Using latest() or future() in output-event is not intuitive, so it is rejected here
        // instead of being materialized. We need to make sure this assumption is correct.
        Element outputList = eAction.getChild("output-events", eAction.getNamespace());
        if (outputList != null) {
            for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
                if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) != null) {
                    throw new CommandException(ErrorCode.E1006, "coord:latest()/future()",
                            " not permitted in output-event ");
                }
            }
        }
        return true;
    }

    /**
     * Resolves the {@code unresolved-instances} of each given data event. For an event whose
     * instances all resolve, the resolved paths are put in front of any existing {@code uris}
     * content and the {@code unresolved-instances} child is removed.
     *
     * @param eDataEvents data events to process
     * @param nominalTime nominal time of the action
     * @param actualTime actual time of the action
     * @param conf action configuration
     * @return {@code false} as soon as one instance cannot be resolved (events processed before it
     *         remain modified), {@code true} otherwise
     * @throws Exception if the evaluator cannot be created or an expression fails to evaluate
     */
    private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime,
                                               Configuration conf) throws Exception {
        for (Element dEvent : eDataEvents) {
            Element unresolvedElem = dEvent.getChild("unresolved-instances", dEvent.getNamespace());
            if (unresolvedElem == null) {
                continue;
            }
            ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf);
            String unresolvedInstances = unresolvedElem.getTextTrim();
            String[] unresolvedList = unresolvedInstances.split(CoordELFunctions.INSTANCE_SEPARATOR);
            StringBuilder resolvedTmp = new StringBuilder();
            for (String unresolved : unresolvedList) {
                String ret = CoordELFunctions.evalAndWrap(eval, unresolved);
                // The evaluation reports its outcome through evaluator variables.
                Boolean isResolved = (Boolean) eval.getVariable("is_resolved");
                if (!isResolved) {
                    log.info("[" + actionId + "]::Cannot resolve: " + ret);
                    return false;
                }
                if (resolvedTmp.length() > 0) {
                    resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR);
                }
                resolvedTmp.append((String) eval.getVariable("resolved_path"));
            }
            if (resolvedTmp.length() > 0) {
                Element existingUris = dEvent.getChild("uris", dEvent.getNamespace());
                if (existingUris != null) {
                    resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append(existingUris.getTextTrim());
                    dEvent.removeChild("uris", dEvent.getNamespace());
                }
                Element uriInstance = new Element("uris", dEvent.getNamespace());
                uriInstance.addContent(resolvedTmp.toString());
                dEvent.getContent().add(1, uriInstance);
            }
            dEvent.removeChild("unresolved-instances", dEvent.getNamespace());
        }

        return true;
    }

    /**
     * Re-checks the paths listed in {@code nonExistList} when the action has input events.
     * The individual {@code data-in} events are not inspected; only the given list is used.
     *
     * @param eAction root element of the coordinator action XML
     * @param existList receives the paths found to exist
     * @param nonExistList on entry the paths to check, on exit the paths still missing
     * @param conf action configuration
     * @return {@code true} if the action has no input events or no path is missing any more
     * @throws IOException if the file system check fails
     */
    private boolean checkResolvedUris(Element eAction, StringBuilder existList, StringBuilder nonExistList,
                                      Configuration conf) throws IOException {

        log.info("[" + actionId + "]::ActionInputCheck:: In checkResolvedUris...");
        Element inputList = eAction.getChild("input-events", eAction.getNamespace());
        if (inputList == null) {
            return true;
        }
        if (nonExistList.length() > 0) {
            checkListOfPaths(existList, nonExistList, conf);
        }
        return nonExistList.length() == 0;
    }

    /**
     * Checks the paths of {@code nonExistList} in order and redistributes them: paths that exist
     * are appended to {@code existList}, the first missing path and every path after it are put
     * back into {@code nonExistList}. Paths after the first missing one are not checked.
     *
     * @param existList receives the paths found to exist
     * @param nonExistList on entry the paths to check, on exit the paths still missing
     * @param conf action configuration
     * @return {@code true} if every path exists
     * @throws IOException if the file system check fails
     */
    private boolean checkListOfPaths(StringBuilder existList, StringBuilder nonExistList, Configuration conf)
            throws IOException {

        String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
        // split() yields an empty array for input consisting only of separators, so guard the access.
        if (uriList.length > 0) {
            log.info("[" + actionId + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] + " is Missing.");
        }

        nonExistList.delete(0, nonExistList.length());
        boolean allExists = true;
        String existSeparator = "";
        String nonExistSeparator = "";
        for (String uri : uriList) {
            if (allExists) {
                allExists = pathExists(uri, conf);
                log.info("[" + actionId + "]::ActionInputCheck:: File:" + uri + ", Exists? :" + allExists);
            }
            if (allExists) {
                existList.append(existSeparator).append(uri);
                existSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
            }
            else {
                nonExistList.append(nonExistSeparator).append(uri);
                nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
            }
        }
        return allExists;
    }

    /**
     * Checks whether a path exists, using a file system created for the user and group named in
     * the action configuration.
     *
     * @param sPath path to check
     * @param actionConf action configuration; must define {@link OozieClient#USER_NAME} and
     *        {@link OozieClient#GROUP_NAME}
     * @return {@code true} if the path exists
     * @throws IOException if the file system cannot be created or queried
     */
    private boolean pathExists(String sPath, Configuration actionConf) throws IOException {
        log.debug("checking for the file " + sPath);
        Path path = new Path(sPath);
        String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
        String group = ParamChecker.notEmpty(actionConf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME);
        try {
            return Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, path.toUri(),
                    actionConf).exists(path);
        }
        catch (HadoopAccessorException e) {
            throw new IOException(e);
        }
    }

    /**
     * Creates a list of URIs, joined by the instance separator, from the instance time stamps and
     * the URI template of the event's dataset. Instances that use the latest/future functions are
     * not evaluated; they are appended to {@code unresolvedInstances} instead.
     * <p>
     * This method is not referenced anywhere within this class.
     *
     * @param event the {@code data-in} event
     * @param instances instance time stamps joined by the instance separator
     * @param unresolvedInstances receives the instances that use the latest/future functions
     * @return the URIs joined by the instance separator, or an empty string if there are no instances
     * @throws Exception if an expression fails to evaluate
     */
    private String createURIs(Element event, String instances, StringBuilder unresolvedInstances) throws Exception {
        if (instances == null || instances.length() == 0) {
            return "";
        }
        String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
        StringBuilder uris = new StringBuilder();

        for (String instance : instanceList) {
            int funcType = CoordCommandUtils.getFuncType(instance);
            if (funcType == CoordCommandUtils.LATEST || funcType == CoordCommandUtils.FUTURE) {
                if (unresolvedInstances.length() > 0) {
                    unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR);
                }
                unresolvedInstances.append(instance);
                continue;
            }
            ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instance);
            if (uris.length() > 0) {
                uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
            }
            uris.append(CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()).getChild(
                    "uri-template", event.getNamespace()).getTextTrim()));
        }
        return uris.toString();
    }

    /**
     * Loads the action, acquires the lock of its job and runs the input check. When the lock
     * cannot be acquired the command is re-queued.
     *
     * @param store coordinator store used to load and persist the action
     * @return always {@code null}
     * @throws StoreException declared by the command contract
     * @throws CommandException if the input check fails
     */
    @Override
    protected Void execute(CoordinatorStore store) throws StoreException, CommandException {
        log.info("STARTED CoordActionInputCheckCommand for actionid=" + actionId);
        try {
            coordAction = store.getEntityManager().find(CoordinatorActionBean.class, actionId);
            if (coordAction == null) {
                // find() returns null for an unknown id; without this guard the job id lookup
                // below would fail with a NullPointerException.
                log.warn("CoordActionInputCheckCommand could not find action, actionId=" + actionId
                        + ". Nothing to check.");
                return null;
            }
            setLogInfo(coordAction);
            if (lock(coordAction.getJobId())) {
                call(store);
            }
            else {
                queueCallable(new CoordActionInputCheckCommand(actionId), LOCK_FAILURE_REQUEUE_INTERVAL);
                log.warn("CoordActionInputCheckCommand lock was not acquired - failed jobId=" + coordAction.getJobId()
                        + ", actionId=" + actionId + ". Requeing the same.");
            }
        }
        catch (InterruptedException e) {
            queueCallable(new CoordActionInputCheckCommand(actionId), LOCK_FAILURE_REQUEUE_INTERVAL);
            log.warn("CoordActionInputCheckCommand lock acquiring failed with exception " + e.getMessage()
                    + " for jobId=" + coordAction.getJobId() + ", actionId=" + actionId + " Requeing the same.");
        }
        finally {
            log.info("ENDED CoordActionInputCheckCommand for actionid=" + actionId);
        }
        return null;
    }

    /**
     * Returns the command name followed by an underscore and the action id.
     *
     * @see org.apache.oozie.command.Command#getKey()
     */
    @Override
    public String getKey() {
        return getName() + "_" + actionId;
    }

}