001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.servlet; 019 020 import java.io.IOException; 021 import java.util.List; 022 import javax.servlet.ServletInputStream; 023 import javax.servlet.http.HttpServletRequest; 024 import javax.servlet.http.HttpServletResponse; 025 import org.apache.hadoop.conf.Configuration; 026 import org.apache.oozie.*; 027 import org.apache.oozie.client.WorkflowAction; 028 import org.apache.oozie.client.WorkflowJob; 029 import org.apache.oozie.client.rest.*; 030 import org.apache.oozie.command.CommandException; 031 import org.apache.oozie.command.coord.CoordRerunXCommand; 032 import org.apache.oozie.service.BundleEngineService; 033 import org.apache.oozie.service.CoordinatorEngineService; 034 import org.apache.oozie.service.DagEngineService; 035 import org.apache.oozie.service.Services; 036 import org.apache.oozie.util.GraphGenerator; 037 import org.apache.oozie.util.XLog; 038 import org.json.simple.JSONObject; 039 040 041 @SuppressWarnings("serial") 042 public class V1JobServlet extends BaseJobServlet { 043 044 private static final String INSTRUMENTATION_NAME = "v1job"; 045 public static final String COORD_ACTIONS_DEFAULT_LENGTH = "oozie.coord.actions.default.length"; 046 047 public V1JobServlet() { 048 super(INSTRUMENTATION_NAME); 049 } 050 051 protected V1JobServlet(String instrumentation_name){ 052 super(instrumentation_name); 053 } 054 055 /* 056 * protected method to start a job 057 */ 058 @Override 059 protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 060 IOException { 061 /* 062 * Configuration conf = new XConfiguration(request.getInputStream()); 063 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 064 * conf.get(OozieClient.COORDINATOR_APP_PATH); 065 * 066 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 067 */ 068 String jobId = getResourceName(request); 069 if (jobId.endsWith("-W")) { 070 startWorkflowJob(request, response); 071 } 072 else if (jobId.endsWith("-B")) { 073 startBundleJob(request, response); 074 } 075 else { 076 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, RestConstants.ACTION_PARAM, RestConstants.JOB_ACTION_START); 077 } 078 079 } 080 081 /* 082 * protected method to resume a job 083 */ 084 @Override 085 protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 086 IOException { 087 /* 088 * Configuration conf = new XConfiguration(request.getInputStream()); 089 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 090 * conf.get(OozieClient.COORDINATOR_APP_PATH); 091 * 092 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 093 */ 094 String jobId = getResourceName(request); 095 if (jobId.endsWith("-W")) { 096 resumeWorkflowJob(request, response); 097 } 098 else if (jobId.endsWith("-B")) { 099 resumeBundleJob(request, response); 100 } 101 else { 102 resumeCoordinatorJob(request, response); 103 } 104 } 105 106 /* 107 * protected method to suspend a job 108 */ 109 @Override 110 protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 111 IOException { 112 /* 113 * Configuration conf = new XConfiguration(request.getInputStream()); 114 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 115 * conf.get(OozieClient.COORDINATOR_APP_PATH); 116 * 117 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 118 */ 119 String jobId = getResourceName(request); 120 if (jobId.endsWith("-W")) { 121 suspendWorkflowJob(request, response); 122 } 123 else if (jobId.endsWith("-B")) { 124 suspendBundleJob(request, response); 125 } 126 else { 127 suspendCoordinatorJob(request, response); 128 } 129 } 130 131 /* 132 * protected method to kill a job 133 */ 134 @Override 135 protected void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 136 IOException { 137 /* 138 * Configuration conf = new XConfiguration(request.getInputStream()); 139 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 140 * conf.get(OozieClient.COORDINATOR_APP_PATH); 141 * 142 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 143 */ 144 String jobId = getResourceName(request); 145 if (jobId.endsWith("-W")) { 146 killWorkflowJob(request, response); 147 } 148 else if (jobId.endsWith("-B")) { 149 killBundleJob(request, response); 150 } 151 else { 152 killCoordinatorJob(request, response); 153 } 154 } 155 156 /** 157 * protected method to change a coordinator job 158 * @param request request object 159 * @param response response object 160 * @throws XServletException 161 * @throws IOException 162 */ 163 @Override 164 protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 165 IOException { 166 String jobId = getResourceName(request); 167 if (jobId.endsWith("-B")) { 168 changeBundleJob(request, response); 169 } 170 else { 171 changeCoordinatorJob(request, response); 172 } 173 } 174 175 /* 176 * protected method to reRun a job 177 * 178 * @seeorg.apache.oozie.servlet.BaseJobServlet#reRunJob(javax.servlet.http. 179 * HttpServletRequest, javax.servlet.http.HttpServletResponse, 180 * org.apache.hadoop.conf.Configuration) 181 */ 182 @Override 183 protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 184 throws XServletException, IOException { 185 JSONObject json = null; 186 String jobId = getResourceName(request); 187 if (jobId.endsWith("-W")) { 188 reRunWorkflowJob(request, response, conf); 189 } 190 else if (jobId.endsWith("-B")) { 191 rerunBundleJob(request, response, conf); 192 } 193 else { 194 json = reRunCoordinatorActions(request, response, conf); 195 } 196 return json; 197 } 198 199 /* 200 * protected method to get a job in JsonBean representation 201 */ 202 @Override 203 protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 204 IOException, BaseEngineException { 205 ServletInputStream is = request.getInputStream(); 206 byte[] b = new byte[101]; 207 while (is.readLine(b, 0, 100) != -1) { 208 XLog.getLog(getClass()).warn("Printing :" + new String(b)); 209 } 210 211 JsonBean jobBean = null; 212 String jobId = getResourceName(request); 213 if (jobId.endsWith("-B")) { 214 jobBean = getBundleJob(request, response); 215 } 216 else { 217 if (jobId.endsWith("-W")) { 218 jobBean = getWorkflowJob(request, response); 219 } 220 else { 221 if (jobId.contains("-W@")) { 222 jobBean = getWorkflowAction(request, response); 223 } 224 else { 225 if (jobId.contains("-C@")) { 226 jobBean = getCoordinatorAction(request, response); 227 } 228 else { 229 jobBean = getCoordinatorJob(request, response); 230 } 231 } 232 } 233 } 234 235 return jobBean; 236 } 237 238 /* 239 * protected method to get a job definition in String format 240 */ 241 @Override 242 protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response) 243 throws XServletException, IOException { 244 String jobDefinition = null; 245 String jobId = getResourceName(request); 246 if (jobId.endsWith("-W")) { 247 jobDefinition = getWorkflowJobDefinition(request, response); 248 } 249 else if (jobId.endsWith("-B")) { 250 jobDefinition = getBundleJobDefinition(request, response); 251 } 252 else { 253 jobDefinition = getCoordinatorJobDefinition(request, response); 254 } 255 return jobDefinition; 256 } 257 258 /* 259 * protected method to stream a job log into response object 260 */ 261 @Override 262 protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 263 IOException { 264 String jobId = getResourceName(request); 265 if (jobId.endsWith("-W")) { 266 streamWorkflowJobLog(request, response); 267 } 268 else if (jobId.endsWith("-B")) { 269 streamBundleJob(request, response); 270 } 271 else { 272 streamCoordinatorJobLog(request, response); 273 } 274 } 275 276 @Override 277 protected void streamJobGraph(HttpServletRequest request, HttpServletResponse response) 278 throws XServletException, IOException { 279 String jobId = getResourceName(request); 280 if (jobId.endsWith("-W")) { 281 // Applicable only to worflow, for now 282 response.setContentType(RestConstants.PNG_IMAGE_CONTENT_TYPE); 283 try { 284 String showKill = request.getParameter(RestConstants.JOB_SHOW_KILL_PARAM); 285 boolean sK = showKill != null && (showKill.equalsIgnoreCase("yes") || showKill.equals("1") || showKill.equalsIgnoreCase("true")); 286 287 new GraphGenerator( 288 getWorkflowJobDefinition(request, response), 289 (JsonWorkflowJob)getWorkflowJob(request, response), 290 sK).write(response.getOutputStream()); 291 } 292 catch (Exception e) { 293 throw new XServletException(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, ErrorCode.E0307, e.getMessage(), e); 294 } 295 } 296 else { 297 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0306); 298 } 299 } 300 301 /** 302 * Start wf job 303 * 304 * @param request servlet request 305 * @param response servlet response 306 * @throws XServletException 307 */ 308 private void startWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 309 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 310 311 String jobId = getResourceName(request); 312 try { 313 dagEngine.start(jobId); 314 } 315 catch (DagEngineException ex) { 316 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 317 } 318 } 319 320 /** 321 * Start bundle job 322 * 323 * @param request servlet request 324 * @param response servlet response 325 * @throws XServletException 326 */ 327 private void startBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 328 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 329 String jobId = getResourceName(request); 330 try { 331 bundleEngine.start(jobId); 332 } 333 catch (BundleEngineException ex) { 334 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 335 } 336 } 337 338 /** 339 * Resume workflow job 340 * 341 * @param request servlet request 342 * @param response servlet response 343 * @throws XServletException 344 */ 345 private void resumeWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 346 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 347 348 String jobId = getResourceName(request); 349 try { 350 dagEngine.resume(jobId); 351 } 352 catch (DagEngineException ex) { 353 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 354 } 355 } 356 357 /** 358 * Resume bundle job 359 * 360 * @param request servlet request 361 * @param response servlet response 362 * @throws XServletException 363 */ 364 private void resumeBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 365 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 366 String jobId = getResourceName(request); 367 try { 368 bundleEngine.resume(jobId); 369 } 370 catch (BundleEngineException ex) { 371 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 372 } 373 } 374 375 /** 376 * Resume coordinator job 377 * 378 * @param request servlet request 379 * @param response servlet response 380 * @throws XServletException 381 * @throws CoordinatorEngineException 382 */ 383 private void resumeCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 384 throws XServletException { 385 String jobId = getResourceName(request); 386 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 387 getUser(request)); 388 try { 389 coordEngine.resume(jobId); 390 } 391 catch (CoordinatorEngineException ex) { 392 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 393 } 394 } 395 396 /** 397 * Suspend a wf job 398 * 399 * @param request servlet request 400 * @param response servlet response 401 * @throws XServletException 402 */ 403 private void suspendWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 404 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 405 406 String jobId = getResourceName(request); 407 try { 408 dagEngine.suspend(jobId); 409 } 410 catch (DagEngineException ex) { 411 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 412 } 413 } 414 415 /** 416 * Suspend bundle job 417 * 418 * @param request servlet request 419 * @param response servlet response 420 * @throws XServletException 421 */ 422 private void suspendBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 423 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 424 String jobId = getResourceName(request); 425 try { 426 bundleEngine.suspend(jobId); 427 } 428 catch (BundleEngineException ex) { 429 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 430 } 431 } 432 433 /** 434 * Suspend coordinator job 435 * 436 * @param request servlet request 437 * @param response servlet response 438 * @throws XServletException 439 */ 440 private void suspendCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 441 throws XServletException { 442 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 443 getUser(request)); 444 String jobId = getResourceName(request); 445 try { 446 coordEngine.suspend(jobId); 447 } 448 catch (CoordinatorEngineException ex) { 449 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 450 } 451 } 452 453 /** 454 * Kill a wf job 455 * @param request servlet request 456 * @param response servlet response 457 * @throws XServletException 458 */ 459 private void killWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 460 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 461 462 String jobId = getResourceName(request); 463 try { 464 dagEngine.kill(jobId); 465 } 466 catch (DagEngineException ex) { 467 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 468 } 469 } 470 471 /** 472 * Kill a coord job 473 * @param request servlet request 474 * @param response servlet response 475 * @throws XServletException 476 */ 477 private void killCoordinatorJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 478 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 479 getUser(request)); 480 String jobId = getResourceName(request); 481 try { 482 coordEngine.kill(jobId); 483 } 484 catch (CoordinatorEngineException ex) { 485 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 486 } 487 } 488 489 /** 490 * Kill bundle job 491 * 492 * @param request servlet request 493 * @param response servlet response 494 * @throws XServletException 495 */ 496 private void killBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 497 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 498 String jobId = getResourceName(request); 499 try { 500 bundleEngine.kill(jobId); 501 } 502 catch (BundleEngineException ex) { 503 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 504 } 505 } 506 507 /** 508 * Change a coordinator job 509 * 510 * @param request servlet request 511 * @param response servlet response 512 * @throws XServletException 513 */ 514 private void changeCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 515 throws XServletException { 516 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 517 getUser(request)); 518 String jobId = getResourceName(request); 519 String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE); 520 try { 521 coordEngine.change(jobId, changeValue); 522 } 523 catch (CoordinatorEngineException ex) { 524 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 525 } 526 } 527 528 /** 529 * Change a bundle job 530 * 531 * @param request servlet request 532 * @param response servlet response 533 * @throws XServletException 534 */ 535 private void changeBundleJob(HttpServletRequest request, HttpServletResponse response) 536 throws XServletException { 537 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 538 String jobId = getResourceName(request); 539 String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE); 540 try { 541 bundleEngine.change(jobId, changeValue); 542 } 543 catch (BundleEngineException ex) { 544 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 545 } 546 } 547 548 /** 549 * Rerun a wf job 550 * 551 * @param request servlet request 552 * @param response servlet response 553 * @param conf configuration object 554 * @throws XServletException 555 */ 556 private void reRunWorkflowJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 557 throws XServletException { 558 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 559 560 String jobId = getResourceName(request); 561 try { 562 dagEngine.reRun(jobId, conf); 563 } 564 catch (DagEngineException ex) { 565 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 566 } 567 } 568 569 /** 570 * Rerun bundle job 571 * 572 * @param request servlet request 573 * @param response servlet response 574 * @param conf configration object 575 * @throws XServletException 576 */ 577 private void rerunBundleJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 578 throws XServletException { 579 JSONObject json = new JSONObject(); 580 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 581 String jobId = getResourceName(request); 582 583 String coordScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM); 584 String dateScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM); 585 String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM); 586 String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM); 587 588 XLog.getLog(getClass()).info( 589 "Rerun Bundle for jobId=" + jobId + ", coordScope=" + coordScope + ", dateScope=" + dateScope + ", refresh=" 590 + refresh + ", noCleanup=" + noCleanup); 591 592 try { 593 bundleEngine.reRun(jobId, coordScope, dateScope, Boolean.valueOf(refresh), Boolean.valueOf(noCleanup)); 594 } 595 catch (BaseEngineException ex) { 596 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 597 } 598 } 599 600 /** 601 * Rerun coordinator actions 602 * 603 * @param request servlet request 604 * @param response servlet response 605 * @param conf configuration object 606 * @throws XServletException 607 */ 608 @SuppressWarnings("unchecked") 609 private JSONObject reRunCoordinatorActions(HttpServletRequest request, HttpServletResponse response, 610 Configuration conf) throws XServletException { 611 JSONObject json = new JSONObject(); 612 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request)); 613 614 String jobId = getResourceName(request); 615 616 String rerunType = request.getParameter(RestConstants.JOB_COORD_RERUN_TYPE_PARAM); 617 String scope = request.getParameter(RestConstants.JOB_COORD_RERUN_SCOPE_PARAM); 618 String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM); 619 String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM); 620 621 XLog.getLog(getClass()).info( 622 "Rerun coordinator for jobId=" + jobId + ", rerunType=" + rerunType + ",scope=" + scope + ",refresh=" 623 + refresh + ", noCleanup=" + noCleanup); 624 625 try { 626 if (!(rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE) || rerunType 627 .equals(RestConstants.JOB_COORD_RERUN_ACTION))) { 628 throw new CommandException(ErrorCode.E1018, "date or action expected."); 629 } 630 CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh), 631 Boolean.valueOf(noCleanup)); 632 List<CoordinatorActionBean> coordActions; 633 if (coordInfo != null) { 634 coordActions = coordInfo.getCoordActions(); 635 } 636 else { 637 coordActions = CoordRerunXCommand.getCoordActions(rerunType, jobId, scope); 638 } 639 json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT")); 640 } 641 catch (BaseEngineException ex) { 642 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 643 } 644 catch (CommandException ex) { 645 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 646 } 647 648 return json; 649 } 650 651 652 653 /** 654 * Get workflow job 655 * 656 * @param request servlet request 657 * @param response servlet response 658 * @return JsonBean WorkflowJobBean 659 * @throws XServletException 660 */ 661 protected JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 662 JsonBean jobBean = getWorkflowJobBean(request, response); 663 // for backward compatibility (OOZIE-1231) 664 swapMRActionID((WorkflowJob)jobBean); 665 return jobBean; 666 } 667 668 /** 669 * Get workflow job 670 * 671 * @param request servlet request 672 * @param response servlet response 673 * @return JsonBean WorkflowJobBean 674 * @throws XServletException 675 */ 676 protected JsonBean getWorkflowJobBean(HttpServletRequest request, HttpServletResponse response) throws XServletException { 677 JsonBean jobBean = null; 678 String jobId = getResourceName(request); 679 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 680 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 681 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 682 start = (start < 1) ? 1 : start; 683 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0; 684 len = (len < 1) ? Integer.MAX_VALUE : len; 685 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 686 try { 687 jobBean = (JsonBean) dagEngine.getJob(jobId, start, len); 688 } 689 catch (DagEngineException ex) { 690 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 691 } 692 return jobBean; 693 } 694 695 private void swapMRActionID(WorkflowJob wjBean) { 696 List<WorkflowAction> actions = wjBean.getActions(); 697 if (actions != null) { 698 for (WorkflowAction wa : actions) { 699 swapMRActionID(wa); 700 } 701 } 702 } 703 704 private void swapMRActionID(WorkflowAction waBean) { 705 if (waBean.getType().equals("map-reduce")) { 706 String childId = waBean.getExternalChildIDs(); 707 if (childId != null && !childId.equals("")) { 708 String consoleBase = getConsoleBase(waBean.getConsoleUrl()); 709 ((WorkflowActionBean) waBean).setConsoleUrl(consoleBase + childId); 710 ((WorkflowActionBean) waBean).setExternalId(childId); 711 ((WorkflowActionBean) waBean).setExternalChildIDs(""); 712 } 713 } 714 } 715 716 private String getConsoleBase(String url) { 717 String consoleBase = null; 718 if (url.indexOf("application") != -1) { 719 consoleBase = url.split("application_[0-9]+_[0-9]+")[0]; 720 } 721 else { 722 consoleBase = url.split("job_[0-9]+_[0-9]+")[0]; 723 } 724 return consoleBase; 725 } 726 727 /** 728 * Get wf action info 729 * 730 * @param request servlet request 731 * @param response servlet response 732 * @return JsonBean WorkflowActionBean 733 * @throws XServletException 734 */ 735 protected JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response) 736 throws XServletException { 737 738 JsonBean actionBean = getWorkflowActionBean(request, response); 739 // for backward compatibility (OOZIE-1231) 740 swapMRActionID((WorkflowAction)actionBean); 741 return actionBean; 742 } 743 744 protected JsonBean getWorkflowActionBean(HttpServletRequest request, HttpServletResponse response) 745 throws XServletException { 746 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 747 748 JsonBean actionBean = null; 749 String actionId = getResourceName(request); 750 try { 751 actionBean = dagEngine.getWorkflowAction(actionId); 752 } 753 catch (BaseEngineException ex) { 754 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 755 } 756 return actionBean; 757 } 758 759 /** 760 * Get coord job info 761 * 762 * @param request servlet request 763 * @param response servlet response 764 * @return JsonBean CoordinatorJobBean 765 * @throws XServletException 766 * @throws BaseEngineException 767 */ 768 protected JsonBean getCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 769 throws XServletException, BaseEngineException { 770 JsonBean jobBean = null; 771 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 772 getUser(request)); 773 String jobId = getResourceName(request); 774 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 775 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 776 String filter = request.getParameter(RestConstants.JOB_FILTER_PARAM); 777 String orderStr = request.getParameter(RestConstants.ORDER_PARAM); 778 boolean order = (orderStr != null && orderStr.equals("desc")) ? true : false; 779 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 780 start = (start < 1) ? 1 : start; 781 // Get default number of coordinator actions to be retrieved 782 int defaultLen = Services.get().getConf().getInt(COORD_ACTIONS_DEFAULT_LENGTH, 1000); 783 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0; 784 len = (len < 0) ? defaultLen : len; 785 try { 786 JsonCoordinatorJob coordJob = coordEngine.getCoordJob(jobId, filter, start, len, order); 787 jobBean = coordJob; 788 } 789 catch (CoordinatorEngineException ex) { 790 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 791 } 792 793 return jobBean; 794 } 795 796 /** 797 * Get bundle job info 798 * 799 * @param request servlet request 800 * @param response servlet response 801 * @return JsonBean bundle job bean 802 * @throws XServletException 803 * @throws BaseEngineException 804 */ 805 private JsonBean getBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 806 BaseEngineException { 807 JsonBean jobBean = null; 808 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 809 String jobId = getResourceName(request); 810 811 try { 812 jobBean = (JsonBean) bundleEngine.getBundleJob(jobId); 813 814 return jobBean; 815 } 816 catch (BundleEngineException ex) { 817 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 818 } 819 } 820 821 /** 822 * Get coordinator action 823 * 824 * @param request servlet request 825 * @param response servlet response 826 * @return JsonBean CoordinatorActionBean 827 * @throws XServletException 828 * @throws BaseEngineException 829 */ 830 private JsonBean getCoordinatorAction(HttpServletRequest request, HttpServletResponse response) 831 throws XServletException, BaseEngineException { 832 JsonBean actionBean = null; 833 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 834 getUser(request)); 835 String actionId = getResourceName(request); 836 try { 837 actionBean = coordEngine.getCoordAction(actionId); 838 } 839 catch (CoordinatorEngineException ex) { 840 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 841 } 842 843 return actionBean; 844 } 845 846 /** 847 * Get wf job definition 848 * 849 * @param request servlet request 850 * @param response servlet response 851 * @return String wf definition 852 * @throws XServletException 853 */ 854 private String getWorkflowJobDefinition(HttpServletRequest request, HttpServletResponse response) 855 throws XServletException { 856 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 857 858 String wfDefinition; 859 String jobId = getResourceName(request); 860 try { 861 wfDefinition = dagEngine.getDefinition(jobId); 862 } 863 catch (DagEngineException ex) { 864 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 865 } 866 return wfDefinition; 867 } 868 869 /** 870 * Get bundle job definition 871 * 872 * @param request servlet request 873 * @param response servlet response 874 * @return String bundle definition 875 * @throws XServletException 876 */ 877 private String getBundleJobDefinition(HttpServletRequest request, HttpServletResponse response) throws XServletException { 878 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 879 String bundleDefinition; 880 String jobId = getResourceName(request); 881 try { 882 bundleDefinition = bundleEngine.getDefinition(jobId); 883 } 884 catch (BundleEngineException ex) { 885 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 886 } 887 return bundleDefinition; 888 } 889 890 /** 891 * Get coordinator job definition 892 * 893 * @param request servlet request 894 * @param response servlet response 895 * @return String coord definition 896 * @throws XServletException 897 */ 898 private String getCoordinatorJobDefinition(HttpServletRequest request, HttpServletResponse response) 899 throws XServletException { 900 901 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 902 getUser(request)); 903 904 String jobId = getResourceName(request); 905 906 String coordDefinition = null; 907 try { 908 coordDefinition = coordEngine.getDefinition(jobId); 909 } 910 catch (BaseEngineException ex) { 911 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 912 } 913 return coordDefinition; 914 } 915 916 /** 917 * Stream wf job log 918 * 919 * @param request servlet request 920 * @param response servlet response 921 * @throws XServletException 922 * @throws IOException 923 */ 924 private void streamWorkflowJobLog(HttpServletRequest request, HttpServletResponse response) 925 throws XServletException, IOException { 926 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 927 String jobId = getResourceName(request); 928 try { 929 dagEngine.streamLog(jobId, response.getWriter()); 930 } 931 catch (DagEngineException ex) { 932 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 933 } 934 } 935 936 /** 937 * Stream bundle job log 938 * 939 * @param request servlet request 940 * @param response servlet response 941 * @throws XServletException 942 */ 943 private void streamBundleJob(HttpServletRequest request, HttpServletResponse response) 944 throws XServletException, IOException { 945 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 946 String jobId = getResourceName(request); 947 try { 948 bundleEngine.streamLog(jobId, response.getWriter()); 949 } 950 catch (BundleEngineException ex) { 951 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 952 } 953 } 954 955 /** 956 * Stream coordinator job log 957 * 958 * @param request servlet request 959 * @param response servlet response 960 * @throws XServletException 961 * @throws IOException 962 */ 963 private void streamCoordinatorJobLog(HttpServletRequest request, HttpServletResponse response) 964 throws XServletException, IOException { 965 966 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 967 getUser(request)); 968 String jobId = getResourceName(request); 969 String logRetrievalScope = request.getParameter(RestConstants.JOB_LOG_SCOPE_PARAM); 970 String logRetrievalType = request.getParameter(RestConstants.JOB_LOG_TYPE_PARAM); 971 try { 972 coordEngine.streamLog(jobId, logRetrievalScope, logRetrievalType, response.getWriter()); 973 } 974 catch (BaseEngineException ex) { 975 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 976 } 977 catch (CommandException ex) { 978 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 979 } 980 } 981 982 @Override 983 protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException, 984 IOException { 985 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); 986 } 987 988 }