001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.hadoop.fs.FileStatus;
022import org.apache.hadoop.fs.FileSystem;
023import org.apache.hadoop.fs.Path;
024import org.apache.hadoop.fs.PathFilter;
025import org.apache.hadoop.mapred.JobConf;
026import org.apache.oozie.client.OozieClient;
027import org.apache.oozie.workflow.WorkflowApp;
028import org.apache.oozie.workflow.WorkflowException;
029import org.apache.oozie.util.IOUtils;
030import org.apache.oozie.util.XConfiguration;
031import org.apache.oozie.util.XLog;
032import org.apache.oozie.ErrorCode;
033
034import java.io.IOException;
035import java.io.InputStreamReader;
036import java.io.Reader;
037import java.io.StringWriter;
038import java.net.URI;
039import java.net.URISyntaxException;
040import java.util.ArrayList;
041import java.util.Arrays;
042import java.util.Collection;
043import java.util.LinkedHashSet;
044import java.util.List;
045import java.util.Map;
046import java.util.Set;
047
048/**
049 * Service that provides application workflow definition reading from the path and creation of the proto configuration.
050 */
051public abstract class WorkflowAppService implements Service {
052
053    public static final String CONF_PREFIX = Service.CONF_PREFIX + "WorkflowAppService.";
054
055    public static final String SYSTEM_LIB_PATH = CONF_PREFIX + "system.libpath";
056
057    public static final String APP_LIB_PATH_LIST = "oozie.wf.application.lib";
058
059    public static final String HADOOP_USER = "user.name";
060
061    public static final String CONFG_MAX_WF_LENGTH = CONF_PREFIX + "WorkflowDefinitionMaxLength";
062
063    public static final String OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE = "oozie.subworkflow.classpath.inheritance";
064
065    public static final String OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE = "oozie.wf.subworkflow.classpath.inheritance";
066
067    private Path systemLibPath;
068    private long maxWFLength;
069    private boolean oozieSubWfCPInheritance;
070
071    /**
072     * Initialize the workflow application service.
073     *
074     * @param services services instance.
075     */
076    public void init(Services services) {
077        Configuration conf = services.getConf();
078
079        String path = conf.get(SYSTEM_LIB_PATH, " ");
080        if (path.trim().length() > 0) {
081            systemLibPath = new Path(path.trim());
082        }
083
084        maxWFLength = conf.getInt(CONFG_MAX_WF_LENGTH, 100000);
085
086        oozieSubWfCPInheritance = conf.getBoolean(OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE, false);
087    }
088
089    /**
090     * Destroy the workflow application service.
091     */
092    public void destroy() {
093    }
094
095    /**
096     * Return the public interface for workflow application service.
097     *
098     * @return {@link WorkflowAppService}.
099     */
100    public Class<? extends Service> getInterface() {
101        return WorkflowAppService.class;
102    }
103
104    /**
105     * Read workflow definition.
106     *
107     *
108     * @param appPath application path.
109     * @param user user name.
110     * @return workflow definition.
111     * @throws WorkflowException thrown if the definition could not be read.
112     */
113    protected String readDefinition(String appPath, String user, Configuration conf)
114            throws WorkflowException {
115        try {
116            URI uri = new URI(appPath);
117            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
118            JobConf jobConf = has.createJobConf(uri.getAuthority());
119            FileSystem fs = has.createFileSystem(user, uri, jobConf);
120
121            // app path could be a directory
122            Path path = new Path(uri.getPath());
123            if (!fs.isFile(path)) {
124                path = new Path(path, "workflow.xml");
125            }
126
127            FileStatus fsStatus = fs.getFileStatus(path);
128            if (fsStatus.getLen() > this.maxWFLength) {
129                throw new WorkflowException(ErrorCode.E0736, fsStatus.getLen(), this.maxWFLength);
130            }
131
132            Reader reader = new InputStreamReader(fs.open(path));
133            StringWriter writer = new StringWriter();
134            IOUtils.copyCharStream(reader, writer);
135            return writer.toString();
136
137        }
138        catch (WorkflowException wfe) {
139            throw wfe;
140        }
141        catch (IOException ex) {
142            throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
143        }
144        catch (URISyntaxException ex) {
145            throw new WorkflowException(ErrorCode.E0711, appPath, ex.getMessage(), ex);
146        }
147        catch (HadoopAccessorException ex) {
148            throw new WorkflowException(ex);
149        }
150        catch (Exception ex) {
151            throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
152        }
153    }
154    /**
155     * Create proto configuration. <p/> The proto configuration includes the user,group and the paths which need to be
156     * added to distributed cache. These paths include .jar,.so and the resource file paths.
157     *
158     * @param jobConf job configuration.
159     * @param isWorkflowJob indicates if the job is a workflow job or not.
160     * @return proto configuration.
161     * @throws WorkflowException thrown if the proto action configuration could not be created.
162     */
163    public XConfiguration createProtoActionConf(Configuration jobConf, boolean isWorkflowJob)
164            throws WorkflowException {
165        try {
166            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
167            URI uri = new URI(jobConf.get(OozieClient.APP_PATH));
168
169            Configuration conf = has.createJobConf(uri.getAuthority());
170            XConfiguration protoConf = new XConfiguration();
171
172
173            String user = jobConf.get(OozieClient.USER_NAME);
174            conf.set(OozieClient.USER_NAME, user);
175            protoConf.set(OozieClient.USER_NAME, user);
176
177            FileSystem fs = has.createFileSystem(user, uri, conf);
178
179            Path appPath = new Path(uri);
180            XLog.getLog(getClass()).debug("jobConf.libPath = " + jobConf.get(OozieClient.LIBPATH));
181            XLog.getLog(getClass()).debug("jobConf.appPath = " + appPath);
182
183            Collection<String> filePaths;
184            if (isWorkflowJob) {
185                // app path could be a directory
186                Path path = new Path(uri.getPath());
187                if (!fs.isFile(path)) {
188                    filePaths = getLibFiles(fs, new Path(appPath + "/lib"));
189                } else {
190                    filePaths = getLibFiles(fs, new Path(appPath.getParent(), "lib"));
191                }
192            }
193            else {
194                filePaths = new LinkedHashSet<String>();
195            }
196
197            String[] libPaths = jobConf.getStrings(OozieClient.LIBPATH);
198            if (libPaths != null && libPaths.length > 0) {
199                for (int i = 0; i < libPaths.length; i++) {
200                    if (libPaths[i].trim().length() > 0) {
201                        Path libPath = new Path(libPaths[i].trim());
202                        Collection<String> libFilePaths = getLibFiles(fs, libPath);
203                        filePaths.addAll(libFilePaths);
204                    }
205                }
206            }
207
208            // Check if a subworkflow should inherit the libs from the parent WF
209            // OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE has priority over OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE from oozie-site
210            // If OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE isn't specified, we use OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE
211            if (jobConf.getBoolean(OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE, oozieSubWfCPInheritance)) {
212                // Keep any libs from a parent workflow that might already be in APP_LIB_PATH_LIST and also remove duplicates
213                String[] parentFilePaths = jobConf.getStrings(APP_LIB_PATH_LIST);
214                if (parentFilePaths != null && parentFilePaths.length > 0) {
215                    String[] filePathsNames = filePaths.toArray(new String[filePaths.size()]);
216                    for (int i = 0; i < filePathsNames.length; i++) {
217                        Path p = new Path(filePathsNames[i]);
218                        filePathsNames[i] = p.getName();
219                    }
220                    Arrays.sort(filePathsNames);
221                    List<String> nonDuplicateParentFilePaths = new ArrayList<String>();
222                    for (String parentFilePath : parentFilePaths) {
223                        Path p = new Path(parentFilePath);
224                        if (Arrays.binarySearch(filePathsNames, p.getName()) < 0) {
225                            nonDuplicateParentFilePaths.add(parentFilePath);
226                        }
227                    }
228                    filePaths.addAll(nonDuplicateParentFilePaths);
229                }
230            }
231
232            protoConf.setStrings(APP_LIB_PATH_LIST, filePaths.toArray(new String[filePaths.size()]));
233
234            //Add all properties start with 'oozie.'
235            for (Map.Entry<String, String> entry : jobConf) {
236                if (entry.getKey().startsWith("oozie.")) {
237                    String name = entry.getKey();
238                    String value = entry.getValue();
239                    // if property already exists, should not overwrite
240                    if(protoConf.get(name) == null) {
241                        protoConf.set(name, value);
242                    }
243                }
244            }
245            return protoConf;
246        }
247        catch (IOException ex) {
248            throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
249        }
250        catch (URISyntaxException ex) {
251            throw new WorkflowException(ErrorCode.E0711, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
252        }
253        catch (HadoopAccessorException ex) {
254            throw new WorkflowException(ex);
255        }
256        catch (Exception ex) {
257            throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH),
258                                        ex.getMessage(), ex);
259        }
260    }
261
262    /**
263     * Parse workflow definition.
264     *
265     * @param jobConf
266     * @return
267     * @throws WorkflowException
268     */
269    public abstract WorkflowApp parseDef(Configuration jobConf) throws WorkflowException;
270
271    /**
272     * Parse workflow definition along with config-default.xml config
273     *
274     * @param jobConf job configuration
275     * @param configDefault config from config-default.xml
276     * @return workflow application thrown if the workflow application could not
277     *         be parsed
278     * @throws WorkflowException
279     */
280    public abstract WorkflowApp parseDef(Configuration jobConf, Configuration configDefault) throws WorkflowException;
281
282    /**
283     * Parse workflow definition.
284     * @param wfXml workflow.
285     * @param jobConf job configuration
286     * @return workflow application.
287     * @throws WorkflowException thrown if the workflow application could not be parsed.
288     */
289    public abstract WorkflowApp parseDef(String wfXml, Configuration jobConf) throws WorkflowException;
290
291    /**
292     * Get all library paths.
293     *
294     * @param fs file system object.
295     * @param libPath hdfs library path.
296     * @return list of paths.
297     * @throws IOException thrown if the lib paths could not be obtained.
298     */
299    private Collection<String> getLibFiles(FileSystem fs, Path libPath) throws IOException {
300        Set<String> libPaths = new LinkedHashSet<String>();
301        if (fs.exists(libPath)) {
302            FileStatus[] files = fs.listStatus(libPath, new NoPathFilter());
303
304            for (FileStatus file : files) {
305                libPaths.add(file.getPath().toUri().toString());
306            }
307        }
308        else {
309            XLog.getLog(getClass()).warn("libpath [{0}] does not exist", libPath);
310        }
311        return libPaths;
312    }
313
314    /*
315     * Filter class doing no filtering.
316     * We dont need define this class, but seems fs.listStatus() is not working properly without this.
317     * So providing this dummy no filtering Filter class.
318     */
319    private class NoPathFilter implements PathFilter {
320        @Override
321        public boolean accept(Path path) {
322            return true;
323        }
324    }
325
326    /**
327     * Returns Oozie system libpath.
328     *
329     * @return Oozie system libpath (sharelib) in HDFS if present, otherwise it returns <code>NULL</code>.
330     */
331    public Path getSystemLibPath() {
332        return systemLibPath;
333    }
334}