/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.security.token.Token;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group. <p/> The
 * default accessor used is the base accessor which just injects the UGI into the configuration instance used to
 * create/obtain JobClient and FileSystem instances. <p/> The HadoopAccessor class to use can be configured in the
 * <code>oozie-site.xml</code> using the <code>oozie.service.HadoopAccessorService.accessor.class</code> property.
 */
public class HadoopAccessorService implements Service {

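    /*
     * Minimal usage sketch (not part of the original source). It assumes the service has been
     * registered with and is looked up through the Services singleton, which is how Oozie
     * services are normally obtained; host names and user below are illustrative:
     *
     *   HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
     *   JobConf jobConf = has.createJobConf("jt-host:8021");
     *   JobClient client = has.createJobClient("someUser", jobConf);
     */
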
    private static XLog LOG = XLog.getLog(HadoopAccessorService.class);

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService.";
    public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist";
    public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist";
    public static final String HADOOP_CONFS = CONF_PREFIX + "hadoop.configurations";
    public static final String ACTION_CONFS = CONF_PREFIX + "action.configurations";
    public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled";
    public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file";
    public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";
    public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token");

    private static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
    /** The Kerberos principal for the job tracker.*/
    private static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
    /** The Kerberos principal for the resource manager.*/
    private static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
    private static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
    private static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
    private static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
    private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>();

    private Set<String> jobTrackerWhitelist = new HashSet<String>();
    private Set<String> nameNodeWhitelist = new HashSet<String>();
    private Map<String, Configuration> hadoopConfigs = new HashMap<String, Configuration>();
    private Map<String, File> actionConfigDirs = new HashMap<String, File>();
    private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>();

    private ConcurrentMap<String, UserGroupInformation> userUgiMap;

    /**
     * Supported filesystem schemes for namespace federation
     */
    public static final String SUPPORTED_FILESYSTEMS = CONF_PREFIX + "supported.filesystems";
    private Set<String> supportedSchemes;

    public void init(Services services) throws ServiceException {
        init(services.getConf());
    }

    //for testing purposes, see XFsTestCase
    public void init(Configuration conf) throws ServiceException {
        for (String name : conf.getStringCollection(JOB_TRACKER_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            jobTrackerWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "JOB_TRACKER_WHITELIST :" + conf.getStringCollection(JOB_TRACKER_WHITELIST)
                        + ", Total entries :" + jobTrackerWhitelist.size());
        for (String name : conf.getStringCollection(NAME_NODE_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            nameNodeWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "NAME_NODE_WHITELIST :" + conf.getStringCollection(NAME_NODE_WHITELIST)
                        + ", Total entries :" + nameNodeWhitelist.size());

        boolean kerberosAuthOn = conf.getBoolean(KERBEROS_AUTH_ENABLED, true);
        XLog.getLog(getClass()).info("Oozie Kerberos Authentication [{0}]", (kerberosAuthOn) ? "enabled" : "disabled");
        if (kerberosAuthOn) {
            kerberosInit(conf);
        }
        else {
            Configuration ugiConf = new Configuration();
            ugiConf.set("hadoop.security.authentication", "simple");
            UserGroupInformation.setConfiguration(ugiConf);
        }

        userUgiMap = new ConcurrentHashMap<String, UserGroupInformation>();

        loadHadoopConfigs(conf);
        preLoadActionConfigs(conf);

        supportedSchemes = new HashSet<String>();
        String[] schemesFromConf = conf.getStrings(SUPPORTED_FILESYSTEMS, new String[]{"hdfs","hftp","webhdfs"});
        if(schemesFromConf != null) {
            for (String scheme: schemesFromConf) {
                scheme = scheme.trim();
                // If the user gives "*", supportedSchemes stays empty, so no checking is done and all schemes are allowed
                if(scheme.equals("*")) {
                    if(schemesFromConf.length > 1) {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                            SUPPORTED_FILESYSTEMS + " should contain either only wildcard or explicit list, not both");
                    }
                } else {
                    supportedSchemes.add(scheme);
                }
            }
        }
    }
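
    /*
     * Illustrative oozie-site.xml entries read by init(); the values below are examples only,
     * not shipped defaults:
     *
     *   oozie.service.HadoopAccessorService.jobTracker.whitelist  = jt-host:8021
     *   oozie.service.HadoopAccessorService.nameNode.whitelist    = nn-host:8020
     *   oozie.service.HadoopAccessorService.kerberos.enabled      = false
     *   oozie.service.HadoopAccessorService.supported.filesystems = hdfs,hftp,webhdfs
     */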

    private void kerberosInit(Configuration serviceConf) throws ServiceException {
        try {
            String keytabFile = serviceConf.get(KERBEROS_KEYTAB,
                                                System.getProperty("user.home") + "/oozie.keytab").trim();
            if (keytabFile.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
            }
            String principal = serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
            if (principal.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
            }
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
            XLog.getLog(getClass()).info("Got Kerberos ticket, keytab [{0}], Oozie principal [{1}]",
                                         keytabFile, principal);
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private static final String[] HADOOP_CONF_FILES =
        {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml", "hadoop-site.xml"};


    private Configuration loadHadoopConf(File dir) throws IOException {
        Configuration hadoopConf = new XConfiguration();
        for (String file : HADOOP_CONF_FILES) {
            File f = new File(dir, file);
            if (f.exists()) {
                InputStream is = new FileInputStream(f);
                Configuration conf = new XConfiguration(is);
                is.close();
                XConfiguration.copy(conf, hadoopConf);
            }
        }
        return hadoopConf;
    }

    private Map<String, File> parseConfigDirs(String[] confDefs, String type) throws ServiceException, IOException {
        Map<String, File> map = new HashMap<String, File>();
        File configDir = new File(ConfigurationService.getConfigurationDirectory());
        for (String confDef : confDefs) {
            if (confDef.trim().length() > 0) {
                String[] parts = confDef.split("=");
                if (parts.length == 2) {
                    String hostPort = parts[0];
                    String confDir = parts[1];
                    File dir = new File(confDir);
                    if (!dir.isAbsolute()) {
                        dir = new File(configDir, confDir);
                    }
                    if (dir.exists()) {
                        map.put(hostPort.toLowerCase(), dir);
                    }
                    else {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                                   "could not find " + type + " configuration directory: " +
                                                   dir.getAbsolutePath());
                    }
                }
                else {
                    throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                               "Incorrect " + type + " configuration definition: " + confDef);
                }
            }
        }
        return map;
    }
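
    /*
     * Each configuration definition handled above is a "HOSTPORT=DIR" pair, where DIR may be
     * absolute or relative to the Oozie configuration directory. Hypothetical examples:
     *
     *   *=hadoop-conf
     *   jt-host:8021=/etc/hadoop/cluster-a-conf
     */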

    private void loadHadoopConfigs(Configuration serviceConf) throws ServiceException {
        try {
            Map<String, File> map = parseConfigDirs(serviceConf.getStrings(HADOOP_CONFS, "*=hadoop-conf"), "hadoop");
            for (Map.Entry<String, File> entry : map.entrySet()) {
                hadoopConfigs.put(entry.getKey(), loadHadoopConf(entry.getValue()));
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private void preLoadActionConfigs(Configuration serviceConf) throws ServiceException {
        try {
            actionConfigDirs = parseConfigDirs(serviceConf.getStrings(ACTION_CONFS, "*=hadoop-conf"), "action");
            for (String hostport : actionConfigDirs.keySet()) {
                actionConfigs.put(hostport, new ConcurrentHashMap<String, XConfiguration>());
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    public void destroy() {
    }

    public Class<? extends Service> getInterface() {
        return HadoopAccessorService.class;
    }

    private UserGroupInformation getUGI(String user) throws IOException {
        UserGroupInformation ugi = userUgiMap.get(user);
        if (ugi == null) {
            // in case of a race condition, putIfAbsent keeps the first UGI inserted and any
            // extra UGI created here is only used for this call and then discarded
            ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
            userUgiMap.putIfAbsent(user, ugi);
        }
        return ugi;
    }

    /**
     * Creates a JobConf using the site configuration for the specified hostname:port.
     * <p/>
     * If the specified hostname:port is not defined it falls back to the '*' site
     * configuration if available. If the '*' site configuration is not available,
     * the JobConf has all Hadoop defaults.
     *
     * @param hostPort hostname:port to lookup Hadoop site configuration.
     * @return a JobConf with the corresponding site configuration for hostPort.
     */
    public JobConf createJobConf(String hostPort) {
        JobConf jobConf = new JobConf();
        XConfiguration.copy(getConfiguration(hostPort), jobConf);
        jobConf.setBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, true);
        return jobConf;
    }
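
    /*
     * Minimal usage sketch for createJobConf() (the host:port is illustrative): the returned
     * JobConf carries the matching site configuration plus the marker flag required by
     * createJobClient() and createFileSystem().
     *
     *   JobConf jobConf = hadoopAccessorService.createJobConf("jt-host:8021");
     */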

    private XConfiguration loadActionConf(String hostPort, String action) {
        File dir = actionConfigDirs.get(hostPort);
        XConfiguration actionConf = new XConfiguration();
        if (dir != null) {
            File actionConfFile = new File(dir, action + ".xml");
            if (actionConfFile.exists()) {
                try {
                    actionConf = new XConfiguration(new FileInputStream(actionConfFile));
                }
                catch (IOException ex) {
                    XLog.getLog(getClass()).warn("Could not read file [{0}] for action [{1}] configuration for hostPort [{2}]",
                                                 actionConfFile.getAbsolutePath(), action, hostPort);
                }
            }
        }
        return actionConf;
    }

    /**
     * Returns a Configuration containing any defaults for an action for a particular cluster.
     * <p/>
     * This configuration is used as default for the action configuration and enables cluster
     * level default values per action.
     *
     * @param hostPort hostname:port to lookup the action default configuration.
     * @param action action name.
     * @return the default configuration for the action for the specified cluster.
     */
    public XConfiguration createActionDefaultConf(String hostPort, String action) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Map<String, XConfiguration> hostPortActionConfigs = actionConfigs.get(hostPort);
        if (hostPortActionConfigs == null) {
            hostPortActionConfigs = actionConfigs.get("*");
            hostPort = "*";
        }
        XConfiguration actionConf = hostPortActionConfigs.get(action);
        if (actionConf == null) {
            // doing lazy loading as we don't know upfront all actions, no need to synchronize
            // as it is a read operation and in case of a race condition loading and inserting
            // into the Map is idempotent and the action-config Map is a ConcurrentHashMap
            actionConf = loadActionConf(hostPort, action);
            hostPortActionConfigs.put(action, actionConf);
        }
        return new XConfiguration(actionConf.toProperties());
    }
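
    /*
     * Minimal usage sketch for createActionDefaultConf() (action name and host:port are
     * illustrative); the result holds the properties from <action-conf-dir>/pig.xml for that
     * cluster, or is empty if no such file exists:
     *
     *   XConfiguration pigDefaults =
     *       hadoopAccessorService.createActionDefaultConf("jt-host:8021", "pig");
     */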

    private Configuration getConfiguration(String hostPort) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Configuration conf = hadoopConfigs.get(hostPort);
        if (conf == null) {
            conf = hadoopConfigs.get("*");
            if (conf == null) {
                conf = new XConfiguration();
            }
        }
        return conf;
    }

    /**
     * Return a JobClient created with the provided user.
     *
     * @param user user name to impersonate when creating the JobClient.
     * @param conf JobConf with all necessary information to create the
     *        JobClient.
     * @return JobClient created with the provided user.
     * @throws HadoopAccessorException if the client could not be created.
     */
    public JobClient createJobClient(String user, final JobConf conf) throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }
        String jobTracker = conf.get("mapred.job.tracker");
        validateJobTracker(jobTracker);
        try {
            UserGroupInformation ugi = getUGI(user);
            JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
                public JobClient run() throws Exception {
                    return new JobClient(conf);
                }
            });
            Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(getMRDelegationTokenRenewer(conf));
            conf.getCredentials().addToken(MR_TOKEN_ALIAS, mrdt);
            return jobClient;
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
    }
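
    /*
     * Minimal usage sketch for createJobClient() (user and host:port are illustrative); the
     * JobConf must come from createJobConf(), otherwise E0903 is thrown:
     *
     *   JobConf jobConf = hadoopAccessorService.createJobConf("jt-host:8021");
     *   JobClient client = hadoopAccessorService.createJobClient("someUser", jobConf);
     */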

    /**
     * Return a FileSystem created with the provided user for the specified URI.
     *
     * @param user user name to impersonate when creating the FileSystem.
     * @param uri file system URI.
     * @param conf Configuration with all necessary information to create the FileSystem.
     * @return FileSystem created with the provided user.
     * @throws HadoopAccessorException if the filesystem could not be created.
     */
    public FileSystem createFileSystem(String user, final URI uri, final Configuration conf)
            throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }

        checkSupportedFilesystem(uri);

        String nameNode = uri.getAuthority();
        if (nameNode == null) {
            nameNode = conf.get("fs.default.name");
            if (nameNode != null) {
                try {
                    nameNode = new URI(nameNode).getAuthority();
                }
                catch (URISyntaxException ex) {
                    throw new HadoopAccessorException(ErrorCode.E0902, ex);
                }
            }
        }
        validateNameNode(nameNode);

        try {
            UserGroupInformation ugi = getUGI(user);
            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                public FileSystem run() throws Exception {
                    return FileSystem.get(uri, conf);
                }
            });
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
    }
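
    /*
     * Minimal usage sketch for createFileSystem() (URI, host:port and user are illustrative);
     * as with createJobClient(), the Configuration must have been created by this service:
     *
     *   URI uri = URI.create("hdfs://nn-host:8020/user/someUser");
     *   Configuration fsConf = hadoopAccessorService.createJobConf("jt-host:8021");
     *   FileSystem fs = hadoopAccessorService.createFileSystem("someUser", uri, fsConf);
     */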

    /**
     * Validate the job tracker URI against the configured whitelist.
     *
     * @param jobTrackerUri job tracker URI to validate.
     * @throws HadoopAccessorException if the job tracker is not in the whitelist.
     */
    protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException {
        validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900);
    }

    /**
     * Validate the name node URI against the configured whitelist.
     *
     * @param nameNodeUri name node URI to validate.
     * @throws HadoopAccessorException if the name node is not in the whitelist.
     */
    protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException {
        validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901);
    }

    private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException {
        if (uri != null) {
            uri = uri.toLowerCase().trim();
            if (whitelist.size() > 0 && !whitelist.contains(uri)) {
                throw new HadoopAccessorException(error, uri);
            }
        }
    }

    public static Text getMRDelegationTokenRenewer(JobConf jobConf) throws IOException {
        if (UserGroupInformation.isSecurityEnabled()) { // secure cluster
            return getMRTokenRenewerInternal(jobConf);
        }
        else {
            return MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
        }
    }

    // Package private for unit test purposes
    static Text getMRTokenRenewerInternal(JobConf jobConf) throws IOException {
        // Getting renewer correctly for JT principal also though JT in hadoop 1.x does not have
        // support for renewing/cancelling tokens
        String servicePrincipal = jobConf.get(RM_PRINCIPAL, jobConf.get(JT_PRINCIPAL));
        Text renewer;
        if (servicePrincipal != null) { // secure cluster
            renewer = mrTokenRenewers.get(servicePrincipal);
            if (renewer == null) {
                // Mimic org.apache.hadoop.mapred.Master.getMasterPrincipal()
                String target = jobConf.get(HADOOP_YARN_RM, jobConf.get(HADOOP_JOB_TRACKER_2));
                if (target == null) {
                    target = jobConf.get(HADOOP_JOB_TRACKER);
                }
                String addr = NetUtils.createSocketAddr(target).getHostName();
                renewer = new Text(SecurityUtil.getServerPrincipal(servicePrincipal, addr));
                LOG.info("Delegation Token Renewer details: Principal=" + servicePrincipal + ",Target=" + target
                        + ",Renewer=" + renewer);
                mrTokenRenewers.put(servicePrincipal, renewer);
            }
        }
        else {
            renewer = MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
        }
        return renewer;
    }
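
    /*
     * Example of how the renewer is derived on a secure cluster (principal, realm and host
     * below are hypothetical):
     *
     *   yarn.resourcemanager.principal = rm/_HOST@EXAMPLE.COM
     *   yarn.resourcemanager.address   = rm-host.example.com:8032
     *
     * SecurityUtil.getServerPrincipal() replaces the _HOST placeholder with the resolved host
     * name, yielding the renewer "rm/rm-host.example.com@EXAMPLE.COM".
     */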

    public void addFileToClassPath(String user, final Path file, final Configuration conf)
            throws IOException {
        ParamChecker.notEmpty(user, "user");
        try {
            UserGroupInformation ugi = getUGI(user);
            ugi.doAs(new PrivilegedExceptionAction<Void>() {
                public Void run() throws Exception {
                    Configuration defaultConf = new Configuration();
                    XConfiguration.copy(conf, defaultConf);
                    //Doing this NOP add first to have the FS created and cached
                    DistributedCache.addFileToClassPath(file, defaultConf);

                    DistributedCache.addFileToClassPath(file, conf);
                    return null;
                }
            });

        }
        catch (InterruptedException ex) {
            throw new IOException(ex);
        }

    }
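
    /*
     * Minimal usage sketch for addFileToClassPath() (path and user are illustrative); the
     * Configuration should carry the proper site settings so the right FileSystem is used:
     *
     *   Path jar = new Path("hdfs://nn-host:8020/tmp/my-udfs.jar");
     *   hadoopAccessorService.addFileToClassPath("someUser", jar, jobConf);
     */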

    /**
     * Checks whether the filesystem scheme of the given URI is among the configured list of
     * supported schemes, so the check is robust to filesystems other than HDFS as well.
     *
     * @param uri URI whose scheme is checked.
     * @throws HadoopAccessorException if the scheme is not supported.
     */
    public void checkSupportedFilesystem(URI uri) throws HadoopAccessorException {
        String uriScheme = uri.getScheme();
        if (!supportedSchemes.isEmpty()) {
            XLog.getLog(this.getClass()).debug("Checking if filesystem " + uriScheme + " is supported");
            if (!supportedSchemes.contains(uriScheme)) {
                throw new HadoopAccessorException(ErrorCode.E0904, uriScheme, uri.toString());
            }
        }
    }

}