/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.security.token.Token;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.action.hadoop.JavaActionExecutor;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

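// Illustrative usage sketch (not part of this class): callers normally obtain the service
// through the Oozie Services registry and let it build Hadoop client objects on behalf of
// the workflow user. The host:port values and the user name below are placeholders, not
// values defined in this file.
//
//     HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
//     JobConf jobConf = has.createJobConf("jobtracker-host:8021");
//     FileSystem fs = has.createFileSystem("someuser",
//             URI.create("hdfs://namenode-host:8020"), jobConf);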
/**
 * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group. <p/> The
 * default accessor used is the base accessor which just injects the UGI into the configuration instance used to
 * create/obtain JobClient and FileSystem instances. <p/> The accessor class to use can be configured in
 * <code>oozie-site.xml</code> using the <code>oozie.service.HadoopAccessorService.accessor.class</code> property.
 */
public class HadoopAccessorService implements Service {

    private static XLog LOG = XLog.getLog(HadoopAccessorService.class);

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService.";
    public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist";
    public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist";
    public static final String HADOOP_CONFS = CONF_PREFIX + "hadoop.configurations";
    public static final String ACTION_CONFS = CONF_PREFIX + "action.configurations";
    public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled";
    public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file";
    public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";
    public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token");

    private static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
    /** The Kerberos principal for the job tracker. */
    private static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
    /** The Kerberos principal for the resource manager. */
    private static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
    private static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
    private static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
    private static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
    private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>();

    private Set<String> jobTrackerWhitelist = new HashSet<String>();
    private Set<String> nameNodeWhitelist = new HashSet<String>();
    private Map<String, Configuration> hadoopConfigs = new HashMap<String, Configuration>();
    private Map<String, File> actionConfigDirs = new HashMap<String, File>();
    private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>();

    private ConcurrentMap<String, UserGroupInformation> userUgiMap;

    /**
     * Supported filesystem schemes for namespace federation
     */
    public static final String SUPPORTED_FILESYSTEMS = CONF_PREFIX + "supported.filesystems";
    private Set<String> supportedSchemes;

    public void init(Services services) throws ServiceException {
        init(services.getConf());
    }

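    // Initializes the whitelists, Kerberos (or simple) authentication for the login user,
    // the per-cluster Hadoop and action configurations, and the supported filesystem schemes.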
    //for testing purposes, see XFsTestCase
    public void init(Configuration conf) throws ServiceException {
        for (String name : conf.getStringCollection(JOB_TRACKER_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            jobTrackerWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "JOB_TRACKER_WHITELIST :" + conf.getStringCollection(JOB_TRACKER_WHITELIST)
                        + ", Total entries :" + jobTrackerWhitelist.size());
        for (String name : conf.getStringCollection(NAME_NODE_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            nameNodeWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "NAME_NODE_WHITELIST :" + conf.getStringCollection(NAME_NODE_WHITELIST)
                        + ", Total entries :" + nameNodeWhitelist.size());

        boolean kerberosAuthOn = conf.getBoolean(KERBEROS_AUTH_ENABLED, true);
        XLog.getLog(getClass()).info("Oozie Kerberos Authentication [{0}]", (kerberosAuthOn) ? "enabled" : "disabled");
        if (kerberosAuthOn) {
            kerberosInit(conf);
        }
        else {
            Configuration ugiConf = new Configuration();
            ugiConf.set("hadoop.security.authentication", "simple");
            UserGroupInformation.setConfiguration(ugiConf);
        }

        userUgiMap = new ConcurrentHashMap<String, UserGroupInformation>();

        loadHadoopConfigs(conf);
        preLoadActionConfigs(conf);

        supportedSchemes = new HashSet<String>();
        String[] schemesFromConf = conf.getStrings(SUPPORTED_FILESYSTEMS, new String[]{"hdfs", "hftp", "webhdfs"});
        if (schemesFromConf != null) {
            for (String scheme : schemesFromConf) {
                scheme = scheme.trim();
                // If the user gives "*", supportedSchemes stays empty, so no checking is done, i.e. all schemes are allowed
                if (scheme.equals("*")) {
                    if (schemesFromConf.length > 1) {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                SUPPORTED_FILESYSTEMS + " should contain either only wildcard or explicit list, not both");
                    }
                }
                else {
                    supportedSchemes.add(scheme);
                }
            }
        }
    }

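    // Switches UGI to Kerberos authentication and logs in the Oozie server principal from the configured keytab.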
    private void kerberosInit(Configuration serviceConf) throws ServiceException {
        try {
            String keytabFile = serviceConf.get(KERBEROS_KEYTAB,
                                                System.getProperty("user.home") + "/oozie.keytab").trim();
            if (keytabFile.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
            }
            String principal = serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
            if (principal.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
            }
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
            XLog.getLog(getClass()).info("Got Kerberos ticket, keytab [{0}], Oozie principal [{1}]",
                                         keytabFile, principal);
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private static final String[] HADOOP_CONF_FILES =
            {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml", "hadoop-site.xml"};

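    // Reads and merges whichever of the standard Hadoop *-site.xml files exist in the given directory.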
    private Configuration loadHadoopConf(File dir) throws IOException {
        Configuration hadoopConf = new XConfiguration();
        for (String file : HADOOP_CONF_FILES) {
            File f = new File(dir, file);
            if (f.exists()) {
                InputStream is = new FileInputStream(f);
                Configuration conf = new XConfiguration(is);
                is.close();
                XConfiguration.copy(conf, hadoopConf);
            }
        }
        return hadoopConf;
    }

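    // Parses "hostPort=configDir" definitions; relative directories are resolved against the Oozie
    // configuration directory, and a missing directory is reported as a service error.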
    private Map<String, File> parseConfigDirs(String[] confDefs, String type) throws ServiceException, IOException {
        Map<String, File> map = new HashMap<String, File>();
        File configDir = new File(ConfigurationService.getConfigurationDirectory());
        for (String confDef : confDefs) {
            if (confDef.trim().length() > 0) {
                String[] parts = confDef.split("=");
                if (parts.length == 2) {
                    String hostPort = parts[0];
                    String confDir = parts[1];
                    File dir = new File(confDir);
                    if (!dir.isAbsolute()) {
                        dir = new File(configDir, confDir);
                    }
                    if (dir.exists()) {
                        map.put(hostPort.toLowerCase(), dir);
                    }
                    else {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                                   "could not find " + type + " configuration directory: " +
                                                   dir.getAbsolutePath());
                    }
                }
                else {
                    throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                               "Incorrect " + type + " configuration definition: " + confDef);
                }
            }
        }
        return map;
    }

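    // Loads the per-cluster Hadoop site configuration for every cluster declared in HADOOP_CONFS
    // (default "*=hadoop-conf").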
    private void loadHadoopConfigs(Configuration serviceConf) throws ServiceException {
        try {
            Map<String, File> map = parseConfigDirs(serviceConf.getStrings(HADOOP_CONFS, "*=hadoop-conf"), "hadoop");
            for (Map.Entry<String, File> entry : map.entrySet()) {
                hadoopConfigs.put(entry.getKey(), loadHadoopConf(entry.getValue()));
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

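    // Registers the per-cluster action configuration directories declared in ACTION_CONFS; the
    // individual action XML files are loaded lazily by createActionDefaultConf().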
    private void preLoadActionConfigs(Configuration serviceConf) throws ServiceException {
        try {
            actionConfigDirs = parseConfigDirs(serviceConf.getStrings(ACTION_CONFS, "*=hadoop-conf"), "action");
            for (String hostport : actionConfigDirs.keySet()) {
                actionConfigs.put(hostport, new ConcurrentHashMap<String, XConfiguration>());
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    public void destroy() {
    }

    public Class<? extends Service> getInterface() {
        return HadoopAccessorService.class;
    }

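    // Returns a cached proxy-user UGI for the given user, created on top of the Oozie login user.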
    private UserGroupInformation getUGI(String user) throws IOException {
        UserGroupInformation ugi = userUgiMap.get(user);
        if (ugi == null) {
            // handles a racing create: putIfAbsent keeps the first UGI stored, later duplicates are discarded
            ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
            userUgiMap.putIfAbsent(user, ugi);
        }
        return ugi;
    }

    /**
     * Creates a JobConf using the site configuration for the specified hostname:port.
     * <p/>
     * If the specified hostname:port is not defined it falls back to the '*' site
     * configuration if available. If the '*' site configuration is not available,
     * the JobConf has all Hadoop defaults.
     *
     * @param hostPort hostname:port to lookup Hadoop site configuration.
     * @return a JobConf with the corresponding site configuration for hostPort.
     */
    public JobConf createJobConf(String hostPort) {
        JobConf jobConf = new JobConf();
        XConfiguration.copy(getConfiguration(hostPort), jobConf);
        jobConf.setBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, true);
        return jobConf;
    }

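    // Reads the action's XML file from the cluster's action configuration directory, if any;
    // returns an empty configuration when the directory or file is missing or unreadable.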
    private XConfiguration loadActionConf(String hostPort, String action) {
        File dir = actionConfigDirs.get(hostPort);
        XConfiguration actionConf = new XConfiguration();
        if (dir != null) {
            File actionConfFile = new File(dir, action + ".xml");
            if (actionConfFile.exists()) {
                try {
                    actionConf = new XConfiguration(new FileInputStream(actionConfFile));
                }
                catch (IOException ex) {
                    XLog.getLog(getClass()).warn("Could not read file [{0}] for action [{1}] configuration for hostPort [{2}]",
                                                 actionConfFile.getAbsolutePath(), action, hostPort);
                }
            }
        }
        return actionConf;
    }

    /**
     * Returns a Configuration containing any defaults for an action for a particular cluster.
     * <p/>
     * This configuration is used as default for the action configuration and enables cluster
     * level default values per action.
     *
     * @param hostPort hostname:port to lookup the action default configuration.
     * @param action action name.
     * @return the default configuration for the action for the specified cluster.
     */
    public XConfiguration createActionDefaultConf(String hostPort, String action) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Map<String, XConfiguration> hostPortActionConfigs = actionConfigs.get(hostPort);
        if (hostPortActionConfigs == null) {
            hostPortActionConfigs = actionConfigs.get("*");
            hostPort = "*";
        }
        XConfiguration actionConf = hostPortActionConfigs.get(action);
        if (actionConf == null) {
            // doing lazy loading as we don't know upfront all actions, no need to synchronize
            // as it is a read operation and in case of a race condition loading and inserting
            // into the Map is idempotent and the action-config Map is a ConcurrentHashMap
            actionConf = loadActionConf(hostPort, action);
            hostPortActionConfigs.put(action, actionConf);
        }
        return new XConfiguration(actionConf.toProperties());
    }

    private Configuration getConfiguration(String hostPort) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Configuration conf = hadoopConfigs.get(hostPort);
        if (conf == null) {
            conf = hadoopConfigs.get("*");
            if (conf == null) {
                conf = new XConfiguration();
            }
        }
        return conf;
    }

    /**
     * Return a JobClient created with the provided user.
     *
     * @param user the username to create the JobClient for.
     * @param conf JobConf with all necessary information to create the JobClient.
     * @return JobClient created with the provided user.
     * @throws HadoopAccessorException if the client could not be created.
     */
    public JobClient createJobClient(String user, final JobConf conf) throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }
        String jobTracker = conf.get(JavaActionExecutor.HADOOP_JOB_TRACKER);
        validateJobTracker(jobTracker);
        try {
            UserGroupInformation ugi = getUGI(user);
            JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
                public JobClient run() throws Exception {
                    return new JobClient(conf);
                }
            });
            Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(getMRDelegationTokenRenewer(conf));
            conf.getCredentials().addToken(MR_TOKEN_ALIAS, mrdt);
            return jobClient;
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
    }

    /**
     * Return a FileSystem created with the provided user for the specified URI.
     *
     * @param user the username to create the FileSystem for.
     * @param uri file system URI.
     * @param conf Configuration with all necessary information to create the FileSystem.
     * @return FileSystem created with the provided user.
     * @throws HadoopAccessorException if the filesystem could not be created.
     */
    public FileSystem createFileSystem(String user, final URI uri, final Configuration conf)
            throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }

        checkSupportedFilesystem(uri);

        String nameNode = uri.getAuthority();
        if (nameNode == null) {
            nameNode = conf.get("fs.default.name");
            if (nameNode != null) {
                try {
                    nameNode = new URI(nameNode).getAuthority();
                }
                catch (URISyntaxException ex) {
                    throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
                }
            }
        }
        validateNameNode(nameNode);

        try {
            UserGroupInformation ugi = getUGI(user);
            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                public FileSystem run() throws Exception {
                    return FileSystem.get(uri, conf);
                }
            });
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
    }

    /**
     * Validate the job tracker URI against the job tracker whitelist.
     *
     * @param jobTrackerUri the job tracker URI to validate.
     * @throws HadoopAccessorException if the URI is not in the whitelist.
     */
    protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException {
        validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900);
    }

    /**
     * Validate the name node URI against the name node whitelist.
     *
     * @param nameNodeUri the name node URI to validate.
     * @throws HadoopAccessorException if the URI is not in the whitelist.
     */
    protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException {
        validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901);
    }

    private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException {
        if (uri != null) {
            uri = uri.toLowerCase().trim();
            if (whitelist.size() > 0 && !whitelist.contains(uri)) {
                throw new HadoopAccessorException(error, uri);
            }
        }
    }

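    // Returns the renewer to use when requesting an MR delegation token; on an insecure cluster the
    // value is irrelevant and a fixed alias is returned.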
    public static Text getMRDelegationTokenRenewer(JobConf jobConf) throws IOException {
        if (UserGroupInformation.isSecurityEnabled()) { // secure cluster
            return getMRTokenRenewerInternal(jobConf);
        }
        else {
            return MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
        }
    }

    // Package private for unit test purposes
    static Text getMRTokenRenewerInternal(JobConf jobConf) throws IOException {
        // Getting renewer correctly for JT principal also though JT in hadoop 1.x does not have
        // support for renewing/cancelling tokens
        String servicePrincipal = jobConf.get(RM_PRINCIPAL, jobConf.get(JT_PRINCIPAL));
        Text renewer;
        if (servicePrincipal != null) { // secure cluster
            renewer = mrTokenRenewers.get(servicePrincipal);
            if (renewer == null) {
                // Mimic org.apache.hadoop.mapred.Master.getMasterPrincipal()
                String target = jobConf.get(HADOOP_YARN_RM, jobConf.get(HADOOP_JOB_TRACKER_2));
                if (target == null) {
                    target = jobConf.get(HADOOP_JOB_TRACKER);
                }
                String addr = NetUtils.createSocketAddr(target).getHostName();
                renewer = new Text(SecurityUtil.getServerPrincipal(servicePrincipal, addr));
                LOG.info("Delegation Token Renewer details: Principal=" + servicePrincipal + ",Target=" + target
                        + ",Renewer=" + renewer);
                mrTokenRenewers.put(servicePrincipal, renewer);
            }
        }
        else {
            renewer = MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
        }
        return renewer;
    }

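    // Adds the file to the distributed cache classpath of the given configuration, running as the
    // given proxy user; the add against a copied configuration is done first so the FileSystem
    // gets created and cached.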
    public void addFileToClassPath(String user, final Path file, final Configuration conf)
            throws IOException {
        ParamChecker.notEmpty(user, "user");
        try {
            UserGroupInformation ugi = getUGI(user);
            ugi.doAs(new PrivilegedExceptionAction<Void>() {
                public Void run() throws Exception {
                    Configuration defaultConf = new Configuration();
                    XConfiguration.copy(conf, defaultConf);
                    //Doing this NOP add first to have the FS created and cached
                    DistributedCache.addFileToClassPath(file, defaultConf);

                    DistributedCache.addFileToClassPath(file, conf);
                    return null;
                }
            });
        }
        catch (InterruptedException ex) {
            throw new IOException(ex);
        }
    }

    /**
     * Checks whether the filesystem scheme of the given URI is among the configured supported schemes,
     * which keeps the system robust when filesystems other than HDFS are used. The check is skipped
     * when the URI has no scheme or when the supported list is empty (wildcard configuration).
     */
    public void checkSupportedFilesystem(URI uri) throws HadoopAccessorException {
        String uriScheme = uri.getScheme();
        if (uriScheme != null) { // skip the check if no scheme is given
            if (!supportedSchemes.isEmpty()) {
                XLog.getLog(this.getClass()).debug("Checking if filesystem " + uriScheme + " is supported");
                if (!supportedSchemes.contains(uriScheme)) {
                    throw new HadoopAccessorException(ErrorCode.E0904, uriScheme, uri.toString());
                }
            }
        }
    }

}