/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.security.token.Token;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.action.hadoop.JavaActionExecutor;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;

/**
 * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group. <p/> The
 * default accessor used is the base accessor, which just injects the UGI into the configuration instance used to
 * create/obtain JobClient and FileSystem instances.
 */
public class HadoopAccessorService implements Service {

    private static XLog LOG = XLog.getLog(HadoopAccessorService.class);

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService.";
    public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist";
    public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist";
    public static final String HADOOP_CONFS = CONF_PREFIX + "hadoop.configurations";
    public static final String ACTION_CONFS = CONF_PREFIX + "action.configurations";
    public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled";
    public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file";
    public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";
    public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token");

    protected static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
    /** The Kerberos principal for the job tracker. */
    protected static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
    /** The Kerberos principal for the resource manager. */
    protected static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
    protected static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
    protected static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
    protected static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
    private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>();

    private Set<String> jobTrackerWhitelist = new HashSet<String>();
    private Set<String> nameNodeWhitelist = new HashSet<String>();
    private Map<String, Configuration> hadoopConfigs = new HashMap<String, Configuration>();
    private Map<String, File> actionConfigDirs = new HashMap<String, File>();
    private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>();

    private UserGroupInformationService ugiService;

    /**
     * Supported filesystem schemes for namespace federation.
     */
    public static final String SUPPORTED_FILESYSTEMS = CONF_PREFIX + "supported.filesystems";
    public static final String[] DEFAULT_SUPPORTED_SCHEMES = new String[]{"hdfs", "hftp", "webhdfs"};
    private Set<String> supportedSchemes;
    private boolean allSchemesSupported;

    public void init(Services services) throws ServiceException {
        this.ugiService = services.get(UserGroupInformationService.class);
        init(services.getConf());
    }

    // for testing purposes, see XFsTestCase
    public void init(Configuration conf) throws ServiceException {
        for (String name : conf.getStringCollection(JOB_TRACKER_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            jobTrackerWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "JOB_TRACKER_WHITELIST :" + conf.getStringCollection(JOB_TRACKER_WHITELIST)
                        + ", Total entries :" + jobTrackerWhitelist.size());
        for (String name : conf.getStringCollection(NAME_NODE_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            nameNodeWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "NAME_NODE_WHITELIST :" + conf.getStringCollection(NAME_NODE_WHITELIST)
                        + ", Total entries :" + nameNodeWhitelist.size());

        boolean kerberosAuthOn = conf.getBoolean(KERBEROS_AUTH_ENABLED, true);
        XLog.getLog(getClass()).info("Oozie Kerberos Authentication [{0}]", (kerberosAuthOn) ? "enabled" : "disabled");
        if (kerberosAuthOn) {
            kerberosInit(conf);
        }
        else {
            Configuration ugiConf = new Configuration();
            ugiConf.set("hadoop.security.authentication", "simple");
            UserGroupInformation.setConfiguration(ugiConf);
        }

        if (ugiService == null) { // for testing purposes, see XFsTestCase
            this.ugiService = new UserGroupInformationService();
        }

        loadHadoopConfigs(conf);
        preLoadActionConfigs(conf);

        supportedSchemes = new HashSet<String>();
        String[] schemesFromConf = conf.getStrings(SUPPORTED_FILESYSTEMS, DEFAULT_SUPPORTED_SCHEMES);
        if (schemesFromConf != null) {
            for (String scheme : schemesFromConf) {
                scheme = scheme.trim();
                // If the user gives "*", allSchemesSupported is set and no per-URI scheme check is done,
                // i.e. all schemes are allowed
                if (scheme.equals("*")) {
                    if (schemesFromConf.length > 1) {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                SUPPORTED_FILESYSTEMS + " should contain either only wildcard or explicit list, not both");
                    }
                    allSchemesSupported = true;
                }
                supportedSchemes.add(scheme);
            }
        }
    }

    private void kerberosInit(Configuration serviceConf) throws ServiceException {
        try {
            String keytabFile = serviceConf.get(KERBEROS_KEYTAB,
                    System.getProperty("user.home") + "/oozie.keytab").trim();
            if (keytabFile.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
            }
            String principal = serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
            if (principal.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
            }
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
            XLog.getLog(getClass()).info("Got Kerberos ticket, keytab [{0}], Oozie principal [{1}]",
                    keytabFile, principal);
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private static final String[] HADOOP_CONF_FILES =
            {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml", "hadoop-site.xml"};


    private Configuration loadHadoopConf(File dir) throws IOException {
        Configuration hadoopConf = new XConfiguration();
        for (String file : HADOOP_CONF_FILES) {
            File f = new File(dir, file);
            if (f.exists()) {
                InputStream is = new FileInputStream(f);
                Configuration conf = new XConfiguration(is);
                is.close();
                XConfiguration.copy(conf, hadoopConf);
            }
        }
        return hadoopConf;
    }

    private Map<String, File> parseConfigDirs(String[] confDefs, String type) throws ServiceException, IOException {
        Map<String, File> map = new HashMap<String, File>();
        File configDir = new File(ConfigurationService.getConfigurationDirectory());
        for (String confDef : confDefs) {
            if (confDef.trim().length() > 0) {
                String[] parts = confDef.split("=");
                if (parts.length == 2) {
                    String hostPort = parts[0];
                    String confDir = parts[1];
                    File dir = new File(confDir);
                    if (!dir.isAbsolute()) {
                        dir = new File(configDir, confDir);
                    }
                    if (dir.exists()) {
                        map.put(hostPort.toLowerCase(), dir);
                    }
                    else {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                "could not find " + type + " configuration directory: " +
                                        dir.getAbsolutePath());
                    }
                }
                else {
                    throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                            "Incorrect " + type + " configuration definition: " + confDef);
                }
            }
        }
        return map;
    }

    private void loadHadoopConfigs(Configuration serviceConf) throws ServiceException {
        try {
            Map<String, File> map = parseConfigDirs(serviceConf.getStrings(HADOOP_CONFS, "*=hadoop-conf"), "hadoop");
            for (Map.Entry<String, File> entry : map.entrySet()) {
                hadoopConfigs.put(entry.getKey(), loadHadoopConf(entry.getValue()));
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private void preLoadActionConfigs(Configuration serviceConf) throws ServiceException {
        try {
            actionConfigDirs = parseConfigDirs(serviceConf.getStrings(ACTION_CONFS, "*=hadoop-conf"), "action");
            for (String hostport : actionConfigDirs.keySet()) {
                actionConfigs.put(hostport, new ConcurrentHashMap<String, XConfiguration>());
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    public void destroy() {
    }

    public Class<? extends Service> getInterface() {
        return HadoopAccessorService.class;
    }

    private UserGroupInformation getUGI(String user) throws IOException {
        return ugiService.getProxyUser(user);
    }

    /**
     * Creates a JobConf using the site configuration for the specified hostname:port.
     * <p/>
     * If the specified hostname:port is not defined, it falls back to the '*' site
     * configuration if available. If the '*' site configuration is not available,
     * the JobConf has all Hadoop defaults.
     *
     * @param hostPort hostname:port to lookup Hadoop site configuration.
     * @return a JobConf with the corresponding site configuration for hostPort.
     */
    public JobConf createJobConf(String hostPort) {
        JobConf jobConf = new JobConf();
        XConfiguration.copy(getConfiguration(hostPort), jobConf);
        jobConf.setBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, true);
        return jobConf;
    }
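
    /*
     * Illustrative usage, not part of the original class: callers would typically
     * obtain this service from the running Services singleton and key the lookup by
     * the JobTracker/ResourceManager host:port (the host below is a made-up example).
     *
     *   HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
     *   JobConf launcherConf = has.createJobConf("jt.example.com:8032");
     *   // launcherConf now carries the matching *-site.xml settings plus the marker
     *   // flag that createJobClient()/createFileSystem() check before proceeding.
     */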

    private XConfiguration loadActionConf(String hostPort, String action) {
        File dir = actionConfigDirs.get(hostPort);
        XConfiguration actionConf = new XConfiguration();
        if (dir != null) {
            File actionConfFile = new File(dir, action + ".xml");
            if (actionConfFile.exists()) {
                try {
                    actionConf = new XConfiguration(new FileInputStream(actionConfFile));
                }
                catch (IOException ex) {
                    XLog.getLog(getClass()).warn("Could not read file [{0}] for action [{1}] configuration for hostPort [{2}]",
                            actionConfFile.getAbsolutePath(), action, hostPort);
                }
            }
        }
        return actionConf;
    }

    /**
     * Returns a Configuration containing any defaults for an action for a particular cluster.
     * <p/>
     * This configuration is used as default for the action configuration and enables cluster
     * level default values per action.
     *
     * @param hostPort hostname:port to lookup the action default configuration.
     * @param action action name.
     * @return the default configuration for the action for the specified cluster.
     */
    public XConfiguration createActionDefaultConf(String hostPort, String action) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Map<String, XConfiguration> hostPortActionConfigs = actionConfigs.get(hostPort);
        if (hostPortActionConfigs == null) {
            hostPortActionConfigs = actionConfigs.get("*");
            hostPort = "*";
        }
        XConfiguration actionConf = hostPortActionConfigs.get(action);
        if (actionConf == null) {
            // Lazy loading: we don't know all actions upfront and there is no need to synchronize,
            // as this is a read operation and, in case of a race condition, loading and inserting
            // into the Map is idempotent and the action-config Map is a ConcurrentHashMap.
            actionConf = loadActionConf(hostPort, action);
            hostPortActionConfigs.put(action, actionConf);
        }
        return new XConfiguration(actionConf.toProperties());
    }
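
    /*
     * Illustrative sketch, not part of the original source: with an action configuration
     * directory containing per-action files such as hive.xml, the per-cluster defaults can
     * be fetched like this ("has" is a HadoopAccessorService instance and the host:port is
     * a made-up example):
     *
     *   XConfiguration hiveDefaults = has.createActionDefaultConf("jt.example.com:8032", "hive");
     *   // A copy is returned, so callers can merge it under the action's own
     *   // configuration without mutating the cached defaults.
     */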

    private Configuration getConfiguration(String hostPort) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Configuration conf = hadoopConfigs.get(hostPort);
        if (conf == null) {
            conf = hadoopConfigs.get("*");
            if (conf == null) {
                conf = new XConfiguration();
            }
        }
        return conf;
    }

    /**
     * Return a JobClient created with the provided user/group.
     *
     * @param user user name to create the JobClient as.
     * @param conf JobConf with all necessary information to create the JobClient.
     * @return JobClient created with the provided user/group.
     * @throws HadoopAccessorException if the client could not be created.
     */
    public JobClient createJobClient(String user, final JobConf conf) throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }
        String jobTracker = conf.get(JavaActionExecutor.HADOOP_JOB_TRACKER);
        validateJobTracker(jobTracker);
        try {
            UserGroupInformation ugi = getUGI(user);
            JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
                public JobClient run() throws Exception {
                    return new JobClient(conf);
                }
            });
            Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(getMRDelegationTokenRenewer(conf));
            conf.getCredentials().addToken(MR_TOKEN_ALIAS, mrdt);
            return jobClient;
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
    }
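
    /*
     * Illustrative usage, an assumption rather than something taken from the original
     * file: the JobConf passed in must have been produced by createJobConf(), otherwise
     * E0903 is thrown ("has", the host:port and the user are made-up examples).
     *
     *   JobConf jc = has.createJobConf("jt.example.com:8032");
     *   jc.set(JavaActionExecutor.HADOOP_JOB_TRACKER, "jt.example.com:8032");
     *   JobClient client = has.createJobClient("someuser", jc);
     *   // The MR delegation token (if any) is added to jc's credentials under MR_TOKEN_ALIAS.
     */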

    /**
     * Return a FileSystem created with the provided user for the specified URI.
     *
     * @param user user name to create the FileSystem as.
     * @param uri file system URI.
     * @param conf Configuration with all necessary information to create the FileSystem.
     * @return FileSystem created with the provided user/group.
     * @throws HadoopAccessorException if the filesystem could not be created.
     */
    public FileSystem createFileSystem(String user, final URI uri, final Configuration conf)
            throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }

        checkSupportedFilesystem(uri);

        String nameNode = uri.getAuthority();
        if (nameNode == null) {
            nameNode = conf.get("fs.default.name");
            if (nameNode != null) {
                try {
                    nameNode = new URI(nameNode).getAuthority();
                }
                catch (URISyntaxException ex) {
                    throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
                }
            }
        }
        validateNameNode(nameNode);

        try {
            UserGroupInformation ugi = getUGI(user);
            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                public FileSystem run() throws Exception {
                    return FileSystem.get(uri, conf);
                }
            });
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
    }
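
    /*
     * Illustrative usage sketch, an assumption rather than original code: the
     * Configuration must also originate from this service and the URI scheme must be
     * in the supported list (hdfs/hftp/webhdfs by default). "has" and the host names
     * are made-up examples.
     *
     *   URI nnUri = URI.create("hdfs://nn.example.com:8020");
     *   JobConf fsConf = has.createJobConf(nnUri.getAuthority());
     *   FileSystem fs = has.createFileSystem("someuser", nnUri, fsConf);
     *   // fs operations now run as "someuser" via the Oozie proxy-user UGI.
     */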

    /**
     * Validate Job tracker.
     *
     * @param jobTrackerUri JobTracker URI to validate against the whitelist.
     * @throws HadoopAccessorException if the JobTracker is not in the whitelist.
     */
    protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException {
        validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900);
    }

    /**
     * Validate Namenode list.
     *
     * @param nameNodeUri NameNode URI to validate against the whitelist.
     * @throws HadoopAccessorException if the NameNode is not in the whitelist.
     */
    protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException {
        validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901);
    }

    private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException {
        if (uri != null) {
            uri = uri.toLowerCase().trim();
            if (whitelist.size() > 0 && !whitelist.contains(uri)) {
                throw new HadoopAccessorException(error, uri);
            }
        }
    }

    public Text getMRDelegationTokenRenewer(JobConf jobConf) throws IOException {
        if (UserGroupInformation.isSecurityEnabled()) { // secure cluster
            return getMRTokenRenewerInternal(jobConf);
        }
        else {
            return MR_TOKEN_ALIAS; // doesn't matter what we pass as renewer
        }
    }

    // Package private for unit test purposes
    Text getMRTokenRenewerInternal(JobConf jobConf) throws IOException {
        // Get the renewer correctly for the JT principal as well, even though the JT in
        // Hadoop 1.x does not support renewing/cancelling tokens
        String servicePrincipal = jobConf.get(RM_PRINCIPAL, jobConf.get(JT_PRINCIPAL));
        Text renewer;
        if (servicePrincipal != null) { // secure cluster
            renewer = mrTokenRenewers.get(servicePrincipal);
            if (renewer == null) {
                // Mimic org.apache.hadoop.mapred.Master.getMasterPrincipal()
                String target = jobConf.get(HADOOP_YARN_RM, jobConf.get(HADOOP_JOB_TRACKER_2));
                if (target == null) {
                    target = jobConf.get(HADOOP_JOB_TRACKER);
                }
                String addr = NetUtils.createSocketAddr(target).getHostName();
                renewer = new Text(SecurityUtil.getServerPrincipal(servicePrincipal, addr));
                LOG.info("Delegation Token Renewer details: Principal=" + servicePrincipal + ",Target=" + target
                        + ",Renewer=" + renewer);
                mrTokenRenewers.put(servicePrincipal, renewer);
            }
        }
        else {
            renewer = MR_TOKEN_ALIAS; // doesn't matter what we pass as renewer
        }
        return renewer;
    }
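
    /*
     * Worked example, an assumption rather than original code: with
     * yarn.resourcemanager.principal set to "rm/_HOST@EXAMPLE.COM" and
     * yarn.resourcemanager.address set to "rm.example.com:8032",
     * SecurityUtil.getServerPrincipal() substitutes the host name, so the computed
     * renewer becomes "rm/rm.example.com@EXAMPLE.COM" and is cached in
     * mrTokenRenewers for subsequent lookups.
     */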

    public void addFileToClassPath(String user, final Path file, final Configuration conf)
            throws IOException {
        ParamChecker.notEmpty(user, "user");
        try {
            UserGroupInformation ugi = getUGI(user);
            ugi.doAs(new PrivilegedExceptionAction<Void>() {
                public Void run() throws Exception {
                    Configuration defaultConf = new Configuration();
                    XConfiguration.copy(conf, defaultConf);
                    // Doing this NOP add first to have the FS created and cached
                    DistributedCache.addFileToClassPath(file, defaultConf);

                    DistributedCache.addFileToClassPath(file, conf);
                    return null;
                }
            });
        }
        catch (InterruptedException ex) {
            throw new IOException(ex);
        }
    }

    /**
     * Checks whether the filesystem scheme of the given URI is in the configured list of
     * supported schemes; this keeps the check robust while also allowing filesystems other than HDFS.
     */
    public void checkSupportedFilesystem(URI uri) throws HadoopAccessorException {
        if (allSchemesSupported)
            return;
        String uriScheme = uri.getScheme();
        if (uriScheme != null) { // skip the check if no scheme is given
            if (!supportedSchemes.isEmpty()) {
                XLog.getLog(this.getClass()).debug("Checking if filesystem " + uriScheme + " is supported");
                if (!supportedSchemes.contains(uriScheme)) {
                    throw new HadoopAccessorException(ErrorCode.E0904, uriScheme, uri.toString());
                }
            }
        }
    }
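
    /*
     * Illustrative usage, not part of the original class: with the default supported
     * schemes (hdfs, hftp, webhdfs), a URI with any other scheme is rejected unless
     * SUPPORTED_FILESYSTEMS is extended or set to "*" ("has" is a HadoopAccessorService
     * instance; host and bucket names are made up).
     *
     *   has.checkSupportedFilesystem(URI.create("hdfs://nn.example.com:8020/tmp")); // passes
     *   has.checkSupportedFilesystem(URI.create("s3n://bucket/key"));               // throws E0904
     */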

    public Set<String> getSupportedSchemes() {
        return supportedSchemes;
    }

}