/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.ErrorCode;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/**
 * Service that reads workflow application definitions from the application path and creates the proto configuration.
 */
public abstract class WorkflowAppService implements Service {

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "WorkflowAppService.";

    public static final String SYSTEM_LIB_PATH = CONF_PREFIX + "system.libpath";

    public static final String APP_LIB_PATH_LIST = "oozie.wf.application.lib";

    public static final String HADOOP_USER = "user.name";

    public static final String CONFG_MAX_WF_LENGTH = CONF_PREFIX + "WorkflowDefinitionMaxLength";

    private Path systemLibPath;
    private long maxWFLength;

    /**
     * Initialize the workflow application service.
     *
     * @param services services instance.
     */
    public void init(Services services) {
        Configuration conf = services.getConf();

        String path = conf.get(SYSTEM_LIB_PATH, " ");
        if (path.trim().length() > 0) {
            systemLibPath = new Path(path.trim());
        }

        maxWFLength = conf.getInt(CONFG_MAX_WF_LENGTH, 100000);
    }
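
    /*
     * Illustrative sketch (not part of the original source): how the configuration keys
     * read by init() might be set before the Services container starts this service.
     * The property values below are hypothetical examples.
     *
     *     Configuration conf = services.getConf();
     *     // HDFS location of the shared library; a blank value means no system libpath
     *     conf.set(WorkflowAppService.SYSTEM_LIB_PATH, "/user/oozie/share/lib");
     *     // maximum allowed workflow definition size in bytes (defaults to 100000)
     *     conf.setInt(WorkflowAppService.CONFG_MAX_WF_LENGTH, 100000);
     */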

    /**
     * Destroy the workflow application service.
     */
    public void destroy() {
    }

    /**
     * Return the public interface for the workflow application service.
     *
     * @return {@link WorkflowAppService}.
     */
    public Class<? extends Service> getInterface() {
        return WorkflowAppService.class;
    }

    /**
     * Read the workflow definition.
     *
     * @param appPath application path.
     * @param user user name.
     * @param autToken authentication token.
     * @param conf job configuration.
     * @return workflow definition.
     * @throws WorkflowException thrown if the definition could not be read.
     */
    protected String readDefinition(String appPath, String user, String autToken, Configuration conf)
            throws WorkflowException {
        try {
            URI uri = new URI(appPath);
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            JobConf jobConf = has.createJobConf(uri.getAuthority());
            FileSystem fs = has.createFileSystem(user, uri, jobConf);

            // app path could be a directory
            Path path = new Path(uri.getPath());
            if (!fs.isFile(path)) {
                path = new Path(path, "workflow.xml");
            }

            FileStatus fsStatus = fs.getFileStatus(path);
            if (fsStatus.getLen() > this.maxWFLength) {
                throw new WorkflowException(ErrorCode.E0736, fsStatus.getLen(), this.maxWFLength);
            }

            Reader reader = new InputStreamReader(fs.open(path));
            StringWriter writer = new StringWriter();
            IOUtils.copyCharStream(reader, writer);
            return writer.toString();
        }
        catch (WorkflowException wfe) {
            throw wfe;
        }
        catch (IOException ex) {
            throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
        }
        catch (URISyntaxException ex) {
            throw new WorkflowException(ErrorCode.E0711, appPath, ex.getMessage(), ex);
        }
        catch (HadoopAccessorException ex) {
            throw new WorkflowException(ex);
        }
        catch (Exception ex) {
            throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
        }
    }
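
    /*
     * Illustrative sketch (not part of the original source): how a concrete subclass
     * might combine readDefinition() with parseDef() when materializing a workflow job.
     * The variable names below are hypothetical.
     *
     *     String appPath = jobConf.get(OozieClient.APP_PATH);
     *     String user = jobConf.get(OozieClient.USER_NAME);
     *     String wfXml = readDefinition(appPath, user, authToken, jobConf);
     *     WorkflowApp app = parseDef(wfXml, jobConf);
     */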

    /**
     * Create the proto configuration. <p/> The proto configuration includes the user, the group, and the paths that
     * need to be added to the distributed cache. These paths include .jar, .so and resource file paths.
     *
     * @param jobConf job configuration.
     * @param authToken authentication token.
     * @param isWorkflowJob indicates if the job is a workflow job or not.
     * @return proto configuration.
     * @throws WorkflowException thrown if the proto action configuration could not be created.
     */
    public XConfiguration createProtoActionConf(Configuration jobConf, String authToken, boolean isWorkflowJob)
            throws WorkflowException {
        try {
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            URI uri = new URI(jobConf.get(OozieClient.APP_PATH));

            Configuration conf = has.createJobConf(uri.getAuthority());

            String user = jobConf.get(OozieClient.USER_NAME);
            conf.set(OozieClient.USER_NAME, user);

            FileSystem fs = has.createFileSystem(user, uri, conf);

            Path appPath = new Path(uri);
            XLog.getLog(getClass()).debug("jobConf.libPath = " + jobConf.get(OozieClient.LIBPATH));
            XLog.getLog(getClass()).debug("jobConf.appPath = " + appPath);

            Collection<String> filePaths;
            if (isWorkflowJob) {
                // app path could be a directory
                Path path = new Path(uri.getPath());
                if (!fs.isFile(path)) {
                    filePaths = getLibFiles(fs, new Path(appPath + "/lib"));
                }
                else {
                    filePaths = getLibFiles(fs, new Path(appPath.getParent(), "lib"));
                }
            }
            else {
                filePaths = new LinkedHashSet<String>();
            }

            String[] libPaths = jobConf.getStrings(OozieClient.LIBPATH);
            if (libPaths != null && libPaths.length > 0) {
                for (int i = 0; i < libPaths.length; i++) {
                    if (libPaths[i].trim().length() > 0) {
                        Path libPath = new Path(libPaths[i].trim());
                        Collection<String> libFilePaths = getLibFiles(fs, libPath);
                        filePaths.addAll(libFilePaths);
                    }
                }
            }

            conf.setStrings(APP_LIB_PATH_LIST, filePaths.toArray(new String[filePaths.size()]));

            // Add all properties whose names start with 'oozie.'
            for (Map.Entry<String, String> entry : jobConf) {
                if (entry.getKey().startsWith("oozie.")) {
                    String name = entry.getKey();
                    String value = entry.getValue();
                    // do not overwrite the property if it is already set
                    if (conf.get(name) == null) {
                        conf.set(name, value);
                    }
                }
            }
            XConfiguration retConf = new XConfiguration();
            XConfiguration.copy(conf, retConf);
            return retConf;
        }
        catch (IOException ex) {
            throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
        }
        catch (URISyntaxException ex) {
            throw new WorkflowException(ErrorCode.E0711, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
        }
        catch (HadoopAccessorException ex) {
            throw new WorkflowException(ex);
        }
        catch (Exception ex) {
            throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH),
                                        ex.getMessage(), ex);
        }
    }
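
    /*
     * Illustrative sketch (not part of the original source): how a caller might request
     * the proto configuration for a submitted workflow job. The property values below
     * are hypothetical.
     *
     *     Configuration jobConf = new XConfiguration();
     *     jobConf.set(OozieClient.APP_PATH, "hdfs://namenode:8020/user/joe/app");
     *     jobConf.set(OozieClient.USER_NAME, "joe");
     *
     *     WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
     *     // the returned conf carries the user name, the 'oozie.*' properties and the
     *     // resolved APP_LIB_PATH_LIST entries
     *     XConfiguration protoConf = wps.createProtoActionConf(jobConf, authToken, true);
     */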

    /**
     * Parse workflow definition.
     *
     * @param jobConf job configuration.
     * @param authToken authentication token.
     * @return workflow application.
     * @throws WorkflowException thrown if the workflow application could not be parsed.
     */
    public abstract WorkflowApp parseDef(Configuration jobConf, String authToken) throws WorkflowException;

    /**
     * Parse the given workflow definition XML.
     *
     * @param wfXml workflow definition XML.
     * @param jobConf job configuration.
     * @return workflow application.
     * @throws WorkflowException thrown if the workflow application could not be parsed.
     */
    public abstract WorkflowApp parseDef(String wfXml, Configuration jobConf) throws WorkflowException;

    /**
     * Get all library paths.
     *
     * @param fs file system object.
     * @param libPath HDFS library path.
     * @return list of paths.
     * @throws IOException thrown if the lib paths could not be obtained.
     */
    private Collection<String> getLibFiles(FileSystem fs, Path libPath) throws IOException {
        Set<String> libPaths = new LinkedHashSet<String>();
        if (fs.exists(libPath)) {
            FileStatus[] files = fs.listStatus(libPath, new NoPathFilter());

            for (FileStatus file : files) {
                libPaths.add(file.getPath().toUri().toString());
            }
        }
        else {
            XLog.getLog(getClass()).warn("libpath [{0}] does not exist", libPath);
        }
        return libPaths;
    }
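
    /*
     * Illustrative sketch (not part of the original source): for a hypothetical
     * application lib directory, the returned collection holds the fully qualified
     * URIs of its entries in listing order, for example:
     *
     *     hdfs://namenode:8020/user/joe/app/lib/udfs.jar
     *     hdfs://namenode:8020/user/joe/app/lib/native.so
     */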

    /*
     * Filter class that does no filtering.
     * We should not need this class, but fs.listStatus() does not seem to work properly
     * without a filter, so this dummy no-op filter is provided.
     */
    private class NoPathFilter implements PathFilter {
        @Override
        public boolean accept(Path path) {
            return true;
        }
    }

    /**
     * Return the Oozie system libpath.
     *
     * @return the Oozie system libpath (sharelib) in HDFS if configured, otherwise <code>null</code>.
     */
    public Path getSystemLibPath() {
        return systemLibPath;
    }
}