/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.util;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.XOozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;

/**
 * Job utilities.
 */
public class JobUtils {
    /**
     * Normalize the application path in the job configuration with the provided user/group.
     * If the job is not a proxy submission, after normalization the app path always points
     * to the job's XML definition file.
     * <p/>
     *
     * @param user user name, must not be {@code null}
     * @param group group name (currently unused by the normalization itself)
     * @param conf job configuration; exactly one of {@link OozieClient#APP_PATH},
     *        {@link OozieClient#COORDINATOR_APP_PATH} or {@link OozieClient#BUNDLE_APP_PATH}
     *        is expected to be set
     * @throws IOException thrown if normalization can not be done properly, e.g. the
     *         application path is missing or does not exist on the filesystem
     */
    public static void normalizeAppPath(String user, String group, Configuration conf) throws IOException {
        if (user == null) {
            throw new IllegalArgumentException("user cannot be null");
        }

        if (conf.get(XOozieClient.IS_PROXY_SUBMISSION) != null) { // do nothing for proxy submission job;
            return;
        }

        String wfPathStr = conf.get(OozieClient.APP_PATH);
        String coordPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH);
        String bundlePathStr = conf.get(OozieClient.BUNDLE_APP_PATH);
        // Precedence: workflow path, then coordinator path, then bundle path.
        String appPathStr = wfPathStr != null ? wfPathStr : (coordPathStr != null ? coordPathStr : bundlePathStr);

        // Fail fast with a clear message rather than letting new Path(null) blow up below.
        if (appPathStr == null) {
            throw new IOException("Error: no application path set in the job configuration");
        }

        FileSystem fs = null;
        try {
            URI uri = new Path(appPathStr).toUri();
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            Configuration fsConf = has.createJobConf(uri.getAuthority());
            fs = has.createFileSystem(user, uri, fsConf);
        }
        catch (HadoopAccessorException ex) {
            // Preserve the cause so the original stack trace is not lost.
            throw new IOException(ex.getMessage(), ex);
        }

        Path appPath = new Path(appPathStr);
        String normalizedAppPathStr = appPathStr;
        if (!fs.exists(appPath)) {
            throw new IOException("Error: " + appPathStr + " does not exist");
        }

        // Write the (possibly normalized) path back under the same property it came from.
        if (wfPathStr != null) {
            conf.set(OozieClient.APP_PATH, normalizedAppPathStr);
        }
        else if (coordPathStr != null) {
            conf.set(OozieClient.COORDINATOR_APP_PATH, normalizedAppPathStr);
        }
        else if (bundlePathStr != null) {
            conf.set(OozieClient.BUNDLE_APP_PATH, normalizedAppPathStr);
        }
    }

    /**
     * Parse a semicolon-separated list of name/value pairs, e.g.
     * {@code key1=value1;key2=value2}. A pair may have an empty value
     * ({@code key=}), in which case the key maps to the empty string.
     *
     * @param changeValue the raw change value string to parse
     * @return a map of parsed {@code [key, value]} pairs
     * @throws CommandException thrown if {@code changeValue} is null/empty, a token is not
     *         a {@code name=value} pair, or a key is repeated
     */
    public static Map<String, String> parseChangeValue(String changeValue) throws CommandException {
        if (changeValue == null || changeValue.trim().equalsIgnoreCase("")) {
            throw new CommandException(ErrorCode.E1015, "change value can not be empty string or null");
        }

        Map<String, String> map = new HashMap<String, String>();

        String[] tokens = changeValue.split(";");
        for (String token : tokens) {
            if (!token.contains("=")) {
                throw new CommandException(ErrorCode.E1015, changeValue,
                        "change value must be name=value pair or name=(empty string)");
            }

            String[] pair = token.split("=");
            // A token consisting only of '=' characters splits to an empty array; report it
            // as a malformed pair instead of failing with ArrayIndexOutOfBoundsException.
            if (pair.length == 0) {
                throw new CommandException(ErrorCode.E1015, changeValue,
                        "change value must be name=value pair or name=(empty string)");
            }
            String key = pair[0];

            if (map.containsKey(key)) {
                throw new CommandException(ErrorCode.E1015, changeValue, "can not specify repeated change values on "
                        + key);
            }

            if (pair.length == 2) {
                map.put(key, pair[1]);
            }
            else if (pair.length == 1) {
                // "key=" — trailing empty value.
                map.put(key, "");
            }
            else {
                // More than one '=' in the token, e.g. "a=b=c".
                throw new CommandException(ErrorCode.E1015, changeValue, "elements on " + key
                        + " must be name=value pair or name=(empty string)");
            }
        }

        return map;
    }

    /**
     * Add a file to the job's distributed-cache classpath. This method provides a wrapper
     * around the hadoop 0.20/1.x and 0.23/2.x implementations.
     * TODO: Remove the workaround when we drop the support for hadoop 0.20.
     *
     * @param file Path of the file to be added
     * @param conf Configuration that contains the classpath setting
     * @param fs FileSystem with respect to which path should be interpreted (may be null)
     * @throws IOException if the file system cannot be reached or the file cannot be added
     */
    public static void addFileToClassPath(Path file, Configuration conf, FileSystem fs) throws IOException {
        Configuration defaultConf = new Configuration();
        XConfiguration.copy(conf, defaultConf);
        if (fs == null) {
            // FileSystem.get() fails with conf, therefore we pass defaultConf instead
            fs = file.getFileSystem(defaultConf);
        }
        // Hadoop 0.20/1.x: no YARN resource manager configured.
        if (defaultConf.get("yarn.resourcemanager.address") == null) {
            // Duplicate hadoop 1.x code to workaround MAPREDUCE-2361 in Hadoop 0.20
            // Refer OOZIE-1806.
            String filepath = file.toUri().getPath();
            String classpath = conf.get("mapred.job.classpath.files");
            conf.set("mapred.job.classpath.files", classpath == null
                    ? filepath
                    : classpath + System.getProperty("path.separator") + filepath);
            URI uri = fs.makeQualified(file).toUri();
            DistributedCache.addCacheFile(uri, conf);
        }
        else { // Hadoop 0.23/2.x
            DistributedCache.addFileToClassPath(file, conf, fs);
        }
    }
}