001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import java.io.BufferedReader; 021 import java.io.File; 022 import java.io.FileInputStream; 023 import java.io.FileNotFoundException; 024 import java.io.IOException; 025 import java.io.InputStreamReader; 026 import java.net.URI; 027 import java.util.HashSet; 028 import java.util.Set; 029 030 import org.apache.hadoop.conf.Configuration; 031 import org.apache.hadoop.fs.FileSystem; 032 import org.apache.hadoop.fs.Path; 033 import org.apache.oozie.BundleJobBean; 034 import org.apache.oozie.CoordinatorJobBean; 035 import org.apache.oozie.ErrorCode; 036 import org.apache.oozie.WorkflowJobBean; 037 import org.apache.oozie.client.XOozieClient; 038 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; 039 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 040 import org.apache.oozie.executor.jpa.JPAExecutorException; 041 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 042 import org.apache.oozie.util.ConfigUtils; 043 import org.apache.oozie.util.Instrumentation; 044 import org.apache.oozie.util.XLog; 045 046 /** 047 * The authorization service provides all authorization checks. 048 */ 049 public class AuthorizationService implements Service { 050 051 public static final String CONF_PREFIX = Service.CONF_PREFIX + "AuthorizationService."; 052 053 /** 054 * Configuration parameter to enable or disable Oozie admin role. 055 */ 056 public static final String CONF_SECURITY_ENABLED = CONF_PREFIX + "security.enabled"; 057 058 /** 059 * Configuration parameter to enable or disable Oozie admin role. 060 */ 061 public static final String CONF_AUTHORIZATION_ENABLED = CONF_PREFIX + "authorization.enabled"; 062 063 /** 064 * Configuration parameter to enable old behavior default group as ACL. 065 */ 066 public static final String CONF_DEFAULT_GROUP_AS_ACL = CONF_PREFIX + "default.group.as.acl"; 067 068 /** 069 * File that contains list of admin users for Oozie. 070 */ 071 public static final String ADMIN_USERS_FILE = "adminusers.txt"; 072 073 protected static final String INSTRUMENTATION_GROUP = "authorization"; 074 protected static final String INSTR_FAILED_AUTH_COUNTER = "authorization.failed"; 075 076 private Set<String> adminUsers; 077 private boolean authorizationEnabled; 078 private boolean useDefaultGroupAsAcl; 079 080 private final XLog log = XLog.getLog(getClass()); 081 private Instrumentation instrumentation; 082 083 /** 084 * Initialize the service. <p/> Reads the security related configuration. parameters - security enabled and list of 085 * super users. 086 * 087 * @param services services instance. 088 * @throws ServiceException thrown if the service could not be initialized. 089 */ 090 public void init(Services services) throws ServiceException { 091 adminUsers = new HashSet<String>(); 092 authorizationEnabled = ConfigUtils.getWithDeprecatedCheck(services.getConf(), CONF_AUTHORIZATION_ENABLED, 093 CONF_SECURITY_ENABLED, false); 094 instrumentation = Services.get().get(InstrumentationService.class).get(); 095 if (authorizationEnabled) { 096 log.info("Oozie running with security enabled"); 097 loadAdminUsers(); 098 } 099 else { 100 log.warn("Oozie running with security disabled"); 101 } 102 103 useDefaultGroupAsAcl = Services.get().getConf().getBoolean(CONF_DEFAULT_GROUP_AS_ACL, false); 104 105 } 106 107 /** 108 * Return if security is enabled or not. 109 * 110 * @return if security is enabled or not. 111 */ 112 @Deprecated 113 public boolean isSecurityEnabled() { 114 return authorizationEnabled; 115 } 116 117 public boolean useDefaultGroupAsAcl() { 118 return useDefaultGroupAsAcl; 119 } 120 121 /** 122 * Return if security is enabled or not. 123 * 124 * @return if security is enabled or not. 125 */ 126 public boolean isAuthorizationEnabled() { 127 return isSecurityEnabled(); 128 } 129 130 /** 131 * Load the list of admin users from {@link AuthorizationService#ADMIN_USERS_FILE} </p> 132 * 133 * @throws ServiceException if the admin user list could not be loaded. 134 */ 135 private void loadAdminUsers() throws ServiceException { 136 String configDir = Services.get().get(ConfigurationService.class).getConfigDir(); 137 if (configDir != null) { 138 File file = new File(configDir, ADMIN_USERS_FILE); 139 if (file.exists()) { 140 try { 141 BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file))); 142 try { 143 String line = br.readLine(); 144 while (line != null) { 145 line = line.trim(); 146 if (line.length() > 0 && !line.startsWith("#")) { 147 adminUsers.add(line); 148 } 149 line = br.readLine(); 150 } 151 } 152 catch (IOException ex) { 153 throw new ServiceException(ErrorCode.E0160, file.getAbsolutePath(), ex); 154 } 155 } 156 catch (FileNotFoundException ex) { 157 throw new ServiceException(ErrorCode.E0160, ex); 158 } 159 } 160 else { 161 log.warn("Admin users file not available in config dir [{0}], running without admin users", configDir); 162 } 163 } 164 else { 165 log.warn("Reading configuration from classpath, running without admin users"); 166 } 167 } 168 169 /** 170 * Destroy the service. <p/> This implementation does a NOP. 171 */ 172 public void destroy() { 173 } 174 175 /** 176 * Return the public interface of the service. 177 * 178 * @return {@link AuthorizationService}. 179 */ 180 public Class<? extends Service> getInterface() { 181 return AuthorizationService.class; 182 } 183 184 /** 185 * Check if the user belongs to the group or not. 186 * 187 * @param user user name. 188 * @param group group name. 189 * @return if the user belongs to the group or not. 190 * @throws AuthorizationException thrown if the authorization query can not be performed. 191 */ 192 protected boolean isUserInGroup(String user, String group) throws AuthorizationException { 193 GroupsService groupsService = Services.get().get(GroupsService.class); 194 try { 195 return groupsService.getGroups(user).contains(group); 196 } 197 catch (IOException ex) { 198 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex); 199 } 200 } 201 202 /** 203 * Check if the user belongs to the group or not. <p/> <p/> Subclasses should override the {@link #isUserInGroup} 204 * method. 205 * 206 * @param user user name. 207 * @param group group name. 208 * @throws AuthorizationException thrown if the user is not authorized for the group or if the authorization query 209 * can not be performed. 210 */ 211 public void authorizeForGroup(String user, String group) throws AuthorizationException { 212 if (authorizationEnabled && !isUserInGroup(user, group)) { 213 throw new AuthorizationException(ErrorCode.E0502, user, group); 214 } 215 } 216 217 /** 218 * Return the default group to which the user belongs. <p/> This implementation always returns 'users'. 219 * 220 * @param user user name. 221 * @return default group of user. 222 * @throws AuthorizationException thrown if the default group con not be retrieved. 223 */ 224 public String getDefaultGroup(String user) throws AuthorizationException { 225 try { 226 return Services.get().get(GroupsService.class).getGroups(user).get(0); 227 } 228 catch (IOException ex) { 229 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex); 230 } 231 } 232 233 /** 234 * Check if the user has admin privileges. <p/> If admin is disabled it returns always <code>true</code>. <p/> If 235 * admin is enabled it returns <code>true</code> if the user is in the <code>adminusers.txt</code> file. 236 * 237 * @param user user name. 238 * @return if the user has admin privileges or not. 239 */ 240 protected boolean isAdmin(String user) { 241 return adminUsers.contains(user); 242 } 243 244 /** 245 * Check if the user has admin privileges. <p/> Subclasses should override the {@link #isUserInGroup} method. 246 * 247 * @param user user name. 248 * @param write indicates if the check is for read or write admin tasks (in this implementation this is ignored) 249 * @throws AuthorizationException thrown if user does not have admin priviledges. 250 */ 251 public void authorizeForAdmin(String user, boolean write) throws AuthorizationException { 252 if (authorizationEnabled && write && !isAdmin(user)) { 253 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 254 throw new AuthorizationException(ErrorCode.E0503, user); 255 } 256 } 257 258 /** 259 * Check if the user+group is authorized to use the specified application. <p/> The check is done by checking the 260 * file system permissions on the workflow application. 261 * 262 * @param user user name. 263 * @param group group name. 264 * @param appPath application path. 265 * @throws AuthorizationException thrown if the user is not authorized for the app. 266 */ 267 public void authorizeForApp(String user, String group, String appPath, Configuration jobConf) 268 throws AuthorizationException { 269 try { 270 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 271 URI uri = new Path(appPath).toUri(); 272 Configuration fsConf = has.createJobConf(uri.getAuthority()); 273 FileSystem fs = has.createFileSystem(user, uri, fsConf); 274 275 Path path = new Path(appPath); 276 try { 277 if (!fs.exists(path)) { 278 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 279 throw new AuthorizationException(ErrorCode.E0504, appPath); 280 } 281 Path wfXml = new Path(path, "workflow.xml"); 282 if (!fs.exists(wfXml)) { 283 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 284 throw new AuthorizationException(ErrorCode.E0505, appPath); 285 } 286 if (!fs.isFile(wfXml)) { 287 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 288 throw new AuthorizationException(ErrorCode.E0506, appPath); 289 } 290 fs.open(wfXml).close(); 291 } 292 // TODO change this when stopping support of 0.18 to the new 293 // Exception 294 catch (org.apache.hadoop.fs.permission.AccessControlException ex) { 295 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 296 throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex); 297 } 298 } 299 catch (IOException ex) { 300 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 301 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex); 302 } 303 catch (HadoopAccessorException e) { 304 throw new AuthorizationException(e); 305 } 306 } 307 308 /** 309 * Check if the user+group is authorized to use the specified application. <p/> The check is done by checking the 310 * file system permissions on the workflow application. 311 * 312 * @param user user name. 313 * @param group group name. 314 * @param appPath application path. 315 * @param fileName workflow or coordinator.xml 316 * @param conf 317 * @throws AuthorizationException thrown if the user is not authorized for the app. 318 */ 319 public void authorizeForApp(String user, String group, String appPath, String fileName, Configuration conf) 320 throws AuthorizationException { 321 try { 322 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 323 URI uri = new Path(appPath).toUri(); 324 Configuration fsConf = has.createJobConf(uri.getAuthority()); 325 FileSystem fs = has.createFileSystem(user, uri, fsConf); 326 327 Path path = new Path(appPath); 328 try { 329 if (!fs.exists(path)) { 330 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 331 throw new AuthorizationException(ErrorCode.E0504, appPath); 332 } 333 if (conf.get(XOozieClient.IS_PROXY_SUBMISSION) == null) { // Only further check existence of job definition files for non proxy submission jobs; 334 if (!fs.isFile(path)) { 335 Path appXml = new Path(path, fileName); 336 if (!fs.exists(appXml)) { 337 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 338 throw new AuthorizationException(ErrorCode.E0505, appPath); 339 } 340 if (!fs.isFile(appXml)) { 341 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 342 throw new AuthorizationException(ErrorCode.E0506, appPath); 343 } 344 fs.open(appXml).close(); 345 } 346 } 347 } 348 // TODO change this when stopping support of 0.18 to the new 349 // Exception 350 catch (org.apache.hadoop.fs.permission.AccessControlException ex) { 351 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 352 throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex); 353 } 354 } 355 catch (IOException ex) { 356 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 357 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex); 358 } 359 catch (HadoopAccessorException e) { 360 throw new AuthorizationException(e); 361 } 362 } 363 364 private boolean isUserInAcl(String user, String aclStr) throws IOException { 365 boolean userInAcl = false; 366 if (aclStr != null && aclStr.trim().length() > 0) { 367 GroupsService groupsService = Services.get().get(GroupsService.class); 368 String[] acl = aclStr.split(","); 369 for (int i = 0; !userInAcl && i < acl.length; i++) { 370 String aclItem = acl[i].trim(); 371 userInAcl = aclItem.equals(user) || groupsService.getGroups(user).equals(aclItem); 372 } 373 } 374 return userInAcl; 375 } 376 377 /** 378 * Check if the user+group is authorized to operate on the specified job. <p/> Checks if the user is a super-user or 379 * the one who started the job. <p/> Read operations are allowed to all users. 380 * 381 * @param user user name. 382 * @param jobId job id. 383 * @param write indicates if the check is for read or write job tasks. 384 * @throws AuthorizationException thrown if the user is not authorized for the job. 385 */ 386 public void authorizeForJob(String user, String jobId, boolean write) throws AuthorizationException { 387 if (authorizationEnabled && write && !isAdmin(user)) { 388 try { 389 // handle workflow jobs 390 if (jobId.endsWith("-W")) { 391 WorkflowJobBean jobBean = null; 392 JPAService jpaService = Services.get().get(JPAService.class); 393 if (jpaService != null) { 394 try { 395 jobBean = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId)); 396 } 397 catch (JPAExecutorException je) { 398 throw new AuthorizationException(je); 399 } 400 } 401 else { 402 throw new AuthorizationException(ErrorCode.E0610); 403 } 404 if (jobBean != null && !jobBean.getUser().equals(user)) { 405 if (!isUserInAcl(user, jobBean.getGroup())) { 406 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 407 throw new AuthorizationException(ErrorCode.E0508, user, jobId); 408 } 409 } 410 } 411 // handle bundle jobs 412 else if (jobId.endsWith("-B")){ 413 BundleJobBean jobBean = null; 414 JPAService jpaService = Services.get().get(JPAService.class); 415 if (jpaService != null) { 416 try { 417 jobBean = jpaService.execute(new BundleJobGetJPAExecutor(jobId)); 418 } 419 catch (JPAExecutorException je) { 420 throw new AuthorizationException(je); 421 } 422 } 423 else { 424 throw new AuthorizationException(ErrorCode.E0610); 425 } 426 if (jobBean != null && !jobBean.getUser().equals(user)) { 427 if (!isUserInAcl(user, jobBean.getGroup())) { 428 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 429 throw new AuthorizationException(ErrorCode.E0509, user, jobId); 430 } 431 } 432 } 433 // handle coordinator jobs 434 else { 435 CoordinatorJobBean jobBean = null; 436 JPAService jpaService = Services.get().get(JPAService.class); 437 if (jpaService != null) { 438 try { 439 jobBean = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); 440 } 441 catch (JPAExecutorException je) { 442 throw new AuthorizationException(je); 443 } 444 } 445 else { 446 throw new AuthorizationException(ErrorCode.E0610); 447 } 448 if (jobBean != null && !jobBean.getUser().equals(user)) { 449 if (!isUserInAcl(user, jobBean.getGroup())) { 450 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); 451 throw new AuthorizationException(ErrorCode.E0509, user, jobId); 452 } 453 } 454 } 455 } 456 catch (IOException ex) { 457 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex); 458 } 459 } 460 } 461 462 /** 463 * Convenience method for instrumentation counters. 464 * 465 * @param name counter name. 466 * @param count count to increment the counter. 467 */ 468 private void incrCounter(String name, int count) { 469 if (instrumentation != null) { 470 instrumentation.incr(INSTRUMENTATION_GROUP, name, count); 471 } 472 } 473 }