001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import java.io.BufferedReader; 022import java.io.File; 023import java.io.FileInputStream; 024import java.io.FileNotFoundException; 025import java.io.IOException; 026import java.io.InputStreamReader; 027import java.net.URI; 028import java.util.HashSet; 029import java.util.List; 030import java.util.Map; 031import java.util.ArrayList; 032import java.util.Set; 033 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.oozie.BundleJobBean; 038import org.apache.oozie.CoordinatorJobBean; 039import org.apache.oozie.ErrorCode; 040import org.apache.oozie.WorkflowJobBean; 041import org.apache.oozie.client.XOozieClient; 042import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; 043import org.apache.oozie.executor.jpa.CoordJobInfoGetJPAExecutor; 044import org.apache.oozie.executor.jpa.BundleJobInfoGetJPAExecutor; 045import org.apache.oozie.executor.jpa.WorkflowsJobGetJPAExecutor; 046import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 047import org.apache.oozie.executor.jpa.JPAExecutorException; 048import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; 049import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; 050import org.apache.oozie.util.ConfigUtils; 051import org.apache.oozie.util.Instrumentation; 052import org.apache.oozie.util.XLog; 053 054/** 055 * The authorization service provides all authorization checks. 056 */ 057public class AuthorizationService implements Service { 058 059 public static final String CONF_PREFIX = Service.CONF_PREFIX + "AuthorizationService."; 060 061 /** 062 * Configuration parameter to enable or disable Oozie admin role. 063 */ 064 public static final String CONF_SECURITY_ENABLED = CONF_PREFIX + "security.enabled"; 065 066 /** 067 * Configuration parameter to enable or disable Oozie admin role. 068 */ 069 public static final String CONF_AUTHORIZATION_ENABLED = CONF_PREFIX + "authorization.enabled"; 070 071 /** 072 * Configuration parameter to enable old behavior default group as ACL. 073 */ 074 public static final String CONF_DEFAULT_GROUP_AS_ACL = CONF_PREFIX + "default.group.as.acl"; 075 076 /** 077 * Configuration parameter to define admin groups, if NULL/empty the adminusers.txt file is used. 078 */ 079 public static final String CONF_ADMIN_GROUPS = CONF_PREFIX + "admin.groups"; 080 081 /** 082 * File that contains list of admin users for Oozie. 083 */ 084 public static final String ADMIN_USERS_FILE = "adminusers.txt"; 085 086 protected static final String INSTRUMENTATION_GROUP = "authorization"; 087 protected static final String INSTR_FAILED_AUTH_COUNTER = "authorization.failed"; 088 089 private Set<String> adminGroups; 090 private Set<String> adminUsers; 091 private boolean authorizationEnabled; 092 private boolean useDefaultGroupAsAcl; 093 094 private final XLog log = XLog.getLog(getClass()); 095 private Instrumentation instrumentation; 096 097 private String[] getTrimmedStrings(String str) { 098 if (null == str || "".equals(str.trim())) { 099 return new String[0]; 100 } 101 return str.trim().split("\\s*,\\s*"); 102 } 103 104 /** 105 * Initialize the service. <p> Reads the security related configuration. parameters - security enabled and list of 106 * super users. 107 * 108 * @param services services instance. 109 * @throws ServiceException thrown if the service could not be initialized. 110 */ 111 public void init(Services services) throws ServiceException { 112 authorizationEnabled = 113 ConfigUtils.getWithDeprecatedCheck(services.getConf(), CONF_AUTHORIZATION_ENABLED, 114 CONF_SECURITY_ENABLED, false); 115 if (authorizationEnabled) { 116 log.info("Oozie running with authorization enabled"); 117 useDefaultGroupAsAcl = ConfigurationService.getBoolean(CONF_DEFAULT_GROUP_AS_ACL); 118 String[] str = getTrimmedStrings(Services.get().getConf().get(CONF_ADMIN_GROUPS)); 119 if (str.length > 0) { 120 log.info("Admin users will be checked against the defined admin groups"); 121 adminGroups = new HashSet<String>(); 122 for (String s : str) { 123 adminGroups.add(s.trim()); 124 } 125 } 126 else { 127 log.info("Admin users will be checked against the 'adminusers.txt' file contents"); 128 adminUsers = new HashSet<String>(); 129 loadAdminUsers(); 130 } 131 } 132 else { 133 log.warn("Oozie running with authorization disabled"); 134 } 135 instrumentation = Services.get().get(InstrumentationService.class).get(); 136 } 137 138 /** 139 * Return if security is enabled or not. 140 * 141 * @return if security is enabled or not. 142 */ 143 @Deprecated 144 public boolean isSecurityEnabled() { 145 return authorizationEnabled; 146 } 147 148 public boolean useDefaultGroupAsAcl() { 149 return useDefaultGroupAsAcl; 150 } 151 152 /** 153 * Return if security is enabled or not. 154 * 155 * @return if security is enabled or not. 156 */ 157 public boolean isAuthorizationEnabled() { 158 return isSecurityEnabled(); 159 } 160 161 /** 162 * Load the list of admin users from {@link AuthorizationService#ADMIN_USERS_FILE} </p> 163 * 164 * @throws ServiceException if the admin user list could not be loaded. 165 */ 166 private void loadAdminUsers() throws ServiceException { 167 String configDir = Services.get().get(ConfigurationService.class).getConfigDir(); 168 if (configDir != null) { 169 File file = new File(configDir, ADMIN_USERS_FILE); 170 if (file.exists()) { 171 try { 172 BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file))); 173 try { 174 String line = br.readLine(); 175 while (line != null) { 176 line = line.trim(); 177 if (line.length() > 0 && !line.startsWith("#")) { 178 adminUsers.add(line); 179 } 180 line = br.readLine(); 181 } 182 } 183 catch (IOException ex) { 184 throw new ServiceException(ErrorCode.E0160, file.getAbsolutePath(), ex); 185 } 186 } 187 catch (FileNotFoundException ex) { 188 throw new ServiceException(ErrorCode.E0160, file.getAbsolutePath(), ex); 189 } 190 } 191 else { 192 log.warn("Admin users file not available in config dir [{0}], running without admin users", configDir); 193 } 194 } 195 else { 196 log.warn("Reading configuration from classpath, running without admin users"); 197 } 198 } 199 200 /** 201 * Destroy the service. <p> This implementation does a NOP. 202 */ 203 public void destroy() { 204 } 205 206 /** 207 * Return the public interface of the service. 208 * 209 * @return {@link AuthorizationService}. 210 */ 211 public Class<? extends Service> getInterface() { 212 return AuthorizationService.class; 213 } 214 215 /** 216 * Check if the user belongs to the group or not. 217 * 218 * @param user user name. 219 * @param group group name. 220 * @return if the user belongs to the group or not. 221 * @throws AuthorizationException thrown if the authorization query can not be performed. 222 */ 223 protected boolean isUserInGroup(String user, String group) throws AuthorizationException { 224 GroupsService groupsService = Services.get().get(GroupsService.class); 225 try { 226 return groupsService.getGroups(user).contains(group); 227 } 228 catch (IOException ex) { 229 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex); 230 } 231 } 232 233 /** 234 * Check if the user belongs to the group or not. <p> <p> Subclasses should override the {@link #isUserInGroup} 235 * method. 236 * 237 * @param user user name. 238 * @param group group name. 239 * @throws AuthorizationException thrown if the user is not authorized for the group or if the authorization query 240 * can not be performed. 241 */ 242 public void authorizeForGroup(String user, String group) throws AuthorizationException { 243 if (authorizationEnabled && !isUserInGroup(user, group)) { 244 throw new AuthorizationException(ErrorCode.E0502, user, group); 245 } 246 } 247 248 /** 249 * Return the default group to which the user belongs. <p> This implementation always returns 'users'. 250 * 251 * @param user user name. 252 * @return default group of user. 253 * @throws AuthorizationException thrown if the default group con not be retrieved. 254 */ 255 public String getDefaultGroup(String user) throws AuthorizationException { 256 try { 257 return Services.get().get(GroupsService.class).getGroups(user).get(0); 258 } 259 catch (IOException ex) { 260 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex); 261 } 262 } 263 264 /** 265 * Check if the user has admin privileges. <p> If admin is disabled it returns always <code>true</code>. <p> If 266 * admin is enabled it returns <code>true</code> if the user is in the <code>adminusers.txt</code> file. 267 * 268 * @param user user name. 269 * @return if the user has admin privileges or not. 270 */ 271 protected boolean isAdmin(String user) { 272 boolean admin = false; 273 if (adminUsers != null) { 274 admin = adminUsers.contains(user); 275 } 276 else { 277 for (String adminGroup : adminGroups) { 278 try { 279 admin = isUserInGroup(user, adminGroup); 280 if (admin) { 281 break; 282 } 283 } 284 catch (AuthorizationException ex) { 285 log.warn("Admin check failed, " + ex.toString(), ex); 286 break; 287 } 288 } 289 } 290 return admin; 291 } 292 293 /** 294 * Check if the user has admin privileges. <p> Subclasses should override the {@link #isUserInGroup} method. 295 * 296 * @param user user name. 297 * @param write indicates if the check is for read or write admin tasks (in this implementation this is ignored) 298 * @throws AuthorizationException thrown if user does not have admin privileges. 299 */ 300 public void authorizeForAdmin(String user, boolean write) throws AuthorizationException { 301 if (authorizationEnabled && write && !isAdmin(user)) { 302 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 303 throw new AuthorizationException(ErrorCode.E0503, user); 304 } 305 } 306 307 /** 308 * Check if the user+group is authorized to use the specified application. <p> The check is done by checking the 309 * file system permissions on the workflow application. 310 * 311 * @param user user name. 312 * @param group group name. 313 * @param appPath application path. 314 * @throws AuthorizationException thrown if the user is not authorized for the app. 315 */ 316 public void authorizeForApp(String user, String group, String appPath, Configuration jobConf) 317 throws AuthorizationException { 318 try { 319 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 320 URI uri = new Path(appPath).toUri(); 321 Configuration fsConf = has.createJobConf(uri.getAuthority()); 322 FileSystem fs = has.createFileSystem(user, uri, fsConf); 323 324 Path path = new Path(appPath); 325 try { 326 if (!fs.exists(path)) { 327 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 328 throw new AuthorizationException(ErrorCode.E0504, appPath); 329 } 330 Path wfXml = new Path(path, "workflow.xml"); 331 if (!fs.exists(wfXml)) { 332 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 333 throw new AuthorizationException(ErrorCode.E0505, appPath); 334 } 335 if (!fs.isFile(wfXml)) { 336 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 337 throw new AuthorizationException(ErrorCode.E0506, appPath); 338 } 339 fs.open(wfXml).close(); 340 } 341 // TODO change this when stopping support of 0.18 to the new 342 // Exception 343 catch (org.apache.hadoop.fs.permission.AccessControlException ex) { 344 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 345 throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex); 346 } 347 } 348 catch (IOException ex) { 349 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 350 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex); 351 } 352 catch (HadoopAccessorException e) { 353 throw new AuthorizationException(e); 354 } 355 } 356 357 /** 358 * Check if the user+group is authorized to use the specified application. <p> The check is done by checking the 359 * file system permissions on the workflow application. 360 * 361 * @param user user name. 362 * @param group group name. 363 * @param appPath application path. 364 * @param fileName workflow or coordinator.xml 365 * @param conf 366 * @throws AuthorizationException thrown if the user is not authorized for the app. 367 */ 368 public void authorizeForApp(String user, String group, String appPath, String fileName, Configuration conf) 369 throws AuthorizationException { 370 try { 371 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 372 URI uri = new Path(appPath).toUri(); 373 Configuration fsConf = has.createJobConf(uri.getAuthority()); 374 FileSystem fs = has.createFileSystem(user, uri, fsConf); 375 376 Path path = new Path(appPath); 377 try { 378 if (!fs.exists(path)) { 379 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 380 throw new AuthorizationException(ErrorCode.E0504, appPath); 381 } 382 if (conf.get(XOozieClient.IS_PROXY_SUBMISSION) == null) { // Only further check existence of job definition files for non proxy submission jobs; 383 if (!fs.isFile(path)) { 384 Path appXml = new Path(path, fileName); 385 if (!fs.exists(appXml)) { 386 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 387 throw new AuthorizationException(ErrorCode.E0505, appPath); 388 } 389 if (!fs.isFile(appXml)) { 390 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 391 throw new AuthorizationException(ErrorCode.E0506, appPath); 392 } 393 fs.open(appXml).close(); 394 } 395 } 396 } 397 // TODO change this when stopping support of 0.18 to the new 398 // Exception 399 catch (org.apache.hadoop.fs.permission.AccessControlException ex) { 400 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 401 throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex); 402 } 403 } 404 catch (IOException ex) { 405 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 406 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex); 407 } 408 catch (HadoopAccessorException e) { 409 throw new AuthorizationException(e); 410 } 411 } 412 413 private boolean isUserInAcl(String user, String aclStr) throws IOException { 414 boolean userInAcl = false; 415 if (aclStr != null && aclStr.trim().length() > 0) { 416 GroupsService groupsService = Services.get().get(GroupsService.class); 417 String[] acl = aclStr.split(","); 418 for (int i = 0; !userInAcl && i < acl.length; i++) { 419 String aclItem = acl[i].trim(); 420 userInAcl = aclItem.equals(user) || groupsService.getGroups(user).contains(aclItem); 421 } 422 } 423 return userInAcl; 424 } 425 426 /** 427 * Check if the user+group is authorized to operate on the specified job. <p> Checks if the user is a super-user or 428 * the one who started the job. <p> Read operations are allowed to all users. 429 * 430 * @param user user name. 431 * @param jobId job id. 432 * @param write indicates if the check is for read or write job tasks. 433 * @throws AuthorizationException thrown if the user is not authorized for the job. 434 */ 435 public void authorizeForJob(String user, String jobId, boolean write) throws AuthorizationException { 436 if (authorizationEnabled && write && !isAdmin(user)) { 437 try { 438 // handle workflow jobs 439 if (jobId.endsWith("-W")) { 440 WorkflowJobBean jobBean = null; 441 JPAService jpaService = Services.get().get(JPAService.class); 442 if (jpaService != null) { 443 try { 444 jobBean = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_USER_GROUP, jobId); 445 } 446 catch (JPAExecutorException je) { 447 throw new AuthorizationException(je); 448 } 449 } 450 else { 451 throw new AuthorizationException(ErrorCode.E0610); 452 } 453 if (jobBean != null && !jobBean.getUser().equals(user)) { 454 if (!isUserInAcl(user, jobBean.getGroup())) { 455 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 456 throw new AuthorizationException(ErrorCode.E0508, user, jobId); 457 } 458 } 459 } 460 // handle bundle jobs 461 else if (jobId.endsWith("-B")){ 462 BundleJobBean jobBean = null; 463 JPAService jpaService = Services.get().get(JPAService.class); 464 if (jpaService != null) { 465 try { 466 jobBean = jpaService.execute(new BundleJobGetJPAExecutor(jobId)); 467 } 468 catch (JPAExecutorException je) { 469 throw new AuthorizationException(je); 470 } 471 } 472 else { 473 throw new AuthorizationException(ErrorCode.E0610); 474 } 475 if (jobBean != null && !jobBean.getUser().equals(user)) { 476 if (!isUserInAcl(user, jobBean.getGroup())) { 477 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 478 throw new AuthorizationException(ErrorCode.E0509, user, jobId); 479 } 480 } 481 } 482 // handle coordinator jobs 483 else { 484 CoordinatorJobBean jobBean = null; 485 JPAService jpaService = Services.get().get(JPAService.class); 486 if (jpaService != null) { 487 try { 488 jobBean = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); 489 } 490 catch (JPAExecutorException je) { 491 throw new AuthorizationException(je); 492 } 493 } 494 else { 495 throw new AuthorizationException(ErrorCode.E0610); 496 } 497 if (jobBean != null && !jobBean.getUser().equals(user)) { 498 if (!isUserInAcl(user, jobBean.getGroup())) { 499 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 500 throw new AuthorizationException(ErrorCode.E0509, user, jobId); 501 } 502 } 503 } 504 } 505 catch (IOException ex) { 506 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex); 507 } 508 } 509 } 510 511 /** 512 * Check if the user+group is authorized to operate on the specified jobs. <p> Checks if the user is a super-user or 513 * the one who started the jobs. <p> Read operations are allowed to all users. 514 * 515 * @param user user name. 516 * @param filter filter used to select jobs 517 * @param start starting index of the jobs in DB 518 * @param len maximum amount of jobs to select 519 * @param write indicates if the check is for read or write job tasks. 520 * @throws AuthorizationException thrown if the user is not authorized for the job. 521 */ 522 public void authorizeForJobs(String user, Map<String, List<String>> filter, String jobType, 523 int start, int len, boolean write) throws AuthorizationException { 524 if (authorizationEnabled && write && !isAdmin(user)) { 525 try { 526 // handle workflow jobs 527 if (jobType.equals("wf")) { 528 List<WorkflowJobBean> jobBeans = new ArrayList<WorkflowJobBean>(); 529 JPAService jpaService = Services.get().get(JPAService.class); 530 if (jpaService != null) { 531 try { 532 jobBeans = jpaService.execute(new WorkflowsJobGetJPAExecutor( 533 filter, start, len)).getWorkflows(); 534 } 535 catch (JPAExecutorException je) { 536 throw new AuthorizationException(je); 537 } 538 } 539 else { 540 throw new AuthorizationException(ErrorCode.E0610); 541 } 542 for (WorkflowJobBean jobBean : jobBeans) { 543 if (jobBean != null && !jobBean.getUser().equals(user)) { 544 if (!isUserInAcl(user, jobBean.getGroup())) { 545 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 546 throw new AuthorizationException(ErrorCode.E0508, user, jobBean.getId()); 547 } 548 } 549 } 550 } 551 // handle bundle jobs 552 else if (jobType.equals("bundle")) { 553 List<BundleJobBean> jobBeans = new ArrayList<BundleJobBean>(); 554 JPAService jpaService = Services.get().get(JPAService.class); 555 if (jpaService != null) { 556 try { 557 jobBeans = jpaService.execute(new BundleJobInfoGetJPAExecutor( 558 filter, start, len)).getBundleJobs(); 559 } 560 catch (JPAExecutorException je) { 561 throw new AuthorizationException(je); 562 } 563 } 564 else { 565 throw new AuthorizationException(ErrorCode.E0610); 566 } 567 for (BundleJobBean jobBean : jobBeans){ 568 if (jobBean != null && !jobBean.getUser().equals(user)) { 569 if (!isUserInAcl(user, jobBean.getGroup())) { 570 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 571 throw new AuthorizationException(ErrorCode.E0509, user, jobBean.getId()); 572 } 573 } 574 } 575 } 576 // handle coordinator jobs 577 else { 578 List<CoordinatorJobBean> jobBeans = new ArrayList<CoordinatorJobBean>(); 579 JPAService jpaService = Services.get().get(JPAService.class); 580 if (jpaService != null) { 581 try { 582 jobBeans = jpaService.execute(new CoordJobInfoGetJPAExecutor( 583 filter, start, len)).getCoordJobs(); 584 } 585 catch (JPAExecutorException je) { 586 throw new AuthorizationException(je); 587 } 588 } 589 else { 590 throw new AuthorizationException(ErrorCode.E0610); 591 } 592 for (CoordinatorJobBean jobBean : jobBeans) { 593 if (jobBean != null && !jobBean.getUser().equals(user)) { 594 if (!isUserInAcl(user, jobBean.getGroup())) { 595 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 596 throw new AuthorizationException(ErrorCode.E0509, user, jobBean.getId()); 597 } 598 } 599 } 600 } 601 } 602 catch (IOException ex) { 603 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex); 604 } 605 } 606 } 607 /** 608 * Convenience method for instrumentation counters. 609 * 610 * @param name counter name. 611 * @param count count to increment the counter. 612 */ 613 private void incrCounter(String name, int count) { 614 if (instrumentation != null) { 615 instrumentation.incr(INSTRUMENTATION_GROUP, name, count); 616 } 617 } 618}