001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.servlet; 019 020 import java.io.IOException; 021 import java.util.List; 022 023 import javax.servlet.ServletInputStream; 024 import javax.servlet.http.HttpServletRequest; 025 import javax.servlet.http.HttpServletResponse; 026 027 import org.apache.hadoop.conf.Configuration; 028 import org.apache.oozie.BaseEngineException; 029 import org.apache.oozie.BundleEngine; 030 import org.apache.oozie.BundleEngineException; 031 import org.apache.oozie.CoordinatorActionBean; 032 import org.apache.oozie.CoordinatorActionInfo; 033 import org.apache.oozie.CoordinatorEngine; 034 import org.apache.oozie.CoordinatorEngineException; 035 import org.apache.oozie.command.CommandException; 036 import org.apache.oozie.DagEngine; 037 import org.apache.oozie.DagEngineException; 038 import org.apache.oozie.ErrorCode; 039 import org.apache.oozie.client.rest.JsonBean; 040 import org.apache.oozie.client.rest.JsonCoordinatorJob; 041 import org.apache.oozie.client.rest.JsonTags; 042 import org.apache.oozie.client.rest.RestConstants; 043 import org.apache.oozie.service.BundleEngineService; 044 import org.apache.oozie.service.CoordinatorEngineService; 045 import org.apache.oozie.service.DagEngineService; 046 import org.apache.oozie.service.Services; 047 import org.apache.oozie.util.XLog; 048 import org.json.simple.JSONObject; 049 050 @SuppressWarnings("serial") 051 public class V1JobServlet extends BaseJobServlet { 052 053 private static final String INSTRUMENTATION_NAME = "v1job"; 054 055 public V1JobServlet() { 056 super(INSTRUMENTATION_NAME); 057 } 058 059 /* 060 * protected method to start a job 061 */ 062 @Override 063 protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 064 IOException { 065 /* 066 * Configuration conf = new XConfiguration(request.getInputStream()); 067 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 068 * conf.get(OozieClient.COORDINATOR_APP_PATH); 069 * 070 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 071 */ 072 String jobId = getResourceName(request); 073 if (jobId.endsWith("-W")) { 074 startWorkflowJob(request, response); 075 } 076 else if (jobId.endsWith("-B")) { 077 startBundleJob(request, response); 078 } 079 else { 080 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303); 081 } 082 083 } 084 085 /* 086 * protected method to resume a job 087 */ 088 @Override 089 protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 090 IOException { 091 /* 092 * Configuration conf = new XConfiguration(request.getInputStream()); 093 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 094 * conf.get(OozieClient.COORDINATOR_APP_PATH); 095 * 096 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 097 */ 098 String jobId = getResourceName(request); 099 if (jobId.endsWith("-W")) { 100 resumeWorkflowJob(request, response); 101 } 102 else if (jobId.endsWith("-B")) { 103 resumeBundleJob(request, response); 104 } 105 else { 106 resumeCoordinatorJob(request, response); 107 } 108 } 109 110 /* 111 * protected method to suspend a job 112 */ 113 @Override 114 protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 115 IOException { 116 /* 117 * Configuration conf = new XConfiguration(request.getInputStream()); 118 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 119 * conf.get(OozieClient.COORDINATOR_APP_PATH); 120 * 121 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 122 */ 123 String jobId = getResourceName(request); 124 if (jobId.endsWith("-W")) { 125 suspendWorkflowJob(request, response); 126 } 127 else if (jobId.endsWith("-B")) { 128 suspendBundleJob(request, response); 129 } 130 else { 131 suspendCoordinatorJob(request, response); 132 } 133 } 134 135 /* 136 * protected method to kill a job 137 */ 138 @Override 139 protected void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 140 IOException { 141 /* 142 * Configuration conf = new XConfiguration(request.getInputStream()); 143 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 144 * conf.get(OozieClient.COORDINATOR_APP_PATH); 145 * 146 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 147 */ 148 String jobId = getResourceName(request); 149 if (jobId.endsWith("-W")) { 150 killWorkflowJob(request, response); 151 } 152 else if (jobId.endsWith("-B")) { 153 killBundleJob(request, response); 154 } 155 else { 156 killCoordinatorJob(request, response); 157 } 158 } 159 160 /** 161 * protected method to change a coordinator job 162 * @param request request object 163 * @param response response object 164 * @throws XServletException 165 * @throws IOException 166 */ 167 @Override 168 protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 169 IOException { 170 String jobId = getResourceName(request); 171 if (jobId.endsWith("-B")) { 172 changeBundleJob(request, response); 173 } 174 else { 175 changeCoordinatorJob(request, response); 176 } 177 } 178 179 /* 180 * protected method to reRun a job 181 * 182 * @seeorg.apache.oozie.servlet.BaseJobServlet#reRunJob(javax.servlet.http. 183 * HttpServletRequest, javax.servlet.http.HttpServletResponse, 184 * org.apache.hadoop.conf.Configuration) 185 */ 186 @Override 187 protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 188 throws XServletException, IOException { 189 JSONObject json = null; 190 String jobId = getResourceName(request); 191 if (jobId.endsWith("-W")) { 192 reRunWorkflowJob(request, response, conf); 193 } 194 else if (jobId.endsWith("-B")) { 195 rerunBundleJob(request, response, conf); 196 } 197 else { 198 json = reRunCoordinatorActions(request, response, conf); 199 } 200 return json; 201 } 202 203 /* 204 * protected method to get a job in JsonBean representation 205 */ 206 @Override 207 protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 208 IOException, BaseEngineException { 209 ServletInputStream is = request.getInputStream(); 210 byte[] b = new byte[101]; 211 while (is.readLine(b, 0, 100) != -1) { 212 XLog.getLog(getClass()).warn("Printing :" + new String(b)); 213 } 214 215 JsonBean jobBean = null; 216 String jobId = getResourceName(request); 217 if (jobId.endsWith("-B")) { 218 jobBean = getBundleJob(request, response); 219 } 220 else { 221 if (jobId.endsWith("-W")) { 222 jobBean = getWorkflowJob(request, response); 223 } 224 else { 225 if (jobId.contains("-W@")) { 226 jobBean = getWorkflowAction(request, response); 227 } 228 else { 229 if (jobId.contains("-C@")) { 230 jobBean = getCoordinatorAction(request, response); 231 } 232 else { 233 jobBean = getCoordinatorJob(request, response); 234 } 235 } 236 } 237 } 238 239 return jobBean; 240 } 241 242 /* 243 * protected method to get a job definition in String format 244 */ 245 @Override 246 protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response) 247 throws XServletException, IOException { 248 String jobDefinition = null; 249 String jobId = getResourceName(request); 250 if (jobId.endsWith("-W")) { 251 jobDefinition = getWorkflowJobDefinition(request, response); 252 } 253 else if (jobId.endsWith("-B")) { 254 jobDefinition = getBundleJobDefinition(request, response); 255 } 256 else { 257 jobDefinition = getCoordinatorJobDefinition(request, response); 258 } 259 return jobDefinition; 260 } 261 262 /* 263 * protected method to stream a job log into response object 264 */ 265 @Override 266 protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 267 IOException { 268 String jobId = getResourceName(request); 269 if (jobId.endsWith("-W")) { 270 streamWorkflowJobLog(request, response); 271 } 272 else if (jobId.endsWith("-B")) { 273 streamBundleJob(request, response); 274 } 275 else { 276 streamCoordinatorJobLog(request, response); 277 } 278 } 279 280 /** 281 * Start wf job 282 * 283 * @param request servlet request 284 * @param response servlet response 285 * @throws XServletException 286 */ 287 private void startWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 288 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 289 getAuthToken(request)); 290 291 String jobId = getResourceName(request); 292 try { 293 dagEngine.start(jobId); 294 } 295 catch (DagEngineException ex) { 296 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 297 } 298 } 299 300 /** 301 * Start bundle job 302 * 303 * @param request servlet request 304 * @param response servlet response 305 * @throws XServletException 306 */ 307 private void startBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 308 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request), 309 getAuthToken(request)); 310 String jobId = getResourceName(request); 311 try { 312 bundleEngine.start(jobId); 313 } 314 catch (BundleEngineException ex) { 315 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 316 } 317 } 318 319 /** 320 * Resume workflow job 321 * 322 * @param request servlet request 323 * @param response servlet response 324 * @throws XServletException 325 */ 326 private void resumeWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 327 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 328 getAuthToken(request)); 329 330 String jobId = getResourceName(request); 331 try { 332 dagEngine.resume(jobId); 333 } 334 catch (DagEngineException ex) { 335 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 336 } 337 } 338 339 /** 340 * Resume bundle job 341 * 342 * @param request servlet request 343 * @param response servlet response 344 * @throws XServletException 345 */ 346 private void resumeBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 347 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request), 348 getAuthToken(request)); 349 String jobId = getResourceName(request); 350 try { 351 bundleEngine.resume(jobId); 352 } 353 catch (BundleEngineException ex) { 354 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 355 } 356 } 357 358 /** 359 * Resume coordinator job 360 * 361 * @param request servlet request 362 * @param response servlet response 363 * @throws XServletException 364 * @throws CoordinatorEngineException 365 */ 366 private void resumeCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 367 throws XServletException { 368 String jobId = getResourceName(request); 369 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 370 getUser(request), getAuthToken(request)); 371 try { 372 coordEngine.resume(jobId); 373 } 374 catch (CoordinatorEngineException ex) { 375 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 376 } 377 } 378 379 /** 380 * Suspend a wf job 381 * 382 * @param request servlet request 383 * @param response servlet response 384 * @throws XServletException 385 */ 386 private void suspendWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 387 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 388 getAuthToken(request)); 389 390 String jobId = getResourceName(request); 391 try { 392 dagEngine.suspend(jobId); 393 } 394 catch (DagEngineException ex) { 395 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 396 } 397 } 398 399 /** 400 * Suspend bundle job 401 * 402 * @param request servlet request 403 * @param response servlet response 404 * @throws XServletException 405 */ 406 private void suspendBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 407 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request), 408 getAuthToken(request)); 409 String jobId = getResourceName(request); 410 try { 411 bundleEngine.suspend(jobId); 412 } 413 catch (BundleEngineException ex) { 414 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 415 } 416 } 417 418 /** 419 * Suspend coordinator job 420 * 421 * @param request servlet request 422 * @param response servlet response 423 * @throws XServletException 424 */ 425 private void suspendCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 426 throws XServletException { 427 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 428 getUser(request), getAuthToken(request)); 429 String jobId = getResourceName(request); 430 try { 431 coordEngine.suspend(jobId); 432 } 433 catch (CoordinatorEngineException ex) { 434 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 435 } 436 } 437 438 /** 439 * Kill a wf job 440 * @param request servlet request 441 * @param response servlet response 442 * @throws XServletException 443 */ 444 private void killWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 445 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 446 getAuthToken(request)); 447 448 String jobId = getResourceName(request); 449 try { 450 dagEngine.kill(jobId); 451 } 452 catch (DagEngineException ex) { 453 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 454 } 455 } 456 457 /** 458 * Kill a coord job 459 * @param request servlet request 460 * @param response servlet response 461 * @throws XServletException 462 */ 463 private void killCoordinatorJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 464 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 465 getUser(request), getAuthToken(request)); 466 String jobId = getResourceName(request); 467 try { 468 coordEngine.kill(jobId); 469 } 470 catch (CoordinatorEngineException ex) { 471 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 472 } 473 } 474 475 /** 476 * Kill bundle job 477 * 478 * @param request servlet request 479 * @param response servlet response 480 * @throws XServletException 481 */ 482 private void killBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 483 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request), 484 getAuthToken(request)); 485 String jobId = getResourceName(request); 486 try { 487 bundleEngine.kill(jobId); 488 } 489 catch (BundleEngineException ex) { 490 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 491 } 492 } 493 494 /** 495 * Change a coordinator job 496 * 497 * @param request servlet request 498 * @param response servlet response 499 * @throws XServletException 500 */ 501 private void changeCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 502 throws XServletException { 503 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 504 getUser(request), getAuthToken(request)); 505 String jobId = getResourceName(request); 506 String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE); 507 try { 508 coordEngine.change(jobId, changeValue); 509 } 510 catch (CoordinatorEngineException ex) { 511 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 512 } 513 } 514 515 /** 516 * Change a bundle job 517 * 518 * @param request servlet request 519 * @param response servlet response 520 * @throws XServletException 521 */ 522 private void changeBundleJob(HttpServletRequest request, HttpServletResponse response) 523 throws XServletException { 524 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine( 525 getUser(request), getAuthToken(request)); 526 String jobId = getResourceName(request); 527 String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE); 528 try { 529 bundleEngine.change(jobId, changeValue); 530 } 531 catch (BundleEngineException ex) { 532 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 533 } 534 } 535 536 /** 537 * Rerun a wf job 538 * 539 * @param request servlet request 540 * @param response servlet response 541 * @param conf configuration object 542 * @throws XServletException 543 */ 544 private void reRunWorkflowJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 545 throws XServletException { 546 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 547 getAuthToken(request)); 548 549 String jobId = getResourceName(request); 550 try { 551 dagEngine.reRun(jobId, conf); 552 } 553 catch (DagEngineException ex) { 554 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 555 } 556 } 557 558 /** 559 * Rerun bundle job 560 * 561 * @param request servlet request 562 * @param response servlet response 563 * @param conf configration object 564 * @throws XServletException 565 */ 566 private void rerunBundleJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 567 throws XServletException { 568 JSONObject json = new JSONObject(); 569 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request), 570 getAuthToken(request)); 571 String jobId = getResourceName(request); 572 573 String coordScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM); 574 String dateScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM); 575 String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM); 576 String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM); 577 578 XLog.getLog(getClass()).info( 579 "Rerun Bundle for jobId=" + jobId + ", coordScope=" + coordScope + ", dateScope=" + dateScope + ", refresh=" 580 + refresh + ", noCleanup=" + noCleanup); 581 582 try { 583 bundleEngine.reRun(jobId, coordScope, dateScope, Boolean.valueOf(refresh), Boolean.valueOf(noCleanup)); 584 } 585 catch (BaseEngineException ex) { 586 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 587 } 588 } 589 590 /** 591 * Rerun coordinator actions 592 * 593 * @param request servlet request 594 * @param response servlet response 595 * @param conf configuration object 596 * @throws XServletException 597 */ 598 @SuppressWarnings("unchecked") 599 private JSONObject reRunCoordinatorActions(HttpServletRequest request, HttpServletResponse response, 600 Configuration conf) throws XServletException { 601 JSONObject json = new JSONObject(); 602 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request), 603 getAuthToken(request)); 604 605 String jobId = getResourceName(request); 606 607 String rerunType = request.getParameter(RestConstants.JOB_COORD_RERUN_TYPE_PARAM); 608 String scope = request.getParameter(RestConstants.JOB_COORD_RERUN_SCOPE_PARAM); 609 String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM); 610 String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM); 611 612 XLog.getLog(getClass()).info( 613 "Rerun coordinator for jobId=" + jobId + ", rerunType=" + rerunType + ",scope=" + scope + ",refresh=" 614 + refresh + ", noCleanup=" + noCleanup); 615 616 try { 617 CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh), 618 Boolean.valueOf(noCleanup)); 619 List<CoordinatorActionBean> actions = coordInfo.getCoordActions(); 620 json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(actions)); 621 } 622 catch (BaseEngineException ex) { 623 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 624 } 625 626 return json; 627 } 628 629 /** 630 * Get workflow job 631 * 632 * @param request servlet request 633 * @param response servlet response 634 * @return JsonBean WorkflowJobBean 635 * @throws XServletException 636 */ 637 private JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 638 JsonBean jobBean = null; 639 String jobId = getResourceName(request); 640 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 641 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 642 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 643 start = (start < 1) ? 1 : start; 644 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0; 645 len = (len < 1) ? Integer.MAX_VALUE : len; 646 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 647 getAuthToken(request)); 648 try { 649 jobBean = (JsonBean) dagEngine.getJob(jobId, start, len); 650 } 651 catch (DagEngineException ex) { 652 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 653 } 654 655 return jobBean; 656 } 657 658 /** 659 * Get wf action info 660 * 661 * @param request servlet request 662 * @param response servlet response 663 * @return JsonBean WorkflowActionBean 664 * @throws XServletException 665 */ 666 private JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response) 667 throws XServletException { 668 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 669 getAuthToken(request)); 670 671 JsonBean actionBean = null; 672 String actionId = getResourceName(request); 673 try { 674 actionBean = dagEngine.getWorkflowAction(actionId); 675 } 676 catch (BaseEngineException ex) { 677 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 678 } 679 680 return actionBean; 681 } 682 683 /** 684 * Get coord job info 685 * 686 * @param request servlet request 687 * @param response servlet response 688 * @return JsonBean CoordinatorJobBean 689 * @throws XServletException 690 * @throws BaseEngineException 691 */ 692 private JsonBean getCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 693 throws XServletException, BaseEngineException { 694 JsonBean jobBean = null; 695 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 696 getUser(request), getAuthToken(request)); 697 String jobId = getResourceName(request); 698 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 699 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 700 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 701 start = (start < 1) ? 1 : start; 702 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0; 703 len = (len < 1) ? Integer.MAX_VALUE : len; 704 try { 705 JsonCoordinatorJob coordJob = coordEngine.getCoordJob(jobId, start, len); 706 jobBean = coordJob; 707 } 708 catch (CoordinatorEngineException ex) { 709 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 710 } 711 712 return jobBean; 713 } 714 715 /** 716 * Get bundle job info 717 * 718 * @param request servlet request 719 * @param response servlet response 720 * @return JsonBean bundle job bean 721 * @throws XServletException 722 * @throws BaseEngineException 723 */ 724 private JsonBean getBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 725 BaseEngineException { 726 JsonBean jobBean = null; 727 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request), 728 getAuthToken(request)); 729 String jobId = getResourceName(request); 730 731 try { 732 jobBean = (JsonBean) bundleEngine.getBundleJob(jobId); 733 734 return jobBean; 735 } 736 catch (BundleEngineException ex) { 737 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 738 } 739 } 740 741 /** 742 * Get coordinator action 743 * 744 * @param request servlet request 745 * @param response servlet response 746 * @return JsonBean CoordinatorActionBean 747 * @throws XServletException 748 * @throws BaseEngineException 749 */ 750 private JsonBean getCoordinatorAction(HttpServletRequest request, HttpServletResponse response) 751 throws XServletException, BaseEngineException { 752 JsonBean actionBean = null; 753 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 754 getUser(request), getAuthToken(request)); 755 String actionId = getResourceName(request); 756 try { 757 actionBean = coordEngine.getCoordAction(actionId); 758 } 759 catch (CoordinatorEngineException ex) { 760 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 761 } 762 763 return actionBean; 764 } 765 766 /** 767 * Get wf job definition 768 * 769 * @param request servlet request 770 * @param response servlet response 771 * @return String wf definition 772 * @throws XServletException 773 */ 774 private String getWorkflowJobDefinition(HttpServletRequest request, HttpServletResponse response) 775 throws XServletException { 776 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 777 getAuthToken(request)); 778 779 String wfDefinition; 780 String jobId = getResourceName(request); 781 try { 782 wfDefinition = dagEngine.getDefinition(jobId); 783 } 784 catch (DagEngineException ex) { 785 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 786 } 787 return wfDefinition; 788 } 789 790 /** 791 * Get bundle job definition 792 * 793 * @param request servlet request 794 * @param response servlet response 795 * @return String bundle definition 796 * @throws XServletException 797 */ 798 private String getBundleJobDefinition(HttpServletRequest request, HttpServletResponse response) throws XServletException { 799 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request), 800 getAuthToken(request)); 801 String bundleDefinition; 802 String jobId = getResourceName(request); 803 try { 804 bundleDefinition = bundleEngine.getDefinition(jobId); 805 } 806 catch (BundleEngineException ex) { 807 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 808 } 809 return bundleDefinition; 810 } 811 812 /** 813 * Get coordinator job definition 814 * 815 * @param request servlet request 816 * @param response servlet response 817 * @return String coord definition 818 * @throws XServletException 819 */ 820 private String getCoordinatorJobDefinition(HttpServletRequest request, HttpServletResponse response) 821 throws XServletException { 822 823 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 824 getUser(request), getAuthToken(request)); 825 826 String jobId = getResourceName(request); 827 828 String coordDefinition = null; 829 try { 830 coordDefinition = coordEngine.getDefinition(jobId); 831 } 832 catch (BaseEngineException ex) { 833 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 834 } 835 return coordDefinition; 836 } 837 838 /** 839 * Stream wf job log 840 * 841 * @param request servlet request 842 * @param response servlet response 843 * @throws XServletException 844 * @throws IOException 845 */ 846 private void streamWorkflowJobLog(HttpServletRequest request, HttpServletResponse response) 847 throws XServletException, IOException { 848 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 849 getAuthToken(request)); 850 String jobId = getResourceName(request); 851 try { 852 dagEngine.streamLog(jobId, response.getWriter()); 853 } 854 catch (DagEngineException ex) { 855 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 856 } 857 } 858 859 /** 860 * Stream bundle job log 861 * 862 * @param request servlet request 863 * @param response servlet response 864 * @throws XServletException 865 */ 866 private void streamBundleJob(HttpServletRequest request, HttpServletResponse response) 867 throws XServletException, IOException { 868 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request), 869 getAuthToken(request)); 870 String jobId = getResourceName(request); 871 try { 872 bundleEngine.streamLog(jobId, response.getWriter()); 873 } 874 catch (BundleEngineException ex) { 875 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 876 } 877 } 878 879 /** 880 * Stream coordinator job log 881 * 882 * @param request servlet request 883 * @param response servlet response 884 * @throws XServletException 885 * @throws IOException 886 */ 887 private void streamCoordinatorJobLog(HttpServletRequest request, HttpServletResponse response) 888 throws XServletException, IOException { 889 890 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 891 getUser(request), getAuthToken(request)); 892 String jobId = getResourceName(request); 893 String logRetrievalScope = request.getParameter(RestConstants.JOB_LOG_SCOPE_PARAM); 894 String logRetrievalType = request.getParameter(RestConstants.JOB_LOG_TYPE_PARAM); 895 try { 896 coordEngine.streamLog(jobId, logRetrievalScope, logRetrievalType, response.getWriter()); 897 } 898 catch (BaseEngineException ex) { 899 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 900 } 901 catch (CommandException ex) { 902 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 903 } 904 } 905 }