001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.servlet; 019 020import java.io.IOException; 021import java.util.Arrays; 022 023import javax.servlet.ServletException; 024import javax.servlet.http.HttpServletRequest; 025import javax.servlet.http.HttpServletResponse; 026 027import org.apache.hadoop.conf.Configuration; 028import org.apache.oozie.BaseEngineException; 029import org.apache.oozie.ErrorCode; 030import org.apache.oozie.client.OozieClient; 031import org.apache.oozie.client.XOozieClient; 032import org.apache.oozie.client.rest.JsonBean; 033import org.apache.oozie.client.rest.JsonTags; 034import org.apache.oozie.client.rest.RestConstants; 035import org.apache.oozie.service.AuthorizationException; 036import org.apache.oozie.service.AuthorizationService; 037import org.apache.oozie.service.Services; 038import org.apache.oozie.service.XLogService; 039import org.apache.oozie.util.ConfigUtils; 040import org.apache.oozie.util.JobUtils; 041import org.apache.oozie.util.XConfiguration; 042import org.apache.oozie.util.XLog; 043import org.json.simple.JSONObject; 044 045public abstract class BaseJobServlet extends JsonRestServlet { 046 047 private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[1]; 048 049 static { 050 RESOURCES_INFO[0] = new ResourceInfo("*", Arrays.asList("PUT", "GET"), Arrays.asList(new ParameterInfo( 051 RestConstants.ACTION_PARAM, String.class, true, Arrays.asList("PUT")), new ParameterInfo( 052 RestConstants.JOB_SHOW_PARAM, String.class, false, Arrays.asList("GET")), new ParameterInfo( 053 RestConstants.ORDER_PARAM, String.class, false, Arrays.asList("GET")))); 054 } 055 056 public BaseJobServlet(String instrumentationName) { 057 super(instrumentationName, RESOURCES_INFO); 058 } 059 060 /** 061 * Perform various job related actions - start, suspend, resume, kill, etc. 062 */ 063 @Override 064 protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 065 String jobId = getResourceName(request); 066 request.setAttribute(AUDIT_PARAM, jobId); 067 request.setAttribute(AUDIT_OPERATION, request.getParameter(RestConstants.ACTION_PARAM)); 068 try { 069 AuthorizationService auth = Services.get().get(AuthorizationService.class); 070 auth.authorizeForJob(getUser(request), jobId, true); 071 } 072 catch (AuthorizationException ex) { 073 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex); 074 } 075 076 String action = request.getParameter(RestConstants.ACTION_PARAM); 077 if (action.equals(RestConstants.JOB_ACTION_START)) { 078 stopCron(); 079 startJob(request, response); 080 startCron(); 081 response.setStatus(HttpServletResponse.SC_OK); 082 } 083 else if (action.equals(RestConstants.JOB_ACTION_RESUME)) { 084 stopCron(); 085 resumeJob(request, response); 086 startCron(); 087 response.setStatus(HttpServletResponse.SC_OK); 088 } 089 else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) { 090 stopCron(); 091 suspendJob(request, response); 092 startCron(); 093 response.setStatus(HttpServletResponse.SC_OK); 094 } 095 else if (action.equals(RestConstants.JOB_ACTION_KILL)) { 096 stopCron(); 097 JSONObject json = killJob(request, response); 098 startCron(); 099 if (json != null) { 100 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 101 } 102 else { 103 response.setStatus(HttpServletResponse.SC_OK); 104 } 105 } 106 else if (action.equals(RestConstants.JOB_ACTION_CHANGE)) { 107 stopCron(); 108 changeJob(request, response); 109 startCron(); 110 response.setStatus(HttpServletResponse.SC_OK); 111 } 112 else if (action.equals(RestConstants.JOB_ACTION_IGNORE)) { 113 stopCron(); 114 JSONObject json = ignoreJob(request, response); 115 startCron(); 116 if (json != null) { 117 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 118 } 119 else { 120 response.setStatus(HttpServletResponse.SC_OK); 121 } 122 } 123 else if (action.equals(RestConstants.JOB_ACTION_RERUN)) { 124 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 125 Configuration conf = new XConfiguration(request.getInputStream()); 126 stopCron(); 127 String requestUser = getUser(request); 128 if (!requestUser.equals(UNDEF)) { 129 conf.set(OozieClient.USER_NAME, requestUser); 130 } 131 if (conf.get(OozieClient.APP_PATH) != null) { 132 BaseJobServlet.checkAuthorizationForApp(conf); 133 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf); 134 } 135 reRunJob(request, response, conf); 136 startCron(); 137 response.setStatus(HttpServletResponse.SC_OK); 138 } 139 else if (action.equals(RestConstants.JOB_COORD_ACTION_RERUN)) { 140 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 141 stopCron(); 142 JSONObject json = reRunJob(request, response, null); 143 startCron(); 144 if (json != null) { 145 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 146 } 147 else { 148 response.setStatus(HttpServletResponse.SC_OK); 149 } 150 } 151 else if (action.equals(RestConstants.JOB_BUNDLE_ACTION_RERUN)) { 152 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 153 stopCron(); 154 JSONObject json = reRunJob(request, response, null); 155 startCron(); 156 if (json != null) { 157 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 158 } 159 else { 160 response.setStatus(HttpServletResponse.SC_OK); 161 } 162 } 163 else if (action.equals(RestConstants.JOB_COORD_UPDATE)) { 164 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 165 Configuration conf = new XConfiguration(request.getInputStream()); 166 stopCron(); 167 String requestUser = getUser(request); 168 if (!requestUser.equals(UNDEF)) { 169 conf.set(OozieClient.USER_NAME, requestUser); 170 } 171 if (conf.get(OozieClient.COORDINATOR_APP_PATH) != null) { 172 BaseJobServlet.checkAuthorizationForApp(conf); 173 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf); 174 } 175 JSONObject json = updateJob(request, response, conf); 176 startCron(); 177 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 178 } 179 else { 180 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 181 RestConstants.ACTION_PARAM, action); 182 } 183 } 184 185 abstract JSONObject ignoreJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 186 IOException; 187 188 /** 189 * Validate the configuration user/group. <p/> 190 * 191 * @param conf configuration. 192 * @throws XServletException thrown if the configuration does not have a property {@link 193 * org.apache.oozie.client.OozieClient#USER_NAME}. 194 */ 195 static void checkAuthorizationForApp(Configuration conf) throws XServletException { 196 String user = conf.get(OozieClient.USER_NAME); 197 String acl = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.GROUP_NAME, OozieClient.JOB_ACL, null); 198 try { 199 if (user == null) { 200 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0401, OozieClient.USER_NAME); 201 } 202 AuthorizationService auth = Services.get().get(AuthorizationService.class); 203 204 if (acl != null){ 205 conf.set(OozieClient.GROUP_NAME, acl); 206 } 207 else if (acl == null && auth.useDefaultGroupAsAcl()) { 208 acl = auth.getDefaultGroup(user); 209 conf.set(OozieClient.GROUP_NAME, acl); 210 } 211 XLog.Info.get().setParameter(XLogService.GROUP, acl); 212 String wfPath = conf.get(OozieClient.APP_PATH); 213 String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH); 214 String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH); 215 216 if (wfPath == null && coordPath == null && bundlePath == null) { 217 String[] libPaths = conf.getStrings(XOozieClient.LIBPATH); 218 if (libPaths != null && libPaths.length > 0 && libPaths[0].trim().length() > 0) { 219 conf.set(OozieClient.APP_PATH, libPaths[0].trim()); 220 wfPath = libPaths[0].trim(); 221 } 222 else { 223 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0405); 224 } 225 } 226 ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath); 227 228 if (wfPath != null) { 229 auth.authorizeForApp(user, acl, wfPath, "workflow.xml", conf); 230 } 231 else if (coordPath != null){ 232 auth.authorizeForApp(user, acl, coordPath, "coordinator.xml", conf); 233 } 234 else if (bundlePath != null){ 235 auth.authorizeForApp(user, acl, bundlePath, "bundle.xml", conf); 236 } 237 } 238 catch (AuthorizationException ex) { 239 XLog.getLog(BaseJobServlet.class).info("AuthorizationException ", ex); 240 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex); 241 } 242 } 243 244 /** 245 * Return information about jobs. 246 */ 247 @Override 248 public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 249 String jobId = getResourceName(request); 250 String show = request.getParameter(RestConstants.JOB_SHOW_PARAM); 251 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 252 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM); 253 254 try { 255 AuthorizationService auth = Services.get().get(AuthorizationService.class); 256 auth.authorizeForJob(getUser(request), jobId, false); 257 } 258 catch (AuthorizationException ex) { 259 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex); 260 } 261 262 if (show == null || show.equals(RestConstants.JOB_SHOW_INFO)) { 263 stopCron(); 264 JsonBean job = null; 265 try { 266 job = getJob(request, response); 267 } 268 catch (BaseEngineException e) { 269 // TODO Auto-generated catch block 270 // e.printStackTrace(); 271 272 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e); 273 } 274 startCron(); 275 sendJsonResponse(response, HttpServletResponse.SC_OK, job, timeZoneId); 276 } 277 else if (show.equals(RestConstants.ALL_WORKFLOWS_FOR_COORD_ACTION)) { 278 stopCron(); 279 JSONObject json = getJobsByParentId(request, response); 280 startCron(); 281 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 282 } 283 else if (show.equals(RestConstants.JOB_SHOW_JMS_TOPIC)) { 284 stopCron(); 285 String jmsTopicName = getJMSTopicName(request, response); 286 JSONObject json = new JSONObject(); 287 json.put(JsonTags.JMS_TOPIC_NAME, jmsTopicName); 288 startCron(); 289 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 290 } 291 292 else if (show.equals(RestConstants.JOB_SHOW_LOG)) { 293 response.setContentType(TEXT_UTF8); 294 streamJobLog(request, response); 295 } 296 else if (show.equals(RestConstants.JOB_SHOW_DEFINITION)) { 297 stopCron(); 298 response.setContentType(XML_UTF8); 299 String wfDefinition = getJobDefinition(request, response); 300 startCron(); 301 response.setStatus(HttpServletResponse.SC_OK); 302 response.getWriter().write(wfDefinition); 303 } 304 else if (show.equals(RestConstants.JOB_SHOW_GRAPH)) { 305 stopCron(); 306 streamJobGraph(request, response); 307 startCron(); // -- should happen before you stream anything in response? 308 } 309 else { 310 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 311 RestConstants.JOB_SHOW_PARAM, show); 312 } 313 } 314 315 /** 316 * abstract method to start a job, either workflow or coordinator 317 * 318 * @param request 319 * @param response 320 * @throws XServletException 321 * @throws IOException TODO 322 */ 323 abstract void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 324 IOException; 325 326 /** 327 * abstract method to resume a job, either workflow or coordinator 328 * 329 * @param request 330 * @param response 331 * @throws XServletException 332 * @throws IOException TODO 333 */ 334 abstract void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 335 IOException; 336 337 /** 338 * abstract method to suspend a job, either workflow or coordinator 339 * 340 * @param request 341 * @param response 342 * @throws XServletException 343 * @throws IOException TODO 344 */ 345 abstract void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 346 IOException; 347 348 /** 349 * abstract method to kill a job, either workflow or coordinator 350 * 351 * @param request 352 * @param response 353 * @return 354 * @throws XServletException 355 * @throws IOException TODO 356 */ 357 abstract JSONObject killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 358 IOException; 359 360 /** 361 * abstract method to change a coordinator job 362 * 363 * @param request 364 * @param response 365 * @throws XServletException 366 * @throws IOException TODO 367 */ 368 abstract void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 369 IOException; 370 371 /** 372 * abstract method to re-run a job, either workflow or coordinator 373 * 374 * @param request 375 * @param response 376 * @param conf 377 * @throws XServletException 378 * @throws IOException TODO 379 */ 380 abstract JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 381 throws XServletException, IOException; 382 383 /** 384 * abstract method to get a job, either workflow or coordinator, in JsonBean representation 385 * 386 * @param request 387 * @param response 388 * @return JsonBean representation of a job, either workflow or coordinator 389 * @throws XServletException 390 * @throws IOException TODO 391 * @throws BaseEngineException 392 */ 393 abstract JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 394 IOException, BaseEngineException; 395 396 /** 397 * abstract method to get definition of a job, either workflow or coordinator 398 * 399 * @param request 400 * @param response 401 * @return job, either workflow or coordinator, definition in string format 402 * @throws XServletException 403 * @throws IOException TODO 404 */ 405 abstract String getJobDefinition(HttpServletRequest request, HttpServletResponse response) 406 throws XServletException, IOException; 407 408 /** 409 * abstract method to get and stream log information of job, either workflow or coordinator 410 * 411 * @param request 412 * @param response 413 * @throws XServletException 414 * @throws IOException 415 */ 416 abstract void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 417 IOException; 418 419 /** 420 * abstract method to create and stream image for runtime DAG -- workflow only 421 * 422 * @param request 423 * @param response 424 * @throws XServletException 425 * @throws IOException 426 */ 427 abstract void streamJobGraph(HttpServletRequest request, HttpServletResponse response) 428 throws XServletException, IOException; 429 430 /** 431 * abstract method to get JMS topic name for a job 432 * @param request 433 * @param response 434 * @throws XServletException 435 * @throws IOException 436 */ 437 abstract String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) 438 throws XServletException, IOException; 439 440 /** 441 * abstract method to get workflow job ids from the parent id 442 * i.e. coordinator action 443 * @param request 444 * @param response 445 * @return comma-separated list of workflow job ids 446 * @throws XServletException 447 * @throws IOException 448 */ 449 abstract JSONObject getJobsByParentId(HttpServletRequest request, HttpServletResponse response) 450 throws XServletException, IOException; 451 452 /** 453 * Abstract method to Update coord job. 454 * 455 * @param request the request 456 * @param response the response 457 * @param Configuration conf 458 * @return the JSON object 459 * @throws XServletException the x servlet exception 460 * @throws IOException Signals that an I/O exception has occurred. 461 */ 462 abstract JSONObject updateJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 463 throws XServletException, IOException; 464} 465