001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.util; 019 020 import java.io.IOException; 021 import java.util.HashMap; 022 import java.util.Map; 023 024 import org.apache.hadoop.conf.Configuration; 025 import org.apache.hadoop.fs.FileSystem; 026 import org.apache.hadoop.fs.Path; 027 import org.apache.oozie.ErrorCode; 028 import org.apache.oozie.client.OozieClient; 029 import org.apache.oozie.client.XOozieClient; 030 import org.apache.oozie.command.CommandException; 031 import org.apache.oozie.service.HadoopAccessorException; 032 import org.apache.oozie.service.HadoopAccessorService; 033 import org.apache.oozie.service.Services; 034 035 /** 036 * Job utilities. 037 */ 038 public class JobUtils { 039 /** 040 * Normalize appPath in job conf with the provided user/group - If it's not jobs via proxy submission, after 041 * normalization appPath always points to job's Xml definition file. 042 * <p/> 043 * 044 * @param user user 045 * @param group group 046 * @param conf job configuration. 047 * @throws IOException thrown if normalization can not be done properly. 048 */ 049 public static void normalizeAppPath(String user, String group, Configuration conf) throws IOException { 050 if (user == null) { 051 throw new IllegalArgumentException("user cannot be null"); 052 } 053 054 if (group == null) { 055 throw new IllegalArgumentException("group cannot be null"); 056 } 057 058 if (conf.get(XOozieClient.IS_PROXY_SUBMISSION) != null) { // do nothing for proxy submission job; 059 return; 060 } 061 062 String wfPathStr = conf.get(OozieClient.APP_PATH); 063 String coordPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH); 064 String bundlePathStr = conf.get(OozieClient.BUNDLE_APP_PATH); 065 String appPathStr = wfPathStr != null ? wfPathStr : (coordPathStr != null ? coordPathStr : bundlePathStr); 066 067 FileSystem fs = null; 068 try { 069 fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, new Path(appPathStr).toUri(), conf); 070 } 071 catch (HadoopAccessorException ex) { 072 throw new IOException(ex.getMessage()); 073 } 074 075 Path appPath = new Path(appPathStr); 076 String normalizedAppPathStr = appPathStr; 077 if (!fs.exists(appPath)) { 078 throw new IOException("Error: " + appPathStr + " does not exist"); 079 } 080 081 if (wfPathStr != null) { 082 conf.set(OozieClient.APP_PATH, normalizedAppPathStr); 083 } 084 else if (coordPathStr != null) { 085 conf.set(OozieClient.COORDINATOR_APP_PATH, normalizedAppPathStr); 086 } 087 else if (bundlePathStr != null) { 088 conf.set(OozieClient.BUNDLE_APP_PATH, normalizedAppPathStr); 089 } 090 } 091 092 /** 093 * This Function will parse the value of the changed values in key value manner. the change value would be 094 * key1=value1;key2=value2 095 * 096 * @param changeValue change value. 097 * @return This returns the hash with hash<[key1,value1],[key2,value2]> 098 * @throws CommandException thrown if changeValue cannot be parsed properly. 099 */ 100 public static Map<String, String> parseChangeValue(String changeValue) throws CommandException { 101 if (changeValue == null || changeValue.trim().equalsIgnoreCase("")) { 102 throw new CommandException(ErrorCode.E1015, "change value can not be empty string or null"); 103 } 104 105 Map<String, String> map = new HashMap<String, String>(); 106 107 String[] tokens = changeValue.split(";"); 108 for (String token : tokens) { 109 if (!token.contains("=")) { 110 throw new CommandException(ErrorCode.E1015, changeValue, 111 "change value must be name=value pair or name=(empty string)"); 112 } 113 114 String[] pair = token.split("="); 115 String key = pair[0]; 116 117 if (map.containsKey(key)) { 118 throw new CommandException(ErrorCode.E1015, changeValue, "can not specify repeated change values on " 119 + key); 120 } 121 122 if (pair.length == 2) { 123 map.put(key, pair[1]); 124 } 125 else if (pair.length == 1) { 126 map.put(key, ""); 127 } 128 else { 129 throw new CommandException(ErrorCode.E1015, changeValue, "elements on " + key 130 + " must be name=value pair or name=(empty string)"); 131 } 132 } 133 134 return map; 135 } 136 }