001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import org.apache.hadoop.conf.Configuration; 021 import org.apache.hadoop.fs.FileStatus; 022 import org.apache.hadoop.fs.FileSystem; 023 import org.apache.hadoop.fs.Path; 024 import org.apache.hadoop.fs.PathFilter; 025 import org.apache.hadoop.mapred.JobConf; 026 import org.apache.oozie.client.OozieClient; 027 import org.apache.oozie.workflow.WorkflowApp; 028 import org.apache.oozie.workflow.WorkflowException; 029 import org.apache.oozie.util.IOUtils; 030 import org.apache.oozie.util.XConfiguration; 031 import org.apache.oozie.util.XLog; 032 import org.apache.oozie.ErrorCode; 033 034 import java.io.IOException; 035 import java.io.InputStreamReader; 036 import java.io.Reader; 037 import java.io.StringWriter; 038 import java.net.URI; 039 import java.net.URISyntaxException; 040 import java.util.ArrayList; 041 import java.util.List; 042 import java.util.Map; 043 044 /** 045 * Service that provides application workflow definition reading from the path and creation of the proto configuration. 046 */ 047 public abstract class WorkflowAppService implements Service { 048 049 public static final String CONF_PREFIX = Service.CONF_PREFIX + "WorkflowAppService."; 050 051 public static final String SYSTEM_LIB_PATH = CONF_PREFIX + "system.libpath"; 052 053 public static final String APP_LIB_PATH_LIST = "oozie.wf.application.lib"; 054 055 public static final String HADOOP_USER = "user.name"; 056 057 public static final String CONFG_MAX_WF_LENGTH = CONF_PREFIX + "WorkflowDefinitionMaxLength"; 058 059 private Path systemLibPath; 060 private long maxWFLength; 061 062 /** 063 * Initialize the workflow application service. 064 * 065 * @param services services instance. 066 */ 067 public void init(Services services) { 068 Configuration conf = services.getConf(); 069 070 String path = conf.get(SYSTEM_LIB_PATH, " "); 071 if (path.trim().length() > 0) { 072 systemLibPath = new Path(path.trim()); 073 } 074 075 maxWFLength = conf.getInt(CONFG_MAX_WF_LENGTH, 100000); 076 } 077 078 /** 079 * Destroy the workflow application service. 080 */ 081 public void destroy() { 082 } 083 084 /** 085 * Return the public interface for workflow application service. 086 * 087 * @return {@link WorkflowAppService}. 088 */ 089 public Class<? extends Service> getInterface() { 090 return WorkflowAppService.class; 091 } 092 093 /** 094 * Read workflow definition. 095 * 096 * 097 * @param appPath application path. 098 * @param user user name. 099 * @param autToken authentication token. 100 * @return workflow definition. 101 * @throws WorkflowException thrown if the definition could not be read. 102 */ 103 protected String readDefinition(String appPath, String user, String autToken, Configuration conf) 104 throws WorkflowException { 105 try { 106 URI uri = new URI(appPath); 107 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 108 JobConf jobConf = has.createJobConf(uri.getAuthority()); 109 FileSystem fs = has.createFileSystem(user, uri, jobConf); 110 111 // app path could be a directory 112 Path path = new Path(uri.getPath()); 113 if (!fs.isFile(path)) { 114 path = new Path(path, "workflow.xml"); 115 } 116 117 FileStatus fsStatus = fs.getFileStatus(path); 118 if (fsStatus.getLen() > this.maxWFLength) { 119 throw new WorkflowException(ErrorCode.E0736, fsStatus.getLen(), this.maxWFLength); 120 } 121 122 Reader reader = new InputStreamReader(fs.open(path)); 123 StringWriter writer = new StringWriter(); 124 IOUtils.copyCharStream(reader, writer); 125 return writer.toString(); 126 127 } 128 catch (WorkflowException wfe) { 129 throw wfe; 130 } 131 catch (IOException ex) { 132 throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex); 133 } 134 catch (URISyntaxException ex) { 135 throw new WorkflowException(ErrorCode.E0711, appPath, ex.getMessage(), ex); 136 } 137 catch (HadoopAccessorException ex) { 138 throw new WorkflowException(ex); 139 } 140 catch (Exception ex) { 141 throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex); 142 } 143 } 144 145 /** 146 * Create proto configuration. <p/> The proto configuration includes the user,group and the paths which need to be 147 * added to distributed cache. These paths include .jar,.so and the resource file paths. 148 * 149 * @param jobConf job configuration. 150 * @param authToken authentication token. 151 * @param isWorkflowJob indicates if the job is a workflow job or not. 152 * @return proto configuration. 153 * @throws WorkflowException thrown if the proto action configuration could not be created. 154 */ 155 public XConfiguration createProtoActionConf(Configuration jobConf, String authToken, boolean isWorkflowJob) 156 throws WorkflowException { 157 try { 158 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 159 URI uri = new URI(jobConf.get(OozieClient.APP_PATH)); 160 161 Configuration conf = has.createJobConf(uri.getAuthority()); 162 163 String user = jobConf.get(OozieClient.USER_NAME); 164 conf.set(OozieClient.USER_NAME, user); 165 166 FileSystem fs = has.createFileSystem(user, uri, conf); 167 168 Path appPath = new Path(uri.getPath()); 169 XLog.getLog(getClass()).debug("jobConf.libPath = " + jobConf.get(OozieClient.LIBPATH)); 170 XLog.getLog(getClass()).debug("jobConf.appPath = " + appPath); 171 172 List<String> filePaths; 173 if (isWorkflowJob) { 174 // app path could be a directory 175 Path path = new Path(uri.getPath()); 176 if (!fs.isFile(path)) { 177 filePaths = getLibFiles(fs, new Path(appPath + "/lib")); 178 } else { 179 filePaths = getLibFiles(fs, new Path(appPath.getParent(), "lib")); 180 } 181 } 182 else { 183 filePaths = new ArrayList<String>(); 184 } 185 186 String[] libPaths = jobConf.getStrings(OozieClient.LIBPATH); 187 if (libPaths != null && libPaths.length > 0) { 188 for (int i = 0; i < libPaths.length; i++) { 189 if (libPaths[i].trim().length() > 0) { 190 Path libPath = new Path(libPaths[i].trim()); 191 List<String> libFilePaths = getLibFiles(fs, libPath); 192 filePaths.addAll(libFilePaths); 193 } 194 } 195 } 196 197 conf.setStrings(APP_LIB_PATH_LIST, filePaths.toArray(new String[filePaths.size()])); 198 199 //Add all properties start with 'oozie.' 200 for (Map.Entry<String, String> entry : jobConf) { 201 if (entry.getKey().startsWith("oozie.")) { 202 String name = entry.getKey(); 203 String value = entry.getValue(); 204 // Append application lib jars of both parent and child in 205 // subworkflow to APP_LIB_PATH_LIST 206 if ((conf.get(name) != null) && name.equals(APP_LIB_PATH_LIST)) { 207 value = value + "," + conf.get(name); 208 } 209 conf.set(name, value); 210 } 211 } 212 XConfiguration retConf = new XConfiguration(); 213 XConfiguration.copy(conf, retConf); 214 return retConf; 215 } 216 catch (IOException ex) { 217 throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex); 218 } 219 catch (URISyntaxException ex) { 220 throw new WorkflowException(ErrorCode.E0711, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex); 221 } 222 catch (HadoopAccessorException ex) { 223 throw new WorkflowException(ex); 224 } 225 catch (Exception ex) { 226 throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH), 227 ex.getMessage(), ex); 228 } 229 } 230 231 /** 232 * Parse workflow definition. 233 * 234 * @param jobConf job configuration. 235 * @param authToken authentication token. 236 * @return workflow application. 237 * @throws WorkflowException thrown if the workflow application could not be parsed. 238 */ 239 public abstract WorkflowApp parseDef(Configuration jobConf, String authToken) throws WorkflowException; 240 241 /** 242 * Parse workflow definition. 243 * @param wfXml workflow. 244 * @return workflow application. 245 * @throws WorkflowException thrown if the workflow application could not be parsed. 246 */ 247 public abstract WorkflowApp parseDef(String wfXml) throws WorkflowException; 248 249 /** 250 * Get all library paths. 251 * 252 * @param fs file system object. 253 * @param libPath hdfs library path. 254 * @return list of paths. 255 * @throws IOException thrown if the lib paths could not be obtained. 256 */ 257 private List<String> getLibFiles(FileSystem fs, Path libPath) throws IOException { 258 List<String> libPaths = new ArrayList<String>(); 259 if (fs.exists(libPath)) { 260 FileStatus[] files = fs.listStatus(libPath, new NoPathFilter()); 261 262 for (FileStatus file : files) { 263 libPaths.add(file.getPath().toUri().getPath().trim()); 264 } 265 } 266 else { 267 XLog.getLog(getClass()).warn("libpath [{0}] does not exists", libPath); 268 } 269 return libPaths; 270 } 271 272 /* 273 * Filter class doing no filtering. 274 * We dont need define this class, but seems fs.listStatus() is not working properly without this. 275 * So providing this dummy no filtering Filter class. 276 */ 277 private class NoPathFilter implements PathFilter { 278 @Override 279 public boolean accept(Path path) { 280 return true; 281 } 282 } 283 284 /** 285 * Returns Oozie system libpath. 286 * 287 * @return Oozie system libpath (sharelib) in HDFS if present, otherwise it returns <code>NULL</code>. 288 */ 289 public Path getSystemLibPath() { 290 return systemLibPath; 291 } 292 }