001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import org.apache.hadoop.conf.Configuration; 021 import org.apache.hadoop.fs.FileStatus; 022 import org.apache.hadoop.fs.FileSystem; 023 import org.apache.hadoop.fs.Path; 024 import org.apache.hadoop.fs.PathFilter; 025 import org.apache.oozie.client.OozieClient; 026 import org.apache.oozie.workflow.WorkflowApp; 027 import org.apache.oozie.workflow.WorkflowException; 028 import org.apache.oozie.util.IOUtils; 029 import org.apache.oozie.util.XConfiguration; 030 import org.apache.oozie.util.XLog; 031 import org.apache.oozie.ErrorCode; 032 033 import java.io.IOException; 034 import java.io.InputStreamReader; 035 import java.io.Reader; 036 import java.io.StringWriter; 037 import java.net.URI; 038 import java.net.URISyntaxException; 039 import java.util.ArrayList; 040 import java.util.List; 041 import java.util.Map; 042 043 /** 044 * Service that provides application workflow definition reading from the path and creation of the proto configuration. 045 */ 046 public abstract class WorkflowAppService implements Service { 047 048 public static final String CONF_PREFIX = Service.CONF_PREFIX + "WorkflowAppService."; 049 050 public static final String SYSTEM_LIB_PATH = CONF_PREFIX + "system.libpath"; 051 052 public static final String APP_LIB_PATH_LIST = "oozie.wf.application.lib"; 053 054 public static final String HADOOP_UGI = "hadoop.job.ugi"; 055 056 public static final String HADOOP_USER = "user.name"; 057 058 public static final String HADOOP_JT_KERBEROS_NAME = "mapreduce.jobtracker.kerberos.principal"; 059 060 public static final String HADOOP_NN_KERBEROS_NAME = "dfs.namenode.kerberos.principal"; 061 062 private Path systemLibPath; 063 064 /** 065 * Initialize the workflow application service. 066 * 067 * @param services services instance. 068 */ 069 public void init(Services services) { 070 String path = services.getConf().get(SYSTEM_LIB_PATH, " "); 071 if (path.trim().length() > 0) { 072 systemLibPath = new Path(path.trim()); 073 } 074 } 075 076 /** 077 * Destroy the workflow application service. 078 */ 079 public void destroy() { 080 } 081 082 /** 083 * Return the public interface for workflow application service. 084 * 085 * @return {@link WorkflowAppService}. 086 */ 087 public Class<? extends Service> getInterface() { 088 return WorkflowAppService.class; 089 } 090 091 /** 092 * Read workflow definition. 093 * 094 * @param appPath application path. 095 * @param user user name. 096 * @param group group name. 097 * @param autToken authentication token. 098 * @return workflow definition. 099 * @throws WorkflowException thrown if the definition could not be read. 100 */ 101 protected String readDefinition(String appPath, String user, String group, String autToken) 102 throws WorkflowException { 103 try { 104 URI uri = new URI(appPath); 105 FileSystem fs = Services.get().get(HadoopAccessorService.class). 106 createFileSystem(user, group, uri, new Configuration()); 107 108 // app path could be a directory 109 Path path = new Path(uri.getPath()); 110 if (!fs.isFile(path)) { 111 path = new Path(path, "workflow.xml"); 112 } 113 114 Reader reader = new InputStreamReader(fs.open(path)); 115 StringWriter writer = new StringWriter(); 116 IOUtils.copyCharStream(reader, writer); 117 return writer.toString(); 118 119 } 120 catch (IOException ex) { 121 throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex); 122 } 123 catch (URISyntaxException ex) { 124 throw new WorkflowException(ErrorCode.E0711, appPath, ex.getMessage(), ex); 125 } 126 catch (HadoopAccessorException ex) { 127 throw new WorkflowException(ex); 128 } 129 catch (Exception ex) { 130 throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex); 131 } 132 } 133 134 /** 135 * Create proto configuration. <p/> The proto configuration includes the user,group and the paths which need to be 136 * added to distributed cache. These paths include .jar,.so and the resource file paths. 137 * 138 * @param jobConf job configuration. 139 * @param authToken authentication token. 140 * @param isWorkflowJob indicates if the job is a workflow job or not. 141 * @return proto configuration. 142 * @throws WorkflowException thrown if the proto action configuration could not be created. 143 */ 144 public XConfiguration createProtoActionConf(Configuration jobConf, String authToken, boolean isWorkflowJob) 145 throws WorkflowException { 146 XConfiguration conf = new XConfiguration(); 147 try { 148 String user = jobConf.get(OozieClient.USER_NAME); 149 String group = jobConf.get(OozieClient.GROUP_NAME); 150 String hadoopUgi = user + "," + group; 151 152 conf.set(OozieClient.USER_NAME, user); 153 conf.set(OozieClient.GROUP_NAME, group); 154 conf.set(HADOOP_UGI, hadoopUgi); 155 156 conf.set(HADOOP_JT_KERBEROS_NAME, jobConf.get(HADOOP_JT_KERBEROS_NAME)); 157 conf.set(HADOOP_NN_KERBEROS_NAME, jobConf.get(HADOOP_NN_KERBEROS_NAME)); 158 159 URI uri = new URI(jobConf.get(OozieClient.APP_PATH)); 160 161 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, uri, conf); 162 163 Path appPath = new Path(uri.getPath()); 164 XLog.getLog(getClass()).debug("jobConf.libPath = " + jobConf.get(OozieClient.LIBPATH)); 165 XLog.getLog(getClass()).debug("jobConf.appPath = " + appPath); 166 167 List<String> filePaths; 168 if (isWorkflowJob) { 169 // app path could be a directory 170 Path path = new Path(uri.getPath()); 171 if (!fs.isFile(path)) { 172 filePaths = getLibFiles(fs, new Path(appPath + "/lib")); 173 } else { 174 filePaths = getLibFiles(fs, new Path(appPath.getParent(), "lib")); 175 } 176 } 177 else { 178 filePaths = new ArrayList<String>(); 179 } 180 181 String[] libPaths = jobConf.getStrings(OozieClient.LIBPATH); 182 if (libPaths != null && libPaths.length > 0) { 183 for (int i = 0; i < libPaths.length; i++) { 184 if (libPaths[i].trim().length() > 0) { 185 Path libPath = new Path(libPaths[i].trim()); 186 List<String> libFilePaths = getLibFiles(fs, libPath); 187 filePaths.addAll(libFilePaths); 188 } 189 } 190 } 191 192 if (systemLibPath != null && jobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) { 193 List<String> libFilePaths = getLibFiles(fs, systemLibPath); 194 filePaths.addAll(libFilePaths); 195 } 196 197 conf.setStrings(APP_LIB_PATH_LIST, filePaths.toArray(new String[filePaths.size()])); 198 199 //Add all properties start with 'oozie.' 200 for (Map.Entry<String, String> entry : jobConf) { 201 if (entry.getKey().startsWith("oozie.")) { 202 String name = entry.getKey(); 203 String value = entry.getValue(); 204 // Append application lib jars of both parent and child in 205 // subworkflow to APP_LIB_PATH_LIST 206 if ((conf.get(name) != null) && name.equals(APP_LIB_PATH_LIST)) { 207 value = value + "," + conf.get(name); 208 } 209 conf.set(name, value); 210 } 211 } 212 return conf; 213 } 214 catch (IOException ex) { 215 throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex); 216 } 217 catch (URISyntaxException ex) { 218 throw new WorkflowException(ErrorCode.E0711, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex); 219 } 220 catch (HadoopAccessorException ex) { 221 throw new WorkflowException(ex); 222 } 223 catch (Exception ex) { 224 throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH), 225 ex.getMessage(), ex); 226 } 227 } 228 229 /** 230 * Parse workflow definition. 231 * 232 * @param jobConf job configuration. 233 * @param authToken authentication token. 234 * @return workflow application. 235 * @throws WorkflowException thrown if the workflow application could not be parsed. 236 */ 237 public abstract WorkflowApp parseDef(Configuration jobConf, String authToken) throws WorkflowException; 238 239 /** 240 * Parse workflow definition. 241 * @param wfXml workflow. 242 * @return workflow application. 243 * @throws WorkflowException thrown if the workflow application could not be parsed. 244 */ 245 public abstract WorkflowApp parseDef(String wfXml) throws WorkflowException; 246 247 /** 248 * Get all library paths. 249 * 250 * @param fs file system object. 251 * @param libPath hdfs library path. 252 * @return list of paths. 253 * @throws IOException thrown if the lib paths could not be obtained. 254 */ 255 private List<String> getLibFiles(FileSystem fs, Path libPath) throws IOException { 256 List<String> libPaths = new ArrayList<String>(); 257 if (fs.exists(libPath)) { 258 FileStatus[] files = fs.listStatus(libPath, new NoPathFilter()); 259 260 for (FileStatus file : files) { 261 libPaths.add(file.getPath().toUri().getPath().trim()); 262 } 263 } 264 else { 265 XLog.getLog(getClass()).warn("libpath [{0}] does not exists", libPath); 266 } 267 return libPaths; 268 } 269 270 /* 271 * Filter class doing no filtering. 272 * We dont need define this class, but seems fs.listStatus() is not working properly without this. 273 * So providing this dummy no filtering Filter class. 274 */ 275 private class NoPathFilter implements PathFilter { 276 @Override 277 public boolean accept(Path path) { 278 return true; 279 } 280 } 281 }