001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.servlet; 019 020 import java.io.IOException; 021 import java.util.Arrays; 022 023 import javax.servlet.ServletConfig; 024 import javax.servlet.ServletException; 025 import javax.servlet.http.HttpServletRequest; 026 import javax.servlet.http.HttpServletResponse; 027 028 import org.apache.hadoop.conf.Configuration; 029 import org.apache.hadoop.fs.FileStatus; 030 import org.apache.hadoop.fs.FileSystem; 031 import org.apache.hadoop.fs.Path; 032 import org.apache.oozie.BaseEngineException; 033 import org.apache.oozie.ErrorCode; 034 import org.apache.oozie.client.OozieClient; 035 import org.apache.oozie.client.XOozieClient; 036 import org.apache.oozie.client.rest.JsonBean; 037 import org.apache.oozie.client.rest.RestConstants; 038 import org.apache.oozie.service.AuthorizationException; 039 import org.apache.oozie.service.AuthorizationService; 040 import org.apache.oozie.service.HadoopAccessorException; 041 import org.apache.oozie.service.HadoopAccessorService; 042 import org.apache.oozie.service.Services; 043 import org.apache.oozie.service.XLogService; 044 import org.apache.oozie.util.ConfigUtils; 045 import org.apache.oozie.util.JobUtils; 046 import org.apache.oozie.util.XConfiguration; 047 import org.apache.oozie.util.XLog; 048 import org.json.simple.JSONObject; 049 050 public abstract class BaseJobServlet extends JsonRestServlet { 051 052 private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[1]; 053 054 static { 055 RESOURCES_INFO[0] = new ResourceInfo("*", Arrays.asList("PUT", "GET"), Arrays.asList(new ParameterInfo( 056 RestConstants.ACTION_PARAM, String.class, true, Arrays.asList("PUT")), new ParameterInfo( 057 RestConstants.JOB_SHOW_PARAM, String.class, false, Arrays.asList("GET")))); 058 } 059 060 public BaseJobServlet(String instrumentationName) { 061 super(instrumentationName, RESOURCES_INFO); 062 } 063 064 /** 065 * Perform various job related actions - start, suspend, resume, kill, etc. 066 */ 067 @Override 068 protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 069 String jobId = getResourceName(request); 070 request.setAttribute(AUDIT_PARAM, jobId); 071 request.setAttribute(AUDIT_OPERATION, request.getParameter(RestConstants.ACTION_PARAM)); 072 try { 073 AuthorizationService auth = Services.get().get(AuthorizationService.class); 074 auth.authorizeForJob(getUser(request), jobId, true); 075 } 076 catch (AuthorizationException ex) { 077 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex); 078 } 079 080 String action = request.getParameter(RestConstants.ACTION_PARAM); 081 if (action.equals(RestConstants.JOB_ACTION_START)) { 082 stopCron(); 083 startJob(request, response); 084 startCron(); 085 response.setStatus(HttpServletResponse.SC_OK); 086 } 087 else if (action.equals(RestConstants.JOB_ACTION_RESUME)) { 088 stopCron(); 089 resumeJob(request, response); 090 startCron(); 091 response.setStatus(HttpServletResponse.SC_OK); 092 } 093 else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) { 094 stopCron(); 095 suspendJob(request, response); 096 startCron(); 097 response.setStatus(HttpServletResponse.SC_OK); 098 } 099 else if (action.equals(RestConstants.JOB_ACTION_KILL)) { 100 stopCron(); 101 killJob(request, response); 102 startCron(); 103 response.setStatus(HttpServletResponse.SC_OK); 104 } 105 else if (action.equals(RestConstants.JOB_ACTION_CHANGE)) { 106 stopCron(); 107 changeJob(request, response); 108 startCron(); 109 response.setStatus(HttpServletResponse.SC_OK); 110 } 111 else if (action.equals(RestConstants.JOB_ACTION_RERUN)) { 112 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 113 Configuration conf = new XConfiguration(request.getInputStream()); 114 stopCron(); 115 checkAuthorizationForApp(getUser(request), conf); 116 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf); 117 reRunJob(request, response, conf); 118 startCron(); 119 response.setStatus(HttpServletResponse.SC_OK); 120 } 121 else if (action.equals(RestConstants.JOB_COORD_ACTION_RERUN)) { 122 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 123 stopCron(); 124 JSONObject json = reRunJob(request, response, null); 125 startCron(); 126 if (json != null) { 127 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 128 } 129 else { 130 response.setStatus(HttpServletResponse.SC_OK); 131 } 132 } 133 else if (action.equals(RestConstants.JOB_BUNDLE_ACTION_RERUN)) { 134 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 135 stopCron(); 136 JSONObject json = reRunJob(request, response, null); 137 startCron(); 138 if (json != null) { 139 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 140 } 141 else { 142 response.setStatus(HttpServletResponse.SC_OK); 143 } 144 } 145 else { 146 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 147 RestConstants.ACTION_PARAM, action); 148 } 149 } 150 151 /** 152 * Validate the configuration user/group. <p/> 153 * 154 * @param requestUser user in request. 155 * @param conf configuration. 156 * @throws XServletException thrown if the configuration does not have a property {@link 157 * org.apache.oozie.client.OozieClient#USER_NAME}. 158 */ 159 static void checkAuthorizationForApp(String requestUser, Configuration conf) throws XServletException { 160 String user = conf.get(OozieClient.USER_NAME); 161 String acl = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.GROUP_NAME, OozieClient.JOB_ACL, null); 162 try { 163 if (user == null) { 164 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0401, OozieClient.USER_NAME); 165 } 166 if (!requestUser.equals(UNDEF) && !user.equals(requestUser)) { 167 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0400, requestUser, user); 168 } 169 AuthorizationService auth = Services.get().get(AuthorizationService.class); 170 171 if (acl == null && auth.useDefaultGroupAsAcl()) { 172 acl = auth.getDefaultGroup(user); 173 conf.set(OozieClient.GROUP_NAME, acl); 174 } 175 XLog.Info.get().setParameter(XLogService.GROUP, acl); 176 String wfPath = conf.get(OozieClient.APP_PATH); 177 String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH); 178 String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH); 179 180 if (wfPath == null && coordPath == null && bundlePath == null) { 181 String libPath = conf.get(XOozieClient.LIBPATH); 182 conf.set(OozieClient.APP_PATH, libPath); 183 wfPath = libPath; 184 } 185 ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath); 186 187 if (wfPath != null) { 188 auth.authorizeForApp(user, acl, wfPath, "workflow.xml", conf); 189 } 190 else if (coordPath != null){ 191 auth.authorizeForApp(user, acl, coordPath, "coordinator.xml", conf); 192 } 193 else if (bundlePath != null){ 194 auth.authorizeForApp(user, acl, bundlePath, "bundle.xml", conf); 195 } 196 } 197 catch (AuthorizationException ex) { 198 XLog.getLog(BaseJobServlet.class).info("AuthorizationException ", ex); 199 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex); 200 } 201 } 202 203 /** 204 * Return information about jobs. 205 */ 206 @Override 207 public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 208 String jobId = getResourceName(request); 209 String show = request.getParameter(RestConstants.JOB_SHOW_PARAM); 210 211 try { 212 AuthorizationService auth = Services.get().get(AuthorizationService.class); 213 auth.authorizeForJob(getUser(request), jobId, false); 214 } 215 catch (AuthorizationException ex) { 216 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex); 217 } 218 219 if (show == null || show.equals(RestConstants.JOB_SHOW_INFO)) { 220 stopCron(); 221 JsonBean job = null; 222 try { 223 job = getJob(request, response); 224 } 225 catch (BaseEngineException e) { 226 // TODO Auto-generated catch block 227 // e.printStackTrace(); 228 229 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e); 230 } 231 startCron(); 232 sendJsonResponse(response, HttpServletResponse.SC_OK, job); 233 } 234 else if (show.equals(RestConstants.JOB_SHOW_LOG)) { 235 response.setContentType(TEXT_UTF8); 236 streamJobLog(request, response); 237 } 238 else if (show.equals(RestConstants.JOB_SHOW_DEFINITION)) { 239 stopCron(); 240 response.setContentType(XML_UTF8); 241 String wfDefinition = getJobDefinition(request, response); 242 startCron(); 243 response.setStatus(HttpServletResponse.SC_OK); 244 response.getWriter().write(wfDefinition); 245 } 246 else { 247 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 248 RestConstants.JOB_SHOW_PARAM, show); 249 } 250 } 251 252 /** 253 * abstract method to start a job, either workflow or coordinator 254 * 255 * @param request 256 * @param response 257 * @throws XServletException 258 * @throws IOException TODO 259 */ 260 abstract void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 261 IOException; 262 263 /** 264 * abstract method to resume a job, either workflow or coordinator 265 * 266 * @param request 267 * @param response 268 * @throws XServletException 269 * @throws IOException TODO 270 */ 271 abstract void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 272 IOException; 273 274 /** 275 * abstract method to suspend a job, either workflow or coordinator 276 * 277 * @param request 278 * @param response 279 * @throws XServletException 280 * @throws IOException TODO 281 */ 282 abstract void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 283 IOException; 284 285 /** 286 * abstract method to kill a job, either workflow or coordinator 287 * 288 * @param request 289 * @param response 290 * @throws XServletException 291 * @throws IOException TODO 292 */ 293 abstract void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 294 IOException; 295 296 /** 297 * abstract method to change a coordinator job 298 * 299 * @param request 300 * @param response 301 * @throws XServletException 302 * @throws IOException TODO 303 */ 304 abstract void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 305 IOException; 306 307 /** 308 * abstract method to re-run a job, either workflow or coordinator 309 * 310 * @param request 311 * @param response 312 * @param conf 313 * @throws XServletException 314 * @throws IOException TODO 315 */ 316 abstract JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 317 throws XServletException, IOException; 318 319 /** 320 * abstract method to get a job, either workflow or coordinator, in JsonBean representation 321 * 322 * @param request 323 * @param response 324 * @return JsonBean representation of a job, either workflow or coordinator 325 * @throws XServletException 326 * @throws IOException TODO 327 * @throws BaseEngineException 328 */ 329 abstract JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 330 IOException, BaseEngineException; 331 332 /** 333 * abstract method to get definition of a job, either workflow or coordinator 334 * 335 * @param request 336 * @param response 337 * @return job, either workflow or coordinator, definition in string format 338 * @throws XServletException 339 * @throws IOException TODO 340 */ 341 abstract String getJobDefinition(HttpServletRequest request, HttpServletResponse response) 342 throws XServletException, IOException; 343 344 /** 345 * abstract method to get and stream log information of job, either workflow or coordinator 346 * 347 * @param request 348 * @param response 349 * @throws XServletException 350 * @throws IOException 351 */ 352 abstract void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 353 IOException; 354 355 }