/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.security.token.Token;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group. <p/> The
 * default accessor used is the base accessor, which just injects the UGI into the configuration instance used to
 * create/obtain JobClient and FileSystem instances. <p/> The HadoopAccessor class to use can be configured in the
 * <code>oozie-site.xml</code> using the <code>oozie.service.HadoopAccessorService.accessor.class</code> property.
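 * <p/>
 * A minimal usage sketch (the user name and hostname:port below are hypothetical):
 * <pre>
 *    HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
 *    JobConf jobConf = has.createJobConf("jt-host:8021");
 *    JobClient jobClient = has.createJobClient("joe", jobConf);
 * </pre>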
 */
public class HadoopAccessorService implements Service {

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService.";
    public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist";
    public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist";
    public static final String HADOOP_CONFS = CONF_PREFIX + "hadoop.configurations";
    public static final String ACTION_CONFS = CONF_PREFIX + "action.configurations";
    public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled";
    public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file";
    public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";

    private static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";

    private Set<String> jobTrackerWhitelist = new HashSet<String>();
    private Set<String> nameNodeWhitelist = new HashSet<String>();
    private Map<String, Configuration> hadoopConfigs = new HashMap<String, Configuration>();
    private Map<String, File> actionConfigDirs = new HashMap<String, File>();
    private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>();

    private ConcurrentMap<String, UserGroupInformation> userUgiMap;

    /**
     * Supported filesystem schemes for namespace federation.
     */
    public static final String SUPPORTED_FILESYSTEMS = CONF_PREFIX + "supported.filesystems";
    private Set<String> supportedSchemes;

    public void init(Services services) throws ServiceException {
        init(services.getConf());
    }

    //for testing purposes, see XFsTestCase
    public void init(Configuration conf) throws ServiceException {
        for (String name : conf.getStringCollection(JOB_TRACKER_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            jobTrackerWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "JOB_TRACKER_WHITELIST :" + conf.getStringCollection(JOB_TRACKER_WHITELIST)
                        + ", Total entries :" + jobTrackerWhitelist.size());
        for (String name : conf.getStringCollection(NAME_NODE_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            nameNodeWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "NAME_NODE_WHITELIST :" + conf.getStringCollection(NAME_NODE_WHITELIST)
                        + ", Total entries :" + nameNodeWhitelist.size());

        boolean kerberosAuthOn = conf.getBoolean(KERBEROS_AUTH_ENABLED, true);
        XLog.getLog(getClass()).info("Oozie Kerberos Authentication [{0}]", (kerberosAuthOn) ? "enabled" : "disabled");
        if (kerberosAuthOn) {
            kerberosInit(conf);
        }
        else {
            Configuration ugiConf = new Configuration();
            ugiConf.set("hadoop.security.authentication", "simple");
            UserGroupInformation.setConfiguration(ugiConf);
        }

        userUgiMap = new ConcurrentHashMap<String, UserGroupInformation>();

        loadHadoopConfigs(conf);
        preLoadActionConfigs(conf);
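
        // For illustration (the value below is hypothetical): setting SUPPORTED_FILESYSTEMS to
        // "hdfs,webhdfs" restricts createFileSystem() to hdfs:// and webhdfs:// URIs, while a
        // single "*" disables the check so URIs of any scheme are accepted.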
        supportedSchemes = new HashSet<String>();
        String[] schemesFromConf = conf.getStrings(SUPPORTED_FILESYSTEMS, new String[]{"hdfs"});
        if (schemesFromConf != null) {
            for (String scheme : schemesFromConf) {
                scheme = scheme.trim();
                // If the user gives "*", supportedSchemes stays empty, so no checking is done,
                // i.e. all schemes are allowed
                if (scheme.equals("*")) {
                    if (schemesFromConf.length > 1) {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                SUPPORTED_FILESYSTEMS + " should contain either only wildcard or explicit list, not both");
                    }
                }
                else {
                    supportedSchemes.add(scheme);
                }
            }
        }
    }

    private void kerberosInit(Configuration serviceConf) throws ServiceException {
        try {
            String keytabFile = serviceConf.get(KERBEROS_KEYTAB,
                    System.getProperty("user.home") + "/oozie.keytab").trim();
            if (keytabFile.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
            }
            String principal = serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
            if (principal.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
            }
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
            XLog.getLog(getClass()).info("Got Kerberos ticket, keytab [{0}], Oozie principal [{1}]",
                    keytabFile, principal);
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private static final String[] HADOOP_CONF_FILES =
            {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml", "hadoop-site.xml"};

    private Configuration loadHadoopConf(File dir) throws IOException {
        Configuration hadoopConf = new XConfiguration();
        for (String file : HADOOP_CONF_FILES) {
            File f = new File(dir, file);
            if (f.exists()) {
                InputStream is = new FileInputStream(f);
                try {
                    Configuration conf = new XConfiguration(is);
                    XConfiguration.copy(conf, hadoopConf);
                }
                finally {
                    is.close();
                }
            }
        }
        return hadoopConf;
    }

    private Map<String, File> parseConfigDirs(String[] confDefs, String type) throws ServiceException, IOException {
        Map<String, File> map = new HashMap<String, File>();
        File configDir = new File(ConfigurationService.getConfigurationDirectory());
        for (String confDef : confDefs) {
            if (confDef.trim().length() > 0) {
                String[] parts = confDef.split("=");
                if (parts.length == 2) {
                    String hostPort = parts[0];
                    String confDir = parts[1];
                    File dir = new File(confDir);
                    if (!dir.isAbsolute()) {
                        dir = new File(configDir, confDir);
                    }
                    if (dir.exists()) {
                        map.put(hostPort.toLowerCase(), dir);
                    }
                    else {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                "could not find " + type + " configuration directory: " +
                                        dir.getAbsolutePath());
                    }
                }
                else {
                    throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                            "Incorrect " + type + " configuration definition: " + confDef);
                }
            }
        }
        return map;
    }

    private void loadHadoopConfigs(Configuration serviceConf) throws ServiceException {
        try {
            Map<String, File> map = parseConfigDirs(serviceConf.getStrings(HADOOP_CONFS, "*=hadoop-conf"), "hadoop");
            for (Map.Entry<String, File> entry : map.entrySet()) {
                hadoopConfigs.put(entry.getKey(), loadHadoopConf(entry.getValue()));
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }
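
    // For illustration (the mapping below is hypothetical): an ACTION_CONFS value of
    //   *=hadoop-conf,nn-host:8020=clusterA-conf
    // maps the default cluster to <oozie-config-dir>/hadoop-conf and nn-host:8020 to
    // <oozie-config-dir>/clusterA-conf; per-action defaults are then read lazily from
    // <dir>/<action-name>.xml by createActionDefaultConf().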
    private void preLoadActionConfigs(Configuration serviceConf) throws ServiceException {
        try {
            actionConfigDirs = parseConfigDirs(serviceConf.getStrings(ACTION_CONFS, "*=hadoop-conf"), "action");
            for (String hostport : actionConfigDirs.keySet()) {
                actionConfigs.put(hostport, new ConcurrentHashMap<String, XConfiguration>());
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    public void destroy() {
    }

    public Class<? extends Service> getInterface() {
        return HadoopAccessorService.class;
    }

    private UserGroupInformation getUGI(String user) throws IOException {
        UserGroupInformation ugi = userUgiMap.get(user);
        if (ugi == null) {
            // in case of a race condition, the redundant UGI is discarded by putIfAbsent
            ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
            userUgiMap.putIfAbsent(user, ugi);
        }
        return ugi;
    }

    /**
     * Creates a JobConf using the site configuration for the specified hostname:port.
     * <p/>
     * If the specified hostname:port is not defined, it falls back to the '*' site
     * configuration if available. If the '*' site configuration is not available,
     * the JobConf has all Hadoop defaults.
     *
     * @param hostPort hostname:port to lookup Hadoop site configuration.
     * @return a JobConf with the corresponding site configuration for hostPort.
     */
    public JobConf createJobConf(String hostPort) {
        JobConf jobConf = new JobConf();
        XConfiguration.copy(getConfiguration(hostPort), jobConf);
        jobConf.setBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, true);
        return jobConf;
    }

    private XConfiguration loadActionConf(String hostPort, String action) {
        File dir = actionConfigDirs.get(hostPort);
        XConfiguration actionConf = new XConfiguration();
        if (dir != null) {
            File actionConfFile = new File(dir, action + ".xml");
            if (actionConfFile.exists()) {
                try {
                    actionConf = new XConfiguration(new FileInputStream(actionConfFile));
                }
                catch (IOException ex) {
                    XLog.getLog(getClass()).warn("Could not read file [{0}] for action [{1}] configuration for hostPort [{2}]",
                            actionConfFile.getAbsolutePath(), action, hostPort);
                }
            }
        }
        return actionConf;
    }

    /**
     * Returns a Configuration containing any defaults for an action for a particular cluster.
     * <p/>
     * This configuration is used as the default for the action configuration and enables cluster
     * level default values per action.
     *
     * @param hostPort hostname:port to lookup the action default configuration.
     * @param action action name.
     * @return the default configuration for the action for the specified cluster.
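     *
     * <pre>
     *    // a minimal sketch (the hostname:port below is hypothetical): fetch the
     *    // cluster-level defaults for the "pig" action
     *    HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
     *    XConfiguration pigDefaults = has.createActionDefaultConf("nn-host:8020", "pig");
     * </pre>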
     */
    public XConfiguration createActionDefaultConf(String hostPort, String action) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Map<String, XConfiguration> hostPortActionConfigs = actionConfigs.get(hostPort);
        if (hostPortActionConfigs == null) {
            hostPortActionConfigs = actionConfigs.get("*");
            hostPort = "*";
        }
        XConfiguration actionConf = hostPortActionConfigs.get(action);
        if (actionConf == null) {
            // doing lazy loading as we don't know all actions upfront; no need to synchronize
            // as it is a read operation and, in case of a race condition, loading and inserting
            // into the Map is idempotent and the action-config Map is a ConcurrentHashMap
            actionConf = loadActionConf(hostPort, action);
            hostPortActionConfigs.put(action, actionConf);
        }
        return new XConfiguration(actionConf.toProperties());
    }

    private Configuration getConfiguration(String hostPort) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Configuration conf = hadoopConfigs.get(hostPort);
        if (conf == null) {
            conf = hadoopConfigs.get("*");
            if (conf == null) {
                conf = new XConfiguration();
            }
        }
        return conf;
    }

    /**
     * Return a JobClient created with the provided user.
     *
     * @param user user name to impersonate.
     * @param conf JobConf with all necessary information to create the
     *        JobClient.
     * @return JobClient created with the provided user/group.
     * @throws HadoopAccessorException if the client could not be created.
     */
    public JobClient createJobClient(String user, final JobConf conf) throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }
        String jobTracker = conf.get("mapred.job.tracker");
        validateJobTracker(jobTracker);
        try {
            UserGroupInformation ugi = getUGI(user);
            JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
                public JobClient run() throws Exception {
                    return new JobClient(conf);
                }
            });
            Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(new Text("mr token"));
            conf.getCredentials().addToken(new Text("mr token"), mrdt);
            return jobClient;
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
    }

    /**
     * Return a FileSystem created with the provided user for the specified URI.
     *
     * @param user user name to impersonate.
     * @param uri file system URI.
     * @param conf Configuration with all necessary information to create the FileSystem.
     * @return FileSystem created with the provided user/group.
     * @throws HadoopAccessorException if the filesystem could not be created.
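     *
     * <pre>
     *    // a minimal sketch (the user name and URI below are hypothetical); the Configuration
     *    // must come from createJobConf() so the service-created flag is set
     *    HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
     *    FileSystem fs = has.createFileSystem("joe", new URI("hdfs://nn-host:8020"),
     *                                         has.createJobConf("nn-host:8020"));
     * </pre>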
     */
    public FileSystem createFileSystem(String user, final URI uri, final Configuration conf)
            throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }

        checkSupportedFilesystem(uri);

        String nameNode = uri.getAuthority();
        if (nameNode == null) {
            nameNode = conf.get("fs.default.name");
            if (nameNode != null) {
                try {
                    nameNode = new URI(nameNode).getAuthority();
                }
                catch (URISyntaxException ex) {
                    throw new HadoopAccessorException(ErrorCode.E0902, ex);
                }
            }
        }
        validateNameNode(nameNode);

        try {
            UserGroupInformation ugi = getUGI(user);
            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                public FileSystem run() throws Exception {
                    return FileSystem.get(uri, conf);
                }
            });
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
    }

    /**
     * Validate a JobTracker URI against the JobTracker whitelist.
     *
     * @param jobTrackerUri JobTracker URI to validate.
     * @throws HadoopAccessorException if the JobTracker is not whitelisted.
     */
    protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException {
        validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900);
    }

    /**
     * Validate a NameNode URI against the NameNode whitelist.
     *
     * @param nameNodeUri NameNode URI to validate.
     * @throws HadoopAccessorException if the NameNode is not whitelisted.
     */
    protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException {
        validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901);
    }

    private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException {
        if (uri != null) {
            uri = uri.toLowerCase().trim();
            if (whitelist.size() > 0 && !whitelist.contains(uri)) {
                throw new HadoopAccessorException(error, uri);
            }
        }
    }

    public void addFileToClassPath(String user, final Path file, final Configuration conf)
            throws IOException {
        ParamChecker.notEmpty(user, "user");
        try {
            UserGroupInformation ugi = getUGI(user);
            ugi.doAs(new PrivilegedExceptionAction<Void>() {
                public Void run() throws Exception {
                    Configuration defaultConf = new Configuration();
                    XConfiguration.copy(conf, defaultConf);
                    //Doing this NOP add first to have the FS created and cached
                    DistributedCache.addFileToClassPath(file, defaultConf);

                    DistributedCache.addFileToClassPath(file, conf);
                    return null;
                }
            });
        }
        catch (InterruptedException ex) {
            throw new IOException(ex);
        }
    }

    /**
     * Checks whether the filesystem scheme of the given URI is among the configured supported
     * schemes; an empty set of schemes means all filesystems are allowed, which keeps the
     * system usable with filesystems other than HDFS as well.
     *
     * @param uri URI whose scheme is to be checked.
     * @throws HadoopAccessorException if the scheme is not supported.
     */
    public void checkSupportedFilesystem(URI uri) throws HadoopAccessorException {
        String uriScheme = uri.getScheme();
        if (!supportedSchemes.isEmpty()) {
            XLog.getLog(this.getClass()).debug("Checking if filesystem " + uriScheme + " is supported");
            if (!supportedSchemes.contains(uriScheme)) {
                throw new HadoopAccessorException(ErrorCode.E0904, uriScheme, uri.toString());
            }
        }
    }

}