This project has been retired. For details, please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
038
039 /**
040 * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group. <p/> The
041 * default accessor used is the base accessor which just injects the UGI into the configuration instance used to
042 * create/obtain JobClient and ileSystem instances. <p/> The HadoopAccess class to use can be configured in the
043 * <code>oozie-site.xml</code> using the <code>oozie.service.HadoopAccessorService.accessor.class</code> property.
044 */
045 public class HadoopAccessorService implements Service {
046
047 public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService.";
048 public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist";
049 public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist";
050
051 private Set<String> jobTrackerWhitelist = new HashSet<String>();
052 private Set<String> nameNodeWhitelist = new HashSet<String>();
053
054 public void init(Services services) throws ServiceException {
055 for (String name : services.getConf().getStringCollection(JOB_TRACKER_WHITELIST)) {
056 String tmp = name.toLowerCase().trim();
057 if (tmp.length() == 0) {
058 continue;
059 }
060 jobTrackerWhitelist.add(tmp);
061 }
062 XLog.getLog(getClass()).info(
063 "JOB_TRACKER_WHITELIST :" + services.getConf().getStringCollection(JOB_TRACKER_WHITELIST)
064 + ", Total entries :" + jobTrackerWhitelist.size());
065 for (String name : services.getConf().getStringCollection(NAME_NODE_WHITELIST)) {
066 String tmp = name.toLowerCase().trim();
067 if (tmp.length() == 0) {
068 continue;
069 }
070 nameNodeWhitelist.add(tmp);
071 }
072 XLog.getLog(getClass()).info(
073 "NAME_NODE_WHITELIST :" + services.getConf().getStringCollection(NAME_NODE_WHITELIST)
074 + ", Total entries :" + nameNodeWhitelist.size());
075 init(services.getConf());
076 }
077
078 public void init(Configuration serviceConf) throws ServiceException {
079 }
080
081 public void destroy() {
082 }
083
084 public Class<? extends Service> getInterface() {
085 return HadoopAccessorService.class;
086 }
087
088 /**
089 * Return a JobClient created with the provided user/group.
090 *
091 * @param conf JobConf with all necessary information to create the
092 * JobClient.
093 * @return JobClient created with the provided user/group.
094 * @throws HadoopAccessorException if the client could not be created.
095 */
096 public JobClient createJobClient(String user, String group, JobConf conf) throws HadoopAccessorException {
097 validateJobTracker(conf.get("mapred.job.tracker"));
098 conf = createConfiguration(user, group, conf);
099 try {
100 return new JobClient(conf);
101 }
102 catch (IOException e) {
103 throw new HadoopAccessorException(ErrorCode.E0902, e);
104 }
105 }
106
107 /**
108 * Return a FileSystem created with the provided user/group.
109 *
110 * @param conf Configuration with all necessary information to create the
111 * FileSystem.
112 * @return FileSystem created with the provided user/group.
113 * @throws HadoopAccessorException if the filesystem could not be created.
114 */
115 public FileSystem createFileSystem(String user, String group, Configuration conf) throws HadoopAccessorException {
116 try {
117 validateNameNode(new URI(conf.get("fs.default.name")).getAuthority());
118 conf = createConfiguration(user, group, conf);
119 return FileSystem.get(conf);
120 }
121 catch (IOException e) {
122 throw new HadoopAccessorException(ErrorCode.E0902, e);
123 }
124 catch (URISyntaxException e) {
125 throw new HadoopAccessorException(ErrorCode.E0902, e);
126 }
127 }
128
129 /**
130 * Return a FileSystem created with the provided user/group for the
131 * specified URI.
132 *
133 * @param uri file system URI.
134 * @param conf Configuration with all necessary information to create the
135 * FileSystem.
136 * @return FileSystem created with the provided user/group.
137 * @throws HadoopAccessorException if the filesystem could not be created.
138 */
139 public FileSystem createFileSystem(String user, String group, URI uri, Configuration conf)
140 throws HadoopAccessorException {
141 validateNameNode(uri.getAuthority());
142 conf = createConfiguration(user, group, conf);
143 try {
144 return FileSystem.get(uri, conf);
145 }
146 catch (IOException e) {
147 throw new HadoopAccessorException(ErrorCode.E0902, e);
148 }
149 }
150
151 /**
152 * Validate Job tracker
153 * @param jobTrackerUri
154 * @throws HadoopAccessorException
155 */
156 protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException {
157 validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900);
158 }
159
160 /**
161 * Validate Namenode list
162 * @param nameNodeUri
163 * @throws HadoopAccessorException
164 */
165 protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException {
166 validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901);
167 }
168
169 private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException {
170 if (uri != null) {
171 uri = uri.toLowerCase().trim();
172 if (whitelist.size() > 0 && !whitelist.contains(uri)) {
173 throw new HadoopAccessorException(error, uri);
174 }
175 }
176 }
177
178 @SuppressWarnings("unchecked")
179 private <C extends Configuration> C createConfiguration(String user, String group, C conf) {
180 ParamChecker.notEmpty(user, "user");
181 ParamChecker.notEmpty(group, "group");
182 C fsConf = (C) ((conf instanceof JobConf) ? new JobConf() : new Configuration());
183 XConfiguration.copy(conf, fsConf);
184 fsConf.set("user.name", user);
185 fsConf.set("hadoop.job.ugi", user + "," + group);
186 return fsConf;
187 }
188
189 /**
190 * Add a file to the ClassPath via the DistributedCache.
191 */
192 public void addFileToClassPath(String user, String group, final Path file, final Configuration conf)
193 throws IOException {
194 Configuration defaultConf = createConfiguration(user, group, conf);
195 DistributedCache.addFileToClassPath(file, defaultConf);
196 DistributedCache.addFileToClassPath(file, conf);
197 }
198
199 }