001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import org.apache.hadoop.io.Text; 021 import org.apache.hadoop.mapred.JobClient; 022 import org.apache.hadoop.mapred.JobConf; 023 import org.apache.hadoop.fs.FileSystem; 024 import org.apache.hadoop.fs.Path; 025 import org.apache.hadoop.conf.Configuration; 026 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; 027 import org.apache.hadoop.security.UserGroupInformation; 028 import org.apache.hadoop.filecache.DistributedCache; 029 import org.apache.hadoop.security.token.Token; 030 import org.apache.oozie.ErrorCode; 031 import org.apache.oozie.util.ParamChecker; 032 import org.apache.oozie.util.XConfiguration; 033 import org.apache.oozie.util.XLog; 034 035 import java.io.File; 036 import java.io.FileInputStream; 037 import java.io.IOException; 038 import java.io.InputStream; 039 import java.net.URI; 040 import java.net.URISyntaxException; 041 import java.security.PrivilegedExceptionAction; 042 import java.util.HashMap; 043 import java.util.Map; 044 import java.util.Set; 045 import java.util.HashSet; 046 import java.util.concurrent.ConcurrentHashMap; 047 import java.util.concurrent.ConcurrentMap; 048 049 /** 050 * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group. <p/> The 051 * default accessor used is the base accessor which just injects the UGI into the configuration instance used to 052 * create/obtain JobClient and ileSystem instances. <p/> The HadoopAccess class to use can be configured in the 053 * <code>oozie-site.xml</code> using the <code>oozie.service.HadoopAccessorService.accessor.class</code> property. 054 */ 055 public class HadoopAccessorService implements Service { 056 057 public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService."; 058 public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist"; 059 public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist"; 060 public static final String HADOOP_CONFS = CONF_PREFIX + "hadoop.configurations"; 061 public static final String ACTION_CONFS = CONF_PREFIX + "action.configurations"; 062 public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled"; 063 public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file"; 064 public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal"; 065 066 private static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created"; 067 068 private Set<String> jobTrackerWhitelist = new HashSet<String>(); 069 private Set<String> nameNodeWhitelist = new HashSet<String>(); 070 private Map<String, Configuration> hadoopConfigs = new HashMap<String, Configuration>(); 071 private Map<String, File> actionConfigDirs = new HashMap<String, File>(); 072 private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>(); 073 074 private ConcurrentMap<String, UserGroupInformation> userUgiMap; 075 076 public void init(Services services) throws ServiceException { 077 init(services.getConf()); 078 } 079 080 //for testing purposes, see XFsTestCase 081 public void init(Configuration conf) throws ServiceException { 082 for (String name : conf.getStringCollection(JOB_TRACKER_WHITELIST)) { 083 String tmp = name.toLowerCase().trim(); 084 if (tmp.length() == 0) { 085 continue; 086 } 087 jobTrackerWhitelist.add(tmp); 088 } 089 XLog.getLog(getClass()).info( 090 "JOB_TRACKER_WHITELIST :" + conf.getStringCollection(JOB_TRACKER_WHITELIST) 091 + ", Total entries :" + jobTrackerWhitelist.size()); 092 for (String name : conf.getStringCollection(NAME_NODE_WHITELIST)) { 093 String tmp = name.toLowerCase().trim(); 094 if (tmp.length() == 0) { 095 continue; 096 } 097 nameNodeWhitelist.add(tmp); 098 } 099 XLog.getLog(getClass()).info( 100 "NAME_NODE_WHITELIST :" + conf.getStringCollection(NAME_NODE_WHITELIST) 101 + ", Total entries :" + nameNodeWhitelist.size()); 102 103 boolean kerberosAuthOn = conf.getBoolean(KERBEROS_AUTH_ENABLED, true); 104 XLog.getLog(getClass()).info("Oozie Kerberos Authentication [{0}]", (kerberosAuthOn) ? "enabled" : "disabled"); 105 if (kerberosAuthOn) { 106 kerberosInit(conf); 107 } 108 else { 109 Configuration ugiConf = new Configuration(); 110 ugiConf.set("hadoop.security.authentication", "simple"); 111 UserGroupInformation.setConfiguration(ugiConf); 112 } 113 114 userUgiMap = new ConcurrentHashMap<String, UserGroupInformation>(); 115 116 loadHadoopConfigs(conf); 117 preLoadActionConfigs(conf); 118 } 119 120 private void kerberosInit(Configuration serviceConf) throws ServiceException { 121 try { 122 String keytabFile = serviceConf.get(KERBEROS_KEYTAB, 123 System.getProperty("user.home") + "/oozie.keytab").trim(); 124 if (keytabFile.length() == 0) { 125 throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB); 126 } 127 String principal = serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST"); 128 if (principal.length() == 0) { 129 throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL); 130 } 131 Configuration conf = new Configuration(); 132 conf.set("hadoop.security.authentication", "kerberos"); 133 UserGroupInformation.setConfiguration(conf); 134 UserGroupInformation.loginUserFromKeytab(principal, keytabFile); 135 XLog.getLog(getClass()).info("Got Kerberos ticket, keytab [{0}], Oozie principal principal [{1}]", 136 keytabFile, principal); 137 } 138 catch (ServiceException ex) { 139 throw ex; 140 } 141 catch (Exception ex) { 142 throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex); 143 } 144 } 145 146 private static final String[] HADOOP_CONF_FILES = 147 {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml", "hadoop-site.xml"}; 148 149 150 private Configuration loadHadoopConf(File dir) throws IOException { 151 Configuration hadoopConf = new XConfiguration(); 152 for (String file : HADOOP_CONF_FILES) { 153 File f = new File(dir, file); 154 if (f.exists()) { 155 InputStream is = new FileInputStream(f); 156 Configuration conf = new XConfiguration(is); 157 is.close(); 158 XConfiguration.copy(conf, hadoopConf); 159 } 160 } 161 return hadoopConf; 162 } 163 164 private Map<String, File> parseConfigDirs(String[] confDefs, String type) throws ServiceException, IOException { 165 Map<String, File> map = new HashMap<String, File>(); 166 File configDir = new File(ConfigurationService.getConfigurationDirectory()); 167 for (String confDef : confDefs) { 168 if (confDef.trim().length() > 0) { 169 String[] parts = confDef.split("="); 170 if (parts.length == 2) { 171 String hostPort = parts[0]; 172 String confDir = parts[1]; 173 File dir = new File(confDir); 174 if (!dir.isAbsolute()) { 175 dir = new File(configDir, confDir); 176 } 177 if (dir.exists()) { 178 map.put(hostPort.toLowerCase(), dir); 179 } 180 else { 181 throw new ServiceException(ErrorCode.E0100, getClass().getName(), 182 "could not find " + type + " configuration directory: " + 183 dir.getAbsolutePath()); 184 } 185 } 186 else { 187 throw new ServiceException(ErrorCode.E0100, getClass().getName(), 188 "Incorrect " + type + " configuration definition: " + confDef); 189 } 190 } 191 } 192 return map; 193 } 194 195 private void loadHadoopConfigs(Configuration serviceConf) throws ServiceException { 196 try { 197 Map<String, File> map = parseConfigDirs(serviceConf.getStrings(HADOOP_CONFS, "*=hadoop-conf"), "hadoop"); 198 for (Map.Entry<String, File> entry : map.entrySet()) { 199 hadoopConfigs.put(entry.getKey(), loadHadoopConf(entry.getValue())); 200 } 201 } 202 catch (ServiceException ex) { 203 throw ex; 204 } 205 catch (Exception ex) { 206 throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex); 207 } 208 } 209 210 private void preLoadActionConfigs(Configuration serviceConf) throws ServiceException { 211 try { 212 actionConfigDirs = parseConfigDirs(serviceConf.getStrings(ACTION_CONFS, "*=hadoop-conf"), "action"); 213 for (String hostport : actionConfigDirs.keySet()) { 214 actionConfigs.put(hostport, new ConcurrentHashMap<String, XConfiguration>()); 215 } 216 } 217 catch (ServiceException ex) { 218 throw ex; 219 } 220 catch (Exception ex) { 221 throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex); 222 } 223 } 224 225 public void destroy() { 226 } 227 228 public Class<? extends Service> getInterface() { 229 return HadoopAccessorService.class; 230 } 231 232 private UserGroupInformation getUGI(String user) throws IOException { 233 UserGroupInformation ugi = userUgiMap.get(user); 234 if (ugi == null) { 235 // taking care of a race condition, the latest UGI will be discarded 236 ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser()); 237 userUgiMap.putIfAbsent(user, ugi); 238 } 239 return ugi; 240 } 241 242 /** 243 * Creates a JobConf using the site configuration for the specified hostname:port. 244 * <p/> 245 * If the specified hostname:port is not defined it falls back to the '*' site 246 * configuration if available. If the '*' site configuration is not available, 247 * the JobConf has all Hadoop defaults. 248 * 249 * @param hostPort hostname:port to lookup Hadoop site configuration. 250 * @return a JobConf with the corresponding site configuration for hostPort. 251 */ 252 public JobConf createJobConf(String hostPort) { 253 JobConf jobConf = new JobConf(); 254 XConfiguration.copy(getConfiguration(hostPort), jobConf); 255 jobConf.setBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, true); 256 return jobConf; 257 } 258 259 private XConfiguration loadActionConf(String hostPort, String action) { 260 File dir = actionConfigDirs.get(hostPort); 261 XConfiguration actionConf = new XConfiguration(); 262 if (dir != null) { 263 File actionConfFile = new File(dir, action + ".xml"); 264 if (actionConfFile.exists()) { 265 try { 266 actionConf = new XConfiguration(new FileInputStream(actionConfFile)); 267 } 268 catch (IOException ex) { 269 XLog.getLog(getClass()).warn("Could not read file [{0}] for action [{1}] configuration for hostPort [{2}]", 270 actionConfFile.getAbsolutePath(), action, hostPort); 271 } 272 } 273 } 274 return actionConf; 275 } 276 277 /** 278 * Returns a Configuration containing any defaults for an action for a particular cluster. 279 * <p/> 280 * This configuration is used as default for the action configuration and enables cluster 281 * level default values per action. 282 * 283 * @param hostPort hostname"port to lookup the action default confiugration. 284 * @param action action name. 285 * @return the default configuration for the action for the specified cluster. 286 */ 287 public XConfiguration createActionDefaultConf(String hostPort, String action) { 288 hostPort = (hostPort != null) ? hostPort.toLowerCase() : null; 289 Map<String, XConfiguration> hostPortActionConfigs = actionConfigs.get(hostPort); 290 if (hostPortActionConfigs == null) { 291 hostPortActionConfigs = actionConfigs.get("*"); 292 hostPort = "*"; 293 } 294 XConfiguration actionConf = hostPortActionConfigs.get(action); 295 if (actionConf == null) { 296 // doing lazy loading as we don't know upfront all actions, no need to synchronize 297 // as it is a read operation an in case of a race condition loading and inserting 298 // into the Map is idempotent and the action-config Map is a ConcurrentHashMap 299 actionConf = loadActionConf(hostPort, action); 300 hostPortActionConfigs.put(action, actionConf); 301 } 302 return new XConfiguration(actionConf.toProperties()); 303 } 304 305 private Configuration getConfiguration(String hostPort) { 306 hostPort = (hostPort != null) ? hostPort.toLowerCase() : null; 307 Configuration conf = hadoopConfigs.get(hostPort); 308 if (conf == null) { 309 conf = hadoopConfigs.get("*"); 310 if (conf == null) { 311 conf = new XConfiguration(); 312 } 313 } 314 return conf; 315 } 316 317 /** 318 * Return a JobClient created with the provided user/group. 319 * 320 * 321 * @param conf JobConf with all necessary information to create the 322 * JobClient. 323 * @return JobClient created with the provided user/group. 324 * @throws HadoopAccessorException if the client could not be created. 325 */ 326 public JobClient createJobClient(String user, final JobConf conf) throws HadoopAccessorException { 327 ParamChecker.notEmpty(user, "user"); 328 if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) { 329 throw new HadoopAccessorException(ErrorCode.E0903); 330 } 331 String jobTracker = conf.get("mapred.job.tracker"); 332 validateJobTracker(jobTracker); 333 try { 334 UserGroupInformation ugi = getUGI(user); 335 JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>() { 336 public JobClient run() throws Exception { 337 return new JobClient(conf); 338 } 339 }); 340 Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(new Text("mr token")); 341 conf.getCredentials().addToken(new Text("mr token"), mrdt); 342 return jobClient; 343 } 344 catch (InterruptedException ex) { 345 throw new HadoopAccessorException(ErrorCode.E0902, ex); 346 } 347 catch (IOException ex) { 348 throw new HadoopAccessorException(ErrorCode.E0902, ex); 349 } 350 } 351 352 /** 353 * Return a FileSystem created with the provided user for the specified URI. 354 * 355 * 356 * @param uri file system URI. 357 * @param conf Configuration with all necessary information to create the FileSystem. 358 * @return FileSystem created with the provided user/group. 359 * @throws HadoopAccessorException if the filesystem could not be created. 360 */ 361 public FileSystem createFileSystem(String user, final URI uri, final Configuration conf) 362 throws HadoopAccessorException { 363 ParamChecker.notEmpty(user, "user"); 364 if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) { 365 throw new HadoopAccessorException(ErrorCode.E0903); 366 } 367 String nameNode = uri.getAuthority(); 368 if (nameNode == null) { 369 nameNode = conf.get("fs.default.name"); 370 if (nameNode != null) { 371 try { 372 nameNode = new URI(nameNode).getAuthority(); 373 } 374 catch (URISyntaxException ex) { 375 throw new HadoopAccessorException(ErrorCode.E0902, ex); 376 } 377 } 378 } 379 validateNameNode(nameNode); 380 381 try { 382 UserGroupInformation ugi = getUGI(user); 383 return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() { 384 public FileSystem run() throws Exception { 385 return FileSystem.get(uri, conf); 386 } 387 }); 388 } 389 catch (InterruptedException ex) { 390 throw new HadoopAccessorException(ErrorCode.E0902, ex); 391 } 392 catch (IOException ex) { 393 throw new HadoopAccessorException(ErrorCode.E0902, ex); 394 } 395 } 396 397 /** 398 * Validate Job tracker 399 * @param jobTrackerUri 400 * @throws HadoopAccessorException 401 */ 402 protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException { 403 validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900); 404 } 405 406 /** 407 * Validate Namenode list 408 * @param nameNodeUri 409 * @throws HadoopAccessorException 410 */ 411 protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException { 412 validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901); 413 } 414 415 private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException { 416 if (uri != null) { 417 uri = uri.toLowerCase().trim(); 418 if (whitelist.size() > 0 && !whitelist.contains(uri)) { 419 throw new HadoopAccessorException(error, uri); 420 } 421 } 422 } 423 424 public void addFileToClassPath(String user, final Path file, final Configuration conf) 425 throws IOException { 426 ParamChecker.notEmpty(user, "user"); 427 try { 428 UserGroupInformation ugi = getUGI(user); 429 ugi.doAs(new PrivilegedExceptionAction<Void>() { 430 public Void run() throws Exception { 431 Configuration defaultConf = new Configuration(); 432 XConfiguration.copy(conf, defaultConf); 433 //Doing this NOP add first to have the FS created and cached 434 DistributedCache.addFileToClassPath(file, defaultConf); 435 436 DistributedCache.addFileToClassPath(file, conf); 437 return null; 438 } 439 }); 440 441 } 442 catch (InterruptedException ex) { 443 throw new IOException(ex); 444 } 445 446 } 447 448 }