/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.servlet;

import java.io.IOException;
import java.util.List;

import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BaseEngineException;
import org.apache.oozie.BundleEngine;
import org.apache.oozie.BundleEngineException;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorActionInfo;
import org.apache.oozie.CoordinatorEngine;
import org.apache.oozie.CoordinatorEngineException;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordRerunXCommand;
import org.apache.oozie.coord.CoordUtils;
import org.apache.oozie.DagEngine;
import org.apache.oozie.DagEngineException;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.client.rest.JsonCoordinatorJob;
import org.apache.oozie.client.rest.JsonTags;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.service.BundleEngineService;
import org.apache.oozie.service.CoordinatorEngineService;
import org.apache.oozie.service.DagEngineService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.XLog;
import org.json.simple.JSONObject;

/**
 * Job servlet that dispatches each operation on the shape of the job id taken from the request:
 * ids ending in <code>-W</code> go to the workflow (DAG) engine, ids ending in <code>-B</code> go to
 * the bundle engine, and (unless noted on the individual method) everything else goes to the
 * coordinator engine.
 * <p/>
 * Engine exceptions are rethrown as {@link XServletException} with
 * {@link HttpServletResponse#SC_BAD_REQUEST}.
 */
@SuppressWarnings("serial")
public class V1JobServlet extends BaseJobServlet {

    private static final String INSTRUMENTATION_NAME = "v1job";

    /** Configuration key for the number of coordinator actions returned when no length is requested. */
    public static final String COORD_ACTIONS_DEFAULT_LENGTH = "oozie.coord.actions.default.length";

    public V1JobServlet() {
        super(INSTRUMENTATION_NAME);
    }

    /**
     * Start a workflow or bundle job. Any other kind of job id is rejected with
     * {@link ErrorCode#E0303}.
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException if the job id is neither a workflow nor a bundle id, or the engine fails
     * @throws IOException declared by the overridden method
     */
    @Override
    protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
            IOException {
        String jobId = getResourceName(request);
        if (jobId.endsWith("-W")) {
            startWorkflowJob(request, response);
        }
        else if (jobId.endsWith("-B")) {
            startBundleJob(request, response);
        }
        else {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303);
        }
    }

    /**
     * Resume a workflow, bundle or coordinator job.
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException if the engine fails
     * @throws IOException declared by the overridden method
     */
    @Override
    protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
            IOException {
        String jobId = getResourceName(request);
        if (jobId.endsWith("-W")) {
            resumeWorkflowJob(request, response);
        }
        else if (jobId.endsWith("-B")) {
            resumeBundleJob(request, response);
        }
        else {
            resumeCoordinatorJob(request, response);
        }
    }

    /**
     * Suspend a workflow, bundle or coordinator job.
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException if the engine fails
     * @throws IOException declared by the overridden method
     */
    @Override
    protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
            IOException {
        String jobId = getResourceName(request);
        if (jobId.endsWith("-W")) {
            suspendWorkflowJob(request, response);
        }
        else if (jobId.endsWith("-B")) {
            suspendBundleJob(request, response);
        }
        else {
            suspendCoordinatorJob(request, response);
        }
    }

    /**
     * Kill a workflow, bundle or coordinator job.
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException if the engine fails
     * @throws IOException declared by the overridden method
     */
    @Override
    protected void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
            IOException {
        String jobId = getResourceName(request);
        if (jobId.endsWith("-W")) {
            killWorkflowJob(request, response);
        }
        else if (jobId.endsWith("-B")) {
            killBundleJob(request, response);
        }
        else {
            killCoordinatorJob(request, response);
        }
    }

    /**
     * Change a bundle or coordinator job. Ids ending in <code>-B</code> are treated as bundles;
     * every other id is treated as a coordinator job.
     *
     * @param request request object
     * @param response response object
     * @throws XServletException if the engine fails
     * @throws IOException declared by the overridden method
     */
    @Override
    protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
            IOException {
        String jobId = getResourceName(request);
        if (jobId.endsWith("-B")) {
            changeBundleJob(request, response);
        }
        else {
            changeCoordinatorJob(request, response);
        }
    }

    /**
     * Rerun a workflow job, a bundle job, or a set of coordinator actions.
     *
     * @see org.apache.oozie.servlet.BaseJobServlet#reRunJob(javax.servlet.http.HttpServletRequest,
     *      javax.servlet.http.HttpServletResponse, org.apache.hadoop.conf.Configuration)
     * @param request servlet request
     * @param response servlet response
     * @param conf configuration object
     * @return JSON listing the coordinator actions for a coordinator rerun; <code>null</code> for
     *         workflow and bundle reruns
     * @throws XServletException if the engine fails
     * @throws IOException declared by the overridden method
     */
    @Override
    protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
            throws XServletException, IOException {
        JSONObject json = null;
        String jobId = getResourceName(request);
        if (jobId.endsWith("-W")) {
            reRunWorkflowJob(request, response, conf);
        }
        else if (jobId.endsWith("-B")) {
            rerunBundleJob(request, response, conf);
        }
        else {
            json = reRunCoordinatorActions(request, response, conf);
        }
        return json;
    }

    /**
     * Get a job, or a single action of a job, in JsonBean representation.
     * <p/>
     * The id is tested in this order: bundle job (<code>-B</code> suffix), workflow job
     * (<code>-W</code> suffix), workflow action (contains <code>-W@</code>), coordinator action
     * (contains <code>-C@</code>); anything else is looked up as a coordinator job.
     *
     * @param request servlet request
     * @param response servlet response
     * @return the bean describing the requested job or action
     * @throws XServletException if the engine fails
     * @throws IOException if the request body cannot be read
     * @throws BaseEngineException declared by the overridden method
     */
    @Override
    protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
            IOException, BaseEngineException {
        // NOTE(review): this drains the request body and logs it at WARN level; it looks like
        // leftover diagnostics -- confirm whether it is still wanted.
        ServletInputStream is = request.getInputStream();
        byte[] buffer = new byte[100];
        int read;
        while ((read = is.readLine(buffer, 0, buffer.length)) != -1) {
            // Log only the bytes of the current line; the buffer is reused, so anything past
            // 'read' is stale data from an earlier, longer line.
            XLog.getLog(getClass()).warn("Printing :" + new String(buffer, 0, read));
        }

        JsonBean jobBean;
        String jobId = getResourceName(request);
        if (jobId.endsWith("-B")) {
            jobBean = getBundleJob(request, response);
        }
        else if (jobId.endsWith("-W")) {
            jobBean = getWorkflowJob(request, response);
        }
        else if (jobId.contains("-W@")) {
            jobBean = getWorkflowAction(request, response);
        }
        else if (jobId.contains("-C@")) {
            jobBean = getCoordinatorAction(request, response);
        }
        else {
            jobBean = getCoordinatorJob(request, response);
        }

        return jobBean;
    }

    /**
     * Get a job definition in String format.
     *
     * @param request servlet request
     * @param response servlet response
     * @return the workflow, bundle or coordinator definition
     * @throws XServletException if the engine fails
     * @throws IOException declared by the overridden method
     */
    @Override
    protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
            throws XServletException, IOException {
        String jobDefinition;
        String jobId = getResourceName(request);
        if (jobId.endsWith("-W")) {
            jobDefinition = getWorkflowJobDefinition(request, response);
        }
        else if (jobId.endsWith("-B")) {
            jobDefinition = getBundleJobDefinition(request, response);
        }
        else {
            jobDefinition = getCoordinatorJobDefinition(request, response);
        }
        return jobDefinition;
    }

    /**
     * Stream a job log into the response object.
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException if the engine fails
     * @throws IOException if the response writer cannot be obtained or written
     */
    @Override
    protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
            IOException {
        String jobId = getResourceName(request);
        if (jobId.endsWith("-W")) {
            streamWorkflowJobLog(request, response);
        }
        else if (jobId.endsWith("-B")) {
            streamBundleJob(request, response);
        }
        else {
            streamCoordinatorJobLog(request, response);
        }
    }

    /**
     * Parse an optional integer request parameter.
     *
     * @param name parameter name, used only for error reporting
     * @param value raw parameter value, may be <code>null</code>
     * @param defaultValue value returned when the parameter is absent
     * @return the parsed value, or <code>defaultValue</code> if <code>value</code> is <code>null</code>
     * @throws XServletException with SC_BAD_REQUEST if the value is not a valid integer
     */
    private static int parseIntParam(String name, String value, int defaultValue) throws XServletException {
        if (value == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(value);
        }
        catch (NumberFormatException ex) {
            // A malformed number is a client error, not a server failure.
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, name, value);
        }
    }

    /**
     * Start wf job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException
     */
    private void startWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
                getAuthToken(request));

        String jobId = getResourceName(request);
        try {
            dagEngine.start(jobId);
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Start bundle job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException
     */
    private void startBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
                getAuthToken(request));
        String jobId = getResourceName(request);
        try {
            bundleEngine.start(jobId);
        }
        catch (BundleEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Resume workflow job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException
     */
    private void resumeWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
                getAuthToken(request));

        String jobId = getResourceName(request);
        try {
            dagEngine.resume(jobId);
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Resume bundle job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException
     */
    private void resumeBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
                getAuthToken(request));
        String jobId = getResourceName(request);
        try {
            bundleEngine.resume(jobId);
        }
        catch (BundleEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Resume coordinator job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException
     */
    private void resumeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
            throws XServletException {
        String jobId = getResourceName(request);
        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
                getUser(request), getAuthToken(request));
        try {
            coordEngine.resume(jobId);
        }
        catch (CoordinatorEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Suspend a wf job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException
     */
    private void suspendWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
                getAuthToken(request));

        String jobId = getResourceName(request);
        try {
            dagEngine.suspend(jobId);
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Suspend bundle job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException
     */
    private void suspendBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
                getAuthToken(request));
        String jobId = getResourceName(request);
        try {
            bundleEngine.suspend(jobId);
        }
        catch (BundleEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Suspend coordinator job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException
     */
    private void suspendCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
            throws XServletException {
        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
                getUser(request), getAuthToken(request));
        String jobId = getResourceName(request);
        try {
            coordEngine.suspend(jobId);
        }
        catch (CoordinatorEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Kill a wf job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException
     */
    private void killWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
                getAuthToken(request));

        String jobId = getResourceName(request);
        try {
            dagEngine.kill(jobId);
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Kill a coord job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException
     */
    private void killCoordinatorJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
                getUser(request), getAuthToken(request));
        String jobId = getResourceName(request);
        try {
            coordEngine.kill(jobId);
        }
        catch (CoordinatorEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Kill bundle job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException
     */
    private void killBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
                getAuthToken(request));
        String jobId = getResourceName(request);
        try {
            bundleEngine.kill(jobId);
        }
        catch (BundleEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Change a coordinator job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException
     */
    private void changeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
            throws XServletException {
        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
                getUser(request), getAuthToken(request));
        String jobId = getResourceName(request);
        String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
        try {
            coordEngine.change(jobId, changeValue);
        }
        catch (CoordinatorEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Change a bundle job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException
     */
    private void changeBundleJob(HttpServletRequest request, HttpServletResponse response)
            throws XServletException {
        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(
                getUser(request), getAuthToken(request));
        String jobId = getResourceName(request);
        String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
        try {
            bundleEngine.change(jobId, changeValue);
        }
        catch (BundleEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Rerun a wf job
     *
     * @param request servlet request
     * @param response servlet response
     * @param conf configuration object
     * @throws XServletException
     */
    private void reRunWorkflowJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
            throws XServletException {
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
                getAuthToken(request));

        String jobId = getResourceName(request);
        try {
            dagEngine.reRun(jobId, conf);
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Rerun bundle job
     *
     * @param request servlet request
     * @param response servlet response
     * @param conf configuration object (not used by the bundle rerun)
     * @throws XServletException
     */
    private void rerunBundleJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
            throws XServletException {
        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
                getAuthToken(request));
        String jobId = getResourceName(request);

        String coordScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM);
        String dateScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM);
        String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
        String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);

        XLog.getLog(getClass()).info(
                "Rerun Bundle for jobId=" + jobId + ", coordScope=" + coordScope + ", dateScope=" + dateScope + ", refresh="
                        + refresh + ", noCleanup=" + noCleanup);

        try {
            // Boolean.valueOf(null) is false, so absent flags default to "off".
            bundleEngine.reRun(jobId, coordScope, dateScope, Boolean.valueOf(refresh), Boolean.valueOf(noCleanup));
        }
        catch (BaseEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Rerun coordinator actions
     *
     * @param request servlet request
     * @param response servlet response
     * @param conf configuration object (not used by the coordinator rerun)
     * @return JSON object holding the affected coordinator actions under
     *         {@link JsonTags#COORDINATOR_ACTIONS}
     * @throws XServletException if the rerun type is missing or invalid, or the engine fails
     */
    @SuppressWarnings("unchecked")
    private JSONObject reRunCoordinatorActions(HttpServletRequest request, HttpServletResponse response,
            Configuration conf) throws XServletException {
        JSONObject json = new JSONObject();
        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
                getUser(request), getAuthToken(request));

        String jobId = getResourceName(request);

        String rerunType = request.getParameter(RestConstants.JOB_COORD_RERUN_TYPE_PARAM);
        String scope = request.getParameter(RestConstants.JOB_COORD_RERUN_SCOPE_PARAM);
        String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
        String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);

        XLog.getLog(getClass()).info(
                "Rerun coordinator for jobId=" + jobId + ", rerunType=" + rerunType + ",scope=" + scope + ",refresh="
                        + refresh + ", noCleanup=" + noCleanup);

        try {
            // Constant-first equals so that a missing rerun type is reported as E1018 instead of
            // failing with a NullPointerException.
            boolean knownType = RestConstants.JOB_COORD_RERUN_DATE.equals(rerunType)
                    || RestConstants.JOB_COORD_RERUN_ACTION.equals(rerunType);
            if (!knownType) {
                throw new CommandException(ErrorCode.E1018, "date or action expected.");
            }
            CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh),
                    Boolean.valueOf(noCleanup));
            List<CoordinatorActionBean> coordActions;
            if (coordInfo != null) {
                coordActions = coordInfo.getCoordActions();
            }
            else {
                coordActions = CoordRerunXCommand.getCoordActions(rerunType, jobId, scope);
            }
            json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions));
        }
        catch (BaseEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        catch (CommandException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }

        return json;
    }

    /**
     * Get workflow job
     * <p/>
     * The offset parameter defaults to 1 and is clamped to a minimum of 1; a missing or
     * non-positive length means {@link Integer#MAX_VALUE}.
     *
     * @param request servlet request
     * @param response servlet response
     * @return JsonBean WorkflowJobBean
     * @throws XServletException if offset/len are not integers or the engine fails
     */
    private JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
        String jobId = getResourceName(request);
        String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
        String lenStr = request.getParameter(RestConstants.LEN_PARAM);
        int start = parseIntParam(RestConstants.OFFSET_PARAM, startStr, 1);
        start = (start < 1) ? 1 : start;
        int len = parseIntParam(RestConstants.LEN_PARAM, lenStr, 0);
        len = (len < 1) ? Integer.MAX_VALUE : len;
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
                getAuthToken(request));
        try {
            return (JsonBean) dagEngine.getJob(jobId, start, len);
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Get wf action info
     *
     * @param request servlet request
     * @param response servlet response
     * @return JsonBean WorkflowActionBean
     * @throws XServletException
     */
    private JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response)
            throws XServletException {
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
                getAuthToken(request));

        String actionId = getResourceName(request);
        try {
            return dagEngine.getWorkflowAction(actionId);
        }
        catch (BaseEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Get coord job info
     * <p/>
     * The offset parameter defaults to 1 and is clamped to a minimum of 1; a missing or
     * non-positive length falls back to the {@link #COORD_ACTIONS_DEFAULT_LENGTH} setting
     * (1000 when that is not configured).
     *
     * @param request servlet request
     * @param response servlet response
     * @return JsonBean CoordinatorJobBean
     * @throws XServletException if offset/len are not integers or the engine fails
     * @throws BaseEngineException
     */
    private JsonBean getCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
            throws XServletException, BaseEngineException {
        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
                getUser(request), getAuthToken(request));
        String jobId = getResourceName(request);
        String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
        String lenStr = request.getParameter(RestConstants.LEN_PARAM);
        String filter = request.getParameter(RestConstants.JOB_FILTER_PARAM);
        int start = parseIntParam(RestConstants.OFFSET_PARAM, startStr, 1);
        start = (start < 1) ? 1 : start;
        // Get default number of coordinator actions to be retrieved
        int defaultLen = Services.get().getConf().getInt(COORD_ACTIONS_DEFAULT_LENGTH, 1000);
        int len = parseIntParam(RestConstants.LEN_PARAM, lenStr, 0);
        len = (len < 1) ? defaultLen : len;
        try {
            JsonCoordinatorJob coordJob = coordEngine.getCoordJob(jobId, filter, start, len);
            return coordJob;
        }
        catch (CoordinatorEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Get bundle job info
     *
     * @param request servlet request
     * @param response servlet response
     * @return JsonBean bundle job bean
     * @throws XServletException
     * @throws BaseEngineException
     */
    private JsonBean getBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
            BaseEngineException {
        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
                getAuthToken(request));
        String jobId = getResourceName(request);

        try {
            return (JsonBean) bundleEngine.getBundleJob(jobId);
        }
        catch (BundleEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Get coordinator action
     *
     * @param request servlet request
     * @param response servlet response
     * @return JsonBean CoordinatorActionBean
     * @throws XServletException
     * @throws BaseEngineException
     */
    private JsonBean getCoordinatorAction(HttpServletRequest request, HttpServletResponse response)
            throws XServletException, BaseEngineException {
        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
                getUser(request), getAuthToken(request));
        String actionId = getResourceName(request);
        try {
            return coordEngine.getCoordAction(actionId);
        }
        catch (CoordinatorEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Get wf job definition
     *
     * @param request servlet request
     * @param response servlet response
     * @return String wf definition
     * @throws XServletException
     */
    private String getWorkflowJobDefinition(HttpServletRequest request, HttpServletResponse response)
            throws XServletException {
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
                getAuthToken(request));

        String jobId = getResourceName(request);
        try {
            return dagEngine.getDefinition(jobId);
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Get bundle job definition
     *
     * @param request servlet request
     * @param response servlet response
     * @return String bundle definition
     * @throws XServletException
     */
    private String getBundleJobDefinition(HttpServletRequest request, HttpServletResponse response)
            throws XServletException {
        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
                getAuthToken(request));
        String jobId = getResourceName(request);
        try {
            return bundleEngine.getDefinition(jobId);
        }
        catch (BundleEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Get coordinator job definition
     *
     * @param request servlet request
     * @param response servlet response
     * @return String coord definition
     * @throws XServletException
     */
    private String getCoordinatorJobDefinition(HttpServletRequest request, HttpServletResponse response)
            throws XServletException {
        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
                getUser(request), getAuthToken(request));

        String jobId = getResourceName(request);
        try {
            return coordEngine.getDefinition(jobId);
        }
        catch (BaseEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Stream wf job log
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException
     * @throws IOException
     */
    private void streamWorkflowJobLog(HttpServletRequest request, HttpServletResponse response)
            throws XServletException, IOException {
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
                getAuthToken(request));
        String jobId = getResourceName(request);
        try {
            dagEngine.streamLog(jobId, response.getWriter());
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Stream bundle job log
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException
     * @throws IOException
     */
    private void streamBundleJob(HttpServletRequest request, HttpServletResponse response)
            throws XServletException, IOException {
        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
                getAuthToken(request));
        String jobId = getResourceName(request);
        try {
            bundleEngine.streamLog(jobId, response.getWriter());
        }
        catch (BundleEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Stream coordinator job log
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException
     * @throws IOException
     */
    private void streamCoordinatorJobLog(HttpServletRequest request, HttpServletResponse response)
            throws XServletException, IOException {
        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
                getUser(request), getAuthToken(request));
        String jobId = getResourceName(request);
        String logRetrievalScope = request.getParameter(RestConstants.JOB_LOG_SCOPE_PARAM);
        String logRetrievalType = request.getParameter(RestConstants.JOB_LOG_TYPE_PARAM);
        try {
            coordEngine.streamLog(jobId, logRetrievalScope, logRetrievalType, response.getWriter());
        }
        catch (BaseEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        catch (CommandException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }
}