001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.servlet; 019 020 import java.io.IOException; 021 import java.util.List; 022 import javax.servlet.ServletInputStream; 023 import javax.servlet.http.HttpServletRequest; 024 import javax.servlet.http.HttpServletResponse; 025 import org.apache.hadoop.conf.Configuration; 026 import org.apache.oozie.*; 027 import org.apache.oozie.client.rest.*; 028 import org.apache.oozie.command.CommandException; 029 import org.apache.oozie.command.coord.CoordRerunXCommand; 030 import org.apache.oozie.service.BundleEngineService; 031 import org.apache.oozie.service.CoordinatorEngineService; 032 import org.apache.oozie.service.DagEngineService; 033 import org.apache.oozie.service.Services; 034 import org.apache.oozie.util.GraphGenerator; 035 import org.apache.oozie.util.XLog; 036 import org.json.simple.JSONObject; 037 038 039 @SuppressWarnings("serial") 040 public class V1JobServlet extends BaseJobServlet { 041 042 private static final String INSTRUMENTATION_NAME = "v1job"; 043 public static final String COORD_ACTIONS_DEFAULT_LENGTH = "oozie.coord.actions.default.length"; 044 045 public V1JobServlet() { 046 super(INSTRUMENTATION_NAME); 047 } 048 049 /* 050 * protected method to start a job 051 */ 052 @Override 053 protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 054 IOException { 055 /* 056 * Configuration conf = new XConfiguration(request.getInputStream()); 057 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 058 * conf.get(OozieClient.COORDINATOR_APP_PATH); 059 * 060 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 061 */ 062 String jobId = getResourceName(request); 063 if (jobId.endsWith("-W")) { 064 startWorkflowJob(request, response); 065 } 066 else if (jobId.endsWith("-B")) { 067 startBundleJob(request, response); 068 } 069 else { 070 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, RestConstants.ACTION_PARAM, RestConstants.JOB_ACTION_START); 071 } 072 073 } 074 075 /* 076 * protected method to resume a job 077 */ 078 @Override 079 protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 080 IOException { 081 /* 082 * Configuration conf = new XConfiguration(request.getInputStream()); 083 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 084 * conf.get(OozieClient.COORDINATOR_APP_PATH); 085 * 086 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 087 */ 088 String jobId = getResourceName(request); 089 if (jobId.endsWith("-W")) { 090 resumeWorkflowJob(request, response); 091 } 092 else if (jobId.endsWith("-B")) { 093 resumeBundleJob(request, response); 094 } 095 else { 096 resumeCoordinatorJob(request, response); 097 } 098 } 099 100 /* 101 * protected method to suspend a job 102 */ 103 @Override 104 protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 105 IOException { 106 /* 107 * Configuration conf = new XConfiguration(request.getInputStream()); 108 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 109 * conf.get(OozieClient.COORDINATOR_APP_PATH); 110 * 111 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 112 */ 113 String jobId = getResourceName(request); 114 if (jobId.endsWith("-W")) { 115 suspendWorkflowJob(request, response); 116 } 117 else if (jobId.endsWith("-B")) { 118 suspendBundleJob(request, response); 119 } 120 else { 121 suspendCoordinatorJob(request, response); 122 } 123 } 124 125 /* 126 * protected method to kill a job 127 */ 128 @Override 129 protected void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 130 IOException { 131 /* 132 * Configuration conf = new XConfiguration(request.getInputStream()); 133 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 134 * conf.get(OozieClient.COORDINATOR_APP_PATH); 135 * 136 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 137 */ 138 String jobId = getResourceName(request); 139 if (jobId.endsWith("-W")) { 140 killWorkflowJob(request, response); 141 } 142 else if (jobId.endsWith("-B")) { 143 killBundleJob(request, response); 144 } 145 else { 146 killCoordinatorJob(request, response); 147 } 148 } 149 150 /** 151 * protected method to change a coordinator job 152 * @param request request object 153 * @param response response object 154 * @throws XServletException 155 * @throws IOException 156 */ 157 @Override 158 protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 159 IOException { 160 String jobId = getResourceName(request); 161 if (jobId.endsWith("-B")) { 162 changeBundleJob(request, response); 163 } 164 else { 165 changeCoordinatorJob(request, response); 166 } 167 } 168 169 /* 170 * protected method to reRun a job 171 * 172 * @seeorg.apache.oozie.servlet.BaseJobServlet#reRunJob(javax.servlet.http. 173 * HttpServletRequest, javax.servlet.http.HttpServletResponse, 174 * org.apache.hadoop.conf.Configuration) 175 */ 176 @Override 177 protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 178 throws XServletException, IOException { 179 JSONObject json = null; 180 String jobId = getResourceName(request); 181 if (jobId.endsWith("-W")) { 182 reRunWorkflowJob(request, response, conf); 183 } 184 else if (jobId.endsWith("-B")) { 185 rerunBundleJob(request, response, conf); 186 } 187 else { 188 json = reRunCoordinatorActions(request, response, conf); 189 } 190 return json; 191 } 192 193 /* 194 * protected method to get a job in JsonBean representation 195 */ 196 @Override 197 protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 198 IOException, BaseEngineException { 199 ServletInputStream is = request.getInputStream(); 200 byte[] b = new byte[101]; 201 while (is.readLine(b, 0, 100) != -1) { 202 XLog.getLog(getClass()).warn("Printing :" + new String(b)); 203 } 204 205 JsonBean jobBean = null; 206 String jobId = getResourceName(request); 207 if (jobId.endsWith("-B")) { 208 jobBean = getBundleJob(request, response); 209 } 210 else { 211 if (jobId.endsWith("-W")) { 212 jobBean = getWorkflowJob(request, response); 213 } 214 else { 215 if (jobId.contains("-W@")) { 216 jobBean = getWorkflowAction(request, response); 217 } 218 else { 219 if (jobId.contains("-C@")) { 220 jobBean = getCoordinatorAction(request, response); 221 } 222 else { 223 jobBean = getCoordinatorJob(request, response); 224 } 225 } 226 } 227 } 228 229 return jobBean; 230 } 231 232 /* 233 * protected method to get a job definition in String format 234 */ 235 @Override 236 protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response) 237 throws XServletException, IOException { 238 String jobDefinition = null; 239 String jobId = getResourceName(request); 240 if (jobId.endsWith("-W")) { 241 jobDefinition = getWorkflowJobDefinition(request, response); 242 } 243 else if (jobId.endsWith("-B")) { 244 jobDefinition = getBundleJobDefinition(request, response); 245 } 246 else { 247 jobDefinition = getCoordinatorJobDefinition(request, response); 248 } 249 return jobDefinition; 250 } 251 252 /* 253 * protected method to stream a job log into response object 254 */ 255 @Override 256 protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 257 IOException { 258 String jobId = getResourceName(request); 259 if (jobId.endsWith("-W")) { 260 streamWorkflowJobLog(request, response); 261 } 262 else if (jobId.endsWith("-B")) { 263 streamBundleJob(request, response); 264 } 265 else { 266 streamCoordinatorJobLog(request, response); 267 } 268 } 269 270 @Override 271 protected void streamJobGraph(HttpServletRequest request, HttpServletResponse response) 272 throws XServletException, IOException { 273 String jobId = getResourceName(request); 274 if (jobId.endsWith("-W")) { 275 // Applicable only to worflow, for now 276 response.setContentType(RestConstants.PNG_IMAGE_CONTENT_TYPE); 277 try { 278 String showKill = request.getParameter(RestConstants.JOB_SHOW_KILL_PARAM); 279 boolean sK = showKill != null && (showKill.equalsIgnoreCase("yes") || showKill.equals("1") || showKill.equalsIgnoreCase("true")); 280 281 new GraphGenerator( 282 getWorkflowJobDefinition(request, response), 283 (JsonWorkflowJob)getWorkflowJob(request, response), 284 sK).write(response.getOutputStream()); 285 } 286 catch (Exception e) { 287 throw new XServletException(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, ErrorCode.E0307, e.getMessage(), e); 288 } 289 } 290 else { 291 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0306); 292 } 293 } 294 295 /** 296 * Start wf job 297 * 298 * @param request servlet request 299 * @param response servlet response 300 * @throws XServletException 301 */ 302 private void startWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 303 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 304 getAuthToken(request)); 305 306 String jobId = getResourceName(request); 307 try { 308 dagEngine.start(jobId); 309 } 310 catch (DagEngineException ex) { 311 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 312 } 313 } 314 315 /** 316 * Start bundle job 317 * 318 * @param request servlet request 319 * @param response servlet response 320 * @throws XServletException 321 */ 322 private void startBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 323 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request), 324 getAuthToken(request)); 325 String jobId = getResourceName(request); 326 try { 327 bundleEngine.start(jobId); 328 } 329 catch (BundleEngineException ex) { 330 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 331 } 332 } 333 334 /** 335 * Resume workflow job 336 * 337 * @param request servlet request 338 * @param response servlet response 339 * @throws XServletException 340 */ 341 private void resumeWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 342 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 343 getAuthToken(request)); 344 345 String jobId = getResourceName(request); 346 try { 347 dagEngine.resume(jobId); 348 } 349 catch (DagEngineException ex) { 350 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 351 } 352 } 353 354 /** 355 * Resume bundle job 356 * 357 * @param request servlet request 358 * @param response servlet response 359 * @throws XServletException 360 */ 361 private void resumeBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 362 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request), 363 getAuthToken(request)); 364 String jobId = getResourceName(request); 365 try { 366 bundleEngine.resume(jobId); 367 } 368 catch (BundleEngineException ex) { 369 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 370 } 371 } 372 373 /** 374 * Resume coordinator job 375 * 376 * @param request servlet request 377 * @param response servlet response 378 * @throws XServletException 379 * @throws CoordinatorEngineException 380 */ 381 private void resumeCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 382 throws XServletException { 383 String jobId = getResourceName(request); 384 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 385 getUser(request), getAuthToken(request)); 386 try { 387 coordEngine.resume(jobId); 388 } 389 catch (CoordinatorEngineException ex) { 390 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 391 } 392 } 393 394 /** 395 * Suspend a wf job 396 * 397 * @param request servlet request 398 * @param response servlet response 399 * @throws XServletException 400 */ 401 private void suspendWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 402 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 403 getAuthToken(request)); 404 405 String jobId = getResourceName(request); 406 try { 407 dagEngine.suspend(jobId); 408 } 409 catch (DagEngineException ex) { 410 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 411 } 412 } 413 414 /** 415 * Suspend bundle job 416 * 417 * @param request servlet request 418 * @param response servlet response 419 * @throws XServletException 420 */ 421 private void suspendBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 422 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request), 423 getAuthToken(request)); 424 String jobId = getResourceName(request); 425 try { 426 bundleEngine.suspend(jobId); 427 } 428 catch (BundleEngineException ex) { 429 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 430 } 431 } 432 433 /** 434 * Suspend coordinator job 435 * 436 * @param request servlet request 437 * @param response servlet response 438 * @throws XServletException 439 */ 440 private void suspendCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 441 throws XServletException { 442 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 443 getUser(request), getAuthToken(request)); 444 String jobId = getResourceName(request); 445 try { 446 coordEngine.suspend(jobId); 447 } 448 catch (CoordinatorEngineException ex) { 449 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 450 } 451 } 452 453 /** 454 * Kill a wf job 455 * @param request servlet request 456 * @param response servlet response 457 * @throws XServletException 458 */ 459 private void killWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 460 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 461 getAuthToken(request)); 462 463 String jobId = getResourceName(request); 464 try { 465 dagEngine.kill(jobId); 466 } 467 catch (DagEngineException ex) { 468 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 469 } 470 } 471 472 /** 473 * Kill a coord job 474 * @param request servlet request 475 * @param response servlet response 476 * @throws XServletException 477 */ 478 private void killCoordinatorJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 479 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 480 getUser(request), getAuthToken(request)); 481 String jobId = getResourceName(request); 482 try { 483 coordEngine.kill(jobId); 484 } 485 catch (CoordinatorEngineException ex) { 486 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 487 } 488 } 489 490 /** 491 * Kill bundle job 492 * 493 * @param request servlet request 494 * @param response servlet response 495 * @throws XServletException 496 */ 497 private void killBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 498 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request), 499 getAuthToken(request)); 500 String jobId = getResourceName(request); 501 try { 502 bundleEngine.kill(jobId); 503 } 504 catch (BundleEngineException ex) { 505 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 506 } 507 } 508 509 /** 510 * Change a coordinator job 511 * 512 * @param request servlet request 513 * @param response servlet response 514 * @throws XServletException 515 */ 516 private void changeCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 517 throws XServletException { 518 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 519 getUser(request), getAuthToken(request)); 520 String jobId = getResourceName(request); 521 String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE); 522 try { 523 coordEngine.change(jobId, changeValue); 524 } 525 catch (CoordinatorEngineException ex) { 526 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 527 } 528 } 529 530 /** 531 * Change a bundle job 532 * 533 * @param request servlet request 534 * @param response servlet response 535 * @throws XServletException 536 */ 537 private void changeBundleJob(HttpServletRequest request, HttpServletResponse response) 538 throws XServletException { 539 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine( 540 getUser(request), getAuthToken(request)); 541 String jobId = getResourceName(request); 542 String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE); 543 try { 544 bundleEngine.change(jobId, changeValue); 545 } 546 catch (BundleEngineException ex) { 547 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 548 } 549 } 550 551 /** 552 * Rerun a wf job 553 * 554 * @param request servlet request 555 * @param response servlet response 556 * @param conf configuration object 557 * @throws XServletException 558 */ 559 private void reRunWorkflowJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 560 throws XServletException { 561 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 562 getAuthToken(request)); 563 564 String jobId = getResourceName(request); 565 try { 566 dagEngine.reRun(jobId, conf); 567 } 568 catch (DagEngineException ex) { 569 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 570 } 571 } 572 573 /** 574 * Rerun bundle job 575 * 576 * @param request servlet request 577 * @param response servlet response 578 * @param conf configration object 579 * @throws XServletException 580 */ 581 private void rerunBundleJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 582 throws XServletException { 583 JSONObject json = new JSONObject(); 584 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request), 585 getAuthToken(request)); 586 String jobId = getResourceName(request); 587 588 String coordScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM); 589 String dateScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM); 590 String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM); 591 String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM); 592 593 XLog.getLog(getClass()).info( 594 "Rerun Bundle for jobId=" + jobId + ", coordScope=" + coordScope + ", dateScope=" + dateScope + ", refresh=" 595 + refresh + ", noCleanup=" + noCleanup); 596 597 try { 598 bundleEngine.reRun(jobId, coordScope, dateScope, Boolean.valueOf(refresh), Boolean.valueOf(noCleanup)); 599 } 600 catch (BaseEngineException ex) { 601 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 602 } 603 } 604 605 /** 606 * Rerun coordinator actions 607 * 608 * @param request servlet request 609 * @param response servlet response 610 * @param conf configuration object 611 * @throws XServletException 612 */ 613 @SuppressWarnings("unchecked") 614 private JSONObject reRunCoordinatorActions(HttpServletRequest request, HttpServletResponse response, 615 Configuration conf) throws XServletException { 616 JSONObject json = new JSONObject(); 617 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request), 618 getAuthToken(request)); 619 620 String jobId = getResourceName(request); 621 622 String rerunType = request.getParameter(RestConstants.JOB_COORD_RERUN_TYPE_PARAM); 623 String scope = request.getParameter(RestConstants.JOB_COORD_RERUN_SCOPE_PARAM); 624 String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM); 625 String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM); 626 627 XLog.getLog(getClass()).info( 628 "Rerun coordinator for jobId=" + jobId + ", rerunType=" + rerunType + ",scope=" + scope + ",refresh=" 629 + refresh + ", noCleanup=" + noCleanup); 630 631 try { 632 if (!(rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE) || rerunType 633 .equals(RestConstants.JOB_COORD_RERUN_ACTION))) { 634 throw new CommandException(ErrorCode.E1018, "date or action expected."); 635 } 636 CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh), 637 Boolean.valueOf(noCleanup)); 638 List<CoordinatorActionBean> coordActions; 639 if (coordInfo != null) { 640 coordActions = coordInfo.getCoordActions(); 641 } 642 else { 643 coordActions = CoordRerunXCommand.getCoordActions(rerunType, jobId, scope); 644 } 645 json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT")); 646 } 647 catch (BaseEngineException ex) { 648 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 649 } 650 catch (CommandException ex) { 651 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 652 } 653 654 return json; 655 } 656 657 658 659 /** 660 * Get workflow job 661 * 662 * @param request servlet request 663 * @param response servlet response 664 * @return JsonBean WorkflowJobBean 665 * @throws XServletException 666 */ 667 private JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 668 JsonBean jobBean = null; 669 String jobId = getResourceName(request); 670 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 671 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 672 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 673 start = (start < 1) ? 1 : start; 674 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0; 675 len = (len < 1) ? Integer.MAX_VALUE : len; 676 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 677 getAuthToken(request)); 678 try { 679 jobBean = (JsonBean) dagEngine.getJob(jobId, start, len); 680 } 681 catch (DagEngineException ex) { 682 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 683 } 684 685 return jobBean; 686 } 687 688 /** 689 * Get wf action info 690 * 691 * @param request servlet request 692 * @param response servlet response 693 * @return JsonBean WorkflowActionBean 694 * @throws XServletException 695 */ 696 private JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response) 697 throws XServletException { 698 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 699 getAuthToken(request)); 700 701 JsonBean actionBean = null; 702 String actionId = getResourceName(request); 703 try { 704 actionBean = dagEngine.getWorkflowAction(actionId); 705 } 706 catch (BaseEngineException ex) { 707 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 708 } 709 710 return actionBean; 711 } 712 713 /** 714 * Get coord job info 715 * 716 * @param request servlet request 717 * @param response servlet response 718 * @return JsonBean CoordinatorJobBean 719 * @throws XServletException 720 * @throws BaseEngineException 721 */ 722 private JsonBean getCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 723 throws XServletException, BaseEngineException { 724 JsonBean jobBean = null; 725 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 726 getUser(request), getAuthToken(request)); 727 String jobId = getResourceName(request); 728 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 729 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 730 String filter = request.getParameter(RestConstants.JOB_FILTER_PARAM); 731 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 732 start = (start < 1) ? 1 : start; 733 // Get default number of coordinator actions to be retrieved 734 int defaultLen = Services.get().getConf().getInt(COORD_ACTIONS_DEFAULT_LENGTH, 1000); 735 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0; 736 len = (len < 1) ? defaultLen : len; 737 try { 738 JsonCoordinatorJob coordJob = coordEngine.getCoordJob(jobId, filter, start, len); 739 jobBean = coordJob; 740 } 741 catch (CoordinatorEngineException ex) { 742 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 743 } 744 745 return jobBean; 746 } 747 748 /** 749 * Get bundle job info 750 * 751 * @param request servlet request 752 * @param response servlet response 753 * @return JsonBean bundle job bean 754 * @throws XServletException 755 * @throws BaseEngineException 756 */ 757 private JsonBean getBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 758 BaseEngineException { 759 JsonBean jobBean = null; 760 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request), 761 getAuthToken(request)); 762 String jobId = getResourceName(request); 763 764 try { 765 jobBean = (JsonBean) bundleEngine.getBundleJob(jobId); 766 767 return jobBean; 768 } 769 catch (BundleEngineException ex) { 770 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 771 } 772 } 773 774 /** 775 * Get coordinator action 776 * 777 * @param request servlet request 778 * @param response servlet response 779 * @return JsonBean CoordinatorActionBean 780 * @throws XServletException 781 * @throws BaseEngineException 782 */ 783 private JsonBean getCoordinatorAction(HttpServletRequest request, HttpServletResponse response) 784 throws XServletException, BaseEngineException { 785 JsonBean actionBean = null; 786 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 787 getUser(request), getAuthToken(request)); 788 String actionId = getResourceName(request); 789 try { 790 actionBean = coordEngine.getCoordAction(actionId); 791 } 792 catch (CoordinatorEngineException ex) { 793 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 794 } 795 796 return actionBean; 797 } 798 799 /** 800 * Get wf job definition 801 * 802 * @param request servlet request 803 * @param response servlet response 804 * @return String wf definition 805 * @throws XServletException 806 */ 807 private String getWorkflowJobDefinition(HttpServletRequest request, HttpServletResponse response) 808 throws XServletException { 809 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 810 getAuthToken(request)); 811 812 String wfDefinition; 813 String jobId = getResourceName(request); 814 try { 815 wfDefinition = dagEngine.getDefinition(jobId); 816 } 817 catch (DagEngineException ex) { 818 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 819 } 820 return wfDefinition; 821 } 822 823 /** 824 * Get bundle job definition 825 * 826 * @param request servlet request 827 * @param response servlet response 828 * @return String bundle definition 829 * @throws XServletException 830 */ 831 private String getBundleJobDefinition(HttpServletRequest request, HttpServletResponse response) throws XServletException { 832 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request), 833 getAuthToken(request)); 834 String bundleDefinition; 835 String jobId = getResourceName(request); 836 try { 837 bundleDefinition = bundleEngine.getDefinition(jobId); 838 } 839 catch (BundleEngineException ex) { 840 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 841 } 842 return bundleDefinition; 843 } 844 845 /** 846 * Get coordinator job definition 847 * 848 * @param request servlet request 849 * @param response servlet response 850 * @return String coord definition 851 * @throws XServletException 852 */ 853 private String getCoordinatorJobDefinition(HttpServletRequest request, HttpServletResponse response) 854 throws XServletException { 855 856 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 857 getUser(request), getAuthToken(request)); 858 859 String jobId = getResourceName(request); 860 861 String coordDefinition = null; 862 try { 863 coordDefinition = coordEngine.getDefinition(jobId); 864 } 865 catch (BaseEngineException ex) { 866 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 867 } 868 return coordDefinition; 869 } 870 871 /** 872 * Stream wf job log 873 * 874 * @param request servlet request 875 * @param response servlet response 876 * @throws XServletException 877 * @throws IOException 878 */ 879 private void streamWorkflowJobLog(HttpServletRequest request, HttpServletResponse response) 880 throws XServletException, IOException { 881 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 882 getAuthToken(request)); 883 String jobId = getResourceName(request); 884 try { 885 dagEngine.streamLog(jobId, response.getWriter()); 886 } 887 catch (DagEngineException ex) { 888 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 889 } 890 } 891 892 /** 893 * Stream bundle job log 894 * 895 * @param request servlet request 896 * @param response servlet response 897 * @throws XServletException 898 */ 899 private void streamBundleJob(HttpServletRequest request, HttpServletResponse response) 900 throws XServletException, IOException { 901 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request), 902 getAuthToken(request)); 903 String jobId = getResourceName(request); 904 try { 905 bundleEngine.streamLog(jobId, response.getWriter()); 906 } 907 catch (BundleEngineException ex) { 908 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 909 } 910 } 911 912 /** 913 * Stream coordinator job log 914 * 915 * @param request servlet request 916 * @param response servlet response 917 * @throws XServletException 918 * @throws IOException 919 */ 920 private void streamCoordinatorJobLog(HttpServletRequest request, HttpServletResponse response) 921 throws XServletException, IOException { 922 923 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 924 getUser(request), getAuthToken(request)); 925 String jobId = getResourceName(request); 926 String logRetrievalScope = request.getParameter(RestConstants.JOB_LOG_SCOPE_PARAM); 927 String logRetrievalType = request.getParameter(RestConstants.JOB_LOG_TYPE_PARAM); 928 try { 929 coordEngine.streamLog(jobId, logRetrievalScope, logRetrievalType, response.getWriter()); 930 } 931 catch (BaseEngineException ex) { 932 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 933 } 934 catch (CommandException ex) { 935 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 936 } 937 } 938 }