/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.security.token.Token;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group. <p/> The
 * default accessor used is the base accessor, which just injects the UGI into the configuration instance used to
 * create/obtain JobClient and FileSystem instances. <p/> The accessor class to use can be configured in
 * <code>oozie-site.xml</code> using the <code>oozie.service.HadoopAccessorService.accessor.class</code> property.
 */
public class HadoopAccessorService implements Service {

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService.";
    public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist";
    public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist";
    public static final String HADOOP_CONFS = CONF_PREFIX + "hadoop.configurations";
    public static final String ACTION_CONFS = CONF_PREFIX + "action.configurations";
    public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled";
    public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file";
    public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";

    private static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";

    private Set<String> jobTrackerWhitelist = new HashSet<String>();
    private Set<String> nameNodeWhitelist = new HashSet<String>();
    private Map<String, Configuration> hadoopConfigs = new HashMap<String, Configuration>();
    private Map<String, File> actionConfigDirs = new HashMap<String, File>();
    private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>();

    private ConcurrentMap<String, UserGroupInformation> userUgiMap;

    /**
     * Supported filesystem schemes for namespace federation.
     */
    public static final String SUPPORTED_FILESYSTEMS = CONF_PREFIX + "supported.filesystems";
    private Set<String> supportedSchemes;

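    // Illustrative sketch (not part of the original source): assuming Service.CONF_PREFIX
    // resolves to "oozie.service.", the constants above map to oozie-site.xml entries such as:
    //
    //   <property>
    //     <name>oozie.service.HadoopAccessorService.kerberos.enabled</name>
    //     <value>false</value>
    //   </property>
    //   <property>
    //     <name>oozie.service.HadoopAccessorService.hadoop.configurations</name>
    //     <value>*=hadoop-conf</value>
    //   </property>
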
    public void init(Services services) throws ServiceException {
        init(services.getConf());
    }

    // for testing purposes, see XFsTestCase
    public void init(Configuration conf) throws ServiceException {
        for (String name : conf.getStringCollection(JOB_TRACKER_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            jobTrackerWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "JOB_TRACKER_WHITELIST :" + conf.getStringCollection(JOB_TRACKER_WHITELIST)
                        + ", Total entries :" + jobTrackerWhitelist.size());
        for (String name : conf.getStringCollection(NAME_NODE_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            nameNodeWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "NAME_NODE_WHITELIST :" + conf.getStringCollection(NAME_NODE_WHITELIST)
                        + ", Total entries :" + nameNodeWhitelist.size());

        boolean kerberosAuthOn = conf.getBoolean(KERBEROS_AUTH_ENABLED, true);
        XLog.getLog(getClass()).info("Oozie Kerberos Authentication [{0}]", (kerberosAuthOn) ? "enabled" : "disabled");
        if (kerberosAuthOn) {
            kerberosInit(conf);
        }
        else {
            Configuration ugiConf = new Configuration();
            ugiConf.set("hadoop.security.authentication", "simple");
            UserGroupInformation.setConfiguration(ugiConf);
        }

        userUgiMap = new ConcurrentHashMap<String, UserGroupInformation>();

        loadHadoopConfigs(conf);
        preLoadActionConfigs(conf);

        supportedSchemes = new HashSet<String>();
        String[] schemesFromConf = conf.getStrings(SUPPORTED_FILESYSTEMS, new String[]{"hdfs"});
        if (schemesFromConf != null) {
            for (String scheme : schemesFromConf) {
                scheme = scheme.trim();
                // If the user gives "*", supportedSchemes stays empty and no checking is done, i.e. all schemes are allowed
                if (scheme.equals("*")) {
                    if (schemesFromConf.length > 1) {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                SUPPORTED_FILESYSTEMS + " should contain either only wildcard or explicit list, not both");
                    }
                }
                else {
                    supportedSchemes.add(scheme);
                }
            }
        }
    }

    private void kerberosInit(Configuration serviceConf) throws ServiceException {
        try {
            String keytabFile = serviceConf.get(KERBEROS_KEYTAB,
                    System.getProperty("user.home") + "/oozie.keytab").trim();
            if (keytabFile.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
            }
            String principal = serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
            if (principal.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
            }
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
            XLog.getLog(getClass()).info("Got Kerberos ticket, keytab [{0}], Oozie principal [{1}]",
                    keytabFile, principal);
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private static final String[] HADOOP_CONF_FILES =
            {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml", "hadoop-site.xml"};


    private Configuration loadHadoopConf(File dir) throws IOException {
        Configuration hadoopConf = new XConfiguration();
        for (String file : HADOOP_CONF_FILES) {
            File f = new File(dir, file);
            if (f.exists()) {
                InputStream is = new FileInputStream(f);
                Configuration conf = new XConfiguration(is);
                is.close();
                XConfiguration.copy(conf, hadoopConf);
            }
        }
        return hadoopConf;
    }

    private Map<String, File> parseConfigDirs(String[] confDefs, String type) throws ServiceException, IOException {
        Map<String, File> map = new HashMap<String, File>();
        File configDir = new File(ConfigurationService.getConfigurationDirectory());
        for (String confDef : confDefs) {
            if (confDef.trim().length() > 0) {
                String[] parts = confDef.split("=");
                if (parts.length == 2) {
                    String hostPort = parts[0];
                    String confDir = parts[1];
                    File dir = new File(confDir);
                    if (!dir.isAbsolute()) {
                        dir = new File(configDir, confDir);
                    }
                    if (dir.exists()) {
                        map.put(hostPort.toLowerCase(), dir);
                    }
                    else {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                "could not find " + type + " configuration directory: " +
                                        dir.getAbsolutePath());
                    }
                }
                else {
                    throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                            "Incorrect " + type + " configuration definition: " + confDef);
                }
            }
        }
        return map;
    }

    private void loadHadoopConfigs(Configuration serviceConf) throws ServiceException {
        try {
            Map<String, File> map = parseConfigDirs(serviceConf.getStrings(HADOOP_CONFS, "*=hadoop-conf"), "hadoop");
            for (Map.Entry<String, File> entry : map.entrySet()) {
                hadoopConfigs.put(entry.getKey(), loadHadoopConf(entry.getValue()));
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private void preLoadActionConfigs(Configuration serviceConf) throws ServiceException {
        try {
            actionConfigDirs = parseConfigDirs(serviceConf.getStrings(ACTION_CONFS, "*=hadoop-conf"), "action");
            for (String hostport : actionConfigDirs.keySet()) {
                actionConfigs.put(hostport, new ConcurrentHashMap<String, XConfiguration>());
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    public void destroy() {
    }

    public Class<? extends Service> getInterface() {
        return HadoopAccessorService.class;
    }

    private UserGroupInformation getUGI(String user) throws IOException {
        UserGroupInformation ugi = userUgiMap.get(user);
        if (ugi == null) {
            // taking care of a race condition, the latest UGI will be discarded
            ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
            userUgiMap.putIfAbsent(user, ugi);
        }
        return ugi;
    }

    /**
     * Creates a JobConf using the site configuration for the specified hostname:port.
     * <p/>
     * If the specified hostname:port is not defined it falls back to the '*' site
     * configuration if available. If the '*' site configuration is not available,
     * the JobConf has all Hadoop defaults.
     *
     * @param hostPort hostname:port to lookup Hadoop site configuration.
     * @return a JobConf with the corresponding site configuration for hostPort.
     */
    public JobConf createJobConf(String hostPort) {
        JobConf jobConf = new JobConf();
        XConfiguration.copy(getConfiguration(hostPort), jobConf);
        jobConf.setBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, true);
        return jobConf;
    }
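
    // Usage sketch (illustrative, not part of the original source): a caller typically obtains
    // this service from the Services singleton and asks for a per-cluster JobConf; the
    // "jt-host:8021" JobTracker address below is hypothetical.
    //
    //   HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
    //   JobConf jobConf = has.createJobConf("jt-host:8021");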

    private XConfiguration loadActionConf(String hostPort, String action) {
        File dir = actionConfigDirs.get(hostPort);
        XConfiguration actionConf = new XConfiguration();
        if (dir != null) {
            File actionConfFile = new File(dir, action + ".xml");
            if (actionConfFile.exists()) {
                try {
                    actionConf = new XConfiguration(new FileInputStream(actionConfFile));
                }
                catch (IOException ex) {
                    XLog.getLog(getClass()).warn("Could not read file [{0}] for action [{1}] configuration for hostPort [{2}]",
                            actionConfFile.getAbsolutePath(), action, hostPort);
                }
            }
        }
        return actionConf;
    }

    /**
     * Returns a Configuration containing any defaults for an action for a particular cluster.
     * <p/>
     * This configuration is used as default for the action configuration and enables cluster
     * level default values per action.
     *
     * @param hostPort hostname:port to lookup the action default configuration.
     * @param action action name.
     * @return the default configuration for the action for the specified cluster.
     */
    public XConfiguration createActionDefaultConf(String hostPort, String action) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Map<String, XConfiguration> hostPortActionConfigs = actionConfigs.get(hostPort);
        if (hostPortActionConfigs == null) {
            hostPortActionConfigs = actionConfigs.get("*");
            hostPort = "*";
        }
        XConfiguration actionConf = hostPortActionConfigs.get(action);
        if (actionConf == null) {
            // doing lazy loading as we don't know all actions upfront; no need to synchronize
            // as it is a read operation, and in case of a race condition loading and inserting
            // into the Map is idempotent and the action-config Map is a ConcurrentHashMap
            actionConf = loadActionConf(hostPort, action);
            hostPortActionConfigs.put(action, actionConf);
        }
        return new XConfiguration(actionConf.toProperties());
    }
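
    // Illustrative sketch (not part of the original source): with an action configuration
    // directory containing e.g. a "pig.xml", cluster-level defaults for that action can be
    // obtained as below; "has" is the service instance from the earlier sketch and both
    // argument values are hypothetical.
    //
    //   XConfiguration pigDefaults = has.createActionDefaultConf("jt-host:8021", "pig");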

    private Configuration getConfiguration(String hostPort) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Configuration conf = hadoopConfigs.get(hostPort);
        if (conf == null) {
            conf = hadoopConfigs.get("*");
            if (conf == null) {
                conf = new XConfiguration();
            }
        }
        return conf;
    }

    /**
     * Return a JobClient created with the provided user/group.
     *
     * @param user user name to create the JobClient as.
     * @param conf JobConf with all necessary information to create the
     *        JobClient.
     * @return JobClient created with the provided user/group.
     * @throws HadoopAccessorException if the client could not be created.
     */
    public JobClient createJobClient(String user, final JobConf conf) throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }
        String jobTracker = conf.get("mapred.job.tracker");
        validateJobTracker(jobTracker);
        try {
            UserGroupInformation ugi = getUGI(user);
            JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
                public JobClient run() throws Exception {
                    return new JobClient(conf);
                }
            });
            Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(new Text("mr token"));
            conf.getCredentials().addToken(new Text("mr token"), mrdt);
            return jobClient;
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
    }
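
    // Usage sketch (illustrative, not part of the original source): the JobConf must come from
    // createJobConf() above, otherwise the OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED check fails
    // with ErrorCode.E0903; "someUser" is a hypothetical proxy-user name.
    //
    //   JobClient client = has.createJobClient("someUser", has.createJobConf("jt-host:8021"));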

    /**
     * Return a FileSystem created with the provided user for the specified URI.
     *
     * @param user user name to create the FileSystem as.
     * @param uri file system URI.
     * @param conf Configuration with all necessary information to create the FileSystem.
     * @return FileSystem created with the provided user/group.
     * @throws HadoopAccessorException if the filesystem could not be created.
     */
    public FileSystem createFileSystem(String user, final URI uri, final Configuration conf)
            throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }

        checkSupportedFilesystem(uri);

        String nameNode = uri.getAuthority();
        if (nameNode == null) {
            nameNode = conf.get("fs.default.name");
            if (nameNode != null) {
                try {
                    nameNode = new URI(nameNode).getAuthority();
                }
                catch (URISyntaxException ex) {
                    throw new HadoopAccessorException(ErrorCode.E0902, ex);
                }
            }
        }
        validateNameNode(nameNode);

        try {
            UserGroupInformation ugi = getUGI(user);
            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                public FileSystem run() throws Exception {
                    return FileSystem.get(uri, conf);
                }
            });
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
    }
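
    // Usage sketch (illustrative, not part of the original source): as with createJobClient(),
    // the Configuration must have been produced by createJobConf() so the service-created flag
    // is set; the URI and user below are hypothetical.
    //
    //   FileSystem fs = has.createFileSystem("someUser",
    //           new URI("hdfs://nn-host:8020/user/someUser"), has.createJobConf("jt-host:8021"));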

    /**
     * Validate the JobTracker against the whitelist.
     *
     * @param jobTrackerUri JobTracker URI to validate.
     * @throws HadoopAccessorException if the JobTracker is not in the whitelist.
     */
    protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException {
        validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900);
    }

    /**
     * Validate the NameNode against the whitelist.
     *
     * @param nameNodeUri NameNode URI to validate.
     * @throws HadoopAccessorException if the NameNode is not in the whitelist.
     */
    protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException {
        validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901);
    }

    private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException {
        if (uri != null) {
            uri = uri.toLowerCase().trim();
            if (whitelist.size() > 0 && !whitelist.contains(uri)) {
                throw new HadoopAccessorException(error, uri);
            }
        }
    }

    public void addFileToClassPath(String user, final Path file, final Configuration conf)
            throws IOException {
        ParamChecker.notEmpty(user, "user");
        try {
            UserGroupInformation ugi = getUGI(user);
            ugi.doAs(new PrivilegedExceptionAction<Void>() {
                public Void run() throws Exception {
                    Configuration defaultConf = new Configuration();
                    XConfiguration.copy(conf, defaultConf);
                    // Doing this NOP add first to have the FS created and cached
                    DistributedCache.addFileToClassPath(file, defaultConf);

                    DistributedCache.addFileToClassPath(file, conf);
                    return null;
                }
            });

        }
        catch (InterruptedException ex) {
            throw new IOException(ex);
        }

    }

    /**
     * Checks whether the filesystem scheme of the given URI is among the configured list of
     * supported schemes; this keeps the system robust when filesystems other than HDFS are used.
     */
    public void checkSupportedFilesystem(URI uri) throws HadoopAccessorException {
        String uriScheme = uri.getScheme();
        if (!supportedSchemes.isEmpty()) {
            XLog.getLog(this.getClass()).debug("Checking if filesystem " + uriScheme + " is supported");
            if (!supportedSchemes.contains(uriScheme)) {
                throw new HadoopAccessorException(ErrorCode.E0904, uriScheme, uri.toString());
            }
        }
    }

}