This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import java.io.BufferedReader;
021 import java.io.File;
022 import java.io.FileInputStream;
023 import java.io.FileNotFoundException;
024 import java.io.IOException;
025 import java.io.InputStreamReader;
026 import java.util.HashSet;
027 import java.util.Set;
028
029 import org.apache.hadoop.conf.Configuration;
030 import org.apache.hadoop.fs.FileSystem;
031 import org.apache.hadoop.fs.Path;
032 import org.apache.oozie.BundleJobBean;
033 import org.apache.oozie.CoordinatorJobBean;
034 import org.apache.oozie.ErrorCode;
035 import org.apache.oozie.WorkflowJobBean;
036 import org.apache.oozie.client.XOozieClient;
037 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
038 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
039 import org.apache.oozie.executor.jpa.JPAExecutorException;
040 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
041 import org.apache.oozie.util.Instrumentation;
042 import org.apache.oozie.util.XLog;
043
044 /**
045 * The authorization service provides all authorization checks.
046 */
047 public class AuthorizationService implements Service {
048
049 public static final String CONF_PREFIX = Service.CONF_PREFIX + "AuthorizationService.";
050
051 /**
052 * Configuration parameter to enable or disable Oozie admin role.
053 */
054 public static final String CONF_SECURITY_ENABLED = CONF_PREFIX + "security.enabled";
055
056 /**
057 * File that contains list of admin users for Oozie.
058 */
059 public static final String ADMIN_USERS_FILE = "adminusers.txt";
060
061 /**
062 * Default group returned by getDefaultGroup().
063 */
064 public static final String DEFAULT_GROUP = "users";
065
066 protected static final String INSTRUMENTATION_GROUP = "authorization";
067 protected static final String INSTR_FAILED_AUTH_COUNTER = "authorization.failed";
068
069 private Set<String> adminUsers;
070 private boolean securityEnabled;
071
072 private final XLog log = XLog.getLog(getClass());
073 private Instrumentation instrumentation;
074
075 /**
076 * Initialize the service. <p/> Reads the security related configuration. parameters - security enabled and list of
077 * super users.
078 *
079 * @param services services instance.
080 * @throws ServiceException thrown if the service could not be initialized.
081 */
082 public void init(Services services) throws ServiceException {
083 adminUsers = new HashSet<String>();
084 securityEnabled = services.getConf().getBoolean(CONF_SECURITY_ENABLED, false);
085 instrumentation = Services.get().get(InstrumentationService.class).get();
086 if (securityEnabled) {
087 log.info("Oozie running with security enabled");
088 loadAdminUsers();
089 }
090 else {
091 log.warn("Oozie running with security disabled");
092 }
093 }
094
095 /**
096 * Return if security is enabled or not.
097 *
098 * @return if security is enabled or not.
099 */
100 public boolean isSecurityEnabled() {
101 return securityEnabled;
102 }
103
104 /**
105 * Load the list of admin users from {@link AuthorizationService#ADMIN_USERS_FILE} </p>
106 *
107 * @throws ServiceException if the admin user list could not be loaded.
108 */
109 private void loadAdminUsers() throws ServiceException {
110 String configDir = Services.get().get(ConfigurationService.class).getConfigDir();
111 if (configDir != null) {
112 File file = new File(configDir, ADMIN_USERS_FILE);
113 if (file.exists()) {
114 try {
115 BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
116 try {
117 String line = br.readLine();
118 while (line != null) {
119 line = line.trim();
120 if (line.length() > 0 && !line.startsWith("#")) {
121 adminUsers.add(line);
122 }
123 line = br.readLine();
124 }
125 }
126 catch (IOException ex) {
127 throw new ServiceException(ErrorCode.E0160, file.getAbsolutePath(), ex);
128 }
129 }
130 catch (FileNotFoundException ex) {
131 throw new ServiceException(ErrorCode.E0160, ex);
132 }
133 }
134 else {
135 log.warn("Admin users file not available in config dir [{0}], running without admin users", configDir);
136 }
137 }
138 else {
139 log.warn("Reading configuration from classpath, running without admin users");
140 }
141 }
142
143 /**
144 * Destroy the service. <p/> This implementation does a NOP.
145 */
146 public void destroy() {
147 }
148
149 /**
150 * Return the public interface of the service.
151 *
152 * @return {@link AuthorizationService}.
153 */
154 public Class<? extends Service> getInterface() {
155 return AuthorizationService.class;
156 }
157
158 /**
159 * Check if the user belongs to the group or not. <p/> This implementation returns always <code>true</code>.
160 *
161 * @param user user name.
162 * @param group group name.
163 * @return if the user belongs to the group or not.
164 * @throws AuthorizationException thrown if the authorization query can not be performed.
165 */
166 protected boolean isUserInGroup(String user, String group) throws AuthorizationException {
167 return true;
168 }
169
170 /**
171 * Check if the user belongs to the group or not. <p/> <p/> Subclasses should override the {@link #isUserInGroup}
172 * method.
173 *
174 * @param user user name.
175 * @param group group name.
176 * @throws AuthorizationException thrown if the user is not authorized for the group or if the authorization query
177 * can not be performed.
178 */
179 public void authorizeForGroup(String user, String group) throws AuthorizationException {
180 if (securityEnabled && !isUserInGroup(user, group)) {
181 throw new AuthorizationException(ErrorCode.E0502, user, group);
182 }
183 }
184
185 /**
186 * Return the default group to which the user belongs. <p/> This implementation always returns 'users'.
187 *
188 * @param user user name.
189 * @return default group of user.
190 * @throws AuthorizationException thrown if the default group con not be retrieved.
191 */
192 public String getDefaultGroup(String user) throws AuthorizationException {
193 return DEFAULT_GROUP;
194 }
195
196 /**
197 * Check if the user has admin privileges. <p/> If admin is disabled it returns always <code>true</code>. <p/> If
198 * admin is enabled it returns <code>true</code> if the user is in the <code>adminusers.txt</code> file.
199 *
200 * @param user user name.
201 * @return if the user has admin privileges or not.
202 */
203 protected boolean isAdmin(String user) {
204 return adminUsers.contains(user);
205 }
206
207 /**
208 * Check if the user has admin privileges. <p/> Subclasses should override the {@link #isUserInGroup} method.
209 *
210 * @param user user name.
211 * @param write indicates if the check is for read or write admin tasks (in this implementation this is ignored)
212 * @throws AuthorizationException thrown if user does not have admin priviledges.
213 */
214 public void authorizeForAdmin(String user, boolean write) throws AuthorizationException {
215 if (securityEnabled && write && !isAdmin(user)) {
216 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
217 throw new AuthorizationException(ErrorCode.E0503, user);
218 }
219 }
220
221 /**
222 * Check if the user+group is authorized to use the specified application. <p/> The check is done by checking the
223 * file system permissions on the workflow application.
224 *
225 * @param user user name.
226 * @param group group name.
227 * @param appPath application path.
228 * @throws AuthorizationException thrown if the user is not authorized for the app.
229 */
230 public void authorizeForApp(String user, String group, String appPath, Configuration jobConf)
231 throws AuthorizationException {
232 try {
233 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group,
234 new Path(appPath).toUri(), jobConf);
235
236 Path path = new Path(appPath);
237 try {
238 if (!fs.exists(path)) {
239 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
240 throw new AuthorizationException(ErrorCode.E0504, appPath);
241 }
242 Path wfXml = new Path(path, "workflow.xml");
243 if (!fs.exists(wfXml)) {
244 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
245 throw new AuthorizationException(ErrorCode.E0505, appPath);
246 }
247 if (!fs.isFile(wfXml)) {
248 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
249 throw new AuthorizationException(ErrorCode.E0506, appPath);
250 }
251 fs.open(wfXml).close();
252 }
253 // TODO change this when stopping support of 0.18 to the new
254 // Exception
255 catch (org.apache.hadoop.fs.permission.AccessControlException ex) {
256 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
257 throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex);
258 }
259 }
260 catch (IOException ex) {
261 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
262 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
263 }
264 catch (HadoopAccessorException e) {
265 throw new AuthorizationException(e);
266 }
267 }
268
269 /**
270 * Check if the user+group is authorized to use the specified application. <p/> The check is done by checking the
271 * file system permissions on the workflow application.
272 *
273 * @param user user name.
274 * @param group group name.
275 * @param appPath application path.
276 * @param fileName workflow or coordinator.xml
277 * @param conf
278 * @throws AuthorizationException thrown if the user is not authorized for the app.
279 */
280 public void authorizeForApp(String user, String group, String appPath, String fileName, Configuration conf)
281 throws AuthorizationException {
282 try {
283 //Configuration conf = new Configuration();
284 //conf.set("user.name", user);
285 // TODO Temporary fix till
286 // https://issues.apache.org/jira/browse/HADOOP-4875 is resolved.
287 //conf.set("hadoop.job.ugi", user + "," + group);
288 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group,
289 new Path(appPath).toUri(), conf);
290 Path path = new Path(appPath);
291 try {
292 if (!fs.exists(path)) {
293 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
294 throw new AuthorizationException(ErrorCode.E0504, appPath);
295 }
296 if (conf.get(XOozieClient.IS_PROXY_SUBMISSION) == null) { // Only further check existence of job definition files for non proxy submission jobs;
297 if (!fs.isFile(path)) {
298 Path appXml = new Path(path, fileName);
299 if (!fs.exists(appXml)) {
300 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
301 throw new AuthorizationException(ErrorCode.E0505, appPath);
302 }
303 if (!fs.isFile(appXml)) {
304 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
305 throw new AuthorizationException(ErrorCode.E0506, appPath);
306 }
307 fs.open(appXml).close();
308 }
309 }
310 }
311 // TODO change this when stopping support of 0.18 to the new
312 // Exception
313 catch (org.apache.hadoop.fs.permission.AccessControlException ex) {
314 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
315 throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex);
316 }
317 }
318 catch (IOException ex) {
319 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
320 throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
321 }
322 catch (HadoopAccessorException e) {
323 throw new AuthorizationException(e);
324 }
325 }
326
327 /**
328 * Check if the user+group is authorized to operate on the specified job. <p/> Checks if the user is a super-user or
329 * the one who started the job. <p/> Read operations are allowed to all users.
330 *
331 * @param user user name.
332 * @param jobId job id.
333 * @param write indicates if the check is for read or write job tasks.
334 * @throws AuthorizationException thrown if the user is not authorized for the job.
335 */
336 public void authorizeForJob(String user, String jobId, boolean write) throws AuthorizationException {
337 if (securityEnabled && write && !isAdmin(user)) {
338 // handle workflow jobs
339 if (jobId.endsWith("-W")) {
340 WorkflowJobBean jobBean = null;
341 JPAService jpaService = Services.get().get(JPAService.class);
342 if (jpaService != null) {
343 try {
344 jobBean = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
345 }
346 catch (JPAExecutorException je) {
347 throw new AuthorizationException(je);
348 }
349 }
350 else {
351 throw new AuthorizationException(ErrorCode.E0610);
352 }
353 if (jobBean != null && !jobBean.getUser().equals(user)) {
354 if (!isUserInGroup(user, jobBean.getGroup())) {
355 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
356 throw new AuthorizationException(ErrorCode.E0508, user, jobId);
357 }
358 }
359 }
360 // handle bundle jobs
361 else if (jobId.endsWith("-B")){
362 BundleJobBean jobBean = null;
363 JPAService jpaService = Services.get().get(JPAService.class);
364 if (jpaService != null) {
365 try {
366 jobBean = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
367 }
368 catch (JPAExecutorException je) {
369 throw new AuthorizationException(je);
370 }
371 }
372 else {
373 throw new AuthorizationException(ErrorCode.E0610);
374 }
375 if (jobBean != null && !jobBean.getUser().equals(user)) {
376 if (!isUserInGroup(user, jobBean.getGroup())) {
377 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
378 throw new AuthorizationException(ErrorCode.E0509, user, jobId);
379 }
380 }
381 }
382 // handle coordinator jobs
383 else {
384 CoordinatorJobBean jobBean = null;
385 JPAService jpaService = Services.get().get(JPAService.class);
386 if (jpaService != null) {
387 try {
388 jobBean = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
389 }
390 catch (JPAExecutorException je) {
391 throw new AuthorizationException(je);
392 }
393 }
394 else {
395 throw new AuthorizationException(ErrorCode.E0610);
396 }
397 if (jobBean != null && !jobBean.getUser().equals(user)) {
398 if (!isUserInGroup(user, jobBean.getGroup())) {
399 incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
400 throw new AuthorizationException(ErrorCode.E0509, user, jobId);
401 }
402 }
403 }
404 }
405 }
406
407 /**
408 * Convenience method for instrumentation counters.
409 *
410 * @param name counter name.
411 * @param count count to increment the counter.
412 */
413 private void incrCounter(String name, int count) {
414 if (instrumentation != null) {
415 instrumentation.incr(INSTRUMENTATION_GROUP, name, count);
416 }
417 }
418 }