/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.security.token.Token;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
 * The HadoopAccessorService returns JobClient and FileSystem instances configured to work on
 * behalf of the end user. <p/> It injects the user's UGI into the Configuration instances used to
 * create/obtain JobClient and FileSystem instances, validates jobtracker and namenode URIs against
 * configurable whitelists, and serves per-cluster Hadoop and action default configurations.
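 * <p/>
 * A minimal usage sketch (assuming a running Oozie {@link Services} instance; host and port are
 * illustrative):
 * <pre>
 *   HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
 *   JobConf jobConf = has.createJobConf("jt-host:8021");
 *   JobClient jobClient = has.createJobClient("someUser", jobConf);
 * </pre>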
 */
public class HadoopAccessorService implements Service {

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService.";
    public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist";
    public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist";
    public static final String HADOOP_CONFS = CONF_PREFIX + "hadoop.configurations";
    public static final String ACTION_CONFS = CONF_PREFIX + "action.configurations";
    public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled";
    public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file";
    public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";

    private static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";

    private Set<String> jobTrackerWhitelist = new HashSet<String>();
    private Set<String> nameNodeWhitelist = new HashSet<String>();
    private Map<String, Configuration> hadoopConfigs = new HashMap<String, Configuration>();
    private Map<String, File> actionConfigDirs = new HashMap<String, File>();
    private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>();

    private ConcurrentMap<String, UserGroupInformation> userUgiMap;

    /**
     * Supported filesystem schemes for namespace federation.
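     * <p/>
     * Accepts a comma-separated list of schemes (default <code>hdfs</code>) or the wildcard
     * <code>*</code>, which disables the check and allows all schemes.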
     */
    public static final String SUPPORTED_FILESYSTEMS = CONF_PREFIX + "supported.filesystems";
    private Set<String> supportedSchemes;

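    /**
     * Initializes the service using the Oozie server configuration.
     *
     * @param services services instance.
     * @throws ServiceException if the service could not be initialized.
     */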
    public void init(Services services) throws ServiceException {
        init(services.getConf());
    }

    // for testing purposes, see XFsTestCase
    public void init(Configuration conf) throws ServiceException {
        for (String name : conf.getStringCollection(JOB_TRACKER_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            jobTrackerWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "JOB_TRACKER_WHITELIST: " + conf.getStringCollection(JOB_TRACKER_WHITELIST)
                        + ", total entries: " + jobTrackerWhitelist.size());
        for (String name : conf.getStringCollection(NAME_NODE_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            nameNodeWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "NAME_NODE_WHITELIST: " + conf.getStringCollection(NAME_NODE_WHITELIST)
                        + ", total entries: " + nameNodeWhitelist.size());

        boolean kerberosAuthOn = conf.getBoolean(KERBEROS_AUTH_ENABLED, true);
        XLog.getLog(getClass()).info("Oozie Kerberos Authentication [{0}]", (kerberosAuthOn) ? "enabled" : "disabled");
        if (kerberosAuthOn) {
            kerberosInit(conf);
        }
        else {
            Configuration ugiConf = new Configuration();
            ugiConf.set("hadoop.security.authentication", "simple");
            UserGroupInformation.setConfiguration(ugiConf);
        }

        userUgiMap = new ConcurrentHashMap<String, UserGroupInformation>();

        loadHadoopConfigs(conf);
        preLoadActionConfigs(conf);

        supportedSchemes = new HashSet<String>();
        String[] schemesFromConf = conf.getStrings(SUPPORTED_FILESYSTEMS, new String[]{"hdfs"});
        if (schemesFromConf != null) {
            for (String scheme : schemesFromConf) {
                scheme = scheme.trim();
                // a wildcard "*" leaves supportedSchemes empty, which disables the check, i.e. all schemes are allowed
                if (scheme.equals("*")) {
                    if (schemesFromConf.length > 1) {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                            SUPPORTED_FILESYSTEMS + " should contain either only the wildcard or an explicit list, not both");
                    }
                } else {
                    supportedSchemes.add(scheme);
                }
            }
        }
    }

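    /**
     * Configures Hadoop security for Kerberos and logs in from a keytab.
     * <p/>
     * A sketch of the relevant <code>oozie-site.xml</code> entries, rendered here as name = value
     * pairs and shown with their default values:
     * <pre>
     *   oozie.service.HadoopAccessorService.keytab.file        = ${user.home}/oozie.keytab
     *   oozie.service.HadoopAccessorService.kerberos.principal = oozie/localhost@LOCALHOST
     * </pre>
     *
     * @param serviceConf service configuration.
     * @throws ServiceException if the Kerberos login fails.
     */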
    private void kerberosInit(Configuration serviceConf) throws ServiceException {
        try {
            String keytabFile = serviceConf.get(KERBEROS_KEYTAB,
                                                System.getProperty("user.home") + "/oozie.keytab").trim();
            if (keytabFile.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
            }
            String principal = serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
            if (principal.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
            }
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
            XLog.getLog(getClass()).info("Got Kerberos ticket, keytab [{0}], Oozie principal [{1}]",
                                         keytabFile, principal);
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private static final String[] HADOOP_CONF_FILES =
        {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml", "hadoop-site.xml"};

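    /**
     * Loads and merges the standard Hadoop site files found in the given directory (see
     * HADOOP_CONF_FILES); files that are not present are skipped.
     *
     * @param dir directory holding the Hadoop site files.
     * @return the merged configuration.
     * @throws IOException if a file cannot be read.
     */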
    private Configuration loadHadoopConf(File dir) throws IOException {
        Configuration hadoopConf = new XConfiguration();
        for (String file : HADOOP_CONF_FILES) {
            File f = new File(dir, file);
            if (f.exists()) {
                InputStream is = new FileInputStream(f);
                try {
                    Configuration conf = new XConfiguration(is);
                    XConfiguration.copy(conf, hadoopConf);
                }
                finally {
                    is.close();
                }
            }
        }
        return hadoopConf;
    }

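    /**
     * Parses a list of <code>AUTHORITY=DIR</code> definitions into a map from host:port to
     * configuration directory; relative directories are resolved against the Oozie configuration
     * directory. For example (host and port illustrative):
     * <code>*=hadoop-conf,jt-host:8021=jt-conf</code>, where <code>*</code> is the fallback entry.
     *
     * @param confDefs configuration definitions.
     * @param type definition type, used in error messages.
     * @return map from lowercased host:port to configuration directory.
     * @throws ServiceException if a definition is malformed or its directory does not exist.
     */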
    private Map<String, File> parseConfigDirs(String[] confDefs, String type) throws ServiceException, IOException {
        Map<String, File> map = new HashMap<String, File>();
        File configDir = new File(ConfigurationService.getConfigurationDirectory());
        for (String confDef : confDefs) {
            if (confDef.trim().length() > 0) {
                String[] parts = confDef.split("=");
                if (parts.length == 2) {
                    String hostPort = parts[0];
                    String confDir = parts[1];
                    File dir = new File(confDir);
                    if (!dir.isAbsolute()) {
                        dir = new File(configDir, confDir);
                    }
                    if (dir.exists()) {
                        map.put(hostPort.toLowerCase(), dir);
                    }
                    else {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                                   "could not find " + type + " configuration directory: " +
                                                   dir.getAbsolutePath());
                    }
                }
                else {
                    throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                               "Incorrect " + type + " configuration definition: " + confDef);
                }
            }
        }
        return map;
    }

    private void loadHadoopConfigs(Configuration serviceConf) throws ServiceException {
        try {
            Map<String, File> map = parseConfigDirs(serviceConf.getStrings(HADOOP_CONFS, "*=hadoop-conf"), "hadoop");
            for (Map.Entry<String, File> entry : map.entrySet()) {
                hadoopConfigs.put(entry.getKey(), loadHadoopConf(entry.getValue()));
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

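    /**
     * Resolves the per-cluster action configuration directories and prepares an empty, lazily
     * populated cache for each of them.
     */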
    private void preLoadActionConfigs(Configuration serviceConf) throws ServiceException {
        try {
            actionConfigDirs = parseConfigDirs(serviceConf.getStrings(ACTION_CONFS, "*=hadoop-conf"), "action");
            for (String hostport : actionConfigDirs.keySet()) {
                actionConfigs.put(hostport, new ConcurrentHashMap<String, XConfiguration>());
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    public void destroy() {
    }

    public Class<? extends Service> getInterface() {
        return HadoopAccessorService.class;
    }

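    /**
     * Returns a cached proxy-user UGI for the given user, creating it on behalf of the Oozie
     * login user on first access.
     */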
    private UserGroupInformation getUGI(String user) throws IOException {
        UserGroupInformation ugi = userUgiMap.get(user);
        if (ugi == null) {
            // on a race condition only one UGI ends up cached, the extra one is used once and discarded
            ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
            userUgiMap.putIfAbsent(user, ugi);
        }
        return ugi;
    }

    /**
     * Creates a JobConf using the site configuration for the specified hostname:port.
     * <p/>
     * If the specified hostname:port is not defined it falls back to the '*' site
     * configuration if available. If the '*' site configuration is not available,
     * the JobConf has all Hadoop defaults.
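     * <p/>
     * For example (host and port illustrative):
     * <pre>
     *   JobConf jobConf = hadoopAccessorService.createJobConf("jt-host:8021");
     * </pre>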
     *
     * @param hostPort hostname:port to lookup Hadoop site configuration.
     * @return a JobConf with the corresponding site configuration for hostPort.
     */
    public JobConf createJobConf(String hostPort) {
        JobConf jobConf = new JobConf();
        XConfiguration.copy(getConfiguration(hostPort), jobConf);
        jobConf.setBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, true);
        return jobConf;
    }

    private XConfiguration loadActionConf(String hostPort, String action) {
        File dir = actionConfigDirs.get(hostPort);
        XConfiguration actionConf = new XConfiguration();
        if (dir != null) {
            File actionConfFile = new File(dir, action + ".xml");
            if (actionConfFile.exists()) {
                try {
                    actionConf = new XConfiguration(new FileInputStream(actionConfFile));
                }
                catch (IOException ex) {
                    XLog.getLog(getClass()).warn("Could not read file [{0}] for action [{1}] configuration for hostPort [{2}]",
                                                 actionConfFile.getAbsolutePath(), action, hostPort);
                }
            }
        }
        return actionConf;
    }

    /**
     * Returns a Configuration containing any defaults for an action for a particular cluster.
     * <p/>
     * This configuration is used as default for the action configuration and enables cluster
     * level default values per action.
     *
     * @param hostPort hostname:port to lookup the action default configuration.
     * @param action action name.
     * @return the default configuration for the action for the specified cluster.
     */
    public XConfiguration createActionDefaultConf(String hostPort, String action) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Map<String, XConfiguration> hostPortActionConfigs = actionConfigs.get(hostPort);
        if (hostPortActionConfigs == null) {
            hostPortActionConfigs = actionConfigs.get("*");
            hostPort = "*";
        }
        XConfiguration actionConf = hostPortActionConfigs.get(action);
        if (actionConf == null) {
            // lazy loading, as not all actions are known upfront; no need to synchronize
            // because this is a read operation and, in case of a race condition, loading and
            // inserting into the ConcurrentHashMap is idempotent
            actionConf = loadActionConf(hostPort, action);
            hostPortActionConfigs.put(action, actionConf);
        }
        return new XConfiguration(actionConf.toProperties());
    }

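    /**
     * Returns the cached Hadoop site configuration for the given host:port, falling back to the
     * '*' entry and, failing that, to an empty configuration.
     */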
    private Configuration getConfiguration(String hostPort) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Configuration conf = hadoopConfigs.get(hostPort);
        if (conf == null) {
            conf = hadoopConfigs.get("*");
            if (conf == null) {
                conf = new XConfiguration();
            }
        }
        return conf;
    }

    /**
     * Return a JobClient created with the provided user.
     *
     * @param user user name to impersonate.
     * @param conf JobConf with all necessary information to create the
     *        JobClient.
     * @return JobClient created with the provided user.
     * @throws HadoopAccessorException if the client could not be created.
     */
    public JobClient createJobClient(String user, final JobConf conf) throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }
        String jobTracker = conf.get("mapred.job.tracker");
        validateJobTracker(jobTracker);
        try {
            UserGroupInformation ugi = getUGI(user);
            JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
                public JobClient run() throws Exception {
                    return new JobClient(conf);
                }
            });
            Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(new Text("mr token"));
            conf.getCredentials().addToken(new Text("mr token"), mrdt);
            return jobClient;
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
    }

    /**
     * Return a FileSystem created with the provided user for the specified URI.
     *
     * @param user user name to impersonate.
     * @param uri file system URI.
     * @param conf Configuration with all necessary information to create the FileSystem.
     * @return FileSystem created with the provided user.
     * @throws HadoopAccessorException if the filesystem could not be created.
     */
    public FileSystem createFileSystem(String user, final URI uri, final Configuration conf)
            throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }

        checkSupportedFilesystem(uri);

        String nameNode = uri.getAuthority();
        if (nameNode == null) {
            nameNode = conf.get("fs.default.name");
            if (nameNode != null) {
                try {
                    nameNode = new URI(nameNode).getAuthority();
                }
                catch (URISyntaxException ex) {
                    throw new HadoopAccessorException(ErrorCode.E0902, ex);
                }
            }
        }
        validateNameNode(nameNode);

        try {
            UserGroupInformation ugi = getUGI(user);
            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                public FileSystem run() throws Exception {
                    return FileSystem.get(uri, conf);
                }
            });
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
    }

    /**
     * Validates a jobtracker URI against the jobtracker whitelist.
     *
     * @param jobTrackerUri jobtracker URI to validate.
     * @throws HadoopAccessorException if the jobtracker is not whitelisted.
     */
    protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException {
        validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900);
    }

    /**
     * Validates a namenode URI against the namenode whitelist.
     *
     * @param nameNodeUri namenode URI to validate.
     * @throws HadoopAccessorException if the namenode is not whitelisted.
     */
    protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException {
        validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901);
    }

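    /**
     * Checks a URI against a whitelist; an empty whitelist accepts any URI.
     */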
    private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException {
        if (uri != null) {
            uri = uri.toLowerCase().trim();
            if (whitelist.size() > 0 && !whitelist.contains(uri)) {
                throw new HadoopAccessorException(error, uri);
            }
        }
    }

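    /**
     * Adds a file to the distributed cache classpath of the given configuration, running as the
     * specified user.
     *
     * @param user user to impersonate.
     * @param file path of the file to add to the classpath.
     * @param conf configuration to update.
     * @throws IOException if the file could not be added.
     */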
    public void addFileToClassPath(String user, final Path file, final Configuration conf)
            throws IOException {
        ParamChecker.notEmpty(user, "user");
        try {
            UserGroupInformation ugi = getUGI(user);
            ugi.doAs(new PrivilegedExceptionAction<Void>() {
                public Void run() throws Exception {
                    Configuration defaultConf = new Configuration();
                    XConfiguration.copy(conf, defaultConf);
                    // doing this NOP add first to have the FileSystem created and cached
                    DistributedCache.addFileToClassPath(file, defaultConf);

                    DistributedCache.addFileToClassPath(file, conf);
                    return null;
                }
            });
        }
        catch (InterruptedException ex) {
            throw new IOException(ex);
        }
    }

    /**
     * Checks whether the filesystem scheme of the given URI is among the configured list of
     * supported schemes; this keeps the system robust when filesystems other than HDFS are used.
     *
     * @param uri URI whose scheme is checked.
     * @throws HadoopAccessorException if the scheme is not supported.
     */
    public void checkSupportedFilesystem(URI uri) throws HadoopAccessorException {
        String uriScheme = uri.getScheme();
        if (!supportedSchemes.isEmpty()) {
            XLog.getLog(this.getClass()).debug("Checking if filesystem " + uriScheme + " is supported");
            if (!supportedSchemes.contains(uriScheme)) {
                throw new HadoopAccessorException(ErrorCode.E0904, uriScheme, uri.toString());
            }
        }
    }

}