This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.util;
020
021import java.io.IOException;
022import java.net.URI;
023import java.util.HashMap;
024import java.util.Map;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.filecache.DistributedCache;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.oozie.ErrorCode;
031import org.apache.oozie.client.OozieClient;
032import org.apache.oozie.client.XOozieClient;
033import org.apache.oozie.command.CommandException;
034import org.apache.oozie.service.HadoopAccessorException;
035import org.apache.oozie.service.HadoopAccessorService;
036import org.apache.oozie.service.Services;
037
038/**
039 * Job utilities.
040 */
041public class JobUtils {
042    /**
043     * Normalize appPath in job conf with the provided user/group - If it's not jobs via proxy submission, after
044     * normalization appPath always points to job's Xml definition file.
045     * <p/>
046     *
047     * @param user user
048     * @param group group
049     * @param conf job configuration.
050     * @throws IOException thrown if normalization can not be done properly.
051     */
052    public static void normalizeAppPath(String user, String group, Configuration conf) throws IOException {
053        if (user == null) {
054            throw new IllegalArgumentException("user cannot be null");
055        }
056
057        if (conf.get(XOozieClient.IS_PROXY_SUBMISSION) != null) { // do nothing for proxy submission job;
058            return;
059        }
060
061        String wfPathStr = conf.get(OozieClient.APP_PATH);
062        String coordPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH);
063        String bundlePathStr = conf.get(OozieClient.BUNDLE_APP_PATH);
064        String appPathStr = wfPathStr != null ? wfPathStr : (coordPathStr != null ? coordPathStr : bundlePathStr);
065
066        FileSystem fs = null;
067        try {
068            URI uri = new Path(appPathStr).toUri();
069            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
070            Configuration fsConf = has.createJobConf(uri.getAuthority());
071            fs = has.createFileSystem(user, uri, fsConf);
072        }
073        catch (HadoopAccessorException ex) {
074            throw new IOException(ex.getMessage());
075        }
076
077        Path appPath = new Path(appPathStr);
078        String normalizedAppPathStr = appPathStr;
079        if (!fs.exists(appPath)) {
080            throw new IOException("Error: " + appPathStr + " does not exist");
081        }
082
083        if (wfPathStr != null) {
084            conf.set(OozieClient.APP_PATH, normalizedAppPathStr);
085        }
086        else if (coordPathStr != null) {
087            conf.set(OozieClient.COORDINATOR_APP_PATH, normalizedAppPathStr);
088        }
089        else if (bundlePathStr != null) {
090            conf.set(OozieClient.BUNDLE_APP_PATH, normalizedAppPathStr);
091        }
092    }
093
094    /**
095     * This Function will parse the value of the changed values in key value manner. the change value would be
096     * key1=value1;key2=value2
097     *
098     * @param changeValue change value.
099     * @return This returns the hash with hash<[key1,value1],[key2,value2]>
100     * @throws CommandException thrown if changeValue cannot be parsed properly.
101     */
102    public static Map<String, String> parseChangeValue(String changeValue) throws CommandException {
103        if (changeValue == null || changeValue.trim().equalsIgnoreCase("")) {
104            throw new CommandException(ErrorCode.E1015, "change value can not be empty string or null");
105        }
106
107        Map<String, String> map = new HashMap<String, String>();
108
109        String[] tokens = changeValue.split(";");
110        for (String token : tokens) {
111            if (!token.contains("=")) {
112                throw new CommandException(ErrorCode.E1015, changeValue,
113                        "change value must be name=value pair or name=(empty string)");
114            }
115
116            String[] pair = token.split("=");
117            String key = pair[0];
118
119            if (map.containsKey(key)) {
120                throw new CommandException(ErrorCode.E1015, changeValue, "can not specify repeated change values on "
121                        + key);
122            }
123
124            if (pair.length == 2) {
125                map.put(key, pair[1]);
126            }
127            else if (pair.length == 1) {
128                map.put(key, "");
129            }
130            else {
131                throw new CommandException(ErrorCode.E1015, changeValue, "elements on " + key
132                        + " must be name=value pair or name=(empty string)");
133            }
134        }
135
136        return map;
137    }
138
139    /**
140     * This method provides a wrapper around hadoop 0.20/1.x and 0.23/2.x implementations.
141     * TODO: Remove the workaround when we drop the support for hadoop 0.20.
142     * @param file Path of the file to be added
143     * @param conf Configuration that contains the classpath setting
144     * @param fs FileSystem with respect to which path should be interpreted (may be null)
145     * @throws IOException
146     */
147    public static void addFileToClassPath(Path file, Configuration conf, FileSystem fs) throws IOException {
148      Configuration defaultConf = new Configuration();
149      XConfiguration.copy(conf, defaultConf);
150      if (fs == null) {
151        // it fails with conf, therefore we pass defaultConf instead
152        fs = file.getFileSystem(defaultConf);
153      }
154      // Hadoop 0.20/1.x.
155      if (defaultConf.get("yarn.resourcemanager.webapp.address") == null) {
156          // Duplicate hadoop 1.x code to workaround MAPREDUCE-2361 in Hadoop 0.20
157          // Refer OOZIE-1806.
158          String filepath = file.toUri().getPath();
159          String classpath = conf.get("mapred.job.classpath.files");
160          conf.set("mapred.job.classpath.files", classpath == null
161              ? filepath
162              : classpath + System.getProperty("path.separator") + filepath);
163          URI uri = fs.makeQualified(file).toUri();
164          DistributedCache.addCacheFile(uri, conf);
165      }
166      else { // Hadoop 0.23/2.x
167          DistributedCache.addFileToClassPath(file, conf, fs);
168      }
169    }
170}