001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.io.BufferedReader;
021    import java.io.File;
022    import java.io.FileInputStream;
023    import java.io.FileNotFoundException;
024    import java.io.IOException;
025    import java.io.InputStreamReader;
026    import java.util.HashSet;
027    import java.util.Set;
028    
029    import org.apache.hadoop.conf.Configuration;
030    import org.apache.hadoop.fs.FileSystem;
031    import org.apache.hadoop.fs.Path;
032    import org.apache.oozie.BundleJobBean;
033    import org.apache.oozie.CoordinatorJobBean;
034    import org.apache.oozie.ErrorCode;
035    import org.apache.oozie.WorkflowJobBean;
036    import org.apache.oozie.client.XOozieClient;
037    import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
038    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
039    import org.apache.oozie.executor.jpa.JPAExecutorException;
040    import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
041    import org.apache.oozie.util.Instrumentation;
042    import org.apache.oozie.util.XLog;
043    
044    /**
045     * The authorization service provides all authorization checks.
046     */
047    public class AuthorizationService implements Service {
048    
049        public static final String CONF_PREFIX = Service.CONF_PREFIX + "AuthorizationService.";
050    
051        /**
052         * Configuration parameter to enable or disable Oozie admin role.
053         */
054        public static final String CONF_SECURITY_ENABLED = CONF_PREFIX + "security.enabled";
055    
056        /**
057         * File that contains list of admin users for Oozie.
058         */
059        public static final String ADMIN_USERS_FILE = "adminusers.txt";
060    
061        /**
062         * Default group returned by getDefaultGroup().
063         */
064        public static final String DEFAULT_GROUP = "users";
065    
066        protected static final String INSTRUMENTATION_GROUP = "authorization";
067        protected static final String INSTR_FAILED_AUTH_COUNTER = "authorization.failed";
068    
069        private Set<String> adminUsers;
070        private boolean securityEnabled;
071    
072        private final XLog log = XLog.getLog(getClass());
073        private Instrumentation instrumentation;
074    
075        /**
076         * Initialize the service. <p/> Reads the security related configuration. parameters - security enabled and list of
077         * super users.
078         *
079         * @param services services instance.
080         * @throws ServiceException thrown if the service could not be initialized.
081         */
082        public void init(Services services) throws ServiceException {
083            adminUsers = new HashSet<String>();
084            securityEnabled = services.getConf().getBoolean(CONF_SECURITY_ENABLED, false);
085            instrumentation = Services.get().get(InstrumentationService.class).get();
086            if (securityEnabled) {
087                log.info("Oozie running with security enabled");
088                loadAdminUsers();
089            }
090            else {
091                log.warn("Oozie running with security disabled");
092            }
093        }
094    
095        /**
096         * Return if security is enabled or not.
097         *
098         * @return if security is enabled or not.
099         */
100        public boolean isSecurityEnabled() {
101            return securityEnabled;
102        }
103    
104        /**
105         * Load the list of admin users from {@link AuthorizationService#ADMIN_USERS_FILE} </p>
106         *
107         * @throws ServiceException if the admin user list could not be loaded.
108         */
109        private void loadAdminUsers() throws ServiceException {
110            String configDir = Services.get().get(ConfigurationService.class).getConfigDir();
111            if (configDir != null) {
112                File file = new File(configDir, ADMIN_USERS_FILE);
113                if (file.exists()) {
114                    try {
115                        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
116                        try {
117                            String line = br.readLine();
118                            while (line != null) {
119                                line = line.trim();
120                                if (line.length() > 0 && !line.startsWith("#")) {
121                                    adminUsers.add(line);
122                                }
123                                line = br.readLine();
124                            }
125                        }
126                        catch (IOException ex) {
127                            throw new ServiceException(ErrorCode.E0160, file.getAbsolutePath(), ex);
128                        }
129                    }
130                    catch (FileNotFoundException ex) {
131                        throw new ServiceException(ErrorCode.E0160, ex);
132                    }
133                }
134                else {
135                    log.warn("Admin users file not available in config dir [{0}], running without admin users", configDir);
136                }
137            }
138            else {
139                log.warn("Reading configuration from classpath, running without admin users");
140            }
141        }
142    
143        /**
144         * Destroy the service. <p/> This implementation does a NOP.
145         */
146        public void destroy() {
147        }
148    
149        /**
150         * Return the public interface of the service.
151         *
152         * @return {@link AuthorizationService}.
153         */
154        public Class<? extends Service> getInterface() {
155            return AuthorizationService.class;
156        }
157    
158        /**
159         * Check if the user belongs to the group or not. <p/> This implementation returns always <code>true</code>.
160         *
161         * @param user user name.
162         * @param group group name.
163         * @return if the user belongs to the group or not.
164         * @throws AuthorizationException thrown if the authorization query can not be performed.
165         */
166        protected boolean isUserInGroup(String user, String group) throws AuthorizationException {
167            return true;
168        }
169    
170        /**
171         * Check if the user belongs to the group or not. <p/> <p/> Subclasses should override the {@link #isUserInGroup}
172         * method.
173         *
174         * @param user user name.
175         * @param group group name.
176         * @throws AuthorizationException thrown if the user is not authorized for the group or if the authorization query
177         * can not be performed.
178         */
179        public void authorizeForGroup(String user, String group) throws AuthorizationException {
180            if (securityEnabled && !isUserInGroup(user, group)) {
181                throw new AuthorizationException(ErrorCode.E0502, user, group);
182            }
183        }
184    
185        /**
186         * Return the default group to which the user belongs. <p/> This implementation always returns 'users'.
187         *
188         * @param user user name.
189         * @return default group of user.
190         * @throws AuthorizationException thrown if the default group con not be retrieved.
191         */
192        public String getDefaultGroup(String user) throws AuthorizationException {
193            return DEFAULT_GROUP;
194        }
195    
196        /**
197         * Check if the user has admin privileges. <p/> If admin is disabled it returns always <code>true</code>. <p/> If
198         * admin is enabled it returns <code>true</code> if the user is in the <code>adminusers.txt</code> file.
199         *
200         * @param user user name.
201         * @return if the user has admin privileges or not.
202         */
203        protected boolean isAdmin(String user) {
204            return adminUsers.contains(user);
205        }
206    
207        /**
208         * Check if the user has admin privileges. <p/> Subclasses should override the {@link #isUserInGroup} method.
209         *
210         * @param user user name.
211         * @param write indicates if the check is for read or write admin tasks (in this implementation this is ignored)
212         * @throws AuthorizationException thrown if user does not have admin priviledges.
213         */
214        public void authorizeForAdmin(String user, boolean write) throws AuthorizationException {
215            if (securityEnabled && write && !isAdmin(user)) {
216                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
217                throw new AuthorizationException(ErrorCode.E0503, user);
218            }
219        }
220    
221        /**
222         * Check if the user+group is authorized to use the specified application. <p/> The check is done by checking the
223         * file system permissions on the workflow application.
224         *
225         * @param user user name.
226         * @param group group name.
227         * @param appPath application path.
228         * @throws AuthorizationException thrown if the user is not authorized for the app.
229         */
230        public void authorizeForApp(String user, String group, String appPath, Configuration jobConf)
231                throws AuthorizationException {
232            try {
233                FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group,
234                                                                                                 new Path(appPath).toUri(), jobConf);
235    
236                Path path = new Path(appPath);
237                try {
238                    if (!fs.exists(path)) {
239                        incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
240                        throw new AuthorizationException(ErrorCode.E0504, appPath);
241                    }
242                    Path wfXml = new Path(path, "workflow.xml");
243                    if (!fs.exists(wfXml)) {
244                        incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
245                        throw new AuthorizationException(ErrorCode.E0505, appPath);
246                    }
247                    if (!fs.isFile(wfXml)) {
248                        incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
249                        throw new AuthorizationException(ErrorCode.E0506, appPath);
250                    }
251                    fs.open(wfXml).close();
252                }
253                // TODO change this when stopping support of 0.18 to the new
254                // Exception
255                catch (org.apache.hadoop.fs.permission.AccessControlException ex) {
256                    incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
257                    throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex);
258                }
259            }
260            catch (IOException ex) {
261                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
262                throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
263            }
264            catch (HadoopAccessorException e) {
265                throw new AuthorizationException(e);
266            }
267        }
268    
269        /**
270         * Check if the user+group is authorized to use the specified application. <p/> The check is done by checking the
271         * file system permissions on the workflow application.
272         *
273         * @param user user name.
274         * @param group group name.
275         * @param appPath application path.
276         * @param fileName workflow or coordinator.xml
277         * @param conf
278         * @throws AuthorizationException thrown if the user is not authorized for the app.
279         */
280        public void authorizeForApp(String user, String group, String appPath, String fileName, Configuration conf)
281                throws AuthorizationException {
282            try {
283                //Configuration conf = new Configuration();
284                //conf.set("user.name", user);
285                // TODO Temporary fix till
286                // https://issues.apache.org/jira/browse/HADOOP-4875 is resolved.
287                //conf.set("hadoop.job.ugi", user + "," + group);
288                FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group,
289                                                                                                 new Path(appPath).toUri(), conf);
290                Path path = new Path(appPath);
291                try {
292                    if (!fs.exists(path)) {
293                        incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
294                        throw new AuthorizationException(ErrorCode.E0504, appPath);
295                    }
296                    if (conf.get(XOozieClient.IS_PROXY_SUBMISSION) == null) { // Only further check existence of job definition files for non proxy submission jobs;
297                        if (!fs.isFile(path)) {
298                            Path appXml = new Path(path, fileName);
299                            if (!fs.exists(appXml)) {
300                                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
301                                throw new AuthorizationException(ErrorCode.E0505, appPath);
302                            }
303                            if (!fs.isFile(appXml)) {
304                                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
305                                throw new AuthorizationException(ErrorCode.E0506, appPath);
306                            }
307                            fs.open(appXml).close();
308                        }
309                    }
310                }
311                // TODO change this when stopping support of 0.18 to the new
312                // Exception
313                catch (org.apache.hadoop.fs.permission.AccessControlException ex) {
314                    incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
315                    throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex);
316                }
317            }
318            catch (IOException ex) {
319                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
320                throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
321            }
322            catch (HadoopAccessorException e) {
323                throw new AuthorizationException(e);
324            }
325        }
326    
327        /**
328         * Check if the user+group is authorized to operate on the specified job. <p/> Checks if the user is a super-user or
329         * the one who started the job. <p/> Read operations are allowed to all users.
330         *
331         * @param user user name.
332         * @param jobId job id.
333         * @param write indicates if the check is for read or write job tasks.
334         * @throws AuthorizationException thrown if the user is not authorized for the job.
335         */
336        public void authorizeForJob(String user, String jobId, boolean write) throws AuthorizationException {
337            if (securityEnabled && write && !isAdmin(user)) {
338                // handle workflow jobs
339                if (jobId.endsWith("-W")) {
340                    WorkflowJobBean jobBean = null;
341                    JPAService jpaService = Services.get().get(JPAService.class);
342                    if (jpaService != null) {
343                        try {
344                            jobBean = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
345                        }
346                        catch (JPAExecutorException je) {
347                            throw new AuthorizationException(je);
348                        }
349                    }
350                    else {
351                        throw new AuthorizationException(ErrorCode.E0610);
352                    }
353                    if (jobBean != null && !jobBean.getUser().equals(user)) {
354                        if (!isUserInGroup(user, jobBean.getGroup())) {
355                            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
356                            throw new AuthorizationException(ErrorCode.E0508, user, jobId);
357                        }
358                    }
359                }
360                // handle bundle jobs
361                else if (jobId.endsWith("-B")){
362                    BundleJobBean jobBean = null;
363                    JPAService jpaService = Services.get().get(JPAService.class);
364                    if (jpaService != null) {
365                        try {
366                            jobBean = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
367                        }
368                        catch (JPAExecutorException je) {
369                            throw new AuthorizationException(je);
370                        }
371                    }
372                    else {
373                        throw new AuthorizationException(ErrorCode.E0610);
374                    }
375                    if (jobBean != null && !jobBean.getUser().equals(user)) {
376                        if (!isUserInGroup(user, jobBean.getGroup())) {
377                            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
378                            throw new AuthorizationException(ErrorCode.E0509, user, jobId);
379                        }
380                    }
381                }
382                // handle coordinator jobs
383                else {
384                    CoordinatorJobBean jobBean = null;
385                    JPAService jpaService = Services.get().get(JPAService.class);
386                    if (jpaService != null) {
387                        try {
388                            jobBean = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
389                        }
390                        catch (JPAExecutorException je) {
391                            throw new AuthorizationException(je);
392                        }
393                    }
394                    else {
395                        throw new AuthorizationException(ErrorCode.E0610);
396                    }
397                    if (jobBean != null && !jobBean.getUser().equals(user)) {
398                        if (!isUserInGroup(user, jobBean.getGroup())) {
399                            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
400                            throw new AuthorizationException(ErrorCode.E0509, user, jobId);
401                        }
402                    }
403                }
404            }
405        }
406    
407        /**
408         * Convenience method for instrumentation counters.
409         *
410         * @param name counter name.
411         * @param count count to increment the counter.
412         */
413        private void incrCounter(String name, int count) {
414            if (instrumentation != null) {
415                instrumentation.incr(INSTRUMENTATION_GROUP, name, count);
416            }
417        }
418    }