001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.servlet; 020 021import java.io.IOException; 022import java.util.Arrays; 023 024import javax.servlet.ServletException; 025import javax.servlet.http.HttpServletRequest; 026import javax.servlet.http.HttpServletResponse; 027 028import org.apache.hadoop.conf.Configuration; 029import org.apache.oozie.BaseEngineException; 030import org.apache.oozie.ErrorCode; 031import org.apache.oozie.client.OozieClient; 032import org.apache.oozie.client.XOozieClient; 033import org.apache.oozie.client.rest.JsonBean; 034import org.apache.oozie.client.rest.JsonTags; 035import org.apache.oozie.client.rest.RestConstants; 036import org.apache.oozie.service.AuthorizationException; 037import org.apache.oozie.service.AuthorizationService; 038import org.apache.oozie.service.Services; 039import org.apache.oozie.service.XLogService; 040import org.apache.oozie.util.ConfigUtils; 041import org.apache.oozie.util.JobUtils; 042import org.apache.oozie.util.XConfiguration; 043import org.apache.oozie.util.XLog; 044import org.json.simple.JSONObject; 045 046public abstract class BaseJobServlet extends JsonRestServlet { 047 048 private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[1]; 049 050 static { 051 RESOURCES_INFO[0] = new ResourceInfo("*", Arrays.asList("PUT", "GET"), Arrays.asList(new ParameterInfo( 052 RestConstants.ACTION_PARAM, String.class, true, Arrays.asList("PUT")), new ParameterInfo( 053 RestConstants.JOB_SHOW_PARAM, String.class, false, Arrays.asList("GET")), new ParameterInfo( 054 RestConstants.ORDER_PARAM, String.class, false, Arrays.asList("GET")))); 055 } 056 057 public BaseJobServlet(String instrumentationName) { 058 super(instrumentationName, RESOURCES_INFO); 059 } 060 061 /** 062 * Perform various job related actions - start, suspend, resume, kill, etc. 063 */ 064 @Override 065 protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 066 String jobId = getResourceName(request); 067 request.setAttribute(AUDIT_PARAM, jobId); 068 request.setAttribute(AUDIT_OPERATION, request.getParameter(RestConstants.ACTION_PARAM)); 069 try { 070 AuthorizationService auth = Services.get().get(AuthorizationService.class); 071 auth.authorizeForJob(getUser(request), jobId, true); 072 } 073 catch (AuthorizationException ex) { 074 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex); 075 } 076 077 String action = request.getParameter(RestConstants.ACTION_PARAM); 078 if (action.equals(RestConstants.JOB_ACTION_START)) { 079 stopCron(); 080 startJob(request, response); 081 startCron(); 082 response.setStatus(HttpServletResponse.SC_OK); 083 } 084 else if (action.equals(RestConstants.JOB_ACTION_RESUME)) { 085 stopCron(); 086 resumeJob(request, response); 087 startCron(); 088 response.setStatus(HttpServletResponse.SC_OK); 089 } 090 else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) { 091 stopCron(); 092 suspendJob(request, response); 093 startCron(); 094 response.setStatus(HttpServletResponse.SC_OK); 095 } 096 else if (action.equals(RestConstants.JOB_ACTION_KILL)) { 097 stopCron(); 098 JSONObject json = killJob(request, response); 099 startCron(); 100 if (json != null) { 101 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 102 } 103 else { 104 response.setStatus(HttpServletResponse.SC_OK); 105 } 106 } 107 else if (action.equals(RestConstants.JOB_ACTION_CHANGE)) { 108 stopCron(); 109 changeJob(request, response); 110 startCron(); 111 response.setStatus(HttpServletResponse.SC_OK); 112 } 113 else if (action.equals(RestConstants.JOB_ACTION_IGNORE)) { 114 stopCron(); 115 JSONObject json = ignoreJob(request, response); 116 startCron(); 117 if (json != null) { 118 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 119 } 120 else { 121 response.setStatus(HttpServletResponse.SC_OK); 122 } 123 } 124 else if (action.equals(RestConstants.JOB_ACTION_RERUN)) { 125 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 126 Configuration conf = new XConfiguration(request.getInputStream()); 127 stopCron(); 128 String requestUser = getUser(request); 129 if (!requestUser.equals(UNDEF)) { 130 conf.set(OozieClient.USER_NAME, requestUser); 131 } 132 if (conf.get(OozieClient.APP_PATH) != null) { 133 BaseJobServlet.checkAuthorizationForApp(conf); 134 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf); 135 } 136 reRunJob(request, response, conf); 137 startCron(); 138 response.setStatus(HttpServletResponse.SC_OK); 139 } 140 else if (action.equals(RestConstants.JOB_COORD_ACTION_RERUN)) { 141 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 142 stopCron(); 143 JSONObject json = reRunJob(request, response, null); 144 startCron(); 145 if (json != null) { 146 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 147 } 148 else { 149 response.setStatus(HttpServletResponse.SC_OK); 150 } 151 } 152 else if (action.equals(RestConstants.JOB_BUNDLE_ACTION_RERUN)) { 153 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 154 stopCron(); 155 JSONObject json = reRunJob(request, response, null); 156 startCron(); 157 if (json != null) { 158 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 159 } 160 else { 161 response.setStatus(HttpServletResponse.SC_OK); 162 } 163 } 164 else if (action.equals(RestConstants.JOB_COORD_UPDATE)) { 165 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 166 Configuration conf = new XConfiguration(request.getInputStream()); 167 stopCron(); 168 String requestUser = getUser(request); 169 if (!requestUser.equals(UNDEF)) { 170 conf.set(OozieClient.USER_NAME, requestUser); 171 } 172 if (conf.get(OozieClient.COORDINATOR_APP_PATH) != null) { 173 //If coord is submitted from bundle, user may want to update individual coord job with bundle properties 174 //If COORDINATOR_APP_PATH is set, we should check only COORDINATOR_APP_PATH path permission 175 String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH); 176 if (bundlePath != null) { 177 conf.unset(OozieClient.BUNDLE_APP_PATH); 178 } 179 BaseJobServlet.checkAuthorizationForApp(conf); 180 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf); 181 if (bundlePath != null) { 182 conf.set(OozieClient.BUNDLE_APP_PATH, bundlePath); 183 } 184 } 185 JSONObject json = updateJob(request, response, conf); 186 startCron(); 187 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 188 } 189 else if (action.equals(RestConstants.SLA_ENABLE_ALERT)) { 190 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 191 stopCron(); 192 slaEnableAlert(request, response); 193 startCron(); 194 response.setStatus(HttpServletResponse.SC_OK); 195 } 196 else if (action.equals(RestConstants.SLA_DISABLE_ALERT)) { 197 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 198 stopCron(); 199 slaDisableAlert(request, response); 200 startCron(); 201 response.setStatus(HttpServletResponse.SC_OK); 202 } 203 else if (action.equals(RestConstants.SLA_CHANGE)) { 204 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 205 stopCron(); 206 slaChange(request, response); 207 startCron(); 208 response.setStatus(HttpServletResponse.SC_OK); 209 } 210 else { 211 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 212 RestConstants.ACTION_PARAM, action); 213 } 214 } 215 216 abstract JSONObject ignoreJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 217 IOException; 218 219 /** 220 * Validate the configuration user/group. <p/> 221 * 222 * @param conf configuration. 223 * @throws XServletException thrown if the configuration does not have a property {@link 224 * org.apache.oozie.client.OozieClient#USER_NAME}. 225 */ 226 static void checkAuthorizationForApp(Configuration conf) throws XServletException { 227 String user = conf.get(OozieClient.USER_NAME); 228 String acl = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.GROUP_NAME, OozieClient.JOB_ACL, null); 229 try { 230 if (user == null) { 231 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0401, OozieClient.USER_NAME); 232 } 233 AuthorizationService auth = Services.get().get(AuthorizationService.class); 234 235 if (acl != null){ 236 conf.set(OozieClient.GROUP_NAME, acl); 237 } 238 else if (acl == null && auth.useDefaultGroupAsAcl()) { 239 acl = auth.getDefaultGroup(user); 240 conf.set(OozieClient.GROUP_NAME, acl); 241 } 242 XLog.Info.get().setParameter(XLogService.GROUP, acl); 243 String wfPath = conf.get(OozieClient.APP_PATH); 244 String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH); 245 String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH); 246 247 if (wfPath == null && coordPath == null && bundlePath == null) { 248 String[] libPaths = conf.getStrings(XOozieClient.LIBPATH); 249 if (libPaths != null && libPaths.length > 0 && libPaths[0].trim().length() > 0) { 250 conf.set(OozieClient.APP_PATH, libPaths[0].trim()); 251 wfPath = libPaths[0].trim(); 252 } 253 else { 254 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0405); 255 } 256 } 257 ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath); 258 259 if (wfPath != null) { 260 auth.authorizeForApp(user, acl, wfPath, "workflow.xml", conf); 261 } 262 else if (coordPath != null){ 263 auth.authorizeForApp(user, acl, coordPath, "coordinator.xml", conf); 264 } 265 else if (bundlePath != null){ 266 auth.authorizeForApp(user, acl, bundlePath, "bundle.xml", conf); 267 } 268 } 269 catch (AuthorizationException ex) { 270 XLog.getLog(BaseJobServlet.class).info("AuthorizationException ", ex); 271 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex); 272 } 273 } 274 275 /** 276 * Return information about jobs. 277 */ 278 @Override 279 @SuppressWarnings("unchecked") 280 public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 281 String jobId = getResourceName(request); 282 String show = request.getParameter(RestConstants.JOB_SHOW_PARAM); 283 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 284 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM); 285 286 try { 287 AuthorizationService auth = Services.get().get(AuthorizationService.class); 288 auth.authorizeForJob(getUser(request), jobId, false); 289 } 290 catch (AuthorizationException ex) { 291 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex); 292 } 293 294 if (show == null || show.equals(RestConstants.JOB_SHOW_INFO)) { 295 stopCron(); 296 JsonBean job = null; 297 try { 298 job = getJob(request, response); 299 } 300 catch (BaseEngineException e) { 301 // TODO Auto-generated catch block 302 // e.printStackTrace(); 303 304 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e); 305 } 306 startCron(); 307 sendJsonResponse(response, HttpServletResponse.SC_OK, job, timeZoneId); 308 } 309 else if (show.equals(RestConstants.ALL_WORKFLOWS_FOR_COORD_ACTION)) { 310 stopCron(); 311 JSONObject json = getJobsByParentId(request, response); 312 startCron(); 313 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 314 } 315 else if (show.equals(RestConstants.JOB_SHOW_JMS_TOPIC)) { 316 stopCron(); 317 String jmsTopicName = getJMSTopicName(request, response); 318 JSONObject json = new JSONObject(); 319 json.put(JsonTags.JMS_TOPIC_NAME, jmsTopicName); 320 startCron(); 321 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 322 } 323 324 else if (show.equals(RestConstants.JOB_SHOW_LOG)) { 325 response.setContentType(TEXT_UTF8); 326 streamJobLog(request, response); 327 } 328 else if (show.equals(RestConstants.JOB_SHOW_ERROR_LOG)) { 329 response.setContentType(TEXT_UTF8); 330 streamJobErrorLog(request, response); 331 } 332 else if (show.equals(RestConstants.JOB_SHOW_AUDIT_LOG)) { 333 response.setContentType(TEXT_UTF8); 334 streamJobAuditLog(request, response); 335 } 336 337 else if (show.equals(RestConstants.JOB_SHOW_DEFINITION)) { 338 stopCron(); 339 response.setContentType(XML_UTF8); 340 String wfDefinition = getJobDefinition(request, response); 341 startCron(); 342 response.setStatus(HttpServletResponse.SC_OK); 343 response.getWriter().write(wfDefinition); 344 } 345 else if (show.equals(RestConstants.JOB_SHOW_GRAPH)) { 346 stopCron(); 347 streamJobGraph(request, response); 348 startCron(); // -- should happen before you stream anything in response? 349 } else if (show.equals(RestConstants.JOB_SHOW_STATUS)) { 350 stopCron(); 351 String status = getJobStatus(request, response); 352 JSONObject json = new JSONObject(); 353 json.put(JsonTags.STATUS, status); 354 startCron(); 355 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 356 } 357 else { 358 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 359 RestConstants.JOB_SHOW_PARAM, show); 360 } 361 } 362 363 /** 364 * abstract method to start a job, either workflow or coordinator 365 * 366 * @param request 367 * @param response 368 * @throws XServletException 369 * @throws IOException TODO 370 */ 371 abstract void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 372 IOException; 373 374 /** 375 * abstract method to resume a job, either workflow or coordinator 376 * 377 * @param request 378 * @param response 379 * @throws XServletException 380 * @throws IOException TODO 381 */ 382 abstract void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 383 IOException; 384 385 /** 386 * abstract method to suspend a job, either workflow or coordinator 387 * 388 * @param request 389 * @param response 390 * @throws XServletException 391 * @throws IOException TODO 392 */ 393 abstract void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 394 IOException; 395 396 /** 397 * abstract method to kill a job, either workflow or coordinator 398 * 399 * @param request 400 * @param response 401 * @return 402 * @throws XServletException 403 * @throws IOException TODO 404 */ 405 abstract JSONObject killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 406 IOException; 407 408 /** 409 * abstract method to change a coordinator job 410 * 411 * @param request 412 * @param response 413 * @throws XServletException 414 * @throws IOException TODO 415 */ 416 abstract void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 417 IOException; 418 419 /** 420 * abstract method to re-run a job, either workflow or coordinator 421 * 422 * @param request 423 * @param response 424 * @param conf 425 * @throws XServletException 426 * @throws IOException TODO 427 */ 428 abstract JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 429 throws XServletException, IOException; 430 431 /** 432 * abstract method to get a job, either workflow or coordinator, in JsonBean representation 433 * 434 * @param request 435 * @param response 436 * @return JsonBean representation of a job, either workflow or coordinator 437 * @throws XServletException 438 * @throws IOException TODO 439 * @throws BaseEngineException 440 */ 441 abstract JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 442 IOException, BaseEngineException; 443 444 /** 445 * abstract method to get definition of a job, either workflow or coordinator 446 * 447 * @param request 448 * @param response 449 * @return job, either workflow or coordinator, definition in string format 450 * @throws XServletException 451 * @throws IOException TODO 452 */ 453 abstract String getJobDefinition(HttpServletRequest request, HttpServletResponse response) 454 throws XServletException, IOException; 455 456 /** 457 * abstract method to get and stream log information of job, either workflow or coordinator 458 * 459 * @param request 460 * @param response 461 * @throws XServletException 462 * @throws IOException 463 */ 464 abstract void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 465 IOException; 466 467 /** 468 * abstract method to get and stream error log information of job, either workflow, coordinator or bundle 469 * 470 * @param request 471 * @param response 472 * @throws XServletException 473 * @throws IOException 474 */ 475 abstract void streamJobErrorLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 476 IOException; 477 478 479 abstract void streamJobAuditLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 480 IOException; 481 482 /** 483 * abstract method to create and stream image for runtime DAG -- workflow only 484 * 485 * @param request 486 * @param response 487 * @throws XServletException 488 * @throws IOException 489 */ 490 abstract void streamJobGraph(HttpServletRequest request, HttpServletResponse response) 491 throws XServletException, IOException; 492 493 /** 494 * abstract method to get JMS topic name for a job 495 * @param request 496 * @param response 497 * @throws XServletException 498 * @throws IOException 499 */ 500 abstract String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) 501 throws XServletException, IOException; 502 503 /** 504 * abstract method to get workflow job ids from the parent id 505 * i.e. coordinator action 506 * @param request 507 * @param response 508 * @return comma-separated list of workflow job ids 509 * @throws XServletException 510 * @throws IOException 511 */ 512 abstract JSONObject getJobsByParentId(HttpServletRequest request, HttpServletResponse response) 513 throws XServletException, IOException; 514 515 /** 516 * Abstract method to Update coord job. 517 * 518 * @param request the request 519 * @param response the response 520 * @param Configuration conf 521 * @return the JSON object 522 * @throws XServletException the x servlet exception 523 * @throws IOException Signals that an I/O exception has occurred. 524 */ 525 abstract JSONObject updateJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 526 throws XServletException, IOException; 527 528 /** 529 * Abstract method to get status for a job 530 * 531 * @param request the request 532 * @param response the response 533 * @return the JSON object 534 * @throws XServletException the x servlet exception 535 * @throws IOException Signals that an I/O exception has occurred. 536 */ 537 abstract String getJobStatus(HttpServletRequest request, HttpServletResponse response) 538 throws XServletException, IOException; 539 540 /** 541 * Abstract method to enable SLA alert. 542 * 543 * @param request the request 544 * @param response the response 545 * @throws XServletException the x servlet exception 546 * @throws IOException Signals that an I/O exception has occurred. 547 */ 548 abstract void slaEnableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, 549 IOException; 550 551 /** 552 * Abstract method to disable SLA alert. 553 * 554 * @param request the request 555 * @param response the response 556 * @throws XServletException the x servlet exception 557 * @throws IOException Signals that an I/O exception has occurred. 558 */ 559 abstract void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, 560 IOException; 561 562 /** 563 * Abstract method to change SLA definition. 564 * 565 * @param request the request 566 * @param response the response 567 * @throws XServletException the x servlet exception 568 * @throws IOException Signals that an I/O exception has occurred. 569 */ 570 abstract void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException, 571 IOException; 572 573}