/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.security.token.Token;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.action.hadoop.JavaActionExecutor;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;

/**
 * The HadoopAccessorService creates JobClient and FileSystem instances configured to work on behalf of a
 * specified user. <p/> It obtains a proxy-user UGI for that user and performs the JobClient/FileSystem
 * creation within a doAs block, so the resulting instances act as the user rather than as the Oozie server.
 */
public class HadoopAccessorService implements Service {

    private static XLog LOG = XLog.getLog(HadoopAccessorService.class);

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService.";
    public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist";
    public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist";
    public static final String HADOOP_CONFS = CONF_PREFIX + "hadoop.configurations";
    public static final String ACTION_CONFS = CONF_PREFIX + "action.configurations";
    public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled";
    public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file";
    public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";
    public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token");

    protected static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
    /** The Kerberos principal for the job tracker.*/
    protected static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
    /** The Kerberos principal for the resource manager.*/
    protected static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
    protected static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
    protected static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
    protected static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
    // ConcurrentHashMap: this cache is read and updated from multiple request-handling threads
    private static final Map<String, Text> mrTokenRenewers = new ConcurrentHashMap<String, Text>();

    private Set<String> jobTrackerWhitelist = new HashSet<String>();
    private Set<String> nameNodeWhitelist = new HashSet<String>();
    private Map<String, Configuration> hadoopConfigs = new HashMap<String, Configuration>();
    private Map<String, File> actionConfigDirs = new HashMap<String, File>();
    private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>();

    private UserGroupInformationService ugiService;

    /**
     * Supported filesystem schemes for namespace federation
     */
    public static final String SUPPORTED_FILESYSTEMS = CONF_PREFIX + "supported.filesystems";
    public static final String[] DEFAULT_SUPPORTED_SCHEMES = new String[]{"hdfs","hftp","webhdfs"};
    private Set<String> supportedSchemes;
    private boolean allSchemesSupported;

    public void init(Services services) throws ServiceException {
        this.ugiService = services.get(UserGroupInformationService.class);
        init(services.getConf());
    }
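
    /**
     * Illustrative sketch only, not part of the original class: how a caller typically obtains
     * this service from the Services container and gets a pre-configured JobConf for a cluster.
     * The jobtracker host/port is a placeholder value.
     */
    private static JobConf exampleServiceLookupSketch() {
        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
        return has.createJobConf("jobtracker-host:8032");
    }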

    //for testing purposes, see XFsTestCase
    public void init(Configuration conf) throws ServiceException {
        for (String name : conf.getStringCollection(JOB_TRACKER_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            jobTrackerWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "JOB_TRACKER_WHITELIST :" + conf.getStringCollection(JOB_TRACKER_WHITELIST)
                        + ", Total entries :" + jobTrackerWhitelist.size());
        for (String name : conf.getStringCollection(NAME_NODE_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            nameNodeWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "NAME_NODE_WHITELIST :" + conf.getStringCollection(NAME_NODE_WHITELIST)
                        + ", Total entries :" + nameNodeWhitelist.size());
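
        /*
         * Illustrative whitelist sketch (placeholder values, not from the original source). A
         * comma-separated list in oozie-site.xml restricts which JobTracker/ResourceManager and
         * NameNode authorities workflows may use, for example:
         *
         *   oozie.service.HadoopAccessorService.jobTracker.whitelist = jobtracker-host:8032
         *   oozie.service.HadoopAccessorService.nameNode.whitelist   = namenode-host:8020
         *
         * An empty whitelist (the default) means no restriction; see validate() below.
         */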

        boolean kerberosAuthOn = conf.getBoolean(KERBEROS_AUTH_ENABLED, true);
        XLog.getLog(getClass()).info("Oozie Kerberos Authentication [{0}]", (kerberosAuthOn) ? "enabled" : "disabled");
        if (kerberosAuthOn) {
            kerberosInit(conf);
        }
        else {
            Configuration ugiConf = new Configuration();
            ugiConf.set("hadoop.security.authentication", "simple");
            UserGroupInformation.setConfiguration(ugiConf);
        }

        if (ugiService == null) { //for testing purposes, see XFsTestCase
            this.ugiService = new UserGroupInformationService();
        }

        loadHadoopConfigs(conf);
        preLoadActionConfigs(conf);

        supportedSchemes = new HashSet<String>();
        String[] schemesFromConf = conf.getStrings(SUPPORTED_FILESYSTEMS, DEFAULT_SUPPORTED_SCHEMES);
        if (schemesFromConf != null) {
            for (String scheme : schemesFromConf) {
                scheme = scheme.trim();
                // If the user gives "*", no scheme checking is done, i.e. all schemes are allowed
                if (scheme.equals("*")) {
                    if (schemesFromConf.length > 1) {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                            SUPPORTED_FILESYSTEMS + " should contain either only wildcard or explicit list, not both");
                    }
                    allSchemesSupported = true;
                }
                supportedSchemes.add(scheme);
            }
        }
    }

    private void kerberosInit(Configuration serviceConf) throws ServiceException {
        try {
            String keytabFile = serviceConf.get(KERBEROS_KEYTAB,
                                                System.getProperty("user.home") + "/oozie.keytab").trim();
            if (keytabFile.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
            }
            String principal = serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
            if (principal.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
            }
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
            XLog.getLog(getClass()).info("Got Kerberos ticket, keytab [{0}], Oozie principal [{1}]",
                                         keytabFile, principal);
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }
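
    /*
     * Illustrative oozie-site.xml sketch for the Kerberos settings read above (values are
     * placeholders, not taken from the original source):
     *
     *   oozie.service.HadoopAccessorService.kerberos.enabled   = true
     *   oozie.service.HadoopAccessorService.keytab.file        = /etc/security/keytabs/oozie.keytab
     *   oozie.service.HadoopAccessorService.kerberos.principal = oozie/oozie-host@EXAMPLE.COM
     *
     * With kerberos.enabled=false the service configures Hadoop for "simple" authentication instead.
     */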

    private static final String[] HADOOP_CONF_FILES =
        {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml", "hadoop-site.xml"};

    private Configuration loadHadoopConf(File dir) throws IOException {
        Configuration hadoopConf = new XConfiguration();
        for (String file : HADOOP_CONF_FILES) {
            File f = new File(dir, file);
            if (f.exists()) {
                InputStream is = new FileInputStream(f);
                try {
                    Configuration conf = new XConfiguration(is);
                    XConfiguration.copy(conf, hadoopConf);
                }
                finally {
                    // close the stream even if parsing the site file fails
                    is.close();
                }
            }
        }
        return hadoopConf;
    }

    private Map<String, File> parseConfigDirs(String[] confDefs, String type) throws ServiceException, IOException {
        Map<String, File> map = new HashMap<String, File>();
        File configDir = new File(ConfigurationService.getConfigurationDirectory());
        for (String confDef : confDefs) {
            if (confDef.trim().length() > 0) {
                String[] parts = confDef.split("=");
                if (parts.length == 2) {
                    String hostPort = parts[0];
                    String confDir = parts[1];
                    File dir = new File(confDir);
                    if (!dir.isAbsolute()) {
                        dir = new File(configDir, confDir);
                    }
                    if (dir.exists()) {
                        map.put(hostPort.toLowerCase(), dir);
                    }
                    else {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                                   "could not find " + type + " configuration directory: " +
                                                   dir.getAbsolutePath());
                    }
                }
                else {
                    throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                               "Incorrect " + type + " configuration definition: " + confDef);
                }
            }
        }
        return map;
    }
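
    /*
     * Illustrative configuration sketch for the definitions parsed above (directory names are
     * placeholders, not from the original source). Each entry maps a hostname:port, or '*' as the
     * catch-all default, to a directory holding Hadoop/action site files; relative directories are
     * resolved against the Oozie configuration directory:
     *
     *   oozie.service.HadoopAccessorService.hadoop.configurations = *=hadoop-conf,jobtracker-host:8032=cluster-a-conf
     *   oozie.service.HadoopAccessorService.action.configurations = *=hadoop-conf
     */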

    private void loadHadoopConfigs(Configuration serviceConf) throws ServiceException {
        try {
            Map<String, File> map = parseConfigDirs(serviceConf.getStrings(HADOOP_CONFS, "*=hadoop-conf"), "hadoop");
            for (Map.Entry<String, File> entry : map.entrySet()) {
                hadoopConfigs.put(entry.getKey(), loadHadoopConf(entry.getValue()));
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private void preLoadActionConfigs(Configuration serviceConf) throws ServiceException {
        try {
            actionConfigDirs = parseConfigDirs(serviceConf.getStrings(ACTION_CONFS, "*=hadoop-conf"), "action");
            for (String hostport : actionConfigDirs.keySet()) {
                actionConfigs.put(hostport, new ConcurrentHashMap<String, XConfiguration>());
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    public void destroy() {
    }

    public Class<? extends Service> getInterface() {
        return HadoopAccessorService.class;
    }

    private UserGroupInformation getUGI(String user) throws IOException {
        return ugiService.getProxyUser(user);
    }

    /**
     * Creates a JobConf using the site configuration for the specified hostname:port.
     * <p/>
     * If the specified hostname:port is not defined it falls back to the '*' site
     * configuration if available. If the '*' site configuration is not available,
     * the JobConf has all Hadoop defaults.
     *
     * @param hostPort hostname:port to lookup Hadoop site configuration.
     * @return a JobConf with the corresponding site configuration for hostPort.
     */
    public JobConf createJobConf(String hostPort) {
        JobConf jobConf = new JobConf();
        XConfiguration.copy(getConfiguration(hostPort), jobConf);
        jobConf.setBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, true);
        return jobConf;
    }

    private XConfiguration loadActionConf(String hostPort, String action) {
        File dir = actionConfigDirs.get(hostPort);
        XConfiguration actionConf = new XConfiguration();
        if (dir != null) {
            File actionConfFile = new File(dir, action + ".xml");
            if (actionConfFile.exists()) {
                try {
                    actionConf = new XConfiguration(new FileInputStream(actionConfFile));
                }
                catch (IOException ex) {
                    XLog.getLog(getClass()).warn("Could not read file [{0}] for action [{1}] configuration for hostPort [{2}]",
                                                 actionConfFile.getAbsolutePath(), action, hostPort);
                }
            }
        }
        return actionConf;
    }

    /**
     * Returns a Configuration containing any defaults for an action for a particular cluster.
     * <p/>
     * This configuration is used as the default for the action configuration and enables cluster
     * level default values per action.
     *
     * @param hostPort hostname:port to lookup the action default configuration.
     * @param action action name.
     * @return the default configuration for the action for the specified cluster.
     */
    public XConfiguration createActionDefaultConf(String hostPort, String action) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Map<String, XConfiguration> hostPortActionConfigs = actionConfigs.get(hostPort);
        if (hostPortActionConfigs == null) {
            hostPortActionConfigs = actionConfigs.get("*");
            hostPort = "*";
        }
        XConfiguration actionConf = hostPortActionConfigs.get(action);
        if (actionConf == null) {
            // Lazy loading: we don't know all actions upfront. No synchronization is needed because
            // this is a read operation and, in case of a race condition, loading and inserting into
            // the Map is idempotent; the action-config Map is a ConcurrentHashMap.
            actionConf = loadActionConf(hostPort, action);
            hostPortActionConfigs.put(action, actionConf);
        }
        return new XConfiguration(actionConf.toProperties());
    }
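
    /**
     * Illustrative sketch only, not part of the original class: layers the cluster-level defaults
     * for an action under an action's own settings. The hostPort and action name are placeholders.
     */
    private XConfiguration exampleActionDefaultConfSketch() {
        XConfiguration actionDefaults = createActionDefaultConf("jobtracker-host:8032", "pig");
        // settings defined by the workflow action itself would then be copied on top,
        // overriding the cluster-level defaults
        XConfiguration actionConf = new XConfiguration(actionDefaults.toProperties());
        return actionConf;
    }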

    private Configuration getConfiguration(String hostPort) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Configuration conf = hadoopConfigs.get(hostPort);
        if (conf == null) {
            conf = hadoopConfigs.get("*");
            if (conf == null) {
                conf = new XConfiguration();
            }
        }
        return conf;
    }

    /**
     * Return a JobClient created on behalf of the provided user.
     *
     * @param user user for whom the JobClient is created.
     * @param conf JobConf with all necessary information to create the
     *        JobClient.
     * @return JobClient created with the provided user.
     * @throws HadoopAccessorException if the client could not be created.
     */
    public JobClient createJobClient(String user, final JobConf conf) throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }
        String jobTracker = conf.get(JavaActionExecutor.HADOOP_JOB_TRACKER);
        validateJobTracker(jobTracker);
        try {
            UserGroupInformation ugi = getUGI(user);
            JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
                public JobClient run() throws Exception {
                    return new JobClient(conf);
                }
            });
            Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(getMRDelegationTokenRenewer(conf));
            conf.getCredentials().addToken(MR_TOKEN_ALIAS, mrdt);
            return jobClient;
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
    }
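
    /**
     * Illustrative sketch only, not part of the original class: creates a JobClient for a
     * placeholder user. The JobConf must come from createJobConf() (so the service-created flag
     * is set) and must carry the jobtracker address, otherwise E0903/validation errors are raised.
     */
    private JobClient exampleCreateJobClientSketch() throws HadoopAccessorException {
        JobConf jobConf = createJobConf("jobtracker-host:8032");
        jobConf.set(JavaActionExecutor.HADOOP_JOB_TRACKER, "jobtracker-host:8032");
        return createJobClient("exampleuser", jobConf);
    }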

    /**
     * Return a FileSystem created on behalf of the provided user for the specified URI.
     *
     * @param user user for whom the FileSystem is created.
     * @param uri file system URI.
     * @param conf Configuration with all necessary information to create the FileSystem.
     * @return FileSystem created with the provided user.
     * @throws HadoopAccessorException if the filesystem could not be created.
     */
    public FileSystem createFileSystem(String user, final URI uri, final Configuration conf)
            throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }

        checkSupportedFilesystem(uri);

        String nameNode = uri.getAuthority();
        if (nameNode == null) {
            nameNode = conf.get("fs.default.name");
            if (nameNode != null) {
                try {
                    nameNode = new URI(nameNode).getAuthority();
                }
                catch (URISyntaxException ex) {
                    throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
                }
            }
        }
        validateNameNode(nameNode);

        try {
            UserGroupInformation ugi = getUGI(user);
            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                public FileSystem run() throws Exception {
                    return FileSystem.get(uri, conf);
                }
            });
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
    }
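
    /**
     * Illustrative sketch only, not part of the original class: obtains a FileSystem acting as a
     * placeholder user against a placeholder NameNode. The Configuration comes from
     * createJobConf() so the service-created flag is set.
     */
    private FileSystem exampleCreateFileSystemSketch() throws HadoopAccessorException {
        URI uri = URI.create("hdfs://namenode-host:8020");
        Configuration conf = createJobConf("namenode-host:8020");
        return createFileSystem("exampleuser", uri, conf);
    }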

    /**
     * Validate a JobTracker URI against the JobTracker whitelist.
     *
     * @param jobTrackerUri JobTracker URI to validate.
     * @throws HadoopAccessorException if a whitelist is configured and the URI is not in it.
     */
    protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException {
        validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900);
    }

    /**
     * Validate a NameNode URI against the NameNode whitelist.
     *
     * @param nameNodeUri NameNode URI to validate.
     * @throws HadoopAccessorException if a whitelist is configured and the URI is not in it.
     */
    protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException {
        validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901);
    }

    private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException {
        if (uri != null) {
            uri = uri.toLowerCase().trim();
            if (whitelist.size() > 0 && !whitelist.contains(uri)) {
                throw new HadoopAccessorException(error, uri);
            }
        }
    }

    public Text getMRDelegationTokenRenewer(JobConf jobConf) throws IOException {
        if (UserGroupInformation.isSecurityEnabled()) { // secure cluster
            return getMRTokenRenewerInternal(jobConf);
        }
        else {
            return MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
        }
    }

    // Package private for unit test purposes
    Text getMRTokenRenewerInternal(JobConf jobConf) throws IOException {
        // Resolve the renewer from the JT principal as well, even though the JobTracker in
        // Hadoop 1.x does not support renewing/cancelling tokens
        String servicePrincipal = jobConf.get(RM_PRINCIPAL, jobConf.get(JT_PRINCIPAL));
        Text renewer;
        if (servicePrincipal != null) { // secure cluster
            renewer = mrTokenRenewers.get(servicePrincipal);
            if (renewer == null) {
                // Mimic org.apache.hadoop.mapred.Master.getMasterPrincipal()
                String target = jobConf.get(HADOOP_YARN_RM, jobConf.get(HADOOP_JOB_TRACKER_2));
                if (target == null) {
                    target = jobConf.get(HADOOP_JOB_TRACKER);
                }
                String addr = NetUtils.createSocketAddr(target).getHostName();
                renewer = new Text(SecurityUtil.getServerPrincipal(servicePrincipal, addr));
                LOG.info("Delegation Token Renewer details: Principal=" + servicePrincipal + ",Target=" + target
                        + ",Renewer=" + renewer);
                mrTokenRenewers.put(servicePrincipal, renewer);
            }
        }
        else {
            renewer = MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
        }
        return renewer;
    }

    public void addFileToClassPath(String user, final Path file, final Configuration conf)
            throws IOException {
        ParamChecker.notEmpty(user, "user");
        try {
            UserGroupInformation ugi = getUGI(user);
            ugi.doAs(new PrivilegedExceptionAction<Void>() {
                public Void run() throws Exception {
                    Configuration defaultConf = new Configuration();
                    XConfiguration.copy(conf, defaultConf);
                    // Doing this no-op add first to have the FileSystem created and cached
                    DistributedCache.addFileToClassPath(file, defaultConf);

                    DistributedCache.addFileToClassPath(file, conf);
                    return null;
                }
            });
        }
        catch (InterruptedException ex) {
            throw new IOException(ex);
        }
    }
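
    /**
     * Illustrative sketch only, not part of the original class: adds a placeholder jar on HDFS to
     * the job classpath on behalf of a placeholder user.
     */
    private void exampleAddFileToClassPathSketch() throws IOException {
        JobConf jobConf = createJobConf("jobtracker-host:8032");
        addFileToClassPath("exampleuser", new Path("/user/exampleuser/lib/example.jar"), jobConf);
    }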

    /**
     * Checks whether the filesystem scheme of the given URI is in the configured list of supported
     * schemes; this keeps the system robust when filesystems other than HDFS are used.
     *
     * @param uri URI whose scheme is checked; the check is skipped if the URI has no scheme.
     * @throws HadoopAccessorException if the scheme is not supported.
     */
    public void checkSupportedFilesystem(URI uri) throws HadoopAccessorException {
        if (allSchemesSupported) {
            return;
        }
        String uriScheme = uri.getScheme();
        if (uriScheme != null) {    // skip the check if no scheme is given
            if (!supportedSchemes.isEmpty()) {
                XLog.getLog(this.getClass()).debug("Checking if filesystem " + uriScheme + " is supported");
                if (!supportedSchemes.contains(uriScheme)) {
                    throw new HadoopAccessorException(ErrorCode.E0904, uriScheme, uri.toString());
                }
            }
        }
    }
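
    /**
     * Illustrative sketch only, not part of the original class: verifies that a placeholder
     * webhdfs URI uses one of the supported schemes ("hdfs", "hftp" and "webhdfs" by default)
     * before it is handed to createFileSystem().
     */
    private void exampleSchemeCheckSketch() throws HadoopAccessorException {
        checkSupportedFilesystem(URI.create("webhdfs://namenode-host:50070/user/exampleuser"));
    }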

    public Set<String> getSupportedSchemes() {
        return supportedSchemes;
    }

}