/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.security.token.Token;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * The HadoopAccessorService returns JobClient and FileSystem instances configured to work on behalf of a user. <p/>
 * It injects the proxy-user UGI into the configuration instance used to create/obtain the JobClient and FileSystem
 * instances. <p/> The service is configured in <code>oozie-site.xml</code> using the
 * <code>oozie.service.HadoopAccessorService.*</code> properties.
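 * <p/>
 * A minimal usage sketch; the authority <code>nn-host:8020</code> and the user <code>someuser</code>
 * below are placeholder values:
 * <pre>{@code
 *   HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
 *   JobConf conf = has.createJobConf("nn-host:8020");
 *   FileSystem fs = has.createFileSystem("someuser", URI.create("hdfs://nn-host:8020"), conf);
 * }</pre>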
 */
public class HadoopAccessorService implements Service {

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService.";
    public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist";
    public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist";
    public static final String HADOOP_CONFS = CONF_PREFIX + "hadoop.configurations";
    public static final String ACTION_CONFS = CONF_PREFIX + "action.configurations";
    public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled";
    public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file";
    public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";

    private static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";

    private Set<String> jobTrackerWhitelist = new HashSet<String>();
    private Set<String> nameNodeWhitelist = new HashSet<String>();
    private Map<String, Configuration> hadoopConfigs = new HashMap<String, Configuration>();
    private Map<String, File> actionConfigDirs = new HashMap<String, File>();
    private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>();

    private ConcurrentMap<String, UserGroupInformation> userUgiMap;

    public void init(Services services) throws ServiceException {
        init(services.getConf());
    }

    //for testing purposes, see XFsTestCase
    public void init(Configuration conf) throws ServiceException {
        for (String name : conf.getStringCollection(JOB_TRACKER_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            jobTrackerWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "JOB_TRACKER_WHITELIST: " + conf.getStringCollection(JOB_TRACKER_WHITELIST)
                        + ", total entries: " + jobTrackerWhitelist.size());
        for (String name : conf.getStringCollection(NAME_NODE_WHITELIST)) {
            String tmp = name.toLowerCase().trim();
            if (tmp.length() == 0) {
                continue;
            }
            nameNodeWhitelist.add(tmp);
        }
        XLog.getLog(getClass()).info(
                "NAME_NODE_WHITELIST: " + conf.getStringCollection(NAME_NODE_WHITELIST)
                        + ", total entries: " + nameNodeWhitelist.size());

        boolean kerberosAuthOn = conf.getBoolean(KERBEROS_AUTH_ENABLED, true);
        XLog.getLog(getClass()).info("Oozie Kerberos Authentication [{0}]", (kerberosAuthOn) ? "enabled" : "disabled");
        if (kerberosAuthOn) {
            kerberosInit(conf);
        }
        else {
            Configuration ugiConf = new Configuration();
            ugiConf.set("hadoop.security.authentication", "simple");
            UserGroupInformation.setConfiguration(ugiConf);
        }

        userUgiMap = new ConcurrentHashMap<String, UserGroupInformation>();

        loadHadoopConfigs(conf);
        preLoadActionConfigs(conf);
    }

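    /**
     * Performs the Kerberos login for the Oozie server using the configured keytab file and
     * principal, defaulting to <code>${user.home}/oozie.keytab</code> and
     * <code>oozie/localhost@LOCALHOST</code>.
     */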
    private void kerberosInit(Configuration serviceConf) throws ServiceException {
        try {
            String keytabFile = serviceConf.get(KERBEROS_KEYTAB,
                                                System.getProperty("user.home") + "/oozie.keytab").trim();
            if (keytabFile.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
            }
            String principal = serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
            if (principal.length() == 0) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
            }
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
            XLog.getLog(getClass()).info("Got Kerberos ticket, keytab [{0}], Oozie principal [{1}]",
                                         keytabFile, principal);
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private static final String[] HADOOP_CONF_FILES =
        {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml", "hadoop-site.xml"};

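    /**
     * Loads and merges the Hadoop site files found in the given directory, in the order listed
     * in HADOOP_CONF_FILES; properties from later files override earlier ones.
     */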
    private Configuration loadHadoopConf(File dir) throws IOException {
        Configuration hadoopConf = new XConfiguration();
        for (String file : HADOOP_CONF_FILES) {
            File f = new File(dir, file);
            if (f.exists()) {
                InputStream is = new FileInputStream(f);
                try {
                    Configuration conf = new XConfiguration(is);
                    XConfiguration.copy(conf, hadoopConf);
                }
                finally {
                    is.close();
                }
            }
        }
        return hadoopConf;
    }

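    /**
     * Parses <code>AUTHORITY=DIR</code> definitions (for example <code>*=hadoop-conf</code>, the
     * default) into a map of lowercased authority to configuration directory; relative directories
     * are resolved against the Oozie configuration directory.
     */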
    private Map<String, File> parseConfigDirs(String[] confDefs, String type) throws ServiceException, IOException {
        Map<String, File> map = new HashMap<String, File>();
        File configDir = new File(ConfigurationService.getConfigurationDirectory());
        for (String confDef : confDefs) {
            if (confDef.trim().length() > 0) {
                String[] parts = confDef.split("=");
                if (parts.length == 2) {
                    String hostPort = parts[0];
                    String confDir = parts[1];
                    File dir = new File(confDir);
                    if (!dir.isAbsolute()) {
                        dir = new File(configDir, confDir);
                    }
                    if (dir.exists()) {
                        map.put(hostPort.toLowerCase(), dir);
                    }
                    else {
                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                                   "could not find " + type + " configuration directory: " +
                                                   dir.getAbsolutePath());
                    }
                }
                else {
                    throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                                               "Incorrect " + type + " configuration definition: " + confDef);
                }
            }
        }
        return map;
    }

    private void loadHadoopConfigs(Configuration serviceConf) throws ServiceException {
        try {
            Map<String, File> map = parseConfigDirs(serviceConf.getStrings(HADOOP_CONFS, "*=hadoop-conf"), "hadoop");
            for (Map.Entry<String, File> entry : map.entrySet()) {
                hadoopConfigs.put(entry.getKey(), loadHadoopConf(entry.getValue()));
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    private void preLoadActionConfigs(Configuration serviceConf) throws ServiceException {
        try {
            actionConfigDirs = parseConfigDirs(serviceConf.getStrings(ACTION_CONFS, "*=hadoop-conf"), "action");
            for (String hostport : actionConfigDirs.keySet()) {
                actionConfigs.put(hostport, new ConcurrentHashMap<String, XConfiguration>());
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex.getMessage(), ex);
        }
    }

    public void destroy() {
    }

    public Class<? extends Service> getInterface() {
        return HadoopAccessorService.class;
    }

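    /**
     * Returns the proxy-user UGI for the given user, creating it and caching it in
     * <code>userUgiMap</code> on first use.
     */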
    private UserGroupInformation getUGI(String user) throws IOException {
        UserGroupInformation ugi = userUgiMap.get(user);
        if (ugi == null) {
            // putIfAbsent guards against two threads creating a UGI for the same user;
            // if another thread won the race, use its UGI and discard the new one
            ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
            UserGroupInformation existing = userUgiMap.putIfAbsent(user, ugi);
            if (existing != null) {
                ugi = existing;
            }
        }
        return ugi;
    }

    /**
     * Creates a JobConf using the site configuration for the specified hostname:port.
     * <p/>
     * If the specified hostname:port is not defined it falls back to the '*' site
     * configuration if available. If the '*' site configuration is not available,
     * the JobConf has all Hadoop defaults.
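     * <p/>
     * For example, assuming <code>has</code> is the service instance and
     * <code>jt-host:8021</code> is a placeholder JobTracker address:
     * <pre>{@code
     *   JobConf jobConf = has.createJobConf("jt-host:8021");
     * }</pre>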
     *
     * @param hostPort hostname:port to lookup Hadoop site configuration.
     * @return a JobConf with the corresponding site configuration for hostPort.
     */
    public JobConf createJobConf(String hostPort) {
        JobConf jobConf = new JobConf();
        XConfiguration.copy(getConfiguration(hostPort), jobConf);
        jobConf.setBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, true);
        return jobConf;
    }

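    /**
     * Loads the <code>[action].xml</code> default configuration file from the action configuration
     * directory of the given authority, returning an empty configuration if the directory or the
     * file does not exist.
     */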
    private XConfiguration loadActionConf(String hostPort, String action) {
        File dir = actionConfigDirs.get(hostPort);
        XConfiguration actionConf = new XConfiguration();
        if (dir != null) {
            File actionConfFile = new File(dir, action + ".xml");
            if (actionConfFile.exists()) {
                InputStream is = null;
                try {
                    is = new FileInputStream(actionConfFile);
                    actionConf = new XConfiguration(is);
                }
                catch (IOException ex) {
                    XLog.getLog(getClass()).warn("Could not read file [{0}] for action [{1}] configuration for hostPort [{2}]",
                                                 actionConfFile.getAbsolutePath(), action, hostPort);
                }
                finally {
                    if (is != null) {
                        try {
                            is.close();
                        }
                        catch (IOException ex) {
                            // best-effort close, ignore
                        }
                    }
                }
            }
        }
        return actionConf;
    }

    /**
     * Returns a Configuration containing any defaults for an action for a particular cluster.
     * <p/>
     * This configuration is used as the default for the action configuration and enables
     * cluster-level default values per action.
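     * <p/>
     * For example, assuming <code>has</code> is the service instance and a <code>hive.xml</code>
     * file exists in the action configuration directory mapped to <code>nn-host:8020</code>
     * (both placeholders):
     * <pre>{@code
     *   XConfiguration hiveDefaults = has.createActionDefaultConf("nn-host:8020", "hive");
     * }</pre>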
     *
     * @param hostPort hostname:port to lookup the action default configuration.
     * @param action action name.
     * @return the default configuration for the action for the specified cluster.
     */
    public XConfiguration createActionDefaultConf(String hostPort, String action) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Map<String, XConfiguration> hostPortActionConfigs = actionConfigs.get(hostPort);
        if (hostPortActionConfigs == null) {
            hostPortActionConfigs = actionConfigs.get("*");
            hostPort = "*";
        }
        XConfiguration actionConf = hostPortActionConfigs.get(action);
        if (actionConf == null) {
            // doing lazy loading as we don't know upfront all actions, no need to synchronize
            // as it is a read operation and in case of a race condition loading and inserting
            // into the Map is idempotent and the action-config Map is a ConcurrentHashMap
            actionConf = loadActionConf(hostPort, action);
            hostPortActionConfigs.put(action, actionConf);
        }
        return new XConfiguration(actionConf.toProperties());
    }

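    /**
     * Returns the site configuration for the given authority, falling back to the '*' entry and,
     * if no '*' entry is defined, to an empty configuration.
     */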
    private Configuration getConfiguration(String hostPort) {
        hostPort = (hostPort != null) ? hostPort.toLowerCase() : null;
        Configuration conf = hadoopConfigs.get(hostPort);
        if (conf == null) {
            conf = hadoopConfigs.get("*");
            if (conf == null) {
                conf = new XConfiguration();
            }
        }
        return conf;
    }

    /**
     * Return a JobClient created with the provided user.
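     * <p/>
     * A typical call, where <code>has</code> is the service instance and the tracker address and
     * user name are placeholders:
     * <pre>{@code
     *   JobConf jobConf = has.createJobConf("jt-host:8021"); // flags the conf as service-created
     *   JobClient client = has.createJobClient("someuser", jobConf);
     * }</pre>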
     *
     * @param user user name to impersonate.
     * @param conf JobConf with all necessary information to create the
     *        JobClient.
     * @return JobClient created with the provided user.
     * @throws HadoopAccessorException if the client could not be created.
     */
    public JobClient createJobClient(String user, final JobConf conf) throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }
        String jobTracker = conf.get("mapred.job.tracker");
        validateJobTracker(jobTracker);
        try {
            UserGroupInformation ugi = getUGI(user);
            JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
                public JobClient run() throws Exception {
                    return new JobClient(conf);
                }
            });
            Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(new Text("mr token"));
            conf.getCredentials().addToken(new Text("mr token"), mrdt);
            return jobClient;
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
    }

    /**
     * Return a FileSystem created with the provided user for the specified URI.
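     * <p/>
     * A typical call, where <code>has</code> is the service instance and the URI and user name are
     * placeholders:
     * <pre>{@code
     *   Configuration conf = has.createJobConf("nn-host:8020"); // flags the conf as service-created
     *   FileSystem fs = has.createFileSystem("someuser", URI.create("hdfs://nn-host:8020"), conf);
     * }</pre>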
     *
     * @param user user name to impersonate.
     * @param uri file system URI.
     * @param conf Configuration with all necessary information to create the FileSystem.
     * @return FileSystem created with the provided user.
     * @throws HadoopAccessorException if the filesystem could not be created.
     */
    public FileSystem createFileSystem(String user, final URI uri, final Configuration conf)
            throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903);
        }
        String nameNode = uri.getAuthority();
        if (nameNode == null) {
            nameNode = conf.get("fs.default.name");
            if (nameNode != null) {
                try {
                    nameNode = new URI(nameNode).getAuthority();
                }
                catch (URISyntaxException ex) {
                    throw new HadoopAccessorException(ErrorCode.E0902, ex);
                }
            }
        }
        validateNameNode(nameNode);

        try {
            UserGroupInformation ugi = getUGI(user);
            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                public FileSystem run() throws Exception {
                    return FileSystem.get(uri, conf);
                }
            });
        }
        catch (InterruptedException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex);
        }
    }

    /**
     * Validates the JobTracker against the JobTracker whitelist.
     *
     * @param jobTrackerUri JobTracker URI (hostname:port) to validate.
     * @throws HadoopAccessorException if a whitelist is configured and the URI is not in it.
     */
    protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException {
        validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900);
    }

    /**
     * Validates the NameNode against the NameNode whitelist.
     *
     * @param nameNodeUri NameNode URI (hostname:port) to validate.
     * @throws HadoopAccessorException if a whitelist is configured and the URI is not in it.
     */
    protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException {
        validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901);
    }

    private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException {
        if (uri != null) {
            uri = uri.toLowerCase().trim();
            if (whitelist.size() > 0 && !whitelist.contains(uri)) {
                throw new HadoopAccessorException(error, uri);
            }
        }
    }

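    /**
     * Adds a file to the job classpath through the DistributedCache, running the operation as the
     * given user.
     *
     * @param user user name to impersonate.
     * @param file path of the file to add to the classpath.
     * @param conf configuration to update.
     * @throws IOException if the file could not be added.
     */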
    public void addFileToClassPath(String user, final Path file, final Configuration conf)
            throws IOException {
        ParamChecker.notEmpty(user, "user");
        try {
            UserGroupInformation ugi = getUGI(user);
            ugi.doAs(new PrivilegedExceptionAction<Void>() {
                public Void run() throws Exception {
                    Configuration defaultConf = new Configuration();
                    XConfiguration.copy(conf, defaultConf);
                    //Doing this NOP add first to have the FS created and cached
                    DistributedCache.addFileToClassPath(file, defaultConf);

                    DistributedCache.addFileToClassPath(file, conf);
                    return null;
                }
            });
        }
        catch (InterruptedException ex) {
            throw new IOException(ex);
        }
    }

}