001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.fs.FileStatus;
023import org.apache.hadoop.fs.FileSystem;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.fs.PathFilter;
026import org.apache.hadoop.mapred.JobConf;
027import org.apache.oozie.client.OozieClient;
028import org.apache.oozie.workflow.WorkflowApp;
029import org.apache.oozie.workflow.WorkflowException;
030import org.apache.oozie.util.IOUtils;
031import org.apache.oozie.util.XConfiguration;
032import org.apache.oozie.util.XLog;
033import org.apache.oozie.ErrorCode;
034
035import java.io.IOException;
036import java.io.InputStreamReader;
037import java.io.Reader;
038import java.io.StringWriter;
039import java.net.URI;
040import java.net.URISyntaxException;
041import java.util.ArrayList;
042import java.util.Arrays;
043import java.util.Collection;
044import java.util.LinkedHashSet;
045import java.util.List;
046import java.util.Map;
047import java.util.Set;
048
049/**
050 * Service that provides application workflow definition reading from the path and creation of the proto configuration.
051 */
052public abstract class WorkflowAppService implements Service {
053
054    public static final String CONF_PREFIX = Service.CONF_PREFIX + "WorkflowAppService.";
055
056    public static final String SYSTEM_LIB_PATH = CONF_PREFIX + "system.libpath";
057
058    public static final String APP_LIB_PATH_LIST = "oozie.wf.application.lib";
059
060    public static final String HADOOP_USER = "user.name";
061
062    public static final String CONFG_MAX_WF_LENGTH = CONF_PREFIX + "WorkflowDefinitionMaxLength";
063
064    public static final String OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE = "oozie.subworkflow.classpath.inheritance";
065
066    public static final String OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE = "oozie.wf.subworkflow.classpath.inheritance";
067
068    private Path systemLibPath;
069    private long maxWFLength;
070    private boolean oozieSubWfCPInheritance;
071
072    /**
073     * Initialize the workflow application service.
074     *
075     * @param services services instance.
076     */
077    public void init(Services services) {
078        Configuration conf = services.getConf();
079
080        String path = ConfigurationService.get(conf, SYSTEM_LIB_PATH);
081        if (path.trim().length() > 0) {
082            systemLibPath = new Path(path.trim());
083        }
084
085        maxWFLength = conf.getInt(CONFG_MAX_WF_LENGTH, 100000);
086
087        oozieSubWfCPInheritance = conf.getBoolean(OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE, false);
088    }
089
090    /**
091     * Destroy the workflow application service.
092     */
093    public void destroy() {
094    }
095
096    /**
097     * Return the public interface for workflow application service.
098     *
099     * @return {@link WorkflowAppService}.
100     */
101    public Class<? extends Service> getInterface() {
102        return WorkflowAppService.class;
103    }
104
105    /**
106     * Read workflow definition.
107     *
108     *
109     * @param appPath application path.
110     * @param user user name.
111     * @return workflow definition.
112     * @throws WorkflowException thrown if the definition could not be read.
113     */
114    protected String readDefinition(String appPath, String user, Configuration conf)
115            throws WorkflowException {
116        try {
117            URI uri = new URI(appPath);
118            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
119            JobConf jobConf = has.createJobConf(uri.getAuthority());
120            FileSystem fs = has.createFileSystem(user, uri, jobConf);
121
122            // app path could be a directory
123            Path path = new Path(uri.getPath());
124            if (!fs.isFile(path)) {
125                path = new Path(path, "workflow.xml");
126            }
127
128            FileStatus fsStatus = fs.getFileStatus(path);
129            if (fsStatus.getLen() > this.maxWFLength) {
130                throw new WorkflowException(ErrorCode.E0736, fsStatus.getLen(), this.maxWFLength);
131            }
132
133            Reader reader = new InputStreamReader(fs.open(path));
134            StringWriter writer = new StringWriter();
135            IOUtils.copyCharStream(reader, writer);
136            return writer.toString();
137
138        }
139        catch (WorkflowException wfe) {
140            throw wfe;
141        }
142        catch (IOException ex) {
143            throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
144        }
145        catch (URISyntaxException ex) {
146            throw new WorkflowException(ErrorCode.E0711, appPath, ex.getMessage(), ex);
147        }
148        catch (HadoopAccessorException ex) {
149            throw new WorkflowException(ex);
150        }
151        catch (Exception ex) {
152            throw new WorkflowException(ErrorCode.E0710, ex.getMessage(), ex);
153        }
154    }
155    /**
156     * Create proto configuration. <p> The proto configuration includes the user,group and the paths which need to be
157     * added to distributed cache. These paths include .jar,.so and the resource file paths.
158     *
159     * @param jobConf job configuration.
160     * @param isWorkflowJob indicates if the job is a workflow job or not.
161     * @return proto configuration.
162     * @throws WorkflowException thrown if the proto action configuration could not be created.
163     */
164    public XConfiguration createProtoActionConf(Configuration jobConf, boolean isWorkflowJob)
165            throws WorkflowException {
166        try {
167            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
168            URI uri = new URI(jobConf.get(OozieClient.APP_PATH));
169
170            Configuration conf = has.createJobConf(uri.getAuthority());
171            XConfiguration protoConf = new XConfiguration();
172
173
174            String user = jobConf.get(OozieClient.USER_NAME);
175            conf.set(OozieClient.USER_NAME, user);
176            protoConf.set(OozieClient.USER_NAME, user);
177
178            FileSystem fs = has.createFileSystem(user, uri, conf);
179
180            Path appPath = new Path(uri);
181            XLog.getLog(getClass()).debug("jobConf.libPath = " + jobConf.get(OozieClient.LIBPATH));
182            XLog.getLog(getClass()).debug("jobConf.appPath = " + appPath);
183
184            Collection<String> filePaths;
185            if (isWorkflowJob) {
186                // app path could be a directory
187                Path path = new Path(uri.getPath());
188                if (!fs.isFile(path)) {
189                    filePaths = getLibFiles(fs, new Path(appPath + "/lib"));
190                } else {
191                    filePaths = getLibFiles(fs, new Path(appPath.getParent(), "lib"));
192                }
193            }
194            else {
195                filePaths = new LinkedHashSet<String>();
196            }
197
198            String[] libPaths = jobConf.getStrings(OozieClient.LIBPATH);
199            if (libPaths != null && libPaths.length > 0) {
200                for (int i = 0; i < libPaths.length; i++) {
201                    if (libPaths[i].trim().length() > 0) {
202                        Path libPath = new Path(libPaths[i].trim());
203                        Collection<String> libFilePaths = getLibFiles(fs, libPath);
204                        filePaths.addAll(libFilePaths);
205                    }
206                }
207            }
208
209            // Check if a subworkflow should inherit the libs from the parent WF
210            // OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE has priority over OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE from oozie-site
211            // If OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE isn't specified, we use OOZIE_SUBWORKFLOW_CLASSPATH_INHERITANCE
212            if (jobConf.getBoolean(OOZIE_WF_SUBWORKFLOW_CLASSPATH_INHERITANCE, oozieSubWfCPInheritance)) {
213                // Keep any libs from a parent workflow that might already be in APP_LIB_PATH_LIST and also remove duplicates
214                String[] parentFilePaths = jobConf.getStrings(APP_LIB_PATH_LIST);
215                if (parentFilePaths != null && parentFilePaths.length > 0) {
216                    String[] filePathsNames = filePaths.toArray(new String[filePaths.size()]);
217                    for (int i = 0; i < filePathsNames.length; i++) {
218                        Path p = new Path(filePathsNames[i]);
219                        filePathsNames[i] = p.getName();
220                    }
221                    Arrays.sort(filePathsNames);
222                    List<String> nonDuplicateParentFilePaths = new ArrayList<String>();
223                    for (String parentFilePath : parentFilePaths) {
224                        Path p = new Path(parentFilePath);
225                        if (Arrays.binarySearch(filePathsNames, p.getName()) < 0) {
226                            nonDuplicateParentFilePaths.add(parentFilePath);
227                        }
228                    }
229                    filePaths.addAll(nonDuplicateParentFilePaths);
230                }
231            }
232
233            protoConf.setStrings(APP_LIB_PATH_LIST, filePaths.toArray(new String[filePaths.size()]));
234
235            //Add all properties start with 'oozie.'
236            for (Map.Entry<String, String> entry : jobConf) {
237                if (entry.getKey().startsWith("oozie.")) {
238                    String name = entry.getKey();
239                    String value = entry.getValue();
240                    // if property already exists, should not overwrite
241                    if(protoConf.get(name) == null) {
242                        protoConf.set(name, value);
243                    }
244                }
245            }
246            return protoConf;
247        }
248        catch (IOException ex) {
249            throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
250        }
251        catch (URISyntaxException ex) {
252            throw new WorkflowException(ErrorCode.E0711, jobConf.get(OozieClient.APP_PATH), ex.getMessage(), ex);
253        }
254        catch (HadoopAccessorException ex) {
255            throw new WorkflowException(ex);
256        }
257        catch (Exception ex) {
258            throw new WorkflowException(ErrorCode.E0712, jobConf.get(OozieClient.APP_PATH),
259                                        ex.getMessage(), ex);
260        }
261    }
262
263    /**
264     * Parse workflow definition.
265     *
266     * @param jobConf
267     * @return
268     * @throws WorkflowException
269     */
270    public abstract WorkflowApp parseDef(Configuration jobConf) throws WorkflowException;
271
272    /**
273     * Parse workflow definition along with config-default.xml config
274     *
275     * @param jobConf job configuration
276     * @param configDefault config from config-default.xml
277     * @return workflow application thrown if the workflow application could not
278     *         be parsed
279     * @throws WorkflowException
280     */
281    public abstract WorkflowApp parseDef(Configuration jobConf, Configuration configDefault) throws WorkflowException;
282
283    /**
284     * Parse workflow definition.
285     * @param wfXml workflow.
286     * @param jobConf job configuration
287     * @return workflow application.
288     * @throws WorkflowException thrown if the workflow application could not be parsed.
289     */
290    public abstract WorkflowApp parseDef(String wfXml, Configuration jobConf) throws WorkflowException;
291
292    /**
293     * Get all library paths.
294     *
295     * @param fs file system object.
296     * @param libPath hdfs library path.
297     * @return list of paths.
298     * @throws IOException thrown if the lib paths could not be obtained.
299     */
300    private Collection<String> getLibFiles(FileSystem fs, Path libPath) throws IOException {
301        Set<String> libPaths = new LinkedHashSet<String>();
302        if (fs.exists(libPath)) {
303            FileStatus[] files = fs.listStatus(libPath, new NoPathFilter());
304
305            for (FileStatus file : files) {
306                libPaths.add(file.getPath().toUri().toString());
307            }
308        }
309        else {
310            XLog.getLog(getClass()).warn("libpath [{0}] does not exist", libPath);
311        }
312        return libPaths;
313    }
314
315    /*
316     * Filter class doing no filtering.
317     * We dont need define this class, but seems fs.listStatus() is not working properly without this.
318     * So providing this dummy no filtering Filter class.
319     */
320    private class NoPathFilter implements PathFilter {
321        @Override
322        public boolean accept(Path path) {
323            return true;
324        }
325    }
326
327    /**
328     * Returns Oozie system libpath.
329     *
330     * @return Oozie system libpath (sharelib) in HDFS if present, otherwise it returns <code>NULL</code>.
331     */
332    public Path getSystemLibPath() {
333        return systemLibPath;
334    }
335}