/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.security.token.Token;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group. <p/>
 * The default accessor used is the base accessor, which just injects the UGI into the configuration instance used
 * to create/obtain JobClient and FileSystem instances. <p/>
 * The accessor class to use can be configured in <code>oozie-site.xml</code> using the
 * <code>oozie.service.HadoopAccessorService.accessor.class</code> property.
 */
public class HadoopAccessorService implements Service {

    private static XLog LOG = XLog.getLog(HadoopAccessorService.class);

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService.";
    public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist";
    public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist";
    public static final String HADOOP_CONFS = CONF_PREFIX + "hadoop.configurations";
    public static final String ACTION_CONFS = CONF_PREFIX + "action.configurations";
    public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled";
    public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file";
    public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";
    public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token");

    private static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
    /** The Kerberos principal for the job tracker. */
    private static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
    /** The Kerberos principal for the resource manager. */
    private static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
    private static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
    private static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
    private static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
    private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>();

    private Set<String> jobTrackerWhitelist = new HashSet<String>();
    private Set<String> nameNodeWhitelist = new HashSet<String>();
    private Map<String, Configuration> hadoopConfigs = new HashMap<String, Configuration>();
    private Map<String, File> actionConfigDirs = new HashMap<String, File>();
    private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>();

    private ConcurrentMap<String, UserGroupInformation> userUgiMap;

    /**
     * Supported filesystem schemes for namespace federation.
     */
    public static final String SUPPORTED_FILESYSTEMS = CONF_PREFIX + "supported.filesystems";
    private Set<String> supportedSchemes;

    public void init(Services services) throws ServiceException {
        init(services.getConf());
    }

    //for testing purposes, see XFsTestCase
    public void init(Configuration conf) throws ServiceException {
        for (String name : conf.getStringCollection(JOB_TRACKER_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            jobTrackerWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "JOB_TRACKER_WHITELIST :" + conf.getStringCollection(JOB_TRACKER_WHITELIST)
                        + ", Total entries :" + jobTrackerWhitelist.size());
        for (String name : conf.getStringCollection(NAME_NODE_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            nameNodeWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "NAME_NODE_WHITELIST :" + conf.getStringCollection(NAME_NODE_WHITELIST)
                        + ", Total entries :" + nameNodeWhitelist.size());

        boolean kerberosAuthOn = conf.getBoolean(KERBEROS_AUTH_ENABLED, true);
        XLog.getLog(getClass()).info("Oozie Kerberos Authentication [{0}]", (kerberosAuthOn) ? "enabled" : "disabled");
        if (kerberosAuthOn) {
            kerberosInit(conf);
        }
        else {
            Configuration ugiConf = new Configuration();
            ugiConf.set("hadoop.security.authentication", "simple");
            UserGroupInformation.setConfiguration(ugiConf);
        }

        userUgiMap = new ConcurrentHashMap<String, UserGroupInformation>();

        loadHadoopConfigs(conf);
        preLoadActionConfigs(conf);

        supportedSchemes = new HashSet<String>();
        String[] schemesFromConf = conf.getStrings(SUPPORTED_FILESYSTEMS, new String[]{"hdfs", "hftp", "webhdfs"});
        if (schemesFromConf != null) {
            for (String scheme : schemesFromConf) {
                scheme = scheme.trim();
                // if the user gives "*", supportedSchemes stays empty, so no checking is done, i.e. all schemes are allowed
                if (scheme.equals("*")) {
                    if (schemesFromConf.length > 1) {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                SUPPORTED_FILESYSTEMS + " should contain either only wildcard or explicit list, not both");
                    }
                }
                else {
                    supportedSchemes.add(scheme);
                }
            }
        }
    }

    private void kerberosInit(Configuration serviceConf) throws ServiceException {
        try {
            String keytabFile = serviceConf.get(KERBEROS_KEYTAB,
                                                System.getProperty("user.home") + "/oozie.keytab").trim();
            if (keytabFile.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
            }
            String principal = serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
            if (principal.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
            }
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
            XLog.getLog(getClass()).info("Got Kerberos ticket, keytab [{0}], Oozie principal [{1}]",
                                         keytabFile, principal);
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private static final String[] HADOOP_CONF_FILES =
            {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml", "hadoop-site.xml"};

    private Configuration loadHadoopConf(File dir) throws IOException {
        Configuration hadoopConf = new XConfiguration();
        for (String file : HADOOP_CONF_FILES) {
            File f = new File(dir, file);
            if (f.exists()) {
                InputStream is = new FileInputStream(f);
                try {
                    Configuration conf = new XConfiguration(is);
                    XConfiguration.copy(conf, hadoopConf);
                }
                finally {
                    // close the stream even if parsing fails
                    is.close();
                }
            }
        }
        return hadoopConf;
    }

    private Map<String, File> parseConfigDirs(String[] confDefs, String type) throws ServiceException, IOException {
        Map<String, File> map = new HashMap<String, File>();
        File configDir = new File(ConfigurationService.getConfigurationDirectory());
        for (String confDef : confDefs) {
            if (confDef.trim().length() > 0) {
                String[] parts = confDef.split("=");
                if (parts.length == 2) {
                    String hostPort = parts[0];
                    String confDir = parts[1];
                    File dir = new File(confDir);
                    if (!dir.isAbsolute()) {
                        dir = new File(configDir, confDir);
                    }
                    if (dir.exists()) {
                        map.put(hostPort.toLowerCase(), dir);
                    }
                    else {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                "could not find " + type + " configuration directory: " + dir.getAbsolutePath());
                    }
                }
                else {
                    throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                            "Incorrect " + type + " configuration definition: " + confDef);
                }
            }
        }
        return map;
    }
    private void loadHadoopConfigs(Configuration serviceConf) throws ServiceException {
        try {
            Map<String, File> map = parseConfigDirs(serviceConf.getStrings(HADOOP_CONFS, "*=hadoop-conf"), "hadoop");
            for (Map.Entry<String, File> entry : map.entrySet()) {
                hadoopConfigs.put(entry.getKey(), loadHadoopConf(entry.getValue()));
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private void preLoadActionConfigs(Configuration serviceConf) throws ServiceException {
        try {
            actionConfigDirs = parseConfigDirs(serviceConf.getStrings(ACTION_CONFS, "*=hadoop-conf"), "action");
            for (String hostport : actionConfigDirs.keySet()) {
                actionConfigs.put(hostport, new ConcurrentHashMap<String, XConfiguration>());
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    public void destroy() {
    }

    public Class<? extends Service> getInterface() {
        return HadoopAccessorService.class;
    }

    private UserGroupInformation getUGI(String user) throws IOException {
        UserGroupInformation ugi = userUgiMap.get(user);
        if (ugi == null) {
            // taking care of a race condition: if another thread wins putIfAbsent, the UGI
            // created here is simply not cached, which is harmless
            ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
            userUgiMap.putIfAbsent(user, ugi);
        }
        return ugi;
    }

    /**
     * Creates a JobConf using the site configuration for the specified hostname:port.
     * <p/>
     * If the specified hostname:port is not defined, it falls back to the '*' site
     * configuration if available. If the '*' site configuration is not available,
     * the JobConf has all Hadoop defaults.
     *
     * @param hostPort hostname:port to lookup Hadoop site configuration.
     * @return a JobConf with the corresponding site configuration for hostPort.
     */
    public JobConf createJobConf(String hostPort) {
        JobConf jobConf = new JobConf();
        XConfiguration.copy(getConfiguration(hostPort), jobConf);
        jobConf.setBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, true);
        return jobConf;
    }

    private XConfiguration loadActionConf(String hostPort, String action) {
        File dir = actionConfigDirs.get(hostPort);
        XConfiguration actionConf = new XConfiguration();
        if (dir != null) {
            File actionConfFile = new File(dir, action + ".xml");
            if (actionConfFile.exists()) {
                try {
                    InputStream is = new FileInputStream(actionConfFile);
                    try {
                        actionConf = new XConfiguration(is);
                    }
                    finally {
                        // close the stream even if parsing fails
                        is.close();
                    }
                }
                catch (IOException ex) {
                    XLog.getLog(getClass()).warn("Could not read file [{0}] for action [{1}] configuration for hostPort [{2}]",
                                                 actionConfFile.getAbsolutePath(), action, hostPort);
                }
            }
        }
        return actionConf;
    }

    /**
     * Returns a Configuration containing any defaults for an action for a particular cluster.
     * <p/>
     * This configuration is used as default for the action configuration and enables cluster
     * level default values per action.
     *
     * @param hostPort hostname:port to lookup the action default configuration.
     * @param action action name.
     * @return the default configuration for the action for the specified cluster.
     */
    public XConfiguration createActionDefaultConf(String hostPort, String action) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Map<String, XConfiguration> hostPortActionConfigs = actionConfigs.get(hostPort);
        if (hostPortActionConfigs == null) {
            hostPortActionConfigs = actionConfigs.get("*");
            hostPort = "*";
        }
        XConfiguration actionConf = hostPortActionConfigs.get(action);
        if (actionConf == null) {
            // doing lazy loading as we don't know all actions upfront; no need to synchronize
            // as it is a read operation and, in case of a race condition, loading and inserting
            // into the Map is idempotent and the action-config Map is a ConcurrentHashMap
            actionConf = loadActionConf(hostPort, action);
            hostPortActionConfigs.put(action, actionConf);
        }
        return new XConfiguration(actionConf.toProperties());
    }
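    // Configuration sketch (host names and paths are hypothetical): the whitelists, supported
    // filesystems, and configuration-directory mappings consumed by init() and parseConfigDirs()
    // above could be set in oozie-site.xml along these lines:
    //
    //   <property>
    //     <name>oozie.service.HadoopAccessorService.nameNode.whitelist</name>
    //     <value>nn1.example.com:8020,nn2.example.com:8020</value>
    //   </property>
    //   <property>
    //     <name>oozie.service.HadoopAccessorService.supported.filesystems</name>
    //     <value>hdfs,hftp,webhdfs</value>
    //   </property>
    //   <property>
    //     <name>oozie.service.HadoopAccessorService.hadoop.configurations</name>
    //     <value>*=hadoop-conf,jt1.example.com:8021=/etc/hadoop/cluster1-conf</value>
    //   </property>
    //
    // An empty whitelist disables endpoint validation; a single "*" in supported.filesystems
    // allows any scheme (mixing "*" with explicit schemes fails startup with E0100); in the
    // hostPort=dir pairs, a relative dir is resolved against the Oozie configuration directory
    // and the "*" entry is the fallback for hosts without an explicit mapping.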
    private Configuration getConfiguration(String hostPort) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Configuration conf = hadoopConfigs.get(hostPort);
        if (conf == null) {
            conf = hadoopConfigs.get("*");
            if (conf == null) {
                conf = new XConfiguration();
            }
        }
        return conf;
    }

    /**
     * Return a JobClient created with the provided user.
     *
     * @param user user name to create the JobClient as.
     * @param conf JobConf with all necessary information to create the JobClient.
     * @return JobClient created with the provided user.
     * @throws HadoopAccessorException if the client could not be created.
     */
    public JobClient createJobClient(String user, final JobConf conf) throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }
        String jobTracker = conf.get("mapred.job.tracker");
        validateJobTracker(jobTracker);
        try {
            UserGroupInformation ugi = getUGI(user);
            JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
                public JobClient run() throws Exception {
                    return new JobClient(conf);
                }
            });
            Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(getMRDelegationTokenRenewer(conf));
            conf.getCredentials().addToken(MR_TOKEN_ALIAS, mrdt);
            return jobClient;
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
    }

    /**
     * Return a FileSystem created with the provided user for the specified URI.
     *
     * @param user user name to create the FileSystem as.
     * @param uri file system URI.
     * @param conf Configuration with all necessary information to create the FileSystem.
     * @return FileSystem created with the provided user.
     * @throws HadoopAccessorException if the filesystem could not be created.
     */
    public FileSystem createFileSystem(String user, final URI uri, final Configuration conf)
            throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }

        checkSupportedFilesystem(uri);

        String nameNode = uri.getAuthority();
        if (nameNode == null) {
            nameNode = conf.get("fs.default.name");
            if (nameNode != null) {
                try {
                    nameNode = new URI(nameNode).getAuthority();
                }
                catch (URISyntaxException ex) {
                    throw new HadoopAccessorException(ErrorCode.E0902, ex);
                }
            }
        }
        validateNameNode(nameNode);

        try {
            UserGroupInformation ugi = getUGI(user);
            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                public FileSystem run() throws Exception {
                    return FileSystem.get(uri, conf);
                }
            });
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
    }
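    // Usage sketch (user, hosts, and paths are hypothetical), assuming the service is obtained
    // from the standard Oozie services registry. Note that createJobClient() and
    // createFileSystem() reject any configuration that did not originate from createJobConf(),
    // failing with ErrorCode.E0903:
    //
    //   HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
    //   JobConf jobConf = has.createJobConf("nn.example.com:8020");
    //   FileSystem fs = has.createFileSystem("alice", new URI("hdfs://nn.example.com:8020"), jobConf);
    //   XConfiguration mrDefaults = has.createActionDefaultConf("jt.example.com:8021", "map-reduce");
    //
    // createActionDefaultConf() returns a copy of the cached configuration, so callers may
    // mutate the result freely.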
    /**
     * Validate a JobTracker endpoint against the JobTracker whitelist, if one is configured.
     *
     * @param jobTrackerUri JobTracker URI to validate.
     * @throws HadoopAccessorException if the JobTracker is not in the whitelist.
     */
    protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException {
        validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900);
    }

    /**
     * Validate a NameNode endpoint against the NameNode whitelist, if one is configured.
     *
     * @param nameNodeUri NameNode URI to validate.
     * @throws HadoopAccessorException if the NameNode is not in the whitelist.
     */
    protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException {
        validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901);
    }

    private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException {
        if (uri != null) {
            uri = uri.toLowerCase().trim();
            if (whitelist.size() > 0 && !whitelist.contains(uri)) {
                throw new HadoopAccessorException(error, uri);
            }
        }
    }

    public static Text getMRDelegationTokenRenewer(JobConf jobConf) throws IOException {
        if (UserGroupInformation.isSecurityEnabled()) { // secure cluster
            return getMRTokenRenewerInternal(jobConf);
        }
        else {
            return MR_TOKEN_ALIAS; // doesn't matter what we pass as renewer
        }
    }

    // Package private for unit test purposes
    static Text getMRTokenRenewerInternal(JobConf jobConf) throws IOException {
        // getting the renewer correctly for the JT principal as well, even though JT in Hadoop 1.x
        // does not support renewing/cancelling tokens
        String servicePrincipal = jobConf.get(RM_PRINCIPAL, jobConf.get(JT_PRINCIPAL));
        Text renewer;
        if (servicePrincipal != null) { // secure cluster
            renewer = mrTokenRenewers.get(servicePrincipal);
            if (renewer == null) {
                // Mimic org.apache.hadoop.mapred.Master.getMasterPrincipal()
                String target = jobConf.get(HADOOP_YARN_RM, jobConf.get(HADOOP_JOB_TRACKER_2));
                if (target == null) {
                    target = jobConf.get(HADOOP_JOB_TRACKER);
                }
                String addr = NetUtils.createSocketAddr(target).getHostName();
                renewer = new Text(SecurityUtil.getServerPrincipal(servicePrincipal, addr));
                LOG.info("Delegation Token Renewer details: Principal=" + servicePrincipal + ",Target=" + target
                        + ",Renewer=" + renewer);
                mrTokenRenewers.put(servicePrincipal, renewer);
            }
        }
        else {
            renewer = MR_TOKEN_ALIAS; // doesn't matter what we pass as renewer
        }
        return renewer;
    }

    public void addFileToClassPath(String user, final Path file, final Configuration conf)
            throws IOException {
        ParamChecker.notEmpty(user, "user");
        try {
            UserGroupInformation ugi = getUGI(user);
            ugi.doAs(new PrivilegedExceptionAction<Void>() {
                public Void run() throws Exception {
                    Configuration defaultConf = new Configuration();
                    XConfiguration.copy(conf, defaultConf);
                    // doing this NOP add first to have the FileSystem created and cached
                    DistributedCache.addFileToClassPath(file, defaultConf);

                    DistributedCache.addFileToClassPath(file, conf);
                    return null;
                }
            });
        }
        catch (InterruptedException ex) {
            throw new IOException(ex);
        }
    }
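    // Renewer resolution sketch for getMRTokenRenewerInternal() above (all values hypothetical):
    //
    //   yarn.resourcemanager.principal = rm/_HOST@EXAMPLE.COM
    //   yarn.resourcemanager.address   = rm.example.com:8032
    //
    // SecurityUtil.getServerPrincipal() substitutes the _HOST token with the host name derived
    // from the address, yielding a renewer of rm/rm.example.com@EXAMPLE.COM, which is then
    // cached in mrTokenRenewers keyed by the service principal.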
    /**
     * Checks whether the filesystem scheme of the given URI is among the configured list of
     * supported schemes; this makes the system robust to filesystems other than HDFS as well.
     * An empty list (resulting from the "*" wildcard) disables the check.
     *
     * @param uri URI of the filesystem to check.
     * @throws HadoopAccessorException if the scheme is not supported.
     */
    public void checkSupportedFilesystem(URI uri) throws HadoopAccessorException {
        String uriScheme = uri.getScheme();
        if (!supportedSchemes.isEmpty()) {
            XLog.getLog(this.getClass()).debug("Checking if filesystem " + uriScheme + " is supported");
            if (!supportedSchemes.contains(uriScheme)) {
                throw new HadoopAccessorException(ErrorCode.E0904, uriScheme, uri.toString());
            }
        }
    }

}