001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.io.BufferedReader;
021import java.io.File;
022import java.io.FileInputStream;
023import java.io.FileNotFoundException;
024import java.io.IOException;
025import java.io.InputStreamReader;
026import java.net.URI;
027import java.util.HashSet;
028import java.util.Set;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.oozie.BundleJobBean;
034import org.apache.oozie.CoordinatorJobBean;
035import org.apache.oozie.ErrorCode;
036import org.apache.oozie.WorkflowJobBean;
037import org.apache.oozie.client.XOozieClient;
038import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
039import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
040import org.apache.oozie.executor.jpa.JPAExecutorException;
041import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
042import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
043import org.apache.oozie.util.ConfigUtils;
044import org.apache.oozie.util.Instrumentation;
045import org.apache.oozie.util.XLog;
046
047/**
048 * The authorization service provides all authorization checks.
049 */
050public class AuthorizationService implements Service {
051
052    public static final String CONF_PREFIX = Service.CONF_PREFIX + "AuthorizationService.";
053
054    /**
055     * Configuration parameter to enable or disable Oozie admin role.
056     */
057    public static final String CONF_SECURITY_ENABLED = CONF_PREFIX + "security.enabled";
058
059    /**
060     * Configuration parameter to enable or disable Oozie admin role.
061     */
062    public static final String CONF_AUTHORIZATION_ENABLED = CONF_PREFIX + "authorization.enabled";
063
064    /**
065     * Configuration parameter to enable old behavior default group as ACL.
066     */
067    public static final String CONF_DEFAULT_GROUP_AS_ACL = CONF_PREFIX + "default.group.as.acl";
068
069    /**
070     * Configuration parameter to define admin groups, if NULL/empty the adminusers.txt file is used.
071     */
072    public static final String CONF_ADMIN_GROUPS = CONF_PREFIX + "admin.groups";
073
074    /**
075     * File that contains list of admin users for Oozie.
076     */
077    public static final String ADMIN_USERS_FILE = "adminusers.txt";
078
079    protected static final String INSTRUMENTATION_GROUP = "authorization";
080    protected static final String INSTR_FAILED_AUTH_COUNTER = "authorization.failed";
081
082    private Set<String> adminGroups;
083    private Set<String> adminUsers;
084    private boolean authorizationEnabled;
085    private boolean useDefaultGroupAsAcl;
086
087    private final XLog log = XLog.getLog(getClass());
088    private Instrumentation instrumentation;
089
090    private String[] getTrimmedStrings(String str) {
091        if (null == str || "".equals(str.trim())) {
092            return new String[0];
093        }
094        return str.trim().split("\\s*,\\s*");
095    }
096
097    /**
098     * Initialize the service. <p/> Reads the security related configuration. parameters - security enabled and list of
099     * super users.
100     *
101     * @param services services instance.
102     * @throws ServiceException thrown if the service could not be initialized.
103     */
104    public void init(Services services) throws ServiceException {
105        authorizationEnabled =
106            ConfigUtils.getWithDeprecatedCheck(services.getConf(), CONF_AUTHORIZATION_ENABLED,
107                                               CONF_SECURITY_ENABLED, false);
108        if (authorizationEnabled) {
109            log.info("Oozie running with authorization enabled");
110            useDefaultGroupAsAcl = Services.get().getConf().getBoolean(CONF_DEFAULT_GROUP_AS_ACL, false);
111            String[] str = getTrimmedStrings(Services.get().getConf().get(CONF_ADMIN_GROUPS));
112            if (str.length > 0) {
113                log.info("Admin users will be checked against the defined admin groups");
114                adminGroups = new HashSet<String>();
115                for (String s : str) {
116                    adminGroups.add(s.trim());
117                }
118            }
119            else {
120                log.info("Admin users will be checked against the 'adminusers.txt' file contents");
121                adminUsers = new HashSet<String>();
122                loadAdminUsers();
123            }
124        }
125        else {
126            log.warn("Oozie running with authorization disabled");
127        }
128        instrumentation = Services.get().get(InstrumentationService.class).get();
129    }
130
131    /**
132     * Return if security is enabled or not.
133     *
134     * @return if security is enabled or not.
135     */
136    @Deprecated
137    public boolean isSecurityEnabled() {
138        return authorizationEnabled;
139    }
140
141    public boolean useDefaultGroupAsAcl() {
142        return useDefaultGroupAsAcl;
143    }
144
145    /**
146     * Return if security is enabled or not.
147     *
148     * @return if security is enabled or not.
149     */
150    public boolean isAuthorizationEnabled() {
151        return isSecurityEnabled();
152    }
153
154    /**
155     * Load the list of admin users from {@link AuthorizationService#ADMIN_USERS_FILE} </p>
156     *
157     * @throws ServiceException if the admin user list could not be loaded.
158     */
159    private void loadAdminUsers() throws ServiceException {
160        String configDir = Services.get().get(ConfigurationService.class).getConfigDir();
161        if (configDir != null) {
162            File file = new File(configDir, ADMIN_USERS_FILE);
163            if (file.exists()) {
164                try {
165                    BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
166                    try {
167                        String line = br.readLine();
168                        while (line != null) {
169                            line = line.trim();
170                            if (line.length() > 0 && !line.startsWith("#")) {
171                                adminUsers.add(line);
172                            }
173                            line = br.readLine();
174                        }
175                    }
176                    catch (IOException ex) {
177                        throw new ServiceException(ErrorCode.E0160, file.getAbsolutePath(), ex);
178                    }
179                }
180                catch (FileNotFoundException ex) {
181                    throw new ServiceException(ErrorCode.E0160, file.getAbsolutePath(), ex);
182                }
183            }
184            else {
185                log.warn("Admin users file not available in config dir [{0}], running without admin users", configDir);
186            }
187        }
188        else {
189            log.warn("Reading configuration from classpath, running without admin users");
190        }
191    }
192
193    /**
194     * Destroy the service. <p/> This implementation does a NOP.
195     */
196    public void destroy() {
197    }
198
199    /**
200     * Return the public interface of the service.
201     *
202     * @return {@link AuthorizationService}.
203     */
204    public Class<? extends Service> getInterface() {
205        return AuthorizationService.class;
206    }
207
208    /**
209     * Check if the user belongs to the group or not.
210     *
211     * @param user user name.
212     * @param group group name.
213     * @return if the user belongs to the group or not.
214     * @throws AuthorizationException thrown if the authorization query can not be performed.
215     */
216    protected boolean isUserInGroup(String user, String group) throws AuthorizationException {
217        GroupsService groupsService = Services.get().get(GroupsService.class);
218        try {
219            return groupsService.getGroups(user).contains(group);
220        }
221        catch (IOException ex) {
222            throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
223        }
224    }
225
226    /**
227     * Check if the user belongs to the group or not. <p/> <p/> Subclasses should override the {@link #isUserInGroup}
228     * method.
229     *
230     * @param user user name.
231     * @param group group name.
232     * @throws AuthorizationException thrown if the user is not authorized for the group or if the authorization query
233     * can not be performed.
234     */
235    public void authorizeForGroup(String user, String group) throws AuthorizationException {
236        if (authorizationEnabled && !isUserInGroup(user, group)) {
237            throw new AuthorizationException(ErrorCode.E0502, user, group);
238        }
239    }
240
241    /**
242     * Return the default group to which the user belongs. <p/> This implementation always returns 'users'.
243     *
244     * @param user user name.
245     * @return default group of user.
246     * @throws AuthorizationException thrown if the default group con not be retrieved.
247     */
248    public String getDefaultGroup(String user) throws AuthorizationException {
249        try {
250            return Services.get().get(GroupsService.class).getGroups(user).get(0);
251        }
252        catch (IOException ex) {
253            throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
254        }
255    }
256
257    /**
258     * Check if the user has admin privileges. <p/> If admin is disabled it returns always <code>true</code>. <p/> If
259     * admin is enabled it returns <code>true</code> if the user is in the <code>adminusers.txt</code> file.
260     *
261     * @param user user name.
262     * @return if the user has admin privileges or not.
263     */
264    protected boolean isAdmin(String user) {
265        boolean admin = false;
266        if (adminUsers != null) {
267            admin = adminUsers.contains(user);
268        }
269        else {
270            for (String adminGroup : adminGroups) {
271                try {
272                    admin = isUserInGroup(user, adminGroup);
273                    if (admin) {
274                        break;
275                    }
276                }
277                catch (AuthorizationException ex) {
278                    log.warn("Admin check failed, " + ex.toString(), ex);
279                    break;
280                }
281            }
282        }
283        return admin;
284    }
285
286    /**
287     * Check if the user has admin privileges. <p/> Subclasses should override the {@link #isUserInGroup} method.
288     *
289     * @param user user name.
290     * @param write indicates if the check is for read or write admin tasks (in this implementation this is ignored)
291     * @throws AuthorizationException thrown if user does not have admin priviledges.
292     */
293    public void authorizeForAdmin(String user, boolean write) throws AuthorizationException {
294        if (authorizationEnabled && write && !isAdmin(user)) {
295            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
296            throw new AuthorizationException(ErrorCode.E0503, user);
297        }
298    }
299
300    /**
301     * Check if the user+group is authorized to use the specified application. <p/> The check is done by checking the
302     * file system permissions on the workflow application.
303     *
304     * @param user user name.
305     * @param group group name.
306     * @param appPath application path.
307     * @throws AuthorizationException thrown if the user is not authorized for the app.
308     */
309    public void authorizeForApp(String user, String group, String appPath, Configuration jobConf)
310            throws AuthorizationException {
311        try {
312            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
313            URI uri = new Path(appPath).toUri();
314            Configuration fsConf = has.createJobConf(uri.getAuthority());
315            FileSystem fs = has.createFileSystem(user, uri, fsConf);
316
317            Path path = new Path(appPath);
318            try {
319                if (!fs.exists(path)) {
320                    incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
321                    throw new AuthorizationException(ErrorCode.E0504, appPath);
322                }
323                Path wfXml = new Path(path, "workflow.xml");
324                if (!fs.exists(wfXml)) {
325                    incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
326                    throw new AuthorizationException(ErrorCode.E0505, appPath);
327                }
328                if (!fs.isFile(wfXml)) {
329                    incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
330                    throw new AuthorizationException(ErrorCode.E0506, appPath);
331                }
332                fs.open(wfXml).close();
333            }
334            // TODO change this when stopping support of 0.18 to the new
335            // Exception
336            catch (org.apache.hadoop.fs.permission.AccessControlException ex) {
337                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
338                throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex);
339            }
340        }
341        catch (IOException ex) {
342            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
343            throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
344        }
345        catch (HadoopAccessorException e) {
346            throw new AuthorizationException(e);
347        }
348    }
349
350    /**
351     * Check if the user+group is authorized to use the specified application. <p/> The check is done by checking the
352     * file system permissions on the workflow application.
353     *
354     * @param user user name.
355     * @param group group name.
356     * @param appPath application path.
357     * @param fileName workflow or coordinator.xml
358     * @param conf
359     * @throws AuthorizationException thrown if the user is not authorized for the app.
360     */
361    public void authorizeForApp(String user, String group, String appPath, String fileName, Configuration conf)
362            throws AuthorizationException {
363        try {
364            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
365            URI uri = new Path(appPath).toUri();
366            Configuration fsConf = has.createJobConf(uri.getAuthority());
367            FileSystem fs = has.createFileSystem(user, uri, fsConf);
368
369            Path path = new Path(appPath);
370            try {
371                if (!fs.exists(path)) {
372                    incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
373                    throw new AuthorizationException(ErrorCode.E0504, appPath);
374                }
375                if (conf.get(XOozieClient.IS_PROXY_SUBMISSION) == null) { // Only further check existence of job definition files for non proxy submission jobs;
376                    if (!fs.isFile(path)) {
377                        Path appXml = new Path(path, fileName);
378                        if (!fs.exists(appXml)) {
379                            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
380                            throw new AuthorizationException(ErrorCode.E0505, appPath);
381                        }
382                        if (!fs.isFile(appXml)) {
383                            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
384                            throw new AuthorizationException(ErrorCode.E0506, appPath);
385                        }
386                        fs.open(appXml).close();
387                    }
388                }
389            }
390            // TODO change this when stopping support of 0.18 to the new
391            // Exception
392            catch (org.apache.hadoop.fs.permission.AccessControlException ex) {
393                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
394                throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex);
395            }
396        }
397        catch (IOException ex) {
398            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
399            throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
400        }
401        catch (HadoopAccessorException e) {
402            throw new AuthorizationException(e);
403        }
404    }
405
406    private boolean isUserInAcl(String user, String aclStr) throws IOException {
407        boolean userInAcl = false;
408        if (aclStr != null && aclStr.trim().length() > 0) {
409            GroupsService groupsService = Services.get().get(GroupsService.class);
410            String[] acl = aclStr.split(",");
411            for (int i = 0; !userInAcl && i < acl.length; i++) {
412                String aclItem = acl[i].trim();
413                userInAcl = aclItem.equals(user) || groupsService.getGroups(user).equals(aclItem);
414            }
415        }
416        return userInAcl;
417    }
418
419    /**
420     * Check if the user+group is authorized to operate on the specified job. <p/> Checks if the user is a super-user or
421     * the one who started the job. <p/> Read operations are allowed to all users.
422     *
423     * @param user user name.
424     * @param jobId job id.
425     * @param write indicates if the check is for read or write job tasks.
426     * @throws AuthorizationException thrown if the user is not authorized for the job.
427     */
428    public void authorizeForJob(String user, String jobId, boolean write) throws AuthorizationException {
429        if (authorizationEnabled && write && !isAdmin(user)) {
430            try {
431                // handle workflow jobs
432                if (jobId.endsWith("-W")) {
433                    WorkflowJobBean jobBean = null;
434                    JPAService jpaService = Services.get().get(JPAService.class);
435                    if (jpaService != null) {
436                        try {
437                            jobBean = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_USER_GROUP, jobId);
438                        }
439                        catch (JPAExecutorException je) {
440                            throw new AuthorizationException(je);
441                        }
442                    }
443                    else {
444                        throw new AuthorizationException(ErrorCode.E0610);
445                    }
446                    if (jobBean != null && !jobBean.getUser().equals(user)) {
447                        if (!isUserInAcl(user, jobBean.getGroup())) {
448                            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
449                            throw new AuthorizationException(ErrorCode.E0508, user, jobId);
450                        }
451                    }
452                }
453                // handle bundle jobs
454                else if (jobId.endsWith("-B")){
455                    BundleJobBean jobBean = null;
456                    JPAService jpaService = Services.get().get(JPAService.class);
457                    if (jpaService != null) {
458                        try {
459                            jobBean = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
460                        }
461                        catch (JPAExecutorException je) {
462                            throw new AuthorizationException(je);
463                        }
464                    }
465                    else {
466                        throw new AuthorizationException(ErrorCode.E0610);
467                    }
468                    if (jobBean != null && !jobBean.getUser().equals(user)) {
469                        if (!isUserInAcl(user, jobBean.getGroup())) {
470                            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
471                            throw new AuthorizationException(ErrorCode.E0509, user, jobId);
472                        }
473                    }
474                }
475                // handle coordinator jobs
476                else {
477                    CoordinatorJobBean jobBean = null;
478                    JPAService jpaService = Services.get().get(JPAService.class);
479                    if (jpaService != null) {
480                        try {
481                            jobBean = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
482                        }
483                        catch (JPAExecutorException je) {
484                            throw new AuthorizationException(je);
485                        }
486                    }
487                    else {
488                        throw new AuthorizationException(ErrorCode.E0610);
489                    }
490                    if (jobBean != null && !jobBean.getUser().equals(user)) {
491                        if (!isUserInAcl(user, jobBean.getGroup())) {
492                            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
493                            throw new AuthorizationException(ErrorCode.E0509, user, jobId);
494                        }
495                    }
496                }
497            }
498            catch (IOException ex) {
499                throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
500            }
501        }
502    }
503
504    /**
505     * Convenience method for instrumentation counters.
506     *
507     * @param name counter name.
508     * @param count count to increment the counter.
509     */
510    private void incrCounter(String name, int count) {
511        if (instrumentation != null) {
512            instrumentation.incr(INSTRUMENTATION_GROUP, name, count);
513        }
514    }
515}