/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.XOozieClient;
import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.XLog;

/**
 * The authorization service provides all authorization checks.
 * <p>
 * Checks are only enforced when {@link #CONF_SECURITY_ENABLED} is set to <code>true</code>; failed checks are counted
 * under the {@link #INSTRUMENTATION_GROUP} instrumentation group.
 */
public class AuthorizationService implements Service {

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "AuthorizationService.";

    /**
     * Configuration parameter to enable or disable Oozie admin role.
     */
    public static final String CONF_SECURITY_ENABLED = CONF_PREFIX + "security.enabled";

    /**
     * File that contains list of admin users for Oozie.
     */
    public static final String ADMIN_USERS_FILE = "adminusers.txt";

    /**
     * Default group returned by getDefaultGroup().
     */
    public static final String DEFAULT_GROUP = "users";

    protected static final String INSTRUMENTATION_GROUP = "authorization";
    protected static final String INSTR_FAILED_AUTH_COUNTER = "authorization.failed";

    private Set<String> adminUsers;
    private boolean securityEnabled;

    private final XLog log = XLog.getLog(getClass());
    private Instrumentation instrumentation;

    /**
     * Initialize the service.
     * <p>
     * Reads the security related configuration parameters: whether security is enabled and, if it is, the list of
     * admin users.
     *
     * @param services services instance.
     * @throws ServiceException thrown if the service could not be initialized.
     */
    public void init(Services services) throws ServiceException {
        adminUsers = new HashSet<String>();
        securityEnabled = services.getConf().getBoolean(CONF_SECURITY_ENABLED, false);
        // incrCounter() already tolerates a missing Instrumentation, so a missing InstrumentationService
        // must not fail initialization with a NullPointerException either.
        InstrumentationService instrumentationService = Services.get().get(InstrumentationService.class);
        instrumentation = (instrumentationService != null) ? instrumentationService.get() : null;
        if (securityEnabled) {
            log.info("Oozie running with security enabled");
            loadAdminUsers();
        }
        else {
            log.warn("Oozie running with security disabled");
        }
    }

    /**
     * Return if security is enabled or not.
     *
     * @return if security is enabled or not.
     */
    public boolean isSecurityEnabled() {
        return securityEnabled;
    }

    /**
     * Load the list of admin users from {@link AuthorizationService#ADMIN_USERS_FILE}.
     * <p>
     * The file is looked up in the configuration directory. Each non-empty line that does not start with
     * <code>#</code> is trimmed and added as an admin user. If there is no configuration directory or the file does
     * not exist, a warning is logged and the admin user list stays empty.
     *
     * @throws ServiceException if the admin user list could not be loaded.
     */
    private void loadAdminUsers() throws ServiceException {
        String configDir = Services.get().get(ConfigurationService.class).getConfigDir();
        if (configDir == null) {
            log.warn("Reading configuration from classpath, running without admin users");
            return;
        }
        File file = new File(configDir, ADMIN_USERS_FILE);
        if (!file.exists()) {
            log.warn("Admin users file not available in config dir [{0}], running without admin users", configDir);
            return;
        }
        try {
            BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
            try {
                String line = br.readLine();
                while (line != null) {
                    line = line.trim();
                    if (line.length() > 0 && !line.startsWith("#")) {
                        adminUsers.add(line);
                    }
                    line = br.readLine();
                }
            }
            catch (IOException ex) {
                throw new ServiceException(ErrorCode.E0160, file.getAbsolutePath(), ex);
            }
            finally {
                // Always release the file handle; a failure to close must not mask a read error
                // nor fail a load that already succeeded.
                try {
                    br.close();
                }
                catch (IOException ex) {
                    log.warn("Could not close admin users file [{0}]: {1}", file.getAbsolutePath(), ex.getMessage());
                }
            }
        }
        catch (FileNotFoundException ex) {
            // The file can disappear between exists() and open; report it with its path like other read failures.
            throw new ServiceException(ErrorCode.E0160, file.getAbsolutePath(), ex);
        }
    }

    /**
     * Destroy the service.
     * <p>
     * This implementation does a NOP.
     */
    public void destroy() {
    }

    /**
     * Return the public interface of the service.
     *
     * @return {@link AuthorizationService}.
     */
    public Class<? extends Service> getInterface() {
        return AuthorizationService.class;
    }

    /**
     * Check if the user belongs to the group or not.
     * <p>
     * This implementation returns always <code>true</code>.
     *
     * @param user user name.
     * @param group group name.
     * @return if the user belongs to the group or not.
     * @throws AuthorizationException thrown if the authorization query can not be performed.
     */
    protected boolean isUserInGroup(String user, String group) throws AuthorizationException {
        return true;
    }

    /**
     * Check if the user belongs to the group or not.
     * <p>
     * The check is skipped when security is disabled. Subclasses should override the {@link #isUserInGroup} method.
     *
     * @param user user name.
     * @param group group name.
     * @throws AuthorizationException thrown if the user is not authorized for the group or if the authorization query
     * can not be performed.
     */
    public void authorizeForGroup(String user, String group) throws AuthorizationException {
        if (securityEnabled && !isUserInGroup(user, group)) {
            throw new AuthorizationException(ErrorCode.E0502, user, group);
        }
    }

    /**
     * Return the default group to which the user belongs.
     * <p>
     * This implementation always returns 'users'.
     *
     * @param user user name.
     * @return default group of user.
     * @throws AuthorizationException thrown if the default group can not be retrieved.
     */
    public String getDefaultGroup(String user) throws AuthorizationException {
        return DEFAULT_GROUP;
    }

    /**
     * Check if the user has admin privileges.
     * <p>
     * Returns <code>true</code> if the user is in the loaded admin users list (the <code>adminusers.txt</code> file).
     * The list is only loaded when security is enabled; callers in this class only consult this method when security
     * is enabled.
     *
     * @param user user name.
     * @return if the user has admin privileges or not.
     */
    protected boolean isAdmin(String user) {
        return adminUsers.contains(user);
    }

    /**
     * Check if the user has admin privileges.
     * <p>
     * The check is only enforced when security is enabled and the task is a write task; read admin tasks are allowed
     * to all users. Subclasses should override the {@link #isAdmin} method.
     *
     * @param user user name.
     * @param write indicates if the check is for read or write admin tasks.
     * @throws AuthorizationException thrown if user does not have admin privileges.
     */
    public void authorizeForAdmin(String user, boolean write) throws AuthorizationException {
        if (securityEnabled && write && !isAdmin(user)) {
            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
            throw new AuthorizationException(ErrorCode.E0503, user);
        }
    }

    /**
     * Check if the user+group is authorized to use the specified application.
     * <p>
     * The check is done by checking the file system permissions on the workflow application: the application path
     * must exist and contain a readable <code>workflow.xml</code> file.
     *
     * @param user user name.
     * @param group group name.
     * @param appPath application path.
     * @param jobConf job configuration used to create the file system.
     * @throws AuthorizationException thrown if the user is not authorized for the app.
     */
    public void authorizeForApp(String user, String group, String appPath, Configuration jobConf)
            throws AuthorizationException {
        try {
            FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group,
                    new Path(appPath).toUri(), jobConf);

            Path path = new Path(appPath);
            try {
                if (!fs.exists(path)) {
                    incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
                    throw new AuthorizationException(ErrorCode.E0504, appPath);
                }
                checkAppDefinitionFile(fs, path, "workflow.xml", appPath);
            }
            // TODO change this when stopping support of 0.18 to the new
            // Exception
            catch (org.apache.hadoop.fs.permission.AccessControlException ex) {
                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
                throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex);
            }
        }
        catch (IOException ex) {
            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
            throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
        }
        catch (HadoopAccessorException e) {
            throw new AuthorizationException(e);
        }
    }

    /**
     * Check if the user+group is authorized to use the specified application.
     * <p>
     * The check is done by checking the file system permissions on the application. The application path must exist.
     * For non proxy submissions (no {@link XOozieClient#IS_PROXY_SUBMISSION} property in <code>conf</code>) where the
     * application path is not itself a file, the job definition file <code>fileName</code> must exist under the
     * application path, be a file and be readable.
     *
     * @param user user name.
     * @param group group name.
     * @param appPath application path.
     * @param fileName name of the job definition file to look for under the application path (e.g. workflow.xml or
     * coordinator.xml).
     * @param conf job configuration used to create the file system and to detect proxy submissions.
     * @throws AuthorizationException thrown if the user is not authorized for the app.
     */
    public void authorizeForApp(String user, String group, String appPath, String fileName, Configuration conf)
            throws AuthorizationException {
        try {
            FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group,
                    new Path(appPath).toUri(), conf);
            Path path = new Path(appPath);
            try {
                if (!fs.exists(path)) {
                    incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
                    throw new AuthorizationException(ErrorCode.E0504, appPath);
                }
                // Only further check existence of job definition files for non proxy submission jobs,
                // and only when the application path is a directory.
                boolean proxySubmission = conf.get(XOozieClient.IS_PROXY_SUBMISSION) != null;
                if (!proxySubmission && !fs.isFile(path)) {
                    checkAppDefinitionFile(fs, path, fileName, appPath);
                }
            }
            // TODO change this when stopping support of 0.18 to the new
            // Exception
            catch (org.apache.hadoop.fs.permission.AccessControlException ex) {
                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
                throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex);
            }
        }
        catch (IOException ex) {
            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
            throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
        }
        catch (HadoopAccessorException e) {
            throw new AuthorizationException(e);
        }
    }

    /**
     * Verify that a job definition file exists under the application directory, is a file and can be opened.
     *
     * @param fs file system to query.
     * @param appDir application directory.
     * @param fileName name of the job definition file under <code>appDir</code>.
     * @param appPath application path as given by the caller, used in error messages.
     * @throws IOException thrown if the file system could not be queried or the file could not be opened.
     * @throws AuthorizationException thrown if the job definition file does not exist or is not a file.
     */
    private void checkAppDefinitionFile(FileSystem fs, Path appDir, String fileName, String appPath)
            throws IOException, AuthorizationException {
        Path appXml = new Path(appDir, fileName);
        if (!fs.exists(appXml)) {
            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
            throw new AuthorizationException(ErrorCode.E0505, appPath);
        }
        if (!fs.isFile(appXml)) {
            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
            throw new AuthorizationException(ErrorCode.E0506, appPath);
        }
        // Open and immediately close the file: presumably this is what surfaces the
        // AccessControlException handled by the callers when the user cannot read it.
        fs.open(appXml).close();
    }

    /**
     * Check if the user+group is authorized to operate on the specified job.
     * <p>
     * Checks if the user is a super-user, the one who started the job, or a member of the job's group.
     * <p>
     * Read operations are allowed to all users, and no check is done when security is disabled. Job ids ending in
     * <code>-W</code> are treated as workflow jobs, ids ending in <code>-B</code> as bundle jobs and every other id
     * as a coordinator job. A job that cannot be found (null bean) is not rejected here.
     *
     * @param user user name.
     * @param jobId job id.
     * @param write indicates if the check is for read or write job tasks.
     * @throws AuthorizationException thrown if the user is not authorized for the job, if the job could not be
     * loaded, or if the JPA service is not available.
     */
    public void authorizeForJob(String user, String jobId, boolean write) throws AuthorizationException {
        if (!securityEnabled || !write || isAdmin(user)) {
            return;
        }
        JPAService jpaService = Services.get().get(JPAService.class);
        if (jpaService == null) {
            throw new AuthorizationException(ErrorCode.E0610);
        }
        try {
            // handle workflow jobs
            if (jobId.endsWith("-W")) {
                WorkflowJobBean jobBean = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
                if (jobBean != null) {
                    authorizeJobOwner(user, jobId, jobBean.getUser(), jobBean.getGroup(), ErrorCode.E0508);
                }
            }
            // handle bundle jobs
            else if (jobId.endsWith("-B")) {
                BundleJobBean jobBean = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
                if (jobBean != null) {
                    authorizeJobOwner(user, jobId, jobBean.getUser(), jobBean.getGroup(), ErrorCode.E0509);
                }
            }
            // handle coordinator jobs
            else {
                CoordinatorJobBean jobBean = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
                if (jobBean != null) {
                    authorizeJobOwner(user, jobId, jobBean.getUser(), jobBean.getGroup(), ErrorCode.E0509);
                }
            }
        }
        catch (JPAExecutorException je) {
            throw new AuthorizationException(je);
        }
    }

    /**
     * Reject the user unless it is the job's owner or belongs to the job's group.
     *
     * @param user user name requesting the operation.
     * @param jobId job id, used in the error message.
     * @param jobUser user that owns the job.
     * @param jobGroup group of the job.
     * @param errorCode error code to report when the user is rejected.
     * @throws AuthorizationException thrown if the user is neither the owner nor in the job's group, or if the group
     * membership query can not be performed.
     */
    private void authorizeJobOwner(String user, String jobId, String jobUser, String jobGroup, ErrorCode errorCode)
            throws AuthorizationException {
        if (!jobUser.equals(user) && !isUserInGroup(user, jobGroup)) {
            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
            throw new AuthorizationException(errorCode, user, jobId);
        }
    }

    /**
     * Convenience method for instrumentation counters.
     * <p>
     * Does nothing if no instrumentation is available.
     *
     * @param name counter name.
     * @param count count to increment the counter.
     */
    private void incrCounter(String name, int count) {
        if (instrumentation != null) {
            instrumentation.incr(INSTRUMENTATION_GROUP, name, count);
        }
    }
}