001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.servlet; 020 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Set; 026 027import javax.servlet.http.HttpServletRequest; 028import javax.servlet.http.HttpServletResponse; 029 030import org.apache.hadoop.conf.Configuration; 031import org.apache.oozie.BaseEngineException; 032import org.apache.oozie.BulkResponseInfo; 033import org.apache.oozie.BundleJobBean; 034import org.apache.oozie.BundleJobInfo; 035import org.apache.oozie.CoordinatorEngine; 036import org.apache.oozie.BundleEngine; 037import org.apache.oozie.CoordinatorEngineException; 038import org.apache.oozie.BundleEngineException; 039import org.apache.oozie.CoordinatorJobBean; 040import org.apache.oozie.CoordinatorJobInfo; 041import org.apache.oozie.DagEngine; 042import org.apache.oozie.DagEngineException; 043import org.apache.oozie.ErrorCode; 044import org.apache.oozie.WorkflowJobBean; 045import org.apache.oozie.WorkflowsInfo; 046import org.apache.oozie.cli.OozieCLI; 047import org.apache.oozie.client.OozieClient; 048import org.apache.oozie.client.rest.BulkResponseImpl; 049import org.apache.oozie.client.rest.JsonTags; 050import org.apache.oozie.client.rest.RestConstants; 051import org.apache.oozie.service.CoordinatorEngineService; 052import org.apache.oozie.service.DagEngineService; 053import org.apache.oozie.service.BundleEngineService; 054import org.apache.oozie.service.Services; 055import org.apache.oozie.util.XLog; 056import org.apache.oozie.util.XmlUtils; 057import org.json.simple.JSONArray; 058import org.json.simple.JSONObject; 059 060public class V1JobsServlet extends BaseJobsServlet { 061 062 private static final String INSTRUMENTATION_NAME = "v1jobs"; 063 private static final Set<String> httpJobType = new HashSet<String>(){{ 064 this.add(OozieCLI.HIVE_CMD); 065 this.add(OozieCLI.SQOOP_CMD); 066 this.add(OozieCLI.PIG_CMD); 067 this.add(OozieCLI.MR_CMD); 068 }}; 069 070 public V1JobsServlet() { 071 super(INSTRUMENTATION_NAME); 072 } 073 074 /** 075 * v1 service implementation to submit a job, either workflow or coordinator 076 */ 077 @Override 078 protected JSONObject submitJob(HttpServletRequest request, Configuration conf) throws XServletException, 079 IOException { 080 JSONObject json = null; 081 082 String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM); 083 084 if (jobType == null) { 085 String wfPath = conf.get(OozieClient.APP_PATH); 086 String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH); 087 String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH); 088 089 ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath); 090 091 if (wfPath != null) { 092 json = submitWorkflowJob(request, conf); 093 } 094 else if (coordPath != null) { 095 json = submitCoordinatorJob(request, conf); 096 } 097 else { 098 json = submitBundleJob(request, conf); 099 } 100 } 101 else { // This is a http submission job 102 if (httpJobType.contains(jobType)) { 103 json = submitHttpJob(request, conf, jobType); 104 } 105 else { 106 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 107 RestConstants.JOBTYPE_PARAM, jobType); 108 } 109 } 110 return json; 111 } 112 113 /** 114 * v1 service implementation to get a JSONObject representation of a job from its external ID 115 */ 116 @Override 117 protected JSONObject getJobIdForExternalId(HttpServletRequest request, String externalId) throws XServletException, 118 IOException { 119 JSONObject json = null; 120 /* 121 * Configuration conf = new XConfiguration(); String wfPath = 122 * conf.get(OozieClient.APP_PATH); String coordPath = 123 * conf.get(OozieClient.COORDINATOR_APP_PATH); 124 * 125 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 126 */ 127 String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM); 128 jobtype = (jobtype != null) ? jobtype : "wf"; 129 if (jobtype.contains("wf")) { 130 json = getWorkflowJobIdForExternalId(request, externalId); 131 } 132 else { 133 json = getCoordinatorJobIdForExternalId(request, externalId); 134 } 135 return json; 136 } 137 138 /** 139 * v1 service implementation to get a list of workflows, coordinators, or bundles, with filtering or interested 140 * windows embedded in the request object 141 */ 142 @Override 143 protected JSONObject getJobs(HttpServletRequest request) throws XServletException, IOException { 144 JSONObject json = null; 145 String isBulk = request.getParameter(RestConstants.JOBS_BULK_PARAM); 146 if(isBulk != null) { 147 json = getBulkJobs(request); 148 } else { 149 String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM); 150 jobtype = (jobtype != null) ? jobtype : "wf"; 151 152 if (jobtype.contains("wf")) { 153 json = getWorkflowJobs(request); 154 } 155 else if (jobtype.contains("coord")) { 156 json = getCoordinatorJobs(request); 157 } 158 else if (jobtype.contains("bundle")) { 159 json = getBundleJobs(request); 160 } 161 } 162 return json; 163 } 164 165 /** 166 * v1 service implementation to submit a workflow job 167 */ 168 @SuppressWarnings("unchecked") 169 private JSONObject submitWorkflowJob(HttpServletRequest request, Configuration conf) throws XServletException { 170 171 JSONObject json = new JSONObject(); 172 173 try { 174 String action = request.getParameter(RestConstants.ACTION_PARAM); 175 if (action != null && !action.equals(RestConstants.JOB_ACTION_START) 176 && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) { 177 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 178 RestConstants.ACTION_PARAM, action); 179 } 180 boolean startJob = (action != null); 181 String user = conf.get(OozieClient.USER_NAME); 182 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user); 183 String id; 184 boolean dryrun = false; 185 if (action != null) { 186 dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN)); 187 } 188 if (dryrun) { 189 id = dagEngine.dryRunSubmit(conf); 190 } 191 else { 192 id = dagEngine.submitJob(conf, startJob); 193 } 194 json.put(JsonTags.JOB_ID, id); 195 } 196 catch (BaseEngineException ex) { 197 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 198 } 199 200 return json; 201 } 202 203 /** 204 * v1 service implementation to submit a coordinator job 205 */ 206 @SuppressWarnings("unchecked") 207 private JSONObject submitCoordinatorJob(HttpServletRequest request, Configuration conf) throws XServletException { 208 209 JSONObject json = new JSONObject(); 210 XLog.getLog(getClass()).warn("submitCoordinatorJob " + XmlUtils.prettyPrint(conf).toString()); 211 try { 212 String action = request.getParameter(RestConstants.ACTION_PARAM); 213 if (action != null && !action.equals(RestConstants.JOB_ACTION_START) 214 && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) { 215 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 216 RestConstants.ACTION_PARAM, action); 217 } 218 boolean startJob = (action != null); 219 String user = conf.get(OozieClient.USER_NAME); 220 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 221 user); 222 String id = null; 223 boolean dryrun = false; 224 if (action != null) { 225 dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN)); 226 } 227 if (dryrun) { 228 id = coordEngine.dryRunSubmit(conf); 229 } 230 else { 231 id = coordEngine.submitJob(conf, startJob); 232 } 233 json.put(JsonTags.JOB_ID, id); 234 } 235 catch (CoordinatorEngineException ex) { 236 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 237 } 238 239 return json; 240 } 241 242 /** 243 * v1 service implementation to submit a bundle job 244 */ 245 @SuppressWarnings("unchecked") 246 private JSONObject submitBundleJob(HttpServletRequest request, Configuration conf) throws XServletException { 247 JSONObject json = new JSONObject(); 248 XLog.getLog(getClass()).warn("submitBundleJob " + XmlUtils.prettyPrint(conf).toString()); 249 try { 250 String action = request.getParameter(RestConstants.ACTION_PARAM); 251 if (action != null && !action.equals(RestConstants.JOB_ACTION_START) 252 && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) { 253 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 254 RestConstants.ACTION_PARAM, action); 255 } 256 boolean startJob = (action != null); 257 String user = conf.get(OozieClient.USER_NAME); 258 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(user); 259 String id = null; 260 boolean dryrun = false; 261 if (action != null) { 262 dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN)); 263 } 264 if (dryrun) { 265 id = bundleEngine.dryRunSubmit(conf); 266 } 267 else { 268 id = bundleEngine.submitJob(conf, startJob); 269 } 270 json.put(JsonTags.JOB_ID, id); 271 } 272 catch (BundleEngineException ex) { 273 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 274 } 275 276 return json; 277 } 278 279 /** 280 * v1 service implementation to get a JSONObject representation of a job from its external ID 281 */ 282 @SuppressWarnings("unchecked") 283 private JSONObject getWorkflowJobIdForExternalId(HttpServletRequest request, String externalId) 284 throws XServletException { 285 JSONObject json = new JSONObject(); 286 try { 287 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 288 String jobId = dagEngine.getJobIdForExternalId(externalId); 289 json.put(JsonTags.JOB_ID, jobId); 290 } 291 catch (DagEngineException ex) { 292 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 293 } 294 return json; 295 } 296 297 /** 298 * v1 service implementation to get a JSONObject representation of a job from its external ID 299 */ 300 private JSONObject getCoordinatorJobIdForExternalId(HttpServletRequest request, String externalId) 301 throws XServletException { 302 JSONObject json = new JSONObject(); 303 return json; 304 } 305 306 /** 307 * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the 308 * request object 309 */ 310 private JSONObject getWorkflowJobs(HttpServletRequest request) throws XServletException { 311 JSONObject json = new JSONObject(); 312 try { 313 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM); 314 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 315 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 316 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 317 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM); 318 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 319 start = (start < 1) ? 1 : start; 320 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50; 321 len = (len < 1) ? 50 : len; 322 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 323 WorkflowsInfo jobs = dagEngine.getJobs(filter, start, len); 324 List<WorkflowJobBean> jsonWorkflows = jobs.getWorkflows(); 325 json.put(JsonTags.WORKFLOWS_JOBS, WorkflowJobBean.toJSONArray(jsonWorkflows, timeZoneId)); 326 json.put(JsonTags.WORKFLOWS_TOTAL, jobs.getTotal()); 327 json.put(JsonTags.WORKFLOWS_OFFSET, jobs.getStart()); 328 json.put(JsonTags.WORKFLOWS_LEN, jobs.getLen()); 329 330 } 331 catch (DagEngineException ex) { 332 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 333 } 334 335 return json; 336 } 337 338 /** 339 * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the 340 * request object 341 */ 342 @SuppressWarnings("unchecked") 343 private JSONObject getCoordinatorJobs(HttpServletRequest request) throws XServletException { 344 JSONObject json = new JSONObject(); 345 try { 346 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM); 347 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 348 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 349 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 350 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM); 351 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 352 start = (start < 1) ? 1 : start; 353 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50; 354 len = (len < 1) ? 50 : len; 355 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 356 getUser(request)); 357 CoordinatorJobInfo jobs = coordEngine.getCoordJobs(filter, start, len); 358 List<CoordinatorJobBean> jsonJobs = jobs.getCoordJobs(); 359 json.put(JsonTags.COORDINATOR_JOBS, CoordinatorJobBean.toJSONArray(jsonJobs, timeZoneId)); 360 json.put(JsonTags.COORD_JOB_TOTAL, jobs.getTotal()); 361 json.put(JsonTags.COORD_JOB_OFFSET, jobs.getStart()); 362 json.put(JsonTags.COORD_JOB_LEN, jobs.getLen()); 363 364 } 365 catch (CoordinatorEngineException ex) { 366 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 367 } 368 return json; 369 } 370 371 @SuppressWarnings("unchecked") 372 private JSONObject getBundleJobs(HttpServletRequest request) throws XServletException { 373 JSONObject json = new JSONObject(); 374 try { 375 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM); 376 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 377 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 378 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 379 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM); 380 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 381 start = (start < 1) ? 1 : start; 382 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50; 383 len = (len < 1) ? 50 : len; 384 385 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 386 BundleJobInfo jobs = bundleEngine.getBundleJobs(filter, start, len); 387 List<BundleJobBean> jsonJobs = jobs.getBundleJobs(); 388 389 json.put(JsonTags.BUNDLE_JOBS, BundleJobBean.toJSONArray(jsonJobs, timeZoneId)); 390 json.put(JsonTags.BUNDLE_JOB_TOTAL, jobs.getTotal()); 391 json.put(JsonTags.BUNDLE_JOB_OFFSET, jobs.getStart()); 392 json.put(JsonTags.BUNDLE_JOB_LEN, jobs.getLen()); 393 394 } 395 catch (BundleEngineException ex) { 396 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 397 } 398 return json; 399 } 400 401 @SuppressWarnings("unchecked") 402 private JSONObject getBulkJobs(HttpServletRequest request) throws XServletException, IOException { 403 JSONObject json = new JSONObject(); 404 try { 405 String bulkFilter = request.getParameter(RestConstants.JOBS_BULK_PARAM); //REST API 406 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 407 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 408 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 409 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM); 410 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 411 start = (start < 1) ? 1 : start; 412 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50; 413 len = (len < 1) ? 50 : len; 414 415 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 416 BulkResponseInfo bulkResponse = bundleEngine.getBulkJobs(bulkFilter, start, len); 417 List<BulkResponseImpl> responsesToJson = bulkResponse.getResponses(); 418 419 json.put(JsonTags.BULK_RESPONSES, BulkResponseImpl.toJSONArray(responsesToJson, timeZoneId)); 420 json.put(JsonTags.BULK_RESPONSE_TOTAL, bulkResponse.getTotal()); 421 json.put(JsonTags.BULK_RESPONSE_OFFSET, bulkResponse.getStart()); 422 json.put(JsonTags.BULK_RESPONSE_LEN, bulkResponse.getLen()); 423 424 } 425 catch (BaseEngineException ex) { 426 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 427 } 428 return json; 429 } 430 431 /** 432 * service implementation to submit a http job 433 */ 434 private JSONObject submitHttpJob(HttpServletRequest request, Configuration conf, String jobType) 435 throws XServletException { 436 JSONObject json = new JSONObject(); 437 438 try { 439 String user = conf.get(OozieClient.USER_NAME); 440 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user); 441 String id = dagEngine.submitHttpJob(conf, jobType); 442 json.put(JsonTags.JOB_ID, id); 443 } 444 catch (DagEngineException ex) { 445 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 446 } 447 448 return json; 449 } 450 451 /** 452 * service implementation to bulk kill jobs 453 * @param request 454 * @param response 455 * @return 456 * @throws XServletException 457 * @throws IOException 458 */ 459 @Override 460 protected JSONObject killJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException, 461 IOException { 462 return bulkModifyJobs(request, response); 463 } 464 465 /** 466 * service implementation to bulk suspend jobs 467 * @param request 468 * @param response 469 * @return 470 * @throws XServletException 471 * @throws IOException 472 */ 473 @Override 474 protected JSONObject suspendJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException, 475 IOException { 476 return bulkModifyJobs(request, response); 477 } 478 479 /** 480 * service implementation to bulk resume jobs 481 * @param request 482 * @param response 483 * @return 484 * @throws XServletException 485 * @throws IOException 486 */ 487 @Override 488 protected JSONObject resumeJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException, 489 IOException { 490 return bulkModifyJobs(request, response); 491 } 492 493 private JSONObject bulkModifyJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException, 494 IOException { 495 String action = request.getParameter(RestConstants.ACTION_PARAM); 496 String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM); 497 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM); 498 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 499 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 500 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 501 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM); 502 503 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 504 start = (start < 1) ? 1 : start; 505 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50; 506 len = (len < 1) ? 50 : len; 507 508 JSONObject json = new JSONObject(); 509 List<String> ids = new ArrayList<String>(); 510 511 if (jobType.equals("wf")) { 512 WorkflowsInfo jobs = null; 513 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 514 if (action.equals(RestConstants.JOB_ACTION_KILL)) { 515 try { 516 jobs = dagEngine.killJobs(filter, start, len); 517 } 518 catch (DagEngineException ex) { 519 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 520 } 521 } else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) { 522 try { 523 jobs = dagEngine.suspendJobs(filter, start, len); 524 } 525 catch (DagEngineException ex) { 526 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 527 } 528 } else if (action.equals(RestConstants.JOB_ACTION_RESUME)) { 529 try { 530 jobs = dagEngine.resumeJobs(filter, start, len); 531 } 532 catch (DagEngineException ex) { 533 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 534 } 535 } 536 537 json.put(JsonTags.WORKFLOWS_JOBS, WorkflowJobBean.toJSONArray(jobs.getWorkflows(), timeZoneId)); 538 json.put(JsonTags.WORKFLOWS_TOTAL, jobs.getTotal()); 539 json.put(JsonTags.WORKFLOWS_OFFSET, jobs.getStart()); 540 json.put(JsonTags.WORKFLOWS_LEN, jobs.getLen()); 541 542 } 543 else if (jobType.equals("bundle")) { 544 BundleJobInfo jobs = null; 545 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class). 546 getBundleEngine(getUser(request)); 547 if (action.equals(RestConstants.JOB_ACTION_KILL)) { 548 try { 549 jobs = bundleEngine.killJobs(filter, start, len); 550 } 551 catch (BundleEngineException ex) { 552 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 553 } 554 } 555 else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) { 556 try { 557 jobs = bundleEngine.suspendJobs(filter, start, len); 558 } 559 catch (BundleEngineException ex) { 560 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 561 } 562 } 563 else if (action.equals(RestConstants.JOB_ACTION_RESUME)) { 564 try { 565 jobs = bundleEngine.resumeJobs(filter, start, len); 566 } 567 catch (BundleEngineException ex) { 568 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 569 } 570 } 571 572 json.put(JsonTags.BUNDLE_JOBS, BundleJobBean.toJSONArray(jobs.getBundleJobs(), timeZoneId)); 573 json.put(JsonTags.BUNDLE_JOB_TOTAL, jobs.getTotal()); 574 json.put(JsonTags.BUNDLE_JOB_OFFSET, jobs.getStart()); 575 json.put(JsonTags.BUNDLE_JOB_LEN, jobs.getLen()); 576 } 577 else { 578 CoordinatorJobInfo jobs = null; 579 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class). 580 getCoordinatorEngine(getUser(request)); 581 if (action.equals(RestConstants.JOB_ACTION_KILL)) { 582 try { 583 jobs = coordEngine.killJobs(filter, start, len); 584 } 585 catch (CoordinatorEngineException ex) { 586 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 587 } 588 } 589 else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) { 590 try { 591 jobs = coordEngine.suspendJobs(filter, start, len); 592 } 593 catch (CoordinatorEngineException ex) { 594 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 595 } 596 } 597 else if (action.equals(RestConstants.JOB_ACTION_RESUME)) { 598 try { 599 jobs = coordEngine.resumeJobs(filter, start, len); 600 } 601 catch (CoordinatorEngineException ex) { 602 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 603 } 604 } 605 606 json.put(JsonTags.COORDINATOR_JOBS, CoordinatorJobBean.toJSONArray(jobs.getCoordJobs(), timeZoneId)); 607 json.put(JsonTags.COORD_JOB_TOTAL, jobs.getTotal()); 608 json.put(JsonTags.COORD_JOB_OFFSET, jobs.getStart()); 609 json.put(JsonTags.COORD_JOB_LEN, jobs.getLen()); 610 } 611 612 json.put(JsonTags.JOB_IDS, toJSONArray(ids)); 613 return json; 614 } 615 616 private static JSONArray toJSONArray(List<String> ids) { 617 JSONArray array = new JSONArray(); 618 for (String id : ids) { 619 array.add(id); 620 } 621 return array; 622 } 623}