This project has been retired. For details, please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.hadoop.fs.FileStatus;
022 import org.apache.hadoop.fs.FileSystem;
023 import org.apache.hadoop.fs.Path;
024 import org.apache.hadoop.fs.PathFilter;
025 import org.apache.hadoop.mapred.JobConf;
026 import org.apache.oozie.client.OozieClient;
027 import org.apache.oozie.workflow.WorkflowApp;
028 import org.apache.oozie.workflow.WorkflowException;
029 import org.apache.oozie.util.IOUtils;
030 import org.apache.oozie.util.XConfiguration;
031 import org.apache.oozie.util.XLog;
032 import org.apache.oozie.ErrorCode;
033
034 import java.io.IOException;
035 import java.io.InputStreamReader;
036 import java.io.Reader;
037 import java.io.StringWriter;
038 import java.net.URI;
039 import java.net.URISyntaxException;
040 import java.util.ArrayList;
041 import java.util.Arrays;
042 import java.util.Collection;
043 import java.util.LinkedHashSet;
044 import java.util.List;
045 import java.util.Map;
046 import java.util.Set;
047
048 /**
049 * Service that provides application workflow definition reading from the path and creation of the proto configuration.
050 */
051 public abstract class WorkflowAppService implements Service {
052
053 public static final String CONF_PREFIX = Service.CONF_PREFIX + "WorkflowAppService.";
054
055 public static final String SYSTEM_LIB_PATH = CONF_PREFIX + "system.libpath";
056
057 public static final String APP_LIB_PATH_LIST = "oozie.wf.application.lib";
058
059 public static final String HADOOP_USER = "user.name";
060
061 public static final String CONFG_MAX_WF_LENGTH = CONF_PREFIX + "WorkflowDefinitionMaxLength";
062
063 public static final String OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE = "oozie.subworkflow.classpath.inheritance";
064
065 public static final String OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE = "oozie.wf.subworkflow.classpath.inheritance";
066
067 private Path systemLibPath;
068 private long maxWFLength;
069 private boolean oozieSubWfCPInheritance;
070
071 /**
072 * Initialize the workflow application service.
073 *
074 * @param services services instance.
075 */
076 public void init(Services services) {
077 Configuration conf = services.getConf();
078
079 String path = conf.get(SYSTEM_LIB_PATH, " ");
080 if (path.trim().length() > 0) {
081 systemLibPath = new Path(path.trim());
082 }
083
084 maxWFLength = conf.getInt(CONFG_MAX_WF_LENGTH, 100000);
085
086 oozieSubWfCPInheritance = conf.getBoolean(OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE, false);
087 }
088
089 /**
090 * Destroy the workflow application service.
091 */
092 public void destroy() {
093 }
094
095 /**
096 * Return the public interface for workflow application service.
097 *
098 * @return {@link WorkflowAppService}.
099 */
100 public Class<? extends Service> getInterface() {
101 return WorkflowAppService.class;
102 }
103
104 /**
105 * Read workflow definition.
106 *
107 *
108 * @param appPath application path.
109 * @param user user name.
110 * @return workflow definition.
111 * @throws WorkflowException thrown if the definition could not be read.
112 */
113 protected String readDefinition(String appPath, String user, Configuration conf)
114 throws WorkflowException {
115 try {
116 URI uri = new URI(appPath);
117 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
118 JobConf jobConf = has.createJobConf(uri.getAuthority());
119 FileSystem fs = has.createFileSystem(user, uri, jobConf);
120
121 // app path could be a directory
122 Path path = new Path(uri.getPath());
123 if (!fs.isFile(path)) {
124 path = new Path(path, "workflow.xml");
125 }
126
127 FileStatus fsStatus = fs.getFileStatus(path);
128 if (fsStatus.getLen() > this.maxWFLength) {
129 throw new WorkflowException(ErrorCode.E0736, fsStatus.getLen(), this.maxWFLength);
130 }
131
132 Reader reader = new InputStreamReader(fs.open(path));
133 StringWriter writer = new StringWriter();
134 IOUtils.copyCharStream(reader, writer);
135 return writer.toString();
136
137 }
138 catch (WorkflowException wfe) {
139 throw wfe;
140 }
141 catch (IOException ex) {
142 throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
143 }
144 catch (URISyntaxException ex) {
145 throw new WorkflowException(ErrorCode.E0711, appPath, ex.getMessage(), ex);
146 }
147 catch (HadoopAccessorException ex) {
148 throw new WorkflowException(ex);
149 }
150 catch (Exception ex) {
151 throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
152 }
153 }
154
155 /**
156 * Create proto configuration. <p/> The proto configuration includes the user,group and the paths which need to be
157 * added to distributed cache. These paths include .jar,.so and the resource file paths.
158 *
159 * @param jobConf job configuration.
160 * @param isWorkflowJob indicates if the job is a workflow job or not.
161 * @return proto configuration.
162 * @throws WorkflowException thrown if the proto action configuration could not be created.
163 */
164 public XConfiguration createProtoActionConf(Configuration jobConf, boolean isWorkflowJob)
165 throws WorkflowException {
166 try {
167 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
168 URI uri = new URI(jobConf.get(OozieClient.APP_PATH));
169
170 Configuration conf = has.createJobConf(uri.getAuthority());
171
172 String user = jobConf.get(OozieClient.USER_NAME);
173 conf.set(OozieClient.USER_NAME, user);
174
175 FileSystem fs = has.createFileSystem(user, uri, conf);
176
177 Path appPath = new Path(uri);
178 XLog.getLog(getClass()).debug("jobConf.libPath = " + jobConf.get(OozieClient.LIBPATH));
179 XLog.getLog(getClass()).debug("jobConf.appPath = " + appPath);
180
181 Collection<String> filePaths;
182 if (isWorkflowJob) {
183 // app path could be a directory
184 Path path = new Path(uri.getPath());
185 if (!fs.isFile(path)) {
186 filePaths = getLibFiles(fs, new Path(appPath + "/lib"));
187 } else {
188 filePaths = getLibFiles(fs, new Path(appPath.getParent(), "lib"));
189 }
190 }
191 else {
192 filePaths = new LinkedHashSet<String>();
193 }
194
195 String[] libPaths = jobConf.getStrings(OozieClient.LIBPATH);
196 if (libPaths != null && libPaths.length > 0) {
197 for (int i = 0; i < libPaths.length; i++) {
198 if (libPaths[i].trim().length() > 0) {
199 Path libPath = new Path(libPaths[i].trim());
200 Collection<String> libFilePaths = getLibFiles(fs, libPath);
201 filePaths.addAll(libFilePaths);
202 }
203 }
204 }
205
206 // Check if a subworkflow should inherit the libs from the parent WF
207 // OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE has priority over OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE from oozie-site
208 // If OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE isn't specified, we use OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE
209 if (jobConf.getBoolean(OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE, oozieSubWfCPInheritance)) {
210 // Keep any libs from a parent workflow that might already be in APP_LIB_PATH_LIST and also remove duplicates
211 String[] parentFilePaths = jobConf.getStrings(APP_LIB_PATH_LIST);
212 if (parentFilePaths != null && parentFilePaths.length > 0) {
213 String[] filePathsNames = filePaths.toArray(new String[filePaths.size()]);
214 for (int i = 0; i < filePathsNames.length; i++) {
215 Path p = new Path(filePathsNames[i]);
216 filePathsNames[i] = p.getName();
217 }
218 Arrays.sort(filePathsNames);
219 List<String> nonDuplicateParentFilePaths = new ArrayList<String>();
220 for (String parentFilePath : parentFilePaths) {
221 Path p = new Path(parentFilePath);
222 if (Arrays.binarySearch(filePathsNames, p.getName()) < 0) {
223 nonDuplicateParentFilePaths.add(parentFilePath);
224 }
225 }
226 filePaths.addAll(nonDuplicateParentFilePaths);
227 }
228 }
229
230 conf.setStrings(APP_LIB_PATH_LIST, filePaths.toArray(new String[filePaths.size()]));
231
232 //Add all properties start with 'oozie.'
233 for (Map.Entry<String, String> entry : jobConf) {
234 if (entry.getKey().startsWith("oozie.")) {
235 String name = entry.getKey();
236 String value = entry.getValue();
237 // if property already exists, should not overwrite
238 if(conf.get(name) == null) {
239 conf.set(name, value);
240 }
241 }
242 }
243 XConfiguration retConf = new XConfiguration();
244 XConfiguration.copy(conf, retConf);
245 return retConf;
246 }
247 catch (IOException ex) {
248 throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
249 }
250 catch (URISyntaxException ex) {
251 throw new WorkflowException(ErrorCode.E0711, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
252 }
253 catch (HadoopAccessorException ex) {
254 throw new WorkflowException(ex);
255 }
256 catch (Exception ex) {
257 throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH),
258 ex.getMessage(), ex);
259 }
260 }
261
262 /**
263 * Parse workflow definition.
264 *
265 * @param jobConf job configuration.
266 * @return workflow application.
267 * @throws WorkflowException thrown if the workflow application could not be parsed.
268 */
269 public abstract WorkflowApp parseDef(Configuration jobConf) throws WorkflowException;
270
271 /**
272 * Parse workflow definition.
273 * @param wfXml workflow.
274 * @param jobConf job configuration
275 * @return workflow application.
276 * @throws WorkflowException thrown if the workflow application could not be parsed.
277 */
278 public abstract WorkflowApp parseDef(String wfXml, Configuration jobConf) throws WorkflowException;
279
280 /**
281 * Get all library paths.
282 *
283 * @param fs file system object.
284 * @param libPath hdfs library path.
285 * @return list of paths.
286 * @throws IOException thrown if the lib paths could not be obtained.
287 */
288 private Collection<String> getLibFiles(FileSystem fs, Path libPath) throws IOException {
289 Set<String> libPaths = new LinkedHashSet<String>();
290 if (fs.exists(libPath)) {
291 FileStatus[] files = fs.listStatus(libPath, new NoPathFilter());
292
293 for (FileStatus file : files) {
294 libPaths.add(file.getPath().toUri().toString());
295 }
296 }
297 else {
298 XLog.getLog(getClass()).warn("libpath [{0}] does not exist", libPath);
299 }
300 return libPaths;
301 }
302
303 /*
304 * Filter class doing no filtering.
305 * We dont need define this class, but seems fs.listStatus() is not working properly without this.
306 * So providing this dummy no filtering Filter class.
307 */
308 private class NoPathFilter implements PathFilter {
309 @Override
310 public boolean accept(Path path) {
311 return true;
312 }
313 }
314
315 /**
316 * Returns Oozie system libpath.
317 *
318 * @return Oozie system libpath (sharelib) in HDFS if present, otherwise it returns <code>NULL</code>.
319 */
320 public Path getSystemLibPath() {
321 return systemLibPath;
322 }
323 }