001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.servlet; 019 020import java.io.IOException; 021import java.util.List; 022 023import javax.servlet.ServletInputStream; 024import javax.servlet.http.HttpServletRequest; 025import javax.servlet.http.HttpServletResponse; 026 027import org.apache.hadoop.conf.Configuration; 028import org.apache.oozie.*; 029import org.apache.oozie.client.WorkflowAction; 030import org.apache.oozie.client.WorkflowJob; 031import org.apache.oozie.client.rest.*; 032import org.apache.oozie.command.CommandException; 033import org.apache.oozie.coord.CoordUtils; 034import org.apache.oozie.service.BundleEngineService; 035import org.apache.oozie.service.CoordinatorEngineService; 036import org.apache.oozie.service.DagEngineService; 037import org.apache.oozie.service.Services; 038import org.apache.oozie.service.UUIDService; 039import org.apache.oozie.util.GraphGenerator; 040import org.apache.oozie.util.XLog; 041import org.json.simple.JSONArray; 042import org.json.simple.JSONObject; 043 044 045@SuppressWarnings("serial") 046public class V1JobServlet extends BaseJobServlet { 047 048 private static final String INSTRUMENTATION_NAME = "v1job"; 049 public static final String COORD_ACTIONS_DEFAULT_LENGTH = "oozie.coord.actions.default.length"; 050 051 public V1JobServlet() { 052 super(INSTRUMENTATION_NAME); 053 } 054 055 protected V1JobServlet(String instrumentation_name){ 056 super(instrumentation_name); 057 } 058 059 /* 060 * protected method to start a job 061 */ 062 @Override 063 protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 064 IOException { 065 /* 066 * Configuration conf = new XConfiguration(request.getInputStream()); 067 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 068 * conf.get(OozieClient.COORDINATOR_APP_PATH); 069 * 070 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 071 */ 072 String jobId = getResourceName(request); 073 if (jobId.endsWith("-W")) { 074 startWorkflowJob(request, response); 075 } 076 else if (jobId.endsWith("-B")) { 077 startBundleJob(request, response); 078 } 079 else { 080 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, RestConstants.ACTION_PARAM, RestConstants.JOB_ACTION_START); 081 } 082 083 } 084 085 /* 086 * protected method to resume a job 087 */ 088 @Override 089 protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 090 IOException { 091 /* 092 * Configuration conf = new XConfiguration(request.getInputStream()); 093 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 094 * conf.get(OozieClient.COORDINATOR_APP_PATH); 095 * 096 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 097 */ 098 String jobId = getResourceName(request); 099 if (jobId.endsWith("-W")) { 100 resumeWorkflowJob(request, response); 101 } 102 else if (jobId.endsWith("-B")) { 103 resumeBundleJob(request, response); 104 } 105 else { 106 resumeCoordinatorJob(request, response); 107 } 108 } 109 110 /* 111 * protected method to suspend a job 112 */ 113 @Override 114 protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 115 IOException { 116 /* 117 * Configuration conf = new XConfiguration(request.getInputStream()); 118 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 119 * conf.get(OozieClient.COORDINATOR_APP_PATH); 120 * 121 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 122 */ 123 String jobId = getResourceName(request); 124 if (jobId.endsWith("-W")) { 125 suspendWorkflowJob(request, response); 126 } 127 else if (jobId.endsWith("-B")) { 128 suspendBundleJob(request, response); 129 } 130 else { 131 suspendCoordinatorJob(request, response); 132 } 133 } 134 135 /* 136 * protected method to kill a job 137 */ 138 @Override 139 protected JSONObject killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 140 IOException { 141 /* 142 * Configuration conf = new XConfiguration(request.getInputStream()); 143 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 144 * conf.get(OozieClient.COORDINATOR_APP_PATH); 145 * 146 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 147 */ 148 String jobId = getResourceName(request); 149 JSONObject json = null; 150 if (jobId.endsWith("-W")) { 151 killWorkflowJob(request, response); 152 } 153 else if (jobId.endsWith("-B")) { 154 killBundleJob(request, response); 155 } 156 else { 157 json = killCoordinator(request, response); 158 } 159 return json; 160 } 161 162 /** 163 * protected method to change a coordinator job 164 * @param request request object 165 * @param response response object 166 * @throws XServletException 167 * @throws IOException 168 */ 169 @Override 170 protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 171 IOException { 172 String jobId = getResourceName(request); 173 if (jobId.endsWith("-B")) { 174 changeBundleJob(request, response); 175 } 176 else { 177 changeCoordinatorJob(request, response); 178 } 179 } 180 @Override 181 protected JSONObject ignoreJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { 182 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); 183 } 184 185 /* 186 * protected method to reRun a job 187 * 188 * @seeorg.apache.oozie.servlet.BaseJobServlet#reRunJob(javax.servlet.http. 189 * HttpServletRequest, javax.servlet.http.HttpServletResponse, 190 * org.apache.hadoop.conf.Configuration) 191 */ 192 @Override 193 protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 194 throws XServletException, IOException { 195 JSONObject json = null; 196 String jobId = getResourceName(request); 197 if (jobId.endsWith("-W")) { 198 reRunWorkflowJob(request, response, conf); 199 } 200 else if (jobId.endsWith("-B")) { 201 rerunBundleJob(request, response, conf); 202 } 203 else { 204 json = reRunCoordinatorActions(request, response, conf); 205 } 206 return json; 207 } 208 209 /* 210 * protected method to get a job in JsonBean representation 211 */ 212 @Override 213 protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 214 IOException, BaseEngineException { 215 ServletInputStream is = request.getInputStream(); 216 byte[] b = new byte[101]; 217 while (is.readLine(b, 0, 100) != -1) { 218 XLog.getLog(getClass()).warn("Printing :" + new String(b)); 219 } 220 221 JsonBean jobBean = null; 222 String jobId = getResourceName(request); 223 if (jobId.endsWith("-B")) { 224 jobBean = getBundleJob(request, response); 225 } 226 else { 227 if (jobId.endsWith("-W")) { 228 jobBean = getWorkflowJob(request, response); 229 } 230 else { 231 if (jobId.contains("-W@")) { 232 jobBean = getWorkflowAction(request, response); 233 } 234 else { 235 if (jobId.contains("-C@")) { 236 jobBean = getCoordinatorAction(request, response); 237 } 238 else { 239 jobBean = getCoordinatorJob(request, response); 240 } 241 } 242 } 243 } 244 245 return jobBean; 246 } 247 248 /* 249 * protected method to get a job definition in String format 250 */ 251 @Override 252 protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response) 253 throws XServletException, IOException { 254 String jobDefinition = null; 255 String jobId = getResourceName(request); 256 if (jobId.endsWith("-W")) { 257 jobDefinition = getWorkflowJobDefinition(request, response); 258 } 259 else if (jobId.endsWith("-B")) { 260 jobDefinition = getBundleJobDefinition(request, response); 261 } 262 else { 263 jobDefinition = getCoordinatorJobDefinition(request, response); 264 } 265 return jobDefinition; 266 } 267 268 /* 269 * protected method to stream a job log into response object 270 */ 271 @Override 272 protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 273 IOException { 274 try { 275 String jobId = getResourceName(request); 276 if (jobId.endsWith("-W")) { 277 streamWorkflowJobLog(request, response); 278 } 279 else if (jobId.endsWith("-B")) { 280 streamBundleJobLog(request, response); 281 } 282 else { 283 streamCoordinatorJobLog(request, response); 284 } 285 } 286 catch (Exception e) { 287 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0307, e.getMessage()); 288 } 289 } 290 291 @Override 292 protected void streamJobGraph(HttpServletRequest request, HttpServletResponse response) 293 throws XServletException, IOException { 294 String jobId = getResourceName(request); 295 if (jobId.endsWith("-W")) { 296 try { 297 // Applicable only to worflow, for now 298 response.setContentType(RestConstants.PNG_IMAGE_CONTENT_TYPE); 299 300 String showKill = request.getParameter(RestConstants.JOB_SHOW_KILL_PARAM); 301 boolean sK = showKill != null && (showKill.equalsIgnoreCase("yes") || showKill.equals("1") || showKill.equalsIgnoreCase("true")); 302 303 new GraphGenerator( 304 getWorkflowJobDefinition(request, response), 305 (WorkflowJobBean)getWorkflowJob(request, response), 306 sK).write(response.getOutputStream()); 307 308 } 309 catch (Exception e) { 310 throw new XServletException(HttpServletResponse.SC_NOT_FOUND, ErrorCode.E0307, e.getMessage(), e); 311 } 312 } 313 else { 314 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0306); 315 } 316 } 317 318 /** 319 * Start wf job 320 * 321 * @param request servlet request 322 * @param response servlet response 323 * @throws XServletException 324 */ 325 private void startWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 326 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 327 328 String jobId = getResourceName(request); 329 try { 330 dagEngine.start(jobId); 331 } 332 catch (DagEngineException ex) { 333 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 334 } 335 } 336 337 /** 338 * Start bundle job 339 * 340 * @param request servlet request 341 * @param response servlet response 342 * @throws XServletException 343 */ 344 private void startBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 345 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 346 String jobId = getResourceName(request); 347 try { 348 bundleEngine.start(jobId); 349 } 350 catch (BundleEngineException ex) { 351 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 352 } 353 } 354 355 /** 356 * Resume workflow job 357 * 358 * @param request servlet request 359 * @param response servlet response 360 * @throws XServletException 361 */ 362 private void resumeWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 363 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 364 365 String jobId = getResourceName(request); 366 try { 367 dagEngine.resume(jobId); 368 } 369 catch (DagEngineException ex) { 370 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 371 } 372 } 373 374 /** 375 * Resume bundle job 376 * 377 * @param request servlet request 378 * @param response servlet response 379 * @throws XServletException 380 */ 381 private void resumeBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 382 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 383 String jobId = getResourceName(request); 384 try { 385 bundleEngine.resume(jobId); 386 } 387 catch (BundleEngineException ex) { 388 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 389 } 390 } 391 392 /** 393 * Resume coordinator job 394 * 395 * @param request servlet request 396 * @param response servlet response 397 * @throws XServletException 398 * @throws CoordinatorEngineException 399 */ 400 private void resumeCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 401 throws XServletException { 402 String jobId = getResourceName(request); 403 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 404 getUser(request)); 405 try { 406 coordEngine.resume(jobId); 407 } 408 catch (CoordinatorEngineException ex) { 409 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 410 } 411 } 412 413 /** 414 * Suspend a wf job 415 * 416 * @param request servlet request 417 * @param response servlet response 418 * @throws XServletException 419 */ 420 private void suspendWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 421 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 422 423 String jobId = getResourceName(request); 424 try { 425 dagEngine.suspend(jobId); 426 } 427 catch (DagEngineException ex) { 428 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 429 } 430 } 431 432 /** 433 * Suspend bundle job 434 * 435 * @param request servlet request 436 * @param response servlet response 437 * @throws XServletException 438 */ 439 private void suspendBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 440 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 441 String jobId = getResourceName(request); 442 try { 443 bundleEngine.suspend(jobId); 444 } 445 catch (BundleEngineException ex) { 446 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 447 } 448 } 449 450 /** 451 * Suspend coordinator job 452 * 453 * @param request servlet request 454 * @param response servlet response 455 * @throws XServletException 456 */ 457 private void suspendCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 458 throws XServletException { 459 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 460 getUser(request)); 461 String jobId = getResourceName(request); 462 try { 463 coordEngine.suspend(jobId); 464 } 465 catch (CoordinatorEngineException ex) { 466 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 467 } 468 } 469 470 /** 471 * Kill a wf job 472 * @param request servlet request 473 * @param response servlet response 474 * @throws XServletException 475 */ 476 private void killWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 477 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 478 479 String jobId = getResourceName(request); 480 try { 481 dagEngine.kill(jobId); 482 } 483 catch (DagEngineException ex) { 484 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 485 } 486 } 487 488 /** 489 * Kill a coord job 490 * 491 * @param request servlet request 492 * @param response servlet response 493 * @throws XServletException 494 */ 495 @SuppressWarnings("unchecked") 496 private JSONObject killCoordinator(HttpServletRequest request, HttpServletResponse response) throws XServletException { 497 String jobId = getResourceName(request); 498 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class) 499 .getCoordinatorEngine(getUser(request)); 500 JSONObject json = null; 501 String rangeType = request.getParameter(RestConstants.JOB_COORD_RANGE_TYPE_PARAM); 502 String scope = request.getParameter(RestConstants.JOB_COORD_SCOPE_PARAM); 503 504 try { 505 if (rangeType != null && scope != null) { 506 XLog.getLog(getClass()).info( 507 "Kill coordinator actions for jobId=" + jobId + ", rangeType=" + rangeType + ",scope=" + scope); 508 509 json = new JSONObject(); 510 CoordinatorActionInfo coordInfo = coordEngine.killActions(jobId, rangeType, scope); 511 List<CoordinatorActionBean> coordActions; 512 if (coordInfo != null) { 513 coordActions = coordInfo.getCoordActions(); 514 } 515 else { 516 coordActions = CoordUtils.getCoordActions(rangeType, jobId, scope, true); 517 } 518 json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT")); 519 } 520 else { 521 coordEngine.kill(jobId); 522 } 523 } 524 catch (CoordinatorEngineException ex) { 525 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 526 } 527 catch (CommandException ex) { 528 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 529 } 530 return json; 531 } 532 533 /** 534 * Kill bundle job 535 * 536 * @param request servlet request 537 * @param response servlet response 538 * @throws XServletException 539 */ 540 private void killBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 541 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 542 String jobId = getResourceName(request); 543 try { 544 bundleEngine.kill(jobId); 545 } 546 catch (BundleEngineException ex) { 547 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 548 } 549 } 550 551 /** 552 * Change a coordinator job 553 * 554 * @param request servlet request 555 * @param response servlet response 556 * @throws XServletException 557 */ 558 private void changeCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 559 throws XServletException { 560 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 561 getUser(request)); 562 String jobId = getResourceName(request); 563 String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE); 564 try { 565 coordEngine.change(jobId, changeValue); 566 } 567 catch (CoordinatorEngineException ex) { 568 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 569 } 570 } 571 572 /** 573 * Change a bundle job 574 * 575 * @param request servlet request 576 * @param response servlet response 577 * @throws XServletException 578 */ 579 private void changeBundleJob(HttpServletRequest request, HttpServletResponse response) 580 throws XServletException { 581 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 582 String jobId = getResourceName(request); 583 String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE); 584 try { 585 bundleEngine.change(jobId, changeValue); 586 } 587 catch (BundleEngineException ex) { 588 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 589 } 590 } 591 592 /** 593 * Rerun a wf job 594 * 595 * @param request servlet request 596 * @param response servlet response 597 * @param conf configuration object 598 * @throws XServletException 599 */ 600 private void reRunWorkflowJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 601 throws XServletException { 602 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 603 604 String jobId = getResourceName(request); 605 try { 606 dagEngine.reRun(jobId, conf); 607 } 608 catch (DagEngineException ex) { 609 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 610 } 611 } 612 613 /** 614 * Rerun bundle job 615 * 616 * @param request servlet request 617 * @param response servlet response 618 * @param conf configration object 619 * @throws XServletException 620 */ 621 private void rerunBundleJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 622 throws XServletException { 623 JSONObject json = new JSONObject(); 624 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 625 String jobId = getResourceName(request); 626 627 String coordScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM); 628 String dateScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM); 629 String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM); 630 String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM); 631 632 XLog.getLog(getClass()).info( 633 "Rerun Bundle for jobId=" + jobId + ", coordScope=" + coordScope + ", dateScope=" + dateScope + ", refresh=" 634 + refresh + ", noCleanup=" + noCleanup); 635 636 try { 637 bundleEngine.reRun(jobId, coordScope, dateScope, Boolean.valueOf(refresh), Boolean.valueOf(noCleanup)); 638 } 639 catch (BaseEngineException ex) { 640 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 641 } 642 } 643 644 /** 645 * Rerun coordinator actions 646 * 647 * @param request servlet request 648 * @param response servlet response 649 * @param conf configuration object 650 * @throws XServletException 651 */ 652 @SuppressWarnings("unchecked") 653 private JSONObject reRunCoordinatorActions(HttpServletRequest request, HttpServletResponse response, 654 Configuration conf) throws XServletException { 655 JSONObject json = new JSONObject(); 656 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request)); 657 658 String jobId = getResourceName(request); 659 660 String rerunType = request.getParameter(RestConstants.JOB_COORD_RANGE_TYPE_PARAM); 661 String scope = request.getParameter(RestConstants.JOB_COORD_SCOPE_PARAM); 662 String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM); 663 String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM); 664 665 XLog.getLog(getClass()).info( 666 "Rerun coordinator for jobId=" + jobId + ", rerunType=" + rerunType + ",scope=" + scope + ",refresh=" 667 + refresh + ", noCleanup=" + noCleanup); 668 669 try { 670 if (!(rerunType.equals(RestConstants.JOB_COORD_SCOPE_DATE) || rerunType 671 .equals(RestConstants.JOB_COORD_SCOPE_ACTION))) { 672 throw new CommandException(ErrorCode.E1018, "date or action expected."); 673 } 674 CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh), 675 Boolean.valueOf(noCleanup)); 676 List<CoordinatorActionBean> coordActions; 677 if (coordInfo != null) { 678 coordActions = coordInfo.getCoordActions(); 679 } 680 else { 681 coordActions = CoordUtils.getCoordActions(rerunType, jobId, scope, false); 682 } 683 json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT")); 684 } 685 catch (BaseEngineException ex) { 686 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 687 } 688 catch (CommandException ex) { 689 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 690 } 691 692 return json; 693 } 694 695 696 697 /** 698 * Get workflow job 699 * 700 * @param request servlet request 701 * @param response servlet response 702 * @return JsonBean WorkflowJobBean 703 * @throws XServletException 704 */ 705 protected JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 706 JsonBean jobBean = getWorkflowJobBean(request, response); 707 // for backward compatibility (OOZIE-1231) 708 swapMRActionID((WorkflowJob)jobBean); 709 return jobBean; 710 } 711 712 /** 713 * Get workflow job 714 * 715 * @param request servlet request 716 * @param response servlet response 717 * @return JsonBean WorkflowJobBean 718 * @throws XServletException 719 */ 720 protected JsonBean getWorkflowJobBean(HttpServletRequest request, HttpServletResponse response) throws XServletException { 721 JsonBean jobBean = null; 722 String jobId = getResourceName(request); 723 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 724 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 725 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 726 start = (start < 1) ? 1 : start; 727 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0; 728 len = (len < 1) ? Integer.MAX_VALUE : len; 729 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 730 try { 731 jobBean = (JsonBean) dagEngine.getJob(jobId, start, len); 732 } 733 catch (DagEngineException ex) { 734 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 735 } 736 return jobBean; 737 } 738 739 private void swapMRActionID(WorkflowJob wjBean) { 740 List<WorkflowAction> actions = wjBean.getActions(); 741 if (actions != null) { 742 for (WorkflowAction wa : actions) { 743 swapMRActionID(wa); 744 } 745 } 746 } 747 748 private void swapMRActionID(WorkflowAction waBean) { 749 if (waBean.getType().equals("map-reduce")) { 750 String childId = waBean.getExternalChildIDs(); 751 if (childId != null && !childId.equals("")) { 752 String consoleBase = getConsoleBase(waBean.getConsoleUrl()); 753 ((WorkflowActionBean) waBean).setConsoleUrl(consoleBase + childId); 754 ((WorkflowActionBean) waBean).setExternalId(childId); 755 ((WorkflowActionBean) waBean).setExternalChildIDs(""); 756 } 757 } 758 } 759 760 private String getConsoleBase(String url) { 761 String consoleBase = null; 762 if (url.indexOf("application") != -1) { 763 consoleBase = url.split("application_[0-9]+_[0-9]+")[0]; 764 } 765 else { 766 consoleBase = url.split("job_[0-9]+_[0-9]+")[0]; 767 } 768 return consoleBase; 769 } 770 771 /** 772 * Get wf action info 773 * 774 * @param request servlet request 775 * @param response servlet response 776 * @return JsonBean WorkflowActionBean 777 * @throws XServletException 778 */ 779 protected JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response) 780 throws XServletException { 781 782 JsonBean actionBean = getWorkflowActionBean(request, response); 783 // for backward compatibility (OOZIE-1231) 784 swapMRActionID((WorkflowAction)actionBean); 785 return actionBean; 786 } 787 788 protected JsonBean getWorkflowActionBean(HttpServletRequest request, HttpServletResponse response) 789 throws XServletException { 790 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 791 792 JsonBean actionBean = null; 793 String actionId = getResourceName(request); 794 try { 795 actionBean = dagEngine.getWorkflowAction(actionId); 796 } 797 catch (BaseEngineException ex) { 798 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 799 } 800 return actionBean; 801 } 802 803 /** 804 * Get coord job info 805 * 806 * @param request servlet request 807 * @param response servlet response 808 * @return JsonBean CoordinatorJobBean 809 * @throws XServletException 810 * @throws BaseEngineException 811 */ 812 protected JsonBean getCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 813 throws XServletException, BaseEngineException { 814 JsonBean jobBean = null; 815 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 816 getUser(request)); 817 String jobId = getResourceName(request); 818 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 819 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 820 String filter = request.getParameter(RestConstants.JOB_FILTER_PARAM); 821 String orderStr = request.getParameter(RestConstants.ORDER_PARAM); 822 boolean order = (orderStr != null && orderStr.equals("desc")) ? true : false; 823 int offset = (startStr != null) ? Integer.parseInt(startStr) : 1; 824 offset = (offset < 1) ? 1 : offset; 825 // Get default number of coordinator actions to be retrieved 826 int defaultLen = Services.get().getConf().getInt(COORD_ACTIONS_DEFAULT_LENGTH, 1000); 827 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0; 828 len = getCoordinatorJobLength(defaultLen, len); 829 try { 830 CoordinatorJobBean coordJob = coordEngine.getCoordJob(jobId, filter, offset, len, order); 831 jobBean = coordJob; 832 } 833 catch (CoordinatorEngineException ex) { 834 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 835 } 836 837 return jobBean; 838 } 839 840 /** 841 * Given the requested length and the default length, determine how many coordinator jobs to return. 842 * Used by {@link #getCoordinatorJob(javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)} 843 * 844 * @param defaultLen The default length 845 * @param len The requested length 846 * @return The length to use 847 */ 848 protected int getCoordinatorJobLength(int defaultLen, int len) { 849 return (len < 1) ? defaultLen : len; 850 } 851 852 /** 853 * Get bundle job info 854 * 855 * @param request servlet request 856 * @param response servlet response 857 * @return JsonBean bundle job bean 858 * @throws XServletException 859 * @throws BaseEngineException 860 */ 861 private JsonBean getBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 862 BaseEngineException { 863 JsonBean jobBean = null; 864 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 865 String jobId = getResourceName(request); 866 867 try { 868 jobBean = (JsonBean) bundleEngine.getBundleJob(jobId); 869 870 return jobBean; 871 } 872 catch (BundleEngineException ex) { 873 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 874 } 875 } 876 877 /** 878 * Get coordinator action 879 * 880 * @param request servlet request 881 * @param response servlet response 882 * @return JsonBean CoordinatorActionBean 883 * @throws XServletException 884 * @throws BaseEngineException 885 */ 886 private JsonBean getCoordinatorAction(HttpServletRequest request, HttpServletResponse response) 887 throws XServletException, BaseEngineException { 888 JsonBean actionBean = null; 889 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 890 getUser(request)); 891 String actionId = getResourceName(request); 892 try { 893 actionBean = coordEngine.getCoordAction(actionId); 894 } 895 catch (CoordinatorEngineException ex) { 896 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 897 } 898 899 return actionBean; 900 } 901 902 /** 903 * Get wf job definition 904 * 905 * @param request servlet request 906 * @param response servlet response 907 * @return String wf definition 908 * @throws XServletException 909 */ 910 private String getWorkflowJobDefinition(HttpServletRequest request, HttpServletResponse response) 911 throws XServletException { 912 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 913 914 String wfDefinition; 915 String jobId = getResourceName(request); 916 try { 917 wfDefinition = dagEngine.getDefinition(jobId); 918 } 919 catch (DagEngineException ex) { 920 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 921 } 922 return wfDefinition; 923 } 924 925 /** 926 * Get bundle job definition 927 * 928 * @param request servlet request 929 * @param response servlet response 930 * @return String bundle definition 931 * @throws XServletException 932 */ 933 private String getBundleJobDefinition(HttpServletRequest request, HttpServletResponse response) throws XServletException { 934 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 935 String bundleDefinition; 936 String jobId = getResourceName(request); 937 try { 938 bundleDefinition = bundleEngine.getDefinition(jobId); 939 } 940 catch (BundleEngineException ex) { 941 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 942 } 943 return bundleDefinition; 944 } 945 946 /** 947 * Get coordinator job definition 948 * 949 * @param request servlet request 950 * @param response servlet response 951 * @return String coord definition 952 * @throws XServletException 953 */ 954 private String getCoordinatorJobDefinition(HttpServletRequest request, HttpServletResponse response) 955 throws XServletException { 956 957 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 958 getUser(request)); 959 960 String jobId = getResourceName(request); 961 962 String coordDefinition = null; 963 try { 964 coordDefinition = coordEngine.getDefinition(jobId); 965 } 966 catch (BaseEngineException ex) { 967 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 968 } 969 return coordDefinition; 970 } 971 972 /** 973 * Stream wf job log 974 * 975 * @param request servlet request 976 * @param response servlet response 977 * @throws XServletException 978 * @throws IOException 979 */ 980 private void streamWorkflowJobLog(HttpServletRequest request, HttpServletResponse response) 981 throws XServletException, IOException { 982 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 983 String jobId = getResourceName(request); 984 try { 985 dagEngine.streamLog(jobId, response.getWriter(), request.getParameterMap()); 986 } 987 catch (DagEngineException ex) { 988 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 989 } 990 } 991 992 /** 993 * Stream bundle job log 994 * 995 * @param request servlet request 996 * @param response servlet response 997 * @throws XServletException 998 */ 999 private void streamBundleJobLog(HttpServletRequest request, HttpServletResponse response) 1000 throws XServletException, IOException { 1001 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 1002 String jobId = getResourceName(request); 1003 try { 1004 bundleEngine.streamLog(jobId, response.getWriter(), request.getParameterMap()); 1005 } 1006 catch (BundleEngineException ex) { 1007 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 1008 } 1009 } 1010 1011 /** 1012 * Stream coordinator job log 1013 * 1014 * @param request servlet request 1015 * @param response servlet response 1016 * @throws XServletException 1017 * @throws IOException 1018 */ 1019 private void streamCoordinatorJobLog(HttpServletRequest request, HttpServletResponse response) 1020 throws XServletException, IOException { 1021 1022 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 1023 getUser(request)); 1024 String jobId = getResourceName(request); 1025 String logRetrievalScope = request.getParameter(RestConstants.JOB_LOG_SCOPE_PARAM); 1026 String logRetrievalType = request.getParameter(RestConstants.JOB_LOG_TYPE_PARAM); 1027 try { 1028 coordEngine.streamLog(jobId, logRetrievalScope, logRetrievalType, response.getWriter(), request.getParameterMap()); 1029 } 1030 catch (BaseEngineException ex) { 1031 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 1032 } 1033 catch (CommandException ex) { 1034 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 1035 } 1036 } 1037 1038 @Override 1039 protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException, 1040 IOException { 1041 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); 1042 } 1043 1044 @Override 1045 protected JSONObject getJobsByParentId(HttpServletRequest request, HttpServletResponse response) 1046 throws XServletException, IOException { 1047 JSONObject json = new JSONObject(); 1048 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class) 1049 .getCoordinatorEngine(getUser(request)); 1050 String coordActionId; 1051 String type = request.getParameter(RestConstants.JOB_COORD_RANGE_TYPE_PARAM); 1052 String scope = request.getParameter(RestConstants.JOB_COORD_SCOPE_PARAM); 1053 // for getting allruns for coordinator action - 2 alternate endpoints 1054 if (type != null && type.equals(RestConstants.JOB_COORD_SCOPE_ACTION) && scope != null) { 1055 // endpoint - oozie/v2/coord-job-id?type=action&scope=action-num&show=allruns 1056 String jobId = getResourceName(request); 1057 coordActionId = Services.get().get(UUIDService.class).generateChildId(jobId, scope); 1058 } 1059 else { 1060 // endpoint - oozie/v2/coord-action-id?show=allruns 1061 coordActionId = getResourceName(request); 1062 } 1063 try { 1064 List<WorkflowJobBean> wfs = coordEngine.getReruns(coordActionId); 1065 JSONArray array = new JSONArray(); 1066 if (wfs != null) { 1067 for (WorkflowJobBean wf : wfs) { 1068 JSONObject json1 = new JSONObject(); 1069 json1.put(JsonTags.WORKFLOW_ID, wf.getId()); 1070 json1.put(JsonTags.WORKFLOW_STATUS, wf.getStatus().toString()); 1071 json1.put(JsonTags.WORKFLOW_START_TIME, JsonUtils.formatDateRfc822(wf.getStartTime(), "GMT")); 1072 json1.put(JsonTags.WORKFLOW_ACTION_END_TIME, JsonUtils.formatDateRfc822(wf.getEndTime(), "GMT")); 1073 array.add(json1); 1074 } 1075 } 1076 json.put(JsonTags.WORKFLOWS_JOBS, array); 1077 return json; 1078 } 1079 catch (CoordinatorEngineException ex) { 1080 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 1081 } 1082 } 1083 /** 1084 * not supported for v1 1085 */ 1086 @Override 1087 protected JSONObject updateJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 1088 throws XServletException, IOException { 1089 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); 1090 } 1091}