001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.action.hadoop; 020 021import org.apache.hadoop.conf.Configuration; 022import org.apache.hadoop.fs.FileSystem; 023import org.apache.hadoop.fs.Path; 024import org.apache.hadoop.hbase.security.User; 025import org.apache.hadoop.mapreduce.MRJobConfig; 026import org.apache.hadoop.mapreduce.security.TokenCache; 027import org.apache.hadoop.security.Credentials; 028import org.apache.hadoop.security.UserGroupInformation; 029import org.apache.hadoop.security.token.Token; 030import org.apache.oozie.ErrorCode; 031import org.apache.oozie.action.ActionExecutor; 032import org.apache.oozie.service.HadoopAccessorException; 033import org.apache.oozie.service.Services; 034import org.apache.oozie.service.UserGroupInformationService; 035import org.apache.oozie.util.XLog; 036 037import java.io.IOException; 038import java.net.URISyntaxException; 039import java.security.PrivilegedExceptionAction; 040 041 042public class HDFSCredentials implements CredentialsProvider { 043 protected XLog LOG = XLog.getLog(getClass()); 044 /** 045 * Add an HDFS_DELEGATION_TOKEN to the {@link Credentials} provided. 046 * This is also important to ensure that log aggregation works correctly from the NM 047 * 048 * @param credentials the credentials object which is updated 049 * @param config launcher AM configuration 050 * @param props properties for getting credential token or certificate 051 * @param context workflow context 052 * @throws Exception thrown if failed 053 */ 054 @Override 055 public void updateCredentials(Credentials credentials, Configuration config, CredentialsProperties props, 056 ActionExecutor.Context context) throws Exception { 057 final String jobNameNodes[] = config.getStrings(MRJobConfig.JOB_NAMENODES); 058 if (jobNameNodes != null) { 059 final Path[] paths = new Path[jobNameNodes.length]; 060 for (int i = 0; i != jobNameNodes.length; ++i) { 061 paths[i] = new Path(jobNameNodes[i]); 062 } 063 064 final UserGroupInformation ugi = Services.get().get(UserGroupInformationService.class) 065 .getProxyUser(context.getWorkflow().getUser()); 066 final User user = User.create(ugi); 067 068 obtainTokensForNamenodes(credentials, config, user, paths); 069 } 070 else { 071 obtainTokenForAppFileSystemNameNode(credentials, config, context); 072 } 073 074 } 075 076 private void obtainTokenForAppFileSystemNameNode(final Credentials credentials, 077 final Configuration config, 078 final ActionExecutor.Context context) 079 throws IOException, CredentialException, HadoopAccessorException, URISyntaxException { 080 try (FileSystem fileSystem = context.getAppFileSystem()) { 081 final String renewer = new HadoopTokenHelper().getServerPrincipal(config); 082 LOG.debug("Server principal present, getting HDFS delegation token. [renewer={0}]", renewer); 083 final Token hdfsDelegationToken = fileSystem.getDelegationToken(renewer); 084 if (hdfsDelegationToken == null) { 085 throw new CredentialException(ErrorCode.E0511, renewer); 086 } 087 LOG.info("Got HDFS delegation token, setting credentials. [hdfsDelegationToken={0}]", 088 hdfsDelegationToken); 089 credentials.addToken(hdfsDelegationToken.getService(), hdfsDelegationToken); 090 } catch (Exception e) { 091 LOG.debug("exception in updateCredentials", e); 092 throw e; 093 } 094 } 095 096 private void obtainTokensForNamenodes(final Credentials credentials, 097 final Configuration config, 098 final User user, 099 final Path[] paths) throws IOException, InterruptedException { 100 LOG.info(String.format("\"%s\" is present in workflow configuration. Obtaining tokens for NameNode(s) [%s]", 101 MRJobConfig.JOB_NAMENODES, config.get(MRJobConfig.JOB_NAMENODES))); 102 user.runAs( 103 new PrivilegedExceptionAction<Void>() { 104 @Override 105 public Void run() throws Exception { 106 TokenCache.obtainTokensForNamenodes(credentials, paths, config); 107 return null; 108 } 109 } 110 ); 111 } 112}