001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.action.hadoop;
020
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.fs.FileSystem;
023import org.apache.hadoop.fs.Path;
024import org.apache.hadoop.hbase.security.User;
025import org.apache.hadoop.mapreduce.MRJobConfig;
026import org.apache.hadoop.mapreduce.security.TokenCache;
027import org.apache.hadoop.security.Credentials;
028import org.apache.hadoop.security.UserGroupInformation;
029import org.apache.hadoop.security.token.Token;
030import org.apache.oozie.ErrorCode;
031import org.apache.oozie.action.ActionExecutor;
032import org.apache.oozie.service.HadoopAccessorException;
033import org.apache.oozie.service.Services;
034import org.apache.oozie.service.UserGroupInformationService;
035import org.apache.oozie.util.XLog;
036
037import java.io.IOException;
038import java.net.URISyntaxException;
039import java.security.PrivilegedExceptionAction;
040
041
042public class HDFSCredentials implements CredentialsProvider {
043    protected XLog LOG = XLog.getLog(getClass());
044    /**
045     * Add an HDFS_DELEGATION_TOKEN to the {@link Credentials} provided.
046     * This is also important to ensure that log aggregation works correctly from the NM
047     *
048     * @param credentials the credentials object which is updated
049     * @param config launcher AM configuration
050     * @param props properties for getting credential token or certificate
051     * @param context workflow context
052     * @throws Exception thrown if failed
053     */
054    @Override
055    public void updateCredentials(Credentials credentials, Configuration config, CredentialsProperties props,
056                                  ActionExecutor.Context context) throws Exception {
057        final String jobNameNodes[] = config.getStrings(MRJobConfig.JOB_NAMENODES);
058        if (jobNameNodes != null) {
059            final Path[] paths = new Path[jobNameNodes.length];
060            for (int i = 0; i != jobNameNodes.length; ++i) {
061                paths[i] = new Path(jobNameNodes[i]);
062            }
063
064            final UserGroupInformation ugi = Services.get().get(UserGroupInformationService.class)
065                    .getProxyUser(context.getWorkflow().getUser());
066            final User user = User.create(ugi);
067
068            obtainTokensForNamenodes(credentials, config, user, paths);
069        }
070        else {
071            obtainTokenForAppFileSystemNameNode(credentials, config, context);
072        }
073
074    }
075
076    private void obtainTokenForAppFileSystemNameNode(final Credentials credentials,
077                                                     final Configuration config,
078                                                     final ActionExecutor.Context context)
079            throws IOException, CredentialException, HadoopAccessorException, URISyntaxException {
080        try (FileSystem fileSystem = context.getAppFileSystem()) {
081            final String renewer = new HadoopTokenHelper().getServerPrincipal(config);
082            LOG.debug("Server principal present, getting HDFS delegation token. [renewer={0}]", renewer);
083            final Token hdfsDelegationToken = fileSystem.getDelegationToken(renewer);
084            if (hdfsDelegationToken == null) {
085                throw new CredentialException(ErrorCode.E0511, renewer);
086            }
087            LOG.info("Got HDFS delegation token, setting credentials. [hdfsDelegationToken={0}]",
088                    hdfsDelegationToken);
089            credentials.addToken(hdfsDelegationToken.getService(), hdfsDelegationToken);
090        } catch (Exception e) {
091            LOG.debug("exception in updateCredentials", e);
092            throw e;
093        }
094    }
095
096    private void obtainTokensForNamenodes(final Credentials credentials,
097                                          final Configuration config,
098                                          final User user,
099                                          final Path[] paths) throws IOException, InterruptedException {
100        LOG.info(String.format("\"%s\" is present in workflow configuration. Obtaining tokens for NameNode(s) [%s]",
101                MRJobConfig.JOB_NAMENODES, config.get(MRJobConfig.JOB_NAMENODES)));
102        user.runAs(
103                new PrivilegedExceptionAction<Void>() {
104                    @Override
105                    public Void run() throws Exception {
106                        TokenCache.obtainTokensForNamenodes(credentials, paths, config);
107                        return null;
108                    }
109                }
110        );
111    }
112}