/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.security.token.Token;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.action.hadoop.JavaActionExecutor;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group.
 * <p/>
 * The default accessor used is the base accessor which just injects the UGI into the configuration instance
 * used to create/obtain JobClient and FileSystem instances.
 * <p/>
 * The HadoopAccessor class to use can be configured in the <code>oozie-site.xml</code> using the
 * <code>oozie.service.HadoopAccessorService.accessor.class</code> property.
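 * <p/>
 * Usage sketch (illustrative only; the host:port is an example value): callers normally obtain the service
 * from the Oozie <code>Services</code> singleton rather than instantiating it directly:
 * <pre>
 *   HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
 *   JobConf jobConf = has.createJobConf("jobtracker-host:8021");
 * </pre>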
 */
public class HadoopAccessorService implements Service {

    private static XLog LOG = XLog.getLog(HadoopAccessorService.class);

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService.";
    public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist";
    public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist";
    public static final String HADOOP_CONFS = CONF_PREFIX + "hadoop.configurations";
    public static final String ACTION_CONFS = CONF_PREFIX + "action.configurations";
    public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled";
    public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file";
    public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";
    public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token");

    private static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
    /** The Kerberos principal for the job tracker. */
    private static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
    /** The Kerberos principal for the resource manager. */
    private static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
    private static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
    private static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
    private static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
    private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>();

    private Set<String> jobTrackerWhitelist = new HashSet<String>();
    private Set<String> nameNodeWhitelist = new HashSet<String>();
    private Map<String, Configuration> hadoopConfigs = new HashMap<String, Configuration>();
    private Map<String, File> actionConfigDirs = new HashMap<String, File>();
    private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>();

    private ConcurrentMap<String, UserGroupInformation> userUgiMap;

    /**
     * Supported filesystem schemes for namespace federation.
     */
    public static final String SUPPORTED_FILESYSTEMS = CONF_PREFIX + "supported.filesystems";
    private Set<String> supportedSchemes;

    public void init(Services services) throws ServiceException {
        init(services.getConf());
    }

    //for testing purposes, see XFsTestCase
    public void init(Configuration conf) throws ServiceException {
        for (String name : conf.getStringCollection(JOB_TRACKER_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            jobTrackerWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "JOB_TRACKER_WHITELIST :" + conf.getStringCollection(JOB_TRACKER_WHITELIST)
                + ", Total entries :" + jobTrackerWhitelist.size());
        for (String name : conf.getStringCollection(NAME_NODE_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            nameNodeWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "NAME_NODE_WHITELIST :" + conf.getStringCollection(NAME_NODE_WHITELIST)
                + ", Total entries :" + nameNodeWhitelist.size());

        boolean kerberosAuthOn = conf.getBoolean(KERBEROS_AUTH_ENABLED, true);
        XLog.getLog(getClass()).info("Oozie Kerberos Authentication [{0}]", (kerberosAuthOn) ? "enabled" : "disabled");
        if (kerberosAuthOn) {
            kerberosInit(conf);
        }
        else {
            Configuration ugiConf = new Configuration();
            ugiConf.set("hadoop.security.authentication", "simple");
            UserGroupInformation.setConfiguration(ugiConf);
        }

        userUgiMap = new ConcurrentHashMap<String, UserGroupInformation>();

        loadHadoopConfigs(conf);
        preLoadActionConfigs(conf);

        supportedSchemes = new HashSet<String>();
        String[] schemesFromConf = conf.getStrings(SUPPORTED_FILESYSTEMS, new String[]{"hdfs", "hftp", "webhdfs"});
        if (schemesFromConf != null) {
            for (String scheme : schemesFromConf) {
                scheme = scheme.trim();
                // If user gives "*", supportedSchemes will be empty, so that checking is not done i.e. all schemes allowed
                if (scheme.equals("*")) {
                    if (schemesFromConf.length > 1) {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                SUPPORTED_FILESYSTEMS + " should contain either only wildcard or explicit list, not both");
                    }
                }
                else {
                    supportedSchemes.add(scheme);
                }
            }
        }
    }
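    // Illustrative oozie-site.xml entries consumed by init() above. The property names are built from the
    // constants in this class (assuming the standard "oozie.service." prefix from Service.CONF_PREFIX); the
    // values are examples only:
    //
    //   <property>
    //     <name>oozie.service.HadoopAccessorService.supported.filesystems</name>
    //     <value>hdfs,hftp,webhdfs</value>
    //   </property>
    //   <property>
    //     <name>oozie.service.HadoopAccessorService.nameNode.whitelist</name>
    //     <value>namenode-host:8020</value>
    //   </property>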
"enabled" : "disabled"); 125 if (kerberosAuthOn) { 126 kerberosInit(conf); 127 } 128 else { 129 Configuration ugiConf = new Configuration(); 130 ugiConf.set("hadoop.security.authentication", "simple"); 131 UserGroupInformation.setConfiguration(ugiConf); 132 } 133 134 userUgiMap = new ConcurrentHashMap<String, UserGroupInformation>(); 135 136 loadHadoopConfigs(conf); 137 preLoadActionConfigs(conf); 138 139 supportedSchemes = new HashSet<String>(); 140 String[] schemesFromConf = conf.getStrings(SUPPORTED_FILESYSTEMS, new String[]{"hdfs","hftp","webhdfs"}); 141 if(schemesFromConf != null) { 142 for (String scheme: schemesFromConf) { 143 scheme = scheme.trim(); 144 // If user gives "*", supportedSchemes will be empty, so that checking is not done i.e. all schemes allowed 145 if(scheme.equals("*")) { 146 if(schemesFromConf.length > 1) { 147 throw new ServiceException(ErrorCode.E0100, getClass().getName(), 148 SUPPORTED_FILESYSTEMS + " should contain either only wildcard or explicit list, not both"); 149 } 150 } else { 151 supportedSchemes.add(scheme); 152 } 153 } 154 } 155 } 156 157 private void kerberosInit(Configuration serviceConf) throws ServiceException { 158 try { 159 String keytabFile = serviceConf.get(KERBEROS_KEYTAB, 160 System.getProperty("user.home") + "/oozie.keytab").trim(); 161 if (keytabFile.length() == 0) { 162 throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB); 163 } 164 String principal = serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST"); 165 if (principal.length() == 0) { 166 throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL); 167 } 168 Configuration conf = new Configuration(); 169 conf.set("hadoop.security.authentication", "kerberos"); 170 UserGroupInformation.setConfiguration(conf); 171 UserGroupInformation.loginUserFromKeytab(principal, keytabFile); 172 XLog.getLog(getClass()).info("Got Kerberos ticket, keytab [{0}], Oozie principal principal [{1}]", 173 keytabFile, principal); 174 } 175 catch (ServiceException ex) { 176 throw ex; 177 } 178 catch (Exception ex) { 179 throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex); 180 } 181 } 182 183 private static final String[] HADOOP_CONF_FILES = 184 {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml", "hadoop-site.xml"}; 185 186 187 private Configuration loadHadoopConf(File dir) throws IOException { 188 Configuration hadoopConf = new XConfiguration(); 189 for (String file : HADOOP_CONF_FILES) { 190 File f = new File(dir, file); 191 if (f.exists()) { 192 InputStream is = new FileInputStream(f); 193 Configuration conf = new XConfiguration(is); 194 is.close(); 195 XConfiguration.copy(conf, hadoopConf); 196 } 197 } 198 return hadoopConf; 199 } 200 201 private Map<String, File> parseConfigDirs(String[] confDefs, String type) throws ServiceException, IOException { 202 Map<String, File> map = new HashMap<String, File>(); 203 File configDir = new File(ConfigurationService.getConfigurationDirectory()); 204 for (String confDef : confDefs) { 205 if (confDef.trim().length() > 0) { 206 String[] parts = confDef.split("="); 207 if (parts.length == 2) { 208 String hostPort = parts[0]; 209 String confDir = parts[1]; 210 File dir = new File(confDir); 211 if (!dir.isAbsolute()) { 212 dir = new File(configDir, confDir); 213 } 214 if (dir.exists()) { 215 map.put(hostPort.toLowerCase(), dir); 216 } 217 else { 218 throw new ServiceException(ErrorCode.E0100, getClass().getName(), 219 "could not find " + type + " configuration directory: " + 220 
    private void loadHadoopConfigs(Configuration serviceConf) throws ServiceException {
        try {
            Map<String, File> map = parseConfigDirs(serviceConf.getStrings(HADOOP_CONFS, "*=hadoop-conf"), "hadoop");
            for (Map.Entry<String, File> entry : map.entrySet()) {
                hadoopConfigs.put(entry.getKey(), loadHadoopConf(entry.getValue()));
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private void preLoadActionConfigs(Configuration serviceConf) throws ServiceException {
        try {
            actionConfigDirs = parseConfigDirs(serviceConf.getStrings(ACTION_CONFS, "*=hadoop-conf"), "action");
            for (String hostport : actionConfigDirs.keySet()) {
                actionConfigs.put(hostport, new ConcurrentHashMap<String, XConfiguration>());
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    public void destroy() {
    }

    public Class<? extends Service> getInterface() {
        return HadoopAccessorService.class;
    }

    private UserGroupInformation getUGI(String user) throws IOException {
        UserGroupInformation ugi = userUgiMap.get(user);
        if (ugi == null) {
            // taking care of a race condition, the latest UGI will be discarded
            ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
            userUgiMap.putIfAbsent(user, ugi);
        }
        return ugi;
    }

    /**
     * Creates a JobConf using the site configuration for the specified hostname:port.
     * <p/>
     * If the specified hostname:port is not defined it falls back to the '*' site
     * configuration if available. If the '*' site configuration is not available,
     * the JobConf has all Hadoop defaults.
     *
     * @param hostPort hostname:port to lookup Hadoop site configuration.
     * @return a JobConf with the corresponding site configuration for hostPort.
     */
    public JobConf createJobConf(String hostPort) {
        JobConf jobConf = new JobConf();
        XConfiguration.copy(getConfiguration(hostPort), jobConf);
        jobConf.setBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, true);
        return jobConf;
    }
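    // Usage sketch (host:port and user are example values): a JobConf from createJobConf() carries the
    // OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED marker that createJobClient()/createFileSystem() check for:
    //
    //   JobConf jobConf = createJobConf("jt-host:8021");
    //   jobConf.set("mapred.job.tracker", "jt-host:8021");
    //   JobClient client = createJobClient("exampleUser", jobConf);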
    private XConfiguration loadActionConf(String hostPort, String action) {
        File dir = actionConfigDirs.get(hostPort);
        XConfiguration actionConf = new XConfiguration();
        if (dir != null) {
            File actionConfFile = new File(dir, action + ".xml");
            if (actionConfFile.exists()) {
                try {
                    actionConf = new XConfiguration(new FileInputStream(actionConfFile));
                }
                catch (IOException ex) {
                    XLog.getLog(getClass()).warn("Could not read file [{0}] for action [{1}] configuration for hostPort [{2}]",
                            actionConfFile.getAbsolutePath(), action, hostPort);
                }
            }
        }
        return actionConf;
    }

    /**
     * Returns a Configuration containing any defaults for an action for a particular cluster.
     * <p/>
     * This configuration is used as default for the action configuration and enables cluster
     * level default values per action.
     *
     * @param hostPort hostname:port to lookup the action default configuration.
     * @param action action name.
     * @return the default configuration for the action for the specified cluster.
     */
    public XConfiguration createActionDefaultConf(String hostPort, String action) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Map<String, XConfiguration> hostPortActionConfigs = actionConfigs.get(hostPort);
        if (hostPortActionConfigs == null) {
            hostPortActionConfigs = actionConfigs.get("*");
            hostPort = "*";
        }
        XConfiguration actionConf = hostPortActionConfigs.get(action);
        if (actionConf == null) {
            // doing lazy loading as we don't know upfront all actions, no need to synchronize
            // as it is a read operation and in case of a race condition loading and inserting
            // into the Map is idempotent and the action-config Map is a ConcurrentHashMap
            actionConf = loadActionConf(hostPort, action);
            hostPortActionConfigs.put(action, actionConf);
        }
        return new XConfiguration(actionConf.toProperties());
    }
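    // Usage sketch (cluster and action name are example values): per-cluster defaults for an action are
    // loaded lazily from "<action-config-dir>/<action>.xml", e.g.
    //
    //   XConfiguration pigDefaults = createActionDefaultConf("jt-host:8021", "pig");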
    private Configuration getConfiguration(String hostPort) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Configuration conf = hadoopConfigs.get(hostPort);
        if (conf == null) {
            conf = hadoopConfigs.get("*");
            if (conf == null) {
                conf = new XConfiguration();
            }
        }
        return conf;
    }

    /**
     * Return a JobClient created with the provided user/group.
     *
     * @param user user name.
     * @param conf JobConf with all necessary information to create the JobClient.
     * @return JobClient created with the provided user/group.
     * @throws HadoopAccessorException if the client could not be created.
     */
    public JobClient createJobClient(String user, final JobConf conf) throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }
        String jobTracker = conf.get(JavaActionExecutor.HADOOP_JOB_TRACKER);
        validateJobTracker(jobTracker);
        try {
            UserGroupInformation ugi = getUGI(user);
            JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
                public JobClient run() throws Exception {
                    return new JobClient(conf);
                }
            });
            Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(getMRDelegationTokenRenewer(conf));
            conf.getCredentials().addToken(MR_TOKEN_ALIAS, mrdt);
            return jobClient;
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
    }

    /**
     * Return a FileSystem created with the provided user for the specified URI.
     *
     * @param user user name.
     * @param uri file system URI.
     * @param conf Configuration with all necessary information to create the FileSystem.
     * @return FileSystem created with the provided user/group.
     * @throws HadoopAccessorException if the filesystem could not be created.
     */
    public FileSystem createFileSystem(String user, final URI uri, final Configuration conf)
            throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }

        checkSupportedFilesystem(uri);

        String nameNode = uri.getAuthority();
        if (nameNode == null) {
            nameNode = conf.get("fs.default.name");
            if (nameNode != null) {
                try {
                    nameNode = new URI(nameNode).getAuthority();
                }
                catch (URISyntaxException ex) {
                    throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
                }
            }
        }
        validateNameNode(nameNode);

        try {
            UserGroupInformation ugi = getUGI(user);
            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                public FileSystem run() throws Exception {
                    return FileSystem.get(uri, conf);
                }
            });
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
    }

    /**
     * Validate the JobTracker against the job tracker whitelist.
     *
     * @param jobTrackerUri JobTracker URI to validate.
     * @throws HadoopAccessorException if the JobTracker is not in the whitelist.
     */
    protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException {
        validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900);
    }

    /**
     * Validate the NameNode against the name node whitelist.
     *
     * @param nameNodeUri NameNode URI to validate.
     * @throws HadoopAccessorException if the NameNode is not in the whitelist.
     */
    protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException {
        validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901);
    }

    private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException {
        if (uri != null) {
            uri = uri.toLowerCase().trim();
            if (whitelist.size() > 0 && !whitelist.contains(uri)) {
                throw new HadoopAccessorException(error, uri);
            }
        }
    }

    public static Text getMRDelegationTokenRenewer(JobConf jobConf) throws IOException {
        if (UserGroupInformation.isSecurityEnabled()) { // secure cluster
            return getMRTokenRenewerInternal(jobConf);
        }
        else {
            return MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
        }
    }

    // Package private for unit test purposes
    static Text getMRTokenRenewerInternal(JobConf jobConf) throws IOException {
        // Getting renewer correctly for JT principal also though JT in hadoop 1.x does not have
        // support for renewing/cancelling tokens
        String servicePrincipal = jobConf.get(RM_PRINCIPAL, jobConf.get(JT_PRINCIPAL));
        Text renewer;
        if (servicePrincipal != null) { // secure cluster
            renewer = mrTokenRenewers.get(servicePrincipal);
            if (renewer == null) {
                // Mimic org.apache.hadoop.mapred.Master.getMasterPrincipal()
                String target = jobConf.get(HADOOP_YARN_RM, jobConf.get(HADOOP_JOB_TRACKER_2));
                if (target == null) {
                    target = jobConf.get(HADOOP_JOB_TRACKER);
                }
                String addr = NetUtils.createSocketAddr(target).getHostName();
                renewer = new Text(SecurityUtil.getServerPrincipal(servicePrincipal, addr));
                LOG.info("Delegation Token Renewer details: Principal=" + servicePrincipal + ",Target=" + target
                        + ",Renewer=" + renewer);
                mrTokenRenewers.put(servicePrincipal, renewer);
            }
        }
        else {
            renewer = MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
        }
        return renewer;
    }
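    // Illustrative renewer resolution for getMRTokenRenewerInternal() above (principal and address are
    // example values): with yarn.resourcemanager.principal=rm/_HOST@EXAMPLE.COM and
    // yarn.resourcemanager.address=rm-host:8032, SecurityUtil.getServerPrincipal() substitutes the RM
    // hostname for _HOST, yielding "rm/rm-host@EXAMPLE.COM" as the token renewer.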
    public void addFileToClassPath(String user, final Path file, final Configuration conf)
            throws IOException {
        ParamChecker.notEmpty(user, "user");
        try {
            UserGroupInformation ugi = getUGI(user);
            ugi.doAs(new PrivilegedExceptionAction<Void>() {
                public Void run() throws Exception {
                    Configuration defaultConf = new Configuration();
                    XConfiguration.copy(conf, defaultConf);
                    //Doing this NOP add first to have the FS created and cached
                    DistributedCache.addFileToClassPath(file, defaultConf);

                    DistributedCache.addFileToClassPath(file, conf);
                    return null;
                }
            });
        }
        catch (InterruptedException ex) {
            throw new IOException(ex);
        }
    }

    /**
     * Checks whether the filesystem scheme of the given URI is among the configured list of supported
     * schemes; this lets Oozie work with filesystems other than HDFS as well.
     */
    public void checkSupportedFilesystem(URI uri) throws HadoopAccessorException {
        String uriScheme = uri.getScheme();
        if (uriScheme != null) { // skip the check if no scheme is given
            if (!supportedSchemes.isEmpty()) {
                XLog.getLog(this.getClass()).debug("Checking if filesystem " + uriScheme + " is supported");
                if (!supportedSchemes.contains(uriScheme)) {
                    throw new HadoopAccessorException(ErrorCode.E0904, uriScheme, uri.toString());
                }
            }
        }
    }

}