001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.servlet; 020 021import java.io.IOException; 022import java.util.List; 023 024import javax.servlet.ServletInputStream; 025import javax.servlet.http.HttpServletRequest; 026import javax.servlet.http.HttpServletResponse; 027 028import org.apache.hadoop.conf.Configuration; 029import org.apache.oozie.*; 030import org.apache.oozie.client.WorkflowAction; 031import org.apache.oozie.client.WorkflowJob; 032import org.apache.oozie.client.rest.*; 033import org.apache.oozie.command.CommandException; 034import org.apache.oozie.coord.CoordUtils; 035import org.apache.oozie.service.BundleEngineService; 036import org.apache.oozie.service.ConfigurationService; 037import org.apache.oozie.service.CoordinatorEngineService; 038import org.apache.oozie.service.DagEngineService; 039import org.apache.oozie.service.Services; 040import org.apache.oozie.service.UUIDService; 041import org.apache.oozie.util.GraphGenerator; 042import org.apache.oozie.util.XLog; 043import org.json.simple.JSONArray; 044import org.json.simple.JSONObject; 045 046 047@SuppressWarnings("serial") 048public class V1JobServlet extends BaseJobServlet { 049 050 private static final String INSTRUMENTATION_NAME = "v1job"; 051 public static final String COORD_ACTIONS_DEFAULT_LENGTH = "oozie.coord.actions.default.length"; 052 053 public V1JobServlet() { 054 super(INSTRUMENTATION_NAME); 055 } 056 057 protected V1JobServlet(String instrumentation_name){ 058 super(instrumentation_name); 059 } 060 061 /* 062 * protected method to start a job 063 */ 064 @Override 065 protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 066 IOException { 067 /* 068 * Configuration conf = new XConfiguration(request.getInputStream()); 069 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 070 * conf.get(OozieClient.COORDINATOR_APP_PATH); 071 * 072 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 073 */ 074 String jobId = getResourceName(request); 075 if (jobId.endsWith("-W")) { 076 startWorkflowJob(request, response); 077 } 078 else if (jobId.endsWith("-B")) { 079 startBundleJob(request, response); 080 } 081 else { 082 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, RestConstants.ACTION_PARAM, RestConstants.JOB_ACTION_START); 083 } 084 085 } 086 087 /* 088 * protected method to resume a job 089 */ 090 @Override 091 protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 092 IOException { 093 /* 094 * Configuration conf = new XConfiguration(request.getInputStream()); 095 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 096 * conf.get(OozieClient.COORDINATOR_APP_PATH); 097 * 098 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 099 */ 100 String jobId = getResourceName(request); 101 if (jobId.endsWith("-W")) { 102 resumeWorkflowJob(request, response); 103 } 104 else if (jobId.endsWith("-B")) { 105 resumeBundleJob(request, response); 106 } 107 else { 108 resumeCoordinatorJob(request, response); 109 } 110 } 111 112 /* 113 * protected method to suspend a job 114 */ 115 @Override 116 protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 117 IOException { 118 /* 119 * Configuration conf = new XConfiguration(request.getInputStream()); 120 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 121 * conf.get(OozieClient.COORDINATOR_APP_PATH); 122 * 123 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 124 */ 125 String jobId = getResourceName(request); 126 if (jobId.endsWith("-W")) { 127 suspendWorkflowJob(request, response); 128 } 129 else if (jobId.endsWith("-B")) { 130 suspendBundleJob(request, response); 131 } 132 else { 133 suspendCoordinatorJob(request, response); 134 } 135 } 136 137 /* 138 * protected method to kill a job 139 */ 140 @Override 141 protected JSONObject killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 142 IOException { 143 /* 144 * Configuration conf = new XConfiguration(request.getInputStream()); 145 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 146 * conf.get(OozieClient.COORDINATOR_APP_PATH); 147 * 148 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 149 */ 150 String jobId = getResourceName(request); 151 JSONObject json = null; 152 if (jobId.endsWith("-W")) { 153 killWorkflowJob(request, response); 154 } 155 else if (jobId.endsWith("-B")) { 156 killBundleJob(request, response); 157 } 158 else { 159 json = killCoordinator(request, response); 160 } 161 return json; 162 } 163 164 /** 165 * protected method to change a coordinator job 166 * @param request request object 167 * @param response response object 168 * @throws XServletException 169 * @throws IOException 170 */ 171 @Override 172 protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 173 IOException { 174 String jobId = getResourceName(request); 175 if (jobId.endsWith("-B")) { 176 changeBundleJob(request, response); 177 } 178 else { 179 changeCoordinatorJob(request, response); 180 } 181 } 182 @Override 183 protected JSONObject ignoreJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { 184 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); 185 } 186 187 /* 188 * protected method to reRun a job 189 * 190 * @seeorg.apache.oozie.servlet.BaseJobServlet#reRunJob(javax.servlet.http. 191 * HttpServletRequest, javax.servlet.http.HttpServletResponse, 192 * org.apache.hadoop.conf.Configuration) 193 */ 194 @Override 195 protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 196 throws XServletException, IOException { 197 JSONObject json = null; 198 String jobId = getResourceName(request); 199 if (jobId.endsWith("-W")) { 200 reRunWorkflowJob(request, response, conf); 201 } 202 else if (jobId.endsWith("-B")) { 203 rerunBundleJob(request, response, conf); 204 } 205 else { 206 json = reRunCoordinatorActions(request, response, conf); 207 } 208 return json; 209 } 210 211 /* 212 * protected method to get a job in JsonBean representation 213 */ 214 @Override 215 protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 216 IOException, BaseEngineException { 217 ServletInputStream is = request.getInputStream(); 218 byte[] b = new byte[101]; 219 while (is.readLine(b, 0, 100) != -1) { 220 XLog.getLog(getClass()).warn("Printing :" + new String(b)); 221 } 222 223 JsonBean jobBean = null; 224 String jobId = getResourceName(request); 225 if (jobId.endsWith("-B")) { 226 jobBean = getBundleJob(request, response); 227 } 228 else { 229 if (jobId.endsWith("-W")) { 230 jobBean = getWorkflowJob(request, response); 231 } 232 else { 233 if (jobId.contains("-W@")) { 234 jobBean = getWorkflowAction(request, response); 235 } 236 else { 237 if (jobId.contains("-C@")) { 238 jobBean = getCoordinatorAction(request, response); 239 } 240 else { 241 jobBean = getCoordinatorJob(request, response); 242 } 243 } 244 } 245 } 246 247 return jobBean; 248 } 249 250 /* 251 * protected method to get a job definition in String format 252 */ 253 @Override 254 protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response) 255 throws XServletException, IOException { 256 String jobDefinition = null; 257 String jobId = getResourceName(request); 258 if (jobId.endsWith("-W")) { 259 jobDefinition = getWorkflowJobDefinition(request, response); 260 } 261 else if (jobId.endsWith("-B")) { 262 jobDefinition = getBundleJobDefinition(request, response); 263 } 264 else { 265 jobDefinition = getCoordinatorJobDefinition(request, response); 266 } 267 return jobDefinition; 268 } 269 270 /* 271 * protected method to stream a job log into response object 272 */ 273 @Override 274 protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 275 IOException { 276 try { 277 String jobId = getResourceName(request); 278 if (jobId.endsWith("-W")) { 279 streamWorkflowJobLog(request, response); 280 } 281 else if (jobId.endsWith("-B")) { 282 streamBundleJobLog(request, response); 283 } 284 else { 285 streamCoordinatorJobLog(request, response); 286 } 287 } 288 catch (Exception e) { 289 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0307, e.getMessage()); 290 } 291 } 292 293 @Override 294 protected void streamJobGraph(HttpServletRequest request, HttpServletResponse response) 295 throws XServletException, IOException { 296 String jobId = getResourceName(request); 297 if (jobId.endsWith("-W")) { 298 try { 299 // Applicable only to worflow, for now 300 response.setContentType(RestConstants.PNG_IMAGE_CONTENT_TYPE); 301 302 String showKill = request.getParameter(RestConstants.JOB_SHOW_KILL_PARAM); 303 boolean sK = showKill != null && (showKill.equalsIgnoreCase("yes") || showKill.equals("1") || showKill.equalsIgnoreCase("true")); 304 305 new GraphGenerator( 306 getWorkflowJobDefinition(request, response), 307 (WorkflowJobBean)getWorkflowJob(request, response), 308 sK).write(response.getOutputStream()); 309 310 } 311 catch (Exception e) { 312 throw new XServletException(HttpServletResponse.SC_NOT_FOUND, ErrorCode.E0307, e.getMessage(), e); 313 } 314 } 315 else { 316 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0306); 317 } 318 } 319 320 /** 321 * Start wf job 322 * 323 * @param request servlet request 324 * @param response servlet response 325 * @throws XServletException 326 */ 327 private void startWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 328 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 329 330 String jobId = getResourceName(request); 331 try { 332 dagEngine.start(jobId); 333 } 334 catch (DagEngineException ex) { 335 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 336 } 337 } 338 339 /** 340 * Start bundle job 341 * 342 * @param request servlet request 343 * @param response servlet response 344 * @throws XServletException 345 */ 346 private void startBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 347 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 348 String jobId = getResourceName(request); 349 try { 350 bundleEngine.start(jobId); 351 } 352 catch (BundleEngineException ex) { 353 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 354 } 355 } 356 357 /** 358 * Resume workflow job 359 * 360 * @param request servlet request 361 * @param response servlet response 362 * @throws XServletException 363 */ 364 private void resumeWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 365 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 366 367 String jobId = getResourceName(request); 368 try { 369 dagEngine.resume(jobId); 370 } 371 catch (DagEngineException ex) { 372 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 373 } 374 } 375 376 /** 377 * Resume bundle job 378 * 379 * @param request servlet request 380 * @param response servlet response 381 * @throws XServletException 382 */ 383 private void resumeBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 384 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 385 String jobId = getResourceName(request); 386 try { 387 bundleEngine.resume(jobId); 388 } 389 catch (BundleEngineException ex) { 390 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 391 } 392 } 393 394 /** 395 * Resume coordinator job 396 * 397 * @param request servlet request 398 * @param response servlet response 399 * @throws XServletException 400 * @throws CoordinatorEngineException 401 */ 402 private void resumeCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 403 throws XServletException { 404 String jobId = getResourceName(request); 405 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 406 getUser(request)); 407 try { 408 coordEngine.resume(jobId); 409 } 410 catch (CoordinatorEngineException ex) { 411 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 412 } 413 } 414 415 /** 416 * Suspend a wf job 417 * 418 * @param request servlet request 419 * @param response servlet response 420 * @throws XServletException 421 */ 422 private void suspendWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 423 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 424 425 String jobId = getResourceName(request); 426 try { 427 dagEngine.suspend(jobId); 428 } 429 catch (DagEngineException ex) { 430 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 431 } 432 } 433 434 /** 435 * Suspend bundle job 436 * 437 * @param request servlet request 438 * @param response servlet response 439 * @throws XServletException 440 */ 441 private void suspendBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 442 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 443 String jobId = getResourceName(request); 444 try { 445 bundleEngine.suspend(jobId); 446 } 447 catch (BundleEngineException ex) { 448 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 449 } 450 } 451 452 /** 453 * Suspend coordinator job 454 * 455 * @param request servlet request 456 * @param response servlet response 457 * @throws XServletException 458 */ 459 private void suspendCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 460 throws XServletException { 461 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 462 getUser(request)); 463 String jobId = getResourceName(request); 464 try { 465 coordEngine.suspend(jobId); 466 } 467 catch (CoordinatorEngineException ex) { 468 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 469 } 470 } 471 472 /** 473 * Kill a wf job 474 * @param request servlet request 475 * @param response servlet response 476 * @throws XServletException 477 */ 478 private void killWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 479 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 480 481 String jobId = getResourceName(request); 482 try { 483 dagEngine.kill(jobId); 484 } 485 catch (DagEngineException ex) { 486 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 487 } 488 } 489 490 /** 491 * Kill a coord job 492 * 493 * @param request servlet request 494 * @param response servlet response 495 * @throws XServletException 496 */ 497 @SuppressWarnings("unchecked") 498 private JSONObject killCoordinator(HttpServletRequest request, HttpServletResponse response) throws XServletException { 499 String jobId = getResourceName(request); 500 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class) 501 .getCoordinatorEngine(getUser(request)); 502 JSONObject json = null; 503 String rangeType = request.getParameter(RestConstants.JOB_COORD_RANGE_TYPE_PARAM); 504 String scope = request.getParameter(RestConstants.JOB_COORD_SCOPE_PARAM); 505 506 try { 507 if (rangeType != null && scope != null) { 508 XLog.getLog(getClass()).info( 509 "Kill coordinator actions for jobId=" + jobId + ", rangeType=" + rangeType + ",scope=" + scope); 510 511 json = new JSONObject(); 512 CoordinatorActionInfo coordInfo = coordEngine.killActions(jobId, rangeType, scope); 513 List<CoordinatorActionBean> coordActions; 514 if (coordInfo != null) { 515 coordActions = coordInfo.getCoordActions(); 516 } 517 else { 518 coordActions = CoordUtils.getCoordActions(rangeType, jobId, scope, true); 519 } 520 json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT")); 521 } 522 else { 523 coordEngine.kill(jobId); 524 } 525 } 526 catch (CoordinatorEngineException ex) { 527 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 528 } 529 catch (CommandException ex) { 530 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 531 } 532 return json; 533 } 534 535 /** 536 * Kill bundle job 537 * 538 * @param request servlet request 539 * @param response servlet response 540 * @throws XServletException 541 */ 542 private void killBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 543 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 544 String jobId = getResourceName(request); 545 try { 546 bundleEngine.kill(jobId); 547 } 548 catch (BundleEngineException ex) { 549 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 550 } 551 } 552 553 /** 554 * Change a coordinator job 555 * 556 * @param request servlet request 557 * @param response servlet response 558 * @throws XServletException 559 */ 560 private void changeCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 561 throws XServletException { 562 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 563 getUser(request)); 564 String jobId = getResourceName(request); 565 String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE); 566 try { 567 coordEngine.change(jobId, changeValue); 568 } 569 catch (CoordinatorEngineException ex) { 570 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 571 } 572 } 573 574 /** 575 * Change a bundle job 576 * 577 * @param request servlet request 578 * @param response servlet response 579 * @throws XServletException 580 */ 581 private void changeBundleJob(HttpServletRequest request, HttpServletResponse response) 582 throws XServletException { 583 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 584 String jobId = getResourceName(request); 585 String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE); 586 try { 587 bundleEngine.change(jobId, changeValue); 588 } 589 catch (BundleEngineException ex) { 590 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 591 } 592 } 593 594 /** 595 * Rerun a wf job 596 * 597 * @param request servlet request 598 * @param response servlet response 599 * @param conf configuration object 600 * @throws XServletException 601 */ 602 private void reRunWorkflowJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 603 throws XServletException { 604 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 605 606 String jobId = getResourceName(request); 607 try { 608 dagEngine.reRun(jobId, conf); 609 } 610 catch (DagEngineException ex) { 611 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 612 } 613 } 614 615 /** 616 * Rerun bundle job 617 * 618 * @param request servlet request 619 * @param response servlet response 620 * @param conf configration object 621 * @throws XServletException 622 */ 623 private void rerunBundleJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 624 throws XServletException { 625 JSONObject json = new JSONObject(); 626 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 627 String jobId = getResourceName(request); 628 629 String coordScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM); 630 String dateScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM); 631 String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM); 632 String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM); 633 634 XLog.getLog(getClass()).info( 635 "Rerun Bundle for jobId=" + jobId + ", coordScope=" + coordScope + ", dateScope=" + dateScope + ", refresh=" 636 + refresh + ", noCleanup=" + noCleanup); 637 638 try { 639 bundleEngine.reRun(jobId, coordScope, dateScope, Boolean.valueOf(refresh), Boolean.valueOf(noCleanup)); 640 } 641 catch (BaseEngineException ex) { 642 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 643 } 644 } 645 646 /** 647 * Rerun coordinator actions 648 * 649 * @param request servlet request 650 * @param response servlet response 651 * @param conf configuration object 652 * @throws XServletException 653 */ 654 @SuppressWarnings("unchecked") 655 private JSONObject reRunCoordinatorActions(HttpServletRequest request, HttpServletResponse response, 656 Configuration conf) throws XServletException { 657 JSONObject json = new JSONObject(); 658 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request)); 659 660 String jobId = getResourceName(request); 661 662 String rerunType = request.getParameter(RestConstants.JOB_COORD_RANGE_TYPE_PARAM); 663 String scope = request.getParameter(RestConstants.JOB_COORD_SCOPE_PARAM); 664 String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM); 665 String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM); 666 String failed = request.getParameter(RestConstants.JOB_COORD_RERUN_FAILED_PARAM); 667 668 XLog.getLog(getClass()).info( 669 "Rerun coordinator for jobId=" + jobId + ", rerunType=" + rerunType + ",scope=" + scope + ",refresh=" 670 + refresh + ", noCleanup=" + noCleanup); 671 672 try { 673 if (!(rerunType.equals(RestConstants.JOB_COORD_SCOPE_DATE) || rerunType 674 .equals(RestConstants.JOB_COORD_SCOPE_ACTION))) { 675 throw new CommandException(ErrorCode.E1018, "date or action expected."); 676 } 677 CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh), 678 Boolean.valueOf(noCleanup), Boolean.valueOf(failed), conf); 679 List<CoordinatorActionBean> coordActions; 680 if (coordInfo != null) { 681 coordActions = coordInfo.getCoordActions(); 682 } 683 else { 684 coordActions = CoordUtils.getCoordActions(rerunType, jobId, scope, false); 685 } 686 json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT")); 687 } 688 catch (BaseEngineException ex) { 689 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 690 } 691 catch (CommandException ex) { 692 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 693 } 694 695 return json; 696 } 697 698 699 700 /** 701 * Get workflow job 702 * 703 * @param request servlet request 704 * @param response servlet response 705 * @return JsonBean WorkflowJobBean 706 * @throws XServletException 707 */ 708 protected JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 709 JsonBean jobBean = getWorkflowJobBean(request, response); 710 // for backward compatibility (OOZIE-1231) 711 swapMRActionID((WorkflowJob)jobBean); 712 return jobBean; 713 } 714 715 /** 716 * Get workflow job 717 * 718 * @param request servlet request 719 * @param response servlet response 720 * @return JsonBean WorkflowJobBean 721 * @throws XServletException 722 */ 723 protected JsonBean getWorkflowJobBean(HttpServletRequest request, HttpServletResponse response) throws XServletException { 724 JsonBean jobBean = null; 725 String jobId = getResourceName(request); 726 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 727 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 728 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 729 start = (start < 1) ? 1 : start; 730 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0; 731 len = (len < 1) ? Integer.MAX_VALUE : len; 732 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 733 try { 734 jobBean = (JsonBean) dagEngine.getJob(jobId, start, len); 735 } 736 catch (DagEngineException ex) { 737 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 738 } 739 return jobBean; 740 } 741 742 private void swapMRActionID(WorkflowJob wjBean) { 743 List<WorkflowAction> actions = wjBean.getActions(); 744 if (actions != null) { 745 for (WorkflowAction wa : actions) { 746 swapMRActionID(wa); 747 } 748 } 749 } 750 751 private void swapMRActionID(WorkflowAction waBean) { 752 if (waBean.getType().equals("map-reduce")) { 753 String childId = waBean.getExternalChildIDs(); 754 if (childId != null && !childId.equals("")) { 755 String consoleBase = getConsoleBase(waBean.getConsoleUrl()); 756 ((WorkflowActionBean) waBean).setConsoleUrl(consoleBase + childId); 757 ((WorkflowActionBean) waBean).setExternalId(childId); 758 ((WorkflowActionBean) waBean).setExternalChildIDs(""); 759 } 760 } 761 } 762 763 private String getConsoleBase(String url) { 764 String consoleBase = null; 765 if (url.indexOf("application") != -1) { 766 consoleBase = url.split("application_[0-9]+_[0-9]+")[0]; 767 } 768 else { 769 consoleBase = url.split("job_[0-9]+_[0-9]+")[0]; 770 } 771 return consoleBase; 772 } 773 774 /** 775 * Get wf action info 776 * 777 * @param request servlet request 778 * @param response servlet response 779 * @return JsonBean WorkflowActionBean 780 * @throws XServletException 781 */ 782 protected JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response) 783 throws XServletException { 784 785 JsonBean actionBean = getWorkflowActionBean(request, response); 786 // for backward compatibility (OOZIE-1231) 787 swapMRActionID((WorkflowAction)actionBean); 788 return actionBean; 789 } 790 791 protected JsonBean getWorkflowActionBean(HttpServletRequest request, HttpServletResponse response) 792 throws XServletException { 793 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 794 795 JsonBean actionBean = null; 796 String actionId = getResourceName(request); 797 try { 798 actionBean = dagEngine.getWorkflowAction(actionId); 799 } 800 catch (BaseEngineException ex) { 801 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 802 } 803 return actionBean; 804 } 805 806 /** 807 * Get coord job info 808 * 809 * @param request servlet request 810 * @param response servlet response 811 * @return JsonBean CoordinatorJobBean 812 * @throws XServletException 813 * @throws BaseEngineException 814 */ 815 protected JsonBean getCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 816 throws XServletException, BaseEngineException { 817 JsonBean jobBean = null; 818 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 819 getUser(request)); 820 String jobId = getResourceName(request); 821 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 822 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 823 String filter = request.getParameter(RestConstants.JOB_FILTER_PARAM); 824 String orderStr = request.getParameter(RestConstants.ORDER_PARAM); 825 boolean order = (orderStr != null && orderStr.equals("desc")) ? true : false; 826 int offset = (startStr != null) ? Integer.parseInt(startStr) : 1; 827 offset = (offset < 1) ? 1 : offset; 828 // Get default number of coordinator actions to be retrieved 829 int defaultLen = ConfigurationService.getInt(COORD_ACTIONS_DEFAULT_LENGTH); 830 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0; 831 len = getCoordinatorJobLength(defaultLen, len); 832 try { 833 CoordinatorJobBean coordJob = coordEngine.getCoordJob(jobId, filter, offset, len, order); 834 jobBean = coordJob; 835 } 836 catch (CoordinatorEngineException ex) { 837 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 838 } 839 840 return jobBean; 841 } 842 843 /** 844 * Given the requested length and the default length, determine how many coordinator jobs to return. 845 * Used by {@link #getCoordinatorJob(javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)} 846 * 847 * @param defaultLen The default length 848 * @param len The requested length 849 * @return The length to use 850 */ 851 protected int getCoordinatorJobLength(int defaultLen, int len) { 852 return (len < 1) ? defaultLen : len; 853 } 854 855 /** 856 * Get bundle job info 857 * 858 * @param request servlet request 859 * @param response servlet response 860 * @return JsonBean bundle job bean 861 * @throws XServletException 862 * @throws BaseEngineException 863 */ 864 private JsonBean getBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 865 BaseEngineException { 866 JsonBean jobBean = null; 867 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 868 String jobId = getResourceName(request); 869 870 try { 871 jobBean = (JsonBean) bundleEngine.getBundleJob(jobId); 872 873 return jobBean; 874 } 875 catch (BundleEngineException ex) { 876 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 877 } 878 } 879 880 /** 881 * Get coordinator action 882 * 883 * @param request servlet request 884 * @param response servlet response 885 * @return JsonBean CoordinatorActionBean 886 * @throws XServletException 887 * @throws BaseEngineException 888 */ 889 private JsonBean getCoordinatorAction(HttpServletRequest request, HttpServletResponse response) 890 throws XServletException, BaseEngineException { 891 JsonBean actionBean = null; 892 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 893 getUser(request)); 894 String actionId = getResourceName(request); 895 try { 896 actionBean = coordEngine.getCoordAction(actionId); 897 } 898 catch (CoordinatorEngineException ex) { 899 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 900 } 901 902 return actionBean; 903 } 904 905 /** 906 * Get wf job definition 907 * 908 * @param request servlet request 909 * @param response servlet response 910 * @return String wf definition 911 * @throws XServletException 912 */ 913 private String getWorkflowJobDefinition(HttpServletRequest request, HttpServletResponse response) 914 throws XServletException { 915 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 916 917 String wfDefinition; 918 String jobId = getResourceName(request); 919 try { 920 wfDefinition = dagEngine.getDefinition(jobId); 921 } 922 catch (DagEngineException ex) { 923 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 924 } 925 return wfDefinition; 926 } 927 928 /** 929 * Get bundle job definition 930 * 931 * @param request servlet request 932 * @param response servlet response 933 * @return String bundle definition 934 * @throws XServletException 935 */ 936 private String getBundleJobDefinition(HttpServletRequest request, HttpServletResponse response) throws XServletException { 937 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 938 String bundleDefinition; 939 String jobId = getResourceName(request); 940 try { 941 bundleDefinition = bundleEngine.getDefinition(jobId); 942 } 943 catch (BundleEngineException ex) { 944 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 945 } 946 return bundleDefinition; 947 } 948 949 /** 950 * Get coordinator job definition 951 * 952 * @param request servlet request 953 * @param response servlet response 954 * @return String coord definition 955 * @throws XServletException 956 */ 957 private String getCoordinatorJobDefinition(HttpServletRequest request, HttpServletResponse response) 958 throws XServletException { 959 960 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 961 getUser(request)); 962 963 String jobId = getResourceName(request); 964 965 String coordDefinition = null; 966 try { 967 coordDefinition = coordEngine.getDefinition(jobId); 968 } 969 catch (BaseEngineException ex) { 970 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 971 } 972 return coordDefinition; 973 } 974 975 /** 976 * Stream wf job log 977 * 978 * @param request servlet request 979 * @param response servlet response 980 * @throws XServletException 981 * @throws IOException 982 */ 983 private void streamWorkflowJobLog(HttpServletRequest request, HttpServletResponse response) 984 throws XServletException, IOException { 985 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 986 String jobId = getResourceName(request); 987 try { 988 dagEngine.streamLog(jobId, response.getWriter(), request.getParameterMap()); 989 } 990 catch (DagEngineException ex) { 991 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 992 } 993 } 994 995 /** 996 * Stream bundle job log 997 * 998 * @param request servlet request 999 * @param response servlet response 1000 * @throws XServletException 1001 */ 1002 private void streamBundleJobLog(HttpServletRequest request, HttpServletResponse response) 1003 throws XServletException, IOException { 1004 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 1005 String jobId = getResourceName(request); 1006 try { 1007 bundleEngine.streamLog(jobId, response.getWriter(), request.getParameterMap()); 1008 } 1009 catch (BundleEngineException ex) { 1010 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 1011 } 1012 } 1013 1014 /** 1015 * Stream coordinator job log 1016 * 1017 * @param request servlet request 1018 * @param response servlet response 1019 * @throws XServletException 1020 * @throws IOException 1021 */ 1022 private void streamCoordinatorJobLog(HttpServletRequest request, HttpServletResponse response) 1023 throws XServletException, IOException { 1024 1025 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 1026 getUser(request)); 1027 String jobId = getResourceName(request); 1028 String logRetrievalScope = request.getParameter(RestConstants.JOB_LOG_SCOPE_PARAM); 1029 String logRetrievalType = request.getParameter(RestConstants.JOB_LOG_TYPE_PARAM); 1030 try { 1031 coordEngine.streamLog(jobId, logRetrievalScope, logRetrievalType, response.getWriter(), request.getParameterMap()); 1032 } 1033 catch (BaseEngineException ex) { 1034 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 1035 } 1036 catch (CommandException ex) { 1037 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 1038 } 1039 } 1040 1041 @Override 1042 protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException, 1043 IOException { 1044 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); 1045 } 1046 1047 @Override 1048 protected JSONObject getJobsByParentId(HttpServletRequest request, HttpServletResponse response) 1049 throws XServletException, IOException { 1050 JSONObject json = new JSONObject(); 1051 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class) 1052 .getCoordinatorEngine(getUser(request)); 1053 String coordActionId; 1054 String type = request.getParameter(RestConstants.JOB_COORD_RANGE_TYPE_PARAM); 1055 String scope = request.getParameter(RestConstants.JOB_COORD_SCOPE_PARAM); 1056 // for getting allruns for coordinator action - 2 alternate endpoints 1057 if (type != null && type.equals(RestConstants.JOB_COORD_SCOPE_ACTION) && scope != null) { 1058 // endpoint - oozie/v2/coord-job-id?type=action&scope=action-num&show=allruns 1059 String jobId = getResourceName(request); 1060 coordActionId = Services.get().get(UUIDService.class).generateChildId(jobId, scope); 1061 } 1062 else { 1063 // endpoint - oozie/v2/coord-action-id?show=allruns 1064 coordActionId = getResourceName(request); 1065 } 1066 try { 1067 List<WorkflowJobBean> wfs = coordEngine.getReruns(coordActionId); 1068 JSONArray array = new JSONArray(); 1069 if (wfs != null) { 1070 for (WorkflowJobBean wf : wfs) { 1071 JSONObject json1 = new JSONObject(); 1072 json1.put(JsonTags.WORKFLOW_ID, wf.getId()); 1073 json1.put(JsonTags.WORKFLOW_STATUS, wf.getStatus().toString()); 1074 json1.put(JsonTags.WORKFLOW_START_TIME, JsonUtils.formatDateRfc822(wf.getStartTime(), "GMT")); 1075 json1.put(JsonTags.WORKFLOW_ACTION_END_TIME, JsonUtils.formatDateRfc822(wf.getEndTime(), "GMT")); 1076 array.add(json1); 1077 } 1078 } 1079 json.put(JsonTags.WORKFLOWS_JOBS, array); 1080 return json; 1081 } 1082 catch (CoordinatorEngineException ex) { 1083 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 1084 } 1085 } 1086 /** 1087 * not supported for v1 1088 */ 1089 @Override 1090 protected JSONObject updateJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 1091 throws XServletException, IOException { 1092 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); 1093 } 1094 1095 @Override 1096 protected String getJobStatus(HttpServletRequest request, HttpServletResponse response) throws XServletException, 1097 IOException { 1098 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); 1099 } 1100 1101 @Override 1102 protected void streamJobErrorLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 1103 IOException { 1104 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); 1105 } 1106 @Override 1107 protected void streamJobAuditLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 1108 IOException { 1109 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); 1110 } 1111 @Override 1112 void slaEnableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { 1113 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); 1114 } 1115 1116 @Override 1117 void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, 1118 IOException { 1119 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); 1120 } 1121 1122 @Override 1123 void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { 1124 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); 1125 } 1126}