001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import java.io.BufferedReader; 021 import java.io.File; 022 import java.io.FileInputStream; 023 import java.io.FileNotFoundException; 024 import java.io.IOException; 025 import java.io.InputStreamReader; 026 import java.net.URI; 027 import java.util.HashSet; 028 import java.util.Set; 029 030 import org.apache.hadoop.conf.Configuration; 031 import org.apache.hadoop.fs.FileSystem; 032 import org.apache.hadoop.fs.Path; 033 import org.apache.oozie.BundleJobBean; 034 import org.apache.oozie.CoordinatorJobBean; 035 import org.apache.oozie.ErrorCode; 036 import org.apache.oozie.WorkflowJobBean; 037 import org.apache.oozie.client.XOozieClient; 038 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; 039 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 040 import org.apache.oozie.executor.jpa.JPAExecutorException; 041 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 042 import org.apache.oozie.util.ConfigUtils; 043 import org.apache.oozie.util.Instrumentation; 044 import org.apache.oozie.util.XLog; 045 046 /** 047 * The authorization service provides all authorization checks. 048 */ 049 public class AuthorizationService implements Service { 050 051 public static final String CONF_PREFIX = Service.CONF_PREFIX + "AuthorizationService."; 052 053 /** 054 * Configuration parameter to enable or disable Oozie admin role. 055 */ 056 public static final String CONF_SECURITY_ENABLED = CONF_PREFIX + "security.enabled"; 057 058 /** 059 * Configuration parameter to enable or disable Oozie admin role. 060 */ 061 public static final String CONF_AUTHORIZATION_ENABLED = CONF_PREFIX + "authorization.enabled"; 062 063 /** 064 * Configuration parameter to enable old behavior default group as ACL. 065 */ 066 public static final String CONF_DEFAULT_GROUP_AS_ACL = CONF_PREFIX + "default.group.as.acl"; 067 068 /** 069 * Configuration parameter to define admin groups, if NULL/empty the adminusers.txt file is used. 070 */ 071 public static final String CONF_ADMIN_GROUPS = CONF_PREFIX + "admin.groups"; 072 073 /** 074 * File that contains list of admin users for Oozie. 075 */ 076 public static final String ADMIN_USERS_FILE = "adminusers.txt"; 077 078 protected static final String INSTRUMENTATION_GROUP = "authorization"; 079 protected static final String INSTR_FAILED_AUTH_COUNTER = "authorization.failed"; 080 081 private Set<String> adminGroups; 082 private Set<String> adminUsers; 083 private boolean authorizationEnabled; 084 private boolean useDefaultGroupAsAcl; 085 086 private final XLog log = XLog.getLog(getClass()); 087 private Instrumentation instrumentation; 088 089 private String[] getTrimmedStrings(String str) { 090 if (null == str || "".equals(str.trim())) { 091 return new String[0]; 092 } 093 return str.trim().split("\\s*,\\s*"); 094 } 095 096 /** 097 * Initialize the service. <p/> Reads the security related configuration. parameters - security enabled and list of 098 * super users. 099 * 100 * @param services services instance. 101 * @throws ServiceException thrown if the service could not be initialized. 102 */ 103 public void init(Services services) throws ServiceException { 104 authorizationEnabled = 105 ConfigUtils.getWithDeprecatedCheck(services.getConf(), CONF_AUTHORIZATION_ENABLED, 106 CONF_SECURITY_ENABLED, false); 107 if (authorizationEnabled) { 108 log.info("Oozie running with authorization enabled"); 109 useDefaultGroupAsAcl = Services.get().getConf().getBoolean(CONF_DEFAULT_GROUP_AS_ACL, false); 110 String[] str = getTrimmedStrings(Services.get().getConf().get(CONF_ADMIN_GROUPS)); 111 if (str.length > 0) { 112 log.info("Admin users will be checked against the defined admin groups"); 113 adminGroups = new HashSet<String>(); 114 for (String s : str) { 115 adminGroups.add(s.trim()); 116 } 117 } 118 else { 119 log.info("Admin users will be checked against the 'adminusers.txt' file contents"); 120 adminUsers = new HashSet<String>(); 121 loadAdminUsers(); 122 } 123 } 124 else { 125 log.warn("Oozie running with authorization disabled"); 126 } 127 instrumentation = Services.get().get(InstrumentationService.class).get(); 128 } 129 130 /** 131 * Return if security is enabled or not. 132 * 133 * @return if security is enabled or not. 134 */ 135 @Deprecated 136 public boolean isSecurityEnabled() { 137 return authorizationEnabled; 138 } 139 140 public boolean useDefaultGroupAsAcl() { 141 return useDefaultGroupAsAcl; 142 } 143 144 /** 145 * Return if security is enabled or not. 146 * 147 * @return if security is enabled or not. 148 */ 149 public boolean isAuthorizationEnabled() { 150 return isSecurityEnabled(); 151 } 152 153 /** 154 * Load the list of admin users from {@link AuthorizationService#ADMIN_USERS_FILE} </p> 155 * 156 * @throws ServiceException if the admin user list could not be loaded. 157 */ 158 private void loadAdminUsers() throws ServiceException { 159 String configDir = Services.get().get(ConfigurationService.class).getConfigDir(); 160 if (configDir != null) { 161 File file = new File(configDir, ADMIN_USERS_FILE); 162 if (file.exists()) { 163 try { 164 BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file))); 165 try { 166 String line = br.readLine(); 167 while (line != null) { 168 line = line.trim(); 169 if (line.length() > 0 && !line.startsWith("#")) { 170 adminUsers.add(line); 171 } 172 line = br.readLine(); 173 } 174 } 175 catch (IOException ex) { 176 throw new ServiceException(ErrorCode.E0160, file.getAbsolutePath(), ex); 177 } 178 } 179 catch (FileNotFoundException ex) { 180 throw new ServiceException(ErrorCode.E0160, file.getAbsolutePath(), ex); 181 } 182 } 183 else { 184 log.warn("Admin users file not available in config dir [{0}], running without admin users", configDir); 185 } 186 } 187 else { 188 log.warn("Reading configuration from classpath, running without admin users"); 189 } 190 } 191 192 /** 193 * Destroy the service. <p/> This implementation does a NOP. 194 */ 195 public void destroy() { 196 } 197 198 /** 199 * Return the public interface of the service. 200 * 201 * @return {@link AuthorizationService}. 202 */ 203 public Class<? extends Service> getInterface() { 204 return AuthorizationService.class; 205 } 206 207 /** 208 * Check if the user belongs to the group or not. 209 * 210 * @param user user name. 211 * @param group group name. 212 * @return if the user belongs to the group or not. 213 * @throws AuthorizationException thrown if the authorization query can not be performed. 214 */ 215 protected boolean isUserInGroup(String user, String group) throws AuthorizationException { 216 GroupsService groupsService = Services.get().get(GroupsService.class); 217 try { 218 return groupsService.getGroups(user).contains(group); 219 } 220 catch (IOException ex) { 221 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex); 222 } 223 } 224 225 /** 226 * Check if the user belongs to the group or not. <p/> <p/> Subclasses should override the {@link #isUserInGroup} 227 * method. 228 * 229 * @param user user name. 230 * @param group group name. 231 * @throws AuthorizationException thrown if the user is not authorized for the group or if the authorization query 232 * can not be performed. 233 */ 234 public void authorizeForGroup(String user, String group) throws AuthorizationException { 235 if (authorizationEnabled && !isUserInGroup(user, group)) { 236 throw new AuthorizationException(ErrorCode.E0502, user, group); 237 } 238 } 239 240 /** 241 * Return the default group to which the user belongs. <p/> This implementation always returns 'users'. 242 * 243 * @param user user name. 244 * @return default group of user. 245 * @throws AuthorizationException thrown if the default group con not be retrieved. 246 */ 247 public String getDefaultGroup(String user) throws AuthorizationException { 248 try { 249 return Services.get().get(GroupsService.class).getGroups(user).get(0); 250 } 251 catch (IOException ex) { 252 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex); 253 } 254 } 255 256 /** 257 * Check if the user has admin privileges. <p/> If admin is disabled it returns always <code>true</code>. <p/> If 258 * admin is enabled it returns <code>true</code> if the user is in the <code>adminusers.txt</code> file. 259 * 260 * @param user user name. 261 * @return if the user has admin privileges or not. 262 */ 263 protected boolean isAdmin(String user) { 264 boolean admin = false; 265 if (adminUsers != null) { 266 admin = adminUsers.contains(user); 267 } 268 else { 269 for (String adminGroup : adminGroups) { 270 try { 271 admin = isUserInGroup(user, adminGroup); 272 if (admin) { 273 break; 274 } 275 } 276 catch (AuthorizationException ex) { 277 log.warn("Admin check failed, " + ex.toString(), ex); 278 break; 279 } 280 } 281 } 282 return admin; 283 } 284 285 /** 286 * Check if the user has admin privileges. <p/> Subclasses should override the {@link #isUserInGroup} method. 287 * 288 * @param user user name. 289 * @param write indicates if the check is for read or write admin tasks (in this implementation this is ignored) 290 * @throws AuthorizationException thrown if user does not have admin priviledges. 291 */ 292 public void authorizeForAdmin(String user, boolean write) throws AuthorizationException { 293 if (authorizationEnabled && write && !isAdmin(user)) { 294 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 295 throw new AuthorizationException(ErrorCode.E0503, user); 296 } 297 } 298 299 /** 300 * Check if the user+group is authorized to use the specified application. <p/> The check is done by checking the 301 * file system permissions on the workflow application. 302 * 303 * @param user user name. 304 * @param group group name. 305 * @param appPath application path. 306 * @throws AuthorizationException thrown if the user is not authorized for the app. 307 */ 308 public void authorizeForApp(String user, String group, String appPath, Configuration jobConf) 309 throws AuthorizationException { 310 try { 311 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 312 URI uri = new Path(appPath).toUri(); 313 Configuration fsConf = has.createJobConf(uri.getAuthority()); 314 FileSystem fs = has.createFileSystem(user, uri, fsConf); 315 316 Path path = new Path(appPath); 317 try { 318 if (!fs.exists(path)) { 319 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 320 throw new AuthorizationException(ErrorCode.E0504, appPath); 321 } 322 Path wfXml = new Path(path, "workflow.xml"); 323 if (!fs.exists(wfXml)) { 324 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 325 throw new AuthorizationException(ErrorCode.E0505, appPath); 326 } 327 if (!fs.isFile(wfXml)) { 328 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 329 throw new AuthorizationException(ErrorCode.E0506, appPath); 330 } 331 fs.open(wfXml).close(); 332 } 333 // TODO change this when stopping support of 0.18 to the new 334 // Exception 335 catch (org.apache.hadoop.fs.permission.AccessControlException ex) { 336 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 337 throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex); 338 } 339 } 340 catch (IOException ex) { 341 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 342 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex); 343 } 344 catch (HadoopAccessorException e) { 345 throw new AuthorizationException(e); 346 } 347 } 348 349 /** 350 * Check if the user+group is authorized to use the specified application. <p/> The check is done by checking the 351 * file system permissions on the workflow application. 352 * 353 * @param user user name. 354 * @param group group name. 355 * @param appPath application path. 356 * @param fileName workflow or coordinator.xml 357 * @param conf 358 * @throws AuthorizationException thrown if the user is not authorized for the app. 359 */ 360 public void authorizeForApp(String user, String group, String appPath, String fileName, Configuration conf) 361 throws AuthorizationException { 362 try { 363 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 364 URI uri = new Path(appPath).toUri(); 365 Configuration fsConf = has.createJobConf(uri.getAuthority()); 366 FileSystem fs = has.createFileSystem(user, uri, fsConf); 367 368 Path path = new Path(appPath); 369 try { 370 if (!fs.exists(path)) { 371 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 372 throw new AuthorizationException(ErrorCode.E0504, appPath); 373 } 374 if (conf.get(XOozieClient.IS_PROXY_SUBMISSION) == null) { // Only further check existence of job definition files for non proxy submission jobs; 375 if (!fs.isFile(path)) { 376 Path appXml = new Path(path, fileName); 377 if (!fs.exists(appXml)) { 378 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 379 throw new AuthorizationException(ErrorCode.E0505, appPath); 380 } 381 if (!fs.isFile(appXml)) { 382 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 383 throw new AuthorizationException(ErrorCode.E0506, appPath); 384 } 385 fs.open(appXml).close(); 386 } 387 } 388 } 389 // TODO change this when stopping support of 0.18 to the new 390 // Exception 391 catch (org.apache.hadoop.fs.permission.AccessControlException ex) { 392 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 393 throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex); 394 } 395 } 396 catch (IOException ex) { 397 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 398 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex); 399 } 400 catch (HadoopAccessorException e) { 401 throw new AuthorizationException(e); 402 } 403 } 404 405 private boolean isUserInAcl(String user, String aclStr) throws IOException { 406 boolean userInAcl = false; 407 if (aclStr != null && aclStr.trim().length() > 0) { 408 GroupsService groupsService = Services.get().get(GroupsService.class); 409 String[] acl = aclStr.split(","); 410 for (int i = 0; !userInAcl && i < acl.length; i++) { 411 String aclItem = acl[i].trim(); 412 userInAcl = aclItem.equals(user) || groupsService.getGroups(user).equals(aclItem); 413 } 414 } 415 return userInAcl; 416 } 417 418 /** 419 * Check if the user+group is authorized to operate on the specified job. <p/> Checks if the user is a super-user or 420 * the one who started the job. <p/> Read operations are allowed to all users. 421 * 422 * @param user user name. 423 * @param jobId job id. 424 * @param write indicates if the check is for read or write job tasks. 425 * @throws AuthorizationException thrown if the user is not authorized for the job. 426 */ 427 public void authorizeForJob(String user, String jobId, boolean write) throws AuthorizationException { 428 if (authorizationEnabled && write && !isAdmin(user)) { 429 try { 430 // handle workflow jobs 431 if (jobId.endsWith("-W")) { 432 WorkflowJobBean jobBean = null; 433 JPAService jpaService = Services.get().get(JPAService.class); 434 if (jpaService != null) { 435 try { 436 jobBean = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId)); 437 } 438 catch (JPAExecutorException je) { 439 throw new AuthorizationException(je); 440 } 441 } 442 else { 443 throw new AuthorizationException(ErrorCode.E0610); 444 } 445 if (jobBean != null && !jobBean.getUser().equals(user)) { 446 if (!isUserInAcl(user, jobBean.getGroup())) { 447 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 448 throw new AuthorizationException(ErrorCode.E0508, user, jobId); 449 } 450 } 451 } 452 // handle bundle jobs 453 else if (jobId.endsWith("-B")){ 454 BundleJobBean jobBean = null; 455 JPAService jpaService = Services.get().get(JPAService.class); 456 if (jpaService != null) { 457 try { 458 jobBean = jpaService.execute(new BundleJobGetJPAExecutor(jobId)); 459 } 460 catch (JPAExecutorException je) { 461 throw new AuthorizationException(je); 462 } 463 } 464 else { 465 throw new AuthorizationException(ErrorCode.E0610); 466 } 467 if (jobBean != null && !jobBean.getUser().equals(user)) { 468 if (!isUserInAcl(user, jobBean.getGroup())) { 469 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 470 throw new AuthorizationException(ErrorCode.E0509, user, jobId); 471 } 472 } 473 } 474 // handle coordinator jobs 475 else { 476 CoordinatorJobBean jobBean = null; 477 JPAService jpaService = Services.get().get(JPAService.class); 478 if (jpaService != null) { 479 try { 480 jobBean = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); 481 } 482 catch (JPAExecutorException je) { 483 throw new AuthorizationException(je); 484 } 485 } 486 else { 487 throw new AuthorizationException(ErrorCode.E0610); 488 } 489 if (jobBean != null && !jobBean.getUser().equals(user)) { 490 if (!isUserInAcl(user, jobBean.getGroup())) { 491 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 492 throw new AuthorizationException(ErrorCode.E0509, user, jobId); 493 } 494 } 495 } 496 } 497 catch (IOException ex) { 498 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex); 499 } 500 } 501 } 502 503 /** 504 * Convenience method for instrumentation counters. 505 * 506 * @param name counter name. 507 * @param count count to increment the counter. 508 */ 509 private void incrCounter(String name, int count) { 510 if (instrumentation != null) { 511 instrumentation.incr(INSTRUMENTATION_GROUP, name, count); 512 } 513 } 514 }