001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.servlet; 020 021import java.io.IOException; 022import java.util.List; 023import java.util.Locale; 024 025import javax.servlet.ServletInputStream; 026import javax.servlet.http.HttpServletRequest; 027import javax.servlet.http.HttpServletResponse; 028 029import com.google.common.base.Strings; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.oozie.*; 032import org.apache.oozie.client.WorkflowAction; 033import org.apache.oozie.client.WorkflowJob; 034import org.apache.oozie.client.rest.*; 035import org.apache.oozie.command.CommandException; 036import org.apache.oozie.coord.CoordUtils; 037import org.apache.oozie.service.BundleEngineService; 038import org.apache.oozie.service.ConfigurationService; 039import org.apache.oozie.service.CoordinatorEngineService; 040import org.apache.oozie.service.DagEngineService; 041import org.apache.oozie.service.Services; 042import org.apache.oozie.service.UUIDService; 043import org.apache.oozie.util.Instrumentation; 044import org.apache.oozie.util.graph.GraphGenerator; 045import org.apache.oozie.util.XLog; 046import org.apache.oozie.util.graph.GraphRenderer; 047import org.apache.oozie.util.graph.GraphvizRenderer; 048import org.apache.oozie.util.graph.OutputFormat; 049import org.json.simple.JSONArray; 050import org.json.simple.JSONObject; 051 052 053@SuppressWarnings("serial") 054public class V1JobServlet extends BaseJobServlet { 055 056 private static final String INSTRUMENTATION_NAME = "v1job"; 057 public static final String COORD_ACTIONS_DEFAULT_LENGTH = "oozie.coord.actions.default.length"; 058 059 final static String NOT_SUPPORTED_MESSAGE = "Not supported in v1"; 060 061 062 public V1JobServlet() { 063 super(INSTRUMENTATION_NAME); 064 } 065 066 protected V1JobServlet(String instrumentation_name){ 067 super(instrumentation_name); 068 } 069 070 /* 071 * protected method to start a job 072 */ 073 @Override 074 protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 075 IOException { 076 /* 077 * Configuration conf = new XConfiguration(request.getInputStream()); 078 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 079 * conf.get(OozieClient.COORDINATOR_APP_PATH); 080 * 081 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 082 */ 083 String jobId = getResourceName(request); 084 if (jobId.endsWith("-W")) { 085 startWorkflowJob(request, response); 086 } 087 else if (jobId.endsWith("-B")) { 088 startBundleJob(request, response); 089 } 090 else { 091 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, RestConstants.ACTION_PARAM, RestConstants.JOB_ACTION_START); 092 } 093 094 } 095 096 /* 097 * protected method to resume a job 098 */ 099 @Override 100 protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 101 IOException { 102 /* 103 * Configuration conf = new XConfiguration(request.getInputStream()); 104 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 105 * conf.get(OozieClient.COORDINATOR_APP_PATH); 106 * 107 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 108 */ 109 String jobId = getResourceName(request); 110 if (jobId.endsWith("-W")) { 111 resumeWorkflowJob(request, response); 112 } 113 else if (jobId.endsWith("-B")) { 114 resumeBundleJob(request, response); 115 } 116 else { 117 resumeCoordinatorJob(request, response); 118 } 119 } 120 121 /* 122 * protected method to suspend a job 123 */ 124 @Override 125 protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 126 IOException { 127 /* 128 * Configuration conf = new XConfiguration(request.getInputStream()); 129 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 130 * conf.get(OozieClient.COORDINATOR_APP_PATH); 131 * 132 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 133 */ 134 String jobId = getResourceName(request); 135 if (jobId.endsWith("-W")) { 136 suspendWorkflowJob(request, response); 137 } 138 else if (jobId.endsWith("-B")) { 139 suspendBundleJob(request, response); 140 } 141 else { 142 suspendCoordinatorJob(request, response); 143 } 144 } 145 146 /* 147 * protected method to kill a job 148 */ 149 @Override 150 protected JSONObject killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 151 IOException { 152 /* 153 * Configuration conf = new XConfiguration(request.getInputStream()); 154 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath = 155 * conf.get(OozieClient.COORDINATOR_APP_PATH); 156 * 157 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 158 */ 159 String jobId = getResourceName(request); 160 JSONObject json = null; 161 if (jobId.endsWith("-W")) { 162 killWorkflowJob(request, response); 163 } 164 else if (jobId.endsWith("-B")) { 165 killBundleJob(request, response); 166 } 167 else { 168 json = killCoordinator(request, response); 169 } 170 return json; 171 } 172 173 /** 174 * protected method to change a coordinator job 175 * @param request request object 176 * @param response response object 177 * @throws XServletException 178 * @throws IOException 179 */ 180 @Override 181 protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 182 IOException { 183 String jobId = getResourceName(request); 184 if (jobId.endsWith("-B")) { 185 changeBundleJob(request, response); 186 } 187 else { 188 changeCoordinatorJob(request, response); 189 } 190 } 191 @Override 192 protected JSONObject ignoreJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { 193 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); 194 } 195 196 /* 197 * protected method to reRun a job 198 * 199 * @seeorg.apache.oozie.servlet.BaseJobServlet#reRunJob(javax.servlet.http. 200 * HttpServletRequest, javax.servlet.http.HttpServletResponse, 201 * org.apache.hadoop.conf.Configuration) 202 */ 203 @Override 204 protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 205 throws XServletException, IOException { 206 JSONObject json = null; 207 String jobId = getResourceName(request); 208 if (jobId.endsWith("-W")) { 209 reRunWorkflowJob(request, response, conf); 210 } 211 else if (jobId.endsWith("-B")) { 212 rerunBundleJob(request, response, conf); 213 } 214 else { 215 json = reRunCoordinatorActions(request, response, conf); 216 } 217 return json; 218 } 219 220 /* 221 * protected method to get a job in JsonBean representation 222 */ 223 @Override 224 protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 225 IOException, BaseEngineException { 226 ServletInputStream is = request.getInputStream(); 227 byte[] b = new byte[101]; 228 while (is.readLine(b, 0, 100) != -1) { 229 XLog.getLog(getClass()).warn("Printing :" + new String(b)); 230 } 231 232 JsonBean jobBean = null; 233 String jobId = getResourceName(request); 234 if (jobId.endsWith("-B")) { 235 jobBean = getBundleJob(request, response); 236 } 237 else { 238 if (jobId.endsWith("-W")) { 239 jobBean = getWorkflowJob(request, response); 240 } 241 else { 242 if (jobId.contains("-W@")) { 243 jobBean = getWorkflowAction(request, response); 244 } 245 else { 246 if (jobId.contains("-C@")) { 247 jobBean = getCoordinatorAction(request, response); 248 } 249 else { 250 jobBean = getCoordinatorJob(request, response); 251 } 252 } 253 } 254 } 255 256 return jobBean; 257 } 258 259 /* 260 * protected method to get a job definition in String format 261 */ 262 @Override 263 protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response) 264 throws XServletException, IOException { 265 String jobDefinition = null; 266 String jobId = getResourceName(request); 267 if (jobId.endsWith("-W")) { 268 jobDefinition = getWorkflowJobDefinition(request, response); 269 } 270 else if (jobId.endsWith("-B")) { 271 jobDefinition = getBundleJobDefinition(request, response); 272 } 273 else { 274 jobDefinition = getCoordinatorJobDefinition(request, response); 275 } 276 return jobDefinition; 277 } 278 279 /* 280 * protected method to stream a job log into response object 281 */ 282 @Override 283 protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 284 IOException { 285 try { 286 String jobId = getResourceName(request); 287 if (jobId.endsWith("-W")) { 288 streamWorkflowJobLog(request, response); 289 } 290 else if (jobId.endsWith("-B")) { 291 streamBundleJobLog(request, response); 292 } 293 else { 294 streamCoordinatorJobLog(request, response); 295 } 296 } 297 catch (Exception e) { 298 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0307, e.getMessage()); 299 } 300 } 301 302 @Override 303 protected void streamJobGraph(HttpServletRequest request, HttpServletResponse response) 304 throws XServletException, IOException { 305 String jobId = getResourceName(request); 306 if (jobId.endsWith("-W")) { 307 try { 308 309 final String showKillParameter = request.getParameter(RestConstants.JOB_SHOW_KILL_PARAM); 310 final boolean showKill = isShowKillSet(showKillParameter); 311 312 final String formatParameter = request.getParameter(RestConstants.JOB_FORMAT_PARAM); 313 final OutputFormat outputFormat = getOutputFormat(formatParameter); 314 315 final String contentType = getContentType(outputFormat); 316 317 response.setContentType(contentType); 318 319 final Instrumentation.Cron cron = new Instrumentation.Cron(); 320 cron.start(); 321 322 final GraphRenderer graphRenderer = new GraphvizRenderer(); 323 324 new GraphGenerator( 325 getWorkflowJobDefinition(request, response), 326 (WorkflowJobBean)getWorkflowJob(request, response), 327 showKill, 328 graphRenderer).write(response.getOutputStream(), outputFormat); 329 330 cron.stop(); 331 instrument(outputFormat, cron); 332 } 333 catch (final Exception e) { 334 throw new XServletException(HttpServletResponse.SC_NOT_FOUND, ErrorCode.E0307, e.getMessage(), e); 335 } 336 } 337 else { 338 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0306); 339 } 340 } 341 342 private boolean isShowKillSet(final String showKillParameter) { 343 return showKillParameter != null && 344 (showKillParameter.equalsIgnoreCase("yes") || 345 showKillParameter.equals("1") || 346 showKillParameter.equalsIgnoreCase("true")); 347 } 348 349 private OutputFormat getOutputFormat(final String formatParameter) { 350 final OutputFormat outputFormat; 351 if (Strings.isNullOrEmpty(formatParameter)) { 352 outputFormat = OutputFormat.PNG; 353 } 354 else { 355 outputFormat = OutputFormat.valueOf(formatParameter.toUpperCase(Locale.getDefault())); 356 } 357 return outputFormat; 358 } 359 360 private String getContentType(final OutputFormat outputFormat) { 361 final String contentType; 362 363 switch (outputFormat) { 364 case PNG: 365 contentType = RestConstants.PNG_IMAGE_CONTENT_TYPE; 366 break; 367 case DOT: 368 contentType = RestConstants.TEXT_CONTENT_TYPE; 369 break; 370 case SVG: 371 contentType = RestConstants.SVG_IMAGE_CONTENT_TYPE; 372 break; 373 default: 374 throw new IllegalArgumentException("Unknown output format, cannot get content type: " + outputFormat); 375 } 376 377 return contentType; 378 } 379 380 private void instrument(final OutputFormat outputFormat, final Instrumentation.Cron cron) { 381 addCron(INSTRUMENTATION_NAME + "-graph", cron); 382 incrCounter(INSTRUMENTATION_NAME + "-graph", 1); 383 addCron(INSTRUMENTATION_NAME + "-graph-" + outputFormat.toString().toLowerCase(Locale.getDefault()), cron); 384 incrCounter(INSTRUMENTATION_NAME + "-graph-" + outputFormat.toString().toLowerCase(Locale.getDefault()), 1); 385 } 386 387 /** 388 * Start wf job 389 * 390 * @param request servlet request 391 * @param response servlet response 392 * @throws XServletException 393 */ 394 private void startWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 395 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 396 397 String jobId = getResourceName(request); 398 try { 399 dagEngine.start(jobId); 400 } 401 catch (DagEngineException ex) { 402 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 403 } 404 } 405 406 /** 407 * Start bundle job 408 * 409 * @param request servlet request 410 * @param response servlet response 411 * @throws XServletException 412 */ 413 private void startBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 414 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 415 String jobId = getResourceName(request); 416 try { 417 bundleEngine.start(jobId); 418 } 419 catch (BundleEngineException ex) { 420 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 421 } 422 } 423 424 /** 425 * Resume workflow job 426 * 427 * @param request servlet request 428 * @param response servlet response 429 * @throws XServletException 430 */ 431 private void resumeWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 432 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 433 434 String jobId = getResourceName(request); 435 try { 436 dagEngine.resume(jobId); 437 } 438 catch (DagEngineException ex) { 439 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 440 } 441 } 442 443 /** 444 * Resume bundle job 445 * 446 * @param request servlet request 447 * @param response servlet response 448 * @throws XServletException 449 */ 450 private void resumeBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 451 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 452 String jobId = getResourceName(request); 453 try { 454 bundleEngine.resume(jobId); 455 } 456 catch (BundleEngineException ex) { 457 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 458 } 459 } 460 461 /** 462 * Resume coordinator job 463 * 464 * @param request servlet request 465 * @param response servlet response 466 * @throws XServletException 467 * @throws CoordinatorEngineException 468 */ 469 private void resumeCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 470 throws XServletException { 471 String jobId = getResourceName(request); 472 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 473 getUser(request)); 474 try { 475 coordEngine.resume(jobId); 476 } 477 catch (CoordinatorEngineException ex) { 478 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 479 } 480 } 481 482 /** 483 * Suspend a wf job 484 * 485 * @param request servlet request 486 * @param response servlet response 487 * @throws XServletException 488 */ 489 private void suspendWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 490 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 491 492 String jobId = getResourceName(request); 493 try { 494 dagEngine.suspend(jobId); 495 } 496 catch (DagEngineException ex) { 497 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 498 } 499 } 500 501 /** 502 * Suspend bundle job 503 * 504 * @param request servlet request 505 * @param response servlet response 506 * @throws XServletException 507 */ 508 private void suspendBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 509 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 510 String jobId = getResourceName(request); 511 try { 512 bundleEngine.suspend(jobId); 513 } 514 catch (BundleEngineException ex) { 515 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 516 } 517 } 518 519 /** 520 * Suspend coordinator job 521 * 522 * @param request servlet request 523 * @param response servlet response 524 * @throws XServletException 525 */ 526 private void suspendCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 527 throws XServletException { 528 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 529 getUser(request)); 530 String jobId = getResourceName(request); 531 try { 532 coordEngine.suspend(jobId); 533 } 534 catch (CoordinatorEngineException ex) { 535 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 536 } 537 } 538 539 /** 540 * Kill a wf job 541 * @param request servlet request 542 * @param response servlet response 543 * @throws XServletException 544 */ 545 private void killWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 546 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 547 548 String jobId = getResourceName(request); 549 try { 550 dagEngine.kill(jobId); 551 } 552 catch (DagEngineException ex) { 553 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 554 } 555 } 556 557 /** 558 * Kill a coord job 559 * 560 * @param request servlet request 561 * @param response servlet response 562 * @throws XServletException 563 */ 564 @SuppressWarnings("unchecked") 565 private JSONObject killCoordinator(HttpServletRequest request, HttpServletResponse response) throws XServletException { 566 String jobId = getResourceName(request); 567 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class) 568 .getCoordinatorEngine(getUser(request)); 569 JSONObject json = null; 570 String rangeType = request.getParameter(RestConstants.JOB_COORD_RANGE_TYPE_PARAM); 571 String scope = request.getParameter(RestConstants.JOB_COORD_SCOPE_PARAM); 572 573 try { 574 if (rangeType != null && scope != null) { 575 XLog.getLog(getClass()).info( 576 "Kill coordinator actions for jobId=" + jobId + ", rangeType=" + rangeType + ",scope=" + scope); 577 578 json = new JSONObject(); 579 CoordinatorActionInfo coordInfo = coordEngine.killActions(jobId, rangeType, scope); 580 List<CoordinatorActionBean> coordActions; 581 if (coordInfo != null) { 582 coordActions = coordInfo.getCoordActions(); 583 } 584 else { 585 coordActions = CoordUtils.getCoordActions(rangeType, jobId, scope, true); 586 } 587 json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT")); 588 } 589 else { 590 coordEngine.kill(jobId); 591 } 592 } 593 catch (CoordinatorEngineException ex) { 594 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 595 } 596 catch (CommandException ex) { 597 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 598 } 599 return json; 600 } 601 602 /** 603 * Kill bundle job 604 * 605 * @param request servlet request 606 * @param response servlet response 607 * @throws XServletException 608 */ 609 private void killBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 610 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 611 String jobId = getResourceName(request); 612 try { 613 bundleEngine.kill(jobId); 614 } 615 catch (BundleEngineException ex) { 616 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 617 } 618 } 619 620 /** 621 * Change a coordinator job 622 * 623 * @param request servlet request 624 * @param response servlet response 625 * @throws XServletException 626 */ 627 private void changeCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 628 throws XServletException { 629 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 630 getUser(request)); 631 String jobId = getResourceName(request); 632 String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE); 633 try { 634 coordEngine.change(jobId, changeValue); 635 } 636 catch (CoordinatorEngineException ex) { 637 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 638 } 639 } 640 641 /** 642 * Change a bundle job 643 * 644 * @param request servlet request 645 * @param response servlet response 646 * @throws XServletException 647 */ 648 private void changeBundleJob(HttpServletRequest request, HttpServletResponse response) 649 throws XServletException { 650 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 651 String jobId = getResourceName(request); 652 String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE); 653 try { 654 bundleEngine.change(jobId, changeValue); 655 } 656 catch (BundleEngineException ex) { 657 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 658 } 659 } 660 661 /** 662 * Rerun a wf job 663 * 664 * @param request servlet request 665 * @param response servlet response 666 * @param conf configuration object 667 * @throws XServletException 668 */ 669 private void reRunWorkflowJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 670 throws XServletException { 671 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 672 673 String jobId = getResourceName(request); 674 try { 675 dagEngine.reRun(jobId, conf); 676 } 677 catch (DagEngineException ex) { 678 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 679 } 680 } 681 682 /** 683 * Rerun bundle job 684 * 685 * @param request servlet request 686 * @param response servlet response 687 * @param conf configration object 688 * @throws XServletException 689 */ 690 private void rerunBundleJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 691 throws XServletException { 692 JSONObject json = new JSONObject(); 693 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 694 String jobId = getResourceName(request); 695 696 String coordScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM); 697 String dateScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM); 698 String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM); 699 String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM); 700 701 XLog.getLog(getClass()).info( 702 "Rerun Bundle for jobId=" + jobId + ", coordScope=" + coordScope + ", dateScope=" + dateScope + ", refresh=" 703 + refresh + ", noCleanup=" + noCleanup); 704 705 try { 706 bundleEngine.reRun(jobId, coordScope, dateScope, Boolean.valueOf(refresh), Boolean.valueOf(noCleanup)); 707 } 708 catch (BaseEngineException ex) { 709 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 710 } 711 } 712 713 /** 714 * Rerun coordinator actions 715 * 716 * @param request servlet request 717 * @param response servlet response 718 * @param conf configuration object 719 * @throws XServletException 720 */ 721 @SuppressWarnings("unchecked") 722 private JSONObject reRunCoordinatorActions(HttpServletRequest request, HttpServletResponse response, 723 Configuration conf) throws XServletException { 724 JSONObject json = new JSONObject(); 725 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request)); 726 727 String jobId = getResourceName(request); 728 729 String rerunType = request.getParameter(RestConstants.JOB_COORD_RANGE_TYPE_PARAM); 730 String scope = request.getParameter(RestConstants.JOB_COORD_SCOPE_PARAM); 731 String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM); 732 String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM); 733 String failed = request.getParameter(RestConstants.JOB_COORD_RERUN_FAILED_PARAM); 734 735 XLog.getLog(getClass()).info( 736 "Rerun coordinator for jobId=" + jobId + ", rerunType=" + rerunType + ",scope=" + scope + ",refresh=" 737 + refresh + ", noCleanup=" + noCleanup); 738 739 try { 740 if (!(rerunType.equals(RestConstants.JOB_COORD_SCOPE_DATE) || rerunType 741 .equals(RestConstants.JOB_COORD_SCOPE_ACTION))) { 742 throw new CommandException(ErrorCode.E1018, "date or action expected."); 743 } 744 CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh), 745 Boolean.valueOf(noCleanup), Boolean.valueOf(failed), conf); 746 List<CoordinatorActionBean> coordActions; 747 if (coordInfo != null) { 748 coordActions = coordInfo.getCoordActions(); 749 } 750 else { 751 coordActions = CoordUtils.getCoordActions(rerunType, jobId, scope, false); 752 } 753 json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT")); 754 } 755 catch (BaseEngineException ex) { 756 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 757 } 758 catch (CommandException ex) { 759 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 760 } 761 762 return json; 763 } 764 765 766 767 /** 768 * Get workflow job 769 * 770 * @param request servlet request 771 * @param response servlet response 772 * @return JsonBean WorkflowJobBean 773 * @throws XServletException 774 */ 775 protected JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 776 JsonBean jobBean = getWorkflowJobBean(request, response); 777 // for backward compatibility (OOZIE-1231) 778 swapMRActionID((WorkflowJob)jobBean); 779 return jobBean; 780 } 781 782 /** 783 * Get workflow job 784 * 785 * @param request servlet request 786 * @param response servlet response 787 * @return JsonBean WorkflowJobBean 788 * @throws XServletException 789 */ 790 protected JsonBean getWorkflowJobBean(HttpServletRequest request, HttpServletResponse response) throws XServletException { 791 JsonBean jobBean = null; 792 String jobId = getResourceName(request); 793 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 794 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 795 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 796 start = (start < 1) ? 1 : start; 797 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0; 798 len = (len < 1) ? Integer.MAX_VALUE : len; 799 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 800 try { 801 jobBean = (JsonBean) dagEngine.getJob(jobId, start, len); 802 } 803 catch (DagEngineException ex) { 804 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 805 } 806 return jobBean; 807 } 808 809 private void swapMRActionID(WorkflowJob wjBean) { 810 List<WorkflowAction> actions = wjBean.getActions(); 811 if (actions != null) { 812 for (WorkflowAction wa : actions) { 813 swapMRActionID(wa); 814 } 815 } 816 } 817 818 private void swapMRActionID(WorkflowAction waBean) { 819 if (waBean.getType().equals("map-reduce")) { 820 String childId = waBean.getExternalChildIDs(); 821 if (childId != null && !childId.equals("")) { 822 String consoleBase = getConsoleBase(waBean.getConsoleUrl()); 823 ((WorkflowActionBean) waBean).setConsoleUrl(consoleBase + childId); 824 ((WorkflowActionBean) waBean).setExternalId(childId); 825 ((WorkflowActionBean) waBean).setExternalChildIDs(""); 826 } 827 } 828 } 829 830 private String getConsoleBase(String url) { 831 String consoleBase = null; 832 if (url.indexOf("application") != -1) { 833 consoleBase = url.split("application_[0-9]+_[0-9]+")[0]; 834 } 835 else { 836 consoleBase = url.split("job_[0-9]+_[0-9]+")[0]; 837 } 838 return consoleBase; 839 } 840 841 /** 842 * Get wf action info 843 * 844 * @param request servlet request 845 * @param response servlet response 846 * @return JsonBean WorkflowActionBean 847 * @throws XServletException 848 */ 849 protected JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response) 850 throws XServletException { 851 852 JsonBean actionBean = getWorkflowActionBean(request, response); 853 // for backward compatibility (OOZIE-1231) 854 swapMRActionID((WorkflowAction)actionBean); 855 return actionBean; 856 } 857 858 protected JsonBean getWorkflowActionBean(HttpServletRequest request, HttpServletResponse response) 859 throws XServletException { 860 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 861 862 JsonBean actionBean = null; 863 String actionId = getResourceName(request); 864 try { 865 actionBean = dagEngine.getWorkflowAction(actionId); 866 } 867 catch (BaseEngineException ex) { 868 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 869 } 870 return actionBean; 871 } 872 873 /** 874 * Get coord job info 875 * 876 * @param request servlet request 877 * @param response servlet response 878 * @return JsonBean CoordinatorJobBean 879 * @throws XServletException 880 * @throws BaseEngineException 881 */ 882 protected JsonBean getCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 883 throws XServletException, BaseEngineException { 884 JsonBean jobBean = null; 885 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 886 getUser(request)); 887 String jobId = getResourceName(request); 888 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 889 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 890 String filter = request.getParameter(RestConstants.JOB_FILTER_PARAM); 891 String orderStr = request.getParameter(RestConstants.ORDER_PARAM); 892 boolean order = (orderStr != null && orderStr.equals("desc")) ? true : false; 893 int offset = (startStr != null) ? Integer.parseInt(startStr) : 1; 894 offset = (offset < 1) ? 1 : offset; 895 // Get default number of coordinator actions to be retrieved 896 int defaultLen = ConfigurationService.getInt(COORD_ACTIONS_DEFAULT_LENGTH); 897 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0; 898 len = getCoordinatorJobLength(defaultLen, len); 899 try { 900 CoordinatorJobBean coordJob = coordEngine.getCoordJob(jobId, filter, offset, len, order); 901 jobBean = coordJob; 902 } 903 catch (CoordinatorEngineException ex) { 904 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 905 } 906 907 return jobBean; 908 } 909 910 /** 911 * Given the requested length and the default length, determine how many coordinator jobs to return. 912 * Used by {@link #getCoordinatorJob(javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)} 913 * 914 * @param defaultLen The default length 915 * @param len The requested length 916 * @return The length to use 917 */ 918 protected int getCoordinatorJobLength(int defaultLen, int len) { 919 return (len < 1) ? defaultLen : len; 920 } 921 922 /** 923 * Get bundle job info 924 * 925 * @param request servlet request 926 * @param response servlet response 927 * @return JsonBean bundle job bean 928 * @throws XServletException 929 * @throws BaseEngineException 930 */ 931 private JsonBean getBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 932 BaseEngineException { 933 JsonBean jobBean = null; 934 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 935 String jobId = getResourceName(request); 936 937 try { 938 jobBean = (JsonBean) bundleEngine.getBundleJob(jobId); 939 940 return jobBean; 941 } 942 catch (BundleEngineException ex) { 943 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 944 } 945 } 946 947 /** 948 * Get coordinator action 949 * 950 * @param request servlet request 951 * @param response servlet response 952 * @return JsonBean CoordinatorActionBean 953 * @throws XServletException 954 * @throws BaseEngineException 955 */ 956 private JsonBean getCoordinatorAction(HttpServletRequest request, HttpServletResponse response) 957 throws XServletException, BaseEngineException { 958 JsonBean actionBean = null; 959 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 960 getUser(request)); 961 String actionId = getResourceName(request); 962 try { 963 actionBean = coordEngine.getCoordAction(actionId); 964 } 965 catch (CoordinatorEngineException ex) { 966 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 967 } 968 969 return actionBean; 970 } 971 972 /** 973 * Get wf job definition 974 * 975 * @param request servlet request 976 * @param response servlet response 977 * @return String wf definition 978 * @throws XServletException 979 */ 980 private String getWorkflowJobDefinition(HttpServletRequest request, HttpServletResponse response) 981 throws XServletException { 982 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 983 984 String wfDefinition; 985 String jobId = getResourceName(request); 986 try { 987 wfDefinition = dagEngine.getDefinition(jobId); 988 } 989 catch (DagEngineException ex) { 990 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 991 } 992 return wfDefinition; 993 } 994 995 /** 996 * Get bundle job definition 997 * 998 * @param request servlet request 999 * @param response servlet response 1000 * @return String bundle definition 1001 * @throws XServletException 1002 */ 1003 private String getBundleJobDefinition(HttpServletRequest request, HttpServletResponse response) throws XServletException { 1004 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 1005 String bundleDefinition; 1006 String jobId = getResourceName(request); 1007 try { 1008 bundleDefinition = bundleEngine.getDefinition(jobId); 1009 } 1010 catch (BundleEngineException ex) { 1011 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 1012 } 1013 return bundleDefinition; 1014 } 1015 1016 /** 1017 * Get coordinator job definition 1018 * 1019 * @param request servlet request 1020 * @param response servlet response 1021 * @return String coord definition 1022 * @throws XServletException 1023 */ 1024 private String getCoordinatorJobDefinition(HttpServletRequest request, HttpServletResponse response) 1025 throws XServletException { 1026 1027 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 1028 getUser(request)); 1029 1030 String jobId = getResourceName(request); 1031 1032 String coordDefinition = null; 1033 try { 1034 coordDefinition = coordEngine.getDefinition(jobId); 1035 } 1036 catch (BaseEngineException ex) { 1037 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 1038 } 1039 return coordDefinition; 1040 } 1041 1042 /** 1043 * Stream wf job log 1044 * 1045 * @param request servlet request 1046 * @param response servlet response 1047 * @throws XServletException 1048 * @throws IOException 1049 */ 1050 private void streamWorkflowJobLog(HttpServletRequest request, HttpServletResponse response) 1051 throws XServletException, IOException { 1052 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 1053 String jobId = getResourceName(request); 1054 try { 1055 dagEngine.streamLog(jobId, response.getWriter(), request.getParameterMap()); 1056 } 1057 catch (BaseEngineException ex) { 1058 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 1059 } 1060 } 1061 1062 /** 1063 * Stream bundle job log 1064 * 1065 * @param request servlet request 1066 * @param response servlet response 1067 * @throws XServletException 1068 */ 1069 private void streamBundleJobLog(HttpServletRequest request, HttpServletResponse response) 1070 throws XServletException, IOException { 1071 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 1072 String jobId = getResourceName(request); 1073 try { 1074 bundleEngine.streamLog(jobId, response.getWriter(), request.getParameterMap()); 1075 } 1076 catch (BaseEngineException ex) { 1077 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 1078 } 1079 } 1080 1081 /** 1082 * Stream coordinator job log 1083 * 1084 * @param request servlet request 1085 * @param response servlet response 1086 * @throws XServletException 1087 * @throws IOException 1088 */ 1089 private void streamCoordinatorJobLog(HttpServletRequest request, HttpServletResponse response) 1090 throws XServletException, IOException { 1091 1092 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 1093 getUser(request)); 1094 String jobId = getResourceName(request); 1095 String logRetrievalScope = request.getParameter(RestConstants.JOB_LOG_SCOPE_PARAM); 1096 String logRetrievalType = request.getParameter(RestConstants.JOB_LOG_TYPE_PARAM); 1097 try { 1098 coordEngine.streamLog(jobId, logRetrievalScope, logRetrievalType, response.getWriter(), request.getParameterMap()); 1099 } 1100 catch (BaseEngineException ex) { 1101 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 1102 } 1103 catch (CommandException ex) { 1104 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 1105 } 1106 } 1107 1108 @Override 1109 protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException, 1110 IOException { 1111 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); 1112 } 1113 1114 @Override 1115 protected JSONObject getJobsByParentId(HttpServletRequest request, HttpServletResponse response) 1116 throws XServletException, IOException { 1117 JSONObject json = new JSONObject(); 1118 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class) 1119 .getCoordinatorEngine(getUser(request)); 1120 String coordActionId; 1121 String type = request.getParameter(RestConstants.JOB_COORD_RANGE_TYPE_PARAM); 1122 String scope = request.getParameter(RestConstants.JOB_COORD_SCOPE_PARAM); 1123 // for getting allruns for coordinator action - 2 alternate endpoints 1124 if (type != null && type.equals(RestConstants.JOB_COORD_SCOPE_ACTION) && scope != null) { 1125 // endpoint - oozie/v2/coord-job-id?type=action&scope=action-num&show=allruns 1126 String jobId = getResourceName(request); 1127 coordActionId = Services.get().get(UUIDService.class).generateChildId(jobId, scope); 1128 } 1129 else { 1130 // endpoint - oozie/v2/coord-action-id?show=allruns 1131 coordActionId = getResourceName(request); 1132 } 1133 try { 1134 List<WorkflowJobBean> wfs = coordEngine.getReruns(coordActionId); 1135 JSONArray array = new JSONArray(); 1136 if (wfs != null) { 1137 for (WorkflowJobBean wf : wfs) { 1138 JSONObject json1 = new JSONObject(); 1139 json1.put(JsonTags.WORKFLOW_ID, wf.getId()); 1140 json1.put(JsonTags.WORKFLOW_STATUS, wf.getStatus().toString()); 1141 json1.put(JsonTags.WORKFLOW_START_TIME, JsonUtils.formatDateRfc822(wf.getStartTime(), "GMT")); 1142 json1.put(JsonTags.WORKFLOW_ACTION_END_TIME, JsonUtils.formatDateRfc822(wf.getEndTime(), "GMT")); 1143 array.add(json1); 1144 } 1145 } 1146 json.put(JsonTags.WORKFLOWS_JOBS, array); 1147 return json; 1148 } 1149 catch (CoordinatorEngineException ex) { 1150 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 1151 } 1152 } 1153 /** 1154 * not supported for v1 1155 */ 1156 @Override 1157 protected JSONObject updateJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 1158 throws XServletException, IOException { 1159 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); 1160 } 1161 1162 @Override 1163 protected String getJobStatus(HttpServletRequest request, HttpServletResponse response) throws XServletException, 1164 IOException { 1165 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); 1166 } 1167 1168 @Override 1169 protected void streamJobErrorLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 1170 IOException { 1171 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); 1172 } 1173 @Override 1174 protected void streamJobAuditLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 1175 IOException { 1176 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); 1177 } 1178 @Override 1179 void slaEnableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { 1180 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); 1181 } 1182 1183 @Override 1184 void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, 1185 IOException { 1186 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); 1187 } 1188 1189 @Override 1190 void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { 1191 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); 1192 } 1193 1194 @Override 1195 JSONObject getCoordActionMissingDependencies(HttpServletRequest request, HttpServletResponse response) 1196 throws XServletException, IOException { 1197 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); 1198 } 1199 1200 @Override 1201 JSONArray getActionRetries(HttpServletRequest request, HttpServletResponse response) throws XServletException, 1202 IOException { 1203 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); 1204 } 1205}