This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.hadoop.fs.FileStatus;
022 import org.apache.hadoop.fs.FileSystem;
023 import org.apache.hadoop.fs.Path;
024 import org.apache.hadoop.fs.PathFilter;
025 import org.apache.oozie.client.OozieClient;
026 import org.apache.oozie.workflow.WorkflowApp;
027 import org.apache.oozie.workflow.WorkflowException;
028 import org.apache.oozie.util.IOUtils;
029 import org.apache.oozie.util.XConfiguration;
030 import org.apache.oozie.util.XLog;
031 import org.apache.oozie.ErrorCode;
032
033 import java.io.IOException;
034 import java.io.InputStreamReader;
035 import java.io.Reader;
036 import java.io.StringWriter;
037 import java.net.URI;
038 import java.net.URISyntaxException;
039 import java.util.ArrayList;
040 import java.util.List;
041 import java.util.Map;
042
043 /**
044 * Service that provides application workflow definition reading from the path and creation of the proto configuration.
045 */
046 public abstract class WorkflowAppService implements Service {
047
048 public static final String CONF_PREFIX = Service.CONF_PREFIX + "WorkflowAppService.";
049
050 public static final String SYSTEM_LIB_PATH = CONF_PREFIX + "system.libpath";
051
052 public static final String APP_LIB_PATH_LIST = "oozie.wf.application.lib";
053
054 public static final String HADOOP_UGI = "hadoop.job.ugi";
055
056 public static final String HADOOP_USER = "user.name";
057
058 public static final String HADOOP_JT_KERBEROS_NAME = "mapreduce.jobtracker.kerberos.principal";
059
060 public static final String HADOOP_NN_KERBEROS_NAME = "dfs.namenode.kerberos.principal";
061
062 private Path systemLibPath;
063
064 /**
065 * Initialize the workflow application service.
066 *
067 * @param services services instance.
068 */
069 public void init(Services services) {
070 String path = services.getConf().get(SYSTEM_LIB_PATH, " ");
071 if (path.trim().length() > 0) {
072 systemLibPath = new Path(path.trim());
073 }
074 }
075
076 /**
077 * Destroy the workflow application service.
078 */
079 public void destroy() {
080 }
081
082 /**
083 * Return the public interface for workflow application service.
084 *
085 * @return {@link WorkflowAppService}.
086 */
087 public Class<? extends Service> getInterface() {
088 return WorkflowAppService.class;
089 }
090
091 /**
092 * Read workflow definition.
093 *
094 * @param appPath application path.
095 * @param user user name.
096 * @param group group name.
097 * @param autToken authentication token.
098 * @return workflow definition.
099 * @throws WorkflowException thrown if the definition could not be read.
100 */
101 protected String readDefinition(String appPath, String user, String group, String autToken)
102 throws WorkflowException {
103 try {
104 URI uri = new URI(appPath);
105 FileSystem fs = Services.get().get(HadoopAccessorService.class).
106 createFileSystem(user, group, uri, new Configuration());
107
108 // app path could be a directory
109 Path path = new Path(uri.getPath());
110 if (!fs.isFile(path)) {
111 path = new Path(path, "workflow.xml");
112 }
113
114 Reader reader = new InputStreamReader(fs.open(path));
115 StringWriter writer = new StringWriter();
116 IOUtils.copyCharStream(reader, writer);
117 return writer.toString();
118
119 }
120 catch (IOException ex) {
121 throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
122 }
123 catch (URISyntaxException ex) {
124 throw new WorkflowException(ErrorCode.E0711, appPath, ex.getMessage(), ex);
125 }
126 catch (HadoopAccessorException ex) {
127 throw new WorkflowException(ex);
128 }
129 catch (Exception ex) {
130 throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
131 }
132 }
133
134 /**
135 * Create proto configuration. <p/> The proto configuration includes the user,group and the paths which need to be
136 * added to distributed cache. These paths include .jar,.so and the resource file paths.
137 *
138 * @param jobConf job configuration.
139 * @param authToken authentication token.
140 * @param isWorkflowJob indicates if the job is a workflow job or not.
141 * @return proto configuration.
142 * @throws WorkflowException thrown if the proto action configuration could not be created.
143 */
144 public XConfiguration createProtoActionConf(Configuration jobConf, String authToken, boolean isWorkflowJob)
145 throws WorkflowException {
146 XConfiguration conf = new XConfiguration();
147 try {
148 String user = jobConf.get(OozieClient.USER_NAME);
149 String group = jobConf.get(OozieClient.GROUP_NAME);
150 String hadoopUgi = user + "," + group;
151
152 conf.set(OozieClient.USER_NAME, user);
153 conf.set(OozieClient.GROUP_NAME, group);
154 conf.set(HADOOP_UGI, hadoopUgi);
155
156 conf.set(HADOOP_JT_KERBEROS_NAME, jobConf.get(HADOOP_JT_KERBEROS_NAME));
157 conf.set(HADOOP_NN_KERBEROS_NAME, jobConf.get(HADOOP_NN_KERBEROS_NAME));
158
159 URI uri = new URI(jobConf.get(OozieClient.APP_PATH));
160
161 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, uri, conf);
162
163 Path appPath = new Path(uri.getPath());
164 XLog.getLog(getClass()).debug("jobConf.libPath = " + jobConf.get(OozieClient.LIBPATH));
165 XLog.getLog(getClass()).debug("jobConf.appPath = " + appPath);
166
167 List<String> filePaths;
168 if (isWorkflowJob) {
169 // app path could be a directory
170 Path path = new Path(uri.getPath());
171 if (!fs.isFile(path)) {
172 filePaths = getLibFiles(fs, new Path(appPath + "/lib"));
173 } else {
174 filePaths = getLibFiles(fs, new Path(appPath.getParent(), "lib"));
175 }
176 }
177 else {
178 filePaths = new ArrayList<String>();
179 }
180
181 String[] libPaths = jobConf.getStrings(OozieClient.LIBPATH);
182 if (libPaths != null && libPaths.length > 0) {
183 for (int i = 0; i < libPaths.length; i++) {
184 if (libPaths[i].trim().length() > 0) {
185 Path libPath = new Path(libPaths[i].trim());
186 List<String> libFilePaths = getLibFiles(fs, libPath);
187 filePaths.addAll(libFilePaths);
188 }
189 }
190 }
191
192 if (systemLibPath != null && jobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) {
193 List<String> libFilePaths = getLibFiles(fs, systemLibPath);
194 filePaths.addAll(libFilePaths);
195 }
196
197 conf.setStrings(APP_LIB_PATH_LIST, filePaths.toArray(new String[filePaths.size()]));
198
199 //Add all properties start with 'oozie.'
200 for (Map.Entry<String, String> entry : jobConf) {
201 if (entry.getKey().startsWith("oozie.")) {
202 String name = entry.getKey();
203 String value = entry.getValue();
204 // Append application lib jars of both parent and child in
205 // subworkflow to APP_LIB_PATH_LIST
206 if ((conf.get(name) != null) && name.equals(APP_LIB_PATH_LIST)) {
207 value = value + "," + conf.get(name);
208 }
209 conf.set(name, value);
210 }
211 }
212 return conf;
213 }
214 catch (IOException ex) {
215 throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
216 }
217 catch (URISyntaxException ex) {
218 throw new WorkflowException(ErrorCode.E0711, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
219 }
220 catch (HadoopAccessorException ex) {
221 throw new WorkflowException(ex);
222 }
223 catch (Exception ex) {
224 throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH),
225 ex.getMessage(), ex);
226 }
227 }
228
229 /**
230 * Parse workflow definition.
231 *
232 * @param jobConf job configuration.
233 * @param authToken authentication token.
234 * @return workflow application.
235 * @throws WorkflowException thrown if the workflow application could not be parsed.
236 */
237 public abstract WorkflowApp parseDef(Configuration jobConf, String authToken) throws WorkflowException;
238
239 /**
240 * Parse workflow definition.
241 * @param wfXml workflow.
242 * @return workflow application.
243 * @throws WorkflowException thrown if the workflow application could not be parsed.
244 */
245 public abstract WorkflowApp parseDef(String wfXml) throws WorkflowException;
246
247 /**
248 * Get all library paths.
249 *
250 * @param fs file system object.
251 * @param libPath hdfs library path.
252 * @return list of paths.
253 * @throws IOException thrown if the lib paths could not be obtained.
254 */
255 private List<String> getLibFiles(FileSystem fs, Path libPath) throws IOException {
256 List<String> libPaths = new ArrayList<String>();
257 if (fs.exists(libPath)) {
258 FileStatus[] files = fs.listStatus(libPath, new NoPathFilter());
259
260 for (FileStatus file : files) {
261 libPaths.add(file.getPath().toUri().getPath().trim());
262 }
263 }
264 else {
265 XLog.getLog(getClass()).warn("libpath [{0}] does not exists", libPath);
266 }
267 return libPaths;
268 }
269
270 /*
271 * Filter class doing no filtering.
272 * We dont need define this class, but seems fs.listStatus() is not working properly without this.
273 * So providing this dummy no filtering Filter class.
274 */
275 private class NoPathFilter implements PathFilter {
276 @Override
277 public boolean accept(Path path) {
278 return true;
279 }
280 }
281 }