001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import org.apache.hadoop.io.Text; 022import org.apache.hadoop.mapred.JobClient; 023import org.apache.hadoop.mapred.JobConf; 024import org.apache.hadoop.fs.FileSystem; 025import org.apache.hadoop.fs.Path; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; 028import org.apache.hadoop.net.NetUtils; 029import org.apache.hadoop.security.SecurityUtil; 030import org.apache.hadoop.security.UserGroupInformation; 031import org.apache.hadoop.security.token.Token; 032import org.apache.oozie.ErrorCode; 033import org.apache.oozie.action.hadoop.JavaActionExecutor; 034import org.apache.oozie.util.IOUtils; 035import org.apache.oozie.util.ParamChecker; 036import org.apache.oozie.util.XConfiguration; 037import org.apache.oozie.util.XLog; 038import org.apache.oozie.util.JobUtils; 039import org.apache.oozie.workflow.lite.LiteWorkflowAppParser; 040 041import java.io.File; 042import java.io.FileInputStream; 043import java.io.FilenameFilter; 044import java.io.IOException; 045import java.io.InputStream; 046import java.lang.reflect.InvocationTargetException; 047import java.lang.reflect.Method; 048import java.net.InetAddress; 049import java.net.URI; 050import java.net.URISyntaxException; 051import java.security.PrivilegedExceptionAction; 052import java.util.Arrays; 053import java.util.Comparator; 054import java.util.HashMap; 055import java.util.Map; 056import java.util.Properties; 057import java.util.Set; 058import java.util.HashSet; 059import java.util.concurrent.ConcurrentHashMap; 060 061 062/** 063 * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group. <p> The 064 * default accessor used is the base accessor which just injects the UGI into the configuration instance used to 065 * create/obtain JobClient and FileSystem instances. 066 */ 067public class HadoopAccessorService implements Service { 068 069 private static XLog LOG = XLog.getLog(HadoopAccessorService.class); 070 071 public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService."; 072 public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist"; 073 public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist"; 074 public static final String HADOOP_CONFS = CONF_PREFIX + "hadoop.configurations"; 075 public static final String ACTION_CONFS = CONF_PREFIX + "action.configurations"; 076 public static final String ACTION_CONFS_LOAD_DEFAULT_RESOURCES = ACTION_CONFS + ".load.default.resources"; 077 public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled"; 078 public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file"; 079 public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal"; 080 public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token"); 081 082 protected static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created"; 083 /** The Kerberos principal for the job tracker.*/ 084 protected static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal"; 085 /** The Kerberos principal for the resource manager.*/ 086 protected static final String RM_PRINCIPAL = "yarn.resourcemanager.principal"; 087 protected static final String HADOOP_JOB_TRACKER = "mapred.job.tracker"; 088 protected static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address"; 089 protected static final String HADOOP_YARN_RM = "yarn.resourcemanager.address"; 090 private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>(); 091 092 private static Configuration cachedConf; 093 094 private static final String DEFAULT_ACTIONNAME = "default"; 095 096 private Set<String> jobTrackerWhitelist = new HashSet<String>(); 097 private Set<String> nameNodeWhitelist = new HashSet<String>(); 098 private Map<String, Configuration> hadoopConfigs = new HashMap<String, Configuration>(); 099 private Map<String, File> actionConfigDirs = new HashMap<String, File>(); 100 private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>(); 101 102 private UserGroupInformationService ugiService; 103 104 /** 105 * Supported filesystem schemes for namespace federation 106 */ 107 public static final String SUPPORTED_FILESYSTEMS = CONF_PREFIX + "supported.filesystems"; 108 private Set<String> supportedSchemes; 109 private boolean allSchemesSupported; 110 111 public void init(Services services) throws ServiceException { 112 this.ugiService = services.get(UserGroupInformationService.class); 113 init(services.getConf()); 114 } 115 116 //for testing purposes, see XFsTestCase 117 public void init(Configuration conf) throws ServiceException { 118 for (String name : ConfigurationService.getStrings(conf, JOB_TRACKER_WHITELIST)) { 119 String tmp = name.toLowerCase().trim(); 120 if (tmp.length() == 0) { 121 continue; 122 } 123 jobTrackerWhitelist.add(tmp); 124 } 125 LOG.info( 126 "JOB_TRACKER_WHITELIST :" + jobTrackerWhitelist.toString() 127 + ", Total entries :" + jobTrackerWhitelist.size()); 128 for (String name : ConfigurationService.getStrings(conf, NAME_NODE_WHITELIST)) { 129 String tmp = name.toLowerCase().trim(); 130 if (tmp.length() == 0) { 131 continue; 132 } 133 nameNodeWhitelist.add(tmp); 134 } 135 LOG.info( 136 "NAME_NODE_WHITELIST :" + nameNodeWhitelist.toString() 137 + ", Total entries :" + nameNodeWhitelist.size()); 138 139 boolean kerberosAuthOn = ConfigurationService.getBoolean(conf, KERBEROS_AUTH_ENABLED); 140 LOG.info("Oozie Kerberos Authentication [{0}]", (kerberosAuthOn) ? "enabled" : "disabled"); 141 if (kerberosAuthOn) { 142 kerberosInit(conf); 143 } 144 else { 145 Configuration ugiConf = new Configuration(); 146 ugiConf.set("hadoop.security.authentication", "simple"); 147 UserGroupInformation.setConfiguration(ugiConf); 148 } 149 150 if (ugiService == null) { //for testing purposes, see XFsTestCase 151 this.ugiService = new UserGroupInformationService(); 152 } 153 154 loadHadoopConfigs(conf); 155 preLoadActionConfigs(conf); 156 157 supportedSchemes = new HashSet<String>(); 158 String[] schemesFromConf = ConfigurationService.getStrings(conf, SUPPORTED_FILESYSTEMS); 159 if(schemesFromConf != null) { 160 for (String scheme: schemesFromConf) { 161 scheme = scheme.trim(); 162 // If user gives "*", supportedSchemes will be empty, so that checking is not done i.e. all schemes allowed 163 if(scheme.equals("*")) { 164 if(schemesFromConf.length > 1) { 165 throw new ServiceException(ErrorCode.E0100, getClass().getName(), 166 SUPPORTED_FILESYSTEMS + " should contain either only wildcard or explicit list, not both"); 167 } 168 allSchemesSupported = true; 169 } 170 supportedSchemes.add(scheme); 171 } 172 } 173 174 setConfigForHadoopSecurityUtil(conf); 175 } 176 177 private void setConfigForHadoopSecurityUtil(Configuration conf) { 178 // Prior to HADOOP-12954 (2.9.0+), Hadoop sets hadoop.security.token.service.use_ip on startup in a static block with no 179 // way for Oozie to change it because Oozie doesn't load *-site.xml files on the classpath. HADOOP-12954 added a way to 180 // set this property via a setConfiguration method. Ideally, this would be part of JobClient so Oozie wouldn't have to 181 // worry about it and we could have different values for different clusters, but we can't; so we have to use the same value 182 // for every cluster Oozie is configured for. To that end, we'll use the default NN's configs. If that's not defined, 183 // we'll use the wildcard's configs. And if that's not defined, we'll use an arbitrary cluster's configs. In any case, 184 // if the version of Hadoop we're using doesn't include HADOOP-12954, we'll do nothing (there's no workaround), and 185 // hadoop.security.token.service.use_ip will have the default value. 186 String nameNode = conf.get(LiteWorkflowAppParser.DEFAULT_NAME_NODE); 187 if (nameNode != null) { 188 nameNode = nameNode.trim(); 189 if (nameNode.isEmpty()) { 190 nameNode = null; 191 } 192 } 193 if (nameNode == null && hadoopConfigs.containsKey("*")) { 194 nameNode = "*"; 195 } 196 if (nameNode == null) { 197 for (String nn : hadoopConfigs.keySet()) { 198 nn = nn.trim(); 199 if (!nn.isEmpty()) { 200 nameNode = nn; 201 break; 202 } 203 } 204 } 205 if (nameNode != null) { 206 Configuration hConf = getConfiguration(nameNode); 207 try { 208 Method setConfigurationMethod = SecurityUtil.class.getMethod("setConfiguration", Configuration.class); 209 setConfigurationMethod.invoke(null, hConf); 210 LOG.debug("Setting Hadoop SecurityUtil Configuration to that of {0}", nameNode); 211 } catch (NoSuchMethodException e) { 212 LOG.debug("Not setting Hadoop SecurityUtil Configuration because this version of Hadoop doesn't support it"); 213 } catch (Exception e) { 214 LOG.error("An Exception occurred while trying to call setConfiguration on {0} via Reflection. It won't be called.", 215 SecurityUtil.class.getName(), e); 216 } 217 } 218 } 219 220 private void kerberosInit(Configuration serviceConf) throws ServiceException { 221 try { 222 String keytabFile = ConfigurationService.get(serviceConf, KERBEROS_KEYTAB).trim(); 223 if (keytabFile.length() == 0) { 224 throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB); 225 } 226 String principal = SecurityUtil.getServerPrincipal( 227 serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST"), 228 InetAddress.getLocalHost().getCanonicalHostName()); 229 if (principal.length() == 0) { 230 throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL); 231 } 232 Configuration conf = new Configuration(); 233 conf.set("hadoop.security.authentication", "kerberos"); 234 UserGroupInformation.setConfiguration(conf); 235 UserGroupInformation.loginUserFromKeytab(principal, keytabFile); 236 LOG.info("Got Kerberos ticket, keytab [{0}], Oozie principal principal [{1}]", 237 keytabFile, principal); 238 } 239 catch (ServiceException ex) { 240 throw ex; 241 } 242 catch (Exception ex) { 243 throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex); 244 } 245 } 246 247 private static final String[] HADOOP_CONF_FILES = 248 {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml", "hadoop-site.xml", "ssl-client.xml"}; 249 250 251 private Configuration loadHadoopConf(File dir) throws IOException { 252 Configuration hadoopConf = new XConfiguration(); 253 for (String file : HADOOP_CONF_FILES) { 254 File f = new File(dir, file); 255 if (f.exists()) { 256 InputStream is = new FileInputStream(f); 257 Configuration conf = new XConfiguration(is, false); 258 is.close(); 259 XConfiguration.copy(conf, hadoopConf); 260 } 261 } 262 return hadoopConf; 263 } 264 265 private Map<String, File> parseConfigDirs(String[] confDefs, String type) throws ServiceException, IOException { 266 Map<String, File> map = new HashMap<String, File>(); 267 File configDir = new File(ConfigurationService.getConfigurationDirectory()); 268 for (String confDef : confDefs) { 269 if (confDef.trim().length() > 0) { 270 String[] parts = confDef.split("="); 271 if (parts.length == 2) { 272 String hostPort = parts[0]; 273 String confDir = parts[1]; 274 File dir = new File(confDir); 275 if (!dir.isAbsolute()) { 276 dir = new File(configDir, confDir); 277 } 278 if (dir.exists()) { 279 map.put(hostPort.toLowerCase(), dir); 280 } 281 else { 282 throw new ServiceException(ErrorCode.E0100, getClass().getName(), 283 "could not find " + type + " configuration directory: " + 284 dir.getAbsolutePath()); 285 } 286 } 287 else { 288 throw new ServiceException(ErrorCode.E0100, getClass().getName(), 289 "Incorrect " + type + " configuration definition: " + confDef); 290 } 291 } 292 } 293 return map; 294 } 295 296 private void loadHadoopConfigs(Configuration serviceConf) throws ServiceException { 297 try { 298 Map<String, File> map = parseConfigDirs(ConfigurationService.getStrings(serviceConf, HADOOP_CONFS), 299 "hadoop"); 300 for (Map.Entry<String, File> entry : map.entrySet()) { 301 hadoopConfigs.put(entry.getKey(), loadHadoopConf(entry.getValue())); 302 } 303 } 304 catch (ServiceException ex) { 305 throw ex; 306 } 307 catch (Exception ex) { 308 throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex); 309 } 310 } 311 312 private void preLoadActionConfigs(Configuration serviceConf) throws ServiceException { 313 try { 314 actionConfigDirs = parseConfigDirs(ConfigurationService.getStrings(serviceConf, ACTION_CONFS), "action"); 315 for (String hostport : actionConfigDirs.keySet()) { 316 actionConfigs.put(hostport, new ConcurrentHashMap<String, XConfiguration>()); 317 } 318 } 319 catch (ServiceException ex) { 320 throw ex; 321 } 322 catch (Exception ex) { 323 throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex); 324 } 325 } 326 327 public void destroy() { 328 } 329 330 public Class<? extends Service> getInterface() { 331 return HadoopAccessorService.class; 332 } 333 334 private UserGroupInformation getUGI(String user) throws IOException { 335 return ugiService.getProxyUser(user); 336 } 337 338 /** 339 * Creates a JobConf using the site configuration for the specified hostname:port. 340 * <p> 341 * If the specified hostname:port is not defined it falls back to the '*' site 342 * configuration if available. If the '*' site configuration is not available, 343 * the JobConf has all Hadoop defaults. 344 * 345 * @param hostPort hostname:port to lookup Hadoop site configuration. 346 * @return a JobConf with the corresponding site configuration for hostPort. 347 */ 348 public JobConf createJobConf(String hostPort) { 349 JobConf jobConf = new JobConf(getCachedConf()); 350 XConfiguration.copy(getConfiguration(hostPort), jobConf); 351 jobConf.setBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, true); 352 return jobConf; 353 } 354 355 public Configuration getCachedConf() { 356 if (cachedConf == null) { 357 loadCachedConf(); 358 } 359 return cachedConf; 360 } 361 362 private void loadCachedConf() { 363 cachedConf = new Configuration(); 364 //for lazy loading 365 cachedConf.size(); 366 } 367 368 private XConfiguration loadActionConf(String hostPort, String action) { 369 File dir = actionConfigDirs.get(hostPort); 370 XConfiguration actionConf = new XConfiguration(); 371 if (dir != null) { 372 // See if a dir with the action name exists. If so, load all the supported conf files in the dir 373 File actionConfDir = new File(dir, action); 374 375 if (actionConfDir.exists() && actionConfDir.isDirectory()) { 376 LOG.info("Processing configuration files under [{0}]" 377 + " for action [{1}] and hostPort [{2}]", 378 actionConfDir.getAbsolutePath(), action, hostPort); 379 updateActionConfigWithDir(actionConf, actionConfDir); 380 } 381 } 382 383 // Now check for <action.xml> This way <action.xml> has priority over <action-dir>/*.xml 384 File actionConfFile = new File(dir, action + ".xml"); 385 LOG.info("Processing configuration file [{0}] for action [{1}] and hostPort [{2}]", 386 actionConfFile.getAbsolutePath(), action, hostPort); 387 if (actionConfFile.exists()) { 388 updateActionConfigWithFile(actionConf, actionConfFile); 389 } 390 391 return actionConf; 392 } 393 394 private void updateActionConfigWithFile(Configuration actionConf, File actionConfFile) { 395 try { 396 Configuration conf = readActionConfFile(actionConfFile); 397 XConfiguration.copy(conf, actionConf); 398 } catch (IOException e) { 399 LOG.warn("Could not read file [{0}].", actionConfFile.getAbsolutePath()); 400 } 401 } 402 403 private void updateActionConfigWithDir(Configuration actionConf, File actionConfDir) { 404 File[] actionConfFiles = actionConfDir.listFiles(new FilenameFilter() { 405 @Override 406 public boolean accept(File dir, String name) { 407 return ActionConfFileType.isSupportedFileType(name); 408 }}); 409 Arrays.sort(actionConfFiles, new Comparator<File>() { 410 @Override 411 public int compare(File o1, File o2) { 412 return o1.getName().compareTo(o2.getName()); 413 } 414 }); 415 for (File f : actionConfFiles) { 416 if (f.isFile() && f.canRead()) { 417 updateActionConfigWithFile(actionConf, f); 418 } 419 } 420 421 } 422 423 private Configuration readActionConfFile(File file) throws IOException { 424 InputStream fis = null; 425 try { 426 fis = new FileInputStream(file); 427 ActionConfFileType fileTyple = ActionConfFileType.getFileType(file.getName()); 428 switch (fileTyple) { 429 case XML: 430 return new XConfiguration(fis); 431 case PROPERTIES: 432 Properties properties = new Properties(); 433 properties.load(fis); 434 return new XConfiguration(properties); 435 default: 436 throw new UnsupportedOperationException( 437 String.format("Unable to parse action conf file of type %s", fileTyple)); 438 } 439 } finally { 440 IOUtils.closeSafely(fis); 441 } 442 } 443 444 /** 445 * Returns a Configuration containing any defaults for an action for a particular cluster. 446 * <p> 447 * This configuration is used as default for the action configuration and enables cluster 448 * level default values per action. 449 * 450 * @param hostPort hostname"port to lookup the action default confiugration. 451 * @param action action name. 452 * @return the default configuration for the action for the specified cluster. 453 */ 454 public XConfiguration createActionDefaultConf(String hostPort, String action) { 455 hostPort = (hostPort != null) ? hostPort.toLowerCase() : null; 456 Map<String, XConfiguration> hostPortActionConfigs = actionConfigs.get(hostPort); 457 if (hostPortActionConfigs == null) { 458 hostPortActionConfigs = actionConfigs.get("*"); 459 hostPort = "*"; 460 } 461 XConfiguration actionConf = hostPortActionConfigs.get(action); 462 if (actionConf == null) { 463 // doing lazy loading as we don't know upfront all actions, no need to synchronize 464 // as it is a read operation an in case of a race condition loading and inserting 465 // into the Map is idempotent and the action-config Map is a ConcurrentHashMap 466 467 // We first load a action of type default 468 // This allows for global configuration for all actions - for example 469 // all launchers in one queue and actions in another queue 470 // Are some configuration that applies to multiple actions - like 471 // config libraries path etc 472 actionConf = loadActionConf(hostPort, DEFAULT_ACTIONNAME); 473 474 // Action specific default configuration will override the default action config 475 476 XConfiguration.copy(loadActionConf(hostPort, action), actionConf); 477 hostPortActionConfigs.put(action, actionConf); 478 } 479 return new XConfiguration(actionConf.toProperties()); 480 } 481 482 private Configuration getConfiguration(String hostPort) { 483 hostPort = (hostPort != null) ? hostPort.toLowerCase() : null; 484 Configuration conf = hadoopConfigs.get(hostPort); 485 if (conf == null) { 486 conf = hadoopConfigs.get("*"); 487 if (conf == null) { 488 conf = new XConfiguration(); 489 } 490 } 491 return conf; 492 } 493 494 /** 495 * Return a JobClient created with the provided user/group. 496 * 497 * 498 * @param conf JobConf with all necessary information to create the 499 * JobClient. 500 * @return JobClient created with the provided user/group. 501 * @throws HadoopAccessorException if the client could not be created. 502 */ 503 public JobClient createJobClient(String user, final JobConf conf) throws HadoopAccessorException { 504 ParamChecker.notEmpty(user, "user"); 505 if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) { 506 throw new HadoopAccessorException(ErrorCode.E0903); 507 } 508 String jobTracker = conf.get(JavaActionExecutor.HADOOP_JOB_TRACKER); 509 validateJobTracker(jobTracker); 510 try { 511 UserGroupInformation ugi = getUGI(user); 512 JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>() { 513 public JobClient run() throws Exception { 514 return new JobClient(conf); 515 } 516 }); 517 return jobClient; 518 } 519 catch (InterruptedException ex) { 520 throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex); 521 } 522 catch (IOException ex) { 523 throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex); 524 } 525 } 526 527 /** 528 * Get the RM delegation token using jobClient and add it to conf 529 * 530 * @param jobClient 531 * @param conf 532 * @throws HadoopAccessorException 533 */ 534 public void addRMDelegationToken(JobClient jobClient, JobConf conf) throws HadoopAccessorException { 535 Token<DelegationTokenIdentifier> mrdt; 536 try { 537 mrdt = jobClient.getDelegationToken(getMRDelegationTokenRenewer(conf)); 538 } 539 catch (IOException e) { 540 throw new HadoopAccessorException(ErrorCode.E0902, e.getMessage(), e); 541 } 542 catch (InterruptedException e) { 543 throw new HadoopAccessorException(ErrorCode.E0902, e.getMessage(), e); 544 } 545 conf.getCredentials().addToken(MR_TOKEN_ALIAS, mrdt); 546 } 547 548 /** 549 * Return a FileSystem created with the provided user for the specified URI. 550 * 551 * 552 * @param uri file system URI. 553 * @param conf Configuration with all necessary information to create the FileSystem. 554 * @return FileSystem created with the provided user/group. 555 * @throws HadoopAccessorException if the filesystem could not be created. 556 */ 557 public FileSystem createFileSystem(String user, final URI uri, final Configuration conf) 558 throws HadoopAccessorException { 559 ParamChecker.notEmpty(user, "user"); 560 if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) { 561 throw new HadoopAccessorException(ErrorCode.E0903); 562 } 563 564 checkSupportedFilesystem(uri); 565 566 String nameNode = uri.getAuthority(); 567 if (nameNode == null) { 568 nameNode = conf.get("fs.default.name"); 569 if (nameNode != null) { 570 try { 571 nameNode = new URI(nameNode).getAuthority(); 572 } 573 catch (URISyntaxException ex) { 574 throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex); 575 } 576 } 577 } 578 validateNameNode(nameNode); 579 580 try { 581 UserGroupInformation ugi = getUGI(user); 582 return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() { 583 public FileSystem run() throws Exception { 584 return FileSystem.get(uri, conf); 585 } 586 }); 587 } 588 catch (InterruptedException ex) { 589 throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex); 590 } 591 catch (IOException ex) { 592 throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex); 593 } 594 } 595 596 /** 597 * Validate Job tracker 598 * @param jobTrackerUri 599 * @throws HadoopAccessorException 600 */ 601 protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException { 602 validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900); 603 } 604 605 /** 606 * Validate Namenode list 607 * @param nameNodeUri 608 * @throws HadoopAccessorException 609 */ 610 protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException { 611 validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901); 612 } 613 614 private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException { 615 if (uri != null) { 616 uri = uri.toLowerCase().trim(); 617 if (whitelist.size() > 0 && !whitelist.contains(uri)) { 618 throw new HadoopAccessorException(error, uri, whitelist); 619 } 620 } 621 } 622 623 public Text getMRDelegationTokenRenewer(JobConf jobConf) throws IOException { 624 if (UserGroupInformation.isSecurityEnabled()) { // secure cluster 625 return getMRTokenRenewerInternal(jobConf); 626 } 627 else { 628 return MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer 629 } 630 } 631 632 // Package private for unit test purposes 633 Text getMRTokenRenewerInternal(JobConf jobConf) throws IOException { 634 // Getting renewer correctly for JT principal also though JT in hadoop 1.x does not have 635 // support for renewing/cancelling tokens 636 String servicePrincipal = jobConf.get(RM_PRINCIPAL, jobConf.get(JT_PRINCIPAL)); 637 Text renewer; 638 if (servicePrincipal != null) { // secure cluster 639 renewer = mrTokenRenewers.get(servicePrincipal); 640 if (renewer == null) { 641 // Mimic org.apache.hadoop.mapred.Master.getMasterPrincipal() 642 String target = jobConf.get(HADOOP_YARN_RM, jobConf.get(HADOOP_JOB_TRACKER_2)); 643 if (target == null) { 644 target = jobConf.get(HADOOP_JOB_TRACKER); 645 } 646 try { 647 String addr = NetUtils.createSocketAddr(target).getHostName(); 648 renewer = new Text(SecurityUtil.getServerPrincipal(servicePrincipal, addr)); 649 LOG.info("Delegation Token Renewer details: Principal=" + servicePrincipal + ",Target=" + target 650 + ",Renewer=" + renewer); 651 } 652 catch (IllegalArgumentException iae) { 653 renewer = new Text(servicePrincipal.split("[/@]")[0]); 654 LOG.info("Delegation Token Renewer for " + servicePrincipal + " is " + renewer); 655 } 656 mrTokenRenewers.put(servicePrincipal, renewer); 657 } 658 } 659 else { 660 renewer = MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer 661 } 662 return renewer; 663 } 664 665 public void addFileToClassPath(String user, final Path file, final Configuration conf) 666 throws IOException { 667 ParamChecker.notEmpty(user, "user"); 668 try { 669 UserGroupInformation ugi = getUGI(user); 670 ugi.doAs(new PrivilegedExceptionAction<Void>() { 671 @Override 672 public Void run() throws Exception { 673 JobUtils.addFileToClassPath(file, conf, null); 674 return null; 675 } 676 }); 677 678 } 679 catch (InterruptedException ex) { 680 throw new IOException(ex); 681 } 682 683 } 684 685 /** 686 * checks configuration parameter if filesystem scheme is among the list of supported ones 687 * this makes system robust to filesystems other than HDFS also 688 */ 689 690 public void checkSupportedFilesystem(URI uri) throws HadoopAccessorException { 691 if (allSchemesSupported) 692 return; 693 String uriScheme = uri.getScheme(); 694 if (uriScheme != null) { // skip the check if no scheme is given 695 if(!supportedSchemes.isEmpty()) { 696 LOG.debug("Checking if filesystem " + uriScheme + " is supported"); 697 if (!supportedSchemes.contains(uriScheme)) { 698 throw new HadoopAccessorException(ErrorCode.E0904, uriScheme, uri.toString()); 699 } 700 } 701 } 702 } 703 704 public Set<String> getSupportedSchemes() { 705 return supportedSchemes; 706 } 707 708}