001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import org.apache.hadoop.io.Text;
021import org.apache.hadoop.mapred.JobClient;
022import org.apache.hadoop.mapred.JobConf;
023import org.apache.hadoop.fs.FileSystem;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
027import org.apache.hadoop.net.NetUtils;
028import org.apache.hadoop.security.SecurityUtil;
029import org.apache.hadoop.security.UserGroupInformation;
031import org.apache.hadoop.security.token.Token;
032import org.apache.oozie.ErrorCode;
033import org.apache.oozie.action.hadoop.JavaActionExecutor;
034import org.apache.oozie.util.ParamChecker;
035import org.apache.oozie.util.XConfiguration;
036import org.apache.oozie.util.XLog;
037import org.apache.oozie.util.JobUtils;
038
039import java.io.File;
040import java.io.FileInputStream;
041import java.io.IOException;
042import java.io.InputStream;
043import java.net.URI;
044import java.net.URISyntaxException;
045import java.security.PrivilegedExceptionAction;
046import java.util.HashMap;
047import java.util.Map;
048import java.util.Set;
049import java.util.HashSet;
050import java.util.concurrent.ConcurrentHashMap;
051
052/**
053 * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group. <p/> The
054 * default accessor used is the base accessor which just injects the UGI into the configuration instance used to
055 * create/obtain JobClient and FileSystem instances.
056 */
057public class HadoopAccessorService implements Service {
058
059    private static XLog LOG = XLog.getLog(HadoopAccessorService.class);
060
061    public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService.";
062    public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist";
063    public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist";
064    public static final String HADOOP_CONFS = CONF_PREFIX + "hadoop.configurations";
065    public static final String ACTION_CONFS = CONF_PREFIX + "action.configurations";
066    public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled";
067    public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file";
068    public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";
069    public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token");
070
071    protected static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
072    /** The Kerberos principal for the job tracker.*/
073    protected static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
074    /** The Kerberos principal for the resource manager.*/
075    protected static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
076    protected static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
077    protected static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
078    protected static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
    // may be read and populated from multiple threads, hence a thread-safe map
    private static final Map<String, Text> mrTokenRenewers = new ConcurrentHashMap<String, Text>();
080
081    private Set<String> jobTrackerWhitelist = new HashSet<String>();
082    private Set<String> nameNodeWhitelist = new HashSet<String>();
083    private Map<String, Configuration> hadoopConfigs = new HashMap<String, Configuration>();
084    private Map<String, File> actionConfigDirs = new HashMap<String, File>();
085    private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>();
086
087    private UserGroupInformationService ugiService;
088
089    /**
090     * Supported filesystem schemes for namespace federation
091     */
092    public static final String SUPPORTED_FILESYSTEMS = CONF_PREFIX + "supported.filesystems";
093    public static final String[] DEFAULT_SUPPORTED_SCHEMES = new String[]{"hdfs","hftp","webhdfs"};
094    private Set<String> supportedSchemes;
095    private boolean allSchemesSupported;
096
097    public void init(Services services) throws ServiceException {
098        this.ugiService = services.get(UserGroupInformationService.class);
099        init(services.getConf());
100    }
101
102    //for testing purposes, see XFsTestCase
103    public void init(Configuration conf) throws ServiceException {
104        for (String name : conf.getStringCollection(JOB_TRACKER_WHITELIST)) {
105            String tmp = name.toLowerCase().trim();
106            if (tmp.length() == 0) {
107                continue;
108            }
109            jobTrackerWhitelist.add(tmp);
110        }
        XLog.getLog(getClass()).info(
                "JOB_TRACKER_WHITELIST: " + conf.getStringCollection(JOB_TRACKER_WHITELIST)
                        + ", total entries: " + jobTrackerWhitelist.size());
114        for (String name : conf.getStringCollection(NAME_NODE_WHITELIST)) {
115            String tmp = name.toLowerCase().trim();
116            if (tmp.length() == 0) {
117                continue;
118            }
119            nameNodeWhitelist.add(tmp);
120        }
        XLog.getLog(getClass()).info(
                "NAME_NODE_WHITELIST: " + conf.getStringCollection(NAME_NODE_WHITELIST)
                        + ", total entries: " + nameNodeWhitelist.size());
124
125        boolean kerberosAuthOn = conf.getBoolean(KERBEROS_AUTH_ENABLED, true);
126        XLog.getLog(getClass()).info("Oozie Kerberos Authentication [{0}]", (kerberosAuthOn) ? "enabled" : "disabled");
127        if (kerberosAuthOn) {
128            kerberosInit(conf);
129        }
130        else {
131            Configuration ugiConf = new Configuration();
132            ugiConf.set("hadoop.security.authentication", "simple");
133            UserGroupInformation.setConfiguration(ugiConf);
134        }
135
136        if (ugiService == null) { //for testing purposes, see XFsTestCase
137            this.ugiService = new UserGroupInformationService();
138        }
139
140        loadHadoopConfigs(conf);
141        preLoadActionConfigs(conf);
142
143        supportedSchemes = new HashSet<String>();
144        String[] schemesFromConf = conf.getStrings(SUPPORTED_FILESYSTEMS, DEFAULT_SUPPORTED_SCHEMES);
        if (schemesFromConf != null) {
            for (String scheme: schemesFromConf) {
                scheme = scheme.trim();
                // If the user gives "*", allSchemesSupported is set and the check is skipped, i.e. all schemes are allowed
                if (scheme.equals("*")) {
                    if (schemesFromConf.length > 1) {
151                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
152                            SUPPORTED_FILESYSTEMS + " should contain either only wildcard or explicit list, not both");
153                    }
154                    allSchemesSupported = true;
155                }
156                supportedSchemes.add(scheme);
157            }
158        }
159    }
160
    private void kerberosInit(Configuration serviceConf) throws ServiceException {
        try {
            String keytabFile = serviceConf.get(KERBEROS_KEYTAB,
                                                System.getProperty("user.home") + "/oozie.keytab").trim();
            if (keytabFile.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
            }
            String principal = serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
            if (principal.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
            }
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
            XLog.getLog(getClass()).info("Got Kerberos ticket, keytab [{0}], Oozie principal [{1}]",
                                         keytabFile, principal);
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }
186
187    private static final String[] HADOOP_CONF_FILES =
188        {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml", "hadoop-site.xml"};
189
190
191    private Configuration loadHadoopConf(File dir) throws IOException {
192        Configuration hadoopConf = new XConfiguration();
193        for (String file : HADOOP_CONF_FILES) {
194            File f = new File(dir, file);
            if (f.exists()) {
                InputStream is = new FileInputStream(f);
                try {
                    Configuration conf = new XConfiguration(is);
                    XConfiguration.copy(conf, hadoopConf);
                }
                finally {
                    // make sure the stream is closed even if parsing fails
                    is.close();
                }
            }
201        }
202        return hadoopConf;
203    }
204
205    private Map<String, File> parseConfigDirs(String[] confDefs, String type) throws ServiceException, IOException {
206        Map<String, File> map = new HashMap<String, File>();
207        File configDir = new File(ConfigurationService.getConfigurationDirectory());
208        for (String confDef : confDefs) {
209            if (confDef.trim().length() > 0) {
210                String[] parts = confDef.split("=");
211                if (parts.length == 2) {
212                    String hostPort = parts[0];
213                    String confDir = parts[1];
214                    File dir = new File(confDir);
215                    if (!dir.isAbsolute()) {
216                        dir = new File(configDir, confDir);
217                    }
218                    if (dir.exists()) {
219                        map.put(hostPort.toLowerCase(), dir);
220                    }
221                    else {
222                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
223                                                   "could not find " + type + " configuration directory: " +
224                                                   dir.getAbsolutePath());
225                    }
226                }
227                else {
228                    throw new ServiceException(ErrorCode.E0100, getClass().getName(),
229                                               "Incorrect " + type + " configuration definition: " + confDef);
230                }
231            }
232        }
233        return map;
234    }
235
236    private void loadHadoopConfigs(Configuration serviceConf) throws ServiceException {
237        try {
238            Map<String, File> map = parseConfigDirs(serviceConf.getStrings(HADOOP_CONFS, "*=hadoop-conf"), "hadoop");
239            for (Map.Entry<String, File> entry : map.entrySet()) {
240                hadoopConfigs.put(entry.getKey(), loadHadoopConf(entry.getValue()));
241            }
242        }
243        catch (ServiceException ex) {
244            throw ex;
245        }
246        catch (Exception ex) {
247            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
248        }
249    }
250
251    private void preLoadActionConfigs(Configuration serviceConf) throws ServiceException {
252        try {
253            actionConfigDirs = parseConfigDirs(serviceConf.getStrings(ACTION_CONFS, "*=hadoop-conf"), "action");
254            for (String hostport : actionConfigDirs.keySet()) {
255                actionConfigs.put(hostport, new ConcurrentHashMap<String, XConfiguration>());
256            }
257        }
258        catch (ServiceException ex) {
259            throw ex;
260        }
261        catch (Exception ex) {
262            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
263        }
264    }
265
266    public void destroy() {
267    }
268
269    public Class<? extends Service> getInterface() {
270        return HadoopAccessorService.class;
271    }
272
273    private UserGroupInformation getUGI(String user) throws IOException {
274        return ugiService.getProxyUser(user);
275    }
276
277    /**
278     * Creates a JobConf using the site configuration for the specified hostname:port.
279     * <p/>
280     * If the specified hostname:port is not defined it falls back to the '*' site
281     * configuration if available. If the '*' site configuration is not available,
282     * the JobConf has all Hadoop defaults.
283     *
284     * @param hostPort hostname:port to lookup Hadoop site configuration.
285     * @return a JobConf with the corresponding site configuration for hostPort.
286     */
287    public JobConf createJobConf(String hostPort) {
288        JobConf jobConf = new JobConf();
289        XConfiguration.copy(getConfiguration(hostPort), jobConf);
290        jobConf.setBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, true);
291        return jobConf;
292    }
293
294    private XConfiguration loadActionConf(String hostPort, String action) {
295        File dir = actionConfigDirs.get(hostPort);
296        XConfiguration actionConf = new XConfiguration();
297        if (dir != null) {
298            File actionConfFile = new File(dir, action + ".xml");
299            if (actionConfFile.exists()) {
300                try {
301                    actionConf = new XConfiguration(new FileInputStream(actionConfFile));
302                }
303                catch (IOException ex) {
304                    XLog.getLog(getClass()).warn("Could not read file [{0}] for action [{1}] configuration for hostPort [{2}]",
305                                                 actionConfFile.getAbsolutePath(), action, hostPort);
306                }
307            }
308        }
309        return actionConf;
310    }
311
312    /**
313     * Returns a Configuration containing any defaults for an action for a particular cluster.
314     * <p/>
315     * This configuration is used as default for the action configuration and enables cluster
316     * level default values per action.
317     *
     * @param hostPort hostname:port to lookup the action default configuration.
319     * @param action action name.
320     * @return the default configuration for the action for the specified cluster.
321     */
322    public XConfiguration createActionDefaultConf(String hostPort, String action) {
323        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
324        Map<String, XConfiguration> hostPortActionConfigs = actionConfigs.get(hostPort);
325        if (hostPortActionConfigs == null) {
326            hostPortActionConfigs = actionConfigs.get("*");
327            hostPort = "*";
328        }
329        XConfiguration actionConf = hostPortActionConfigs.get(action);
330        if (actionConf == null) {
            // doing lazy loading as we don't know all actions upfront; no need to synchronize
            // as it is a read operation and, in case of a race condition, loading and inserting
            // into the Map is idempotent and the action-config Map is a ConcurrentHashMap
334            actionConf = loadActionConf(hostPort, action);
335            hostPortActionConfigs.put(action, actionConf);
336        }
337        return new XConfiguration(actionConf.toProperties());
338    }
339
340    private Configuration getConfiguration(String hostPort) {
341        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
342        Configuration conf = hadoopConfigs.get(hostPort);
343        if (conf == null) {
344            conf = hadoopConfigs.get("*");
345            if (conf == null) {
346                conf = new XConfiguration();
347            }
348        }
349        return conf;
350    }
351
352    /**
353     * Return a JobClient created with the provided user/group.
354     *
355     *
356     * @param conf JobConf with all necessary information to create the
357     *        JobClient.
358     * @return JobClient created with the provided user/group.
359     * @throws HadoopAccessorException if the client could not be created.
360     */
361    public JobClient createJobClient(String user, final JobConf conf) throws HadoopAccessorException {
362        ParamChecker.notEmpty(user, "user");
363        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
364            throw new HadoopAccessorException(ErrorCode.E0903);
365        }
366        String jobTracker = conf.get(JavaActionExecutor.HADOOP_JOB_TRACKER);
367        validateJobTracker(jobTracker);
368        try {
369            UserGroupInformation ugi = getUGI(user);
370            JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
371                public JobClient run() throws Exception {
372                    return new JobClient(conf);
373                }
374            });
375            Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(getMRDelegationTokenRenewer(conf));
376            conf.getCredentials().addToken(MR_TOKEN_ALIAS, mrdt);
377            return jobClient;
378        }
379        catch (InterruptedException ex) {
380            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
381        }
382        catch (IOException ex) {
383            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
384        }
385    }
386
387    /**
388     * Return a FileSystem created with the provided user for the specified URI.
389     *
390     *
391     * @param uri file system URI.
392     * @param conf Configuration with all necessary information to create the FileSystem.
393     * @return FileSystem created with the provided user/group.
394     * @throws HadoopAccessorException if the filesystem could not be created.
395     */
396    public FileSystem createFileSystem(String user, final URI uri, final Configuration conf)
397            throws HadoopAccessorException {
398        ParamChecker.notEmpty(user, "user");
399        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
400            throw new HadoopAccessorException(ErrorCode.E0903);
401        }
402
403        checkSupportedFilesystem(uri);
404
405        String nameNode = uri.getAuthority();
406        if (nameNode == null) {
407            nameNode = conf.get("fs.default.name");
408            if (nameNode != null) {
409                try {
410                    nameNode = new URI(nameNode).getAuthority();
411                }
412                catch (URISyntaxException ex) {
413                    throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
414                }
415            }
416        }
417        validateNameNode(nameNode);
418
419        try {
420            UserGroupInformation ugi = getUGI(user);
421            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
422                public FileSystem run() throws Exception {
423                    return FileSystem.get(uri, conf);
424                }
425            });
426        }
427        catch (InterruptedException ex) {
428            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
429        }
430        catch (IOException ex) {
431            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
432        }
433    }
434
435    /**
436     * Validate Job tracker
437     * @param jobTrackerUri
438     * @throws HadoopAccessorException
439     */
440    protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException {
441        validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900);
442    }
443
444    /**
445     * Validate Namenode list
446     * @param nameNodeUri
447     * @throws HadoopAccessorException
448     */
449    protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException {
450        validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901);
451    }
452
453    private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException {
454        if (uri != null) {
455            uri = uri.toLowerCase().trim();
456            if (whitelist.size() > 0 && !whitelist.contains(uri)) {
457                throw new HadoopAccessorException(error, uri);
458            }
459        }
460    }
461
462    public Text getMRDelegationTokenRenewer(JobConf jobConf) throws IOException {
463        if (UserGroupInformation.isSecurityEnabled()) { // secure cluster
464            return getMRTokenRenewerInternal(jobConf);
465        }
466        else {
467            return MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
468        }
469    }
470
471    // Package private for unit test purposes
472    Text getMRTokenRenewerInternal(JobConf jobConf) throws IOException {
        // Getting the renewer correctly for the JT principal as well, even though the JobTracker in
        // Hadoop 1.x does not support renewing/cancelling tokens
475        String servicePrincipal = jobConf.get(RM_PRINCIPAL, jobConf.get(JT_PRINCIPAL));
476        Text renewer;
477        if (servicePrincipal != null) { // secure cluster
478            renewer = mrTokenRenewers.get(servicePrincipal);
479            if (renewer == null) {
480                // Mimic org.apache.hadoop.mapred.Master.getMasterPrincipal()
481                String target = jobConf.get(HADOOP_YARN_RM, jobConf.get(HADOOP_JOB_TRACKER_2));
482                if (target == null) {
483                    target = jobConf.get(HADOOP_JOB_TRACKER);
484                }
485                try {
486                    String addr = NetUtils.createSocketAddr(target).getHostName();
487                    renewer = new Text(SecurityUtil.getServerPrincipal(servicePrincipal, addr));
488                    LOG.info("Delegation Token Renewer details: Principal=" + servicePrincipal + ",Target=" + target
489                            + ",Renewer=" + renewer);
490                }
491                catch (IllegalArgumentException iae) {
492                    renewer = new Text(servicePrincipal.split("[/@]")[0]);
493                    LOG.info("Delegation Token Renewer for " + servicePrincipal + " is " + renewer);
494                }
495                mrTokenRenewers.put(servicePrincipal, renewer);
496            }
497        }
498        else {
499            renewer = MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
500        }
501        return renewer;
502    }
503
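    /**
     * Adds a file to the job classpath in the given Configuration, performing the operation on behalf
     * of the provided user.
     *
     * @param user user name to perform the operation as.
     * @param file Path of the file to add to the classpath.
     * @param conf Configuration to update.
     * @throws IOException if the file could not be added to the classpath.
     */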
504    public void addFileToClassPath(String user, final Path file, final Configuration conf)
505            throws IOException {
506        ParamChecker.notEmpty(user, "user");
507        try {
508            UserGroupInformation ugi = getUGI(user);
509            ugi.doAs(new PrivilegedExceptionAction<Void>() {
510                @Override
511                public Void run() throws Exception {
512                    JobUtils.addFileToClassPath(file, conf, null);
513                    return null;
514                }
515            });
516
517        }
518        catch (InterruptedException ex) {
519            throw new IOException(ex);
520        }
521
522    }
523
524    /**
525     * checks configuration parameter if filesystem scheme is among the list of supported ones
526     * this makes system robust to filesystems other than HDFS also
527     */
528
    public void checkSupportedFilesystem(URI uri) throws HadoopAccessorException {
        if (allSchemesSupported) {
            return;
        }
        String uriScheme = uri.getScheme();
        if (uriScheme != null) {    // skip the check if no scheme is given
            if (!supportedSchemes.isEmpty()) {
                XLog.getLog(this.getClass()).debug("Checking if filesystem " + uriScheme + " is supported");
                if (!supportedSchemes.contains(uriScheme)) {
                    throw new HadoopAccessorException(ErrorCode.E0904, uriScheme, uri.toString());
                }
            }
        }
    }
542
543    public Set<String> getSupportedSchemes() {
544        return supportedSchemes;
545    }
546
547}