This project has been retired. For details, please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.hadoop.fs.FileStatus;
022 import org.apache.hadoop.fs.FileSystem;
023 import org.apache.hadoop.fs.Path;
024 import org.apache.hadoop.fs.PathFilter;
025 import org.apache.hadoop.mapred.JobConf;
026 import org.apache.oozie.client.OozieClient;
027 import org.apache.oozie.workflow.WorkflowApp;
028 import org.apache.oozie.workflow.WorkflowException;
029 import org.apache.oozie.util.IOUtils;
030 import org.apache.oozie.util.XConfiguration;
031 import org.apache.oozie.util.XLog;
032 import org.apache.oozie.ErrorCode;
033
034 import java.io.IOException;
035 import java.io.InputStreamReader;
036 import java.io.Reader;
037 import java.io.StringWriter;
038 import java.net.URI;
039 import java.net.URISyntaxException;
040 import java.util.ArrayList;
041 import java.util.List;
042 import java.util.Map;
043
044 /**
045 * Service that provides application workflow definition reading from the path and creation of the proto configuration.
046 */
047 public abstract class WorkflowAppService implements Service {
048
049 public static final String CONF_PREFIX = Service.CONF_PREFIX + "WorkflowAppService.";
050
051 public static final String SYSTEM_LIB_PATH = CONF_PREFIX + "system.libpath";
052
053 public static final String APP_LIB_PATH_LIST = "oozie.wf.application.lib";
054
055 public static final String HADOOP_USER = "user.name";
056
057 public static final String CONFG_MAX_WF_LENGTH = CONF_PREFIX + "WorkflowDefinitionMaxLength";
058
059 private Path systemLibPath;
060 private long maxWFLength;
061
062 /**
063 * Initialize the workflow application service.
064 *
065 * @param services services instance.
066 */
067 public void init(Services services) {
068 Configuration conf = services.getConf();
069
070 String path = conf.get(SYSTEM_LIB_PATH, " ");
071 if (path.trim().length() > 0) {
072 systemLibPath = new Path(path.trim());
073 }
074
075 maxWFLength = conf.getInt(CONFG_MAX_WF_LENGTH, 100000);
076 }
077
078 /**
079 * Destroy the workflow application service.
080 */
081 public void destroy() {
082 }
083
084 /**
085 * Return the public interface for workflow application service.
086 *
087 * @return {@link WorkflowAppService}.
088 */
089 public Class<? extends Service> getInterface() {
090 return WorkflowAppService.class;
091 }
092
093 /**
094 * Read workflow definition.
095 *
096 *
097 * @param appPath application path.
098 * @param user user name.
099 * @param autToken authentication token.
100 * @return workflow definition.
101 * @throws WorkflowException thrown if the definition could not be read.
102 */
103 protected String readDefinition(String appPath, String user, String autToken, Configuration conf)
104 throws WorkflowException {
105 try {
106 URI uri = new URI(appPath);
107 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
108 JobConf jobConf = has.createJobConf(uri.getAuthority());
109 FileSystem fs = has.createFileSystem(user, uri, jobConf);
110
111 // app path could be a directory
112 Path path = new Path(uri.getPath());
113 if (!fs.isFile(path)) {
114 path = new Path(path, "workflow.xml");
115 }
116
117 FileStatus fsStatus = fs.getFileStatus(path);
118 if (fsStatus.getLen() > this.maxWFLength) {
119 throw new WorkflowException(ErrorCode.E0736, fsStatus.getLen(), this.maxWFLength);
120 }
121
122 Reader reader = new InputStreamReader(fs.open(path));
123 StringWriter writer = new StringWriter();
124 IOUtils.copyCharStream(reader, writer);
125 return writer.toString();
126
127 }
128 catch (WorkflowException wfe) {
129 throw wfe;
130 }
131 catch (IOException ex) {
132 throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
133 }
134 catch (URISyntaxException ex) {
135 throw new WorkflowException(ErrorCode.E0711, appPath, ex.getMessage(), ex);
136 }
137 catch (HadoopAccessorException ex) {
138 throw new WorkflowException(ex);
139 }
140 catch (Exception ex) {
141 throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
142 }
143 }
144
145 /**
146 * Create proto configuration. <p/> The proto configuration includes the user,group and the paths which need to be
147 * added to distributed cache. These paths include .jar,.so and the resource file paths.
148 *
149 * @param jobConf job configuration.
150 * @param authToken authentication token.
151 * @param isWorkflowJob indicates if the job is a workflow job or not.
152 * @return proto configuration.
153 * @throws WorkflowException thrown if the proto action configuration could not be created.
154 */
155 public XConfiguration createProtoActionConf(Configuration jobConf, String authToken, boolean isWorkflowJob)
156 throws WorkflowException {
157 try {
158 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
159 URI uri = new URI(jobConf.get(OozieClient.APP_PATH));
160
161 Configuration conf = has.createJobConf(uri.getAuthority());
162
163 String user = jobConf.get(OozieClient.USER_NAME);
164 conf.set(OozieClient.USER_NAME, user);
165
166 FileSystem fs = has.createFileSystem(user, uri, conf);
167
168 Path appPath = new Path(uri.getPath());
169 XLog.getLog(getClass()).debug("jobConf.libPath = " + jobConf.get(OozieClient.LIBPATH));
170 XLog.getLog(getClass()).debug("jobConf.appPath = " + appPath);
171
172 List<String> filePaths;
173 if (isWorkflowJob) {
174 // app path could be a directory
175 Path path = new Path(uri.getPath());
176 if (!fs.isFile(path)) {
177 filePaths = getLibFiles(fs, new Path(appPath + "/lib"));
178 } else {
179 filePaths = getLibFiles(fs, new Path(appPath.getParent(), "lib"));
180 }
181 }
182 else {
183 filePaths = new ArrayList<String>();
184 }
185
186 String[] libPaths = jobConf.getStrings(OozieClient.LIBPATH);
187 if (libPaths != null && libPaths.length > 0) {
188 for (int i = 0; i < libPaths.length; i++) {
189 if (libPaths[i].trim().length() > 0) {
190 Path libPath = new Path(libPaths[i].trim());
191 List<String> libFilePaths = getLibFiles(fs, libPath);
192 filePaths.addAll(libFilePaths);
193 }
194 }
195 }
196
197 conf.setStrings(APP_LIB_PATH_LIST, filePaths.toArray(new String[filePaths.size()]));
198
199 //Add all properties start with 'oozie.'
200 for (Map.Entry<String, String> entry : jobConf) {
201 if (entry.getKey().startsWith("oozie.")) {
202 String name = entry.getKey();
203 String value = entry.getValue();
204 // Append application lib jars of both parent and child in
205 // subworkflow to APP_LIB_PATH_LIST
206 if ((conf.get(name) != null) && name.equals(APP_LIB_PATH_LIST)) {
207 value = value + "," + conf.get(name);
208 }
209 conf.set(name, value);
210 }
211 }
212 XConfiguration retConf = new XConfiguration();
213 XConfiguration.copy(conf, retConf);
214 return retConf;
215 }
216 catch (IOException ex) {
217 throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
218 }
219 catch (URISyntaxException ex) {
220 throw new WorkflowException(ErrorCode.E0711, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
221 }
222 catch (HadoopAccessorException ex) {
223 throw new WorkflowException(ex);
224 }
225 catch (Exception ex) {
226 throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH),
227 ex.getMessage(), ex);
228 }
229 }
230
231 /**
232 * Parse workflow definition.
233 *
234 * @param jobConf job configuration.
235 * @param authToken authentication token.
236 * @return workflow application.
237 * @throws WorkflowException thrown if the workflow application could not be parsed.
238 */
239 public abstract WorkflowApp parseDef(Configuration jobConf, String authToken) throws WorkflowException;
240
241 /**
242 * Parse workflow definition.
243 * @param wfXml workflow.
244 * @return workflow application.
245 * @throws WorkflowException thrown if the workflow application could not be parsed.
246 */
247 public abstract WorkflowApp parseDef(String wfXml) throws WorkflowException;
248
249 /**
250 * Get all library paths.
251 *
252 * @param fs file system object.
253 * @param libPath hdfs library path.
254 * @return list of paths.
255 * @throws IOException thrown if the lib paths could not be obtained.
256 */
257 private List<String> getLibFiles(FileSystem fs, Path libPath) throws IOException {
258 List<String> libPaths = new ArrayList<String>();
259 if (fs.exists(libPath)) {
260 FileStatus[] files = fs.listStatus(libPath, new NoPathFilter());
261
262 for (FileStatus file : files) {
263 libPaths.add(file.getPath().toUri().getPath().trim());
264 }
265 }
266 else {
267 XLog.getLog(getClass()).warn("libpath [{0}] does not exists", libPath);
268 }
269 return libPaths;
270 }
271
272 /*
273 * Filter class doing no filtering.
274 * We dont need define this class, but seems fs.listStatus() is not working properly without this.
275 * So providing this dummy no filtering Filter class.
276 */
277 private class NoPathFilter implements PathFilter {
278 @Override
279 public boolean accept(Path path) {
280 return true;
281 }
282 }
283
284 /**
285 * Returns Oozie system libpath.
286 *
287 * @return Oozie system libpath (sharelib) in HDFS if present, otherwise it returns <code>NULL</code>.
288 */
289 public Path getSystemLibPath() {
290 return systemLibPath;
291 }
292 }