This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.XOozieClient;
import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.util.ConfigUtils;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.XLog;
045
046 /**
047 * The authorization service provides all authorization checks.
048 */
049 public class AuthorizationService implements Service {
050
051 public static final String CONF_PREFIX = Service.CONF_PREFIX + "AuthorizationService.";
052
053 /**
054 * Configuration parameter to enable or disable Oozie admin role.
055 */
056 public static final String CONF_SECURITY_ENABLED = CONF_PREFIX + "security.enabled";
057
058 /**
059 * Configuration parameter to enable or disable Oozie admin role.
060 */
061 public static final String CONF_AUTHORIZATION_ENABLED = CONF_PREFIX + "authorization.enabled";
062
063 /**
064 * Configuration parameter to enable old behavior default group as ACL.
065 */
066 public static final String CONF_DEFAULT_GROUP_AS_ACL = CONF_PREFIX + "default.group.as.acl";
067
068 /**
069 * File that contains list of admin users for Oozie.
070 */
071 public static final String ADMIN_USERS_FILE = "adminusers.txt";
072
073 protected static final String INSTRUMENTATION_GROUP = "authorization";
074 protected static final String INSTR_FAILED_AUTH_COUNTER = "authorization.failed";
075
076 private Set<String> adminUsers;
077 private boolean authorizationEnabled;
078 private boolean useDefaultGroupAsAcl;
079
080 private final XLog log = XLog.getLog(getClass());
081 private Instrumentation instrumentation;
082
083 /**
084 * Initialize the service. <p/> Reads the security related configuration. parameters - security enabled and list of
085 * super users.
086 *
087 * @param services services instance.
088 * @throws ServiceException thrown if the service could not be initialized.
089 */
090 public void init(Services services) throws ServiceException {
091 adminUsers = new HashSet<String>();
092 authorizationEnabled = ConfigUtils.getWithDeprecatedCheck(services.getConf(), CONF_AUTHORIZATION_ENABLED,
093 CONF_SECURITY_ENABLED, false);
094 instrumentation = Services.get().get(InstrumentationService.class).get();
095 if (authorizationEnabled) {
096 log.info("Oozie running with security enabled");
097 loadAdminUsers();
098 }
099 else {
100 log.warn("Oozie running with security disabled");
101 }
102
103 useDefaultGroupAsAcl = Services.get().getConf().getBoolean(CONF_DEFAULT_GROUP_AS_ACL, false);
104
105 }
106
107 /**
108 * Return if security is enabled or not.
109 *
110 * @return if security is enabled or not.
111 */
112 @Deprecated
113 public boolean isSecurityEnabled() {
114 return authorizationEnabled;
115 }
116
117 public boolean useDefaultGroupAsAcl() {
118 return useDefaultGroupAsAcl;
119 }
120
121 /**
122 * Return if security is enabled or not.
123 *
124 * @return if security is enabled or not.
125 */
126 public boolean isAuthorizationEnabled() {
127 return isSecurityEnabled();
128 }
129
130 /**
131 * Load the list of admin users from {@link AuthorizationService#ADMIN_USERS_FILE} </p>
132 *
133 * @throws ServiceException if the admin user list could not be loaded.
134 */
135 private void loadAdminUsers() throws ServiceException {
136 String configDir = Services.get().get(ConfigurationService.class).getConfigDir();
137 if (configDir != null) {
138 File file = new File(configDir, ADMIN_USERS_FILE);
139 if (file.exists()) {
140 try {
141 BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
142 try {
143 String line = br.readLine();
144 while (line != null) {
145 line = line.trim();
146 if (line.length() > 0 && !line.startsWith("#")) {
147 adminUsers.add(line);
148 }
149 line = br.readLine();
150 }
151 }
152 catch (IOException ex) {
153 throw new ServiceException(ErrorCode.E0160, file.getAbsolutePath(), ex);
154 }
155 }
156 catch (FileNotFoundException ex) {
157 throw new ServiceException(ErrorCode.E0160, ex);
158 }
159 }
160 else {
161 log.warn("Admin users file not available in config dir [{0}], running without admin users", configDir);
162 }
163 }
164 else {
165 log.warn("Reading configuration from classpath, running without admin users");
166 }
167 }
168
169 /**
170 * Destroy the service. <p/> This implementation does a NOP.
171 */
172 public void destroy() {
173 }
174
175 /**
176 * Return the public interface of the service.
177 *
178 * @return {@link AuthorizationService}.
179 */
180 public Class<? extends Service> getInterface() {
181 return AuthorizationService.class;
182 }
183
184 /**
185 * Check if the user belongs to the group or not.
186 *
187 * @param user user name.
188 * @param group group name.
189 * @return if the user belongs to the group or not.
190 * @throws AuthorizationException thrown if the authorization query can not be performed.
191 */
192 protected boolean isUserInGroup(String user, String group) throws AuthorizationException {
193 GroupsService groupsService = Services.get().get(GroupsService.class);
194 try {
195 return groupsService.getGroups(user).contains(group);
196 }
197 catch (IOException ex) {
198 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
199 }
200 }
201
202 /**
203 * Check if the user belongs to the group or not. <p/> <p/> Subclasses should override the {@link #isUserInGroup}
204 * method.
205 *
206 * @param user user name.
207 * @param group group name.
208 * @throws AuthorizationException thrown if the user is not authorized for the group or if the authorization query
209 * can not be performed.
210 */
211 public void authorizeForGroup(String user, String group) throws AuthorizationException {
212 if (authorizationEnabled && !isUserInGroup(user, group)) {
213 throw new AuthorizationException(ErrorCode.E0502, user, group);
214 }
215 }
216
217 /**
218 * Return the default group to which the user belongs. <p/> This implementation always returns 'users'.
219 *
220 * @param user user name.
221 * @return default group of user.
222 * @throws AuthorizationException thrown if the default group con not be retrieved.
223 */
224 public String getDefaultGroup(String user) throws AuthorizationException {
225 try {
226 return Services.get().get(GroupsService.class).getGroups(user).get(0);
227 }
228 catch (IOException ex) {
229 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
230 }
231 }
232
233 /**
234 * Check if the user has admin privileges. <p/> If admin is disabled it returns always <code>true</code>. <p/> If
235 * admin is enabled it returns <code>true</code> if the user is in the <code>adminusers.txt</code> file.
236 *
237 * @param user user name.
238 * @return if the user has admin privileges or not.
239 */
240 protected boolean isAdmin(String user) {
241 return adminUsers.contains(user);
242 }
243
244 /**
245 * Check if the user has admin privileges. <p/> Subclasses should override the {@link #isUserInGroup} method.
246 *
247 * @param user user name.
248 * @param write indicates if the check is for read or write admin tasks (in this implementation this is ignored)
249 * @throws AuthorizationException thrown if user does not have admin priviledges.
250 */
251 public void authorizeForAdmin(String user, boolean write) throws AuthorizationException {
252 if (authorizationEnabled && write && !isAdmin(user)) {
253 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
254 throw new AuthorizationException(ErrorCode.E0503, user);
255 }
256 }
257
258 /**
259 * Check if the user+group is authorized to use the specified application. <p/> The check is done by checking the
260 * file system permissions on the workflow application.
261 *
262 * @param user user name.
263 * @param group group name.
264 * @param appPath application path.
265 * @throws AuthorizationException thrown if the user is not authorized for the app.
266 */
267 public void authorizeForApp(String user, String group, String appPath, Configuration jobConf)
268 throws AuthorizationException {
269 try {
270 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
271 URI uri = new Path(appPath).toUri();
272 Configuration fsConf = has.createJobConf(uri.getAuthority());
273 FileSystem fs = has.createFileSystem(user, uri, fsConf);
274
275 Path path = new Path(appPath);
276 try {
277 if (!fs.exists(path)) {
278 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
279 throw new AuthorizationException(ErrorCode.E0504, appPath);
280 }
281 Path wfXml = new Path(path, "workflow.xml");
282 if (!fs.exists(wfXml)) {
283 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
284 throw new AuthorizationException(ErrorCode.E0505, appPath);
285 }
286 if (!fs.isFile(wfXml)) {
287 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
288 throw new AuthorizationException(ErrorCode.E0506, appPath);
289 }
290 fs.open(wfXml).close();
291 }
292 // TODO change this when stopping support of 0.18 to the new
293 // Exception
294 catch (org.apache.hadoop.fs.permission.AccessControlException ex) {
295 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
296 throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex);
297 }
298 }
299 catch (IOException ex) {
300 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
301 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
302 }
303 catch (HadoopAccessorException e) {
304 throw new AuthorizationException(e);
305 }
306 }
307
308 /**
309 * Check if the user+group is authorized to use the specified application. <p/> The check is done by checking the
310 * file system permissions on the workflow application.
311 *
312 * @param user user name.
313 * @param group group name.
314 * @param appPath application path.
315 * @param fileName workflow or coordinator.xml
316 * @param conf
317 * @throws AuthorizationException thrown if the user is not authorized for the app.
318 */
319 public void authorizeForApp(String user, String group, String appPath, String fileName, Configuration conf)
320 throws AuthorizationException {
321 try {
322 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
323 URI uri = new Path(appPath).toUri();
324 Configuration fsConf = has.createJobConf(uri.getAuthority());
325 FileSystem fs = has.createFileSystem(user, uri, fsConf);
326
327 Path path = new Path(appPath);
328 try {
329 if (!fs.exists(path)) {
330 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
331 throw new AuthorizationException(ErrorCode.E0504, appPath);
332 }
333 if (conf.get(XOozieClient.IS_PROXY_SUBMISSION) == null) { // Only further check existence of job definition files for non proxy submission jobs;
334 if (!fs.isFile(path)) {
335 Path appXml = new Path(path, fileName);
336 if (!fs.exists(appXml)) {
337 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
338 throw new AuthorizationException(ErrorCode.E0505, appPath);
339 }
340 if (!fs.isFile(appXml)) {
341 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
342 throw new AuthorizationException(ErrorCode.E0506, appPath);
343 }
344 fs.open(appXml).close();
345 }
346 }
347 }
348 // TODO change this when stopping support of 0.18 to the new
349 // Exception
350 catch (org.apache.hadoop.fs.permission.AccessControlException ex) {
351 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
352 throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex);
353 }
354 }
355 catch (IOException ex) {
356 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
357 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
358 }
359 catch (HadoopAccessorException e) {
360 throw new AuthorizationException(e);
361 }
362 }
363
364 private boolean isUserInAcl(String user, String aclStr) throws IOException {
365 boolean userInAcl = false;
366 if (aclStr != null && aclStr.trim().length() > 0) {
367 GroupsService groupsService = Services.get().get(GroupsService.class);
368 String[] acl = aclStr.split(",");
369 for (int i = 0; !userInAcl && i < acl.length; i++) {
370 String aclItem = acl[i].trim();
371 userInAcl = aclItem.equals(user) || groupsService.getGroups(user).equals(aclItem);
372 }
373 }
374 return userInAcl;
375 }
376
377 /**
378 * Check if the user+group is authorized to operate on the specified job. <p/> Checks if the user is a super-user or
379 * the one who started the job. <p/> Read operations are allowed to all users.
380 *
381 * @param user user name.
382 * @param jobId job id.
383 * @param write indicates if the check is for read or write job tasks.
384 * @throws AuthorizationException thrown if the user is not authorized for the job.
385 */
386 public void authorizeForJob(String user, String jobId, boolean write) throws AuthorizationException {
387 if (authorizationEnabled && write && !isAdmin(user)) {
388 try {
389 // handle workflow jobs
390 if (jobId.endsWith("-W")) {
391 WorkflowJobBean jobBean = null;
392 JPAService jpaService = Services.get().get(JPAService.class);
393 if (jpaService != null) {
394 try {
395 jobBean = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
396 }
397 catch (JPAExecutorException je) {
398 throw new AuthorizationException(je);
399 }
400 }
401 else {
402 throw new AuthorizationException(ErrorCode.E0610);
403 }
404 if (jobBean != null && !jobBean.getUser().equals(user)) {
405 if (!isUserInAcl(user, jobBean.getGroup())) {
406 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
407 throw new AuthorizationException(ErrorCode.E0508, user, jobId);
408 }
409 }
410 }
411 // handle bundle jobs
412 else if (jobId.endsWith("-B")){
413 BundleJobBean jobBean = null;
414 JPAService jpaService = Services.get().get(JPAService.class);
415 if (jpaService != null) {
416 try {
417 jobBean = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
418 }
419 catch (JPAExecutorException je) {
420 throw new AuthorizationException(je);
421 }
422 }
423 else {
424 throw new AuthorizationException(ErrorCode.E0610);
425 }
426 if (jobBean != null && !jobBean.getUser().equals(user)) {
427 if (!isUserInAcl(user, jobBean.getGroup())) {
428 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
429 throw new AuthorizationException(ErrorCode.E0509, user, jobId);
430 }
431 }
432 }
433 // handle coordinator jobs
434 else {
435 CoordinatorJobBean jobBean = null;
436 JPAService jpaService = Services.get().get(JPAService.class);
437 if (jpaService != null) {
438 try {
439 jobBean = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
440 }
441 catch (JPAExecutorException je) {
442 throw new AuthorizationException(je);
443 }
444 }
445 else {
446 throw new AuthorizationException(ErrorCode.E0610);
447 }
448 if (jobBean != null && !jobBean.getUser().equals(user)) {
449 if (!isUserInAcl(user, jobBean.getGroup())) {
450 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
451 throw new AuthorizationException(ErrorCode.E0509, user, jobId);
452 }
453 }
454 }
455 }
456 catch (IOException ex) {
457 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
458 }
459 }
460 }
461
462 /**
463 * Convenience method for instrumentation counters.
464 *
465 * @param name counter name.
466 * @param count count to increment the counter.
467 */
468 private void incrCounter(String name, int count) {
469 if (instrumentation != null) {
470 instrumentation.incr(INSTRUMENTATION_GROUP, name, count);
471 }
472 }
473 }