001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.coord.input.dependency; 020 021import java.io.IOException; 022import java.io.StringReader; 023import java.net.URISyntaxException; 024import java.util.ArrayList; 025import java.util.Arrays; 026import java.util.Collection; 027import java.util.Date; 028import java.util.HashMap; 029import java.util.HashSet; 030import java.util.Iterator; 031import java.util.List; 032import java.util.Map; 033import java.util.Set; 034 035import org.apache.commons.lang.StringUtils; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.security.AccessControlException; 038import org.apache.oozie.CoordinatorActionBean; 039import org.apache.oozie.ErrorCode; 040import org.apache.oozie.client.OozieClient; 041import org.apache.oozie.command.CommandException; 042import org.apache.oozie.command.coord.CoordCommandUtils; 043import org.apache.oozie.coord.CoordELConstants; 044import org.apache.oozie.coord.CoordELEvaluator; 045import org.apache.oozie.coord.CoordELFunctions; 046import org.apache.oozie.coord.CoordUtils; 047import org.apache.oozie.dependency.ActionDependency; 048import org.apache.oozie.dependency.DependencyChecker; 049import org.apache.oozie.dependency.URIHandler; 050import org.apache.oozie.dependency.URIHandlerException; 051import org.apache.oozie.service.Services; 052import org.apache.oozie.service.URIHandlerService; 053import org.apache.oozie.util.DateUtils; 054import org.apache.oozie.util.ELEvaluator; 055import org.apache.oozie.util.ParamChecker; 056import org.apache.oozie.util.XConfiguration; 057import org.apache.oozie.util.XLog; 058import org.apache.oozie.util.XmlUtils; 059import org.jdom.Element; 060import org.jdom.JDOMException; 061 062/** 063 * Old approach where dependencies are stored as String. 064 */ 065public class CoordOldInputDependency implements CoordInputDependency { 066 067 private XLog log = XLog.getLog(getClass()); 068 069 protected transient String missingDependencies = ""; 070 071 public CoordOldInputDependency(String missingDependencies) { 072 this.missingDependencies = missingDependencies; 073 } 074 075 public CoordOldInputDependency() { 076 } 077 078 @Override 079 public void addInputInstanceList(String inputEventName, List<CoordInputInstance> inputInstanceList) { 080 appendToDependencies(inputInstanceList); 081 } 082 083 @Override 084 public String getMissingDependencies() { 085 return missingDependencies; 086 } 087 088 @Override 089 public boolean isDependencyMet() { 090 return StringUtils.isEmpty(missingDependencies); 091 } 092 093 @Override 094 public boolean isUnResolvedDependencyMet() { 095 return false; 096 } 097 098 @Override 099 public void setDependencyMet(boolean isDependencyMeet) { 100 if (isDependencyMeet) { 101 missingDependencies = ""; 102 } 103 104 } 105 106 @Override 107 public String serialize() throws IOException { 108 return missingDependencies; 109 } 110 111 @Override 112 public List<String> getMissingDependenciesAsList() { 113 return Arrays.asList(DependencyChecker.dependenciesAsArray(missingDependencies)); 114 } 115 116 @Override 117 public List<String> getAvailableDependenciesAsList() { 118 return new ArrayList<String>(); 119 } 120 121 @Override 122 public void setMissingDependencies(String missingDependencies) { 123 this.missingDependencies = missingDependencies; 124 125 } 126 127 public void appendToDependencies(List<CoordInputInstance> inputInstanceList) { 128 StringBuilder sb = new StringBuilder(missingDependencies); 129 boolean isFirst = true; 130 for (CoordInputInstance coordInputInstance : inputInstanceList) { 131 if (isFirst) { 132 if (!StringUtils.isEmpty(sb.toString())) { 133 sb.append(CoordELFunctions.INSTANCE_SEPARATOR); 134 } 135 } 136 else { 137 sb.append(CoordELFunctions.INSTANCE_SEPARATOR); 138 139 } 140 sb.append(coordInputInstance.getInputDataInstance()); 141 isFirst = false; 142 } 143 missingDependencies = sb.toString(); 144 } 145 146 @Override 147 public void addUnResolvedList(String name, String unresolvedDependencies) { 148 StringBuilder sb = new StringBuilder(missingDependencies); 149 sb.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(unresolvedDependencies); 150 missingDependencies = sb.toString(); 151 } 152 153 @Override 154 public List<String> getAvailableDependencies(String dataSet) { 155 return null; 156 } 157 158 @Override 159 public void addToAvailableDependencies(Collection<String> availableList) { 160 161 if (StringUtils.isEmpty(missingDependencies)) { 162 return; 163 } 164 List<String> missingDependenciesList = new ArrayList<String>( 165 Arrays.asList((DependencyChecker.dependenciesAsArray(missingDependencies)))); 166 missingDependenciesList.removeAll(availableList); 167 missingDependencies = DependencyChecker.dependenciesAsString(missingDependenciesList); 168 169 } 170 171 @Override 172 public boolean checkPullMissingDependencies(CoordinatorActionBean coordAction, StringBuilder existList, 173 StringBuilder nonExistList) throws IOException, JDOMException { 174 Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf())); 175 Element eAction = XmlUtils.parseXml(coordAction.getActionXml()); 176 177 Element inputList = eAction.getChild("input-events", eAction.getNamespace()); 178 if (inputList != null) { 179 if (nonExistList.length() > 0) { 180 checkListOfPaths(coordAction, existList, nonExistList, actionConf); 181 } 182 return nonExistList.length() == 0; 183 } 184 return true; 185 } 186 187 public ActionDependency checkPushMissingDependencies(CoordinatorActionBean coordAction, 188 boolean registerForNotification) throws CommandException, IOException { 189 return DependencyChecker.checkForAvailability(getMissingDependenciesAsList(), 190 new XConfiguration(new StringReader(coordAction.getRunConf())), !registerForNotification); 191 } 192 193 private boolean checkListOfPaths(CoordinatorActionBean coordAction, StringBuilder existList, 194 StringBuilder nonExistList, Configuration conf) throws IOException { 195 196 String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR); 197 if (uriList[0] != null) { 198 log.info("[" + coordAction.getId() + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] 199 + " is Missing."); 200 } 201 202 nonExistList.delete(0, nonExistList.length()); 203 boolean allExists = true; 204 String existSeparator = "", nonExistSeparator = ""; 205 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 206 for (int i = 0; i < uriList.length; i++) { 207 if (allExists) { 208 allExists = pathExists(coordAction, uriList[i], conf, user); 209 log.info("[" + coordAction.getId() + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" 210 + allExists); 211 } 212 if (allExists) { 213 existList.append(existSeparator).append(uriList[i]); 214 existSeparator = CoordELFunctions.INSTANCE_SEPARATOR; 215 } 216 else { 217 nonExistList.append(nonExistSeparator).append(uriList[i]); 218 nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR; 219 } 220 } 221 return allExists; 222 } 223 224 public boolean pathExists(CoordinatorActionBean coordAction, String sPath, Configuration actionConf, String user) 225 throws IOException { 226 log.debug("checking for the file " + sPath); 227 try { 228 return CoordCommandUtils.pathExists(sPath, actionConf, user); 229 } 230 catch (URIHandlerException e) { 231 if (coordAction != null) { 232 coordAction.setErrorCode(e.getErrorCode().toString()); 233 coordAction.setErrorMessage(e.getMessage()); 234 } 235 if (e.getCause() != null && e.getCause() instanceof AccessControlException) { 236 throw (AccessControlException) e.getCause(); 237 } 238 else { 239 log.error(e); 240 throw new IOException(e); 241 } 242 } 243 catch (URISyntaxException e) { 244 if (coordAction != null) { 245 coordAction.setErrorCode(ErrorCode.E0906.toString()); 246 coordAction.setErrorMessage(e.getMessage()); 247 } 248 log.error(e); 249 throw new IOException(e); 250 } 251 } 252 253 public boolean isChangeInDependency(StringBuilder nonExistList, String missingDependencies, 254 StringBuilder nonResolvedList, boolean status) { 255 if ((!nonExistList.toString().equals(missingDependencies) || missingDependencies.isEmpty())) { 256 setMissingDependencies(nonExistList.toString()); 257 return true; 258 } 259 return false; 260 } 261 262 @SuppressWarnings("unchecked") 263 public boolean checkUnresolved(CoordinatorActionBean coordAction, Element eAction) throws Exception { 264 Date nominalTime = DateUtils.parseDateOozieTZ(eAction.getAttributeValue("action-nominal-time")); 265 String actualTimeStr = eAction.getAttributeValue("action-actual-time"); 266 Element inputList = eAction.getChild("input-events", eAction.getNamespace()); 267 268 if (inputList == null) { 269 return true; 270 } 271 272 List<Element> eDataEvents = inputList.getChildren("data-in", eAction.getNamespace()); 273 Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf())); 274 275 if (eDataEvents != null) { 276 Date actualTime = null; 277 if (actualTimeStr == null) { 278 actualTime = new Date(); 279 } 280 else { 281 actualTime = DateUtils.parseDateOozieTZ(actualTimeStr); 282 } 283 284 for (Element dEvent : eDataEvents) { 285 if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace()) == null) { 286 continue; 287 } 288 ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, actionConf); 289 String unResolvedInstance = dEvent 290 .getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace()).getTextTrim(); 291 String unresolvedList[] = unResolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR); 292 StringBuffer resolvedTmp = new StringBuffer(); 293 for (int i = 0; i < unresolvedList.length; i++) { 294 String returnData = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]); 295 Boolean isResolved = (Boolean) eval.getVariable(CoordELConstants.IS_RESOLVED); 296 if (isResolved == false) { 297 log.info("[" + coordAction.getId() + "] :: Cannot resolve : " + returnData); 298 return false; 299 } 300 if (resolvedTmp.length() > 0) { 301 resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR); 302 } 303 resolvedTmp.append((String) eval.getVariable(CoordELConstants.RESOLVED_PATH)); 304 } 305 if (resolvedTmp.length() > 0) { 306 if (dEvent.getChild("uris", dEvent.getNamespace()) != null) { 307 resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR) 308 .append(dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim()); 309 dEvent.removeChild("uris", dEvent.getNamespace()); 310 } 311 Element uriInstance = new Element("uris", dEvent.getNamespace()); 312 uriInstance.addContent(resolvedTmp.toString()); 313 dEvent.getContent().add(1, uriInstance); 314 } 315 dEvent.removeChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace()); 316 } 317 } 318 319 return true; 320 } 321 322 public Map<String, ActionDependency> getMissingDependencies(CoordinatorActionBean coordAction) 323 throws CommandException, IOException, JDOMException { 324 325 Map<String, ActionDependency> dependenciesMap = null; 326 try { 327 dependenciesMap = getDependency(coordAction); 328 } 329 catch (URIHandlerException e) { 330 throw new IOException(e); 331 } 332 333 StringBuilder nonExistList = new StringBuilder(); 334 StringBuilder nonResolvedList = new StringBuilder(); 335 CoordCommandUtils.getResolvedList(getMissingDependencies(), nonExistList, nonResolvedList); 336 337 Set<String> missingSets = new HashSet<String>( 338 Arrays.asList(nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR))); 339 340 missingSets.addAll( 341 Arrays.asList(nonResolvedList.toString().split(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR))); 342 343 for (Iterator<Map.Entry<String, ActionDependency>> it = dependenciesMap.entrySet().iterator(); it.hasNext();) { 344 Map.Entry<String, ActionDependency> entry = it.next(); 345 ActionDependency dependency = entry.getValue(); 346 dependency.getMissingDependencies().retainAll(missingSets); 347 348 if (dependency.getUriTemplate() != null) { 349 for (int i = 0; i < dependency.getMissingDependencies().size(); i++) { 350 if (dependency.getMissingDependencies().get(i).trim().startsWith("${coord:")) { 351 dependency.getMissingDependencies().set(i, 352 dependency.getMissingDependencies().get(i) + " -> " + dependency.getUriTemplate()); 353 } 354 } 355 } 356 357 if (dependenciesMap.get(entry.getKey()).getMissingDependencies().isEmpty()) { 358 it.remove(); 359 } 360 361 } 362 return dependenciesMap; 363 } 364 365 @SuppressWarnings("unchecked") 366 private Map<String, ActionDependency> getDependency(CoordinatorActionBean coordAction) 367 throws JDOMException, URIHandlerException { 368 Map<String, ActionDependency> dependenciesMap = new HashMap<String, ActionDependency>(); 369 URIHandlerService uriService = Services.get().get(URIHandlerService.class); 370 371 Element eAction = XmlUtils.parseXml(coordAction.getActionXml()); 372 Element inputList = eAction.getChild("input-events", eAction.getNamespace()); 373 List<Element> eDataEvents = inputList.getChildren("data-in", eAction.getNamespace()); 374 for (Element event : eDataEvents) { 375 Element uri = event.getChild("uris", event.getNamespace()); 376 ActionDependency dependency = new ActionDependency(); 377 if (uri != null) { 378 Element doneFlagElement = event.getChild("dataset", event.getNamespace()).getChild("done-flag", 379 event.getNamespace()); 380 String[] dataSets = uri.getText().split(CoordELFunctions.INSTANCE_SEPARATOR); 381 String doneFlag = CoordUtils.getDoneFlag(doneFlagElement); 382 383 for (String dataSet : dataSets) { 384 URIHandler uriHandler; 385 uriHandler = uriService.getURIHandler(dataSet); 386 dependency.getMissingDependencies().add(uriHandler.getURIWithDoneFlag(dataSet, doneFlag)); 387 } 388 } 389 if (event.getChildTextTrim(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, event.getNamespace()) != null) { 390 ActionDependency unResolvedDependency = getUnResolvedDependency(coordAction, event); 391 dependency.getMissingDependencies() 392 .addAll(unResolvedDependency.getMissingDependencies()); 393 dependency.setUriTemplate(unResolvedDependency.getUriTemplate()); 394 } 395 dependenciesMap.put(event.getAttributeValue("name"), dependency); 396 } 397 return dependenciesMap; 398 } 399 400 private ActionDependency getUnResolvedDependency(CoordinatorActionBean coordAction, Element event) 401 throws JDOMException, URIHandlerException { 402 String tmpUnresolved = event.getChildTextTrim(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, event.getNamespace()); 403 StringBuilder nonResolvedList = new StringBuilder(); 404 CoordCommandUtils.getResolvedList(getMissingDependencies(), new StringBuilder(), nonResolvedList); 405 ActionDependency dependency = new ActionDependency(); 406 407 if (nonResolvedList.length() > 0) { 408 Element eDataset = event.getChild("dataset", event.getNamespace()); 409 dependency.setUriTemplate(eDataset.getChild("uri-template", eDataset.getNamespace()).getTextTrim()); 410 dependency.getMissingDependencies().add(tmpUnresolved); 411 } 412 return dependency; 413 } 414 415 @Override 416 public String getFirstMissingDependency() { 417 StringBuilder nonExistList = new StringBuilder(); 418 String missingDependencies = getMissingDependencies(); 419 StringBuilder nonResolvedList = new StringBuilder(); 420 CoordCommandUtils.getResolvedList(missingDependencies, nonExistList, nonResolvedList); 421 String firstMissingDependency = ""; 422 if (nonExistList.length() > 0) { 423 firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0]; 424 } 425 else { 426 if (nonResolvedList.length() > 0) { 427 firstMissingDependency = nonResolvedList.toString().split(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR)[0]; 428 } 429 } 430 return firstMissingDependency; 431 } 432}