/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.security.token.Token;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.action.hadoop.JavaActionExecutor;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;

/**
 * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group. <p/> The
 * default accessor used is the base accessor which just injects the UGI into the configuration instance used to
 * create/obtain JobClient and FileSystem instances.
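 * <p/>
 * Cluster site configurations and per-action default configurations are declared per hostname:port
 * (with '*' as a catch-all) through the hadoop.configurations and action.configurations properties;
 * for example, the default value "*=hadoop-conf" maps every cluster to the hadoop-conf directory
 * under the Oozie configuration directory.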
 */
public class HadoopAccessorService implements Service {

    private static XLog LOG = XLog.getLog(HadoopAccessorService.class);

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService.";
    public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist";
    public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist";
    public static final String HADOOP_CONFS = CONF_PREFIX + "hadoop.configurations";
    public static final String ACTION_CONFS = CONF_PREFIX + "action.configurations";
    public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled";
    public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file";
    public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";
    public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token");

    protected static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
    /** The Kerberos principal for the job tracker.*/
    protected static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
    /** The Kerberos principal for the resource manager.*/
    protected static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
    protected static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
    protected static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
    protected static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
    private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>();

    private Set<String> jobTrackerWhitelist = new HashSet<String>();
    private Set<String> nameNodeWhitelist = new HashSet<String>();
    private Map<String, Configuration> hadoopConfigs = new HashMap<String, Configuration>();
    private Map<String, File> actionConfigDirs = new HashMap<String, File>();
    private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>();

    private UserGroupInformationService ugiService;

    /**
     * Supported filesystem schemes for namespace federation
     */
    public static final String SUPPORTED_FILESYSTEMS = CONF_PREFIX + "supported.filesystems";
    public static final String[] DEFAULT_SUPPORTED_SCHEMES = new String[]{"hdfs","hftp","webhdfs"};
    private Set<String> supportedSchemes;
    private boolean allSchemesSupported;

    public void init(Services services) throws ServiceException {
        this.ugiService = services.get(UserGroupInformationService.class);
        init(services.getConf());
    }

    //for testing purposes, see XFsTestCase
    public void init(Configuration conf) throws ServiceException {
        for (String name : conf.getStringCollection(JOB_TRACKER_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            jobTrackerWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "JOB_TRACKER_WHITELIST :" + conf.getStringCollection(JOB_TRACKER_WHITELIST)
                        + ", Total entries :" + jobTrackerWhitelist.size());
        for (String name : conf.getStringCollection(NAME_NODE_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            nameNodeWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "NAME_NODE_WHITELIST :" + conf.getStringCollection(NAME_NODE_WHITELIST)
                        + ", Total entries :" + nameNodeWhitelist.size());

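        // Security bootstrap for the block below: when KERBEROS_AUTH_ENABLED is true (the default),
        // the service logs in from the configured keytab/principal via kerberosInit(); otherwise the
        // UGI layer is switched to "simple" authentication.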
        boolean kerberosAuthOn = conf.getBoolean(KERBEROS_AUTH_ENABLED, true);
        XLog.getLog(getClass()).info("Oozie Kerberos Authentication [{0}]", (kerberosAuthOn) ? "enabled" : "disabled");
        if (kerberosAuthOn) {
            kerberosInit(conf);
        }
        else {
            Configuration ugiConf = new Configuration();
            ugiConf.set("hadoop.security.authentication", "simple");
            UserGroupInformation.setConfiguration(ugiConf);
        }

        if (ugiService == null) { //for testing purposes, see XFsTestCase
            this.ugiService = new UserGroupInformationService();
        }

        loadHadoopConfigs(conf);
        preLoadActionConfigs(conf);

        supportedSchemes = new HashSet<String>();
        String[] schemesFromConf = conf.getStrings(SUPPORTED_FILESYSTEMS, DEFAULT_SUPPORTED_SCHEMES);
        if (schemesFromConf != null) {
            for (String scheme : schemesFromConf) {
                scheme = scheme.trim();
                // If the user gives "*", allSchemesSupported is set and the scheme check is skipped,
                // i.e. all schemes are allowed
                if (scheme.equals("*")) {
                    if (schemesFromConf.length > 1) {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                SUPPORTED_FILESYSTEMS + " should contain either only wildcard or explicit list, not both");
                    }
                    allSchemesSupported = true;
                }
                supportedSchemes.add(scheme);
            }
        }
    }

    private void kerberosInit(Configuration serviceConf) throws ServiceException {
        try {
            String keytabFile = serviceConf.get(KERBEROS_KEYTAB,
                                                System.getProperty("user.home") + "/oozie.keytab").trim();
            if (keytabFile.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
            }
            String principal = serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
            if (principal.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
            }
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
            XLog.getLog(getClass()).info("Got Kerberos ticket, keytab [{0}], Oozie principal [{1}]",
                                         keytabFile, principal);
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private static final String[] HADOOP_CONF_FILES =
            {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml", "hadoop-site.xml"};

    private Configuration loadHadoopConf(File dir) throws IOException {
        Configuration hadoopConf = new XConfiguration();
        for (String file : HADOOP_CONF_FILES) {
            File f = new File(dir, file);
            if (f.exists()) {
                InputStream is = new FileInputStream(f);
                Configuration conf = new XConfiguration(is);
                is.close();
                XConfiguration.copy(conf, hadoopConf);
            }
        }
        return hadoopConf;
    }

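    // parseConfigDirs() below expects entries of the form "hostPort=dir", e.g. the default "*=hadoop-conf"
    // or an illustrative "namenode.example.com:8020=cluster-a-conf"; relative directories are resolved
    // against the Oozie configuration directory.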
    private Map<String, File> parseConfigDirs(String[] confDefs, String type) throws ServiceException, IOException {
        Map<String, File> map = new HashMap<String, File>();
        File configDir = new File(ConfigurationService.getConfigurationDirectory());
        for (String confDef : confDefs) {
            if (confDef.trim().length() > 0) {
                String[] parts = confDef.split("=");
                if (parts.length == 2) {
                    String hostPort = parts[0];
                    String confDir = parts[1];
                    File dir = new File(confDir);
                    if (!dir.isAbsolute()) {
                        dir = new File(configDir, confDir);
                    }
                    if (dir.exists()) {
                        map.put(hostPort.toLowerCase(), dir);
                    }
                    else {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                "could not find " + type + " configuration directory: " + dir.getAbsolutePath());
                    }
                }
                else {
                    throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                            "Incorrect " + type + " configuration definition: " + confDef);
                }
            }
        }
        return map;
    }

    private void loadHadoopConfigs(Configuration serviceConf) throws ServiceException {
        try {
            Map<String, File> map = parseConfigDirs(serviceConf.getStrings(HADOOP_CONFS, "*=hadoop-conf"), "hadoop");
            for (Map.Entry<String, File> entry : map.entrySet()) {
                hadoopConfigs.put(entry.getKey(), loadHadoopConf(entry.getValue()));
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private void preLoadActionConfigs(Configuration serviceConf) throws ServiceException {
        try {
            actionConfigDirs = parseConfigDirs(serviceConf.getStrings(ACTION_CONFS, "*=hadoop-conf"), "action");
            for (String hostport : actionConfigDirs.keySet()) {
                actionConfigs.put(hostport, new ConcurrentHashMap<String, XConfiguration>());
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    public void destroy() {
    }

    public Class<? extends Service> getInterface() {
        return HadoopAccessorService.class;
    }

    private UserGroupInformation getUGI(String user) throws IOException {
        return ugiService.getProxyUser(user);
    }

    /**
     * Creates a JobConf using the site configuration for the specified hostname:port.
     * <p/>
     * If the specified hostname:port is not defined it falls back to the '*' site
     * configuration if available. If the '*' site configuration is not available,
     * the JobConf has all Hadoop defaults.
     *
     * @param hostPort hostname:port to lookup Hadoop site configuration.
     * @return a JobConf with the corresponding site configuration for hostPort.
     */
    public JobConf createJobConf(String hostPort) {
        JobConf jobConf = new JobConf();
        XConfiguration.copy(getConfiguration(hostPort), jobConf);
        jobConf.setBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, true);
        return jobConf;
    }

    private XConfiguration loadActionConf(String hostPort, String action) {
        File dir = actionConfigDirs.get(hostPort);
        XConfiguration actionConf = new XConfiguration();
        if (dir != null) {
            File actionConfFile = new File(dir, action + ".xml");
            if (actionConfFile.exists()) {
                try {
                    actionConf = new XConfiguration(new FileInputStream(actionConfFile));
                }
                catch (IOException ex) {
                    XLog.getLog(getClass()).warn("Could not read file [{0}] for action [{1}] configuration for hostPort [{2}]",
                                                 actionConfFile.getAbsolutePath(), action, hostPort);
                }
            }
        }
        return actionConf;
    }

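    // Per-action defaults are loaded lazily from <action-conf-dir>/<action>.xml; for example, an
    // illustrative hadoop-conf/hive.xml would supply cluster-level default properties for every
    // "hive" action submitted against that cluster.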
    /**
     * Returns a Configuration containing any defaults for an action for a particular cluster.
     * <p/>
     * This configuration is used as default for the action configuration and enables cluster
     * level default values per action.
     *
     * @param hostPort hostname:port to lookup the action default configuration.
     * @param action action name.
     * @return the default configuration for the action for the specified cluster.
     */
    public XConfiguration createActionDefaultConf(String hostPort, String action) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Map<String, XConfiguration> hostPortActionConfigs = actionConfigs.get(hostPort);
        if (hostPortActionConfigs == null) {
            hostPortActionConfigs = actionConfigs.get("*");
            hostPort = "*";
        }
        XConfiguration actionConf = hostPortActionConfigs.get(action);
        if (actionConf == null) {
            // doing lazy loading as we don't know upfront all actions, no need to synchronize
            // as it is a read operation and in case of a race condition loading and inserting
            // into the Map is idempotent and the action-config Map is a ConcurrentHashMap
            actionConf = loadActionConf(hostPort, action);
            hostPortActionConfigs.put(action, actionConf);
        }
        return new XConfiguration(actionConf.toProperties());
    }

    private Configuration getConfiguration(String hostPort) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Configuration conf = hadoopConfigs.get(hostPort);
        if (conf == null) {
            conf = hadoopConfigs.get("*");
            if (conf == null) {
                conf = new XConfiguration();
            }
        }
        return conf;
    }

    /**
     * Return a JobClient created with the provided user/group.
     *
     * @param user user to impersonate when creating the JobClient.
     * @param conf JobConf with all necessary information to create the JobClient.
     * @return JobClient created with the provided user/group.
     * @throws HadoopAccessorException if the client could not be created.
     */
    public JobClient createJobClient(String user, final JobConf conf) throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }
        String jobTracker = conf.get(JavaActionExecutor.HADOOP_JOB_TRACKER);
        validateJobTracker(jobTracker);
        try {
            UserGroupInformation ugi = getUGI(user);
            JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
                public JobClient run() throws Exception {
                    return new JobClient(conf);
                }
            });
            Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(getMRDelegationTokenRenewer(conf));
            conf.getCredentials().addToken(MR_TOKEN_ALIAS, mrdt);
            return jobClient;
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
    }

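    // Illustrative usage (service handle and user name are examples only): the JobConf passed in must
    // come from createJobConf(hostPort) so that OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED is set; otherwise
    // createJobClient() and createFileSystem() reject it with ErrorCode.E0903.
    //   JobConf jobConf = hadoopAccessorService.createJobConf("jobtracker.example.com:8021");
    //   JobClient client = hadoopAccessorService.createJobClient("exampleUser", jobConf);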
    /**
     * Return a FileSystem created with the provided user for the specified URI.
     *
     * @param user user to impersonate when creating the FileSystem.
     * @param uri file system URI.
     * @param conf Configuration with all necessary information to create the FileSystem.
     * @return FileSystem created with the provided user/group.
     * @throws HadoopAccessorException if the filesystem could not be created.
     */
    public FileSystem createFileSystem(String user, final URI uri, final Configuration conf)
            throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }

        checkSupportedFilesystem(uri);

        String nameNode = uri.getAuthority();
        if (nameNode == null) {
            nameNode = conf.get("fs.default.name");
            if (nameNode != null) {
                try {
                    nameNode = new URI(nameNode).getAuthority();
                }
                catch (URISyntaxException ex) {
                    throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
                }
            }
        }
        validateNameNode(nameNode);

        try {
            UserGroupInformation ugi = getUGI(user);
            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                public FileSystem run() throws Exception {
                    return FileSystem.get(uri, conf);
                }
            });
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
    }

    /**
     * Validate the job tracker URI against the job tracker whitelist.
     *
     * @param jobTrackerUri job tracker URI to validate.
     * @throws HadoopAccessorException if the job tracker is not in the whitelist.
     */
    protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException {
        validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900);
    }

    /**
     * Validate the name node URI against the name node whitelist.
     *
     * @param nameNodeUri name node URI to validate.
     * @throws HadoopAccessorException if the name node is not in the whitelist.
     */
    protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException {
        validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901);
    }

    private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException {
        if (uri != null) {
            uri = uri.toLowerCase().trim();
            if (whitelist.size() > 0 && !whitelist.contains(uri)) {
                throw new HadoopAccessorException(error, uri);
            }
        }
    }

    public Text getMRDelegationTokenRenewer(JobConf jobConf) throws IOException {
        if (UserGroupInformation.isSecurityEnabled()) { // secure cluster
            return getMRTokenRenewerInternal(jobConf);
        }
        else {
            return MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
        }
    }

    // Package private for unit test purposes
    Text getMRTokenRenewerInternal(JobConf jobConf) throws IOException {
        // Getting renewer correctly for JT principal also though JT in hadoop 1.x does not have
        // support for renewing/cancelling tokens
        String servicePrincipal = jobConf.get(RM_PRINCIPAL, jobConf.get(JT_PRINCIPAL));
        Text renewer;
        if (servicePrincipal != null) { // secure cluster
            renewer = mrTokenRenewers.get(servicePrincipal);
            if (renewer == null) {
                // Mimic org.apache.hadoop.mapred.Master.getMasterPrincipal()
                String target = jobConf.get(HADOOP_YARN_RM, jobConf.get(HADOOP_JOB_TRACKER_2));
                if (target == null) {
                    target = jobConf.get(HADOOP_JOB_TRACKER);
                }
                String addr = NetUtils.createSocketAddr(target).getHostName();
                renewer = new Text(SecurityUtil.getServerPrincipal(servicePrincipal, addr));
                LOG.info("Delegation Token Renewer details: Principal=" + servicePrincipal + ",Target=" + target
                         + ",Renewer=" + renewer);
                mrTokenRenewers.put(servicePrincipal, renewer);
            }
        }
        else {
            renewer = MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
        }
        return renewer;
    }

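    /**
     * Adds a file to the distributed cache classpath on behalf of the given user.
     *
     * @param user user to impersonate when adding the file.
     * @param file path of the file to add to the classpath.
     * @param conf Configuration to update with the classpath entry.
     * @throws IOException if the file could not be added to the classpath.
     */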
    public void addFileToClassPath(String user, final Path file, final Configuration conf)
            throws IOException {
        ParamChecker.notEmpty(user, "user");
        try {
            UserGroupInformation ugi = getUGI(user);
            ugi.doAs(new PrivilegedExceptionAction<Void>() {
                public Void run() throws Exception {
                    Configuration defaultConf = new Configuration();
                    XConfiguration.copy(conf, defaultConf);
                    //Doing this NOP add first to have the FS created and cached
                    DistributedCache.addFileToClassPath(file, defaultConf);

                    DistributedCache.addFileToClassPath(file, conf);
                    return null;
                }
            });
        }
        catch (InterruptedException ex) {
            throw new IOException(ex);
        }
    }

    /**
     * Checks whether the filesystem scheme of the given URI is among the configured list of supported
     * schemes; this keeps the system robust when filesystems other than HDFS are used.
     */
    public void checkSupportedFilesystem(URI uri) throws HadoopAccessorException {
        if (allSchemesSupported)
            return;
        String uriScheme = uri.getScheme();
        if (uriScheme != null) {    // skip the check if no scheme is given
            if (!supportedSchemes.isEmpty()) {
                XLog.getLog(this.getClass()).debug("Checking if filesystem " + uriScheme + " is supported");
                if (!supportedSchemes.contains(uriScheme)) {
                    throw new HadoopAccessorException(ErrorCode.E0904, uriScheme, uri.toString());
                }
            }
        }
    }

    public Set<String> getSupportedSchemes() {
        return supportedSchemes;
    }

}