/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.security.token.Token;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group. <p/> The
 * default accessor used is the base accessor, which just injects the UGI into the configuration instance used to
 * create/obtain JobClient and FileSystem instances. <p/> The HadoopAccessor class to use can be configured in
 * <code>oozie-site.xml</code> using the <code>oozie.service.HadoopAccessorService.accessor.class</code> property.
 */
public class HadoopAccessorService implements Service {

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService.";
    public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist";
    public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist";
    public static final String HADOOP_CONFS = CONF_PREFIX + "hadoop.configurations";
    public static final String ACTION_CONFS = CONF_PREFIX + "action.configurations";
    public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled";
    public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file";
    public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";
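
    // Illustrative sketch only (host names and ports below are hypothetical, not shipped
    // defaults): these properties would typically be set in oozie-site.xml, e.g.
    //   oozie.service.HadoopAccessorService.jobTracker.whitelist  = jt.example.com:8021
    //   oozie.service.HadoopAccessorService.nameNode.whitelist    = nn.example.com:8020
    //   oozie.service.HadoopAccessorService.hadoop.configurations = *=hadoop-conf,jt.example.com:8021=example-conf
    // An empty whitelist (the default) disables the check; see validate() below.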

    private static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";

    private Set<String> jobTrackerWhitelist = new HashSet<String>();
    private Set<String> nameNodeWhitelist = new HashSet<String>();
    private Map<String, Configuration> hadoopConfigs = new HashMap<String, Configuration>();
    private Map<String, File> actionConfigDirs = new HashMap<String, File>();
    private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>();

    private ConcurrentMap<String, UserGroupInformation> userUgiMap;

    public void init(Services services) throws ServiceException {
        init(services.getConf());
    }

    // for testing purposes, see XFsTestCase
    public void init(Configuration conf) throws ServiceException {
        for (String name : conf.getStringCollection(JOB_TRACKER_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            jobTrackerWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "JOB_TRACKER_WHITELIST :" + conf.getStringCollection(JOB_TRACKER_WHITELIST)
                        + ", Total entries :" + jobTrackerWhitelist.size());
        for (String name : conf.getStringCollection(NAME_NODE_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            nameNodeWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "NAME_NODE_WHITELIST :" + conf.getStringCollection(NAME_NODE_WHITELIST)
                        + ", Total entries :" + nameNodeWhitelist.size());

        boolean kerberosAuthOn = conf.getBoolean(KERBEROS_AUTH_ENABLED, true);
        XLog.getLog(getClass()).info("Oozie Kerberos Authentication [{0}]", (kerberosAuthOn) ? "enabled" : "disabled");
        if (kerberosAuthOn) {
            kerberosInit(conf);
        }
        else {
            Configuration ugiConf = new Configuration();
            ugiConf.set("hadoop.security.authentication", "simple");
            UserGroupInformation.setConfiguration(ugiConf);
        }

        userUgiMap = new ConcurrentHashMap<String, UserGroupInformation>();

        loadHadoopConfigs(conf);
        preLoadActionConfigs(conf);
    }

    private void kerberosInit(Configuration serviceConf) throws ServiceException {
        try {
            String keytabFile = serviceConf.get(KERBEROS_KEYTAB,
                                                System.getProperty("user.home") + "/oozie.keytab").trim();
            if (keytabFile.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
            }
            String principal = serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
            if (principal.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
            }
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
            XLog.getLog(getClass()).info("Got Kerberos ticket, keytab [{0}], Oozie principal [{1}]",
                                         keytabFile, principal);
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private static final String[] HADOOP_CONF_FILES =
            {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml", "hadoop-site.xml"};

    private Configuration loadHadoopConf(File dir) throws IOException {
        Configuration hadoopConf = new XConfiguration();
        for (String file : HADOOP_CONF_FILES) {
            File f = new File(dir, file);
            if (f.exists()) {
                InputStream is = new FileInputStream(f);
                try {
                    Configuration conf = new XConfiguration(is);
                    XConfiguration.copy(conf, hadoopConf);
                }
                finally {
                    // close the stream even if parsing fails
                    is.close();
                }
            }
        }
        return hadoopConf;
    }

    private Map<String, File> parseConfigDirs(String[] confDefs, String type) throws ServiceException, IOException {
        Map<String, File> map = new HashMap<String, File>();
        File configDir = new File(ConfigurationService.getConfigurationDirectory());
        for (String confDef : confDefs) {
            if (confDef.trim().length() > 0) {
                String[] parts = confDef.split("=");
                if (parts.length == 2) {
                    String hostPort = parts[0];
                    String confDir = parts[1];
                    File dir = new File(confDir);
                    if (!dir.isAbsolute()) {
                        dir = new File(configDir, confDir);
                    }
                    if (dir.exists()) {
                        map.put(hostPort.toLowerCase(), dir);
                    }
                    else {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                                   "could not find " + type + " configuration directory: " +
                                                   dir.getAbsolutePath());
                    }
                }
                else {
                    throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                               "Incorrect " + type + " configuration definition: " + confDef);
                }
            }
        }
        return map;
    }
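
    // For illustration: each definition is a HOST:PORT=DIR pair (the method above splits
    // on '='), with '*' as a wildcard host and relative DIRs resolved against the Oozie
    // configuration directory. A hypothetical example value for
    // oozie.service.HadoopAccessorService.hadoop.configurations:
    //   *=hadoop-conf,nn.example.com:8020=example-cluster-conf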

    private void loadHadoopConfigs(Configuration serviceConf) throws ServiceException {
        try {
            Map<String, File> map = parseConfigDirs(serviceConf.getStrings(HADOOP_CONFS, "*=hadoop-conf"), "hadoop");
            for (Map.Entry<String, File> entry : map.entrySet()) {
                hadoopConfigs.put(entry.getKey(), loadHadoopConf(entry.getValue()));
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private void preLoadActionConfigs(Configuration serviceConf) throws ServiceException {
        try {
            actionConfigDirs = parseConfigDirs(serviceConf.getStrings(ACTION_CONFS, "*=hadoop-conf"), "action");
            for (String hostport : actionConfigDirs.keySet()) {
                actionConfigs.put(hostport, new ConcurrentHashMap<String, XConfiguration>());
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    public void destroy() {
    }

    public Class<? extends Service> getInterface() {
        return HadoopAccessorService.class;
    }

    private UserGroupInformation getUGI(String user) throws IOException {
        UserGroupInformation ugi = userUgiMap.get(user);
        if (ugi == null) {
            // tolerate the race condition: if another thread inserts first, that UGI stays
            // in the map and the one created here is used only for this call
            ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
            userUgiMap.putIfAbsent(user, ugi);
        }
        return ugi;
    }

    /**
     * Creates a JobConf using the site configuration for the specified hostname:port.
     * <p/>
     * If the specified hostname:port is not defined it falls back to the '*' site
     * configuration if available. If the '*' site configuration is not available,
     * the JobConf has all Hadoop defaults.
     *
     * @param hostPort hostname:port to lookup Hadoop site configuration.
     * @return a JobConf with the corresponding site configuration for hostPort.
     */
    public JobConf createJobConf(String hostPort) {
        JobConf jobConf = new JobConf();
        XConfiguration.copy(getConfiguration(hostPort), jobConf);
        jobConf.setBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, true);
        return jobConf;
    }
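
    // A minimal usage sketch (the host:port value is hypothetical). Callers obtain the
    // service from the Services singleton; the returned JobConf carries the
    // OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED flag that createJobClient/createFileSystem check:
    //
    //   HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
    //   JobConf jobConf = has.createJobConf("jt.example.com:8021");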

    private XConfiguration loadActionConf(String hostPort, String action) {
        File dir = actionConfigDirs.get(hostPort);
        XConfiguration actionConf = new XConfiguration();
        if (dir != null) {
            File actionConfFile = new File(dir, action + ".xml");
            if (actionConfFile.exists()) {
                InputStream is = null;
                try {
                    is = new FileInputStream(actionConfFile);
                    actionConf = new XConfiguration(is);
                }
                catch (IOException ex) {
                    XLog.getLog(getClass()).warn("Could not read file [{0}] for action [{1}] configuration for hostPort [{2}]",
                                                 actionConfFile.getAbsolutePath(), action, hostPort);
                }
                finally {
                    if (is != null) {
                        try {
                            is.close();
                        }
                        catch (IOException ex) {
                            // ignore, the configuration has already been read or the warning logged
                        }
                    }
                }
            }
        }
        return actionConf;
    }

    /**
     * Returns a Configuration containing any defaults for an action for a particular cluster.
     * <p/>
     * This configuration is used as the default for the action configuration and enables cluster
     * level default values per action.
     *
     * @param hostPort hostname:port to lookup the action default configuration.
     * @param action action name.
     * @return the default configuration for the action for the specified cluster.
     */
    public XConfiguration createActionDefaultConf(String hostPort, String action) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Map<String, XConfiguration> hostPortActionConfigs = actionConfigs.get(hostPort);
        if (hostPortActionConfigs == null) {
            hostPortActionConfigs = actionConfigs.get("*");
            hostPort = "*";
        }
        XConfiguration actionConf = hostPortActionConfigs.get(action);
        if (actionConf == null) {
            // lazy loading, as the full set of actions is not known upfront; no synchronization
            // is needed because loading and inserting into the map is idempotent and the
            // per-cluster action-config map is a ConcurrentHashMap
            actionConf = loadActionConf(hostPort, action);
            hostPortActionConfigs.put(action, actionConf);
        }
        return new XConfiguration(actionConf.toProperties());
    }
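
    // Illustrative sketch (cluster and action names are hypothetical): with an
    // action.configurations entry mapping nn.example.com:8020 to a directory containing
    // pig.xml, the defaults for Pig actions on that cluster come from that file:
    //
    //   XConfiguration pigDefaults = has.createActionDefaultConf("nn.example.com:8020", "pig");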

    private Configuration getConfiguration(String hostPort) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Configuration conf = hadoopConfigs.get(hostPort);
        if (conf == null) {
            conf = hadoopConfigs.get("*");
            if (conf == null) {
                conf = new XConfiguration();
            }
        }
        return conf;
    }

    /**
     * Return a JobClient created with the provided user.
     *
     * @param user the user to impersonate when creating the JobClient.
     * @param conf JobConf with all necessary information to create the JobClient.
     * @return JobClient created with the provided user.
     * @throws HadoopAccessorException if the client could not be created.
     */
    public JobClient createJobClient(String user, final JobConf conf) throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }
        String jobTracker = conf.get("mapred.job.tracker");
        validateJobTracker(jobTracker);
        try {
            UserGroupInformation ugi = getUGI(user);
            JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
                public JobClient run() throws Exception {
                    return new JobClient(conf);
                }
            });
            Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(new Text("mr token"));
            conf.getCredentials().addToken(new Text("mr token"), mrdt);
            return jobClient;
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
    }
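
    // A minimal end-to-end sketch (user name and host:port are hypothetical). The JobConf
    // must come from createJobConf(), otherwise ErrorCode.E0903 is thrown above:
    //
    //   JobConf jobConf = has.createJobConf("jt.example.com:8021");
    //   JobClient jobClient = has.createJobClient("alice", jobConf);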

    /**
     * Return a FileSystem created with the provided user for the specified URI.
     *
     * @param user the user to impersonate when creating the FileSystem.
     * @param uri file system URI.
     * @param conf Configuration with all necessary information to create the FileSystem.
     * @return FileSystem created with the provided user.
     * @throws HadoopAccessorException if the filesystem could not be created.
     */
    public FileSystem createFileSystem(String user, final URI uri, final Configuration conf)
            throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }
        String nameNode = uri.getAuthority();
        if (nameNode == null) {
            nameNode = conf.get("fs.default.name");
            if (nameNode != null) {
                try {
                    nameNode = new URI(nameNode).getAuthority();
                }
                catch (URISyntaxException ex) {
                    throw new HadoopAccessorException(ErrorCode.E0902, ex);
                }
            }
        }
        validateNameNode(nameNode);

        try {
            UserGroupInformation ugi = getUGI(user);
            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                public FileSystem run() throws Exception {
                    return FileSystem.get(uri, conf);
                }
            });
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
    }
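
    // Illustrative sketch (URI and user are hypothetical); the Configuration again must
    // originate from createJobConf() so the service-created flag is set:
    //
    //   URI uri = new URI("hdfs://nn.example.com:8020/user/alice");
    //   FileSystem fs = has.createFileSystem("alice", uri, has.createJobConf("nn.example.com:8020"));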

    /**
     * Validate the JobTracker against the whitelist.
     *
     * @param jobTrackerUri the JobTracker host:port to validate.
     * @throws HadoopAccessorException if a non-empty whitelist does not contain the JobTracker.
     */
    protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException {
        validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900);
    }

    /**
     * Validate the NameNode against the whitelist.
     *
     * @param nameNodeUri the NameNode host:port to validate.
     * @throws HadoopAccessorException if a non-empty whitelist does not contain the NameNode.
     */
    protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException {
        validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901);
    }

    private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException {
        if (uri != null) {
            uri = uri.toLowerCase().trim();
            if (whitelist.size() > 0 && !whitelist.contains(uri)) {
                throw new HadoopAccessorException(error, uri);
            }
        }
    }

    /**
     * Add a file to the ClassPath via the DistributedCache, performing the operation as the given user.
     *
     * @param user the user to impersonate when adding the file.
     * @param file path of the file to add.
     * @param conf Configuration to add the file to.
     * @throws IOException if the file could not be added.
     */
    public void addFileToClassPath(String user, final Path file, final Configuration conf)
            throws IOException {
        ParamChecker.notEmpty(user, "user");
        try {
            UserGroupInformation ugi = getUGI(user);
            ugi.doAs(new PrivilegedExceptionAction<Void>() {
                public Void run() throws Exception {
                    Configuration defaultConf = new Configuration();
                    XConfiguration.copy(conf, defaultConf);
                    // doing this NOP add first, on a throwaway copy, to have the FileSystem
                    // created and cached under the doAs user
                    DistributedCache.addFileToClassPath(file, defaultConf);

                    DistributedCache.addFileToClassPath(file, conf);
                    return null;
                }
            });
        }
        catch (InterruptedException ex) {
            throw new IOException(ex);
        }
    }
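
    // Hedged usage sketch (user and path are hypothetical): adding a JAR so it ends up on
    // the task classpath of a job launched with this configuration:
    //
    //   has.addFileToClassPath("alice", new Path("hdfs://nn.example.com:8020/tmp/lib/udf.jar"), jobConf);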

}