001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
038    
039    /**
040     * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group. <p/> The
041     * default accessor used is the base accessor which just injects the UGI into the configuration instance used to
042     * create/obtain JobClient and ileSystem instances. <p/> The HadoopAccess class to use can be configured in the
043     * <code>oozie-site.xml</code> using the <code>oozie.service.HadoopAccessorService.accessor.class</code> property.
044     */
045    public class HadoopAccessorService implements Service {
046    
047        public static final String CONF_PREFIX = Service.CONF_PREFIX + "HadoopAccessorService.";
048        public static final String JOB_TRACKER_WHITELIST = CONF_PREFIX + "jobTracker.whitelist";
049        public static final String NAME_NODE_WHITELIST = CONF_PREFIX + "nameNode.whitelist";
050    
051        private Set<String> jobTrackerWhitelist = new HashSet<String>();
052        private Set<String> nameNodeWhitelist = new HashSet<String>();
053    
054        public void init(Services services) throws ServiceException {
055            for (String name : services.getConf().getStringCollection(JOB_TRACKER_WHITELIST)) {
056                String tmp = name.toLowerCase().trim();
057                if (tmp.length() == 0) {
058                    continue;
059                }
060                jobTrackerWhitelist.add(tmp);
061            }
062            XLog.getLog(getClass()).info(
063                    "JOB_TRACKER_WHITELIST :" + services.getConf().getStringCollection(JOB_TRACKER_WHITELIST)
064                            + ", Total entries :" + jobTrackerWhitelist.size());
065            for (String name : services.getConf().getStringCollection(NAME_NODE_WHITELIST)) {
066                String tmp = name.toLowerCase().trim();
067                if (tmp.length() == 0) {
068                    continue;
069                }
070                nameNodeWhitelist.add(tmp);
071            }
072            XLog.getLog(getClass()).info(
073                    "NAME_NODE_WHITELIST :" + services.getConf().getStringCollection(NAME_NODE_WHITELIST)
074                            + ", Total entries :" + nameNodeWhitelist.size());
075            init(services.getConf());
076        }
077    
078        public void init(Configuration serviceConf) throws ServiceException {
079        }
080    
081        public void destroy() {
082        }
083    
084        public Class<? extends Service> getInterface() {
085            return HadoopAccessorService.class;
086        }
087    
088        /**
089         * Return a JobClient created with the provided user/group.
090         * 
091         * @param conf JobConf with all necessary information to create the
092         *        JobClient.
093         * @return JobClient created with the provided user/group.
094         * @throws HadoopAccessorException if the client could not be created.
095         */
096        public JobClient createJobClient(String user, String group, JobConf conf) throws HadoopAccessorException {
097            validateJobTracker(conf.get("mapred.job.tracker"));
098            conf = createConfiguration(user, group, conf);
099            try {
100                return new JobClient(conf);
101            }
102            catch (IOException e) {
103                throw new HadoopAccessorException(ErrorCode.E0902, e);
104            }
105        }
106    
107        /**
108         * Return a FileSystem created with the provided user/group.
109         * 
110         * @param conf Configuration with all necessary information to create the
111         *        FileSystem.
112         * @return FileSystem created with the provided user/group.
113         * @throws HadoopAccessorException if the filesystem could not be created.
114         */
115        public FileSystem createFileSystem(String user, String group, Configuration conf) throws HadoopAccessorException {
116            try {
117                validateNameNode(new URI(conf.get("fs.default.name")).getAuthority());
118                conf = createConfiguration(user, group, conf);
119                return FileSystem.get(conf);
120            }
121            catch (IOException e) {
122                throw new HadoopAccessorException(ErrorCode.E0902, e);
123            }
124            catch (URISyntaxException e) {
125                throw new HadoopAccessorException(ErrorCode.E0902, e);
126            }
127        }
128    
129        /**
130         * Return a FileSystem created with the provided user/group for the
131         * specified URI.
132         * 
133         * @param uri file system URI.
134         * @param conf Configuration with all necessary information to create the
135         *        FileSystem.
136         * @return FileSystem created with the provided user/group.
137         * @throws HadoopAccessorException if the filesystem could not be created.
138         */
139        public FileSystem createFileSystem(String user, String group, URI uri, Configuration conf)
140                throws HadoopAccessorException {
141            validateNameNode(uri.getAuthority());
142            conf = createConfiguration(user, group, conf);
143            try {
144                return FileSystem.get(uri, conf);
145            }
146            catch (IOException e) {
147                throw new HadoopAccessorException(ErrorCode.E0902, e);
148            }
149        }
150    
151        /**
152         * Validate Job tracker
153         * @param jobTrackerUri
154         * @throws HadoopAccessorException
155         */
156        protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException {
157            validate(jobTrackerUri, jobTrackerWhitelist, ErrorCode.E0900);
158        }
159    
160        /**
161         * Validate Namenode list
162         * @param nameNodeUri
163         * @throws HadoopAccessorException
164         */
165        protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException {
166            validate(nameNodeUri, nameNodeWhitelist, ErrorCode.E0901);
167        }
168    
169        private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException {
170            if (uri != null) {
171                uri = uri.toLowerCase().trim();
172                if (whitelist.size() > 0 && !whitelist.contains(uri)) {
173                    throw new HadoopAccessorException(error, uri);
174                }
175            }
176        }
177    
178        @SuppressWarnings("unchecked")
179        private <C extends Configuration> C createConfiguration(String user, String group, C conf) {
180            ParamChecker.notEmpty(user, "user");
181            ParamChecker.notEmpty(group, "group");
182            C fsConf = (C) ((conf instanceof JobConf) ? new JobConf() : new Configuration());
183            XConfiguration.copy(conf, fsConf);
184            fsConf.set("user.name", user);
185            fsConf.set("hadoop.job.ugi", user + "," + group);
186            return fsConf;
187        }
188    
189        /**
190         * Add a file to the ClassPath via the DistributedCache.
191         */
192        public void addFileToClassPath(String user, String group, final Path file, final Configuration conf)
193                throws IOException {
194            Configuration defaultConf = createConfiguration(user, group, conf);
195            DistributedCache.addFileToClassPath(file, defaultConf);
196            DistributedCache.addFileToClassPath(file, conf);
197        }
198    
199    }