001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
029    
030    import org.apache.hadoop.conf.Configuration;
031    import org.apache.hadoop.fs.FileSystem;
032    import org.apache.hadoop.fs.Path;
033    import org.apache.oozie.BundleJobBean;
034    import org.apache.oozie.CoordinatorJobBean;
035    import org.apache.oozie.ErrorCode;
036    import org.apache.oozie.WorkflowJobBean;
037    import org.apache.oozie.client.XOozieClient;
038    import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
039    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
040    import org.apache.oozie.executor.jpa.JPAExecutorException;
041    import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
042    import org.apache.oozie.util.ConfigUtils;
043    import org.apache.oozie.util.Instrumentation;
044    import org.apache.oozie.util.XLog;
045    
046    /**
047     * The authorization service provides all authorization checks.
048     */
049    public class AuthorizationService implements Service {
050    
051        public static final String CONF_PREFIX = Service.CONF_PREFIX + "AuthorizationService.";
052    
053        /**
054         * Configuration parameter to enable or disable Oozie admin role.
055         */
056        public static final String CONF_SECURITY_ENABLED = CONF_PREFIX + "security.enabled";
057    
058        /**
059         * Configuration parameter to enable or disable Oozie admin role.
060         */
061        public static final String CONF_AUTHORIZATION_ENABLED = CONF_PREFIX + "authorization.enabled";
062    
063        /**
064         * Configuration parameter to enable old behavior default group as ACL.
065         */
066        public static final String CONF_DEFAULT_GROUP_AS_ACL = CONF_PREFIX + "default.group.as.acl";
067    
068        /**
069         * File that contains list of admin users for Oozie.
070         */
071        public static final String ADMIN_USERS_FILE = "adminusers.txt";
072    
073        protected static final String INSTRUMENTATION_GROUP = "authorization";
074        protected static final String INSTR_FAILED_AUTH_COUNTER = "authorization.failed";
075    
076        private Set<String> adminUsers;
077        private boolean authorizationEnabled;
078        private boolean useDefaultGroupAsAcl;
079    
080        private final XLog log = XLog.getLog(getClass());
081        private Instrumentation instrumentation;
082    
083        /**
084         * Initialize the service. <p/> Reads the security related configuration. parameters - security enabled and list of
085         * super users.
086         *
087         * @param services services instance.
088         * @throws ServiceException thrown if the service could not be initialized.
089         */
090        public void init(Services services) throws ServiceException {
091            adminUsers = new HashSet<String>();
092            authorizationEnabled = ConfigUtils.getWithDeprecatedCheck(services.getConf(), CONF_AUTHORIZATION_ENABLED,
093                                                                 CONF_SECURITY_ENABLED, false);
094            instrumentation = Services.get().get(InstrumentationService.class).get();
095            if (authorizationEnabled) {
096                log.info("Oozie running with security enabled");
097                loadAdminUsers();
098            }
099            else {
100                log.warn("Oozie running with security disabled");
101            }
102    
103            useDefaultGroupAsAcl = Services.get().getConf().getBoolean(CONF_DEFAULT_GROUP_AS_ACL, false);
104    
105        }
106    
107        /**
108         * Return if security is enabled or not.
109         *
110         * @return if security is enabled or not.
111         */
112        @Deprecated
113        public boolean isSecurityEnabled() {
114            return authorizationEnabled;
115        }
116    
117        public boolean useDefaultGroupAsAcl() {
118            return useDefaultGroupAsAcl;
119        }
120    
121        /**
122         * Return if security is enabled or not.
123         *
124         * @return if security is enabled or not.
125         */
126        public boolean isAuthorizationEnabled() {
127            return isSecurityEnabled();
128        }
129    
130        /**
131         * Load the list of admin users from {@link AuthorizationService#ADMIN_USERS_FILE} </p>
132         *
133         * @throws ServiceException if the admin user list could not be loaded.
134         */
135        private void loadAdminUsers() throws ServiceException {
136            String configDir = Services.get().get(ConfigurationService.class).getConfigDir();
137            if (configDir != null) {
138                File file = new File(configDir, ADMIN_USERS_FILE);
139                if (file.exists()) {
140                    try {
141                        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
142                        try {
143                            String line = br.readLine();
144                            while (line != null) {
145                                line = line.trim();
146                                if (line.length() > 0 && !line.startsWith("#")) {
147                                    adminUsers.add(line);
148                                }
149                                line = br.readLine();
150                            }
151                        }
152                        catch (IOException ex) {
153                            throw new ServiceException(ErrorCode.E0160, file.getAbsolutePath(), ex);
154                        }
155                    }
156                    catch (FileNotFoundException ex) {
157                        throw new ServiceException(ErrorCode.E0160, ex);
158                    }
159                }
160                else {
161                    log.warn("Admin users file not available in config dir [{0}], running without admin users", configDir);
162                }
163            }
164            else {
165                log.warn("Reading configuration from classpath, running without admin users");
166            }
167        }
168    
169        /**
170         * Destroy the service. <p/> This implementation does a NOP.
171         */
172        public void destroy() {
173        }
174    
175        /**
176         * Return the public interface of the service.
177         *
178         * @return {@link AuthorizationService}.
179         */
180        public Class<? extends Service> getInterface() {
181            return AuthorizationService.class;
182        }
183    
184        /**
185         * Check if the user belongs to the group or not.
186         *
187         * @param user user name.
188         * @param group group name.
189         * @return if the user belongs to the group or not.
190         * @throws AuthorizationException thrown if the authorization query can not be performed.
191         */
192        protected boolean isUserInGroup(String user, String group) throws AuthorizationException {
193            GroupsService groupsService = Services.get().get(GroupsService.class);
194            try {
195                return groupsService.getGroups(user).contains(group);
196            }
197            catch (IOException ex) {
198                throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
199            }
200        }
201    
202        /**
203         * Check if the user belongs to the group or not. <p/> <p/> Subclasses should override the {@link #isUserInGroup}
204         * method.
205         *
206         * @param user user name.
207         * @param group group name.
208         * @throws AuthorizationException thrown if the user is not authorized for the group or if the authorization query
209         * can not be performed.
210         */
211        public void authorizeForGroup(String user, String group) throws AuthorizationException {
212            if (authorizationEnabled && !isUserInGroup(user, group)) {
213                throw new AuthorizationException(ErrorCode.E0502, user, group);
214            }
215        }
216    
217        /**
218         * Return the default group to which the user belongs. <p/> This implementation always returns 'users'.
219         *
220         * @param user user name.
221         * @return default group of user.
222         * @throws AuthorizationException thrown if the default group con not be retrieved.
223         */
224        public String getDefaultGroup(String user) throws AuthorizationException {
225            try {
226                return Services.get().get(GroupsService.class).getGroups(user).get(0);
227            }
228            catch (IOException ex) {
229                throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
230            }
231        }
232    
233        /**
234         * Check if the user has admin privileges. <p/> If admin is disabled it returns always <code>true</code>. <p/> If
235         * admin is enabled it returns <code>true</code> if the user is in the <code>adminusers.txt</code> file.
236         *
237         * @param user user name.
238         * @return if the user has admin privileges or not.
239         */
240        protected boolean isAdmin(String user) {
241            return adminUsers.contains(user);
242        }
243    
244        /**
245         * Check if the user has admin privileges. <p/> Subclasses should override the {@link #isUserInGroup} method.
246         *
247         * @param user user name.
248         * @param write indicates if the check is for read or write admin tasks (in this implementation this is ignored)
249         * @throws AuthorizationException thrown if user does not have admin priviledges.
250         */
251        public void authorizeForAdmin(String user, boolean write) throws AuthorizationException {
252            if (authorizationEnabled && write && !isAdmin(user)) {
253                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
254                throw new AuthorizationException(ErrorCode.E0503, user);
255            }
256        }
257    
258        /**
259         * Check if the user+group is authorized to use the specified application. <p/> The check is done by checking the
260         * file system permissions on the workflow application.
261         *
262         * @param user user name.
263         * @param group group name.
264         * @param appPath application path.
265         * @throws AuthorizationException thrown if the user is not authorized for the app.
266         */
267        public void authorizeForApp(String user, String group, String appPath, Configuration jobConf)
268                throws AuthorizationException {
269            try {
270                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
271                URI uri = new Path(appPath).toUri();
272                Configuration fsConf = has.createJobConf(uri.getAuthority());
273                FileSystem fs = has.createFileSystem(user, uri, fsConf);
274    
275                Path path = new Path(appPath);
276                try {
277                    if (!fs.exists(path)) {
278                        incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
279                        throw new AuthorizationException(ErrorCode.E0504, appPath);
280                    }
281                    Path wfXml = new Path(path, "workflow.xml");
282                    if (!fs.exists(wfXml)) {
283                        incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
284                        throw new AuthorizationException(ErrorCode.E0505, appPath);
285                    }
286                    if (!fs.isFile(wfXml)) {
287                        incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
288                        throw new AuthorizationException(ErrorCode.E0506, appPath);
289                    }
290                    fs.open(wfXml).close();
291                }
292                // TODO change this when stopping support of 0.18 to the new
293                // Exception
294                catch (org.apache.hadoop.fs.permission.AccessControlException ex) {
295                    incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
296                    throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex);
297                }
298            }
299            catch (IOException ex) {
300                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
301                throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
302            }
303            catch (HadoopAccessorException e) {
304                throw new AuthorizationException(e);
305            }
306        }
307    
308        /**
309         * Check if the user+group is authorized to use the specified application. <p/> The check is done by checking the
310         * file system permissions on the workflow application.
311         *
312         * @param user user name.
313         * @param group group name.
314         * @param appPath application path.
315         * @param fileName workflow or coordinator.xml
316         * @param conf
317         * @throws AuthorizationException thrown if the user is not authorized for the app.
318         */
319        public void authorizeForApp(String user, String group, String appPath, String fileName, Configuration conf)
320                throws AuthorizationException {
321            try {
322                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
323                URI uri = new Path(appPath).toUri();
324                Configuration fsConf = has.createJobConf(uri.getAuthority());
325                FileSystem fs = has.createFileSystem(user, uri, fsConf);
326    
327                Path path = new Path(appPath);
328                try {
329                    if (!fs.exists(path)) {
330                        incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
331                        throw new AuthorizationException(ErrorCode.E0504, appPath);
332                    }
333                    if (conf.get(XOozieClient.IS_PROXY_SUBMISSION) == null) { // Only further check existence of job definition files for non proxy submission jobs;
334                        if (!fs.isFile(path)) {
335                            Path appXml = new Path(path, fileName);
336                            if (!fs.exists(appXml)) {
337                                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
338                                throw new AuthorizationException(ErrorCode.E0505, appPath);
339                            }
340                            if (!fs.isFile(appXml)) {
341                                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
342                                throw new AuthorizationException(ErrorCode.E0506, appPath);
343                            }
344                            fs.open(appXml).close();
345                        }
346                    }
347                }
348                // TODO change this when stopping support of 0.18 to the new
349                // Exception
350                catch (org.apache.hadoop.fs.permission.AccessControlException ex) {
351                    incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
352                    throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex);
353                }
354            }
355            catch (IOException ex) {
356                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
357                throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
358            }
359            catch (HadoopAccessorException e) {
360                throw new AuthorizationException(e);
361            }
362        }
363    
364        private boolean isUserInAcl(String user, String aclStr) throws IOException {
365            boolean userInAcl = false;
366            if (aclStr != null && aclStr.trim().length() > 0) {
367                GroupsService groupsService = Services.get().get(GroupsService.class);
368                String[] acl = aclStr.split(",");
369                for (int i = 0; !userInAcl && i < acl.length; i++) {
370                    String aclItem = acl[i].trim();
371                    userInAcl = aclItem.equals(user) || groupsService.getGroups(user).equals(aclItem);
372                }
373            }
374            return userInAcl;
375        }
376    
377        /**
378         * Check if the user+group is authorized to operate on the specified job. <p/> Checks if the user is a super-user or
379         * the one who started the job. <p/> Read operations are allowed to all users.
380         *
381         * @param user user name.
382         * @param jobId job id.
383         * @param write indicates if the check is for read or write job tasks.
384         * @throws AuthorizationException thrown if the user is not authorized for the job.
385         */
386        public void authorizeForJob(String user, String jobId, boolean write) throws AuthorizationException {
387            if (authorizationEnabled && write && !isAdmin(user)) {
388                try {
389                    // handle workflow jobs
390                    if (jobId.endsWith("-W")) {
391                        WorkflowJobBean jobBean = null;
392                        JPAService jpaService = Services.get().get(JPAService.class);
393                        if (jpaService != null) {
394                            try {
395                                jobBean = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
396                            }
397                            catch (JPAExecutorException je) {
398                                throw new AuthorizationException(je);
399                            }
400                        }
401                        else {
402                            throw new AuthorizationException(ErrorCode.E0610);
403                        }
404                        if (jobBean != null && !jobBean.getUser().equals(user)) {
405                            if (!isUserInAcl(user, jobBean.getGroup())) {
406                                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
407                                throw new AuthorizationException(ErrorCode.E0508, user, jobId);
408                            }
409                        }
410                    }
411                    // handle bundle jobs
412                    else if (jobId.endsWith("-B")){
413                        BundleJobBean jobBean = null;
414                        JPAService jpaService = Services.get().get(JPAService.class);
415                        if (jpaService != null) {
416                            try {
417                                jobBean = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
418                            }
419                            catch (JPAExecutorException je) {
420                                throw new AuthorizationException(je);
421                            }
422                        }
423                        else {
424                            throw new AuthorizationException(ErrorCode.E0610);
425                        }
426                        if (jobBean != null && !jobBean.getUser().equals(user)) {
427                            if (!isUserInAcl(user, jobBean.getGroup())) {
428                                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
429                                throw new AuthorizationException(ErrorCode.E0509, user, jobId);
430                            }
431                        }
432                    }
433                    // handle coordinator jobs
434                    else {
435                        CoordinatorJobBean jobBean = null;
436                        JPAService jpaService = Services.get().get(JPAService.class);
437                        if (jpaService != null) {
438                            try {
439                                jobBean = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
440                            }
441                            catch (JPAExecutorException je) {
442                                throw new AuthorizationException(je);
443                            }
444                        }
445                        else {
446                            throw new AuthorizationException(ErrorCode.E0610);
447                        }
448                        if (jobBean != null && !jobBean.getUser().equals(user)) {
449                            if (!isUserInAcl(user, jobBean.getGroup())) {
450                                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
451                                throw new AuthorizationException(ErrorCode.E0509, user, jobId);
452                            }
453                        }
454                    }
455                }
456                catch (IOException ex) {
457                    throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
458                }
459            }
460        }
461    
462        /**
463         * Convenience method for instrumentation counters.
464         *
465         * @param name counter name.
466         * @param count count to increment the counter.
467         */
468        private void incrCounter(String name, int count) {
469            if (instrumentation != null) {
470                instrumentation.incr(INSTRUMENTATION_GROUP, name, count);
471            }
472        }
473    }