001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.hadoop.fs.FileStatus;
022    import org.apache.hadoop.fs.FileSystem;
023    import org.apache.hadoop.fs.Path;
024    import org.apache.hadoop.fs.PathFilter;
025    import org.apache.hadoop.mapred.JobConf;
026    import org.apache.oozie.client.OozieClient;
027    import org.apache.oozie.workflow.WorkflowApp;
028    import org.apache.oozie.workflow.WorkflowException;
029    import org.apache.oozie.util.IOUtils;
030    import org.apache.oozie.util.XConfiguration;
031    import org.apache.oozie.util.XLog;
032    import org.apache.oozie.ErrorCode;
033    
034    import java.io.IOException;
035    import java.io.InputStreamReader;
036    import java.io.Reader;
037    import java.io.StringWriter;
038    import java.net.URI;
039    import java.net.URISyntaxException;
040    import java.util.ArrayList;
041    import java.util.Arrays;
042    import java.util.Collection;
043    import java.util.LinkedHashSet;
044    import java.util.List;
045    import java.util.Map;
046    import java.util.Set;
047    
048    /**
049     * Service that provides application workflow definition reading from the path and creation of the proto configuration.
050     */
051    public abstract class WorkflowAppService implements Service {
052    
053        public static final String CONF_PREFIX = Service.CONF_PREFIX + "WorkflowAppService.";
054    
055        public static final String SYSTEM_LIB_PATH = CONF_PREFIX + "system.libpath";
056    
057        public static final String APP_LIB_PATH_LIST = "oozie.wf.application.lib";
058    
059        public static final String HADOOP_USER = "user.name";
060    
061        public static final String CONFG_MAX_WF_LENGTH = CONF_PREFIX + "WorkflowDefinitionMaxLength";
062    
063        public static final String OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE = "oozie.subworkflow.classpath.inheritance";
064    
065        public static final String OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE = "oozie.wf.subworkflow.classpath.inheritance";
066    
067        private Path systemLibPath;
068        private long maxWFLength;
069        private boolean oozieSubWfCPInheritance;
070    
071        /**
072         * Initialize the workflow application service.
073         *
074         * @param services services instance.
075         */
076        public void init(Services services) {
077            Configuration conf = services.getConf();
078    
079            String path = conf.get(SYSTEM_LIB_PATH, " ");
080            if (path.trim().length() > 0) {
081                systemLibPath = new Path(path.trim());
082            }
083    
084            maxWFLength = conf.getInt(CONFG_MAX_WF_LENGTH, 100000);
085    
086            oozieSubWfCPInheritance = conf.getBoolean(OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE, true);
087        }
088    
089        /**
090         * Destroy the workflow application service.
091         */
092        public void destroy() {
093        }
094    
095        /**
096         * Return the public interface for workflow application service.
097         *
098         * @return {@link WorkflowAppService}.
099         */
100        public Class<? extends Service> getInterface() {
101            return WorkflowAppService.class;
102        }
103    
104        /**
105         * Read workflow definition.
106         *
107         *
108         * @param appPath application path.
109         * @param user user name.
110         * @param autToken authentication token.
111         * @return workflow definition.
112         * @throws WorkflowException thrown if the definition could not be read.
113         */
114        protected String readDefinition(String appPath, String user, String autToken, Configuration conf)
115                throws WorkflowException {
116            try {
117                URI uri = new URI(appPath);
118                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
119                JobConf jobConf = has.createJobConf(uri.getAuthority());
120                FileSystem fs = has.createFileSystem(user, uri, jobConf);
121    
122                // app path could be a directory
123                Path path = new Path(uri.getPath());
124                if (!fs.isFile(path)) {
125                    path = new Path(path, "workflow.xml");
126                }
127    
128                FileStatus fsStatus = fs.getFileStatus(path);
129                if (fsStatus.getLen() > this.maxWFLength) {
130                    throw new WorkflowException(ErrorCode.E0736, fsStatus.getLen(), this.maxWFLength);
131                }
132    
133                Reader reader = new InputStreamReader(fs.open(path));
134                StringWriter writer = new StringWriter();
135                IOUtils.copyCharStream(reader, writer);
136                return writer.toString();
137    
138            }
139            catch (WorkflowException wfe) {
140                throw wfe;
141            }
142            catch (IOException ex) {
143                throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
144            }
145            catch (URISyntaxException ex) {
146                throw new WorkflowException(ErrorCode.E0711, appPath, ex.getMessage(), ex);
147            }
148            catch (HadoopAccessorException ex) {
149                throw new WorkflowException(ex);
150            }
151            catch (Exception ex) {
152                throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
153            }
154        }
155    
156        /**
157         * Create proto configuration. <p/> The proto configuration includes the user,group and the paths which need to be
158         * added to distributed cache. These paths include .jar,.so and the resource file paths.
159         *
160         * @param jobConf job configuration.
161         * @param authToken authentication token.
162         * @param isWorkflowJob indicates if the job is a workflow job or not.
163         * @return proto configuration.
164         * @throws WorkflowException thrown if the proto action configuration could not be created.
165         */
166        public XConfiguration createProtoActionConf(Configuration jobConf, String authToken, boolean isWorkflowJob)
167                throws WorkflowException {
168            try {
169                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
170                URI uri = new URI(jobConf.get(OozieClient.APP_PATH));
171    
172                Configuration conf = has.createJobConf(uri.getAuthority());
173    
174                String user = jobConf.get(OozieClient.USER_NAME);
175                conf.set(OozieClient.USER_NAME, user);
176    
177                FileSystem fs = has.createFileSystem(user, uri, conf);
178    
179                Path appPath = new Path(uri);
180                XLog.getLog(getClass()).debug("jobConf.libPath = " + jobConf.get(OozieClient.LIBPATH));
181                XLog.getLog(getClass()).debug("jobConf.appPath = " + appPath);
182    
183                Collection<String> filePaths;
184                if (isWorkflowJob) {
185                    // app path could be a directory
186                    Path path = new Path(uri.getPath());
187                    if (!fs.isFile(path)) {
188                        filePaths = getLibFiles(fs, new Path(appPath + "/lib"));
189                    } else {
190                        filePaths = getLibFiles(fs, new Path(appPath.getParent(), "lib"));
191                    }
192                }
193                else {
194                    filePaths = new LinkedHashSet<String>();
195                }
196    
197                String[] libPaths = jobConf.getStrings(OozieClient.LIBPATH);
198                if (libPaths != null && libPaths.length > 0) {
199                    for (int i = 0; i < libPaths.length; i++) {
200                        if (libPaths[i].trim().length() > 0) {
201                            Path libPath = new Path(libPaths[i].trim());
202                            Collection<String> libFilePaths = getLibFiles(fs, libPath);
203                            filePaths.addAll(libFilePaths);
204                        }
205                    }
206                }
207    
208                // Check if a subworkflow should inherit the libs from the parent WF
209                // OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE has priority over OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE from oozie-site
210                // If OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE isn't specified, we use OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE
211                if (jobConf.getBoolean(OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE, oozieSubWfCPInheritance)) {
212                    // Keep any libs from a parent workflow that might already be in APP_LIB_PATH_LIST and also remove duplicates
213                    String[] parentFilePaths = jobConf.getStrings(APP_LIB_PATH_LIST);
214                    if (parentFilePaths != null && parentFilePaths.length > 0) {
215                        String[] filePathsNames = filePaths.toArray(new String[filePaths.size()]);
216                        for (int i = 0; i < filePathsNames.length; i++) {
217                            Path p = new Path(filePathsNames[i]);
218                            filePathsNames[i] = p.getName();
219                        }
220                        Arrays.sort(filePathsNames);
221                        List<String> nonDuplicateParentFilePaths = new ArrayList<String>();
222                        for (String parentFilePath : parentFilePaths) {
223                            Path p = new Path(parentFilePath);
224                            if (Arrays.binarySearch(filePathsNames, p.getName()) < 0) {
225                                nonDuplicateParentFilePaths.add(parentFilePath);
226                            }
227                        }
228                        filePaths.addAll(nonDuplicateParentFilePaths);
229                    }
230                }
231    
232                conf.setStrings(APP_LIB_PATH_LIST, filePaths.toArray(new String[filePaths.size()]));
233    
234                //Add all properties start with 'oozie.'
235                for (Map.Entry<String, String> entry : jobConf) {
236                    if (entry.getKey().startsWith("oozie.")) {
237                        String name = entry.getKey();
238                        String value = entry.getValue();
239                        // if property already exists, should not overwrite
240                        if(conf.get(name) == null) {
241                            conf.set(name, value);
242                        }
243                    }
244                }
245                XConfiguration retConf = new XConfiguration();
246                XConfiguration.copy(conf, retConf);
247                return retConf;
248            }
249            catch (IOException ex) {
250                throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
251            }
252            catch (URISyntaxException ex) {
253                throw new WorkflowException(ErrorCode.E0711, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
254            }
255            catch (HadoopAccessorException ex) {
256                throw new WorkflowException(ex);
257            }
258            catch (Exception ex) {
259                throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH),
260                                            ex.getMessage(), ex);
261            }
262        }
263    
264        /**
265         * Parse workflow definition.
266         *
267         * @param jobConf job configuration.
268         * @param authToken authentication token.
269         * @return workflow application.
270         * @throws WorkflowException thrown if the workflow application could not be parsed.
271         */
272        public abstract WorkflowApp parseDef(Configuration jobConf, String authToken) throws WorkflowException;
273    
274        /**
275         * Parse workflow definition.
276         * @param wfXml workflow.
277         * @param jobConf job configuration
278         * @return workflow application.
279         * @throws WorkflowException thrown if the workflow application could not be parsed.
280         */
281        public abstract WorkflowApp parseDef(String wfXml, Configuration jobConf) throws WorkflowException;
282    
283        /**
284         * Get all library paths.
285         *
286         * @param fs file system object.
287         * @param libPath hdfs library path.
288         * @return list of paths.
289         * @throws IOException thrown if the lib paths could not be obtained.
290         */
291        private Collection<String> getLibFiles(FileSystem fs, Path libPath) throws IOException {
292            Set<String> libPaths = new LinkedHashSet<String>();
293            if (fs.exists(libPath)) {
294                FileStatus[] files = fs.listStatus(libPath, new NoPathFilter());
295    
296                for (FileStatus file : files) {
297                    libPaths.add(file.getPath().toUri().toString());
298                }
299            }
300            else {
301                XLog.getLog(getClass()).warn("libpath [{0}] does not exist", libPath);
302            }
303            return libPaths;
304        }
305    
306        /*
307         * Filter class doing no filtering.
308         * We dont need define this class, but seems fs.listStatus() is not working properly without this.
309         * So providing this dummy no filtering Filter class.
310         */
311        private class NoPathFilter implements PathFilter {
312            @Override
313            public boolean accept(Path path) {
314                return true;
315            }
316        }
317    
318        /**
319         * Returns Oozie system libpath.
320         *
321         * @return Oozie system libpath (sharelib) in HDFS if present, otherwise it returns <code>NULL</code>.
322         */
323        public Path getSystemLibPath() {
324            return systemLibPath;
325        }
326    }