/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.Locale;
import java.util.Set;
import java.util.HashSet;

/**
 * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group. <p/> The
 * default accessor used is the base accessor which just injects the UGI into the configuration instance used to
 * create/obtain JobClient and FileSystem instances. <p/> The HadoopAccess class to use can be configured in the
 * <code>oozie-site.xml</code> using the <code>oozie.service.HadoopAccessorService.accessor.class</code> property.
 * <p/>
 * Job tracker and name node addresses can be restricted through the {@link #JOB_TRACKER_WHITELIST} and
 * {@link #NAME_NODE_WHITELIST} properties. An empty whitelist places no restriction on the corresponding address.
 */
public class HadoopAccessorService implements Service {

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService.";
    public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist";
    public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist";

    // Normalized (lower-cased, trimmed) whitelist entries; populated by init(Services).
    private final Set<String> jobTrackerWhitelist = new HashSet<String>();
    private final Set<String> nameNodeWhitelist = new HashSet<String>();

    /**
     * Initialize the service: load both whitelists from the services configuration, log them, and then delegate to
     * {@link #init(Configuration)}.
     *
     * @param services services instance providing the configuration.
     * @throws ServiceException propagated from {@link #init(Configuration)}.
     */
    public void init(Services services) throws ServiceException {
        loadWhitelist(services.getConf(), JOB_TRACKER_WHITELIST, "JOB_TRACKER_WHITELIST", jobTrackerWhitelist);
        loadWhitelist(services.getConf(), NAME_NODE_WHITELIST, "NAME_NODE_WHITELIST", nameNodeWhitelist);
        init(services.getConf());
    }

    /**
     * Initialization hook invoked at the end of {@link #init(Services)}. This implementation does nothing.
     *
     * @param serviceConf the services configuration.
     * @throws ServiceException never thrown by this implementation.
     */
    public void init(Configuration serviceConf) throws ServiceException {
    }

    /**
     * Destroy the service. This implementation does nothing.
     */
    public void destroy() {
    }

    /**
     * Return the public interface of the service.
     *
     * @return {@link HadoopAccessorService}.
     */
    public Class<? extends Service> getInterface() {
        return HadoopAccessorService.class;
    }

    /**
     * Return a JobClient created with the provided user/group.
     *
     * @param user user name injected into the configuration used to create the JobClient.
     * @param group group name injected into the configuration used to create the JobClient.
     * @param conf JobConf with all necessary information to create the
     *        JobClient.
     * @return JobClient created with the provided user/group.
     * @throws HadoopAccessorException if the job tracker is not in the whitelist or the client could not be created.
     */
    public JobClient createJobClient(String user, String group, JobConf conf) throws HadoopAccessorException {
        validateJobTracker(conf.get("mapred.job.tracker"));
        conf = createConfiguration(user, group, conf);
        try {
            return new JobClient(conf);
        }
        catch (IOException e) {
            throw new HadoopAccessorException(ErrorCode.E0902, e);
        }
    }

    /**
     * Return a FileSystem created with the provided user/group.
     *
     * @param user user name injected into the configuration used to create the FileSystem.
     * @param group group name injected into the configuration used to create the FileSystem.
     * @param conf Configuration with all necessary information to create the
     *        FileSystem.
     * @return FileSystem created with the provided user/group.
     * @throws HadoopAccessorException if <code>fs.default.name</code> is not set or is not a valid URI, if the name
     *         node is not in the whitelist, or if the filesystem could not be created.
     */
    public FileSystem createFileSystem(String user, String group, Configuration conf) throws HadoopAccessorException {
        String defaultFs = conf.get("fs.default.name");
        if (defaultFs == null) {
            // new URI(null) would fail with an unchecked NullPointerException; report it through the declared
            // exception type instead.
            throw new HadoopAccessorException(ErrorCode.E0902, "fs.default.name is not set");
        }
        try {
            validateNameNode(new URI(defaultFs).getAuthority());
            conf = createConfiguration(user, group, conf);
            return FileSystem.get(conf);
        }
        catch (IOException e) {
            throw new HadoopAccessorException(ErrorCode.E0902, e);
        }
        catch (URISyntaxException e) {
            throw new HadoopAccessorException(ErrorCode.E0902, e);
        }
    }

    /**
     * Return a FileSystem created with the provided user/group for the
     * specified URI.
     *
     * @param user user name injected into the configuration used to create the FileSystem.
     * @param group group name injected into the configuration used to create the FileSystem.
     * @param uri file system URI.
     * @param conf Configuration with all necessary information to create the
     *        FileSystem.
     * @return FileSystem created with the provided user/group.
     * @throws HadoopAccessorException if the name node is not in the whitelist or the filesystem could not be
     *         created.
     */
    public FileSystem createFileSystem(String user, String group, URI uri, Configuration conf)
            throws HadoopAccessorException {
        validateNameNode(uri.getAuthority());
        conf = createConfiguration(user, group, conf);
        try {
            return FileSystem.get(uri, conf);
        }
        catch (IOException e) {
            throw new HadoopAccessorException(ErrorCode.E0902, e);
        }
    }

    /**
     * Validate a job tracker address against the job tracker whitelist.
     *
     * @param jobTrackerUri job tracker address; <code>null</code> is accepted without validation.
     * @throws HadoopAccessorException with {@link ErrorCode#E0900} if the whitelist is not empty and does not
     *         contain the address.
     */
    protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException {
        validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900);
    }

    /**
     * Validate a name node address against the name node whitelist.
     *
     * @param nameNodeUri name node address; <code>null</code> is accepted without validation.
     * @throws HadoopAccessorException with {@link ErrorCode#E0901} if the whitelist is not empty and does not
     *         contain the address.
     */
    protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException {
        validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901);
    }

    /**
     * Check a normalized address against a whitelist. A <code>null</code> address or an empty whitelist always
     * passes.
     *
     * @param uri address to check, may be <code>null</code>.
     * @param whitelist normalized whitelist entries.
     * @param error error code to report when the address is rejected.
     * @throws HadoopAccessorException if the whitelist is not empty and does not contain the normalized address.
     */
    private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException {
        if (uri == null) {
            return;
        }
        String normalized = normalize(uri);
        if (!whitelist.isEmpty() && !whitelist.contains(normalized)) {
            throw new HadoopAccessorException(error, normalized);
        }
    }

    /**
     * Read a whitelist property, add its non-blank normalized entries to the given set, and log the outcome.
     *
     * @param conf configuration to read the property from.
     * @param property name of the whitelist property.
     * @param label label used as prefix of the log message.
     * @param whitelist set receiving the normalized entries.
     */
    private void loadWhitelist(Configuration conf, String property, String label, Set<String> whitelist) {
        Collection<String> configured = conf.getStringCollection(property);
        for (String entry : configured) {
            String normalized = normalize(entry);
            if (normalized.length() > 0) {
                whitelist.add(normalized);
            }
        }
        XLog.getLog(getClass()).info(label + " :" + configured + ", Total entries :" + whitelist.size());
    }

    /**
     * Normalize an address for whitelist comparison. Whitelist entries and candidate addresses both go through
     * this method so that they are always compared in the same form.
     *
     * @param value address to normalize, not <code>null</code>.
     * @return the address lower-cased (independently of the default locale) and trimmed.
     */
    private static String normalize(String value) {
        return value.toLowerCase(Locale.ROOT).trim();
    }

    /**
     * Create a copy of the given configuration with the user and group injected. The copy is a {@link JobConf} if
     * the given configuration is one, a plain {@link Configuration} otherwise; the given configuration itself is
     * not modified by this method.
     *
     * @param user user name, checked with <code>ParamChecker.notEmpty</code>.
     * @param group group name, checked with <code>ParamChecker.notEmpty</code>.
     * @param conf configuration to copy.
     * @return the copy with <code>user.name</code> and <code>hadoop.job.ugi</code> set.
     */
    @SuppressWarnings("unchecked")
    private <C extends Configuration> C createConfiguration(String user, String group, C conf) {
        ParamChecker.notEmpty(user, "user");
        ParamChecker.notEmpty(group, "group");
        // Safe cast: the new instance is a JobConf exactly when conf is one; otherwise a plain Configuration.
        C fsConf = (C) ((conf instanceof JobConf) ? new JobConf() : new Configuration());
        XConfiguration.copy(conf, fsConf);
        fsConf.set("user.name", user);
        fsConf.set("hadoop.job.ugi", user + "," + group);
        return fsConf;
    }

    /**
     * Add a file to the ClassPath via the DistributedCache.
     * <p/>
     * The file is registered first on a user/group scoped copy of the configuration and then on the given
     * configuration itself; only the latter is visible to the caller.
     *
     * @param user user name injected into the scoped copy of the configuration.
     * @param group group name injected into the scoped copy of the configuration.
     * @param file path of the file to add to the classpath.
     * @param conf configuration to register the file in.
     * @throws IOException propagated from the DistributedCache.
     */
    public void addFileToClassPath(String user, String group, final Path file, final Configuration conf)
            throws IOException {
        // NOTE(review): the call on the scoped copy is kept, in its original order, from the previous
        // implementation; its result is not used here, so it is presumably needed for a side effect - confirm
        // before removing.
        Configuration defaultConf = createConfiguration(user, group, conf);
        DistributedCache.addFileToClassPath(file, defaultConf);
        DistributedCache.addFileToClassPath(file, conf);
    }

}