001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.hadoop.fs.FileStatus;
022    import org.apache.hadoop.fs.FileSystem;
023    import org.apache.hadoop.fs.Path;
024    import org.apache.hadoop.fs.PathFilter;
025    import org.apache.hadoop.mapred.JobConf;
026    import org.apache.oozie.client.OozieClient;
027    import org.apache.oozie.workflow.WorkflowApp;
028    import org.apache.oozie.workflow.WorkflowException;
029    import org.apache.oozie.util.IOUtils;
030    import org.apache.oozie.util.XConfiguration;
031    import org.apache.oozie.util.XLog;
032    import org.apache.oozie.ErrorCode;
033    
034    import java.io.IOException;
035    import java.io.InputStreamReader;
036    import java.io.Reader;
037    import java.io.StringWriter;
038    import java.net.URI;
039    import java.net.URISyntaxException;
040    import java.util.ArrayList;
041    import java.util.List;
042    import java.util.Map;
043    
044    /**
045     * Service that provides application workflow definition reading from the path and creation of the proto configuration.
046     */
047    public abstract class WorkflowAppService implements Service {
048    
049        public static final String CONF_PREFIX = Service.CONF_PREFIX + "WorkflowAppService.";
050    
051        public static final String SYSTEM_LIB_PATH = CONF_PREFIX + "system.libpath";
052    
053        public static final String APP_LIB_PATH_LIST = "oozie.wf.application.lib";
054    
055        public static final String HADOOP_USER = "user.name";
056    
057        public static final String CONFG_MAX_WF_LENGTH = CONF_PREFIX + "WorkflowDefinitionMaxLength";
058    
059        private Path systemLibPath;
060        private long maxWFLength;
061    
062        /**
063         * Initialize the workflow application service.
064         *
065         * @param services services instance.
066         */
067        public void init(Services services) {
068            Configuration conf = services.getConf();
069    
070            String path = conf.get(SYSTEM_LIB_PATH, " ");
071            if (path.trim().length() > 0) {
072                systemLibPath = new Path(path.trim());
073            }
074    
075            maxWFLength = conf.getInt(CONFG_MAX_WF_LENGTH, 100000);
076        }
077    
078        /**
079         * Destroy the workflow application service.
080         */
081        public void destroy() {
082        }
083    
084        /**
085         * Return the public interface for workflow application service.
086         *
087         * @return {@link WorkflowAppService}.
088         */
089        public Class<? extends Service> getInterface() {
090            return WorkflowAppService.class;
091        }
092    
093        /**
094         * Read workflow definition.
095         *
096         *
097         * @param appPath application path.
098         * @param user user name.
099         * @param autToken authentication token.
100         * @return workflow definition.
101         * @throws WorkflowException thrown if the definition could not be read.
102         */
103        protected String readDefinition(String appPath, String user, String autToken, Configuration conf)
104                throws WorkflowException {
105            try {
106                URI uri = new URI(appPath);
107                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
108                JobConf jobConf = has.createJobConf(uri.getAuthority());
109                FileSystem fs = has.createFileSystem(user, uri, jobConf);
110    
111                // app path could be a directory
112                Path path = new Path(uri.getPath());
113                if (!fs.isFile(path)) {
114                    path = new Path(path, "workflow.xml");
115                }
116    
117                FileStatus fsStatus = fs.getFileStatus(path);
118                if (fsStatus.getLen() > this.maxWFLength) {
119                    throw new WorkflowException(ErrorCode.E0736, fsStatus.getLen(), this.maxWFLength);
120                }
121    
122                Reader reader = new InputStreamReader(fs.open(path));
123                StringWriter writer = new StringWriter();
124                IOUtils.copyCharStream(reader, writer);
125                return writer.toString();
126    
127            }
128            catch (WorkflowException wfe) {
129                throw wfe;
130            }
131            catch (IOException ex) {
132                throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
133            }
134            catch (URISyntaxException ex) {
135                throw new WorkflowException(ErrorCode.E0711, appPath, ex.getMessage(), ex);
136            }
137            catch (HadoopAccessorException ex) {
138                throw new WorkflowException(ex);
139            }
140            catch (Exception ex) {
141                throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
142            }
143        }
144    
145        /**
146         * Create proto configuration. <p/> The proto configuration includes the user,group and the paths which need to be
147         * added to distributed cache. These paths include .jar,.so and the resource file paths.
148         *
149         * @param jobConf job configuration.
150         * @param authToken authentication token.
151         * @param isWorkflowJob indicates if the job is a workflow job or not.
152         * @return proto configuration.
153         * @throws WorkflowException thrown if the proto action configuration could not be created.
154         */
155        public XConfiguration createProtoActionConf(Configuration jobConf, String authToken, boolean isWorkflowJob)
156                throws WorkflowException {
157            try {
158                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
159                URI uri = new URI(jobConf.get(OozieClient.APP_PATH));
160    
161                Configuration conf = has.createJobConf(uri.getAuthority());
162    
163                String user = jobConf.get(OozieClient.USER_NAME);
164                conf.set(OozieClient.USER_NAME, user);
165    
166                FileSystem fs = has.createFileSystem(user, uri, conf);
167    
168                Path appPath = new Path(uri.getPath());
169                XLog.getLog(getClass()).debug("jobConf.libPath = " + jobConf.get(OozieClient.LIBPATH));
170                XLog.getLog(getClass()).debug("jobConf.appPath = " + appPath);
171    
172                List<String> filePaths;
173                if (isWorkflowJob) {
174                    // app path could be a directory
175                    Path path = new Path(uri.getPath());
176                    if (!fs.isFile(path)) {
177                        filePaths = getLibFiles(fs, new Path(appPath + "/lib"));
178                    } else {
179                        filePaths = getLibFiles(fs, new Path(appPath.getParent(), "lib"));
180                    }
181                }
182                else {
183                    filePaths = new ArrayList<String>();
184                }
185    
186                String[] libPaths = jobConf.getStrings(OozieClient.LIBPATH);
187                if (libPaths != null && libPaths.length > 0) {
188                    for (int i = 0; i < libPaths.length; i++) {
189                        if (libPaths[i].trim().length() > 0) {
190                            Path libPath = new Path(libPaths[i].trim());
191                            List<String> libFilePaths = getLibFiles(fs, libPath);
192                            filePaths.addAll(libFilePaths);
193                        }
194                    }
195                }
196    
197                conf.setStrings(APP_LIB_PATH_LIST, filePaths.toArray(new String[filePaths.size()]));
198    
199                //Add all properties start with 'oozie.'
200                for (Map.Entry<String, String> entry : jobConf) {
201                    if (entry.getKey().startsWith("oozie.")) {
202                        String name = entry.getKey();
203                        String value = entry.getValue();
204                        // Append application lib jars of both parent and child in
205                        // subworkflow to APP_LIB_PATH_LIST
206                        if ((conf.get(name) != null) && name.equals(APP_LIB_PATH_LIST)) {
207                            value = value + "," + conf.get(name);
208                        }
209                        conf.set(name, value);
210                    }
211                }
212                XConfiguration retConf = new XConfiguration();
213                XConfiguration.copy(conf, retConf);
214                return retConf;
215            }
216            catch (IOException ex) {
217                throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
218            }
219            catch (URISyntaxException ex) {
220                throw new WorkflowException(ErrorCode.E0711, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
221            }
222            catch (HadoopAccessorException ex) {
223                throw new WorkflowException(ex);
224            }
225            catch (Exception ex) {
226                throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH),
227                                            ex.getMessage(), ex);
228            }
229        }
230    
231        /**
232         * Parse workflow definition.
233         *
234         * @param jobConf job configuration.
235         * @param authToken authentication token.
236         * @return workflow application.
237         * @throws WorkflowException thrown if the workflow application could not be parsed.
238         */
239        public abstract WorkflowApp parseDef(Configuration jobConf, String authToken) throws WorkflowException;
240    
241        /**
242         * Parse workflow definition.
243         * @param wfXml workflow.
244         * @return workflow application.
245         * @throws WorkflowException thrown if the workflow application could not be parsed.
246         */
247        public abstract WorkflowApp parseDef(String wfXml) throws WorkflowException;
248    
249        /**
250         * Get all library paths.
251         *
252         * @param fs file system object.
253         * @param libPath hdfs library path.
254         * @return list of paths.
255         * @throws IOException thrown if the lib paths could not be obtained.
256         */
257        private List<String> getLibFiles(FileSystem fs, Path libPath) throws IOException {
258            List<String> libPaths = new ArrayList<String>();
259            if (fs.exists(libPath)) {
260                FileStatus[] files = fs.listStatus(libPath, new NoPathFilter());
261    
262                for (FileStatus file : files) {
263                    libPaths.add(file.getPath().toUri().getPath().trim());
264                }
265            }
266            else {
267                XLog.getLog(getClass()).warn("libpath [{0}] does not exists", libPath);
268            }
269            return libPaths;
270        }
271    
272        /*
273         * Filter class doing no filtering.
274         * We dont need define this class, but seems fs.listStatus() is not working properly without this.
275         * So providing this dummy no filtering Filter class.
276         */
277        private class NoPathFilter implements PathFilter {
278            @Override
279            public boolean accept(Path path) {
280                return true;
281            }
282        }
283    
284        /**
285         * Returns Oozie system libpath.
286         *
287         * @return Oozie system libpath (sharelib) in HDFS if present, otherwise it returns <code>NULL</code>.
288         */
289        public Path getSystemLibPath() {
290            return systemLibPath;
291        }
292    }