This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import java.io.BufferedReader;
021 import java.io.File;
022 import java.io.FileInputStream;
023 import java.io.FileNotFoundException;
024 import java.io.IOException;
025 import java.io.InputStreamReader;
026 import java.net.URI;
027 import java.util.HashSet;
028 import java.util.Set;
029
030 import org.apache.hadoop.conf.Configuration;
031 import org.apache.hadoop.fs.FileSystem;
032 import org.apache.hadoop.fs.Path;
033 import org.apache.oozie.BundleJobBean;
034 import org.apache.oozie.CoordinatorJobBean;
035 import org.apache.oozie.ErrorCode;
036 import org.apache.oozie.WorkflowJobBean;
037 import org.apache.oozie.client.XOozieClient;
038 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
039 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
040 import org.apache.oozie.executor.jpa.JPAExecutorException;
041 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
042 import org.apache.oozie.util.ConfigUtils;
043 import org.apache.oozie.util.Instrumentation;
044 import org.apache.oozie.util.XLog;
045
046 /**
047 * The authorization service provides all authorization checks.
048 */
049 public class AuthorizationService implements Service {
050
051 public static final String CONF_PREFIX = Service.CONF_PREFIX + "AuthorizationService.";
052
053 /**
054 * Configuration parameter to enable or disable Oozie admin role.
055 */
056 public static final String CONF_SECURITY_ENABLED = CONF_PREFIX + "security.enabled";
057
058 /**
059 * Configuration parameter to enable or disable Oozie admin role.
060 */
061 public static final String CONF_AUTHORIZATION_ENABLED = CONF_PREFIX + "authorization.enabled";
062
063 /**
064 * Configuration parameter to enable old behavior default group as ACL.
065 */
066 public static final String CONF_DEFAULT_GROUP_AS_ACL = CONF_PREFIX + "default.group.as.acl";
067
068 /**
069 * Configuration parameter to define admin groups, if NULL/empty the adminusers.txt file is used.
070 */
071 public static final String CONF_ADMIN_GROUPS = CONF_PREFIX + "admin.groups";
072
073 /**
074 * File that contains list of admin users for Oozie.
075 */
076 public static final String ADMIN_USERS_FILE = "adminusers.txt";
077
078 protected static final String INSTRUMENTATION_GROUP = "authorization";
079 protected static final String INSTR_FAILED_AUTH_COUNTER = "authorization.failed";
080
081 private Set<String> adminGroups;
082 private Set<String> adminUsers;
083 private boolean authorizationEnabled;
084 private boolean useDefaultGroupAsAcl;
085
086 private final XLog log = XLog.getLog(getClass());
087 private Instrumentation instrumentation;
088
089 private String[] getTrimmedStrings(String str) {
090 if (null == str || "".equals(str.trim())) {
091 return new String[0];
092 }
093 return str.trim().split("\\s*,\\s*");
094 }
095
096 /**
097 * Initialize the service. <p/> Reads the security related configuration. parameters - security enabled and list of
098 * super users.
099 *
100 * @param services services instance.
101 * @throws ServiceException thrown if the service could not be initialized.
102 */
103 public void init(Services services) throws ServiceException {
104 authorizationEnabled =
105 ConfigUtils.getWithDeprecatedCheck(services.getConf(), CONF_AUTHORIZATION_ENABLED,
106 CONF_SECURITY_ENABLED, false);
107 if (authorizationEnabled) {
108 log.info("Oozie running with authorization enabled");
109 useDefaultGroupAsAcl = Services.get().getConf().getBoolean(CONF_DEFAULT_GROUP_AS_ACL, false);
110 String[] str = getTrimmedStrings(Services.get().getConf().get(CONF_ADMIN_GROUPS));
111 if (str.length > 0) {
112 log.info("Admin users will be checked against the defined admin groups");
113 adminGroups = new HashSet<String>();
114 for (String s : str) {
115 adminGroups.add(s.trim());
116 }
117 }
118 else {
119 log.info("Admin users will be checked against the 'adminusers.txt' file contents");
120 adminUsers = new HashSet<String>();
121 loadAdminUsers();
122 }
123 }
124 else {
125 log.warn("Oozie running with authorization disabled");
126 }
127 instrumentation = Services.get().get(InstrumentationService.class).get();
128 }
129
130 /**
131 * Return if security is enabled or not.
132 *
133 * @return if security is enabled or not.
134 */
135 @Deprecated
136 public boolean isSecurityEnabled() {
137 return authorizationEnabled;
138 }
139
140 public boolean useDefaultGroupAsAcl() {
141 return useDefaultGroupAsAcl;
142 }
143
144 /**
145 * Return if security is enabled or not.
146 *
147 * @return if security is enabled or not.
148 */
149 public boolean isAuthorizationEnabled() {
150 return isSecurityEnabled();
151 }
152
153 /**
154 * Load the list of admin users from {@link AuthorizationService#ADMIN_USERS_FILE} </p>
155 *
156 * @throws ServiceException if the admin user list could not be loaded.
157 */
158 private void loadAdminUsers() throws ServiceException {
159 String configDir = Services.get().get(ConfigurationService.class).getConfigDir();
160 if (configDir != null) {
161 File file = new File(configDir, ADMIN_USERS_FILE);
162 if (file.exists()) {
163 try {
164 BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
165 try {
166 String line = br.readLine();
167 while (line != null) {
168 line = line.trim();
169 if (line.length() > 0 && !line.startsWith("#")) {
170 adminUsers.add(line);
171 }
172 line = br.readLine();
173 }
174 }
175 catch (IOException ex) {
176 throw new ServiceException(ErrorCode.E0160, file.getAbsolutePath(), ex);
177 }
178 }
179 catch (FileNotFoundException ex) {
180 throw new ServiceException(ErrorCode.E0160, file.getAbsolutePath(), ex);
181 }
182 }
183 else {
184 log.warn("Admin users file not available in config dir [{0}], running without admin users", configDir);
185 }
186 }
187 else {
188 log.warn("Reading configuration from classpath, running without admin users");
189 }
190 }
191
192 /**
193 * Destroy the service. <p/> This implementation does a NOP.
194 */
195 public void destroy() {
196 }
197
198 /**
199 * Return the public interface of the service.
200 *
201 * @return {@link AuthorizationService}.
202 */
203 public Class<? extends Service> getInterface() {
204 return AuthorizationService.class;
205 }
206
207 /**
208 * Check if the user belongs to the group or not.
209 *
210 * @param user user name.
211 * @param group group name.
212 * @return if the user belongs to the group or not.
213 * @throws AuthorizationException thrown if the authorization query can not be performed.
214 */
215 protected boolean isUserInGroup(String user, String group) throws AuthorizationException {
216 GroupsService groupsService = Services.get().get(GroupsService.class);
217 try {
218 return groupsService.getGroups(user).contains(group);
219 }
220 catch (IOException ex) {
221 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
222 }
223 }
224
225 /**
226 * Check if the user belongs to the group or not. <p/> <p/> Subclasses should override the {@link #isUserInGroup}
227 * method.
228 *
229 * @param user user name.
230 * @param group group name.
231 * @throws AuthorizationException thrown if the user is not authorized for the group or if the authorization query
232 * can not be performed.
233 */
234 public void authorizeForGroup(String user, String group) throws AuthorizationException {
235 if (authorizationEnabled && !isUserInGroup(user, group)) {
236 throw new AuthorizationException(ErrorCode.E0502, user, group);
237 }
238 }
239
240 /**
241 * Return the default group to which the user belongs. <p/> This implementation always returns 'users'.
242 *
243 * @param user user name.
244 * @return default group of user.
245 * @throws AuthorizationException thrown if the default group con not be retrieved.
246 */
247 public String getDefaultGroup(String user) throws AuthorizationException {
248 try {
249 return Services.get().get(GroupsService.class).getGroups(user).get(0);
250 }
251 catch (IOException ex) {
252 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
253 }
254 }
255
256 /**
257 * Check if the user has admin privileges. <p/> If admin is disabled it returns always <code>true</code>. <p/> If
258 * admin is enabled it returns <code>true</code> if the user is in the <code>adminusers.txt</code> file.
259 *
260 * @param user user name.
261 * @return if the user has admin privileges or not.
262 */
263 protected boolean isAdmin(String user) {
264 boolean admin = false;
265 if (adminUsers != null) {
266 admin = adminUsers.contains(user);
267 }
268 else {
269 for (String adminGroup : adminGroups) {
270 try {
271 admin = isUserInGroup(user, adminGroup);
272 if (admin) {
273 break;
274 }
275 }
276 catch (AuthorizationException ex) {
277 log.warn("Admin check failed, " + ex.toString(), ex);
278 break;
279 }
280 }
281 }
282 return admin;
283 }
284
285 /**
286 * Check if the user has admin privileges. <p/> Subclasses should override the {@link #isUserInGroup} method.
287 *
288 * @param user user name.
289 * @param write indicates if the check is for read or write admin tasks (in this implementation this is ignored)
290 * @throws AuthorizationException thrown if user does not have admin priviledges.
291 */
292 public void authorizeForAdmin(String user, boolean write) throws AuthorizationException {
293 if (authorizationEnabled && write && !isAdmin(user)) {
294 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
295 throw new AuthorizationException(ErrorCode.E0503, user);
296 }
297 }
298
299 /**
300 * Check if the user+group is authorized to use the specified application. <p/> The check is done by checking the
301 * file system permissions on the workflow application.
302 *
303 * @param user user name.
304 * @param group group name.
305 * @param appPath application path.
306 * @throws AuthorizationException thrown if the user is not authorized for the app.
307 */
308 public void authorizeForApp(String user, String group, String appPath, Configuration jobConf)
309 throws AuthorizationException {
310 try {
311 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
312 URI uri = new Path(appPath).toUri();
313 Configuration fsConf = has.createJobConf(uri.getAuthority());
314 FileSystem fs = has.createFileSystem(user, uri, fsConf);
315
316 Path path = new Path(appPath);
317 try {
318 if (!fs.exists(path)) {
319 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
320 throw new AuthorizationException(ErrorCode.E0504, appPath);
321 }
322 Path wfXml = new Path(path, "workflow.xml");
323 if (!fs.exists(wfXml)) {
324 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
325 throw new AuthorizationException(ErrorCode.E0505, appPath);
326 }
327 if (!fs.isFile(wfXml)) {
328 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
329 throw new AuthorizationException(ErrorCode.E0506, appPath);
330 }
331 fs.open(wfXml).close();
332 }
333 // TODO change this when stopping support of 0.18 to the new
334 // Exception
335 catch (org.apache.hadoop.fs.permission.AccessControlException ex) {
336 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
337 throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex);
338 }
339 }
340 catch (IOException ex) {
341 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
342 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
343 }
344 catch (HadoopAccessorException e) {
345 throw new AuthorizationException(e);
346 }
347 }
348
349 /**
350 * Check if the user+group is authorized to use the specified application. <p/> The check is done by checking the
351 * file system permissions on the workflow application.
352 *
353 * @param user user name.
354 * @param group group name.
355 * @param appPath application path.
356 * @param fileName workflow or coordinator.xml
357 * @param conf
358 * @throws AuthorizationException thrown if the user is not authorized for the app.
359 */
360 public void authorizeForApp(String user, String group, String appPath, String fileName, Configuration conf)
361 throws AuthorizationException {
362 try {
363 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
364 URI uri = new Path(appPath).toUri();
365 Configuration fsConf = has.createJobConf(uri.getAuthority());
366 FileSystem fs = has.createFileSystem(user, uri, fsConf);
367
368 Path path = new Path(appPath);
369 try {
370 if (!fs.exists(path)) {
371 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
372 throw new AuthorizationException(ErrorCode.E0504, appPath);
373 }
374 if (conf.get(XOozieClient.IS_PROXY_SUBMISSION) == null) { // Only further check existence of job definition files for non proxy submission jobs;
375 if (!fs.isFile(path)) {
376 Path appXml = new Path(path, fileName);
377 if (!fs.exists(appXml)) {
378 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
379 throw new AuthorizationException(ErrorCode.E0505, appPath);
380 }
381 if (!fs.isFile(appXml)) {
382 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
383 throw new AuthorizationException(ErrorCode.E0506, appPath);
384 }
385 fs.open(appXml).close();
386 }
387 }
388 }
389 // TODO change this when stopping support of 0.18 to the new
390 // Exception
391 catch (org.apache.hadoop.fs.permission.AccessControlException ex) {
392 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
393 throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex);
394 }
395 }
396 catch (IOException ex) {
397 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
398 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
399 }
400 catch (HadoopAccessorException e) {
401 throw new AuthorizationException(e);
402 }
403 }
404
405 private boolean isUserInAcl(String user, String aclStr) throws IOException {
406 boolean userInAcl = false;
407 if (aclStr != null && aclStr.trim().length() > 0) {
408 GroupsService groupsService = Services.get().get(GroupsService.class);
409 String[] acl = aclStr.split(",");
410 for (int i = 0; !userInAcl && i < acl.length; i++) {
411 String aclItem = acl[i].trim();
412 userInAcl = aclItem.equals(user) || groupsService.getGroups(user).equals(aclItem);
413 }
414 }
415 return userInAcl;
416 }
417
418 /**
419 * Check if the user+group is authorized to operate on the specified job. <p/> Checks if the user is a super-user or
420 * the one who started the job. <p/> Read operations are allowed to all users.
421 *
422 * @param user user name.
423 * @param jobId job id.
424 * @param write indicates if the check is for read or write job tasks.
425 * @throws AuthorizationException thrown if the user is not authorized for the job.
426 */
427 public void authorizeForJob(String user, String jobId, boolean write) throws AuthorizationException {
428 if (authorizationEnabled && write && !isAdmin(user)) {
429 try {
430 // handle workflow jobs
431 if (jobId.endsWith("-W")) {
432 WorkflowJobBean jobBean = null;
433 JPAService jpaService = Services.get().get(JPAService.class);
434 if (jpaService != null) {
435 try {
436 jobBean = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
437 }
438 catch (JPAExecutorException je) {
439 throw new AuthorizationException(je);
440 }
441 }
442 else {
443 throw new AuthorizationException(ErrorCode.E0610);
444 }
445 if (jobBean != null && !jobBean.getUser().equals(user)) {
446 if (!isUserInAcl(user, jobBean.getGroup())) {
447 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
448 throw new AuthorizationException(ErrorCode.E0508, user, jobId);
449 }
450 }
451 }
452 // handle bundle jobs
453 else if (jobId.endsWith("-B")){
454 BundleJobBean jobBean = null;
455 JPAService jpaService = Services.get().get(JPAService.class);
456 if (jpaService != null) {
457 try {
458 jobBean = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
459 }
460 catch (JPAExecutorException je) {
461 throw new AuthorizationException(je);
462 }
463 }
464 else {
465 throw new AuthorizationException(ErrorCode.E0610);
466 }
467 if (jobBean != null && !jobBean.getUser().equals(user)) {
468 if (!isUserInAcl(user, jobBean.getGroup())) {
469 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
470 throw new AuthorizationException(ErrorCode.E0509, user, jobId);
471 }
472 }
473 }
474 // handle coordinator jobs
475 else {
476 CoordinatorJobBean jobBean = null;
477 JPAService jpaService = Services.get().get(JPAService.class);
478 if (jpaService != null) {
479 try {
480 jobBean = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
481 }
482 catch (JPAExecutorException je) {
483 throw new AuthorizationException(je);
484 }
485 }
486 else {
487 throw new AuthorizationException(ErrorCode.E0610);
488 }
489 if (jobBean != null && !jobBean.getUser().equals(user)) {
490 if (!isUserInAcl(user, jobBean.getGroup())) {
491 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
492 throw new AuthorizationException(ErrorCode.E0509, user, jobId);
493 }
494 }
495 }
496 }
497 catch (IOException ex) {
498 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
499 }
500 }
501 }
502
503 /**
504 * Convenience method for instrumentation counters.
505 *
506 * @param name counter name.
507 * @param count count to increment the counter.
508 */
509 private void incrCounter(String name, int count) {
510 if (instrumentation != null) {
511 instrumentation.incr(INSTRUMENTATION_GROUP, name, count);
512 }
513 }
514 }