001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
020    import org.apache.hadoop.mapred.JobClient;
021    import org.apache.hadoop.mapred.JobConf;
022    import org.apache.hadoop.fs.FileSystem;
023    import org.apache.hadoop.fs.Path;
024    import org.apache.hadoop.conf.Configuration;
025    import org.apache.hadoop.security.UserGroupInformation;
026    import org.apache.hadoop.security.token.Token;
027    import org.apache.hadoop.filecache.DistributedCache;
028    import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
029    import org.apache.hadoop.io.Text;
030    import org.apache.oozie.util.XLog;
031    import org.apache.oozie.util.XConfiguration;
032    import org.apache.oozie.util.ParamChecker;
033    import org.apache.oozie.ErrorCode;
034    import org.apache.oozie.service.HadoopAccessorService;
035    import org.apache.oozie.service.HadoopAccessorException;
036    import org.apache.oozie.service.Service;
037    import org.apache.oozie.service.ServiceException;
039    import java.io.IOException;
040    import java.net.URI;
041    import java.net.URISyntaxException;
042    import java.security.PrivilegedExceptionAction;
043    import java.util.concurrent.ConcurrentMap;
044    import java.util.concurrent.ConcurrentHashMap;
046    /**
047     * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group. <p/> The
048     * default accessor used is the base accessor which just injects the UGI into the configuration instance used to
049     * create/obtain JobClient and ileSystem instances. <p/> The HadoopAccess class to use can be configured in the
050     * <code>oozie-site.xml</code> using the <code>oozie.service.HadoopAccessorService.accessor.class</code> property.
051     */
052    public class KerberosHadoopAccessorService extends HadoopAccessorService {
054        public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService.";
056        public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled";
057        public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file";
058        public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";
060        private ConcurrentMap<String, UserGroupInformation> userUgiMap;
062        private String localRealm;
064        public void init(Configuration serviceConf) throws ServiceException {
065            boolean kerberosAuthOn = serviceConf.getBoolean(KERBEROS_AUTH_ENABLED, true);
066            XLog.getLog(getClass()).info("Oozie Kerberos Authentication [{0}]", (kerberosAuthOn) ? "enabled" : "disabled");
067            if (kerberosAuthOn) {
068                try {
069                    String keytabFile = serviceConf.get(KERBEROS_KEYTAB,
070                                                        System.getProperty("user.home") + "/oozie.keytab").trim();
071                    if (keytabFile.length() == 0) {
072                        throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
073                    }
074                    String principal = serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
075                    if (principal.length() == 0) {
076                        throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
077                    }
078                    Configuration conf = new Configuration();
079                    conf.set("hadoop.security.authentication", "kerberos");
080                    UserGroupInformation.setConfiguration(conf);
081                    UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
082                    XLog.getLog(getClass()).info("Got Kerberos ticket, keytab [{0}], Oozie principal principal [{1}]",
083                                                 keytabFile, principal);
084                }
085                catch (ServiceException ex) {
086                    throw ex;
087                }
088                catch (Exception ex) {
089                    throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
090                }
091            }
092            else {
093                Configuration conf = new Configuration();
094                conf.set("hadoop.security.authentication", "simple");
095                UserGroupInformation.setConfiguration(conf);
096            }
097            localRealm = serviceConf.get("local.realm");
099            userUgiMap = new ConcurrentHashMap<String, UserGroupInformation>();
100        }
102        public void destroy() {
103            userUgiMap = null;
104            super.destroy();
105        }
107        private UserGroupInformation getUGI(String user) throws IOException {
108            UserGroupInformation ugi = userUgiMap.get(user);
109            if (ugi == null) {
110                // taking care of a race condition, the latest UGI will be discarded
111                ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
112                userUgiMap.putIfAbsent(user, ugi);
113            }
114            return ugi;
115        }
117        /**
118         * Return a JobClient created with the provided user/group.
119         *
120         * @param conf JobConf with all necessary information to create the JobClient.
121         * @return JobClient created with the provided user/group.
122         * @throws HadoopAccessorException if the client could not be created.
123         */
124        public JobClient createJobClient(String user, String group, final JobConf conf) throws HadoopAccessorException {
125            ParamChecker.notEmpty(user, "user");
126            ParamChecker.notEmpty(group, "group");
127            validateJobTracker(conf.get("mapred.job.tracker"));
128            try {
129                UserGroupInformation ugi = getUGI(user);
130                JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
131                    public JobClient run() throws Exception {
132                        return new JobClient(conf);
133                    }
134                });
135                Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(new Text("mr token"));
136                conf.getCredentials().addToken(new Text("mr token"), mrdt);
137                return jobClient;
138            }
139            catch (InterruptedException ex) {
140                throw new HadoopAccessorException(ErrorCode.E0902, ex);
141            }
142            catch (IOException ex) {
143                throw new HadoopAccessorException(ErrorCode.E0902, ex);
144            }
145        }
147        /**
148         * Return a FileSystem created with the provided user/group.
149         *
150         * @param conf Configuration with all necessary information to create the FileSystem.
151         * @return FileSystem created with the provided user/group.
152         * @throws HadoopAccessorException if the filesystem could not be created.
153         */
154        public FileSystem createFileSystem(String user, String group, final Configuration conf)
155                throws HadoopAccessorException {
156            ParamChecker.notEmpty(user, "user");
157            ParamChecker.notEmpty(group, "group");
158            try {
159                validateNameNode(new URI(conf.get("fs.default.name")).getAuthority());
160                UserGroupInformation ugi = getUGI(user);
161                return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
162                    public FileSystem run() throws Exception {
163                        Configuration defaultConf = new Configuration();
164                        XConfiguration.copy(conf, defaultConf);
165                        return FileSystem.get(defaultConf);
166                    }
167                });
168            }
169            catch (InterruptedException ex) {
170                throw new HadoopAccessorException(ErrorCode.E0902, ex);
171            }
172            catch (IOException ex) {
173                throw new HadoopAccessorException(ErrorCode.E0902, ex);
174            }
175            catch (URISyntaxException ex) {
176                throw new HadoopAccessorException(ErrorCode.E0902, ex);
177            }
178        }
180        /**
181         * Return a FileSystem created with the provided user/group for the specified URI.
182         *
183         * @param uri file system URI.
184         * @param conf Configuration with all necessary information to create the FileSystem.
185         * @return FileSystem created with the provided user/group.
186         * @throws HadoopAccessorException if the filesystem could not be created.
187         */
188        public FileSystem createFileSystem(String user, String group, final URI uri, final Configuration conf)
189                throws HadoopAccessorException {
190            ParamChecker.notEmpty(user, "user");
191            ParamChecker.notEmpty(group, "group");
192            validateNameNode(uri.getAuthority());
193            try {
194                UserGroupInformation ugi = getUGI(user);
195                return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
196                    public FileSystem run() throws Exception {
197                        Configuration defaultConf = new Configuration();
199                        defaultConf.set(WorkflowAppService.HADOOP_JT_KERBEROS_NAME, "mapred/_HOST@" + localRealm);
200                        defaultConf.set(WorkflowAppService.HADOOP_NN_KERBEROS_NAME, "hdfs/_HOST@" + localRealm);
202                        XConfiguration.copy(conf, defaultConf);
203                        return FileSystem.get(uri, defaultConf);
204                    }
205                });
206            }
207            catch (InterruptedException ex) {
208                throw new HadoopAccessorException(ErrorCode.E0902, ex);
209            }
210            catch (IOException ex) {
211                throw new HadoopAccessorException(ErrorCode.E0902, ex);
212            }
213        }
216        public void addFileToClassPath(String user, String group, final Path file, final Configuration conf)
217                throws IOException {
218            ParamChecker.notEmpty(user, "user");
219            ParamChecker.notEmpty(group, "group");
220            try {
221                UserGroupInformation ugi = getUGI(user);
222                ugi.doAs(new PrivilegedExceptionAction<Void>() {
223                    public Void run() throws Exception {
224                        Configuration defaultConf = new Configuration();
225                        XConfiguration.copy(conf, defaultConf);
226                        //Doing this NOP add first to have the FS created and cached
227                        DistributedCache.addFileToClassPath(file, defaultConf);
229                        DistributedCache.addFileToClassPath(file, conf);
230                        return null;
231                    }
232                });
234            }
235            catch (InterruptedException ex) {
236                throw new IOException(ex);
237            }
239        }
241    }