001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.coord.input.dependency; 020 021import java.io.IOException; 022import java.io.StringReader; 023import java.net.URISyntaxException; 024import java.util.ArrayList; 025import java.util.Arrays; 026import java.util.Collection; 027import java.util.Date; 028import java.util.List; 029 030import org.apache.commons.lang.StringUtils; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.security.AccessControlException; 033import org.apache.oozie.CoordinatorActionBean; 034import org.apache.oozie.ErrorCode; 035import org.apache.oozie.client.OozieClient; 036import org.apache.oozie.command.CommandException; 037import org.apache.oozie.command.coord.CoordCommandUtils; 038import org.apache.oozie.coord.CoordELConstants; 039import org.apache.oozie.coord.CoordELEvaluator; 040import org.apache.oozie.coord.CoordELFunctions; 041import org.apache.oozie.dependency.ActionDependency; 042import org.apache.oozie.dependency.DependencyChecker; 043import org.apache.oozie.dependency.URIHandlerException; 044import org.apache.oozie.util.DateUtils; 045import org.apache.oozie.util.ELEvaluator; 046import org.apache.oozie.util.ParamChecker; 047import org.apache.oozie.util.XConfiguration; 048import org.apache.oozie.util.XLog; 049import org.apache.oozie.util.XmlUtils; 050import org.jdom.Element; 051import org.jdom.JDOMException; 052 053/** 054 * Old approach where dependencies are stored as String. 055 * 056 */ 057public class CoordOldInputDependency implements CoordInputDependency { 058 059 private XLog log = XLog.getLog(getClass()); 060 061 protected transient String missingDependencies = ""; 062 063 public CoordOldInputDependency(String missingDependencies) { 064 this.missingDependencies = missingDependencies; 065 } 066 067 public CoordOldInputDependency() { 068 } 069 070 @Override 071 public void addInputInstanceList(String inputEventName, List<CoordInputInstance> inputInstanceList) { 072 appendToDependencies(inputInstanceList); 073 } 074 075 @Override 076 public String getMissingDependencies() { 077 return missingDependencies; 078 } 079 080 @Override 081 public boolean isDependencyMet() { 082 return StringUtils.isEmpty(missingDependencies); 083 } 084 085 @Override 086 public boolean isUnResolvedDependencyMet() { 087 return false; 088 } 089 090 @Override 091 public void setDependencyMet(boolean isDependencyMeet) { 092 if (isDependencyMeet) { 093 missingDependencies = ""; 094 } 095 096 } 097 098 @Override 099 public String serialize() throws IOException { 100 return missingDependencies; 101 } 102 103 @Override 104 public List<String> getMissingDependenciesAsList() { 105 return Arrays.asList(DependencyChecker.dependenciesAsArray(missingDependencies)); 106 } 107 108 @Override 109 public List<String> getAvailableDependenciesAsList() { 110 return new ArrayList<String>(); 111 } 112 113 @Override 114 public void setMissingDependencies(String missingDependencies) { 115 this.missingDependencies = missingDependencies; 116 117 } 118 119 public void appendToDependencies(List<CoordInputInstance> inputInstanceList) { 120 StringBuilder sb = new StringBuilder(missingDependencies); 121 boolean isFirst = true; 122 for (CoordInputInstance coordInputInstance : inputInstanceList) { 123 if (isFirst) { 124 if (!StringUtils.isEmpty(sb.toString())) { 125 sb.append(CoordELFunctions.INSTANCE_SEPARATOR); 126 } 127 } 128 else { 129 sb.append(CoordELFunctions.INSTANCE_SEPARATOR); 130 131 } 132 sb.append(coordInputInstance.getInputDataInstance()); 133 isFirst = false; 134 } 135 missingDependencies = sb.toString(); 136 } 137 138 @Override 139 public void addUnResolvedList(String name, String unresolvedDependencies) { 140 StringBuilder sb = new StringBuilder(missingDependencies); 141 sb.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(unresolvedDependencies); 142 missingDependencies = sb.toString(); 143 } 144 145 @Override 146 public List<String> getAvailableDependencies(String dataSet) { 147 return null; 148 } 149 150 @Override 151 public void addToAvailableDependencies(Collection<String> availableList) { 152 153 if (StringUtils.isEmpty(missingDependencies)) { 154 return; 155 } 156 List<String> missingDependenciesList = new ArrayList<String>(Arrays.asList((DependencyChecker 157 .dependenciesAsArray(missingDependencies)))); 158 missingDependenciesList.removeAll(availableList); 159 missingDependencies = DependencyChecker.dependenciesAsString(missingDependenciesList); 160 161 } 162 163 @Override 164 public boolean checkPullMissingDependencies(CoordinatorActionBean coordAction, StringBuilder existList, 165 StringBuilder nonExistList) throws IOException, JDOMException { 166 Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf())); 167 Element eAction = XmlUtils.parseXml(coordAction.getActionXml()); 168 169 Element inputList = eAction.getChild("input-events", eAction.getNamespace()); 170 if (inputList != null) { 171 if (nonExistList.length() > 0) { 172 checkListOfPaths(coordAction, existList, nonExistList, actionConf); 173 } 174 return nonExistList.length() == 0; 175 } 176 return true; 177 } 178 179 public ActionDependency checkPushMissingDependencies(CoordinatorActionBean coordAction, 180 boolean registerForNotification) throws CommandException, IOException { 181 return DependencyChecker.checkForAvailability(getMissingDependenciesAsList(), new XConfiguration( 182 new StringReader(coordAction.getRunConf())), !registerForNotification); 183 } 184 185 private boolean checkListOfPaths(CoordinatorActionBean coordAction, StringBuilder existList, 186 StringBuilder nonExistList, Configuration conf) throws IOException { 187 188 String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR); 189 if (uriList[0] != null) { 190 log.info("[" + coordAction.getId() + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] 191 + " is Missing."); 192 } 193 194 nonExistList.delete(0, nonExistList.length()); 195 boolean allExists = true; 196 String existSeparator = "", nonExistSeparator = ""; 197 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 198 for (int i = 0; i < uriList.length; i++) { 199 if (allExists) { 200 allExists = pathExists(coordAction, uriList[i], conf, user); 201 log.info("[" + coordAction.getId() + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" 202 + allExists); 203 } 204 if (allExists) { 205 existList.append(existSeparator).append(uriList[i]); 206 existSeparator = CoordELFunctions.INSTANCE_SEPARATOR; 207 } 208 else { 209 nonExistList.append(nonExistSeparator).append(uriList[i]); 210 nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR; 211 } 212 } 213 return allExists; 214 } 215 216 public boolean pathExists(CoordinatorActionBean coordAction, String sPath, Configuration actionConf, String user) 217 throws IOException { 218 log.debug("checking for the file " + sPath); 219 try { 220 return CoordCommandUtils.pathExists(sPath, actionConf, user); 221 } 222 catch (URIHandlerException e) { 223 if (coordAction != null) { 224 coordAction.setErrorCode(e.getErrorCode().toString()); 225 coordAction.setErrorMessage(e.getMessage()); 226 } 227 if (e.getCause() != null && e.getCause() instanceof AccessControlException) { 228 throw (AccessControlException) e.getCause(); 229 } 230 else { 231 log.error(e); 232 throw new IOException(e); 233 } 234 } 235 catch (URISyntaxException e) { 236 if (coordAction != null) { 237 coordAction.setErrorCode(ErrorCode.E0906.toString()); 238 coordAction.setErrorMessage(e.getMessage()); 239 } 240 log.error(e); 241 throw new IOException(e); 242 } 243 } 244 245 public boolean isChangeInDependency(StringBuilder nonExistList, String missingDependencies, 246 StringBuilder nonResolvedList, boolean status) { 247 if ((!nonExistList.toString().equals(missingDependencies) || missingDependencies.isEmpty())) { 248 setMissingDependencies(nonExistList.toString()); 249 return true; 250 } 251 return false; 252 } 253 254 @SuppressWarnings("unchecked") 255 public boolean checkUnresolved(CoordinatorActionBean coordAction, Element eAction) 256 throws Exception { 257 Date nominalTime = DateUtils.parseDateOozieTZ(eAction.getAttributeValue("action-nominal-time")); 258 String actualTimeStr = eAction.getAttributeValue("action-actual-time"); 259 Element inputList = eAction.getChild("input-events", eAction.getNamespace()); 260 261 if(inputList==null){ 262 return true; 263 } 264 265 List<Element> eDataEvents = inputList.getChildren("data-in", eAction.getNamespace()); 266 Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf())); 267 268 if (eDataEvents != null) { 269 Date actualTime = null; 270 if (actualTimeStr == null) { 271 actualTime = new Date(); 272 } 273 else { 274 actualTime = DateUtils.parseDateOozieTZ(actualTimeStr); 275 } 276 277 for (Element dEvent : eDataEvents) { 278 if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace()) == null) { 279 continue; 280 } 281 ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, actionConf); 282 String unResolvedInstance = dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, 283 dEvent.getNamespace()).getTextTrim(); 284 String unresolvedList[] = unResolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR); 285 StringBuffer resolvedTmp = new StringBuffer(); 286 for (int i = 0; i < unresolvedList.length; i++) { 287 String returnData = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]); 288 Boolean isResolved = (Boolean) eval.getVariable(CoordELConstants.IS_RESOLVED); 289 if (isResolved == false) { 290 log.info("[" + coordAction.getId() + "] :: Cannot resolve : " + returnData); 291 return false; 292 } 293 if (resolvedTmp.length() > 0) { 294 resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR); 295 } 296 resolvedTmp.append((String) eval.getVariable(CoordELConstants.RESOLVED_PATH)); 297 } 298 if (resolvedTmp.length() > 0) { 299 if (dEvent.getChild("uris", dEvent.getNamespace()) != null) { 300 resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append( 301 dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim()); 302 dEvent.removeChild("uris", dEvent.getNamespace()); 303 } 304 Element uriInstance = new Element("uris", dEvent.getNamespace()); 305 uriInstance.addContent(resolvedTmp.toString()); 306 dEvent.getContent().add(1, uriInstance); 307 } 308 dEvent.removeChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace()); 309 } 310 } 311 312 return true; 313 } 314 315}