001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import org.apache.hadoop.conf.Configuration; 021 import org.apache.hadoop.fs.FileStatus; 022 import org.apache.hadoop.fs.FileSystem; 023 import org.apache.hadoop.fs.Path; 024 import org.apache.hadoop.fs.PathFilter; 025 import org.apache.hadoop.mapred.JobConf; 026 import org.apache.oozie.client.OozieClient; 027 import org.apache.oozie.workflow.WorkflowApp; 028 import org.apache.oozie.workflow.WorkflowException; 029 import org.apache.oozie.util.IOUtils; 030 import org.apache.oozie.util.XConfiguration; 031 import org.apache.oozie.util.XLog; 032 import org.apache.oozie.ErrorCode; 033 034 import java.io.IOException; 035 import java.io.InputStreamReader; 036 import java.io.Reader; 037 import java.io.StringWriter; 038 import java.net.URI; 039 import java.net.URISyntaxException; 040 import java.util.Collection; 041 import java.util.LinkedHashSet; 042 import java.util.Map; 043 import java.util.Set; 044 045 /** 046 * Service that provides application workflow definition reading from the path and creation of the proto configuration. 047 */ 048 public abstract class WorkflowAppService implements Service { 049 050 public static final String CONF_PREFIX = Service.CONF_PREFIX + "WorkflowAppService."; 051 052 public static final String SYSTEM_LIB_PATH = CONF_PREFIX + "system.libpath"; 053 054 public static final String APP_LIB_PATH_LIST = "oozie.wf.application.lib"; 055 056 public static final String HADOOP_USER = "user.name"; 057 058 public static final String CONFG_MAX_WF_LENGTH = CONF_PREFIX + "WorkflowDefinitionMaxLength"; 059 060 private Path systemLibPath; 061 private long maxWFLength; 062 063 /** 064 * Initialize the workflow application service. 065 * 066 * @param services services instance. 067 */ 068 public void init(Services services) { 069 Configuration conf = services.getConf(); 070 071 String path = conf.get(SYSTEM_LIB_PATH, " "); 072 if (path.trim().length() > 0) { 073 systemLibPath = new Path(path.trim()); 074 } 075 076 maxWFLength = conf.getInt(CONFG_MAX_WF_LENGTH, 100000); 077 } 078 079 /** 080 * Destroy the workflow application service. 081 */ 082 public void destroy() { 083 } 084 085 /** 086 * Return the public interface for workflow application service. 087 * 088 * @return {@link WorkflowAppService}. 089 */ 090 public Class<? extends Service> getInterface() { 091 return WorkflowAppService.class; 092 } 093 094 /** 095 * Read workflow definition. 096 * 097 * 098 * @param appPath application path. 099 * @param user user name. 100 * @param autToken authentication token. 101 * @return workflow definition. 102 * @throws WorkflowException thrown if the definition could not be read. 103 */ 104 protected String readDefinition(String appPath, String user, String autToken, Configuration conf) 105 throws WorkflowException { 106 try { 107 URI uri = new URI(appPath); 108 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 109 JobConf jobConf = has.createJobConf(uri.getAuthority()); 110 FileSystem fs = has.createFileSystem(user, uri, jobConf); 111 112 // app path could be a directory 113 Path path = new Path(uri.getPath()); 114 if (!fs.isFile(path)) { 115 path = new Path(path, "workflow.xml"); 116 } 117 118 FileStatus fsStatus = fs.getFileStatus(path); 119 if (fsStatus.getLen() > this.maxWFLength) { 120 throw new WorkflowException(ErrorCode.E0736, fsStatus.getLen(), this.maxWFLength); 121 } 122 123 Reader reader = new InputStreamReader(fs.open(path)); 124 StringWriter writer = new StringWriter(); 125 IOUtils.copyCharStream(reader, writer); 126 return writer.toString(); 127 128 } 129 catch (WorkflowException wfe) { 130 throw wfe; 131 } 132 catch (IOException ex) { 133 throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex); 134 } 135 catch (URISyntaxException ex) { 136 throw new WorkflowException(ErrorCode.E0711, appPath, ex.getMessage(), ex); 137 } 138 catch (HadoopAccessorException ex) { 139 throw new WorkflowException(ex); 140 } 141 catch (Exception ex) { 142 throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex); 143 } 144 } 145 146 /** 147 * Create proto configuration. <p/> The proto configuration includes the user,group and the paths which need to be 148 * added to distributed cache. These paths include .jar,.so and the resource file paths. 149 * 150 * @param jobConf job configuration. 151 * @param authToken authentication token. 152 * @param isWorkflowJob indicates if the job is a workflow job or not. 153 * @return proto configuration. 154 * @throws WorkflowException thrown if the proto action configuration could not be created. 155 */ 156 public XConfiguration createProtoActionConf(Configuration jobConf, String authToken, boolean isWorkflowJob) 157 throws WorkflowException { 158 try { 159 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 160 URI uri = new URI(jobConf.get(OozieClient.APP_PATH)); 161 162 Configuration conf = has.createJobConf(uri.getAuthority()); 163 164 String user = jobConf.get(OozieClient.USER_NAME); 165 conf.set(OozieClient.USER_NAME, user); 166 167 FileSystem fs = has.createFileSystem(user, uri, conf); 168 169 Path appPath = new Path(uri); 170 XLog.getLog(getClass()).debug("jobConf.libPath = " + jobConf.get(OozieClient.LIBPATH)); 171 XLog.getLog(getClass()).debug("jobConf.appPath = " + appPath); 172 173 Collection<String> filePaths; 174 if (isWorkflowJob) { 175 // app path could be a directory 176 Path path = new Path(uri.getPath()); 177 if (!fs.isFile(path)) { 178 filePaths = getLibFiles(fs, new Path(appPath + "/lib")); 179 } else { 180 filePaths = getLibFiles(fs, new Path(appPath.getParent(), "lib")); 181 } 182 } 183 else { 184 filePaths = new LinkedHashSet<String>(); 185 } 186 187 String[] libPaths = jobConf.getStrings(OozieClient.LIBPATH); 188 if (libPaths != null && libPaths.length > 0) { 189 for (int i = 0; i < libPaths.length; i++) { 190 if (libPaths[i].trim().length() > 0) { 191 Path libPath = new Path(libPaths[i].trim()); 192 Collection<String> libFilePaths = getLibFiles(fs, libPath); 193 filePaths.addAll(libFilePaths); 194 } 195 } 196 } 197 198 conf.setStrings(APP_LIB_PATH_LIST, filePaths.toArray(new String[filePaths.size()])); 199 200 //Add all properties start with 'oozie.' 201 for (Map.Entry<String, String> entry : jobConf) { 202 if (entry.getKey().startsWith("oozie.")) { 203 String name = entry.getKey(); 204 String value = entry.getValue(); 205 // if property already exists, should not overwrite 206 if(conf.get(name) == null) { 207 conf.set(name, value); 208 } 209 } 210 } 211 XConfiguration retConf = new XConfiguration(); 212 XConfiguration.copy(conf, retConf); 213 return retConf; 214 } 215 catch (IOException ex) { 216 throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex); 217 } 218 catch (URISyntaxException ex) { 219 throw new WorkflowException(ErrorCode.E0711, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex); 220 } 221 catch (HadoopAccessorException ex) { 222 throw new WorkflowException(ex); 223 } 224 catch (Exception ex) { 225 throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH), 226 ex.getMessage(), ex); 227 } 228 } 229 230 /** 231 * Parse workflow definition. 232 * 233 * @param jobConf job configuration. 234 * @param authToken authentication token. 235 * @return workflow application. 236 * @throws WorkflowException thrown if the workflow application could not be parsed. 237 */ 238 public abstract WorkflowApp parseDef(Configuration jobConf, String authToken) throws WorkflowException; 239 240 /** 241 * Parse workflow definition. 242 * @param wfXml workflow. 243 * @param jobConf job configuration 244 * @return workflow application. 245 * @throws WorkflowException thrown if the workflow application could not be parsed. 246 */ 247 public abstract WorkflowApp parseDef(String wfXml, Configuration jobConf) throws WorkflowException; 248 249 /** 250 * Get all library paths. 251 * 252 * @param fs file system object. 253 * @param libPath hdfs library path. 254 * @return list of paths. 255 * @throws IOException thrown if the lib paths could not be obtained. 256 */ 257 private Collection<String> getLibFiles(FileSystem fs, Path libPath) throws IOException { 258 Set<String> libPaths = new LinkedHashSet<String>(); 259 if (fs.exists(libPath)) { 260 FileStatus[] files = fs.listStatus(libPath, new NoPathFilter()); 261 262 for (FileStatus file : files) { 263 libPaths.add(file.getPath().toUri().toString()); 264 } 265 } 266 else { 267 XLog.getLog(getClass()).warn("libpath [{0}] does not exist", libPath); 268 } 269 return libPaths; 270 } 271 272 /* 273 * Filter class doing no filtering. 274 * We dont need define this class, but seems fs.listStatus() is not working properly without this. 275 * So providing this dummy no filtering Filter class. 276 */ 277 private class NoPathFilter implements PathFilter { 278 @Override 279 public boolean accept(Path path) { 280 return true; 281 } 282 } 283 284 /** 285 * Returns Oozie system libpath. 286 * 287 * @return Oozie system libpath (sharelib) in HDFS if present, otherwise it returns <code>NULL</code>. 288 */ 289 public Path getSystemLibPath() { 290 return systemLibPath; 291 } 292 }