/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.service;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.ErrorCode;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Service that reads workflow application definitions from a path and creates the proto configuration.
 */
public abstract class WorkflowAppService implements Service {

    /** Prefix shared by the configuration keys of this service. */
    public static final String CONF_PREFIX = Service.CONF_PREFIX + "WorkflowAppService.";

    /** Configuration key holding the system lib path; read by {@link #init(Services)}. */
    public static final String SYSTEM_LIB_PATH = CONF_PREFIX + "system.libpath";

    /** Proto configuration key under which the collected library file paths are stored. */
    public static final String APP_LIB_PATH_LIST = "oozie.wf.application.lib";

    public static final String HADOOP_USER = "user.name";

    /** Configuration key holding the maximum accepted length of a workflow definition file. */
    public static final String CONFG_MAX_WF_LENGTH = CONF_PREFIX + "WorkflowDefinitionMaxLength";

    /** Service level key deciding whether a sub-workflow inherits the libs of its parent workflow. */
    public static final String OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE = "oozie.subworkflow.classpath.inheritance";

    /** Job level key for the same setting; when present it takes priority over the service level key. */
    public static final String OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE = "oozie.wf.subworkflow.classpath.inheritance";

    private Path systemLibPath;
    private long maxWFLength;
    private boolean oozieSubWfCPInheritance;

    /**
     * Initialize the workflow application service.
     * <p>
     * Reads the system lib path, the maximum workflow definition length (default 100000) and the service level
     * sub-workflow classpath inheritance flag (default <code>false</code>) from the services configuration.
     *
     * @param services services instance.
     */
    public void init(Services services) {
        Configuration conf = services.getConf();

        // Guard against a missing value so that an unset property does not fail service start-up with an NPE.
        String path = ConfigurationService.get(conf, SYSTEM_LIB_PATH);
        if (path != null) {
            String trimmedPath = path.trim();
            if (trimmedPath.length() > 0) {
                systemLibPath = new Path(trimmedPath);
            }
        }

        maxWFLength = conf.getInt(CONFG_MAX_WF_LENGTH, 100000);

        oozieSubWfCPInheritance = conf.getBoolean(OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE, false);
    }

    /**
     * Destroy the workflow application service. This implementation holds nothing that needs to be released.
     */
    public void destroy() {
    }

    /**
     * Return the public interface for workflow application service.
     *
     * @return {@link WorkflowAppService}.
     */
    public Class<? extends Service> getInterface() {
        return WorkflowAppService.class;
    }

    /**
     * Read workflow definition.
     * <p>
     * If the application path is not a file, the definition is read from <code>workflow.xml</code> under that
     * path. Definitions longer than the configured maximum length are rejected.
     *
     * @param appPath application path.
     * @param user user name.
     * @param conf job configuration; not used by this implementation.
     * @return workflow definition.
     * @throws WorkflowException thrown if the definition could not be read.
     */
    protected String readDefinition(String appPath, String user, Configuration conf)
            throws WorkflowException {
        try {
            URI uri = new URI(appPath);
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            JobConf jobConf = has.createJobConf(uri.getAuthority());
            FileSystem fs = has.createFileSystem(user, uri, jobConf);

            // app path could be a directory
            Path path = new Path(uri.getPath());
            if (!fs.isFile(path)) {
                path = new Path(path, "workflow.xml");
            }

            FileStatus fsStatus = fs.getFileStatus(path);
            if (fsStatus.getLen() > this.maxWFLength) {
                throw new WorkflowException(ErrorCode.E0736, fsStatus.getLen(), this.maxWFLength);
            }

            // NOTE(review): the definition is decoded with the platform default charset; consider an explicit
            // charset once the expected encoding of workflow definitions is confirmed.
            Reader reader = new InputStreamReader(fs.open(path));
            try {
                StringWriter writer = new StringWriter();
                IOUtils.copyCharStream(reader, writer);
                return writer.toString();
            }
            finally {
                // Release the stream even when the copy fails half way.
                closeQuietly(reader, path);
            }
        }
        catch (WorkflowException wfe) {
            throw wfe;
        }
        catch (IOException ex) {
            throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
        }
        catch (URISyntaxException ex) {
            throw new WorkflowException(ErrorCode.E0711, appPath, ex.getMessage(), ex);
        }
        catch (HadoopAccessorException ex) {
            throw new WorkflowException(ex);
        }
        catch (Exception ex) {
            throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
        }
    }

    /**
     * Close the reader used to load a workflow definition, logging instead of propagating a close failure so
     * that it can neither mask the original exception nor fail a read that already succeeded.
     *
     * @param reader reader to close.
     * @param path path the reader was opened on, used for logging only.
     */
    private void closeQuietly(Reader reader, Path path) {
        try {
            reader.close();
        }
        catch (IOException ex) {
            XLog.getLog(getClass()).warn("Could not close reader for [{0}]: {1}", path, ex.getMessage());
        }
    }

    /**
     * Create proto configuration. <p> The proto configuration includes the user and the paths which need to be
     * added to distributed cache. These paths include .jar,.so and the resource file paths. All the
     * <code>oozie.</code> properties of the job configuration are copied as well, without overwriting values
     * already set in the proto configuration.
     *
     * @param jobConf job configuration.
     * @param isWorkflowJob indicates if the job is a workflow job or not.
     * @return proto configuration.
     * @throws WorkflowException thrown if the proto action configuration could not be created.
     */
    public XConfiguration createProtoActionConf(Configuration jobConf, boolean isWorkflowJob)
            throws WorkflowException {
        try {
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            URI uri = new URI(jobConf.get(OozieClient.APP_PATH));

            Configuration conf = has.createJobConf(uri.getAuthority());
            XConfiguration protoConf = new XConfiguration();

            String user = jobConf.get(OozieClient.USER_NAME);
            conf.set(OozieClient.USER_NAME, user);
            protoConf.set(OozieClient.USER_NAME, user);

            FileSystem fs = has.createFileSystem(user, uri, conf);

            Path appPath = new Path(uri);
            XLog log = XLog.getLog(getClass());
            log.debug("jobConf.libPath = " + jobConf.get(OozieClient.LIBPATH));
            log.debug("jobConf.appPath = " + appPath);

            Collection<String> filePaths = collectLibFilePaths(fs, uri, appPath, jobConf, isWorkflowJob);

            // Check if a subworkflow should inherit the libs from the parent WF
            // OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE has priority over OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE
            // from oozie-site
            // If OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE isn't specified, we use
            // OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE
            if (jobConf.getBoolean(OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE, oozieSubWfCPInheritance)) {
                addParentLibs(jobConf, filePaths);
            }

            protoConf.setStrings(APP_LIB_PATH_LIST, filePaths.toArray(new String[filePaths.size()]));

            copyOozieProperties(jobConf, protoConf);
            return protoConf;
        }
        catch (IOException ex) {
            throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
        }
        catch (URISyntaxException ex) {
            throw new WorkflowException(ErrorCode.E0711, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
        }
        catch (HadoopAccessorException ex) {
            throw new WorkflowException(ex);
        }
        catch (Exception ex) {
            throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH),
                                        ex.getMessage(), ex);
        }
    }

    /**
     * Collect the library files of the application <code>lib</code> directory (workflow jobs only) followed by
     * the files of every directory listed under {@link OozieClient#LIBPATH}.
     *
     * @param fs file system object.
     * @param uri application URI.
     * @param appPath application path built from the URI.
     * @param jobConf job configuration.
     * @param isWorkflowJob indicates if the job is a workflow job or not.
     * @return mutable collection of library file paths, in discovery order and without duplicates.
     * @throws IOException thrown if the lib paths could not be obtained.
     */
    private Collection<String> collectLibFilePaths(FileSystem fs, URI uri, Path appPath, Configuration jobConf,
                                                   boolean isWorkflowJob) throws IOException {
        Collection<String> filePaths;
        if (isWorkflowJob) {
            // app path could be a directory; when it is a file the lib directory is its sibling
            Path path = new Path(uri.getPath());
            Path appLibPath = fs.isFile(path) ? new Path(appPath.getParent(), "lib") : new Path(appPath + "/lib");
            filePaths = getLibFiles(fs, appLibPath);
        }
        else {
            filePaths = new LinkedHashSet<String>();
        }

        String[] libPaths = jobConf.getStrings(OozieClient.LIBPATH);
        if (libPaths != null) {
            for (String libPath : libPaths) {
                String trimmedLibPath = libPath.trim();
                if (trimmedLibPath.length() > 0) {
                    filePaths.addAll(getLibFiles(fs, new Path(trimmedLibPath)));
                }
            }
        }
        return filePaths;
    }

    /**
     * Keep any libs from a parent workflow that are already in {@link #APP_LIB_PATH_LIST} of the job
     * configuration, skipping those whose file name matches a file already collected for this job.
     *
     * @param jobConf job configuration.
     * @param filePaths library file paths collected so far; parent paths are appended to it.
     */
    private static void addParentLibs(Configuration jobConf, Collection<String> filePaths) {
        String[] parentFilePaths = jobConf.getStrings(APP_LIB_PATH_LIST);
        if (parentFilePaths == null || parentFilePaths.length == 0) {
            return;
        }

        // Snapshot of the file names owned by this job; parent entries are compared by name, not by full path.
        Set<String> ownFileNames = new HashSet<String>();
        for (String filePath : filePaths) {
            ownFileNames.add(new Path(filePath).getName());
        }

        for (String parentFilePath : parentFilePaths) {
            if (!ownFileNames.contains(new Path(parentFilePath).getName())) {
                filePaths.add(parentFilePath);
            }
        }
    }

    /**
     * Add all the properties starting with 'oozie.' to the proto configuration.
     *
     * @param jobConf job configuration to copy from.
     * @param protoConf proto configuration to copy to.
     */
    private static void copyOozieProperties(Configuration jobConf, XConfiguration protoConf) {
        for (Map.Entry<String, String> entry : jobConf) {
            String name = entry.getKey();
            // if property already exists, should not overwrite
            if (name.startsWith("oozie.") && protoConf.get(name) == null) {
                protoConf.set(name, entry.getValue());
            }
        }
    }

    /**
     * Parse workflow definition.
     *
     * @param jobConf job configuration.
     * @return workflow application.
     * @throws WorkflowException thrown if the workflow application could not be parsed.
     */
    public abstract WorkflowApp parseDef(Configuration jobConf) throws WorkflowException;

    /**
     * Parse workflow definition along with config-default.xml config.
     *
     * @param jobConf job configuration.
     * @param configDefault config from config-default.xml.
     * @return workflow application.
     * @throws WorkflowException thrown if the workflow application could not be parsed.
     */
    public abstract WorkflowApp parseDef(Configuration jobConf, Configuration configDefault) throws WorkflowException;

    /**
     * Parse workflow definition.
     *
     * @param wfXml workflow.
     * @param jobConf job configuration.
     * @return workflow application.
     * @throws WorkflowException thrown if the workflow application could not be parsed.
     */
    public abstract WorkflowApp parseDef(String wfXml, Configuration jobConf) throws WorkflowException;

    /**
     * Get all library paths.
     * <p>
     * A warning is logged and an empty collection is returned when the lib path does not exist.
     *
     * @param fs file system object.
     * @param libPath hdfs library path.
     * @return mutable, insertion ordered collection of the paths found directly under the lib path.
     * @throws IOException thrown if the lib paths could not be obtained.
     */
    private Collection<String> getLibFiles(FileSystem fs, Path libPath) throws IOException {
        Set<String> libPaths = new LinkedHashSet<String>();
        if (fs.exists(libPath)) {
            FileStatus[] files = fs.listStatus(libPath, new NoPathFilter());

            // Defensive: tolerate a null listing, e.g. if the directory vanished after the exists() check.
            if (files != null) {
                for (FileStatus file : files) {
                    libPaths.add(file.getPath().toUri().toString());
                }
            }
        }
        else {
            XLog.getLog(getClass()).warn("libpath [{0}] does not exist", libPath);
        }
        return libPaths;
    }

    /*
     * Filter class doing no filtering.
     * We should not need to define this class, but fs.listStatus() does not seem to work properly without it,
     * so this dummy accept-everything filter is provided.
     * It is static because it does not use any state of the enclosing service.
     */
    private static class NoPathFilter implements PathFilter {
        @Override
        public boolean accept(Path path) {
            return true;
        }
    }

    /**
     * Returns Oozie system libpath.
     *
     * @return Oozie system libpath (sharelib) in HDFS if present, otherwise it returns <code>NULL</code>.
     */
    public Path getSystemLibPath() {
        return systemLibPath;
    }
}