/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.servlet;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BaseEngineException;
import org.apache.oozie.BulkResponseInfo;
import org.apache.oozie.BundleEngine;
import org.apache.oozie.BundleEngineException;
import org.apache.oozie.BundleJobInfo;
import org.apache.oozie.CoordinatorEngine;
import org.apache.oozie.CoordinatorEngineException;
import org.apache.oozie.CoordinatorJobInfo;
import org.apache.oozie.DagEngine;
import org.apache.oozie.DagEngineException;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.OozieJsonFactory;
import org.apache.oozie.WorkflowsInfo;
import org.apache.oozie.cli.OozieCLI;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.rest.BulkResponseImpl;
import org.apache.oozie.client.rest.JsonTags;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.service.BundleEngineService;
import org.apache.oozie.service.CoordinatorEngineService;
import org.apache.oozie.service.DagEngineService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;

/**
 * v1 implementation of the jobs servlet: submits workflow, coordinator, bundle and http jobs, lists jobs,
 * and kills/suspends/resumes jobs in bulk.
 */
public class V1JobsServlet extends BaseJobsServlet {

    private static final String INSTRUMENTATION_NAME = "v1jobs";

    /** Job type assumed when the request carries no job type parameter. */
    private static final String DEFAULT_JOB_TYPE = "wf";
    /** Time zone used to render results when the request carries no time zone parameter. */
    private static final String DEFAULT_TIME_ZONE = "GMT";
    /** Offset used when the request carries no offset, or an offset lower than 1. */
    private static final int DEFAULT_OFFSET = 1;
    /** Length used when the request carries no length, or a length lower than 1. */
    private static final int DEFAULT_LEN = 50;

    /** Job types accepted for http submission; read-only after class initialization. */
    private static final Set<String> httpJobType = Collections.unmodifiableSet(new HashSet<String>(Arrays.asList(
            OozieCLI.HIVE_CMD,
            OozieCLI.SQOOP_CMD,
            OozieCLI.PIG_CMD,
            OozieCLI.MR_CMD)));

    public V1JobsServlet() {
        super(INSTRUMENTATION_NAME);
    }

    /**
     * v1 service implementation to submit a job: a workflow, coordinator or bundle when no job type parameter is
     * given, or an http job when the job type parameter names one of the supported http job types.
     *
     * @param request servlet request
     * @param conf job configuration
     * @return JSON object holding the id returned by the engine
     * @throws XServletException if the job type is not supported or the engine rejects the submission
     * @throws IOException declared by the overridden method
     */
    @Override
    protected JSONObject submitJob(HttpServletRequest request, Configuration conf) throws XServletException,
            IOException {
        String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM);

        if (jobType != null) {
            // An explicit job type means this is a http submission job
            if (!httpJobType.contains(jobType)) {
                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
                        RestConstants.JOBTYPE_PARAM, jobType);
            }
            return submitHttpJob(request, conf, jobType);
        }

        String wfPath = conf.get(OozieClient.APP_PATH);
        String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH);
        String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);

        ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath);

        if (wfPath != null) {
            return submitWorkflowJob(request, conf);
        }
        if (coordPath != null) {
            return submitCoordinatorJob(request, conf);
        }
        return submitBundleJob(request, conf);
    }

    /**
     * v1 service implementation to get a JSONObject representation of a job from its external ID. The job type
     * parameter defaults to "wf"; any job type not containing "wf" is handled as a coordinator lookup.
     *
     * @param request servlet request
     * @param externalId external id of the job
     * @return JSON object produced by the workflow or coordinator lookup
     * @throws XServletException if the workflow lookup fails
     * @throws IOException declared by the overridden method
     */
    @Override
    protected JSONObject getJobIdForExternalId(HttpServletRequest request, String externalId)
            throws XServletException, IOException {
        String jobtype = getJobType(request);
        if (jobtype.contains("wf")) {
            return getWorkflowJobIdForExternalId(request, externalId);
        }
        return getCoordinatorJobIdForExternalId(request, externalId);
    }

    /**
     * v1 service implementation to get a list of workflows, coordinators, or bundles, with filtering or interested
     * windows embedded in the request object. A request carrying the bulk parameter is answered with a bulk
     * response instead.
     *
     * @param request servlet request
     * @return JSON object describing the selected jobs
     * @throws XServletException if the job type is not recognized, a numeric parameter cannot be parsed or the
     *         engine fails
     * @throws IOException declared by the overridden method
     */
    @Override
    protected JSONObject getJobs(HttpServletRequest request) throws XServletException, IOException {
        if (request.getParameter(RestConstants.JOBS_BULK_PARAM) != null) {
            return getBulkJobs(request);
        }

        String jobtype = getJobType(request);
        if (jobtype.contains("wf")) {
            return getWorkflowJobs(request);
        }
        if (jobtype.contains("coord")) {
            return getCoordinatorJobs(request);
        }
        if (jobtype.contains("bundle")) {
            return getBundleJobs(request);
        }
        // Previously an unrecognized job type fell through and a null JSON object was returned.
        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
                RestConstants.JOBTYPE_PARAM, jobtype);
    }

    /**
     * v1 service implementation to submit a workflow job. With action "dryrun" the engine performs a dry run;
     * otherwise the job is submitted and started only when an action parameter is present.
     */
    @SuppressWarnings("unchecked")
    private JSONObject submitWorkflowJob(HttpServletRequest request, Configuration conf) throws XServletException {
        String action = request.getParameter(RestConstants.ACTION_PARAM);
        validateSubmitAction(action);
        boolean startJob = (action != null);
        boolean dryrun = RestConstants.JOB_ACTION_DRYRUN.equals(action);

        JSONObject json = new JSONObject();
        try {
            String user = conf.get(OozieClient.USER_NAME);
            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
            String id = dryrun ? dagEngine.dryRunSubmit(conf) : dagEngine.submitJob(conf, startJob);
            json.put(JsonTags.JOB_ID, id);
        }
        catch (BaseEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        return json;
    }

    /**
     * v1 service implementation to submit a coordinator job. With action "dryrun" the engine performs a dry run;
     * otherwise the job is submitted and started only when an action parameter is present.
     */
    @SuppressWarnings("unchecked")
    private JSONObject submitCoordinatorJob(HttpServletRequest request, Configuration conf)
            throws XServletException {
        // NOTE(review): the whole configuration is logged at WARN level; confirm this is intended.
        XLog.getLog(getClass()).warn("submitCoordinatorJob " + XmlUtils.prettyPrint(conf).toString());

        String action = request.getParameter(RestConstants.ACTION_PARAM);
        validateSubmitAction(action);
        boolean startJob = (action != null);
        boolean dryrun = RestConstants.JOB_ACTION_DRYRUN.equals(action);

        JSONObject json = new JSONObject();
        try {
            String user = conf.get(OozieClient.USER_NAME);
            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class)
                    .getCoordinatorEngine(user);
            String id = dryrun ? coordEngine.dryRunSubmit(conf) : coordEngine.submitJob(conf, startJob);
            json.put(JsonTags.JOB_ID, id);
        }
        catch (CoordinatorEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        return json;
    }

    /**
     * v1 service implementation to submit a bundle job. With action "dryrun" the engine performs a dry run;
     * otherwise the job is submitted and started only when an action parameter is present.
     */
    @SuppressWarnings("unchecked")
    private JSONObject submitBundleJob(HttpServletRequest request, Configuration conf) throws XServletException {
        // NOTE(review): the whole configuration is logged at WARN level; confirm this is intended.
        XLog.getLog(getClass()).warn("submitBundleJob " + XmlUtils.prettyPrint(conf).toString());

        String action = request.getParameter(RestConstants.ACTION_PARAM);
        validateSubmitAction(action);
        boolean startJob = (action != null);
        boolean dryrun = RestConstants.JOB_ACTION_DRYRUN.equals(action);

        JSONObject json = new JSONObject();
        try {
            String user = conf.get(OozieClient.USER_NAME);
            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(user);
            String id = dryrun ? bundleEngine.dryRunSubmit(conf) : bundleEngine.submitJob(conf, startJob);
            json.put(JsonTags.JOB_ID, id);
        }
        catch (BundleEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        return json;
    }

    /**
     * v1 service implementation to get a JSONObject holding the workflow job id that matches an external ID
     */
    @SuppressWarnings("unchecked")
    private JSONObject getWorkflowJobIdForExternalId(HttpServletRequest request, String externalId)
            throws XServletException {
        JSONObject json = new JSONObject();
        try {
            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
            json.put(JsonTags.JOB_ID, dagEngine.getJobIdForExternalId(externalId));
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        return json;
    }

    /**
     * v1 service implementation for the coordinator external ID lookup. No lookup is performed: the returned
     * JSON object is always empty.
     */
    private JSONObject getCoordinatorJobIdForExternalId(HttpServletRequest request, String externalId)
            throws XServletException {
        return new JSONObject();
    }

    /**
     * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the
     * request object
     */
    private JSONObject getWorkflowJobs(HttpServletRequest request) throws XServletException {
        String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
        int start = getOffset(request);
        int len = getLen(request);
        String timeZoneId = getTimeZoneId(request);
        try {
            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
            WorkflowsInfo jobs = dagEngine.getJobs(filter, start, len);
            return OozieJsonFactory.getWFJSONObject(jobs, timeZoneId);
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * v1 service implementation to get a list of coordinators, with filtering or interested windows embedded in
     * the request object
     */
    private JSONObject getCoordinatorJobs(HttpServletRequest request) throws XServletException {
        String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
        int start = getOffset(request);
        int len = getLen(request);
        String timeZoneId = getTimeZoneId(request);
        try {
            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class)
                    .getCoordinatorEngine(getUser(request));
            CoordinatorJobInfo jobs = coordEngine.getCoordJobs(filter, start, len);
            return OozieJsonFactory.getCoordJSONObject(jobs, timeZoneId);
        }
        catch (CoordinatorEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * v1 service implementation to get a list of bundles, with filtering or interested windows embedded in the
     * request object
     */
    private JSONObject getBundleJobs(HttpServletRequest request) throws XServletException {
        String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
        int start = getOffset(request);
        int len = getLen(request);
        String timeZoneId = getTimeZoneId(request);
        try {
            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class)
                    .getBundleEngine(getUser(request));
            BundleJobInfo jobs = bundleEngine.getBundleJobs(filter, start, len);
            return OozieJsonFactory.getBundleJSONObject(jobs, timeZoneId);
        }
        catch (BundleEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * v1 service implementation to answer a bulk request. The value of the bulk parameter is handed to the bundle
     * engine as the filter; the responses, total, offset and length it reports are copied into the JSON object.
     */
    @SuppressWarnings("unchecked")
    private JSONObject getBulkJobs(HttpServletRequest request) throws XServletException, IOException {
        String bulkFilter = request.getParameter(RestConstants.JOBS_BULK_PARAM); //REST API
        int start = getOffset(request);
        int len = getLen(request);
        String timeZoneId = getTimeZoneId(request);

        JSONObject json = new JSONObject();
        try {
            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class)
                    .getBundleEngine(getUser(request));
            BulkResponseInfo bulkResponse = bundleEngine.getBulkJobs(bulkFilter, start, len);
            List<BulkResponseImpl> responsesToJson = bulkResponse.getResponses();

            json.put(JsonTags.BULK_RESPONSES, BulkResponseImpl.toJSONArray(responsesToJson, timeZoneId));
            json.put(JsonTags.BULK_RESPONSE_TOTAL, bulkResponse.getTotal());
            json.put(JsonTags.BULK_RESPONSE_OFFSET, bulkResponse.getStart());
            json.put(JsonTags.BULK_RESPONSE_LEN, bulkResponse.getLen());
        }
        catch (BaseEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        return json;
    }

    /**
     * service implementation to submit a http job
     */
    @SuppressWarnings("unchecked")
    private JSONObject submitHttpJob(HttpServletRequest request, Configuration conf, String jobType)
            throws XServletException {
        JSONObject json = new JSONObject();
        try {
            String user = conf.get(OozieClient.USER_NAME);
            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
            json.put(JsonTags.JOB_ID, dagEngine.submitHttpJob(conf, jobType));
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        return json;
    }

    /**
     * service implementation to bulk kill jobs
     * @param request servlet request
     * @param response servlet response
     * @return bulkModifyJobs implementation to bulk kill jobs
     * @throws XServletException if a parameter is missing or invalid, or the engine fails
     * @throws IOException declared by the overridden method
     */
    @Override
    protected JSONObject killJobs(HttpServletRequest request, HttpServletResponse response)
            throws XServletException, IOException {
        return bulkModifyJobs(request, response);
    }

    /**
     * service implementation to bulk suspend jobs
     * @param request servlet request
     * @param response servlet response
     * @return bulkModifyJobs implementation to bulk suspend jobs
     * @throws XServletException if a parameter is missing or invalid, or the engine fails
     * @throws IOException declared by the overridden method
     */
    @Override
    protected JSONObject suspendJobs(HttpServletRequest request, HttpServletResponse response)
            throws XServletException, IOException {
        return bulkModifyJobs(request, response);
    }

    /**
     * service implementation to bulk resume jobs
     * @param request servlet request
     * @param response servlet response
     * @return bulkModifyJobs implementation to bulk resume jobs
     * @throws XServletException if a parameter is missing or invalid, or the engine fails
     * @throws IOException declared by the overridden method
     */
    @Override
    protected JSONObject resumeJobs(HttpServletRequest request, HttpServletResponse response)
            throws XServletException, IOException {
        return bulkModifyJobs(request, response);
    }

    /**
     * Applies the kill, suspend or resume action named by the action parameter to the jobs selected by the
     * filter, offset and length parameters. Job type "wf" targets workflows and "bundle" targets bundles; every
     * other job type is handled as coordinator.
     *
     * @param request servlet request
     * @param response servlet response (not used)
     * @return JSON object describing the jobs returned by the engine, plus a job ids array
     * @throws XServletException if the action or job type parameter is missing, a numeric parameter cannot be
     *         parsed, the action is not supported, or the engine fails
     * @throws IOException declared for symmetry with the callers
     */
    @SuppressWarnings("unchecked")
    private JSONObject bulkModifyJobs(HttpServletRequest request, HttpServletResponse response)
            throws XServletException, IOException {
        String action = request.getParameter(RestConstants.ACTION_PARAM);
        String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM);

        // Both values are dereferenced below; reject missing ones with a 400 instead of a NullPointerException.
        if (action == null) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
                    RestConstants.ACTION_PARAM, action);
        }
        if (jobType == null) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
                    RestConstants.JOBTYPE_PARAM, jobType);
        }

        String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
        int start = getOffset(request);
        int len = getLen(request);
        String timeZoneId = getTimeZoneId(request);

        JSONObject json;
        if (jobType.equals("wf")) {
            json = bulkModifyWorkflowJobs(request, action, filter, start, len, timeZoneId);
        }
        else if (jobType.equals("bundle")) {
            json = bulkModifyBundleJobs(request, action, filter, start, len, timeZoneId);
        }
        else {
            // NOTE(review): any job type other than "wf"/"bundle" is treated as coordinator; confirm intended.
            json = bulkModifyCoordinatorJobs(request, action, filter, start, len, timeZoneId);
        }

        // NOTE(review): nothing is ever added to this list, so the job ids array is always empty.
        // Kept as is so the shape of the response does not change.
        List<String> ids = new ArrayList<String>();
        json.put(JsonTags.JOB_IDS, toJSONArray(ids));
        return json;
    }

    /** Runs the bulk kill/suspend/resume action on workflow jobs and converts the result to JSON. */
    private JSONObject bulkModifyWorkflowJobs(HttpServletRequest request, String action, String filter, int start,
            int len, String timeZoneId) throws XServletException {
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
        WorkflowsInfo jobs;
        try {
            switch (action) {
                case RestConstants.JOB_ACTION_KILL:
                    jobs = dagEngine.killJobs(filter, start, len);
                    break;
                case RestConstants.JOB_ACTION_SUSPEND:
                    jobs = dagEngine.suspendJobs(filter, start, len);
                    break;
                case RestConstants.JOB_ACTION_RESUME:
                    jobs = dagEngine.resumeJobs(filter, start, len);
                    break;
                default:
                    throw new DagEngineException(ErrorCode.E0301, action);
            }
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        return OozieJsonFactory.getWFJSONObject(jobs, timeZoneId);
    }

    /** Runs the bulk kill/suspend/resume action on bundle jobs and converts the result to JSON. */
    private JSONObject bulkModifyBundleJobs(HttpServletRequest request, String action, String filter, int start,
            int len, String timeZoneId) throws XServletException {
        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
        BundleJobInfo jobs;
        try {
            switch (action) {
                case RestConstants.JOB_ACTION_KILL:
                    jobs = bundleEngine.killJobs(filter, start, len);
                    break;
                case RestConstants.JOB_ACTION_SUSPEND:
                    jobs = bundleEngine.suspendJobs(filter, start, len);
                    break;
                case RestConstants.JOB_ACTION_RESUME:
                    jobs = bundleEngine.resumeJobs(filter, start, len);
                    break;
                default:
                    throw new BundleEngineException(ErrorCode.E0301, action);
            }
        }
        catch (BundleEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        return OozieJsonFactory.getBundleJSONObject(jobs, timeZoneId);
    }

    /** Runs the bulk kill/suspend/resume action on coordinator jobs and converts the result to JSON. */
    private JSONObject bulkModifyCoordinatorJobs(HttpServletRequest request, String action, String filter,
            int start, int len, String timeZoneId) throws XServletException {
        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class)
                .getCoordinatorEngine(getUser(request));
        CoordinatorJobInfo jobs;
        try {
            switch (action) {
                case RestConstants.JOB_ACTION_KILL:
                    jobs = coordEngine.killJobs(filter, start, len);
                    break;
                case RestConstants.JOB_ACTION_SUSPEND:
                    jobs = coordEngine.suspendJobs(filter, start, len);
                    break;
                case RestConstants.JOB_ACTION_RESUME:
                    jobs = coordEngine.resumeJobs(filter, start, len);
                    break;
                default:
                    throw new CoordinatorEngineException(ErrorCode.E0301, action);
            }
        }
        catch (CoordinatorEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        return OozieJsonFactory.getCoordJSONObject(jobs, timeZoneId);
    }

    /** Rejects any submit action other than absent, "start" or "dryrun". */
    private static void validateSubmitAction(String action) throws XServletException {
        if (action != null && !RestConstants.JOB_ACTION_START.equals(action)
                && !RestConstants.JOB_ACTION_DRYRUN.equals(action)) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
                    RestConstants.ACTION_PARAM, action);
        }
    }

    /** Returns the job type request parameter, or "wf" when it is absent. */
    private static String getJobType(HttpServletRequest request) {
        String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
        return (jobtype != null) ? jobtype : DEFAULT_JOB_TYPE;
    }

    /** Returns the time zone request parameter, or "GMT" when it is absent. */
    private static String getTimeZoneId(HttpServletRequest request) {
        String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM);
        return (timeZoneId != null) ? timeZoneId : DEFAULT_TIME_ZONE;
    }

    /** Returns the offset request parameter; 1 when it is absent or lower than 1. */
    private static int getOffset(HttpServletRequest request) throws XServletException {
        return getPositiveIntParameter(request, RestConstants.OFFSET_PARAM, DEFAULT_OFFSET);
    }

    /** Returns the length request parameter; 50 when it is absent or lower than 1. */
    private static int getLen(HttpServletRequest request) throws XServletException {
        return getPositiveIntParameter(request, RestConstants.LEN_PARAM, DEFAULT_LEN);
    }

    /**
     * Reads an integer request parameter, substituting the default when the parameter is absent or lower than 1.
     *
     * @param request servlet request
     * @param name parameter name
     * @param defaultValue value used for an absent or lower-than-1 parameter
     * @return the parsed value or the default
     * @throws XServletException with a bad request status if the value is not a valid integer (previously the
     *         NumberFormatException escaped unhandled)
     */
    private static int getPositiveIntParameter(HttpServletRequest request, String name, int defaultValue)
            throws XServletException {
        String raw = request.getParameter(name);
        if (raw == null) {
            return defaultValue;
        }
        int value;
        try {
            value = Integer.parseInt(raw);
        }
        catch (NumberFormatException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, name, raw);
        }
        return (value < 1) ? defaultValue : value;
    }

    /** Copies the given ids into a JSON array, preserving order. */
    @SuppressWarnings("unchecked")
    private static JSONArray toJSONArray(List<String> ids) {
        JSONArray array = new JSONArray();
        for (String id : ids) {
            array.add(id);
        }
        return array;
    }
}