/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.security.token.Token;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.action.hadoop.JavaActionExecutor;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.JobUtils;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;

/**
 * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group.
 * <p/>
 * The default accessor used is the base accessor, which just injects the UGI into the configuration
 * instance used to create/obtain JobClient and FileSystem instances.
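 * <p/>
 * A minimal usage sketch (illustrative only; the user name and host:port values are assumptions,
 * not from this file):
 * <pre>
 *   HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
 *   JobConf jobConf = has.createJobConf("jt-host:8021");
 *   JobClient client = has.createJobClient("someUser", jobConf);
 * </pre>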
 */
public class HadoopAccessorService implements Service {

    private static XLog LOG = XLog.getLog(HadoopAccessorService.class);

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService.";
    public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist";
    public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist";
    public static final String HADOOP_CONFS = CONF_PREFIX + "hadoop.configurations";
    public static final String ACTION_CONFS = CONF_PREFIX + "action.configurations";
    public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled";
    public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file";
    public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";
    public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token");

    protected static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
    /** The Kerberos principal for the job tracker. */
    protected static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
    /** The Kerberos principal for the resource manager. */
    protected static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
    protected static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
    protected static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
    protected static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
    private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>();

    private Set<String> jobTrackerWhitelist = new HashSet<String>();
    private Set<String> nameNodeWhitelist = new HashSet<String>();
    private Map<String, Configuration> hadoopConfigs = new HashMap<String, Configuration>();
    private Map<String, File> actionConfigDirs = new HashMap<String, File>();
    private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>();

    private UserGroupInformationService ugiService;

    /**
     * Supported filesystem schemes for namespace federation.
     */
    public static final String SUPPORTED_FILESYSTEMS = CONF_PREFIX + "supported.filesystems";
    public static final String[] DEFAULT_SUPPORTED_SCHEMES = new String[]{"hdfs", "hftp", "webhdfs"};
    private Set<String> supportedSchemes;
    private boolean allSchemesSupported;

    public void init(Services services) throws ServiceException {
        this.ugiService = services.get(UserGroupInformationService.class);
        init(services.getConf());
    }

    //for testing purposes, see XFsTestCase
    public void init(Configuration conf) throws ServiceException {
        for (String name : conf.getStringCollection(JOB_TRACKER_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            jobTrackerWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "JOB_TRACKER_WHITELIST :" + conf.getStringCollection(JOB_TRACKER_WHITELIST)
                        + ", Total entries :" + jobTrackerWhitelist.size());
        for (String name : conf.getStringCollection(NAME_NODE_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            nameNodeWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "NAME_NODE_WHITELIST :" + conf.getStringCollection(NAME_NODE_WHITELIST)
                        + ", Total entries :" + nameNodeWhitelist.size());

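        // Illustrative note (the property value below is an assumption, not from this file): both
        // whitelists are plain comma-separated host:port lists in oozie-site.xml, e.g.
        //   oozie.service.HadoopAccessorService.jobTracker.whitelist=jt1:8021,jt2:8021
        // An empty whitelist disables the check entirely; see validate() below, which only
        // rejects a URI when the whitelist is non-empty.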
        boolean kerberosAuthOn = conf.getBoolean(KERBEROS_AUTH_ENABLED, true);
        XLog.getLog(getClass()).info("Oozie Kerberos Authentication [{0}]", (kerberosAuthOn) ? "enabled" : "disabled");
        if (kerberosAuthOn) {
            kerberosInit(conf);
        }
        else {
            Configuration ugiConf = new Configuration();
            ugiConf.set("hadoop.security.authentication", "simple");
            UserGroupInformation.setConfiguration(ugiConf);
        }

        if (ugiService == null) { //for testing purposes, see XFsTestCase
            this.ugiService = new UserGroupInformationService();
        }

        loadHadoopConfigs(conf);
        preLoadActionConfigs(conf);

        supportedSchemes = new HashSet<String>();
        String[] schemesFromConf = conf.getStrings(SUPPORTED_FILESYSTEMS, DEFAULT_SUPPORTED_SCHEMES);
        if (schemesFromConf != null) {
            for (String scheme : schemesFromConf) {
                scheme = scheme.trim();
                // If the user gives "*", allSchemesSupported is set and scheme checking is
                // skipped, i.e. all schemes are allowed
                if (scheme.equals("*")) {
                    if (schemesFromConf.length > 1) {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                SUPPORTED_FILESYSTEMS + " should contain either only wildcard or explicit list, not both");
                    }
                    allSchemesSupported = true;
                }
                supportedSchemes.add(scheme);
            }
        }
    }

    private void kerberosInit(Configuration serviceConf) throws ServiceException {
        try {
            String keytabFile = serviceConf.get(KERBEROS_KEYTAB,
                    System.getProperty("user.home") + "/oozie.keytab").trim();
            if (keytabFile.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
            }
            String principal = serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
            if (principal.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
            }
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
            XLog.getLog(getClass()).info("Got Kerberos ticket, keytab [{0}], Oozie principal [{1}]",
                    keytabFile, principal);
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private static final String[] HADOOP_CONF_FILES =
            {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml", "hadoop-site.xml"};

    private Configuration loadHadoopConf(File dir) throws IOException {
        Configuration hadoopConf = new XConfiguration();
        for (String file : HADOOP_CONF_FILES) {
            File f = new File(dir, file);
            if (f.exists()) {
                InputStream is = new FileInputStream(f);
                try {
                    Configuration conf = new XConfiguration(is);
                    XConfiguration.copy(conf, hadoopConf);
                }
                finally {
                    // close the stream whether or not parsing succeeds
                    is.close();
                }
            }
        }
        return hadoopConf;
    }
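
    // The mappings parsed by parseConfigDirs() below use the form "authority=dir", where dir may
    // be absolute or relative to the Oozie configuration directory. A hedged illustration based
    // on the "*=hadoop-conf" default used by loadHadoopConfigs() and preLoadActionConfigs()
    // (the host:port entry is an assumption, not from this file):
    //   *=hadoop-conf                fallback configuration directory for any cluster
    //   jt-host:8021=jt-host-conf    directory specific to one cluster authority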

    private Map<String, File> parseConfigDirs(String[] confDefs, String type) throws ServiceException, IOException {
        Map<String, File> map = new HashMap<String, File>();
        File configDir = new File(ConfigurationService.getConfigurationDirectory());
        for (String confDef : confDefs) {
            if (confDef.trim().length() > 0) {
                String[] parts = confDef.split("=");
                if (parts.length == 2) {
                    String hostPort = parts[0];
                    String confDir = parts[1];
                    File dir = new File(confDir);
                    if (!dir.isAbsolute()) {
                        dir = new File(configDir, confDir);
                    }
                    if (dir.exists()) {
                        map.put(hostPort.toLowerCase(), dir);
                    }
                    else {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                "could not find " + type + " configuration directory: " + dir.getAbsolutePath());
                    }
                }
                else {
                    throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                            "Incorrect " + type + " configuration definition: " + confDef);
                }
            }
        }
        return map;
    }

    private void loadHadoopConfigs(Configuration serviceConf) throws ServiceException {
        try {
            Map<String, File> map = parseConfigDirs(serviceConf.getStrings(HADOOP_CONFS, "*=hadoop-conf"), "hadoop");
            for (Map.Entry<String, File> entry : map.entrySet()) {
                hadoopConfigs.put(entry.getKey(), loadHadoopConf(entry.getValue()));
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private void preLoadActionConfigs(Configuration serviceConf) throws ServiceException {
        try {
            actionConfigDirs = parseConfigDirs(serviceConf.getStrings(ACTION_CONFS, "*=hadoop-conf"), "action");
            for (String hostport : actionConfigDirs.keySet()) {
                actionConfigs.put(hostport, new ConcurrentHashMap<String, XConfiguration>());
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    public void destroy() {
    }

    public Class<? extends Service> getInterface() {
        return HadoopAccessorService.class;
    }

    private UserGroupInformation getUGI(String user) throws IOException {
        return ugiService.getProxyUser(user);
    }

    /**
     * Creates a JobConf using the site configuration for the specified hostname:port.
     * <p/>
     * If the specified hostname:port is not defined it falls back to the '*' site
     * configuration if available. If the '*' site configuration is not available,
     * the JobConf has all Hadoop defaults.
     *
     * @param hostPort hostname:port to lookup Hadoop site configuration.
     * @return a JobConf with the corresponding site configuration for hostPort.
     */
    public JobConf createJobConf(String hostPort) {
        JobConf jobConf = new JobConf();
        XConfiguration.copy(getConfiguration(hostPort), jobConf);
        jobConf.setBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, true);
        return jobConf;
    }
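
    // A hedged usage sketch for createJobConf() above (the host:port value is an assumption):
    // the returned JobConf carries the matching cluster site configuration plus the marker flag
    // that createJobClient() and createFileSystem() require.
    //   JobConf conf = createJobConf("jt-host:8021");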

    private XConfiguration loadActionConf(String hostPort, String action) {
        File dir = actionConfigDirs.get(hostPort);
        XConfiguration actionConf = new XConfiguration();
        if (dir != null) {
            File actionConfFile = new File(dir, action + ".xml");
            if (actionConfFile.exists()) {
                try {
                    InputStream is = new FileInputStream(actionConfFile);
                    try {
                        actionConf = new XConfiguration(is);
                    }
                    finally {
                        // close the stream whether or not parsing succeeds
                        is.close();
                    }
                }
                catch (IOException ex) {
                    XLog.getLog(getClass()).warn("Could not read file [{0}] for action [{1}] configuration for hostPort [{2}]",
                            actionConfFile.getAbsolutePath(), action, hostPort);
                }
            }
        }
        return actionConf;
    }

    /**
     * Returns a Configuration containing any defaults for an action for a particular cluster.
     * <p/>
     * This configuration is used as default for the action configuration and enables cluster
     * level default values per action.
     *
     * @param hostPort hostname:port to lookup the action default configuration.
     * @param action action name.
     * @return the default configuration for the action for the specified cluster.
     */
    public XConfiguration createActionDefaultConf(String hostPort, String action) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Map<String, XConfiguration> hostPortActionConfigs = actionConfigs.get(hostPort);
        if (hostPortActionConfigs == null) {
            hostPortActionConfigs = actionConfigs.get("*");
            hostPort = "*";
        }
        XConfiguration actionConf = hostPortActionConfigs.get(action);
        if (actionConf == null) {
            // Lazy loading as we don't know all actions upfront. No need to synchronize:
            // this is a read operation, and in case of a race condition loading and inserting
            // into the Map is idempotent and the action-config Map is a ConcurrentHashMap.
            actionConf = loadActionConf(hostPort, action);
            hostPortActionConfigs.put(action, actionConf);
        }
        return new XConfiguration(actionConf.toProperties());
    }

    private Configuration getConfiguration(String hostPort) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Configuration conf = hadoopConfigs.get(hostPort);
        if (conf == null) {
            conf = hadoopConfigs.get("*");
            if (conf == null) {
                conf = new XConfiguration();
            }
        }
        return conf;
    }

    /**
     * Return a JobClient created with the provided user/group.
     *
     * @param user user name.
     * @param conf JobConf with all necessary information to create the JobClient.
     * @return JobClient created with the provided user/group.
     * @throws HadoopAccessorException if the client could not be created.
     */
    public JobClient createJobClient(String user, final JobConf conf) throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }
        String jobTracker = conf.get(JavaActionExecutor.HADOOP_JOB_TRACKER);
        validateJobTracker(jobTracker);
        try {
            UserGroupInformation ugi = getUGI(user);
            JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
                public JobClient run() throws Exception {
                    return new JobClient(conf);
                }
            });
            Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(getMRDelegationTokenRenewer(conf));
            conf.getCredentials().addToken(MR_TOKEN_ALIAS, mrdt);
            return jobClient;
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
    }
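
    // A hedged example for createJobClient() above (user and host:port values are assumptions):
    // the JobConf must come from createJobConf(), otherwise the E0903 check rejects it; the MR
    // delegation token obtained from the JobTracker is stored in the JobConf credentials under
    // MR_TOKEN_ALIAS.
    //   JobConf conf = createJobConf("jt-host:8021");
    //   JobClient client = createJobClient("hadoopUser", conf);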

    /**
     * Return a FileSystem created with the provided user for the specified URI.
     *
     * @param user user name.
     * @param uri file system URI.
     * @param conf Configuration with all necessary information to create the FileSystem.
     * @return FileSystem created with the provided user/group.
     * @throws HadoopAccessorException if the filesystem could not be created.
     */
    public FileSystem createFileSystem(String user, final URI uri, final Configuration conf)
            throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }

        checkSupportedFilesystem(uri);

        String nameNode = uri.getAuthority();
        if (nameNode == null) {
            nameNode = conf.get("fs.default.name");
            if (nameNode != null) {
                try {
                    nameNode = new URI(nameNode).getAuthority();
                }
                catch (URISyntaxException ex) {
                    throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
                }
            }
        }
        validateNameNode(nameNode);

        try {
            UserGroupInformation ugi = getUGI(user);
            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                public FileSystem run() throws Exception {
                    return FileSystem.get(uri, conf);
                }
            });
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
    }

    /**
     * Validate a job tracker URI against the job tracker whitelist.
     *
     * @param jobTrackerUri job tracker URI to validate.
     * @throws HadoopAccessorException if a non-empty whitelist is configured and does not contain the URI.
     */
    protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException {
        validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900);
    }

    /**
     * Validate a name node URI against the name node whitelist.
     *
     * @param nameNodeUri name node URI to validate.
     * @throws HadoopAccessorException if a non-empty whitelist is configured and does not contain the URI.
     */
    protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException {
        validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901);
    }

    private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException {
        if (uri != null) {
            uri = uri.toLowerCase().trim();
            if (whitelist.size() > 0 && !whitelist.contains(uri)) {
                throw new HadoopAccessorException(error, uri);
            }
        }
    }

    public Text getMRDelegationTokenRenewer(JobConf jobConf) throws IOException {
        if (UserGroupInformation.isSecurityEnabled()) { // secure cluster
            return getMRTokenRenewerInternal(jobConf);
        }
        else {
            return MR_TOKEN_ALIAS; // doesn't matter what we pass as renewer
        }
    }
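
    // Illustrative walk-through of the renewer resolution in getMRTokenRenewerInternal() below
    // (principal and host values are assumptions, not from this file): with
    //   yarn.resourcemanager.principal = rm/_HOST@EXAMPLE.COM
    //   yarn.resourcemanager.address   = rm-host:8032
    // SecurityUtil.getServerPrincipal() substitutes _HOST with the resolved hostname, yielding
    // the renewer "rm/rm-host@EXAMPLE.COM", which is then cached in mrTokenRenewers.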

    // Package private for unit test purposes
    Text getMRTokenRenewerInternal(JobConf jobConf) throws IOException {
        // Get the renewer for the JT principal as well, even though the JT in Hadoop 1.x
        // does not support renewing/cancelling tokens
        String servicePrincipal = jobConf.get(RM_PRINCIPAL, jobConf.get(JT_PRINCIPAL));
        Text renewer;
        if (servicePrincipal != null) { // secure cluster
            renewer = mrTokenRenewers.get(servicePrincipal);
            if (renewer == null) {
                // Mimic org.apache.hadoop.mapred.Master.getMasterPrincipal()
                String target = jobConf.get(HADOOP_YARN_RM, jobConf.get(HADOOP_JOB_TRACKER_2));
                if (target == null) {
                    target = jobConf.get(HADOOP_JOB_TRACKER);
                }
                try {
                    String addr = NetUtils.createSocketAddr(target).getHostName();
                    renewer = new Text(SecurityUtil.getServerPrincipal(servicePrincipal, addr));
                    LOG.info("Delegation Token Renewer details: Principal=" + servicePrincipal + ",Target=" + target
                            + ",Renewer=" + renewer);
                }
                catch (IllegalArgumentException iae) {
                    renewer = new Text(servicePrincipal.split("[/@]")[0]);
                    LOG.info("Delegation Token Renewer for " + servicePrincipal + " is " + renewer);
                }
                mrTokenRenewers.put(servicePrincipal, renewer);
            }
        }
        else {
            renewer = MR_TOKEN_ALIAS; // doesn't matter what we pass as renewer
        }
        return renewer;
    }

    public void addFileToClassPath(String user, final Path file, final Configuration conf)
            throws IOException {
        ParamChecker.notEmpty(user, "user");
        try {
            UserGroupInformation ugi = getUGI(user);
            ugi.doAs(new PrivilegedExceptionAction<Void>() {
                @Override
                public Void run() throws Exception {
                    JobUtils.addFileToClassPath(file, conf, null);
                    return null;
                }
            });
        }
        catch (InterruptedException ex) {
            throw new IOException(ex);
        }
    }

    /**
     * Checks whether the filesystem scheme of the given URI is among the configured list of
     * supported schemes; this makes the system robust to filesystems other than HDFS as well.
     *
     * @param uri URI whose filesystem scheme is checked.
     * @throws HadoopAccessorException if the scheme is not supported.
     */
    public void checkSupportedFilesystem(URI uri) throws HadoopAccessorException {
        if (allSchemesSupported) {
            return;
        }
        String uriScheme = uri.getScheme();
        if (uriScheme != null) { // skip the check if no scheme is given
            if (!supportedSchemes.isEmpty()) {
                XLog.getLog(this.getClass()).debug("Checking if filesystem " + uriScheme + " is supported");
                if (!supportedSchemes.contains(uriScheme)) {
                    throw new HadoopAccessorException(ErrorCode.E0904, uriScheme, uri.toString());
                }
            }
        }
    }

    public Set<String> getSupportedSchemes() {
        return supportedSchemes;
    }

}