/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.ErrorCode;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;

/**
 * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group. <p/> The
 * default accessor used is the base accessor, which just injects the UGI into the configuration instance used to
 * create/obtain JobClient and FileSystem instances. <p/> The HadoopAccessor class to use can be configured in
 * <code>oozie-site.xml</code> using the <code>oozie.service.HadoopAccessorService.accessor.class</code> property.
 */
public class KerberosHadoopAccessorService extends HadoopAccessorService {

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService.";

    public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled";
    public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file";
    public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";

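    /*
     * Illustrative oozie-site.xml settings for this service. The enabled/principal
     * values shown mirror the defaults applied in init() below; the keytab path is
     * only an example (the built-in default is ${user.home}/oozie.keytab):
     *
     *   <property>
     *     <name>oozie.service.HadoopAccessorService.kerberos.enabled</name>
     *     <value>true</value>
     *   </property>
     *   <property>
     *     <name>oozie.service.HadoopAccessorService.keytab.file</name>
     *     <value>/etc/security/keytabs/oozie.keytab</value>
     *   </property>
     *   <property>
     *     <name>oozie.service.HadoopAccessorService.kerberos.principal</name>
     *     <value>oozie/localhost@LOCALHOST</value>
     *   </property>
     */
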
    private ConcurrentMap<String, UserGroupInformation> userUgiMap;

    private String localRealm;

    public void init(Configuration serviceConf) throws ServiceException {
        boolean kerberosAuthOn = serviceConf.getBoolean(KERBEROS_AUTH_ENABLED, true);
        XLog.getLog(getClass()).info("Oozie Kerberos Authentication [{0}]", (kerberosAuthOn) ? "enabled" : "disabled");
        if (kerberosAuthOn) {
            try {
                String keytabFile = serviceConf.get(KERBEROS_KEYTAB,
                                                    System.getProperty("user.home") + "/oozie.keytab").trim();
                if (keytabFile.length() == 0) {
                    throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
                }
                String principal = serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
                if (principal.length() == 0) {
                    throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
                }
                Configuration conf = new Configuration();
                conf.set("hadoop.security.authentication", "kerberos");
                UserGroupInformation.setConfiguration(conf);
                UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
                XLog.getLog(getClass()).info("Got Kerberos ticket, keytab [{0}], Oozie principal [{1}]",
                                             keytabFile, principal);
            }
            catch (ServiceException ex) {
                throw ex;
            }
            catch (Exception ex) {
                throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
            }
        }
        else {
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "simple");
            UserGroupInformation.setConfiguration(conf);
        }
        localRealm = serviceConf.get("local.realm");

        userUgiMap = new ConcurrentHashMap<String, UserGroupInformation>();
    }

    public void destroy() {
        userUgiMap = null;
        super.destroy();
    }

    private UserGroupInformation getUGI(String user) throws IOException {
        UserGroupInformation ugi = userUgiMap.get(user);
        if (ugi == null) {
            // Guard against a race with another thread creating a UGI for the same user:
            // putIfAbsent() keeps whichever instance was cached first, and the extra one
            // is used only for this call; both are equivalent proxy UGIs.
            ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
            userUgiMap.putIfAbsent(user, ugi);
        }
        return ugi;
    }
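
    // Note: createProxyUser() only succeeds if the Oozie login principal is allowed to
    // impersonate end users by the cluster's core-site.xml proxyuser settings, e.g.
    // (illustrative host/group values, for a login principal whose short name is "oozie"):
    //
    //   hadoop.proxyuser.oozie.hosts  = oozie-server-host
    //   hadoop.proxyuser.oozie.groups = users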

    /**
     * Return a JobClient created with the provided user/group.
     *
     * @param user user name the JobClient must work on behalf of.
     * @param group group name.
     * @param conf JobConf with all necessary information to create the JobClient.
     * @return JobClient created with the provided user/group.
     * @throws HadoopAccessorException if the client could not be created.
     */
    public JobClient createJobClient(String user, String group, final JobConf conf) throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        ParamChecker.notEmpty(group, "group");
        validateJobTracker(conf.get("mapred.job.tracker"));
        try {
            UserGroupInformation ugi = getUGI(user);
            JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
                public JobClient run() throws Exception {
                    return new JobClient(conf);
                }
            });
            // Obtain an MR delegation token and stash it in the job credentials so
            // subsequent job submissions can authenticate with it.
            Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(new Text("mr token"));
            conf.getCredentials().addToken(new Text("mr token"), mrdt);
            return jobClient;
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
    }
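
    // Usage sketch (not part of this class): callers normally reach this service through
    // the Oozie Services registry; the user/group and JobTracker address are illustrative.
    //
    //   HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
    //   JobConf jobConf = new JobConf();
    //   jobConf.set("mapred.job.tracker", "jt-host:8021");
    //   JobClient jobClient = has.createJobClient("joe", "users", jobConf);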

    /**
     * Return a FileSystem created with the provided user/group.
     *
     * @param user user name the FileSystem must work on behalf of.
     * @param group group name.
     * @param conf Configuration with all necessary information to create the FileSystem.
     * @return FileSystem created with the provided user/group.
     * @throws HadoopAccessorException if the filesystem could not be created.
     */
    public FileSystem createFileSystem(String user, String group, final Configuration conf)
            throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        ParamChecker.notEmpty(group, "group");
        try {
            validateNameNode(new URI(conf.get("fs.default.name")).getAuthority());
            UserGroupInformation ugi = getUGI(user);
            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                public FileSystem run() throws Exception {
                    Configuration defaultConf = new Configuration();
                    XConfiguration.copy(conf, defaultConf);
                    return FileSystem.get(defaultConf);
                }
            });
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
        catch (URISyntaxException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
    }
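
    // Note: validateJobTracker()/validateNameNode() are inherited from HadoopAccessorService;
    // assuming the stock parent class, they check the given authority against the service
    // whitelists (oozie.service.HadoopAccessorService.jobTracker.whitelist and
    // oozie.service.HadoopAccessorService.nameNode.whitelist).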

    /**
     * Return a FileSystem created with the provided user/group for the specified URI.
     *
     * @param user user name the FileSystem must work on behalf of.
     * @param group group name.
     * @param uri file system URI.
     * @param conf Configuration with all necessary information to create the FileSystem.
     * @return FileSystem created with the provided user/group.
     * @throws HadoopAccessorException if the filesystem could not be created.
     */
    public FileSystem createFileSystem(String user, String group, final URI uri, final Configuration conf)
            throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        ParamChecker.notEmpty(group, "group");
        validateNameNode(uri.getAuthority());
        try {
            UserGroupInformation ugi = getUGI(user);
            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                public FileSystem run() throws Exception {
                    Configuration defaultConf = new Configuration();

                    // Seed the JobTracker/NameNode Kerberos principal names; Hadoop
                    // replaces the _HOST token with the actual server hostname.
                    defaultConf.set(WorkflowAppService.HADOOP_JT_KERBEROS_NAME, "mapred/_HOST@" + localRealm);
                    defaultConf.set(WorkflowAppService.HADOOP_NN_KERBEROS_NAME, "hdfs/_HOST@" + localRealm);

                    XConfiguration.copy(conf, defaultConf);
                    return FileSystem.get(uri, defaultConf);
                }
            });
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
    }

    /**
     * Add a file to the classpath of a job on behalf of the provided user/group.
     *
     * @param user user name the operation must be performed on behalf of.
     * @param group group name.
     * @param file path of the file to add to the classpath.
     * @param conf Configuration to receive the classpath entry.
     * @throws IOException if the file could not be added to the classpath.
     */
    public void addFileToClassPath(String user, String group, final Path file, final Configuration conf)
            throws IOException {
        ParamChecker.notEmpty(user, "user");
        ParamChecker.notEmpty(group, "group");
        try {
            UserGroupInformation ugi = getUGI(user);
            ugi.doAs(new PrivilegedExceptionAction<Void>() {
                public Void run() throws Exception {
                    Configuration defaultConf = new Configuration();
                    XConfiguration.copy(conf, defaultConf);
                    // Doing this NOP add first to have the FS created and cached
                    DistributedCache.addFileToClassPath(file, defaultConf);

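                    // The second add targets the caller's conf, so the classpath entry is
                    // visible after this doAs block returns; the add on the throwaway copy
                    // above only forces the FileSystem to be created (and cached) while
                    // running as the proxy user.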
                    DistributedCache.addFileToClassPath(file, conf);
                    return null;
                }
            });
        }
        catch (InterruptedException ex) {
            throw new IOException(ex);
        }
    }
}