/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.security.token.Token;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group. <p/> The
 * default accessor used is the base accessor, which just injects the UGI into the configuration instance used to
 * create/obtain JobClient and FileSystem instances. <p/> The HadoopAccessor class to use can be configured in
 * <code>oozie-site.xml</code> using the <code>oozie.service.HadoopAccessorService.accessor.class</code> property.
 */
public class HadoopAccessorService implements Service {

    private static XLog LOG = XLog.getLog(HadoopAccessorService.class);

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService.";
    public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist";
    public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist";
    public static final String HADOOP_CONFS = CONF_PREFIX + "hadoop.configurations";
    public static final String ACTION_CONFS = CONF_PREFIX + "action.configurations";
    public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled";
    public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file";
    public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";
    public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token");

    private static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
    /** The Kerberos principal for the job tracker. */
    private static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
    /** The Kerberos principal for the resource manager. */
    private static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
    private static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
    private static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
    private static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
    private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>();

    private Set<String> jobTrackerWhitelist = new HashSet<String>();
    private Set<String> nameNodeWhitelist = new HashSet<String>();
    private Map<String, Configuration> hadoopConfigs = new HashMap<String, Configuration>();
    private Map<String, File> actionConfigDirs = new HashMap<String, File>();
    private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>();

    private ConcurrentMap<String, UserGroupInformation> userUgiMap;

    /**
     * Supported filesystem schemes for namespace federation.
     */
    public static final String SUPPORTED_FILESYSTEMS = CONF_PREFIX + "supported.filesystems";
    private Set<String> supportedSchemes;

    public void init(Services services) throws ServiceException {
        init(services.getConf());
    }

    //for testing purposes, see XFsTestCase
    public void init(Configuration conf) throws ServiceException {
        for (String name : conf.getStringCollection(JOB_TRACKER_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            jobTrackerWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "JOB_TRACKER_WHITELIST :" + conf.getStringCollection(JOB_TRACKER_WHITELIST)
                        + ", Total entries :" + jobTrackerWhitelist.size());
        for (String name : conf.getStringCollection(NAME_NODE_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            nameNodeWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "NAME_NODE_WHITELIST :" + conf.getStringCollection(NAME_NODE_WHITELIST)
                        + ", Total entries :" + nameNodeWhitelist.size());

        boolean kerberosAuthOn = conf.getBoolean(KERBEROS_AUTH_ENABLED, true);
        XLog.getLog(getClass()).info("Oozie Kerberos Authentication [{0}]", (kerberosAuthOn) ? "enabled" : "disabled");
        if (kerberosAuthOn) {
            kerberosInit(conf);
        }
        else {
            Configuration ugiConf = new Configuration();
            ugiConf.set("hadoop.security.authentication", "simple");
            UserGroupInformation.setConfiguration(ugiConf);
        }

        userUgiMap = new ConcurrentHashMap<String, UserGroupInformation>();

        loadHadoopConfigs(conf);
        preLoadActionConfigs(conf);

        supportedSchemes = new HashSet<String>();
        String[] schemesFromConf = conf.getStrings(SUPPORTED_FILESYSTEMS, new String[]{"hdfs", "hftp", "webhdfs"});
        if (schemesFromConf != null) {
            for (String scheme : schemesFromConf) {
                scheme = scheme.trim();
                // If the user gives "*", supportedSchemes stays empty so no checking is done, i.e. all schemes are allowed
                if (scheme.equals("*")) {
                    if (schemesFromConf.length > 1) {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                SUPPORTED_FILESYSTEMS + " should contain either only wildcard or explicit list, not both");
                    }
                }
                else {
                    supportedSchemes.add(scheme);
                }
            }
        }
    }

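    // Illustrative oozie-site.xml settings consumed by init() above; the host:port values are
    // examples only:
    //   oozie.service.HadoopAccessorService.jobTracker.whitelist  = jt1.example.com:8021,jt2.example.com:8021
    //   oozie.service.HadoopAccessorService.nameNode.whitelist    = nn1.example.com:8020,nn2.example.com:8020
    //   oozie.service.HadoopAccessorService.supported.filesystems = hdfs,hftp,webhdfs   (or "*" alone to allow all)
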
    private void kerberosInit(Configuration serviceConf) throws ServiceException {
        try {
            String keytabFile = serviceConf.get(KERBEROS_KEYTAB,
                    System.getProperty("user.home") + "/oozie.keytab").trim();
            if (keytabFile.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
            }
            String principal = serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
            if (principal.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
            }
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
            XLog.getLog(getClass()).info("Got Kerberos ticket, keytab [{0}], Oozie principal [{1}]",
                    keytabFile, principal);
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }
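
    // Illustrative Kerberos settings read by kerberosInit() when Kerberos authentication is enabled;
    // the values below are examples only (the code falls back to ${user.home}/oozie.keytab and
    // oozie/localhost@LOCALHOST when unset):
    //   oozie.service.HadoopAccessorService.kerberos.enabled   = true
    //   oozie.service.HadoopAccessorService.keytab.file        = /home/oozie/oozie.keytab
    //   oozie.service.HadoopAccessorService.kerberos.principal = oozie/myhost.example.com@EXAMPLE.COM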

    private static final String[] HADOOP_CONF_FILES =
            {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml", "hadoop-site.xml"};


    private Configuration loadHadoopConf(File dir) throws IOException {
        Configuration hadoopConf = new XConfiguration();
        for (String file : HADOOP_CONF_FILES) {
            File f = new File(dir, file);
            if (f.exists()) {
                InputStream is = new FileInputStream(f);
                Configuration conf = new XConfiguration(is);
                is.close();
                XConfiguration.copy(conf, hadoopConf);
            }
        }
        return hadoopConf;
    }

    private Map<String, File> parseConfigDirs(String[] confDefs, String type) throws ServiceException, IOException {
        Map<String, File> map = new HashMap<String, File>();
        File configDir = new File(ConfigurationService.getConfigurationDirectory());
        for (String confDef : confDefs) {
            if (confDef.trim().length() > 0) {
                String[] parts = confDef.split("=");
                if (parts.length == 2) {
                    String hostPort = parts[0];
                    String confDir = parts[1];
                    File dir = new File(confDir);
                    if (!dir.isAbsolute()) {
                        dir = new File(configDir, confDir);
                    }
                    if (dir.exists()) {
                        map.put(hostPort.toLowerCase(), dir);
                    }
                    else {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                "could not find " + type + " configuration directory: " +
                                        dir.getAbsolutePath());
                    }
                }
                else {
                    throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                            "Incorrect " + type + " configuration definition: " + confDef);
                }
            }
        }
        return map;
    }
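
    // Illustrative value for the "hadoop.configurations"/"action.configurations" properties parsed
    // above; the host:port and directory names are examples only:
    //   oozie.service.HadoopAccessorService.hadoop.configurations = *=hadoop-conf,jt.example.com:8032=example-cluster-conf
    // Each entry maps a cluster host:port (or "*") to a directory of *-site.xml files; relative
    // directories are resolved against the Oozie configuration directory.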

    private void loadHadoopConfigs(Configuration serviceConf) throws ServiceException {
        try {
            Map<String, File> map = parseConfigDirs(serviceConf.getStrings(HADOOP_CONFS, "*=hadoop-conf"), "hadoop");
            for (Map.Entry<String, File> entry : map.entrySet()) {
                hadoopConfigs.put(entry.getKey(), loadHadoopConf(entry.getValue()));
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private void preLoadActionConfigs(Configuration serviceConf) throws ServiceException {
        try {
            actionConfigDirs = parseConfigDirs(serviceConf.getStrings(ACTION_CONFS, "*=hadoop-conf"), "action");
            for (String hostport : actionConfigDirs.keySet()) {
                actionConfigs.put(hostport, new ConcurrentHashMap<String, XConfiguration>());
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    public void destroy() {
    }

    public Class<? extends Service> getInterface() {
        return HadoopAccessorService.class;
    }

    private UserGroupInformation getUGI(String user) throws IOException {
        UserGroupInformation ugi = userUgiMap.get(user);
        if (ugi == null) {
            // taking care of a race condition, the latest UGI will be discarded
            ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
            userUgiMap.putIfAbsent(user, ugi);
        }
        return ugi;
    }

    /**
     * Creates a JobConf using the site configuration for the specified hostname:port.
     * <p/>
     * If the specified hostname:port is not defined it falls back to the '*' site
     * configuration if available. If the '*' site configuration is not available,
     * the JobConf has all Hadoop defaults.
     *
     * @param hostPort hostname:port to lookup Hadoop site configuration.
     * @return a JobConf with the corresponding site configuration for hostPort.
     */
    public JobConf createJobConf(String hostPort) {
        JobConf jobConf = new JobConf();
        XConfiguration.copy(getConfiguration(hostPort), jobConf);
        jobConf.setBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, true);
        return jobConf;
    }
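
    // Illustrative usage sketch (the Services lookup and host:port are examples only):
    //   HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
    //   JobConf jobConf = has.createJobConf("jt.example.com:8032");
    // The returned JobConf carries the matching site configuration plus the internal "created"
    // marker that createJobClient()/createFileSystem() require.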

    private XConfiguration loadActionConf(String hostPort, String action) {
        File dir = actionConfigDirs.get(hostPort);
        XConfiguration actionConf = new XConfiguration();
        if (dir != null) {
            File actionConfFile = new File(dir, action + ".xml");
            if (actionConfFile.exists()) {
                try {
                    actionConf = new XConfiguration(new FileInputStream(actionConfFile));
                }
                catch (IOException ex) {
                    XLog.getLog(getClass()).warn("Could not read file [{0}] for action [{1}] configuration for hostPort [{2}]",
                            actionConfFile.getAbsolutePath(), action, hostPort);
                }
            }
        }
        return actionConf;
    }

    /**
     * Returns a Configuration containing any defaults for an action for a particular cluster.
     * <p/>
     * This configuration is used as default for the action configuration and enables cluster
     * level default values per action.
     *
     * @param hostPort hostname:port to lookup the action default configuration.
     * @param action action name.
     * @return the default configuration for the action for the specified cluster.
     */
    public XConfiguration createActionDefaultConf(String hostPort, String action) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Map<String, XConfiguration> hostPortActionConfigs = actionConfigs.get(hostPort);
        if (hostPortActionConfigs == null) {
            hostPortActionConfigs = actionConfigs.get("*");
            hostPort = "*";
        }
        XConfiguration actionConf = hostPortActionConfigs.get(action);
        if (actionConf == null) {
            // doing lazy loading as we don't know upfront all actions, no need to synchronize
            // as it is a read operation and in case of a race condition loading and inserting
            // into the Map is idempotent and the action-config Map is a ConcurrentHashMap
            actionConf = loadActionConf(hostPort, action);
            hostPortActionConfigs.put(action, actionConf);
        }
        return new XConfiguration(actionConf.toProperties());
    }
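
    // Illustrative sketch, reusing the "has" reference from the createJobConf() sketch above;
    // the host:port and action name are examples only. Cluster-level defaults for a "pig" action
    // would be loaded once from <action-conf-dir>/pig.xml and then served from the per-cluster cache:
    //   XConfiguration pigDefaults = has.createActionDefaultConf("jt.example.com:8032", "pig");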

    private Configuration getConfiguration(String hostPort) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Configuration conf = hadoopConfigs.get(hostPort);
        if (conf == null) {
            conf = hadoopConfigs.get("*");
            if (conf == null) {
                conf = new XConfiguration();
            }
        }
        return conf;
    }

    /**
     * Return a JobClient created with the provided user/group.
     *
     * @param user user the JobClient should act on behalf of.
     * @param conf JobConf with all necessary information to create the
     * JobClient.
     * @return JobClient created with the provided user/group.
     * @throws HadoopAccessorException if the client could not be created.
     */
    public JobClient createJobClient(String user, final JobConf conf) throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }
        String jobTracker = conf.get("mapred.job.tracker");
        validateJobTracker(jobTracker);
        try {
            UserGroupInformation ugi = getUGI(user);
            JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
                public JobClient run() throws Exception {
                    return new JobClient(conf);
                }
            });
            Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(getMRDelegationTokenRenewer(conf));
            conf.getCredentials().addToken(MR_TOKEN_ALIAS, mrdt);
            return jobClient;
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
    }
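
    // Illustrative usage sketch (user name and host:port are examples only). The JobConf must come
    // from createJobConf() so the internal "created" flag is set, otherwise E0903 is thrown:
    //   JobConf jobConf = has.createJobConf("jt.example.com:8032");
    //   jobConf.set("mapred.job.tracker", "jt.example.com:8032");
    //   JobClient client = has.createJobClient("joe", jobConf);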

    /**
     * Return a FileSystem created with the provided user for the specified URI.
     *
     * @param user user the FileSystem should be created for.
     * @param uri file system URI.
     * @param conf Configuration with all necessary information to create the FileSystem.
     * @return FileSystem created with the provided user.
     * @throws HadoopAccessorException if the filesystem could not be created.
     */
    public FileSystem createFileSystem(String user, final URI uri, final Configuration conf)
            throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }

        checkSupportedFilesystem(uri);

        String nameNode = uri.getAuthority();
        if (nameNode == null) {
            nameNode = conf.get("fs.default.name");
            if (nameNode != null) {
                try {
                    nameNode = new URI(nameNode).getAuthority();
                }
                catch (URISyntaxException ex) {
                    throw new HadoopAccessorException(ErrorCode.E0902, ex);
                }
            }
        }
        validateNameNode(nameNode);

        try {
            UserGroupInformation ugi = getUGI(user);
            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                public FileSystem run() throws Exception {
                    return FileSystem.get(uri, conf);
                }
            });
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
    }
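
    // Illustrative usage sketch (user name and URI are examples only). The Configuration must also
    // originate from this service so the internal "created" flag is set:
    //   JobConf fsConf = has.createJobConf("nn.example.com:8020");
    //   FileSystem fs = has.createFileSystem("joe", new URI("hdfs://nn.example.com:8020"), fsConf);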

    /**
     * Validate the JobTracker against the whitelist.
     * @param jobTrackerUri JobTracker host:port to validate.
     * @throws HadoopAccessorException if the JobTracker is not in the whitelist.
     */
    protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException {
        validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900);
    }

    /**
     * Validate the NameNode against the whitelist.
     * @param nameNodeUri NameNode host:port to validate.
     * @throws HadoopAccessorException if the NameNode is not in the whitelist.
     */
    protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException {
        validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901);
    }

    private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException {
        if (uri != null) {
            uri = uri.toLowerCase().trim();
            if (whitelist.size() > 0 && !whitelist.contains(uri)) {
                throw new HadoopAccessorException(error, uri);
            }
        }
    }

    public static Text getMRDelegationTokenRenewer(JobConf jobConf) throws IOException {
        if (UserGroupInformation.isSecurityEnabled()) { // secure cluster
            return getMRTokenRenewerInternal(jobConf);
        }
        else {
            return MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
        }
    }

    // Package private for unit test purposes
    static Text getMRTokenRenewerInternal(JobConf jobConf) throws IOException {
        // Get the renewer for the JT principal as well, even though the JT in Hadoop 1.x
        // does not support renewing/cancelling tokens
        String servicePrincipal = jobConf.get(RM_PRINCIPAL, jobConf.get(JT_PRINCIPAL));
        Text renewer;
        if (servicePrincipal != null) { // secure cluster
            renewer = mrTokenRenewers.get(servicePrincipal);
            if (renewer == null) {
                // Mimic org.apache.hadoop.mapred.Master.getMasterPrincipal()
                String target = jobConf.get(HADOOP_YARN_RM, jobConf.get(HADOOP_JOB_TRACKER_2));
                if (target == null) {
                    target = jobConf.get(HADOOP_JOB_TRACKER);
                }
                String addr = NetUtils.createSocketAddr(target).getHostName();
                renewer = new Text(SecurityUtil.getServerPrincipal(servicePrincipal, addr));
                LOG.info("Delegation Token Renewer details: Principal=" + servicePrincipal + ",Target=" + target
                        + ",Renewer=" + renewer);
                mrTokenRenewers.put(servicePrincipal, renewer);
            }
        }
        else {
            renewer = MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
        }
        return renewer;
    }
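
    // Illustrative example of the resolution performed above (principal and address are examples only):
    //   yarn.resourcemanager.principal = rm/_HOST@EXAMPLE.COM
    //   yarn.resourcemanager.address   = rm.example.com:8032
    // SecurityUtil.getServerPrincipal() substitutes the resolved host for _HOST, so the renewer
    // becomes rm/rm.example.com@EXAMPLE.COM and is cached in mrTokenRenewers.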

    public void addFileToClassPath(String user, final Path file, final Configuration conf)
            throws IOException {
        ParamChecker.notEmpty(user, "user");
        try {
            UserGroupInformation ugi = getUGI(user);
            ugi.doAs(new PrivilegedExceptionAction<Void>() {
                public Void run() throws Exception {
                    Configuration defaultConf = new Configuration();
                    XConfiguration.copy(conf, defaultConf);
                    //Doing this NOP add first to have the FS created and cached
                    DistributedCache.addFileToClassPath(file, defaultConf);

                    DistributedCache.addFileToClassPath(file, conf);
                    return null;
                }
            });
        }
        catch (InterruptedException ex) {
            throw new IOException(ex);
        }
    }

    /**
     * Checks whether the filesystem scheme of the given URI is among the configured list of
     * supported filesystems; this makes the system robust to filesystems other than HDFS as well.
     */
    public void checkSupportedFilesystem(URI uri) throws HadoopAccessorException {
        String uriScheme = uri.getScheme();
        if (!supportedSchemes.isEmpty()) {
            XLog.getLog(this.getClass()).debug("Checking if filesystem " + uriScheme + " is supported");
            if (!supportedSchemes.contains(uriScheme)) {
                throw new HadoopAccessorException(ErrorCode.E0904, uriScheme, uri.toString());
            }
        }
    }

}