001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.hadoop.fs.FileStatus;
022    import org.apache.hadoop.fs.FileSystem;
023    import org.apache.hadoop.fs.Path;
024    import org.apache.hadoop.fs.PathFilter;
025    import org.apache.oozie.client.OozieClient;
026    import org.apache.oozie.workflow.WorkflowApp;
027    import org.apache.oozie.workflow.WorkflowException;
028    import org.apache.oozie.util.IOUtils;
029    import org.apache.oozie.util.XConfiguration;
030    import org.apache.oozie.util.XLog;
031    import org.apache.oozie.ErrorCode;
032    
033    import java.io.IOException;
034    import java.io.InputStreamReader;
035    import java.io.Reader;
036    import java.io.StringWriter;
037    import java.net.URI;
038    import java.net.URISyntaxException;
039    import java.util.ArrayList;
040    import java.util.List;
041    import java.util.Map;
042    
043    /**
044     * Service that provides application workflow definition reading from the path and creation of the proto configuration.
045     */
046    public abstract class WorkflowAppService implements Service {
047    
048        public static final String CONF_PREFIX = Service.CONF_PREFIX + "WorkflowAppService.";
049    
050        public static final String SYSTEM_LIB_PATH = CONF_PREFIX + "system.libpath";
051    
052        public static final String APP_LIB_PATH_LIST = "oozie.wf.application.lib";
053    
054        public static final String HADOOP_UGI = "hadoop.job.ugi";
055    
056        public static final String HADOOP_USER = "user.name";
057    
058        public static final String HADOOP_JT_KERBEROS_NAME = "mapreduce.jobtracker.kerberos.principal";
059    
060        public static final String HADOOP_NN_KERBEROS_NAME = "dfs.namenode.kerberos.principal";
061    
062        private Path systemLibPath;
063    
064        /**
065         * Initialize the workflow application service.
066         *
067         * @param services services instance.
068         */
069        public void init(Services services) {
070            String path = services.getConf().get(SYSTEM_LIB_PATH, " ");
071            if (path.trim().length() > 0) {
072                systemLibPath = new Path(path.trim());
073            }
074        }
075    
076        /**
077         * Destroy the workflow application service.
078         */
079        public void destroy() {
080        }
081    
082        /**
083         * Return the public interface for workflow application service.
084         *
085         * @return {@link WorkflowAppService}.
086         */
087        public Class<? extends Service> getInterface() {
088            return WorkflowAppService.class;
089        }
090    
091        /**
092         * Read workflow definition.
093         *
094         * @param appPath application path.
095         * @param user user name.
096         * @param group group name.
097         * @param autToken authentication token.
098         * @return workflow definition.
099         * @throws WorkflowException thrown if the definition could not be read.
100         */
101        protected String readDefinition(String appPath, String user, String group, String autToken)
102                throws WorkflowException {
103            try {
104                URI uri = new URI(appPath);
105                FileSystem fs = Services.get().get(HadoopAccessorService.class).
106                        createFileSystem(user, group, uri, new Configuration());
107    
108                // app path could be a directory
109                Path path = new Path(uri.getPath());
110                if (!fs.isFile(path)) {
111                    path = new Path(path, "workflow.xml");
112                }
113    
114                Reader reader = new InputStreamReader(fs.open(path));
115                StringWriter writer = new StringWriter();
116                IOUtils.copyCharStream(reader, writer);
117                return writer.toString();
118    
119            }
120            catch (IOException ex) {
121                throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
122            }
123            catch (URISyntaxException ex) {
124                throw new WorkflowException(ErrorCode.E0711, appPath, ex.getMessage(), ex);
125            }
126            catch (HadoopAccessorException ex) {
127                throw new WorkflowException(ex);
128            }
129            catch (Exception ex) {
130                throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
131            }
132        }
133    
134        /**
135         * Create proto configuration. <p/> The proto configuration includes the user,group and the paths which need to be
136         * added to distributed cache. These paths include .jar,.so and the resource file paths.
137         *
138         * @param jobConf job configuration.
139         * @param authToken authentication token.
140         * @param isWorkflowJob indicates if the job is a workflow job or not.
141         * @return proto configuration.
142         * @throws WorkflowException thrown if the proto action configuration could not be created.
143         */
144        public XConfiguration createProtoActionConf(Configuration jobConf, String authToken, boolean isWorkflowJob)
145                throws WorkflowException {
146            XConfiguration conf = new XConfiguration();
147            try {
148                String user = jobConf.get(OozieClient.USER_NAME);
149                String group = jobConf.get(OozieClient.GROUP_NAME);
150                String hadoopUgi = user + "," + group;
151    
152                conf.set(OozieClient.USER_NAME, user);
153                conf.set(OozieClient.GROUP_NAME, group);
154                conf.set(HADOOP_UGI, hadoopUgi);
155    
156                conf.set(HADOOP_JT_KERBEROS_NAME, jobConf.get(HADOOP_JT_KERBEROS_NAME));
157                conf.set(HADOOP_NN_KERBEROS_NAME, jobConf.get(HADOOP_NN_KERBEROS_NAME));
158    
159                URI uri = new URI(jobConf.get(OozieClient.APP_PATH));
160    
161                FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, uri, conf);
162    
163                Path appPath = new Path(uri.getPath());
164                XLog.getLog(getClass()).debug("jobConf.libPath = " + jobConf.get(OozieClient.LIBPATH));
165                XLog.getLog(getClass()).debug("jobConf.appPath = " + appPath);
166    
167                List<String> filePaths;
168                if (isWorkflowJob) {
169                    // app path could be a directory
170                    Path path = new Path(uri.getPath());
171                    if (!fs.isFile(path)) {
172                        filePaths = getLibFiles(fs, new Path(appPath + "/lib"));
173                    } else {
174                        filePaths = getLibFiles(fs, new Path(appPath.getParent(), "lib"));
175                    }
176                }
177                else {
178                    filePaths = new ArrayList<String>();
179                }
180    
181                String[] libPaths = jobConf.getStrings(OozieClient.LIBPATH);
182                if (libPaths != null && libPaths.length > 0) {
183                    for (int i = 0; i < libPaths.length; i++) {
184                        if (libPaths[i].trim().length() > 0) {
185                            Path libPath = new Path(libPaths[i].trim());
186                            List<String> libFilePaths = getLibFiles(fs, libPath);
187                            filePaths.addAll(libFilePaths);
188                        }
189                    }
190                }
191    
192                if (systemLibPath != null && jobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) {
193                    List<String> libFilePaths = getLibFiles(fs, systemLibPath);
194                    filePaths.addAll(libFilePaths);
195                }
196    
197                conf.setStrings(APP_LIB_PATH_LIST, filePaths.toArray(new String[filePaths.size()]));
198    
199                //Add all properties start with 'oozie.'
200                for (Map.Entry<String, String> entry : jobConf) {
201                    if (entry.getKey().startsWith("oozie.")) {
202                        String name = entry.getKey();
203                        String value = entry.getValue();
204                        // Append application lib jars of both parent and child in
205                        // subworkflow to APP_LIB_PATH_LIST
206                        if ((conf.get(name) != null) && name.equals(APP_LIB_PATH_LIST)) {
207                            value = value + "," + conf.get(name);
208                        }
209                        conf.set(name, value);
210                    }
211                }
212                return conf;
213            }
214            catch (IOException ex) {
215                throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
216            }
217            catch (URISyntaxException ex) {
218                throw new WorkflowException(ErrorCode.E0711, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
219            }
220            catch (HadoopAccessorException ex) {
221                throw new WorkflowException(ex);
222            }
223            catch (Exception ex) {
224                throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH),
225                                            ex.getMessage(), ex);
226            }
227        }
228    
229        /**
230         * Parse workflow definition.
231         *
232         * @param jobConf job configuration.
233         * @param authToken authentication token.
234         * @return workflow application.
235         * @throws WorkflowException thrown if the workflow application could not be parsed.
236         */
237        public abstract WorkflowApp parseDef(Configuration jobConf, String authToken) throws WorkflowException;
238    
239        /**
240         * Parse workflow definition.
241         * @param wfXml workflow.
242         * @return workflow application.
243         * @throws WorkflowException thrown if the workflow application could not be parsed.
244         */
245        public abstract WorkflowApp parseDef(String wfXml) throws WorkflowException;
246    
247        /**
248         * Get all library paths.
249         *
250         * @param fs file system object.
251         * @param libPath hdfs library path.
252         * @return list of paths.
253         * @throws IOException thrown if the lib paths could not be obtained.
254         */
255        private List<String> getLibFiles(FileSystem fs, Path libPath) throws IOException {
256            List<String> libPaths = new ArrayList<String>();
257            if (fs.exists(libPath)) {
258                FileStatus[] files = fs.listStatus(libPath, new NoPathFilter());
259    
260                for (FileStatus file : files) {
261                    libPaths.add(file.getPath().toUri().getPath().trim());
262                }
263            }
264            else {
265                XLog.getLog(getClass()).warn("libpath [{0}] does not exists", libPath);
266            }
267            return libPaths;
268        }
269    
270        /*
271         * Filter class doing no filtering.
272         * We dont need define this class, but seems fs.listStatus() is not working properly without this.
273         * So providing this dummy no filtering Filter class.
274         */
275        private class NoPathFilter implements PathFilter {
276            @Override
277            public boolean accept(Path path) {
278                return true;
279            }
280        }
281    }