001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.util;
020
021import java.io.IOException;
022import java.net.URI;
023import java.util.HashMap;
024import java.util.Map;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.filecache.DistributedCache;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.oozie.ErrorCode;
031import org.apache.oozie.WorkflowActionBean;
032import org.apache.oozie.action.hadoop.JavaActionExecutor;
033import org.apache.oozie.client.OozieClient;
034import org.apache.oozie.client.XOozieClient;
035import org.apache.oozie.command.CommandException;
036import org.apache.oozie.command.wf.ActionXCommand;
037import org.apache.oozie.service.HadoopAccessorException;
038import org.apache.oozie.service.HadoopAccessorService;
039import org.apache.oozie.service.Services;
040
041/**
042 * Job utilities.
043 */
044public class JobUtils {
045    /**
046     * Normalize appPath in job conf with the provided user/group - If it's not jobs via proxy submission, after
047     * normalization appPath always points to job's Xml definition file.
048     * <p>
049     *
050     * @param user user
051     * @param group group
052     * @param conf job configuration.
053     * @throws IOException thrown if normalization can not be done properly.
054     */
055    public static void normalizeAppPath(String user, String group, Configuration conf) throws IOException {
056        ParamChecker.notNull(user, "user");
057
058        if (conf.get(XOozieClient.IS_PROXY_SUBMISSION) != null) { // do nothing for proxy submission job;
059            return;
060        }
061
062        String wfPathStr = conf.get(OozieClient.APP_PATH);
063        String coordPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH);
064        String bundlePathStr = conf.get(OozieClient.BUNDLE_APP_PATH);
065        String appPathStr = wfPathStr != null ? wfPathStr : (coordPathStr != null ? coordPathStr : bundlePathStr);
066
067        FileSystem fs = null;
068        try {
069            URI uri = new Path(appPathStr).toUri();
070            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
071            Configuration fsConf = has.createConfiguration(uri.getAuthority());
072            fs = has.createFileSystem(user, uri, fsConf);
073        }
074        catch (HadoopAccessorException ex) {
075            throw new IOException(ex.getMessage());
076        }
077
078        Path appPath = new Path(appPathStr);
079        String normalizedAppPathStr = appPathStr;
080        if (!fs.exists(appPath)) {
081            throw new IOException("Error: " + appPathStr + " does not exist");
082        }
083
084        if (wfPathStr != null) {
085            conf.set(OozieClient.APP_PATH, normalizedAppPathStr);
086        }
087        else if (coordPathStr != null) {
088            conf.set(OozieClient.COORDINATOR_APP_PATH, normalizedAppPathStr);
089        }
090        else if (bundlePathStr != null) {
091            conf.set(OozieClient.BUNDLE_APP_PATH, normalizedAppPathStr);
092        }
093    }
094
095    /**
096     * This Function will parse the value of the changed values in key value manner. the change value would be
097     * key1=value1;key2=value2
098     *
099     * @param changeValue change value.
100     * @return This returns the hash with hash&lt;[key1,value1],[key2,value2]&gt;
101     * @throws CommandException thrown if changeValue cannot be parsed properly.
102     */
103    public static Map<String, String> parseChangeValue(String changeValue) throws CommandException {
104        if (changeValue == null || changeValue.trim().equalsIgnoreCase("")) {
105            throw new CommandException(ErrorCode.E1015, "change value can not be empty string or null");
106        }
107
108        Map<String, String> map = new HashMap<String, String>();
109
110        String[] tokens = changeValue.split(";");
111        for (String token : tokens) {
112            if (!token.contains("=")) {
113                throw new CommandException(ErrorCode.E1015, changeValue,
114                        "change value must be name=value pair or name=(empty string)");
115            }
116
117            String[] pair = token.split("=");
118            String key = pair[0];
119
120            if (map.containsKey(key)) {
121                throw new CommandException(ErrorCode.E1015, changeValue, "can not specify repeated change values on "
122                        + key);
123            }
124
125            if (pair.length == 2) {
126                map.put(key, pair[1]);
127            }
128            else if (pair.length == 1) {
129                map.put(key, "");
130            }
131            else {
132                throw new CommandException(ErrorCode.E1015, changeValue, "elements on " + key
133                        + " must be name=value pair or name=(empty string)");
134            }
135        }
136
137        return map;
138    }
139
140    /**
141     * This method provides a wrapper around hadoop 2.x implementations.
142     * @param file Path of the file to be added
143     * @param conf Configuration that contains the classpath setting
144     * @param fs FileSystem with respect to which path should be interpreted (may be null)
145     * @throws IOException if the file can't be added to the classpath
146     */
147    public static void addFileToClassPath(Path file, Configuration conf, FileSystem fs) throws IOException {
148        if (fs == null) {
149            Configuration defaultConf = Services.get().get(HadoopAccessorService.class)
150                    .createConfiguration(conf.get(JavaActionExecutor.HADOOP_YARN_RM));
151            XConfiguration.copy(conf, defaultConf);
152            // it fails with conf, therefore we pass defaultConf instead
153            fs = file.getFileSystem(defaultConf);
154        }
155
156        DistributedCache.addFileToClassPath(file, conf, fs);
157    }
158
159    public static String getRetryKey(WorkflowActionBean wfAction, String key) {
160        return ActionXCommand.RETRY + wfAction.getUserRetryCount() + "." + key;
161    }
162
163    public static String getRetryKey(String key, int retry) {
164        return ActionXCommand.RETRY + retry + "." + key;
165    }
166}