This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.util;
019
020 import java.io.IOException;
021 import java.net.URI;
022 import java.util.HashMap;
023 import java.util.Map;
024
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.hadoop.fs.FileSystem;
027 import org.apache.hadoop.fs.Path;
028 import org.apache.oozie.ErrorCode;
029 import org.apache.oozie.client.OozieClient;
030 import org.apache.oozie.client.XOozieClient;
031 import org.apache.oozie.command.CommandException;
032 import org.apache.oozie.service.HadoopAccessorException;
033 import org.apache.oozie.service.HadoopAccessorService;
034 import org.apache.oozie.service.Services;
035
036 /**
037 * Job utilities.
038 */
039 public class JobUtils {
040 /**
041 * Normalize appPath in job conf with the provided user/group - If it's not jobs via proxy submission, after
042 * normalization appPath always points to job's Xml definition file.
043 * <p/>
044 *
045 * @param user user
046 * @param group group
047 * @param conf job configuration.
048 * @throws IOException thrown if normalization can not be done properly.
049 */
050 public static void normalizeAppPath(String user, String group, Configuration conf) throws IOException {
051 if (user == null) {
052 throw new IllegalArgumentException("user cannot be null");
053 }
054
055 if (conf.get(XOozieClient.IS_PROXY_SUBMISSION) != null) { // do nothing for proxy submission job;
056 return;
057 }
058
059 String wfPathStr = conf.get(OozieClient.APP_PATH);
060 String coordPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH);
061 String bundlePathStr = conf.get(OozieClient.BUNDLE_APP_PATH);
062 String appPathStr = wfPathStr != null ? wfPathStr : (coordPathStr != null ? coordPathStr : bundlePathStr);
063
064 FileSystem fs = null;
065 try {
066 URI uri = new Path(appPathStr).toUri();
067 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
068 Configuration fsConf = has.createJobConf(uri.getAuthority());
069 fs = has.createFileSystem(user, uri, fsConf);
070 }
071 catch (HadoopAccessorException ex) {
072 throw new IOException(ex.getMessage());
073 }
074
075 Path appPath = new Path(appPathStr);
076 String normalizedAppPathStr = appPathStr;
077 if (!fs.exists(appPath)) {
078 throw new IOException("Error: " + appPathStr + " does not exist");
079 }
080
081 if (wfPathStr != null) {
082 conf.set(OozieClient.APP_PATH, normalizedAppPathStr);
083 }
084 else if (coordPathStr != null) {
085 conf.set(OozieClient.COORDINATOR_APP_PATH, normalizedAppPathStr);
086 }
087 else if (bundlePathStr != null) {
088 conf.set(OozieClient.BUNDLE_APP_PATH, normalizedAppPathStr);
089 }
090 }
091
092 /**
093 * This Function will parse the value of the changed values in key value manner. the change value would be
094 * key1=value1;key2=value2
095 *
096 * @param changeValue change value.
097 * @return This returns the hash with hash<[key1,value1],[key2,value2]>
098 * @throws CommandException thrown if changeValue cannot be parsed properly.
099 */
100 public static Map<String, String> parseChangeValue(String changeValue) throws CommandException {
101 if (changeValue == null || changeValue.trim().equalsIgnoreCase("")) {
102 throw new CommandException(ErrorCode.E1015, "change value can not be empty string or null");
103 }
104
105 Map<String, String> map = new HashMap<String, String>();
106
107 String[] tokens = changeValue.split(";");
108 for (String token : tokens) {
109 if (!token.contains("=")) {
110 throw new CommandException(ErrorCode.E1015, changeValue,
111 "change value must be name=value pair or name=(empty string)");
112 }
113
114 String[] pair = token.split("=");
115 String key = pair[0];
116
117 if (map.containsKey(key)) {
118 throw new CommandException(ErrorCode.E1015, changeValue, "can not specify repeated change values on "
119 + key);
120 }
121
122 if (pair.length == 2) {
123 map.put(key, pair[1]);
124 }
125 else if (pair.length == 1) {
126 map.put(key, "");
127 }
128 else {
129 throw new CommandException(ErrorCode.E1015, changeValue, "elements on " + key
130 + " must be name=value pair or name=(empty string)");
131 }
132 }
133
134 return map;
135 }
136 }