/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.ErrorCode;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Service that provides application workflow definition reading from the path and creation of the proto
 * configuration.
 */
public abstract class WorkflowAppService implements Service {

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "WorkflowAppService.";

    public static final String SYSTEM_LIB_PATH = CONF_PREFIX + "system.libpath";

    public static final String APP_LIB_PATH_LIST = "oozie.wf.application.lib";

    public static final String HADOOP_USER = "user.name";

    public static final String CONFG_MAX_WF_LENGTH = CONF_PREFIX + "WorkflowDefinitionMaxLength";

    public static final String OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE = "oozie.subworkflow.classpath.inheritance";

    public static final String OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE = "oozie.wf.subworkflow.classpath.inheritance";

    /** System lib path read from {@link #SYSTEM_LIB_PATH}; stays <code>null</code> when that property is blank. */
    private Path systemLibPath;

    /** Largest workflow definition length (compared against {@link FileStatus#getLen()}) that will be read. */
    private long maxWFLength;

    /** Server-side default used when the job does not set {@link #OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE}. */
    private boolean oozieSubWfCPInheritance;

    /**
     * Initialize the workflow application service.
     * <p/>
     * Reads the system lib path, the maximum workflow definition length (default 100000) and the
     * sub-workflow classpath inheritance flag (default <code>false</code>) from the services configuration.
     *
     * @param services services instance.
     */
    public void init(Services services) {
        Configuration conf = services.getConf();

        // A blank default lets a missing property and an all-whitespace property be handled the same way.
        String path = conf.get(SYSTEM_LIB_PATH, " ").trim();
        if (path.length() > 0) {
            systemLibPath = new Path(path);
        }

        maxWFLength = conf.getInt(CONFG_MAX_WF_LENGTH, 100000);

        oozieSubWfCPInheritance = conf.getBoolean(OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE, false);
    }

    /**
     * Destroy the workflow application service.
     */
    public void destroy() {
    }

    /**
     * Return the public interface for workflow application service.
     *
     * @return {@link WorkflowAppService}.
     */
    public Class<? extends Service> getInterface() {
        return WorkflowAppService.class;
    }

    /**
     * Read workflow definition.
     * <p/>
     * If <code>appPath</code> is not a file, the definition is read from <code>workflow.xml</code> under it.
     * A definition longer than the configured maximum length is rejected before it is opened.
     *
     * @param appPath application path.
     * @param user user name.
     * @param conf configuration; not used by this implementation.
     * @return workflow definition.
     * @throws WorkflowException thrown if the definition could not be read.
     */
    protected String readDefinition(String appPath, String user, Configuration conf)
            throws WorkflowException {
        try {
            URI uri = new URI(appPath);
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            JobConf jobConf = has.createJobConf(uri.getAuthority());
            FileSystem fs = has.createFileSystem(user, uri, jobConf);

            // app path could be a directory
            Path path = new Path(uri.getPath());
            if (!fs.isFile(path)) {
                path = new Path(path, "workflow.xml");
            }

            FileStatus fsStatus = fs.getFileStatus(path);
            if (fsStatus.getLen() > this.maxWFLength) {
                throw new WorkflowException(ErrorCode.E0736, fsStatus.getLen(), this.maxWFLength);
            }

            // NOTE(review): the reader decodes with the platform default charset; confirm whether
            // workflow definitions should be decoded with an explicit charset instead.
            Reader reader = new InputStreamReader(fs.open(path));
            try {
                StringWriter writer = new StringWriter();
                IOUtils.copyCharStream(reader, writer);
                return writer.toString();
            }
            finally {
                // Always release the underlying file system stream, including when the copy fails.
                // A failure while closing must not hide the result or the original exception.
                try {
                    reader.close();
                }
                catch (IOException closeEx) {
                    XLog.getLog(getClass()).warn("Could not close reader for [{0}]: {1}", path,
                                                 closeEx.getMessage());
                }
            }
        }
        catch (WorkflowException wfe) {
            throw wfe;
        }
        catch (IOException ex) {
            throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
        }
        catch (URISyntaxException ex) {
            throw new WorkflowException(ErrorCode.E0711, appPath, ex.getMessage(), ex);
        }
        catch (HadoopAccessorException ex) {
            throw new WorkflowException(ex);
        }
        catch (Exception ex) {
            throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
        }
    }

    /**
     * Create proto configuration. <p/> The proto configuration includes the user name, the list of files
     * found in the application <code>lib</code> directory and in the job lib paths (stored under
     * {@link #APP_LIB_PATH_LIST}), and every job property whose name starts with <code>oozie.</code> that is
     * not already set in the proto configuration.
     *
     * @param jobConf job configuration.
     * @param isWorkflowJob indicates if the job is a workflow job or not.
     * @return proto configuration.
     * @throws WorkflowException thrown if the proto action configuration could not be created.
     */
    public XConfiguration createProtoActionConf(Configuration jobConf, boolean isWorkflowJob)
            throws WorkflowException {
        try {
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            URI uri = new URI(jobConf.get(OozieClient.APP_PATH));

            Configuration conf = has.createJobConf(uri.getAuthority());
            XConfiguration protoConf = new XConfiguration();

            String user = jobConf.get(OozieClient.USER_NAME);
            conf.set(OozieClient.USER_NAME, user);
            protoConf.set(OozieClient.USER_NAME, user);

            FileSystem fs = has.createFileSystem(user, uri, conf);

            Path appPath = new Path(uri);
            XLog log = XLog.getLog(getClass());
            log.debug("jobConf.libPath = " + jobConf.get(OozieClient.LIBPATH));
            log.debug("jobConf.appPath = " + appPath);

            Collection<String> filePaths;
            if (isWorkflowJob) {
                // app path could be a directory
                Path path = new Path(uri.getPath());
                if (!fs.isFile(path)) {
                    filePaths = getLibFiles(fs, new Path(appPath + "/lib"));
                }
                else {
                    // app path points at the definition file itself; lib is its sibling directory
                    filePaths = getLibFiles(fs, new Path(appPath.getParent(), "lib"));
                }
            }
            else {
                filePaths = new LinkedHashSet<String>();
            }

            String[] libPaths = jobConf.getStrings(OozieClient.LIBPATH);
            if (libPaths != null) {
                for (String rawLibPath : libPaths) {
                    String libPath = rawLibPath.trim();
                    if (libPath.length() > 0) {
                        filePaths.addAll(getLibFiles(fs, new Path(libPath)));
                    }
                }
            }

            // Check if a subworkflow should inherit the libs from the parent WF
            // OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE has priority over OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE
            // from oozie-site
            // If OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE isn't specified, we use
            // OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE
            if (jobConf.getBoolean(OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE, oozieSubWfCPInheritance)) {
                // Keep any libs from a parent workflow that might already be in APP_LIB_PATH_LIST and also
                // remove duplicates
                addNonDuplicateParentLibs(filePaths, jobConf.getStrings(APP_LIB_PATH_LIST));
            }

            protoConf.setStrings(APP_LIB_PATH_LIST, filePaths.toArray(new String[filePaths.size()]));

            // Add all properties that start with 'oozie.'
            for (Map.Entry<String, String> entry : jobConf) {
                String name = entry.getKey();
                // if property already exists, should not overwrite
                if (name.startsWith("oozie.") && protoConf.get(name) == null) {
                    protoConf.set(name, entry.getValue());
                }
            }
            return protoConf;
        }
        catch (IOException ex) {
            throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
        }
        catch (URISyntaxException ex) {
            throw new WorkflowException(ErrorCode.E0711, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
        }
        catch (HadoopAccessorException ex) {
            throw new WorkflowException(ex);
        }
        catch (Exception ex) {
            throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH),
                                        ex.getMessage(), ex);
        }
    }

    /**
     * Add to <code>filePaths</code> every parent path whose file name does not match the file name of a path
     * already present in <code>filePaths</code>. Does nothing when there are no parent paths.
     *
     * @param filePaths lib file paths collected so far; modified in place.
     * @param parentFilePaths lib file paths inherited from the parent workflow, may be <code>null</code>.
     */
    private static void addNonDuplicateParentLibs(Collection<String> filePaths, String[] parentFilePaths) {
        if (parentFilePaths == null || parentFilePaths.length == 0) {
            return;
        }
        // Duplicates are detected by file name only, so reduce the known paths to sorted names first.
        String[] filePathsNames = filePaths.toArray(new String[filePaths.size()]);
        for (int i = 0; i < filePathsNames.length; i++) {
            filePathsNames[i] = new Path(filePathsNames[i]).getName();
        }
        Arrays.sort(filePathsNames);

        List<String> nonDuplicateParentFilePaths = new ArrayList<String>();
        for (String parentFilePath : parentFilePaths) {
            String parentName = new Path(parentFilePath).getName();
            if (Arrays.binarySearch(filePathsNames, parentName) < 0) {
                nonDuplicateParentFilePaths.add(parentFilePath);
            }
        }
        filePaths.addAll(nonDuplicateParentFilePaths);
    }

    /**
     * Parse workflow definition.
     *
     * @param jobConf job configuration.
     * @return workflow application.
     * @throws WorkflowException thrown if the workflow application could not be parsed.
     */
    public abstract WorkflowApp parseDef(Configuration jobConf) throws WorkflowException;

    /**
     * Parse workflow definition along with config-default.xml config.
     *
     * @param jobConf job configuration.
     * @param configDefault config from config-default.xml.
     * @return workflow application.
     * @throws WorkflowException thrown if the workflow application could not be parsed.
     */
    public abstract WorkflowApp parseDef(Configuration jobConf, Configuration configDefault) throws WorkflowException;

    /**
     * Parse workflow definition.
     *
     * @param wfXml workflow.
     * @param jobConf job configuration.
     * @return workflow application.
     * @throws WorkflowException thrown if the workflow application could not be parsed.
     */
    public abstract WorkflowApp parseDef(String wfXml, Configuration jobConf) throws WorkflowException;

    /**
     * Get all library paths.
     * <p/>
     * If <code>libPath</code> does not exist a warning is logged and an empty collection is returned.
     *
     * @param fs file system object.
     * @param libPath hdfs library path.
     * @return the URIs, as strings, of the entries listed under <code>libPath</code>, in listing order.
     * @throws IOException thrown if the lib paths could not be obtained.
     */
    private Collection<String> getLibFiles(FileSystem fs, Path libPath) throws IOException {
        Set<String> libPaths = new LinkedHashSet<String>();
        if (fs.exists(libPath)) {
            for (FileStatus file : fs.listStatus(libPath, new NoPathFilter())) {
                libPaths.add(file.getPath().toUri().toString());
            }
        }
        else {
            XLog.getLog(getClass()).warn("libpath [{0}] does not exist", libPath);
        }
        return libPaths;
    }

    /*
     * Filter class doing no filtering.
     * We don't need to define this class, but it seems fs.listStatus() is not working properly without it.
     * So providing this dummy no-filtering Filter class.
     *
     * Declared static because it uses no state of the enclosing service.
     */
    private static class NoPathFilter implements PathFilter {
        @Override
        public boolean accept(Path path) {
            return true;
        }
    }

    /**
     * Returns Oozie system libpath.
     *
     * @return Oozie system libpath (sharelib) in HDFS if present, otherwise it returns <code>NULL</code>.
     */
    public Path getSystemLibPath() {
        return systemLibPath;
    }
}