001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import org.apache.hadoop.io.Text;
021    import org.apache.hadoop.mapred.JobClient;
022    import org.apache.hadoop.mapred.JobConf;
023    import org.apache.hadoop.fs.FileSystem;
024    import org.apache.hadoop.fs.Path;
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
027    import org.apache.hadoop.net.NetUtils;
028    import org.apache.hadoop.security.SecurityUtil;
029    import org.apache.hadoop.security.UserGroupInformation;
030    import org.apache.hadoop.filecache.DistributedCache;
031    import org.apache.hadoop.security.token.Token;
032    import org.apache.oozie.ErrorCode;
033    import org.apache.oozie.action.hadoop.JavaActionExecutor;
034    import org.apache.oozie.util.ParamChecker;
035    import org.apache.oozie.util.XConfiguration;
036    import org.apache.oozie.util.XLog;
037    
038    import java.io.File;
039    import java.io.FileInputStream;
040    import java.io.IOException;
041    import java.io.InputStream;
042    import java.net.URI;
043    import java.net.URISyntaxException;
044    import java.security.PrivilegedExceptionAction;
045    import java.util.HashMap;
046    import java.util.Map;
047    import java.util.Set;
048    import java.util.HashSet;
049    import java.util.concurrent.ConcurrentHashMap;
050    import java.util.concurrent.ConcurrentMap;
051    
052    /**
 * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group. <p/> The
 * default accessor used is the base accessor which just injects the UGI into the configuration instance used to
 * create/obtain JobClient and FileSystem instances. <p/> The accessor class to use can be configured in
 * <code>oozie-site.xml</code> using the <code>oozie.service.HadoopAccessorService.accessor.class</code> property.
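 * <p/>
 * A minimal usage sketch (assumes the Oozie {@link Services} singleton has been initialized; host
 * names and ports are placeholders):
 * <pre>{@code
 * HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
 * JobConf jobConf = has.createJobConf("jt-host:8021");
 * FileSystem fs = has.createFileSystem("someUser",
 *         URI.create("hdfs://nn-host:8020"), jobConf);
 * }</pre>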
057     */
058    public class HadoopAccessorService implements Service {
059    
060        private static XLog LOG = XLog.getLog(HadoopAccessorService.class);
061    
062        public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService.";
063        public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist";
064        public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist";
065        public static final String HADOOP_CONFS = CONF_PREFIX + "hadoop.configurations";
066        public static final String ACTION_CONFS = CONF_PREFIX + "action.configurations";
067        public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled";
068        public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file";
069        public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";
070        public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token");
071    
072        private static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
073        /** The Kerberos principal for the job tracker.*/
074        private static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
075        /** The Kerberos principal for the resource manager.*/
076        private static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
077        private static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
078        private static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
079        private static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
080        private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>();
081    
082        private Set<String> jobTrackerWhitelist = new HashSet<String>();
083        private Set<String> nameNodeWhitelist = new HashSet<String>();
084        private Map<String, Configuration> hadoopConfigs = new HashMap<String, Configuration>();
085        private Map<String, File> actionConfigDirs = new HashMap<String, File>();
086        private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>();
087    
088        private ConcurrentMap<String, UserGroupInformation> userUgiMap;
089    
090        /**
091         * Supported filesystem schemes for namespace federation
092         */
093        public static final String SUPPORTED_FILESYSTEMS = CONF_PREFIX + "supported.filesystems";
094        private Set<String> supportedSchemes;
095    
096        public void init(Services services) throws ServiceException {
097            init(services.getConf());
098        }
099    
100        //for testing purposes, see XFsTestCase
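    // Illustrative direct-init sketch in the same spirit (assumes the Oozie configuration directory
    // and its hadoop-conf subdirectory exist; host:port values are placeholders):
    //   Configuration conf = new Configuration(false);
    //   conf.set(JOB_TRACKER_WHITELIST, "jt-host:8021");
    //   conf.set(NAME_NODE_WHITELIST, "nn-host:8020");
    //   conf.set(SUPPORTED_FILESYSTEMS, "hdfs,hftp,webhdfs");
    //   new HadoopAccessorService().init(conf);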
101        public void init(Configuration conf) throws ServiceException {
102            for (String name : conf.getStringCollection(JOB_TRACKER_WHITELIST)) {
103                String tmp = name.toLowerCase().trim();
104                if (tmp.length() == 0) {
105                    continue;
106                }
107                jobTrackerWhitelist.add(tmp);
108            }
109            XLog.getLog(getClass()).info(
110                    "JOB_TRACKER_WHITELIST :" + conf.getStringCollection(JOB_TRACKER_WHITELIST)
111                            + ", Total entries :" + jobTrackerWhitelist.size());
112            for (String name : conf.getStringCollection(NAME_NODE_WHITELIST)) {
113                String tmp = name.toLowerCase().trim();
114                if (tmp.length() == 0) {
115                    continue;
116                }
117                nameNodeWhitelist.add(tmp);
118            }
119            XLog.getLog(getClass()).info(
120                    "NAME_NODE_WHITELIST :" + conf.getStringCollection(NAME_NODE_WHITELIST)
121                            + ", Total entries :" + nameNodeWhitelist.size());
122    
123            boolean kerberosAuthOn = conf.getBoolean(KERBEROS_AUTH_ENABLED, true);
124            XLog.getLog(getClass()).info("Oozie Kerberos Authentication [{0}]", (kerberosAuthOn) ? "enabled" : "disabled");
125            if (kerberosAuthOn) {
126                kerberosInit(conf);
127            }
128            else {
129                Configuration ugiConf = new Configuration();
130                ugiConf.set("hadoop.security.authentication", "simple");
131                UserGroupInformation.setConfiguration(ugiConf);
132            }
133    
134            userUgiMap = new ConcurrentHashMap<String, UserGroupInformation>();
135    
136            loadHadoopConfigs(conf);
137            preLoadActionConfigs(conf);
138    
139            supportedSchemes = new HashSet<String>();
140            String[] schemesFromConf = conf.getStrings(SUPPORTED_FILESYSTEMS, new String[]{"hdfs","hftp","webhdfs"});
141            if(schemesFromConf != null) {
142                for (String scheme: schemesFromConf) {
143                    scheme = scheme.trim();
                // If the user gives "*", supportedSchemes stays empty, so no check is done, i.e. all schemes are allowed
145                    if(scheme.equals("*")) {
146                        if(schemesFromConf.length > 1) {
147                            throw new ServiceException(ErrorCode.E0100, getClass().getName(),
148                                SUPPORTED_FILESYSTEMS + " should contain either only wildcard or explicit list, not both");
149                        }
150                    } else {
151                        supportedSchemes.add(scheme);
152                    }
153                }
154            }
155        }
156    
    private void kerberosInit(Configuration serviceConf) throws ServiceException {
        try {
            String keytabFile = serviceConf.get(KERBEROS_KEYTAB,
                                                System.getProperty("user.home") + "/oozie.keytab").trim();
            if (keytabFile.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
            }
            String principal = serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
            if (principal.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
            }
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
            XLog.getLog(getClass()).info("Got Kerberos ticket, keytab [{0}], Oozie principal [{1}]",
                                         keytabFile, principal);
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }
182    
183        private static final String[] HADOOP_CONF_FILES =
184            {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml", "hadoop-site.xml"};
185    
186    
    private Configuration loadHadoopConf(File dir) throws IOException {
        Configuration hadoopConf = new XConfiguration();
        for (String file : HADOOP_CONF_FILES) {
            File f = new File(dir, file);
            if (f.exists()) {
                InputStream is = new FileInputStream(f);
                try {
                    Configuration conf = new XConfiguration(is);
                    XConfiguration.copy(conf, hadoopConf);
                }
                finally {
                    is.close();
                }
            }
        }
        return hadoopConf;
    }
200    
201        private Map<String, File> parseConfigDirs(String[] confDefs, String type) throws ServiceException, IOException {
202            Map<String, File> map = new HashMap<String, File>();
203            File configDir = new File(ConfigurationService.getConfigurationDirectory());
204            for (String confDef : confDefs) {
205                if (confDef.trim().length() > 0) {
206                    String[] parts = confDef.split("=");
207                    if (parts.length == 2) {
208                        String hostPort = parts[0];
209                        String confDir = parts[1];
210                        File dir = new File(confDir);
211                        if (!dir.isAbsolute()) {
212                            dir = new File(configDir, confDir);
213                        }
214                        if (dir.exists()) {
215                            map.put(hostPort.toLowerCase(), dir);
216                        }
217                        else {
218                            throw new ServiceException(ErrorCode.E0100, getClass().getName(),
219                                                       "could not find " + type + " configuration directory: " +
220                                                       dir.getAbsolutePath());
221                        }
222                    }
223                    else {
224                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
225                                                   "Incorrect " + type + " configuration definition: " + confDef);
226                    }
227                }
228            }
229            return map;
230        }
231    
232        private void loadHadoopConfigs(Configuration serviceConf) throws ServiceException {
233            try {
234                Map<String, File> map = parseConfigDirs(serviceConf.getStrings(HADOOP_CONFS, "*=hadoop-conf"), "hadoop");
235                for (Map.Entry<String, File> entry : map.entrySet()) {
236                    hadoopConfigs.put(entry.getKey(), loadHadoopConf(entry.getValue()));
237                }
238            }
239            catch (ServiceException ex) {
240                throw ex;
241            }
242            catch (Exception ex) {
243                throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
244            }
245        }
246    
247        private void preLoadActionConfigs(Configuration serviceConf) throws ServiceException {
248            try {
249                actionConfigDirs = parseConfigDirs(serviceConf.getStrings(ACTION_CONFS, "*=hadoop-conf"), "action");
250                for (String hostport : actionConfigDirs.keySet()) {
251                    actionConfigs.put(hostport, new ConcurrentHashMap<String, XConfiguration>());
252                }
253            }
254            catch (ServiceException ex) {
255                throw ex;
256            }
257            catch (Exception ex) {
258                throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
259            }
260        }
261    
262        public void destroy() {
263        }
264    
265        public Class<? extends Service> getInterface() {
266            return HadoopAccessorService.class;
267        }
268    
269        private UserGroupInformation getUGI(String user) throws IOException {
270            UserGroupInformation ugi = userUgiMap.get(user);
271            if (ugi == null) {
            // handle a possible race condition: if another thread already inserted a UGI for this
            // user, the UGI created here is simply never added to the map
273                ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
274                userUgiMap.putIfAbsent(user, ugi);
275            }
276            return ugi;
277        }
278    
279        /**
280         * Creates a JobConf using the site configuration for the specified hostname:port.
281         * <p/>
282         * If the specified hostname:port is not defined it falls back to the '*' site
283         * configuration if available. If the '*' site configuration is not available,
284         * the JobConf has all Hadoop defaults.
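     * <p/>
     * For example (host:port and directory names are placeholders), with
     * <code>oozie.service.HadoopAccessorService.hadoop.configurations</code> set to
     * <code>*=hadoop-conf,jt-host:8021=jt-conf</code>, <code>createJobConf("jt-host:8021")</code>
     * would use the site files under <code>jt-conf</code>, while any other host:port would fall
     * back to the <code>*</code> entry, <code>hadoop-conf</code>.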
285         *
286         * @param hostPort hostname:port to lookup Hadoop site configuration.
287         * @return a JobConf with the corresponding site configuration for hostPort.
288         */
289        public JobConf createJobConf(String hostPort) {
290            JobConf jobConf = new JobConf();
291            XConfiguration.copy(getConfiguration(hostPort), jobConf);
292            jobConf.setBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, true);
293            return jobConf;
294        }
295    
296        private XConfiguration loadActionConf(String hostPort, String action) {
297            File dir = actionConfigDirs.get(hostPort);
298            XConfiguration actionConf = new XConfiguration();
299            if (dir != null) {
300                File actionConfFile = new File(dir, action + ".xml");
301                if (actionConfFile.exists()) {
302                    try {
303                        actionConf = new XConfiguration(new FileInputStream(actionConfFile));
304                    }
305                    catch (IOException ex) {
306                        XLog.getLog(getClass()).warn("Could not read file [{0}] for action [{1}] configuration for hostPort [{2}]",
307                                                     actionConfFile.getAbsolutePath(), action, hostPort);
308                    }
309                }
310            }
311            return actionConf;
312        }
313    
314        /**
315         * Returns a Configuration containing any defaults for an action for a particular cluster.
316         * <p/>
317         * This configuration is used as default for the action configuration and enables cluster
318         * level default values per action.
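     * <p/>
     * For example (assuming the default <code>*=hadoop-conf</code> value of
     * <code>oozie.service.HadoopAccessorService.action.configurations</code> and a hypothetical
     * <code>hive</code> action), the defaults would be read from <code>hadoop-conf/hive.xml</code>
     * under the Oozie configuration directory.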
319         *
     * @param hostPort hostname:port to lookup the action default configuration.
321         * @param action action name.
322         * @return the default configuration for the action for the specified cluster.
323         */
324        public XConfiguration createActionDefaultConf(String hostPort, String action) {
325            hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
326            Map<String, XConfiguration> hostPortActionConfigs = actionConfigs.get(hostPort);
327            if (hostPortActionConfigs == null) {
328                hostPortActionConfigs = actionConfigs.get("*");
329                hostPort = "*";
330            }
331            XConfiguration actionConf = hostPortActionConfigs.get(action);
332            if (actionConf == null) {
333                // doing lazy loading as we don't know upfront all actions, no need to synchronize
334                // as it is a read operation an in case of a race condition loading and inserting
335                // into the Map is idempotent and the action-config Map is a ConcurrentHashMap
336                actionConf = loadActionConf(hostPort, action);
337                hostPortActionConfigs.put(action, actionConf);
338            }
339            return new XConfiguration(actionConf.toProperties());
340        }
341    
342        private Configuration getConfiguration(String hostPort) {
343            hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
344            Configuration conf = hadoopConfigs.get(hostPort);
345            if (conf == null) {
346                conf = hadoopConfigs.get("*");
347                if (conf == null) {
348                    conf = new XConfiguration();
349                }
350            }
351            return conf;
352        }
353    
354        /**
     * Return a JobClient created with the provided user.
     * <p/>
     * @param user user name to create the JobClient for.
     * @param conf JobConf with all necessary information to create the
     *        JobClient.
     * @return JobClient created with the provided user.
361         * @throws HadoopAccessorException if the client could not be created.
362         */
363        public JobClient createJobClient(String user, final JobConf conf) throws HadoopAccessorException {
364            ParamChecker.notEmpty(user, "user");
365            if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
366                throw new HadoopAccessorException(ErrorCode.E0903);
367            }
368            String jobTracker = conf.get(JavaActionExecutor.HADOOP_JOB_TRACKER);
369            validateJobTracker(jobTracker);
370            try {
371                UserGroupInformation ugi = getUGI(user);
372                JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
373                    public JobClient run() throws Exception {
374                        return new JobClient(conf);
375                    }
376                });
377                Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(getMRDelegationTokenRenewer(conf));
378                conf.getCredentials().addToken(MR_TOKEN_ALIAS, mrdt);
379                return jobClient;
380            }
381            catch (InterruptedException ex) {
382                throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
383            }
384            catch (IOException ex) {
385                throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
386            }
387        }
388    
389        /**
     * Return a FileSystem created with the provided user for the specified URI.
     * <p/>
     * @param user user name to create the FileSystem for.
     * @param uri file system URI.
     * @param conf Configuration with all necessary information to create the FileSystem.
     * @return FileSystem created with the provided user.
396         * @throws HadoopAccessorException if the filesystem could not be created.
397         */
398        public FileSystem createFileSystem(String user, final URI uri, final Configuration conf)
399                throws HadoopAccessorException {
400            ParamChecker.notEmpty(user, "user");
401            if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
402                throw new HadoopAccessorException(ErrorCode.E0903);
403            }
404    
405            checkSupportedFilesystem(uri);
406    
407            String nameNode = uri.getAuthority();
408            if (nameNode == null) {
409                nameNode = conf.get("fs.default.name");
410                if (nameNode != null) {
411                    try {
412                        nameNode = new URI(nameNode).getAuthority();
413                    }
414                    catch (URISyntaxException ex) {
415                        throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
416                    }
417                }
418            }
419            validateNameNode(nameNode);
420    
421            try {
422                UserGroupInformation ugi = getUGI(user);
423                return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
424                    public FileSystem run() throws Exception {
425                        return FileSystem.get(uri, conf);
426                    }
427                });
428            }
429            catch (InterruptedException ex) {
430                throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
431            }
432            catch (IOException ex) {
433                throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
434            }
435        }
436    
437        /**
     * Validates the JobTracker URI against the configured JobTracker whitelist.
     *
     * @param jobTrackerUri JobTracker URI to validate.
     * @throws HadoopAccessorException if the whitelist is non-empty and does not contain the URI.
441         */
442        protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException {
443            validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900);
444        }
445    
446        /**
     * Validates the NameNode URI against the configured NameNode whitelist.
     *
     * @param nameNodeUri NameNode URI to validate.
     * @throws HadoopAccessorException if the whitelist is non-empty and does not contain the URI.
450         */
451        protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException {
452            validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901);
453        }
454    
455        private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException {
456            if (uri != null) {
457                uri = uri.toLowerCase().trim();
458                if (whitelist.size() > 0 && !whitelist.contains(uri)) {
459                    throw new HadoopAccessorException(error, uri);
460                }
461            }
462        }
463    
464        public static Text getMRDelegationTokenRenewer(JobConf jobConf) throws IOException {
465            if (UserGroupInformation.isSecurityEnabled()) { // secure cluster
466                return getMRTokenRenewerInternal(jobConf);
467            }
468            else {
469                return MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
470            }
471        }
472    
473        // Package private for unit test purposes
474        static Text getMRTokenRenewerInternal(JobConf jobConf) throws IOException {
475            // Getting renewer correctly for JT principal also though JT in hadoop 1.x does not have
476            // support for renewing/cancelling tokens
477            String servicePrincipal = jobConf.get(RM_PRINCIPAL, jobConf.get(JT_PRINCIPAL));
478            Text renewer;
479            if (servicePrincipal != null) { // secure cluster
480                renewer = mrTokenRenewers.get(servicePrincipal);
481                if (renewer == null) {
482                    // Mimic org.apache.hadoop.mapred.Master.getMasterPrincipal()
483                    String target = jobConf.get(HADOOP_YARN_RM, jobConf.get(HADOOP_JOB_TRACKER_2));
484                    if (target == null) {
485                        target = jobConf.get(HADOOP_JOB_TRACKER);
486                    }
487                    String addr = NetUtils.createSocketAddr(target).getHostName();
488                    renewer = new Text(SecurityUtil.getServerPrincipal(servicePrincipal, addr));
489                    LOG.info("Delegation Token Renewer details: Principal=" + servicePrincipal + ",Target=" + target
490                            + ",Renewer=" + renewer);
491                    mrTokenRenewers.put(servicePrincipal, renewer);
492                }
493            }
494            else {
495                renewer = MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
496            }
497            return renewer;
498        }
499    
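    /**
     * Adds a file to the classpath entries of the distributed cache in the given configuration,
     * performing the underlying filesystem access as the specified user.
     *
     * @param user user name to perform the filesystem access as.
     * @param file path of the file to add to the classpath.
     * @param conf Configuration the classpath entry is added to.
     * @throws IOException if the file could not be added to the classpath.
     */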
500        public void addFileToClassPath(String user, final Path file, final Configuration conf)
501                throws IOException {
502            ParamChecker.notEmpty(user, "user");
503            try {
504                UserGroupInformation ugi = getUGI(user);
505                ugi.doAs(new PrivilegedExceptionAction<Void>() {
506                    public Void run() throws Exception {
507                        Configuration defaultConf = new Configuration();
508                        XConfiguration.copy(conf, defaultConf);
509                        //Doing this NOP add first to have the FS created and cached
510                        DistributedCache.addFileToClassPath(file, defaultConf);
511    
512                        DistributedCache.addFileToClassPath(file, conf);
513                        return null;
514                    }
515                });
516    
517            }
518            catch (InterruptedException ex) {
519                throw new IOException(ex);
520            }
521    
522        }
523    
524        /**
     * Checks whether the filesystem scheme of the given URI is among the configured list of
     * supported schemes, which lets the system work with filesystems other than HDFS as well.
     * The check is skipped if the URI has no scheme or if the configured list is empty (wildcard).
     *
     * @param uri URI whose filesystem scheme is checked.
     * @throws HadoopAccessorException if the scheme is not in the supported list.
     */
529        public void checkSupportedFilesystem(URI uri) throws HadoopAccessorException {
530            String uriScheme = uri.getScheme();
531            if (uriScheme != null) {    // skip the check if no scheme is given
532                if(!supportedSchemes.isEmpty()) {
533                    XLog.getLog(this.getClass()).debug("Checking if filesystem " + uriScheme + " is supported");
534                    if (!supportedSchemes.contains(uriScheme)) {
535                        throw new HadoopAccessorException(ErrorCode.E0904, uriScheme, uri.toString());
536                    }
537                }
538            }
539        }
540    
541    }