001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.util;
020
021import java.io.IOException;
022import java.net.URI;
023import java.util.HashMap;
024import java.util.Map;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.filecache.DistributedCache;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.oozie.ErrorCode;
031import org.apache.oozie.action.hadoop.JavaActionExecutor;
032import org.apache.oozie.client.OozieClient;
033import org.apache.oozie.client.XOozieClient;
034import org.apache.oozie.command.CommandException;
035import org.apache.oozie.service.HadoopAccessorException;
036import org.apache.oozie.service.HadoopAccessorService;
037import org.apache.oozie.service.Services;
038
039/**
040 * Job utilities.
041 */
042public class JobUtils {
043    /**
044     * Normalize appPath in job conf with the provided user/group - If it's not jobs via proxy submission, after
045     * normalization appPath always points to job's Xml definition file.
046     * <p>
047     *
048     * @param user user
049     * @param group group
050     * @param conf job configuration.
051     * @throws IOException thrown if normalization can not be done properly.
052     */
053    public static void normalizeAppPath(String user, String group, Configuration conf) throws IOException {
054        ParamChecker.notNull(user, "user");
055
056        if (conf.get(XOozieClient.IS_PROXY_SUBMISSION) != null) { // do nothing for proxy submission job;
057            return;
058        }
059
060        String wfPathStr = conf.get(OozieClient.APP_PATH);
061        String coordPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH);
062        String bundlePathStr = conf.get(OozieClient.BUNDLE_APP_PATH);
063        String appPathStr = wfPathStr != null ? wfPathStr : (coordPathStr != null ? coordPathStr : bundlePathStr);
064
065        FileSystem fs = null;
066        try {
067            URI uri = new Path(appPathStr).toUri();
068            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
069            Configuration fsConf = has.createJobConf(uri.getAuthority());
070            fs = has.createFileSystem(user, uri, fsConf);
071        }
072        catch (HadoopAccessorException ex) {
073            throw new IOException(ex.getMessage());
074        }
075
076        Path appPath = new Path(appPathStr);
077        String normalizedAppPathStr = appPathStr;
078        if (!fs.exists(appPath)) {
079            throw new IOException("Error: " + appPathStr + " does not exist");
080        }
081
082        if (wfPathStr != null) {
083            conf.set(OozieClient.APP_PATH, normalizedAppPathStr);
084        }
085        else if (coordPathStr != null) {
086            conf.set(OozieClient.COORDINATOR_APP_PATH, normalizedAppPathStr);
087        }
088        else if (bundlePathStr != null) {
089            conf.set(OozieClient.BUNDLE_APP_PATH, normalizedAppPathStr);
090        }
091    }
092
093    /**
094     * This Function will parse the value of the changed values in key value manner. the change value would be
095     * key1=value1;key2=value2
096     *
097     * @param changeValue change value.
098     * @return This returns the hash with hash&lt;[key1,value1],[key2,value2]&gt;
099     * @throws CommandException thrown if changeValue cannot be parsed properly.
100     */
101    public static Map<String, String> parseChangeValue(String changeValue) throws CommandException {
102        if (changeValue == null || changeValue.trim().equalsIgnoreCase("")) {
103            throw new CommandException(ErrorCode.E1015, "change value can not be empty string or null");
104        }
105
106        Map<String, String> map = new HashMap<String, String>();
107
108        String[] tokens = changeValue.split(";");
109        for (String token : tokens) {
110            if (!token.contains("=")) {
111                throw new CommandException(ErrorCode.E1015, changeValue,
112                        "change value must be name=value pair or name=(empty string)");
113            }
114
115            String[] pair = token.split("=");
116            String key = pair[0];
117
118            if (map.containsKey(key)) {
119                throw new CommandException(ErrorCode.E1015, changeValue, "can not specify repeated change values on "
120                        + key);
121            }
122
123            if (pair.length == 2) {
124                map.put(key, pair[1]);
125            }
126            else if (pair.length == 1) {
127                map.put(key, "");
128            }
129            else {
130                throw new CommandException(ErrorCode.E1015, changeValue, "elements on " + key
131                        + " must be name=value pair or name=(empty string)");
132            }
133        }
134
135        return map;
136    }
137
138    /**
139     * This method provides a wrapper around hadoop 0.20/1.x and 0.23/2.x implementations.
140     * TODO: Remove the workaround when we drop the support for hadoop 0.20.
141     * @param file Path of the file to be added
142     * @param conf Configuration that contains the classpath setting
143     * @param fs FileSystem with respect to which path should be interpreted (may be null)
144     * @throws IOException
145     */
146    public static void addFileToClassPath(Path file, Configuration conf, FileSystem fs) throws IOException {
147        if (fs == null) {
148            Configuration defaultConf = Services.get().get(HadoopAccessorService.class)
149                    .createJobConf(conf.get(JavaActionExecutor.HADOOP_JOB_TRACKER));
150            XConfiguration.copy(conf, defaultConf);
151            // it fails with conf, therefore we pass defaultConf instead
152            fs = file.getFileSystem(defaultConf);
153        }
154        // Hadoop 0.20/1.x.
155        if (Services.get().get(HadoopAccessorService.class).getCachedConf().get("yarn.resourcemanager.webapp.address") == null) {
156            // Duplicate hadoop 1.x code to workaround MAPREDUCE-2361 in Hadoop 0.20
157            // Refer OOZIE-1806.
158            String filepath = file.toUri().getPath();
159            String classpath = conf.get("mapred.job.classpath.files");
160            conf.set("mapred.job.classpath.files",
161                    classpath == null ? filepath : classpath + System.getProperty("path.separator") + filepath);
162            URI uri = fs.makeQualified(file).toUri();
163            DistributedCache.addCacheFile(uri, conf);
164        }
165        else { // Hadoop 2.x
166            DistributedCache.addFileToClassPath(file, conf, fs);
167        }
168    }
169}