001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.io.BufferedReader;
022import java.io.File;
023import java.io.FileInputStream;
024import java.io.FileNotFoundException;
025import java.io.IOException;
026import java.io.InputStreamReader;
027import java.net.URI;
028import java.util.HashSet;
029import java.util.List;
030import java.util.Map;
031import java.util.ArrayList;
032import java.util.Set;
033
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.oozie.BundleJobBean;
038import org.apache.oozie.CoordinatorJobBean;
039import org.apache.oozie.ErrorCode;
040import org.apache.oozie.WorkflowJobBean;
041import org.apache.oozie.client.XOozieClient;
042import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
043import org.apache.oozie.executor.jpa.CoordJobInfoGetJPAExecutor;
044import org.apache.oozie.executor.jpa.BundleJobInfoGetJPAExecutor;
045import org.apache.oozie.executor.jpa.WorkflowsJobGetJPAExecutor;
046import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
047import org.apache.oozie.executor.jpa.JPAExecutorException;
048import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
049import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
050import org.apache.oozie.util.ConfigUtils;
051import org.apache.oozie.util.Instrumentation;
052import org.apache.oozie.util.XLog;
053
054/**
055 * The authorization service provides all authorization checks.
056 */
057public class AuthorizationService implements Service {
058
059    public static final String CONF_PREFIX = Service.CONF_PREFIX + "AuthorizationService.";
060
061    /**
062     * Configuration parameter to enable or disable Oozie admin role.
063     */
064    public static final String CONF_SECURITY_ENABLED = CONF_PREFIX + "security.enabled";
065
066    /**
067     * Configuration parameter to enable or disable Oozie admin role.
068     */
069    public static final String CONF_AUTHORIZATION_ENABLED = CONF_PREFIX + "authorization.enabled";
070
071    /**
072     * Configuration parameter to enable old behavior default group as ACL.
073     */
074    public static final String CONF_DEFAULT_GROUP_AS_ACL = CONF_PREFIX + "default.group.as.acl";
075
076    /**
077     * Configuration parameter to define admin groups, if NULL/empty the adminusers.txt file is used.
078     */
079    public static final String CONF_ADMIN_GROUPS = CONF_PREFIX + "admin.groups";
080
081    /**
082     * File that contains list of admin users for Oozie.
083     */
084    public static final String ADMIN_USERS_FILE = "adminusers.txt";
085
086    protected static final String INSTRUMENTATION_GROUP = "authorization";
087    protected static final String INSTR_FAILED_AUTH_COUNTER = "authorization.failed";
088
089    private Set<String> adminGroups;
090    private Set<String> adminUsers;
091    private boolean authorizationEnabled;
092    private boolean useDefaultGroupAsAcl;
093
094    private final XLog log = XLog.getLog(getClass());
095    private Instrumentation instrumentation;
096
097    private String[] getTrimmedStrings(String str) {
098        if (null == str || "".equals(str.trim())) {
099            return new String[0];
100        }
101        return str.trim().split("\\s*,\\s*");
102    }
103
104    /**
105     * Initialize the service. <p> Reads the security related configuration. parameters - security enabled and list of
106     * super users.
107     *
108     * @param services services instance.
109     * @throws ServiceException thrown if the service could not be initialized.
110     */
111    public void init(Services services) throws ServiceException {
112        authorizationEnabled =
113            ConfigUtils.getWithDeprecatedCheck(services.getConf(), CONF_AUTHORIZATION_ENABLED,
114                                               CONF_SECURITY_ENABLED, false);
115        if (authorizationEnabled) {
116            log.info("Oozie running with authorization enabled");
117            useDefaultGroupAsAcl = ConfigurationService.getBoolean(CONF_DEFAULT_GROUP_AS_ACL);
118            String[] str = getTrimmedStrings(Services.get().getConf().get(CONF_ADMIN_GROUPS));
119            if (str.length > 0) {
120                log.info("Admin users will be checked against the defined admin groups");
121                adminGroups = new HashSet<String>();
122                for (String s : str) {
123                    adminGroups.add(s.trim());
124                }
125            }
126            else {
127                log.info("Admin users will be checked against the 'adminusers.txt' file contents");
128                adminUsers = new HashSet<String>();
129                loadAdminUsers();
130            }
131        }
132        else {
133            log.warn("Oozie running with authorization disabled");
134        }
135        instrumentation = Services.get().get(InstrumentationService.class).get();
136    }
137
138    /**
139     * Return if security is enabled or not.
140     *
141     * @return if security is enabled or not.
142     */
143    @Deprecated
144    public boolean isSecurityEnabled() {
145        return authorizationEnabled;
146    }
147
148    public boolean useDefaultGroupAsAcl() {
149        return useDefaultGroupAsAcl;
150    }
151
152    /**
153     * Return if security is enabled or not.
154     *
155     * @return if security is enabled or not.
156     */
157    public boolean isAuthorizationEnabled() {
158        return isSecurityEnabled();
159    }
160
161    /**
162     * Load the list of admin users from {@link AuthorizationService#ADMIN_USERS_FILE} </p>
163     *
164     * @throws ServiceException if the admin user list could not be loaded.
165     */
166    private void loadAdminUsers() throws ServiceException {
167        String configDir = Services.get().get(ConfigurationService.class).getConfigDir();
168        if (configDir != null) {
169            File file = new File(configDir, ADMIN_USERS_FILE);
170            if (file.exists()) {
171                try {
172                    BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
173                    try {
174                        String line = br.readLine();
175                        while (line != null) {
176                            line = line.trim();
177                            if (line.length() > 0 && !line.startsWith("#")) {
178                                adminUsers.add(line);
179                            }
180                            line = br.readLine();
181                        }
182                    }
183                    catch (IOException ex) {
184                        throw new ServiceException(ErrorCode.E0160, file.getAbsolutePath(), ex);
185                    }
186                }
187                catch (FileNotFoundException ex) {
188                    throw new ServiceException(ErrorCode.E0160, file.getAbsolutePath(), ex);
189                }
190            }
191            else {
192                log.warn("Admin users file not available in config dir [{0}], running without admin users", configDir);
193            }
194        }
195        else {
196            log.warn("Reading configuration from classpath, running without admin users");
197        }
198    }
199
200    /**
201     * Destroy the service. <p> This implementation does a NOP.
202     */
203    public void destroy() {
204    }
205
206    /**
207     * Return the public interface of the service.
208     *
209     * @return {@link AuthorizationService}.
210     */
211    public Class<? extends Service> getInterface() {
212        return AuthorizationService.class;
213    }
214
215    /**
216     * Check if the user belongs to the group or not.
217     *
218     * @param user user name.
219     * @param group group name.
220     * @return if the user belongs to the group or not.
221     * @throws AuthorizationException thrown if the authorization query can not be performed.
222     */
223    protected boolean isUserInGroup(String user, String group) throws AuthorizationException {
224        GroupsService groupsService = Services.get().get(GroupsService.class);
225        try {
226            return groupsService.getGroups(user).contains(group);
227        }
228        catch (IOException ex) {
229            throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
230        }
231    }
232
233    /**
234     * Check if the user belongs to the group or not. <p> <p> Subclasses should override the {@link #isUserInGroup}
235     * method.
236     *
237     * @param user user name.
238     * @param group group name.
239     * @throws AuthorizationException thrown if the user is not authorized for the group or if the authorization query
240     * can not be performed.
241     */
242    public void authorizeForGroup(String user, String group) throws AuthorizationException {
243        if (authorizationEnabled && !isUserInGroup(user, group)) {
244            throw new AuthorizationException(ErrorCode.E0502, user, group);
245        }
246    }
247
248    /**
249     * Return the default group to which the user belongs. <p> This implementation always returns 'users'.
250     *
251     * @param user user name.
252     * @return default group of user.
253     * @throws AuthorizationException thrown if the default group con not be retrieved.
254     */
255    public String getDefaultGroup(String user) throws AuthorizationException {
256        try {
257            return Services.get().get(GroupsService.class).getGroups(user).get(0);
258        }
259        catch (IOException ex) {
260            throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
261        }
262    }
263
264    /**
265     * Check if the user has admin privileges. <p> If admin is disabled it returns always <code>true</code>. <p> If
266     * admin is enabled it returns <code>true</code> if the user is in the <code>adminusers.txt</code> file.
267     *
268     * @param user user name.
269     * @return if the user has admin privileges or not.
270     */
271    protected boolean isAdmin(String user) {
272        boolean admin = false;
273        if (adminUsers != null) {
274            admin = adminUsers.contains(user);
275        }
276        else {
277            for (String adminGroup : adminGroups) {
278                try {
279                    admin = isUserInGroup(user, adminGroup);
280                    if (admin) {
281                        break;
282                    }
283                }
284                catch (AuthorizationException ex) {
285                    log.warn("Admin check failed, " + ex.toString(), ex);
286                    break;
287                }
288            }
289        }
290        return admin;
291    }
292
293    /**
294     * Check if the user has admin privileges. <p> Subclasses should override the {@link #isUserInGroup} method.
295     *
296     * @param user user name.
297     * @param write indicates if the check is for read or write admin tasks (in this implementation this is ignored)
298     * @throws AuthorizationException thrown if user does not have admin privileges.
299     */
300    public void authorizeForAdmin(String user, boolean write) throws AuthorizationException {
301        if (authorizationEnabled && write && !isAdmin(user)) {
302            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
303            throw new AuthorizationException(ErrorCode.E0503, user);
304        }
305    }
306
307    /**
308     * Check if the user+group is authorized to use the specified application. <p> The check is done by checking the
309     * file system permissions on the workflow application.
310     *
311     * @param user user name.
312     * @param group group name.
313     * @param appPath application path.
314     * @throws AuthorizationException thrown if the user is not authorized for the app.
315     */
316    public void authorizeForApp(String user, String group, String appPath, Configuration jobConf)
317            throws AuthorizationException {
318        try {
319            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
320            URI uri = new Path(appPath).toUri();
321            Configuration fsConf = has.createJobConf(uri.getAuthority());
322            FileSystem fs = has.createFileSystem(user, uri, fsConf);
323
324            Path path = new Path(appPath);
325            try {
326                if (!fs.exists(path)) {
327                    incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
328                    throw new AuthorizationException(ErrorCode.E0504, appPath);
329                }
330                Path wfXml = new Path(path, "workflow.xml");
331                if (!fs.exists(wfXml)) {
332                    incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
333                    throw new AuthorizationException(ErrorCode.E0505, appPath);
334                }
335                if (!fs.isFile(wfXml)) {
336                    incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
337                    throw new AuthorizationException(ErrorCode.E0506, appPath);
338                }
339                fs.open(wfXml).close();
340            }
341            // TODO change this when stopping support of 0.18 to the new
342            // Exception
343            catch (org.apache.hadoop.fs.permission.AccessControlException ex) {
344                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
345                throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex);
346            }
347        }
348        catch (IOException ex) {
349            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
350            throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
351        }
352        catch (HadoopAccessorException e) {
353            throw new AuthorizationException(e);
354        }
355    }
356
357    /**
358     * Check if the user+group is authorized to use the specified application. <p> The check is done by checking the
359     * file system permissions on the workflow application.
360     *
361     * @param user user name.
362     * @param group group name.
363     * @param appPath application path.
364     * @param fileName workflow or coordinator.xml
365     * @param conf
366     * @throws AuthorizationException thrown if the user is not authorized for the app.
367     */
368    public void authorizeForApp(String user, String group, String appPath, String fileName, Configuration conf)
369            throws AuthorizationException {
370        try {
371            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
372            URI uri = new Path(appPath).toUri();
373            Configuration fsConf = has.createJobConf(uri.getAuthority());
374            FileSystem fs = has.createFileSystem(user, uri, fsConf);
375
376            Path path = new Path(appPath);
377            try {
378                if (!fs.exists(path)) {
379                    incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
380                    throw new AuthorizationException(ErrorCode.E0504, appPath);
381                }
382                if (conf.get(XOozieClient.IS_PROXY_SUBMISSION) == null) { // Only further check existence of job definition files for non proxy submission jobs;
383                    if (!fs.isFile(path)) {
384                        Path appXml = new Path(path, fileName);
385                        if (!fs.exists(appXml)) {
386                            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
387                            throw new AuthorizationException(ErrorCode.E0505, appPath);
388                        }
389                        if (!fs.isFile(appXml)) {
390                            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
391                            throw new AuthorizationException(ErrorCode.E0506, appPath);
392                        }
393                        fs.open(appXml).close();
394                    }
395                }
396            }
397            // TODO change this when stopping support of 0.18 to the new
398            // Exception
399            catch (org.apache.hadoop.fs.permission.AccessControlException ex) {
400                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
401                throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex);
402            }
403        }
404        catch (IOException ex) {
405            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
406            throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
407        }
408        catch (HadoopAccessorException e) {
409            throw new AuthorizationException(e);
410        }
411    }
412
413    private boolean isUserInAcl(String user, String aclStr) throws IOException {
414        boolean userInAcl = false;
415        if (aclStr != null && aclStr.trim().length() > 0) {
416            GroupsService groupsService = Services.get().get(GroupsService.class);
417            String[] acl = aclStr.split(",");
418            for (int i = 0; !userInAcl && i < acl.length; i++) {
419                String aclItem = acl[i].trim();
420                userInAcl = aclItem.equals(user) || groupsService.getGroups(user).contains(aclItem);
421            }
422        }
423        return userInAcl;
424    }
425
426    /**
427     * Check if the user+group is authorized to operate on the specified job. <p> Checks if the user is a super-user or
428     * the one who started the job. <p> Read operations are allowed to all users.
429     *
430     * @param user user name.
431     * @param jobId job id.
432     * @param write indicates if the check is for read or write job tasks.
433     * @throws AuthorizationException thrown if the user is not authorized for the job.
434     */
435    public void authorizeForJob(String user, String jobId, boolean write) throws AuthorizationException {
436        if (authorizationEnabled && write && !isAdmin(user)) {
437            try {
438                // handle workflow jobs
439                if (jobId.endsWith("-W")) {
440                    WorkflowJobBean jobBean = null;
441                    JPAService jpaService = Services.get().get(JPAService.class);
442                    if (jpaService != null) {
443                        try {
444                            jobBean = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_USER_GROUP, jobId);
445                        }
446                        catch (JPAExecutorException je) {
447                            throw new AuthorizationException(je);
448                        }
449                    }
450                    else {
451                        throw new AuthorizationException(ErrorCode.E0610);
452                    }
453                    if (jobBean != null && !jobBean.getUser().equals(user)) {
454                        if (!isUserInAcl(user, jobBean.getGroup())) {
455                            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
456                            throw new AuthorizationException(ErrorCode.E0508, user, jobId);
457                        }
458                    }
459                }
460                // handle bundle jobs
461                else if (jobId.endsWith("-B")){
462                    BundleJobBean jobBean = null;
463                    JPAService jpaService = Services.get().get(JPAService.class);
464                    if (jpaService != null) {
465                        try {
466                            jobBean = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
467                        }
468                        catch (JPAExecutorException je) {
469                            throw new AuthorizationException(je);
470                        }
471                    }
472                    else {
473                        throw new AuthorizationException(ErrorCode.E0610);
474                    }
475                    if (jobBean != null && !jobBean.getUser().equals(user)) {
476                        if (!isUserInAcl(user, jobBean.getGroup())) {
477                            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
478                            throw new AuthorizationException(ErrorCode.E0509, user, jobId);
479                        }
480                    }
481                }
482                // handle coordinator jobs
483                else {
484                    CoordinatorJobBean jobBean = null;
485                    JPAService jpaService = Services.get().get(JPAService.class);
486                    if (jpaService != null) {
487                        try {
488                            jobBean = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
489                        }
490                        catch (JPAExecutorException je) {
491                            throw new AuthorizationException(je);
492                        }
493                    }
494                    else {
495                        throw new AuthorizationException(ErrorCode.E0610);
496                    }
497                    if (jobBean != null && !jobBean.getUser().equals(user)) {
498                        if (!isUserInAcl(user, jobBean.getGroup())) {
499                            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
500                            throw new AuthorizationException(ErrorCode.E0509, user, jobId);
501                        }
502                    }
503                }
504            }
505            catch (IOException ex) {
506                throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
507            }
508        }
509    }
510
511    /**
512     * Check if the user+group is authorized to operate on the specified jobs. <p> Checks if the user is a super-user or
513     * the one who started the jobs. <p> Read operations are allowed to all users.
514     *
515     * @param user user name.
516     * @param filter filter used to select jobs
517     * @param start starting index of the jobs in DB
518     * @param len maximum amount of jobs to select
519     * @param write indicates if the check is for read or write job tasks.
520     * @throws AuthorizationException thrown if the user is not authorized for the job.
521     */
522    public void authorizeForJobs(String user, Map<String, List<String>> filter, String jobType,
523                                 int start, int len, boolean write) throws AuthorizationException {
524        if (authorizationEnabled && write && !isAdmin(user)) {
525            try {
526                // handle workflow jobs
527                if (jobType.equals("wf")) {
528                    List<WorkflowJobBean> jobBeans = new ArrayList<WorkflowJobBean>();
529                    JPAService jpaService = Services.get().get(JPAService.class);
530                    if (jpaService != null) {
531                        try {
532                            jobBeans = jpaService.execute(new WorkflowsJobGetJPAExecutor(
533                                    filter, start, len)).getWorkflows();
534                        }
535                        catch (JPAExecutorException je) {
536                            throw new AuthorizationException(je);
537                        }
538                    }
539                    else {
540                        throw new AuthorizationException(ErrorCode.E0610);
541                    }
542                    for (WorkflowJobBean jobBean : jobBeans) {
543                        if (jobBean != null && !jobBean.getUser().equals(user)) {
544                            if (!isUserInAcl(user, jobBean.getGroup())) {
545                                    incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
546                                throw new AuthorizationException(ErrorCode.E0508, user, jobBean.getId());
547                            }
548                        }
549                    }
550                }
551                // handle bundle jobs
552                else if (jobType.equals("bundle")) {
553                    List<BundleJobBean> jobBeans = new ArrayList<BundleJobBean>();
554                    JPAService jpaService = Services.get().get(JPAService.class);
555                    if (jpaService != null) {
556                        try {
557                            jobBeans = jpaService.execute(new BundleJobInfoGetJPAExecutor(
558                                    filter, start, len)).getBundleJobs();
559                        }
560                        catch (JPAExecutorException je) {
561                            throw new AuthorizationException(je);
562                        }
563                    }
564                    else {
565                        throw new AuthorizationException(ErrorCode.E0610);
566                    }
567                    for (BundleJobBean jobBean : jobBeans){
568                        if (jobBean != null && !jobBean.getUser().equals(user)) {
569                            if (!isUserInAcl(user, jobBean.getGroup())) {
570                                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
571                                throw new AuthorizationException(ErrorCode.E0509, user, jobBean.getId());
572                            }
573                        }
574                    }
575                }
576                // handle coordinator jobs
577                else {
578                    List<CoordinatorJobBean> jobBeans = new ArrayList<CoordinatorJobBean>();
579                    JPAService jpaService = Services.get().get(JPAService.class);
580                    if (jpaService != null) {
581                        try {
582                            jobBeans = jpaService.execute(new CoordJobInfoGetJPAExecutor(
583                                    filter, start, len)).getCoordJobs();
584                        }
585                        catch (JPAExecutorException je) {
586                            throw new AuthorizationException(je);
587                        }
588                    }
589                    else {
590                        throw new AuthorizationException(ErrorCode.E0610);
591                    }
592                    for (CoordinatorJobBean jobBean : jobBeans) {
593                        if (jobBean != null && !jobBean.getUser().equals(user)) {
594                            if (!isUserInAcl(user, jobBean.getGroup())) {
595                                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
596                                throw new AuthorizationException(ErrorCode.E0509, user, jobBean.getId());
597                            }
598                        }
599                    }
600                }
601            }
602            catch (IOException ex) {
603                throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
604            }
605        }
606    }
607    /**
608     * Convenience method for instrumentation counters.
609     *
610     * @param name counter name.
611     * @param count count to increment the counter.
612     */
613    private void incrCounter(String name, int count) {
614        if (instrumentation != null) {
615            instrumentation.incr(INSTRUMENTATION_GROUP, name, count);
616        }
617    }
618}