/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.service;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.action.hadoop.JavaActionExecutor;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.JobUtils;
import org.apache.oozie.workflow.lite.LiteWorkflowAppParser;

import java.io.File;
import java.io.FileInputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;


/**
 * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group. <p> The
 * default accessor used is the base accessor which just injects the UGI into the configuration instance used to
 * create/obtain JobClient and FileSystem instances.
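 * <p>
 * A typical lookup, as a sketch (assumes a running Oozie {@code Services} singleton):
 * <pre>{@code
 * HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
 * }</pre>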
 */
public class HadoopAccessorService implements Service {

    private static XLog LOG = XLog.getLog(HadoopAccessorService.class);

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService.";
    public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist";
    public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist";
    public static final String HADOOP_CONFS = CONF_PREFIX + "hadoop.configurations";
    public static final String ACTION_CONFS = CONF_PREFIX + "action.configurations";
    public static final String ACTION_CONFS_LOAD_DEFAULT_RESOURCES = ACTION_CONFS + ".load.default.resources";
    public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled";
    public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file";
    public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";
    public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token");

    protected static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
    /** The Kerberos principal for the job tracker. */
    protected static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
    /** The Kerberos principal for the resource manager. */
    protected static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
    protected static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
    protected static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
    protected static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
    // a ConcurrentHashMap because getMRTokenRenewerInternal() populates this cache and may be called from multiple threads
    private static final Map<String, Text> mrTokenRenewers = new ConcurrentHashMap<String, Text>();

    private static Configuration cachedConf;

    private static final String DEFAULT_ACTIONNAME = "default";

    private Set<String> jobTrackerWhitelist = new HashSet<String>();
    private Set<String> nameNodeWhitelist = new HashSet<String>();
    private Map<String, Configuration> hadoopConfigs = new HashMap<String, Configuration>();
    private Map<String, File> actionConfigDirs = new HashMap<String, File>();
    private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>();

    private UserGroupInformationService ugiService;

    /**
     * Supported filesystem schemes for namespace federation.
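     * <p>
     * An example value (illustrative): {@code hdfs,viewfs}; a single {@code *} entry allows all schemes.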
     */
    public static final String SUPPORTED_FILESYSTEMS = CONF_PREFIX + "supported.filesystems";
    private Set<String> supportedSchemes;
    private boolean allSchemesSupported;

    public void init(Services services) throws ServiceException {
        this.ugiService = services.get(UserGroupInformationService.class);
        init(services.getConf());
    }

    //for testing purposes, see XFsTestCase
    public void init(Configuration conf) throws ServiceException {
        for (String name : ConfigurationService.getStrings(conf, JOB_TRACKER_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            jobTrackerWhitelist.add(tmp);
        }
        LOG.info(
                "JOB_TRACKER_WHITELIST :" + jobTrackerWhitelist.toString()
                        + ", Total entries :" + jobTrackerWhitelist.size());
        for (String name : ConfigurationService.getStrings(conf, NAME_NODE_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            nameNodeWhitelist.add(tmp);
        }
        LOG.info(
                "NAME_NODE_WHITELIST :" + nameNodeWhitelist.toString()
                        + ", Total entries :" + nameNodeWhitelist.size());

        boolean kerberosAuthOn = ConfigurationService.getBoolean(conf, KERBEROS_AUTH_ENABLED);
        LOG.info("Oozie Kerberos Authentication [{0}]", (kerberosAuthOn) ? "enabled" : "disabled");
        if (kerberosAuthOn) {
            kerberosInit(conf);
        }
        else {
            Configuration ugiConf = new Configuration();
            ugiConf.set("hadoop.security.authentication", "simple");
            UserGroupInformation.setConfiguration(ugiConf);
        }

        if (ugiService == null) { //for testing purposes, see XFsTestCase
            this.ugiService = new UserGroupInformationService();
        }

        loadHadoopConfigs(conf);
        preLoadActionConfigs(conf);

        supportedSchemes = new HashSet<String>();
        String[] schemesFromConf = ConfigurationService.getStrings(conf, SUPPORTED_FILESYSTEMS);
        if (schemesFromConf != null) {
            for (String scheme : schemesFromConf) {
                scheme = scheme.trim();
                // If the user gives "*", allSchemesSupported is set and the scheme check is skipped,
                // i.e. all schemes are allowed
                if (scheme.equals("*")) {
                    if (schemesFromConf.length > 1) {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                            SUPPORTED_FILESYSTEMS + " should contain either only wildcard or explicit list, not both");
                    }
                    allSchemesSupported = true;
                }
                supportedSchemes.add(scheme);
            }
        }

        setConfigForHadoopSecurityUtil(conf);
    }

    private void setConfigForHadoopSecurityUtil(Configuration conf) {
        // Prior to HADOOP-12954 (2.9.0+), Hadoop sets hadoop.security.token.service.use_ip on startup in a static block with no
        // way for Oozie to change it because Oozie doesn't load *-site.xml files on the classpath.  HADOOP-12954 added a way to
        // set this property via a setConfiguration method.  Ideally, this would be part of JobClient so Oozie wouldn't have to
        // worry about it and we could have different values for different clusters, but we can't; so we have to use the same value
        // for every cluster Oozie is configured for.  To that end, we'll use the default NN's configs.  If that's not defined,
        // we'll use the wildcard's configs.  And if that's not defined, we'll use an arbitrary cluster's configs.  In any case,
        // if the version of Hadoop we're using doesn't include HADOOP-12954, we'll do nothing (there's no workaround), and
        // hadoop.security.token.service.use_ip will have the default value.
        String nameNode = conf.get(LiteWorkflowAppParser.DEFAULT_NAME_NODE);
        if (nameNode != null) {
            nameNode = nameNode.trim();
            if (nameNode.isEmpty()) {
                nameNode = null;
            }
        }
        if (nameNode == null && hadoopConfigs.containsKey("*")) {
            nameNode = "*";
        }
        if (nameNode == null) {
            for (String nn : hadoopConfigs.keySet()) {
                nn = nn.trim();
                if (!nn.isEmpty()) {
                    nameNode = nn;
                    break;
                }
            }
        }
        if (nameNode != null) {
            Configuration hConf = getConfiguration(nameNode);
            try {
                Method setConfigurationMethod = SecurityUtil.class.getMethod("setConfiguration", Configuration.class);
                setConfigurationMethod.invoke(null, hConf);
                LOG.debug("Setting Hadoop SecurityUtil Configuration to that of {0}", nameNode);
            } catch (NoSuchMethodException e) {
                LOG.debug("Not setting Hadoop SecurityUtil Configuration because this version of Hadoop doesn't support it");
            } catch (Exception e) {
                LOG.error("An Exception occurred while trying to call setConfiguration on {0} via Reflection.  It won't be called.",
                        SecurityUtil.class.getName(), e);
            }
        }
    }
    private void kerberosInit(Configuration serviceConf) throws ServiceException {
        try {
            String keytabFile = ConfigurationService.get(serviceConf, KERBEROS_KEYTAB).trim();
            if (keytabFile.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
            }
            String principal = SecurityUtil.getServerPrincipal(
                    serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST"),
                    InetAddress.getLocalHost().getCanonicalHostName());
            if (principal.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
            }
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
            LOG.info("Got Kerberos ticket, keytab [{0}], Oozie principal [{1}]",
                    keytabFile, principal);
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private static final String[] HADOOP_CONF_FILES =
        {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml", "hadoop-site.xml", "ssl-client.xml"};


    private Configuration loadHadoopConf(File dir) throws IOException {
        Configuration hadoopConf = new XConfiguration();
        for (String file : HADOOP_CONF_FILES) {
            File f = new File(dir, file);
            if (f.exists()) {
                InputStream is = new FileInputStream(f);
                try {
                    Configuration conf = new XConfiguration(is, false);
                    XConfiguration.copy(conf, hadoopConf);
                }
                finally {
                    // make sure the stream is closed even if parsing fails
                    is.close();
                }
            }
        }
        return hadoopConf;
    }

    private Map<String, File> parseConfigDirs(String[] confDefs, String type) throws ServiceException, IOException {
        Map<String, File> map = new HashMap<String, File>();
        File configDir = new File(ConfigurationService.getConfigurationDirectory());
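        // Each definition has the form "hostPort=confDir", e.g. "*=hadoop-conf" (example value; relative
        // directories resolve against the Oozie configuration directory)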
        for (String confDef : confDefs) {
            if (confDef.trim().length() > 0) {
                String[] parts = confDef.split("=");
                if (parts.length == 2) {
                    String hostPort = parts[0];
                    String confDir = parts[1];
                    File dir = new File(confDir);
                    if (!dir.isAbsolute()) {
                        dir = new File(configDir, confDir);
                    }
                    if (dir.exists()) {
                        map.put(hostPort.toLowerCase(), dir);
                    }
                    else {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                                   "could not find " + type + " configuration directory: " +
                                                   dir.getAbsolutePath());
                    }
                }
                else {
                    throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                               "Incorrect " + type + " configuration definition: " + confDef);
                }
            }
        }
        return map;
    }

    private void loadHadoopConfigs(Configuration serviceConf) throws ServiceException {
        try {
            Map<String, File> map = parseConfigDirs(ConfigurationService.getStrings(serviceConf, HADOOP_CONFS),
                    "hadoop");
            for (Map.Entry<String, File> entry : map.entrySet()) {
                hadoopConfigs.put(entry.getKey(), loadHadoopConf(entry.getValue()));
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private void preLoadActionConfigs(Configuration serviceConf) throws ServiceException {
        try {
            actionConfigDirs = parseConfigDirs(ConfigurationService.getStrings(serviceConf, ACTION_CONFS), "action");
            for (String hostport : actionConfigDirs.keySet()) {
                actionConfigs.put(hostport, new ConcurrentHashMap<String, XConfiguration>());
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    public void destroy() {
    }

    public Class<? extends Service> getInterface() {
        return HadoopAccessorService.class;
    }

    private UserGroupInformation getUGI(String user) throws IOException {
        return ugiService.getProxyUser(user);
    }

    /**
     * Creates a JobConf using the site configuration for the specified hostname:port.
     * <p>
     * If the specified hostname:port is not defined it falls back to the '*' site
     * configuration if available. If the '*' site configuration is not available,
     * the JobConf has all Hadoop defaults.
     *
     * @param hostPort hostname:port to lookup Hadoop site configuration.
     * @return a JobConf with the corresponding site configuration for hostPort.
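     * <p>
     * For example (the hostPort value is illustrative):
     * <pre>{@code
     * JobConf jobConf = hadoopAccessorService.createJobConf("rm-host:8032");
     * }</pre>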
     */
    public JobConf createJobConf(String hostPort) {
        JobConf jobConf = new JobConf(getCachedConf());
        XConfiguration.copy(getConfiguration(hostPort), jobConf);
        jobConf.setBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, true);
        return jobConf;
    }

    public Configuration getCachedConf() {
        if (cachedConf == null) {
            loadCachedConf();
        }
        return cachedConf;
    }
    private void loadCachedConf() {
        cachedConf = new Configuration();
        // touch the Configuration so its lazily loaded default resources are parsed once, up front
        cachedConf.size();
    }

    private XConfiguration loadActionConf(String hostPort, String action) {
        File dir = actionConfigDirs.get(hostPort);
        XConfiguration actionConf = new XConfiguration();
        if (dir != null) {
            // See if a dir with the action name exists. If so, load all the supported conf files in the dir.
            File actionConfDir = new File(dir, action);

            if (actionConfDir.exists() && actionConfDir.isDirectory()) {
                LOG.info("Processing configuration files under [{0}]"
                                + " for action [{1}] and hostPort [{2}]",
                        actionConfDir.getAbsolutePath(), action, hostPort);
                updateActionConfigWithDir(actionConf, actionConfDir);
            }
        }

        // Now check for <action>.xml. This way <action>.xml has priority over <action-dir>/*.xml.
        File actionConfFile = new File(dir, action + ".xml");
        LOG.info("Processing configuration file [{0}] for action [{1}] and hostPort [{2}]",
            actionConfFile.getAbsolutePath(), action, hostPort);
        if (actionConfFile.exists()) {
            updateActionConfigWithFile(actionConf, actionConfFile);
        }

        return actionConf;
    }

    private void updateActionConfigWithFile(Configuration actionConf, File actionConfFile) {
        try {
            Configuration conf = readActionConfFile(actionConfFile);
            XConfiguration.copy(conf, actionConf);
        } catch (IOException e) {
            LOG.warn("Could not read file [{0}].", actionConfFile.getAbsolutePath());
        }
    }

    private void updateActionConfigWithDir(Configuration actionConf, File actionConfDir) {
        File[] actionConfFiles = actionConfDir.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String name) {
                return ActionConfFileType.isSupportedFileType(name);
            }
        });
        if (actionConfFiles == null) {
            // File.listFiles() returns null on an I/O error even when the directory exists
            LOG.warn("Could not list configuration files under [{0}].", actionConfDir.getAbsolutePath());
            return;
        }
        Arrays.sort(actionConfFiles, new Comparator<File>() {
            @Override
            public int compare(File o1, File o2) {
                return o1.getName().compareTo(o2.getName());
            }
        });
        for (File f : actionConfFiles) {
            if (f.isFile() && f.canRead()) {
                updateActionConfigWithFile(actionConf, f);
            }
        }
    }

    private Configuration readActionConfFile(File file) throws IOException {
        InputStream fis = null;
        try {
            fis = new FileInputStream(file);
            ActionConfFileType fileType = ActionConfFileType.getFileType(file.getName());
            switch (fileType) {
                case XML:
                    return new XConfiguration(fis);
                case PROPERTIES:
                    Properties properties = new Properties();
                    properties.load(fis);
                    return new XConfiguration(properties);
                default:
                    throw new UnsupportedOperationException(
                        String.format("Unable to parse action conf file of type %s", fileType));
            }
        } finally {
            IOUtils.closeSafely(fis);
        }
    }

    /**
     * Returns a Configuration containing any defaults for an action for a particular cluster.
     * <p>
     * This configuration is used as default for the action configuration and enables cluster
     * level default values per action.
     *
     * @param hostPort hostname:port to lookup the action default configuration.
     * @param action action name.
     * @return the default configuration for the action for the specified cluster.
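     * <p>
     * For example (hostPort and action names are illustrative):
     * <pre>{@code
     * XConfiguration actionDefaults = hadoopAccessorService.createActionDefaultConf("rm-host:8032", "pig");
     * }</pre>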
     */
    public XConfiguration createActionDefaultConf(String hostPort, String action) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Map<String, XConfiguration> hostPortActionConfigs = actionConfigs.get(hostPort);
        if (hostPortActionConfigs == null) {
            hostPortActionConfigs = actionConfigs.get("*");
            hostPort = "*";
        }
        XConfiguration actionConf = hostPortActionConfigs.get(action);
        if (actionConf == null) {
            // Lazy loading, as we don't know all actions upfront. No need to synchronize: this is a read
            // operation, and in case of a race condition, loading and inserting into the Map is idempotent
            // and the action-config Map is a ConcurrentHashMap.

            // First load the action of type "default". This allows for global configuration across all
            // actions, for example placing all launchers in one queue and actions in another queue, or
            // configuration that applies to multiple actions, like config library paths.
            actionConf = loadActionConf(hostPort, DEFAULT_ACTIONNAME);

            // Action specific default configuration will override the default action config
            XConfiguration.copy(loadActionConf(hostPort, action), actionConf);
            hostPortActionConfigs.put(action, actionConf);
        }
        return new XConfiguration(actionConf.toProperties());
    }

    private Configuration getConfiguration(String hostPort) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Configuration conf = hadoopConfigs.get(hostPort);
        if (conf == null) {
            conf = hadoopConfigs.get("*");
            if (conf == null) {
                conf = new XConfiguration();
            }
        }
        return conf;
    }

    /**
     * Return a JobClient created with the provided user/group.
     *
     * @param user user name of the caller the JobClient is created for.
     * @param conf JobConf with all necessary information to create the
     *        JobClient.
     * @return JobClient created with the provided user/group.
     * @throws HadoopAccessorException if the client could not be created.
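     * <p>
     * A minimal sketch (user and hostPort are illustrative). The JobConf must come from
     * {@link #createJobConf(String)}, otherwise the call is rejected with E0903:
     * <pre>{@code
     * JobConf jobConf = hadoopAccessorService.createJobConf("rm-host:8032");
     * JobClient jobClient = hadoopAccessorService.createJobClient("joe", jobConf);
     * }</pre>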
     */
    public JobClient createJobClient(String user, final JobConf conf) throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }
        String jobTracker = conf.get(JavaActionExecutor.HADOOP_JOB_TRACKER);
        validateJobTracker(jobTracker);
        try {
            UserGroupInformation ugi = getUGI(user);
            JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
                public JobClient run() throws Exception {
                    return new JobClient(conf);
                }
            });
            return jobClient;
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
    }

    /**
     * Get the RM delegation token using jobClient and add it to conf.
     *
     * @param jobClient JobClient to request the delegation token from.
     * @param conf JobConf whose credentials the token is added to.
     * @throws HadoopAccessorException if the token could not be obtained.
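     * <p>
     * For example (variable names are illustrative):
     * <pre>{@code
     * hadoopAccessorService.addRMDelegationToken(jobClient, jobConf);
     * Token<?> token = jobConf.getCredentials().getToken(HadoopAccessorService.MR_TOKEN_ALIAS);
     * }</pre>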
     */
    public void addRMDelegationToken(JobClient jobClient, JobConf conf) throws HadoopAccessorException {
        Token<DelegationTokenIdentifier> mrdt;
        try {
            mrdt = jobClient.getDelegationToken(getMRDelegationTokenRenewer(conf));
        }
        catch (IOException e) {
            throw new HadoopAccessorException(ErrorCode.E0902, e.getMessage(), e);
        }
        catch (InterruptedException e) {
            throw new HadoopAccessorException(ErrorCode.E0902, e.getMessage(), e);
        }
        conf.getCredentials().addToken(MR_TOKEN_ALIAS, mrdt);
    }

    /**
     * Return a FileSystem created with the provided user for the specified URI.
     *
     * @param user user name of the caller the FileSystem is created for.
     * @param uri file system URI.
     * @param conf Configuration with all necessary information to create the FileSystem.
     * @return FileSystem created with the provided user/group.
     * @throws HadoopAccessorException if the filesystem could not be created.
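     * <p>
     * A minimal sketch (user, URI, and hostPort are illustrative); the Configuration must come from
     * {@link #createJobConf(String)}, otherwise the call is rejected with E0903:
     * <pre>{@code
     * Configuration fsConf = hadoopAccessorService.createJobConf("rm-host:8032");
     * FileSystem fs = hadoopAccessorService.createFileSystem("joe", new URI("hdfs://nn-host:8020"), fsConf);
     * }</pre>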
     */
    public FileSystem createFileSystem(String user, final URI uri, final Configuration conf)
            throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }

        checkSupportedFilesystem(uri);

        String nameNode = uri.getAuthority();
        if (nameNode == null) {
            nameNode = conf.get("fs.default.name");
            if (nameNode != null) {
                try {
                    nameNode = new URI(nameNode).getAuthority();
                }
                catch (URISyntaxException ex) {
                    throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
                }
            }
        }
        validateNameNode(nameNode);

        try {
            UserGroupInformation ugi = getUGI(user);
            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                public FileSystem run() throws Exception {
                    return FileSystem.get(uri, conf);
                }
            });
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
    }

    /**
     * Validate the job tracker URI against the job tracker whitelist.
     *
     * @param jobTrackerUri job tracker URI to validate.
     * @throws HadoopAccessorException if the job tracker is not whitelisted.
     */
    protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException {
        validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900);
    }

    /**
     * Validate the name node URI against the name node whitelist.
     *
     * @param nameNodeUri name node URI to validate.
     * @throws HadoopAccessorException if the name node is not whitelisted.
     */
    protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException {
        validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901);
    }

    private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException {
        if (uri != null) {
            uri = uri.toLowerCase().trim();
            if (whitelist.size() > 0 && !whitelist.contains(uri)) {
                throw new HadoopAccessorException(error, uri, whitelist);
            }
        }
    }

    public Text getMRDelegationTokenRenewer(JobConf jobConf) throws IOException {
        if (UserGroupInformation.isSecurityEnabled()) { // secure cluster
            return getMRTokenRenewerInternal(jobConf);
        }
        else {
            return MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
        }
    }

    // Package private for unit test purposes
    Text getMRTokenRenewerInternal(JobConf jobConf) throws IOException {
        // Getting renewer correctly for JT principal also though JT in hadoop 1.x does not have
        // support for renewing/cancelling tokens
        String servicePrincipal = jobConf.get(RM_PRINCIPAL, jobConf.get(JT_PRINCIPAL));
        Text renewer;
        if (servicePrincipal != null) { // secure cluster
            renewer = mrTokenRenewers.get(servicePrincipal);
            if (renewer == null) {
                // Mimic org.apache.hadoop.mapred.Master.getMasterPrincipal()
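                // e.g. (illustrative) a configured principal "rm/_HOST@EXAMPLE.COM" gets its _HOST placeholder
                // replaced with the hostname of the RM address resolved below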
                String target = jobConf.get(HADOOP_YARN_RM, jobConf.get(HADOOP_JOB_TRACKER_2));
                if (target == null) {
                    target = jobConf.get(HADOOP_JOB_TRACKER);
                }
                try {
                    String addr = NetUtils.createSocketAddr(target).getHostName();
                    renewer = new Text(SecurityUtil.getServerPrincipal(servicePrincipal, addr));
                    LOG.info("Delegation Token Renewer details: Principal=" + servicePrincipal + ",Target=" + target
                            + ",Renewer=" + renewer);
                }
                catch (IllegalArgumentException iae) {
                    renewer = new Text(servicePrincipal.split("[/@]")[0]);
                    LOG.info("Delegation Token Renewer for " + servicePrincipal + " is " + renewer);
                }
                mrTokenRenewers.put(servicePrincipal, renewer);
            }
        }
        else {
            renewer = MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
        }
        return renewer;
    }

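    /**
     * Adds a file to the job ClassPath via {@link JobUtils#addFileToClassPath}, running the call as the
     * given user.
     *
     * @param user user name.
     * @param file Path of the file to add.
     * @param conf Configuration to update.
     * @throws IOException if the file could not be added.
     */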
    public void addFileToClassPath(String user, final Path file, final Configuration conf)
            throws IOException {
        ParamChecker.notEmpty(user, "user");
        try {
            UserGroupInformation ugi = getUGI(user);
            ugi.doAs(new PrivilegedExceptionAction<Void>() {
                @Override
                public Void run() throws Exception {
                    JobUtils.addFileToClassPath(file, conf, null);
                    return null;
                }
            });
        }
        catch (InterruptedException ex) {
            throw new IOException(ex);
        }
    }

    /**
     * Checks whether the filesystem scheme of the given URI is among the configured list of supported schemes,
     * which makes the system robust to filesystems other than HDFS.
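     * <p>
     * For example (illustrative), with {@code oozie.service.HadoopAccessorService.supported.filesystems} set
     * to {@code hdfs}, a {@code s3a://bucket/path} URI is rejected with E0904.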
     */
    public void checkSupportedFilesystem(URI uri) throws HadoopAccessorException {
        if (allSchemesSupported) {
            return;
        }
        String uriScheme = uri.getScheme();
        if (uriScheme != null) {    // skip the check if no scheme is given
            if (!supportedSchemes.isEmpty()) {
                LOG.debug("Checking if filesystem " + uriScheme + " is supported");
                if (!supportedSchemes.contains(uriScheme)) {
                    throw new HadoopAccessorException(ErrorCode.E0904, uriScheme, uri.toString());
                }
            }
        }
    }

    public Set<String> getSupportedSchemes() {
        return supportedSchemes;
    }

}