001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.service; 019 020import java.io.BufferedReader; 021import java.io.File; 022import java.io.FileInputStream; 023import java.io.FileNotFoundException; 024import java.io.IOException; 025import java.io.InputStreamReader; 026import java.net.URI; 027import java.util.HashSet; 028import java.util.Set; 029 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.oozie.BundleJobBean; 034import org.apache.oozie.CoordinatorJobBean; 035import org.apache.oozie.ErrorCode; 036import org.apache.oozie.WorkflowJobBean; 037import org.apache.oozie.client.XOozieClient; 038import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; 039import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 040import org.apache.oozie.executor.jpa.JPAExecutorException; 041import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; 042import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; 043import org.apache.oozie.util.ConfigUtils; 044import org.apache.oozie.util.Instrumentation; 045import org.apache.oozie.util.XLog; 046 047/** 048 * The authorization service provides all authorization checks. 049 */ 050public class AuthorizationService implements Service { 051 052 public static final String CONF_PREFIX = Service.CONF_PREFIX + "AuthorizationService."; 053 054 /** 055 * Configuration parameter to enable or disable Oozie admin role. 056 */ 057 public static final String CONF_SECURITY_ENABLED = CONF_PREFIX + "security.enabled"; 058 059 /** 060 * Configuration parameter to enable or disable Oozie admin role. 061 */ 062 public static final String CONF_AUTHORIZATION_ENABLED = CONF_PREFIX + "authorization.enabled"; 063 064 /** 065 * Configuration parameter to enable old behavior default group as ACL. 066 */ 067 public static final String CONF_DEFAULT_GROUP_AS_ACL = CONF_PREFIX + "default.group.as.acl"; 068 069 /** 070 * Configuration parameter to define admin groups, if NULL/empty the adminusers.txt file is used. 071 */ 072 public static final String CONF_ADMIN_GROUPS = CONF_PREFIX + "admin.groups"; 073 074 /** 075 * File that contains list of admin users for Oozie. 076 */ 077 public static final String ADMIN_USERS_FILE = "adminusers.txt"; 078 079 protected static final String INSTRUMENTATION_GROUP = "authorization"; 080 protected static final String INSTR_FAILED_AUTH_COUNTER = "authorization.failed"; 081 082 private Set<String> adminGroups; 083 private Set<String> adminUsers; 084 private boolean authorizationEnabled; 085 private boolean useDefaultGroupAsAcl; 086 087 private final XLog log = XLog.getLog(getClass()); 088 private Instrumentation instrumentation; 089 090 private String[] getTrimmedStrings(String str) { 091 if (null == str || "".equals(str.trim())) { 092 return new String[0]; 093 } 094 return str.trim().split("\\s*,\\s*"); 095 } 096 097 /** 098 * Initialize the service. <p/> Reads the security related configuration. parameters - security enabled and list of 099 * super users. 100 * 101 * @param services services instance. 102 * @throws ServiceException thrown if the service could not be initialized. 103 */ 104 public void init(Services services) throws ServiceException { 105 authorizationEnabled = 106 ConfigUtils.getWithDeprecatedCheck(services.getConf(), CONF_AUTHORIZATION_ENABLED, 107 CONF_SECURITY_ENABLED, false); 108 if (authorizationEnabled) { 109 log.info("Oozie running with authorization enabled"); 110 useDefaultGroupAsAcl = Services.get().getConf().getBoolean(CONF_DEFAULT_GROUP_AS_ACL, false); 111 String[] str = getTrimmedStrings(Services.get().getConf().get(CONF_ADMIN_GROUPS)); 112 if (str.length > 0) { 113 log.info("Admin users will be checked against the defined admin groups"); 114 adminGroups = new HashSet<String>(); 115 for (String s : str) { 116 adminGroups.add(s.trim()); 117 } 118 } 119 else { 120 log.info("Admin users will be checked against the 'adminusers.txt' file contents"); 121 adminUsers = new HashSet<String>(); 122 loadAdminUsers(); 123 } 124 } 125 else { 126 log.warn("Oozie running with authorization disabled"); 127 } 128 instrumentation = Services.get().get(InstrumentationService.class).get(); 129 } 130 131 /** 132 * Return if security is enabled or not. 133 * 134 * @return if security is enabled or not. 135 */ 136 @Deprecated 137 public boolean isSecurityEnabled() { 138 return authorizationEnabled; 139 } 140 141 public boolean useDefaultGroupAsAcl() { 142 return useDefaultGroupAsAcl; 143 } 144 145 /** 146 * Return if security is enabled or not. 147 * 148 * @return if security is enabled or not. 149 */ 150 public boolean isAuthorizationEnabled() { 151 return isSecurityEnabled(); 152 } 153 154 /** 155 * Load the list of admin users from {@link AuthorizationService#ADMIN_USERS_FILE} </p> 156 * 157 * @throws ServiceException if the admin user list could not be loaded. 158 */ 159 private void loadAdminUsers() throws ServiceException { 160 String configDir = Services.get().get(ConfigurationService.class).getConfigDir(); 161 if (configDir != null) { 162 File file = new File(configDir, ADMIN_USERS_FILE); 163 if (file.exists()) { 164 try { 165 BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file))); 166 try { 167 String line = br.readLine(); 168 while (line != null) { 169 line = line.trim(); 170 if (line.length() > 0 && !line.startsWith("#")) { 171 adminUsers.add(line); 172 } 173 line = br.readLine(); 174 } 175 } 176 catch (IOException ex) { 177 throw new ServiceException(ErrorCode.E0160, file.getAbsolutePath(), ex); 178 } 179 } 180 catch (FileNotFoundException ex) { 181 throw new ServiceException(ErrorCode.E0160, file.getAbsolutePath(), ex); 182 } 183 } 184 else { 185 log.warn("Admin users file not available in config dir [{0}], running without admin users", configDir); 186 } 187 } 188 else { 189 log.warn("Reading configuration from classpath, running without admin users"); 190 } 191 } 192 193 /** 194 * Destroy the service. <p/> This implementation does a NOP. 195 */ 196 public void destroy() { 197 } 198 199 /** 200 * Return the public interface of the service. 201 * 202 * @return {@link AuthorizationService}. 203 */ 204 public Class<? extends Service> getInterface() { 205 return AuthorizationService.class; 206 } 207 208 /** 209 * Check if the user belongs to the group or not. 210 * 211 * @param user user name. 212 * @param group group name. 213 * @return if the user belongs to the group or not. 214 * @throws AuthorizationException thrown if the authorization query can not be performed. 215 */ 216 protected boolean isUserInGroup(String user, String group) throws AuthorizationException { 217 GroupsService groupsService = Services.get().get(GroupsService.class); 218 try { 219 return groupsService.getGroups(user).contains(group); 220 } 221 catch (IOException ex) { 222 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex); 223 } 224 } 225 226 /** 227 * Check if the user belongs to the group or not. <p/> <p/> Subclasses should override the {@link #isUserInGroup} 228 * method. 229 * 230 * @param user user name. 231 * @param group group name. 232 * @throws AuthorizationException thrown if the user is not authorized for the group or if the authorization query 233 * can not be performed. 234 */ 235 public void authorizeForGroup(String user, String group) throws AuthorizationException { 236 if (authorizationEnabled && !isUserInGroup(user, group)) { 237 throw new AuthorizationException(ErrorCode.E0502, user, group); 238 } 239 } 240 241 /** 242 * Return the default group to which the user belongs. <p/> This implementation always returns 'users'. 243 * 244 * @param user user name. 245 * @return default group of user. 246 * @throws AuthorizationException thrown if the default group con not be retrieved. 247 */ 248 public String getDefaultGroup(String user) throws AuthorizationException { 249 try { 250 return Services.get().get(GroupsService.class).getGroups(user).get(0); 251 } 252 catch (IOException ex) { 253 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex); 254 } 255 } 256 257 /** 258 * Check if the user has admin privileges. <p/> If admin is disabled it returns always <code>true</code>. <p/> If 259 * admin is enabled it returns <code>true</code> if the user is in the <code>adminusers.txt</code> file. 260 * 261 * @param user user name. 262 * @return if the user has admin privileges or not. 263 */ 264 protected boolean isAdmin(String user) { 265 boolean admin = false; 266 if (adminUsers != null) { 267 admin = adminUsers.contains(user); 268 } 269 else { 270 for (String adminGroup : adminGroups) { 271 try { 272 admin = isUserInGroup(user, adminGroup); 273 if (admin) { 274 break; 275 } 276 } 277 catch (AuthorizationException ex) { 278 log.warn("Admin check failed, " + ex.toString(), ex); 279 break; 280 } 281 } 282 } 283 return admin; 284 } 285 286 /** 287 * Check if the user has admin privileges. <p/> Subclasses should override the {@link #isUserInGroup} method. 288 * 289 * @param user user name. 290 * @param write indicates if the check is for read or write admin tasks (in this implementation this is ignored) 291 * @throws AuthorizationException thrown if user does not have admin priviledges. 292 */ 293 public void authorizeForAdmin(String user, boolean write) throws AuthorizationException { 294 if (authorizationEnabled && write && !isAdmin(user)) { 295 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 296 throw new AuthorizationException(ErrorCode.E0503, user); 297 } 298 } 299 300 /** 301 * Check if the user+group is authorized to use the specified application. <p/> The check is done by checking the 302 * file system permissions on the workflow application. 303 * 304 * @param user user name. 305 * @param group group name. 306 * @param appPath application path. 307 * @throws AuthorizationException thrown if the user is not authorized for the app. 308 */ 309 public void authorizeForApp(String user, String group, String appPath, Configuration jobConf) 310 throws AuthorizationException { 311 try { 312 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 313 URI uri = new Path(appPath).toUri(); 314 Configuration fsConf = has.createJobConf(uri.getAuthority()); 315 FileSystem fs = has.createFileSystem(user, uri, fsConf); 316 317 Path path = new Path(appPath); 318 try { 319 if (!fs.exists(path)) { 320 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 321 throw new AuthorizationException(ErrorCode.E0504, appPath); 322 } 323 Path wfXml = new Path(path, "workflow.xml"); 324 if (!fs.exists(wfXml)) { 325 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 326 throw new AuthorizationException(ErrorCode.E0505, appPath); 327 } 328 if (!fs.isFile(wfXml)) { 329 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 330 throw new AuthorizationException(ErrorCode.E0506, appPath); 331 } 332 fs.open(wfXml).close(); 333 } 334 // TODO change this when stopping support of 0.18 to the new 335 // Exception 336 catch (org.apache.hadoop.fs.permission.AccessControlException ex) { 337 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 338 throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex); 339 } 340 } 341 catch (IOException ex) { 342 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 343 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex); 344 } 345 catch (HadoopAccessorException e) { 346 throw new AuthorizationException(e); 347 } 348 } 349 350 /** 351 * Check if the user+group is authorized to use the specified application. <p/> The check is done by checking the 352 * file system permissions on the workflow application. 353 * 354 * @param user user name. 355 * @param group group name. 356 * @param appPath application path. 357 * @param fileName workflow or coordinator.xml 358 * @param conf 359 * @throws AuthorizationException thrown if the user is not authorized for the app. 360 */ 361 public void authorizeForApp(String user, String group, String appPath, String fileName, Configuration conf) 362 throws AuthorizationException { 363 try { 364 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 365 URI uri = new Path(appPath).toUri(); 366 Configuration fsConf = has.createJobConf(uri.getAuthority()); 367 FileSystem fs = has.createFileSystem(user, uri, fsConf); 368 369 Path path = new Path(appPath); 370 try { 371 if (!fs.exists(path)) { 372 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 373 throw new AuthorizationException(ErrorCode.E0504, appPath); 374 } 375 if (conf.get(XOozieClient.IS_PROXY_SUBMISSION) == null) { // Only further check existence of job definition files for non proxy submission jobs; 376 if (!fs.isFile(path)) { 377 Path appXml = new Path(path, fileName); 378 if (!fs.exists(appXml)) { 379 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 380 throw new AuthorizationException(ErrorCode.E0505, appPath); 381 } 382 if (!fs.isFile(appXml)) { 383 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 384 throw new AuthorizationException(ErrorCode.E0506, appPath); 385 } 386 fs.open(appXml).close(); 387 } 388 } 389 } 390 // TODO change this when stopping support of 0.18 to the new 391 // Exception 392 catch (org.apache.hadoop.fs.permission.AccessControlException ex) { 393 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 394 throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex); 395 } 396 } 397 catch (IOException ex) { 398 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 399 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex); 400 } 401 catch (HadoopAccessorException e) { 402 throw new AuthorizationException(e); 403 } 404 } 405 406 private boolean isUserInAcl(String user, String aclStr) throws IOException { 407 boolean userInAcl = false; 408 if (aclStr != null && aclStr.trim().length() > 0) { 409 GroupsService groupsService = Services.get().get(GroupsService.class); 410 String[] acl = aclStr.split(","); 411 for (int i = 0; !userInAcl && i < acl.length; i++) { 412 String aclItem = acl[i].trim(); 413 userInAcl = aclItem.equals(user) || groupsService.getGroups(user).equals(aclItem); 414 } 415 } 416 return userInAcl; 417 } 418 419 /** 420 * Check if the user+group is authorized to operate on the specified job. <p/> Checks if the user is a super-user or 421 * the one who started the job. <p/> Read operations are allowed to all users. 422 * 423 * @param user user name. 424 * @param jobId job id. 425 * @param write indicates if the check is for read or write job tasks. 426 * @throws AuthorizationException thrown if the user is not authorized for the job. 427 */ 428 public void authorizeForJob(String user, String jobId, boolean write) throws AuthorizationException { 429 if (authorizationEnabled && write && !isAdmin(user)) { 430 try { 431 // handle workflow jobs 432 if (jobId.endsWith("-W")) { 433 WorkflowJobBean jobBean = null; 434 JPAService jpaService = Services.get().get(JPAService.class); 435 if (jpaService != null) { 436 try { 437 jobBean = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_USER_GROUP, jobId); 438 } 439 catch (JPAExecutorException je) { 440 throw new AuthorizationException(je); 441 } 442 } 443 else { 444 throw new AuthorizationException(ErrorCode.E0610); 445 } 446 if (jobBean != null && !jobBean.getUser().equals(user)) { 447 if (!isUserInAcl(user, jobBean.getGroup())) { 448 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 449 throw new AuthorizationException(ErrorCode.E0508, user, jobId); 450 } 451 } 452 } 453 // handle bundle jobs 454 else if (jobId.endsWith("-B")){ 455 BundleJobBean jobBean = null; 456 JPAService jpaService = Services.get().get(JPAService.class); 457 if (jpaService != null) { 458 try { 459 jobBean = jpaService.execute(new BundleJobGetJPAExecutor(jobId)); 460 } 461 catch (JPAExecutorException je) { 462 throw new AuthorizationException(je); 463 } 464 } 465 else { 466 throw new AuthorizationException(ErrorCode.E0610); 467 } 468 if (jobBean != null && !jobBean.getUser().equals(user)) { 469 if (!isUserInAcl(user, jobBean.getGroup())) { 470 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 471 throw new AuthorizationException(ErrorCode.E0509, user, jobId); 472 } 473 } 474 } 475 // handle coordinator jobs 476 else { 477 CoordinatorJobBean jobBean = null; 478 JPAService jpaService = Services.get().get(JPAService.class); 479 if (jpaService != null) { 480 try { 481 jobBean = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); 482 } 483 catch (JPAExecutorException je) { 484 throw new AuthorizationException(je); 485 } 486 } 487 else { 488 throw new AuthorizationException(ErrorCode.E0610); 489 } 490 if (jobBean != null && !jobBean.getUser().equals(user)) { 491 if (!isUserInAcl(user, jobBean.getGroup())) { 492 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 493 throw new AuthorizationException(ErrorCode.E0509, user, jobId); 494 } 495 } 496 } 497 } 498 catch (IOException ex) { 499 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex); 500 } 501 } 502 } 503 504 /** 505 * Convenience method for instrumentation counters. 506 * 507 * @param name counter name. 508 * @param count count to increment the counter. 509 */ 510 private void incrCounter(String name, int count) { 511 if (instrumentation != null) { 512 instrumentation.incr(INSTRUMENTATION_GROUP, name, count); 513 } 514 } 515}