001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.util; 020 021import java.io.IOException; 022import java.net.URI; 023import java.util.HashMap; 024import java.util.Map; 025 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.filecache.DistributedCache; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.oozie.ErrorCode; 031import org.apache.oozie.WorkflowActionBean; 032import org.apache.oozie.action.hadoop.JavaActionExecutor; 033import org.apache.oozie.client.OozieClient; 034import org.apache.oozie.client.XOozieClient; 035import org.apache.oozie.command.CommandException; 036import org.apache.oozie.command.wf.ActionXCommand; 037import org.apache.oozie.service.HadoopAccessorException; 038import org.apache.oozie.service.HadoopAccessorService; 039import org.apache.oozie.service.Services; 040 041/** 042 * Job utilities. 043 */ 044public class JobUtils { 045 /** 046 * Normalize appPath in job conf with the provided user/group - If it's not jobs via proxy submission, after 047 * normalization appPath always points to job's Xml definition file. 048 * <p> 049 * 050 * @param user user 051 * @param group group 052 * @param conf job configuration. 053 * @throws IOException thrown if normalization can not be done properly. 054 */ 055 public static void normalizeAppPath(String user, String group, Configuration conf) throws IOException { 056 ParamChecker.notNull(user, "user"); 057 058 if (conf.get(XOozieClient.IS_PROXY_SUBMISSION) != null) { // do nothing for proxy submission job; 059 return; 060 } 061 062 String wfPathStr = conf.get(OozieClient.APP_PATH); 063 String coordPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH); 064 String bundlePathStr = conf.get(OozieClient.BUNDLE_APP_PATH); 065 String appPathStr = wfPathStr != null ? wfPathStr : (coordPathStr != null ? coordPathStr : bundlePathStr); 066 067 FileSystem fs = null; 068 try { 069 URI uri = new Path(appPathStr).toUri(); 070 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 071 Configuration fsConf = has.createConfiguration(uri.getAuthority()); 072 fs = has.createFileSystem(user, uri, fsConf); 073 } 074 catch (HadoopAccessorException ex) { 075 throw new IOException(ex.getMessage()); 076 } 077 078 Path appPath = new Path(appPathStr); 079 String normalizedAppPathStr = appPathStr; 080 if (!fs.exists(appPath)) { 081 throw new IOException("Error: " + appPathStr + " does not exist"); 082 } 083 084 if (wfPathStr != null) { 085 conf.set(OozieClient.APP_PATH, normalizedAppPathStr); 086 } 087 else if (coordPathStr != null) { 088 conf.set(OozieClient.COORDINATOR_APP_PATH, normalizedAppPathStr); 089 } 090 else if (bundlePathStr != null) { 091 conf.set(OozieClient.BUNDLE_APP_PATH, normalizedAppPathStr); 092 } 093 } 094 095 /** 096 * This Function will parse the value of the changed values in key value manner. the change value would be 097 * key1=value1;key2=value2 098 * 099 * @param changeValue change value. 100 * @return This returns the hash with hash<[key1,value1],[key2,value2]> 101 * @throws CommandException thrown if changeValue cannot be parsed properly. 102 */ 103 public static Map<String, String> parseChangeValue(String changeValue) throws CommandException { 104 if (changeValue == null || changeValue.trim().equalsIgnoreCase("")) { 105 throw new CommandException(ErrorCode.E1015, "change value can not be empty string or null"); 106 } 107 108 Map<String, String> map = new HashMap<String, String>(); 109 110 String[] tokens = changeValue.split(";"); 111 for (String token : tokens) { 112 if (!token.contains("=")) { 113 throw new CommandException(ErrorCode.E1015, changeValue, 114 "change value must be name=value pair or name=(empty string)"); 115 } 116 117 String[] pair = token.split("="); 118 String key = pair[0]; 119 120 if (map.containsKey(key)) { 121 throw new CommandException(ErrorCode.E1015, changeValue, "can not specify repeated change values on " 122 + key); 123 } 124 125 if (pair.length == 2) { 126 map.put(key, pair[1]); 127 } 128 else if (pair.length == 1) { 129 map.put(key, ""); 130 } 131 else { 132 throw new CommandException(ErrorCode.E1015, changeValue, "elements on " + key 133 + " must be name=value pair or name=(empty string)"); 134 } 135 } 136 137 return map; 138 } 139 140 /** 141 * This method provides a wrapper around hadoop 2.x implementations. 142 * @param file Path of the file to be added 143 * @param conf Configuration that contains the classpath setting 144 * @param fs FileSystem with respect to which path should be interpreted (may be null) 145 * @throws IOException if the file can't be added to the classpath 146 */ 147 public static void addFileToClassPath(Path file, Configuration conf, FileSystem fs) throws IOException { 148 if (fs == null) { 149 Configuration defaultConf = Services.get().get(HadoopAccessorService.class) 150 .createConfiguration(conf.get(JavaActionExecutor.HADOOP_YARN_RM)); 151 XConfiguration.copy(conf, defaultConf); 152 // it fails with conf, therefore we pass defaultConf instead 153 fs = file.getFileSystem(defaultConf); 154 } 155 156 DistributedCache.addFileToClassPath(file, conf, fs); 157 } 158 159 public static String getRetryKey(WorkflowActionBean wfAction, String key) { 160 return ActionXCommand.RETRY + wfAction.getUserRetryCount() + "." + key; 161 } 162 163 public static String getRetryKey(String key, int retry) { 164 return ActionXCommand.RETRY + retry + "." + key; 165 } 166}