/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.ErrorCode;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Service that reads the workflow application definition from the application path and creates the proto
 * configuration.
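 * <p>
 * Illustrative usage sketch only (not part of this class's contract; variable names are examples): a caller
 * typically looks the service up through {@link Services} and uses it to build the proto configuration and to
 * parse the workflow definition.
 * <pre>
 *   WorkflowAppService was = Services.get().get(WorkflowAppService.class);
 *   XConfiguration protoConf = was.createProtoActionConf(jobConf, authToken, true);
 *   WorkflowApp app = was.parseDef(jobConf, authToken);
 * </pre>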
 */
public abstract class WorkflowAppService implements Service {

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "WorkflowAppService.";

    public static final String SYSTEM_LIB_PATH = CONF_PREFIX + "system.libpath";

    public static final String APP_LIB_PATH_LIST = "oozie.wf.application.lib";

    public static final String HADOOP_USER = "user.name";

    public static final String CONFG_MAX_WF_LENGTH = CONF_PREFIX + "WorkflowDefinitionMaxLength";

    public static final String OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE = "oozie.subworkflow.classpath.inheritance";

    public static final String OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE = "oozie.wf.subworkflow.classpath.inheritance";

    private Path systemLibPath;
    private long maxWFLength;
    private boolean oozieSubWfCPInheritance;

    /**
     * Initialize the workflow application service.
     *
     * @param services services instance.
     */
    public void init(Services services) {
        Configuration conf = services.getConf();

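        // The single-space default acts as "not configured": the system libpath is only set when a
        // non-blank value remains after trimming.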
        String path = conf.get(SYSTEM_LIB_PATH, " ");
        if (path.trim().length() > 0) {
            systemLibPath = new Path(path.trim());
        }

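        // Maximum allowed size of a workflow definition; readDefinition() compares this against the
        // file length reported by the file system, so the default of 100000 is effectively in bytes.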
        maxWFLength = conf.getInt(CONFG_MAX_WF_LENGTH, 100000);

        oozieSubWfCPInheritance = conf.getBoolean(OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE, true);
    }

    /**
     * Destroy the workflow application service.
     */
    public void destroy() {
    }

    /**
     * Return the public interface for the workflow application service.
     *
     * @return {@link WorkflowAppService}.
     */
    public Class<? extends Service> getInterface() {
        return WorkflowAppService.class;
    }

    /**
     * Read the workflow definition.
     *
     * @param appPath application path.
     * @param user user name.
     * @param authToken authentication token.
     * @param conf job configuration.
     * @return workflow definition.
     * @throws WorkflowException thrown if the definition could not be read.
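     * <p>
     * Hypothetical example (the path is illustrative only): for an application path such as
     * {@code hdfs://namenode:8020/user/joe/my-app}, the definition is read from
     * {@code .../my-app/workflow.xml}, unless the application path itself already points to a file.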
     */
    protected String readDefinition(String appPath, String user, String authToken, Configuration conf)
            throws WorkflowException {
        try {
            URI uri = new URI(appPath);
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            JobConf jobConf = has.createJobConf(uri.getAuthority());
            FileSystem fs = has.createFileSystem(user, uri, jobConf);

            // app path could be a directory
            Path path = new Path(uri.getPath());
            if (!fs.isFile(path)) {
                path = new Path(path, "workflow.xml");
            }

            FileStatus fsStatus = fs.getFileStatus(path);
            if (fsStatus.getLen() > this.maxWFLength) {
                throw new WorkflowException(ErrorCode.E0736, fsStatus.getLen(), this.maxWFLength);
            }

            Reader reader = new InputStreamReader(fs.open(path));
            StringWriter writer = new StringWriter();
            IOUtils.copyCharStream(reader, writer);
            return writer.toString();
        }
        catch (WorkflowException wfe) {
            throw wfe;
        }
        catch (IOException ex) {
            throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
        }
        catch (URISyntaxException ex) {
            throw new WorkflowException(ErrorCode.E0711, appPath, ex.getMessage(), ex);
        }
        catch (HadoopAccessorException ex) {
            throw new WorkflowException(ex);
        }
        catch (Exception ex) {
            throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
        }
    }

    /**
     * Create the proto configuration. <p/> The proto configuration includes the user/group and the paths that need
     * to be added to the distributed cache. These include the .jar, .so and resource file paths.
     *
     * @param jobConf job configuration.
     * @param authToken authentication token.
     * @param isWorkflowJob indicates whether the job is a workflow job or not.
     * @return proto configuration.
     * @throws WorkflowException thrown if the proto action configuration could not be created.
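     * <p>
     * Illustrative sketch only (path names are example values): for a workflow job whose
     * {@link OozieClient#APP_PATH} points at {@code hdfs://namenode:8020/user/joe/my-app}, the returned
     * configuration carries the submitting {@code user.name}, the library entries resolved from
     * {@code my-app/lib} and from any {@link OozieClient#LIBPATH} directories under
     * {@link #APP_LIB_PATH_LIST}, and every {@code oozie.*} property of the submitted job configuration.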
     */
    public XConfiguration createProtoActionConf(Configuration jobConf, String authToken, boolean isWorkflowJob)
            throws WorkflowException {
        try {
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            URI uri = new URI(jobConf.get(OozieClient.APP_PATH));

            Configuration conf = has.createJobConf(uri.getAuthority());

            String user = jobConf.get(OozieClient.USER_NAME);
            conf.set(OozieClient.USER_NAME, user);

            FileSystem fs = has.createFileSystem(user, uri, conf);

            Path appPath = new Path(uri);
            XLog.getLog(getClass()).debug("jobConf.libPath = " + jobConf.get(OozieClient.LIBPATH));
            XLog.getLog(getClass()).debug("jobConf.appPath = " + appPath);

            Collection<String> filePaths;
            if (isWorkflowJob) {
                // app path could be a directory
                Path path = new Path(uri.getPath());
                if (!fs.isFile(path)) {
                    filePaths = getLibFiles(fs, new Path(appPath + "/lib"));
                } else {
                    filePaths = getLibFiles(fs, new Path(appPath.getParent(), "lib"));
                }
            }
            else {
                filePaths = new LinkedHashSet<String>();
            }

            String[] libPaths = jobConf.getStrings(OozieClient.LIBPATH);
            if (libPaths != null && libPaths.length > 0) {
                for (int i = 0; i < libPaths.length; i++) {
                    if (libPaths[i].trim().length() > 0) {
                        Path libPath = new Path(libPaths[i].trim());
                        Collection<String> libFilePaths = getLibFiles(fs, libPath);
                        filePaths.addAll(libFilePaths);
                    }
                }
            }

            // Check if a subworkflow should inherit the libs from the parent WF
            // OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE has priority over OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE from oozie-site
            // If OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE isn't specified, we use OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE
            if (jobConf.getBoolean(OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE, oozieSubWfCPInheritance)) {
                // Keep any libs from a parent workflow that might already be in APP_LIB_PATH_LIST and also remove duplicates
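                // Hypothetical example: if the child already resolved .../my-app/lib/foo.jar and the parent
                // list contains .../parent-app/lib/foo.jar, the parent entry is dropped because the file
                // names collide (duplicates are detected by file name only).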
                String[] parentFilePaths = jobConf.getStrings(APP_LIB_PATH_LIST);
                if (parentFilePaths != null && parentFilePaths.length > 0) {
                    String[] filePathsNames = filePaths.toArray(new String[filePaths.size()]);
                    for (int i = 0; i < filePathsNames.length; i++) {
                        Path p = new Path(filePathsNames[i]);
                        filePathsNames[i] = p.getName();
                    }
                    Arrays.sort(filePathsNames);
                    List<String> nonDuplicateParentFilePaths = new ArrayList<String>();
                    for (String parentFilePath : parentFilePaths) {
                        Path p = new Path(parentFilePath);
                        if (Arrays.binarySearch(filePathsNames, p.getName()) < 0) {
                            nonDuplicateParentFilePaths.add(parentFilePath);
                        }
                    }
                    filePaths.addAll(nonDuplicateParentFilePaths);
                }
            }

            conf.setStrings(APP_LIB_PATH_LIST, filePaths.toArray(new String[filePaths.size()]));

            // Add all properties that start with 'oozie.'
            for (Map.Entry<String, String> entry : jobConf) {
                if (entry.getKey().startsWith("oozie.")) {
                    String name = entry.getKey();
                    String value = entry.getValue();
                    // if the property already exists, do not overwrite it
                    if (conf.get(name) == null) {
                        conf.set(name, value);
                    }
                }
            }
            XConfiguration retConf = new XConfiguration();
            XConfiguration.copy(conf, retConf);
            return retConf;
        }
        catch (IOException ex) {
            throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
        }
        catch (URISyntaxException ex) {
            throw new WorkflowException(ErrorCode.E0711, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
        }
        catch (HadoopAccessorException ex) {
            throw new WorkflowException(ex);
        }
        catch (Exception ex) {
            throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
        }
    }

    /**
     * Parse the workflow definition.
     *
     * @param jobConf job configuration.
     * @param authToken authentication token.
     * @return workflow application.
     * @throws WorkflowException thrown if the workflow application could not be parsed.
     */
    public abstract WorkflowApp parseDef(Configuration jobConf, String authToken) throws WorkflowException;

    /**
     * Parse the workflow definition.
     *
     * @param wfXml workflow definition XML.
     * @param jobConf job configuration.
     * @return workflow application.
     * @throws WorkflowException thrown if the workflow application could not be parsed.
     */
    public abstract WorkflowApp parseDef(String wfXml, Configuration jobConf) throws WorkflowException;

    /**
     * Get all library paths under the given library path.
     *
     * @param fs file system object.
     * @param libPath HDFS library path.
     * @return list of paths.
     * @throws IOException thrown if the lib paths could not be obtained.
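     * <p>
     * Note: {@code FileSystem.listStatus()} is not recursive, so only the direct entries of {@code libPath}
     * are returned.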
     */
    private Collection<String> getLibFiles(FileSystem fs, Path libPath) throws IOException {
        Set<String> libPaths = new LinkedHashSet<String>();
        if (fs.exists(libPath)) {
            FileStatus[] files = fs.listStatus(libPath, new NoPathFilter());

            for (FileStatus file : files) {
                libPaths.add(file.getPath().toUri().toString());
            }
        }
        else {
            XLog.getLog(getClass()).warn("libpath [{0}] does not exist", libPath);
        }
        return libPaths;
    }

    /*
     * Filter class that does no filtering.
     * This class should not be needed, but fs.listStatus() does not seem to work properly without a filter,
     * so this dummy no-op filter is provided.
     */
    private class NoPathFilter implements PathFilter {
        @Override
        public boolean accept(Path path) {
            return true;
        }
    }

    /**
     * Returns the Oozie system libpath.
     *
     * @return the Oozie system libpath (sharelib) in HDFS if configured, otherwise <code>null</code>.
     */
    public Path getSystemLibPath() {
        return systemLibPath;
    }
}