001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.io.BufferedReader;
021    import java.io.File;
022    import java.io.FileInputStream;
023    import java.io.FileNotFoundException;
024    import java.io.IOException;
025    import java.io.InputStreamReader;
026    import java.net.URI;
027    import java.util.HashSet;
028    import java.util.Set;
029    
030    import org.apache.hadoop.conf.Configuration;
031    import org.apache.hadoop.fs.FileSystem;
032    import org.apache.hadoop.fs.Path;
033    import org.apache.oozie.BundleJobBean;
034    import org.apache.oozie.CoordinatorJobBean;
035    import org.apache.oozie.ErrorCode;
036    import org.apache.oozie.WorkflowJobBean;
037    import org.apache.oozie.client.XOozieClient;
038    import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
039    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
040    import org.apache.oozie.executor.jpa.JPAExecutorException;
041    import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
042    import org.apache.oozie.util.ConfigUtils;
043    import org.apache.oozie.util.Instrumentation;
044    import org.apache.oozie.util.XLog;
045    
046    /**
047     * The authorization service provides all authorization checks.
048     */
049    public class AuthorizationService implements Service {
050    
051        public static final String CONF_PREFIX = Service.CONF_PREFIX + "AuthorizationService.";
052    
053        /**
054         * Configuration parameter to enable or disable Oozie admin role.
055         */
056        public static final String CONF_SECURITY_ENABLED = CONF_PREFIX + "security.enabled";
057    
058        /**
059         * Configuration parameter to enable or disable Oozie admin role.
060         */
061        public static final String CONF_AUTHORIZATION_ENABLED = CONF_PREFIX + "authorization.enabled";
062    
063        /**
064         * Configuration parameter to enable old behavior default group as ACL.
065         */
066        public static final String CONF_DEFAULT_GROUP_AS_ACL = CONF_PREFIX + "default.group.as.acl";
067    
068        /**
069         * Configuration parameter to define admin groups, if NULL/empty the adminusers.txt file is used.
070         */
071        public static final String CONF_ADMIN_GROUPS = CONF_PREFIX + "admin.groups";
072    
073        /**
074         * File that contains list of admin users for Oozie.
075         */
076        public static final String ADMIN_USERS_FILE = "adminusers.txt";
077    
078        protected static final String INSTRUMENTATION_GROUP = "authorization";
079        protected static final String INSTR_FAILED_AUTH_COUNTER = "authorization.failed";
080    
081        private Set<String> adminGroups;
082        private Set<String> adminUsers;
083        private boolean authorizationEnabled;
084        private boolean useDefaultGroupAsAcl;
085    
086        private final XLog log = XLog.getLog(getClass());
087        private Instrumentation instrumentation;
088    
089        private String[] getTrimmedStrings(String str) {
090            if (null == str || "".equals(str.trim())) {
091                return new String[0];
092            }
093            return str.trim().split("\\s*,\\s*");
094        }
095    
096        /**
097         * Initialize the service. <p/> Reads the security related configuration. parameters - security enabled and list of
098         * super users.
099         *
100         * @param services services instance.
101         * @throws ServiceException thrown if the service could not be initialized.
102         */
103        public void init(Services services) throws ServiceException {
104            authorizationEnabled =
105                ConfigUtils.getWithDeprecatedCheck(services.getConf(), CONF_AUTHORIZATION_ENABLED,
106                                                   CONF_SECURITY_ENABLED, false);
107            if (authorizationEnabled) {
108                log.info("Oozie running with authorization enabled");
109                useDefaultGroupAsAcl = Services.get().getConf().getBoolean(CONF_DEFAULT_GROUP_AS_ACL, false);
110                String[] str = getTrimmedStrings(Services.get().getConf().get(CONF_ADMIN_GROUPS));
111                if (str.length > 0) {
112                    log.info("Admin users will be checked against the defined admin groups");
113                    adminGroups = new HashSet<String>();
114                    for (String s : str) {
115                        adminGroups.add(s.trim());
116                    }
117                }
118                else {
119                    log.info("Admin users will be checked against the 'adminusers.txt' file contents");
120                    adminUsers = new HashSet<String>();
121                    loadAdminUsers();
122                }
123            }
124            else {
125                log.warn("Oozie running with authorization disabled");
126            }
127            instrumentation = Services.get().get(InstrumentationService.class).get();
128        }
129    
130        /**
131         * Return if security is enabled or not.
132         *
133         * @return if security is enabled or not.
134         */
135        @Deprecated
136        public boolean isSecurityEnabled() {
137            return authorizationEnabled;
138        }
139    
140        public boolean useDefaultGroupAsAcl() {
141            return useDefaultGroupAsAcl;
142        }
143    
144        /**
145         * Return if security is enabled or not.
146         *
147         * @return if security is enabled or not.
148         */
149        public boolean isAuthorizationEnabled() {
150            return isSecurityEnabled();
151        }
152    
153        /**
154         * Load the list of admin users from {@link AuthorizationService#ADMIN_USERS_FILE} </p>
155         *
156         * @throws ServiceException if the admin user list could not be loaded.
157         */
158        private void loadAdminUsers() throws ServiceException {
159            String configDir = Services.get().get(ConfigurationService.class).getConfigDir();
160            if (configDir != null) {
161                File file = new File(configDir, ADMIN_USERS_FILE);
162                if (file.exists()) {
163                    try {
164                        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
165                        try {
166                            String line = br.readLine();
167                            while (line != null) {
168                                line = line.trim();
169                                if (line.length() > 0 && !line.startsWith("#")) {
170                                    adminUsers.add(line);
171                                }
172                                line = br.readLine();
173                            }
174                        }
175                        catch (IOException ex) {
176                            throw new ServiceException(ErrorCode.E0160, file.getAbsolutePath(), ex);
177                        }
178                    }
179                    catch (FileNotFoundException ex) {
180                        throw new ServiceException(ErrorCode.E0160, file.getAbsolutePath(), ex);
181                    }
182                }
183                else {
184                    log.warn("Admin users file not available in config dir [{0}], running without admin users", configDir);
185                }
186            }
187            else {
188                log.warn("Reading configuration from classpath, running without admin users");
189            }
190        }
191    
192        /**
193         * Destroy the service. <p/> This implementation does a NOP.
194         */
195        public void destroy() {
196        }
197    
198        /**
199         * Return the public interface of the service.
200         *
201         * @return {@link AuthorizationService}.
202         */
203        public Class<? extends Service> getInterface() {
204            return AuthorizationService.class;
205        }
206    
207        /**
208         * Check if the user belongs to the group or not.
209         *
210         * @param user user name.
211         * @param group group name.
212         * @return if the user belongs to the group or not.
213         * @throws AuthorizationException thrown if the authorization query can not be performed.
214         */
215        protected boolean isUserInGroup(String user, String group) throws AuthorizationException {
216            GroupsService groupsService = Services.get().get(GroupsService.class);
217            try {
218                return groupsService.getGroups(user).contains(group);
219            }
220            catch (IOException ex) {
221                throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
222            }
223        }
224    
225        /**
226         * Check if the user belongs to the group or not. <p/> <p/> Subclasses should override the {@link #isUserInGroup}
227         * method.
228         *
229         * @param user user name.
230         * @param group group name.
231         * @throws AuthorizationException thrown if the user is not authorized for the group or if the authorization query
232         * can not be performed.
233         */
234        public void authorizeForGroup(String user, String group) throws AuthorizationException {
235            if (authorizationEnabled && !isUserInGroup(user, group)) {
236                throw new AuthorizationException(ErrorCode.E0502, user, group);
237            }
238        }
239    
240        /**
241         * Return the default group to which the user belongs. <p/> This implementation always returns 'users'.
242         *
243         * @param user user name.
244         * @return default group of user.
245         * @throws AuthorizationException thrown if the default group con not be retrieved.
246         */
247        public String getDefaultGroup(String user) throws AuthorizationException {
248            try {
249                return Services.get().get(GroupsService.class).getGroups(user).get(0);
250            }
251            catch (IOException ex) {
252                throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
253            }
254        }
255    
256        /**
257         * Check if the user has admin privileges. <p/> If admin is disabled it returns always <code>true</code>. <p/> If
258         * admin is enabled it returns <code>true</code> if the user is in the <code>adminusers.txt</code> file.
259         *
260         * @param user user name.
261         * @return if the user has admin privileges or not.
262         */
263        protected boolean isAdmin(String user) {
264            boolean admin = false;
265            if (adminUsers != null) {
266                admin = adminUsers.contains(user);
267            }
268            else {
269                for (String adminGroup : adminGroups) {
270                    try {
271                        admin = isUserInGroup(user, adminGroup);
272                        if (admin) {
273                            break;
274                        }
275                    }
276                    catch (AuthorizationException ex) {
277                        log.warn("Admin check failed, " + ex.toString(), ex);
278                        break;
279                    }
280                }
281            }
282            return admin;
283        }
284    
285        /**
286         * Check if the user has admin privileges. <p/> Subclasses should override the {@link #isUserInGroup} method.
287         *
288         * @param user user name.
289         * @param write indicates if the check is for read or write admin tasks (in this implementation this is ignored)
290         * @throws AuthorizationException thrown if user does not have admin priviledges.
291         */
292        public void authorizeForAdmin(String user, boolean write) throws AuthorizationException {
293            if (authorizationEnabled && write && !isAdmin(user)) {
294                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
295                throw new AuthorizationException(ErrorCode.E0503, user);
296            }
297        }
298    
299        /**
300         * Check if the user+group is authorized to use the specified application. <p/> The check is done by checking the
301         * file system permissions on the workflow application.
302         *
303         * @param user user name.
304         * @param group group name.
305         * @param appPath application path.
306         * @throws AuthorizationException thrown if the user is not authorized for the app.
307         */
308        public void authorizeForApp(String user, String group, String appPath, Configuration jobConf)
309                throws AuthorizationException {
310            try {
311                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
312                URI uri = new Path(appPath).toUri();
313                Configuration fsConf = has.createJobConf(uri.getAuthority());
314                FileSystem fs = has.createFileSystem(user, uri, fsConf);
315    
316                Path path = new Path(appPath);
317                try {
318                    if (!fs.exists(path)) {
319                        incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
320                        throw new AuthorizationException(ErrorCode.E0504, appPath);
321                    }
322                    Path wfXml = new Path(path, "workflow.xml");
323                    if (!fs.exists(wfXml)) {
324                        incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
325                        throw new AuthorizationException(ErrorCode.E0505, appPath);
326                    }
327                    if (!fs.isFile(wfXml)) {
328                        incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
329                        throw new AuthorizationException(ErrorCode.E0506, appPath);
330                    }
331                    fs.open(wfXml).close();
332                }
333                // TODO change this when stopping support of 0.18 to the new
334                // Exception
335                catch (org.apache.hadoop.fs.permission.AccessControlException ex) {
336                    incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
337                    throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex);
338                }
339            }
340            catch (IOException ex) {
341                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
342                throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
343            }
344            catch (HadoopAccessorException e) {
345                throw new AuthorizationException(e);
346            }
347        }
348    
349        /**
350         * Check if the user+group is authorized to use the specified application. <p/> The check is done by checking the
351         * file system permissions on the workflow application.
352         *
353         * @param user user name.
354         * @param group group name.
355         * @param appPath application path.
356         * @param fileName workflow or coordinator.xml
357         * @param conf
358         * @throws AuthorizationException thrown if the user is not authorized for the app.
359         */
360        public void authorizeForApp(String user, String group, String appPath, String fileName, Configuration conf)
361                throws AuthorizationException {
362            try {
363                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
364                URI uri = new Path(appPath).toUri();
365                Configuration fsConf = has.createJobConf(uri.getAuthority());
366                FileSystem fs = has.createFileSystem(user, uri, fsConf);
367    
368                Path path = new Path(appPath);
369                try {
370                    if (!fs.exists(path)) {
371                        incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
372                        throw new AuthorizationException(ErrorCode.E0504, appPath);
373                    }
374                    if (conf.get(XOozieClient.IS_PROXY_SUBMISSION) == null) { // Only further check existence of job definition files for non proxy submission jobs;
375                        if (!fs.isFile(path)) {
376                            Path appXml = new Path(path, fileName);
377                            if (!fs.exists(appXml)) {
378                                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
379                                throw new AuthorizationException(ErrorCode.E0505, appPath);
380                            }
381                            if (!fs.isFile(appXml)) {
382                                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
383                                throw new AuthorizationException(ErrorCode.E0506, appPath);
384                            }
385                            fs.open(appXml).close();
386                        }
387                    }
388                }
389                // TODO change this when stopping support of 0.18 to the new
390                // Exception
391                catch (org.apache.hadoop.fs.permission.AccessControlException ex) {
392                    incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
393                    throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex);
394                }
395            }
396            catch (IOException ex) {
397                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
398                throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
399            }
400            catch (HadoopAccessorException e) {
401                throw new AuthorizationException(e);
402            }
403        }
404    
405        private boolean isUserInAcl(String user, String aclStr) throws IOException {
406            boolean userInAcl = false;
407            if (aclStr != null && aclStr.trim().length() > 0) {
408                GroupsService groupsService = Services.get().get(GroupsService.class);
409                String[] acl = aclStr.split(",");
410                for (int i = 0; !userInAcl && i < acl.length; i++) {
411                    String aclItem = acl[i].trim();
412                    userInAcl = aclItem.equals(user) || groupsService.getGroups(user).equals(aclItem);
413                }
414            }
415            return userInAcl;
416        }
417    
418        /**
419         * Check if the user+group is authorized to operate on the specified job. <p/> Checks if the user is a super-user or
420         * the one who started the job. <p/> Read operations are allowed to all users.
421         *
422         * @param user user name.
423         * @param jobId job id.
424         * @param write indicates if the check is for read or write job tasks.
425         * @throws AuthorizationException thrown if the user is not authorized for the job.
426         */
427        public void authorizeForJob(String user, String jobId, boolean write) throws AuthorizationException {
428            if (authorizationEnabled && write && !isAdmin(user)) {
429                try {
430                    // handle workflow jobs
431                    if (jobId.endsWith("-W")) {
432                        WorkflowJobBean jobBean = null;
433                        JPAService jpaService = Services.get().get(JPAService.class);
434                        if (jpaService != null) {
435                            try {
436                                jobBean = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
437                            }
438                            catch (JPAExecutorException je) {
439                                throw new AuthorizationException(je);
440                            }
441                        }
442                        else {
443                            throw new AuthorizationException(ErrorCode.E0610);
444                        }
445                        if (jobBean != null && !jobBean.getUser().equals(user)) {
446                            if (!isUserInAcl(user, jobBean.getGroup())) {
447                                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
448                                throw new AuthorizationException(ErrorCode.E0508, user, jobId);
449                            }
450                        }
451                    }
452                    // handle bundle jobs
453                    else if (jobId.endsWith("-B")){
454                        BundleJobBean jobBean = null;
455                        JPAService jpaService = Services.get().get(JPAService.class);
456                        if (jpaService != null) {
457                            try {
458                                jobBean = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
459                            }
460                            catch (JPAExecutorException je) {
461                                throw new AuthorizationException(je);
462                            }
463                        }
464                        else {
465                            throw new AuthorizationException(ErrorCode.E0610);
466                        }
467                        if (jobBean != null && !jobBean.getUser().equals(user)) {
468                            if (!isUserInAcl(user, jobBean.getGroup())) {
469                                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
470                                throw new AuthorizationException(ErrorCode.E0509, user, jobId);
471                            }
472                        }
473                    }
474                    // handle coordinator jobs
475                    else {
476                        CoordinatorJobBean jobBean = null;
477                        JPAService jpaService = Services.get().get(JPAService.class);
478                        if (jpaService != null) {
479                            try {
480                                jobBean = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
481                            }
482                            catch (JPAExecutorException je) {
483                                throw new AuthorizationException(je);
484                            }
485                        }
486                        else {
487                            throw new AuthorizationException(ErrorCode.E0610);
488                        }
489                        if (jobBean != null && !jobBean.getUser().equals(user)) {
490                            if (!isUserInAcl(user, jobBean.getGroup())) {
491                                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
492                                throw new AuthorizationException(ErrorCode.E0509, user, jobId);
493                            }
494                        }
495                    }
496                }
497                catch (IOException ex) {
498                    throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
499                }
500            }
501        }
502    
503        /**
504         * Convenience method for instrumentation counters.
505         *
506         * @param name counter name.
507         * @param count count to increment the counter.
508         */
509        private void incrCounter(String name, int count) {
510            if (instrumentation != null) {
511                instrumentation.incr(INSTRUMENTATION_GROUP, name, count);
512            }
513        }
514    }