001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.servlet; 019 020import java.io.IOException; 021import java.util.HashSet; 022import java.util.List; 023import java.util.Set; 024 025import javax.servlet.http.HttpServletRequest; 026import javax.servlet.http.HttpServletResponse; 027 028import org.apache.hadoop.conf.Configuration; 029import org.apache.oozie.BaseEngineException; 030import org.apache.oozie.BulkResponseInfo; 031import org.apache.oozie.BundleJobBean; 032import org.apache.oozie.BundleJobInfo; 033import org.apache.oozie.CoordinatorEngine; 034import org.apache.oozie.BundleEngine; 035import org.apache.oozie.CoordinatorEngineException; 036import org.apache.oozie.BundleEngineException; 037import org.apache.oozie.CoordinatorJobBean; 038import org.apache.oozie.CoordinatorJobInfo; 039import org.apache.oozie.DagEngine; 040import org.apache.oozie.DagEngineException; 041import org.apache.oozie.ErrorCode; 042import org.apache.oozie.WorkflowJobBean; 043import org.apache.oozie.WorkflowsInfo; 044import org.apache.oozie.cli.OozieCLI; 045import org.apache.oozie.client.OozieClient; 046import org.apache.oozie.client.rest.BulkResponseImpl; 047import org.apache.oozie.client.rest.JsonTags; 048import org.apache.oozie.client.rest.RestConstants; 049import org.apache.oozie.service.CoordinatorEngineService; 050import org.apache.oozie.service.DagEngineService; 051import org.apache.oozie.service.BundleEngineService; 052import org.apache.oozie.service.Services; 053import org.apache.oozie.util.XLog; 054import org.apache.oozie.util.XmlUtils; 055import org.json.simple.JSONObject; 056 057public class V1JobsServlet extends BaseJobsServlet { 058 059 private static final String INSTRUMENTATION_NAME = "v1jobs"; 060 private static final Set<String> httpJobType = new HashSet<String>(){{ 061 this.add(OozieCLI.HIVE_CMD); 062 this.add(OozieCLI.SQOOP_CMD); 063 this.add(OozieCLI.PIG_CMD); 064 this.add(OozieCLI.MR_CMD); 065 }}; 066 067 public V1JobsServlet() { 068 super(INSTRUMENTATION_NAME); 069 } 070 071 /** 072 * v1 service implementation to submit a job, either workflow or coordinator 073 */ 074 @Override 075 protected JSONObject submitJob(HttpServletRequest request, Configuration conf) throws XServletException, 076 IOException { 077 JSONObject json = null; 078 079 String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM); 080 081 if (jobType == null) { 082 String wfPath = conf.get(OozieClient.APP_PATH); 083 String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH); 084 String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH); 085 086 ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath); 087 088 if (wfPath != null) { 089 json = submitWorkflowJob(request, conf); 090 } 091 else if (coordPath != null) { 092 json = submitCoordinatorJob(request, conf); 093 } 094 else { 095 json = submitBundleJob(request, conf); 096 } 097 } 098 else { // This is a http submission job 099 if (httpJobType.contains(jobType)) { 100 json = submitHttpJob(request, conf, jobType); 101 } 102 else { 103 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 104 RestConstants.JOBTYPE_PARAM, jobType); 105 } 106 } 107 return json; 108 } 109 110 /** 111 * v1 service implementation to get a JSONObject representation of a job from its external ID 112 */ 113 @Override 114 protected JSONObject getJobIdForExternalId(HttpServletRequest request, String externalId) throws XServletException, 115 IOException { 116 JSONObject json = null; 117 /* 118 * Configuration conf = new XConfiguration(); String wfPath = 119 * conf.get(OozieClient.APP_PATH); String coordPath = 120 * conf.get(OozieClient.COORDINATOR_APP_PATH); 121 * 122 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 123 */ 124 String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM); 125 jobtype = (jobtype != null) ? jobtype : "wf"; 126 if (jobtype.contains("wf")) { 127 json = getWorkflowJobIdForExternalId(request, externalId); 128 } 129 else { 130 json = getCoordinatorJobIdForExternalId(request, externalId); 131 } 132 return json; 133 } 134 135 /** 136 * v1 service implementation to get a list of workflows, coordinators, or bundles, with filtering or interested 137 * windows embedded in the request object 138 */ 139 @Override 140 protected JSONObject getJobs(HttpServletRequest request) throws XServletException, IOException { 141 JSONObject json = null; 142 String isBulk = request.getParameter(RestConstants.JOBS_BULK_PARAM); 143 if(isBulk != null) { 144 json = getBulkJobs(request); 145 } else { 146 String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM); 147 jobtype = (jobtype != null) ? jobtype : "wf"; 148 149 if (jobtype.contains("wf")) { 150 json = getWorkflowJobs(request); 151 } 152 else if (jobtype.contains("coord")) { 153 json = getCoordinatorJobs(request); 154 } 155 else if (jobtype.contains("bundle")) { 156 json = getBundleJobs(request); 157 } 158 } 159 return json; 160 } 161 162 /** 163 * v1 service implementation to submit a workflow job 164 */ 165 @SuppressWarnings("unchecked") 166 private JSONObject submitWorkflowJob(HttpServletRequest request, Configuration conf) throws XServletException { 167 168 JSONObject json = new JSONObject(); 169 170 try { 171 String action = request.getParameter(RestConstants.ACTION_PARAM); 172 if (action != null && !action.equals(RestConstants.JOB_ACTION_START) 173 && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) { 174 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 175 RestConstants.ACTION_PARAM, action); 176 } 177 boolean startJob = (action != null); 178 String user = conf.get(OozieClient.USER_NAME); 179 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user); 180 String id; 181 boolean dryrun = false; 182 if (action != null) { 183 dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN)); 184 } 185 if (dryrun) { 186 id = dagEngine.dryRunSubmit(conf); 187 } 188 else { 189 id = dagEngine.submitJob(conf, startJob); 190 } 191 json.put(JsonTags.JOB_ID, id); 192 } 193 catch (BaseEngineException ex) { 194 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 195 } 196 197 return json; 198 } 199 200 /** 201 * v1 service implementation to submit a coordinator job 202 */ 203 @SuppressWarnings("unchecked") 204 private JSONObject submitCoordinatorJob(HttpServletRequest request, Configuration conf) throws XServletException { 205 206 JSONObject json = new JSONObject(); 207 XLog.getLog(getClass()).warn("submitCoordinatorJob " + XmlUtils.prettyPrint(conf).toString()); 208 try { 209 String action = request.getParameter(RestConstants.ACTION_PARAM); 210 if (action != null && !action.equals(RestConstants.JOB_ACTION_START) 211 && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) { 212 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 213 RestConstants.ACTION_PARAM, action); 214 } 215 boolean startJob = (action != null); 216 String user = conf.get(OozieClient.USER_NAME); 217 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 218 user); 219 String id = null; 220 boolean dryrun = false; 221 if (action != null) { 222 dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN)); 223 } 224 if (dryrun) { 225 id = coordEngine.dryRunSubmit(conf); 226 } 227 else { 228 id = coordEngine.submitJob(conf, startJob); 229 } 230 json.put(JsonTags.JOB_ID, id); 231 } 232 catch (CoordinatorEngineException ex) { 233 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 234 } 235 236 return json; 237 } 238 239 /** 240 * v1 service implementation to submit a bundle job 241 */ 242 @SuppressWarnings("unchecked") 243 private JSONObject submitBundleJob(HttpServletRequest request, Configuration conf) throws XServletException { 244 JSONObject json = new JSONObject(); 245 XLog.getLog(getClass()).warn("submitBundleJob " + XmlUtils.prettyPrint(conf).toString()); 246 try { 247 String action = request.getParameter(RestConstants.ACTION_PARAM); 248 if (action != null && !action.equals(RestConstants.JOB_ACTION_START) 249 && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) { 250 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 251 RestConstants.ACTION_PARAM, action); 252 } 253 boolean startJob = (action != null); 254 String user = conf.get(OozieClient.USER_NAME); 255 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(user); 256 String id = null; 257 boolean dryrun = false; 258 if (action != null) { 259 dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN)); 260 } 261 if (dryrun) { 262 id = bundleEngine.dryRunSubmit(conf); 263 } 264 else { 265 id = bundleEngine.submitJob(conf, startJob); 266 } 267 json.put(JsonTags.JOB_ID, id); 268 } 269 catch (BundleEngineException ex) { 270 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 271 } 272 273 return json; 274 } 275 276 /** 277 * v1 service implementation to get a JSONObject representation of a job from its external ID 278 */ 279 @SuppressWarnings("unchecked") 280 private JSONObject getWorkflowJobIdForExternalId(HttpServletRequest request, String externalId) 281 throws XServletException { 282 JSONObject json = new JSONObject(); 283 try { 284 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 285 String jobId = dagEngine.getJobIdForExternalId(externalId); 286 json.put(JsonTags.JOB_ID, jobId); 287 } 288 catch (DagEngineException ex) { 289 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 290 } 291 return json; 292 } 293 294 /** 295 * v1 service implementation to get a JSONObject representation of a job from its external ID 296 */ 297 private JSONObject getCoordinatorJobIdForExternalId(HttpServletRequest request, String externalId) 298 throws XServletException { 299 JSONObject json = new JSONObject(); 300 return json; 301 } 302 303 /** 304 * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the 305 * request object 306 */ 307 private JSONObject getWorkflowJobs(HttpServletRequest request) throws XServletException { 308 JSONObject json = new JSONObject(); 309 try { 310 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM); 311 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 312 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 313 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 314 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM); 315 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 316 start = (start < 1) ? 1 : start; 317 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50; 318 len = (len < 1) ? 50 : len; 319 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 320 WorkflowsInfo jobs = dagEngine.getJobs(filter, start, len); 321 List<WorkflowJobBean> jsonWorkflows = jobs.getWorkflows(); 322 json.put(JsonTags.WORKFLOWS_JOBS, WorkflowJobBean.toJSONArray(jsonWorkflows, timeZoneId)); 323 json.put(JsonTags.WORKFLOWS_TOTAL, jobs.getTotal()); 324 json.put(JsonTags.WORKFLOWS_OFFSET, jobs.getStart()); 325 json.put(JsonTags.WORKFLOWS_LEN, jobs.getLen()); 326 327 } 328 catch (DagEngineException ex) { 329 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 330 } 331 332 return json; 333 } 334 335 /** 336 * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the 337 * request object 338 */ 339 @SuppressWarnings("unchecked") 340 private JSONObject getCoordinatorJobs(HttpServletRequest request) throws XServletException { 341 JSONObject json = new JSONObject(); 342 try { 343 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM); 344 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 345 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 346 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 347 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM); 348 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 349 start = (start < 1) ? 1 : start; 350 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50; 351 len = (len < 1) ? 50 : len; 352 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 353 getUser(request)); 354 CoordinatorJobInfo jobs = coordEngine.getCoordJobs(filter, start, len); 355 List<CoordinatorJobBean> jsonJobs = jobs.getCoordJobs(); 356 json.put(JsonTags.COORDINATOR_JOBS, CoordinatorJobBean.toJSONArray(jsonJobs, timeZoneId)); 357 json.put(JsonTags.COORD_JOB_TOTAL, jobs.getTotal()); 358 json.put(JsonTags.COORD_JOB_OFFSET, jobs.getStart()); 359 json.put(JsonTags.COORD_JOB_LEN, jobs.getLen()); 360 361 } 362 catch (CoordinatorEngineException ex) { 363 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 364 } 365 return json; 366 } 367 368 @SuppressWarnings("unchecked") 369 private JSONObject getBundleJobs(HttpServletRequest request) throws XServletException { 370 JSONObject json = new JSONObject(); 371 try { 372 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM); 373 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 374 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 375 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 376 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM); 377 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 378 start = (start < 1) ? 1 : start; 379 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50; 380 len = (len < 1) ? 50 : len; 381 382 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 383 BundleJobInfo jobs = bundleEngine.getBundleJobs(filter, start, len); 384 List<BundleJobBean> jsonJobs = jobs.getBundleJobs(); 385 386 json.put(JsonTags.BUNDLE_JOBS, BundleJobBean.toJSONArray(jsonJobs, timeZoneId)); 387 json.put(JsonTags.BUNDLE_JOB_TOTAL, jobs.getTotal()); 388 json.put(JsonTags.BUNDLE_JOB_OFFSET, jobs.getStart()); 389 json.put(JsonTags.BUNDLE_JOB_LEN, jobs.getLen()); 390 391 } 392 catch (BundleEngineException ex) { 393 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 394 } 395 return json; 396 } 397 398 @SuppressWarnings("unchecked") 399 private JSONObject getBulkJobs(HttpServletRequest request) throws XServletException, IOException { 400 JSONObject json = new JSONObject(); 401 try { 402 String bulkFilter = request.getParameter(RestConstants.JOBS_BULK_PARAM); //REST API 403 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 404 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 405 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 406 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM); 407 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 408 start = (start < 1) ? 1 : start; 409 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50; 410 len = (len < 1) ? 50 : len; 411 412 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); 413 BulkResponseInfo bulkResponse = bundleEngine.getBulkJobs(bulkFilter, start, len); 414 List<BulkResponseImpl> responsesToJson = bulkResponse.getResponses(); 415 416 json.put(JsonTags.BULK_RESPONSES, BulkResponseImpl.toJSONArray(responsesToJson, timeZoneId)); 417 json.put(JsonTags.BULK_RESPONSE_TOTAL, bulkResponse.getTotal()); 418 json.put(JsonTags.BULK_RESPONSE_OFFSET, bulkResponse.getStart()); 419 json.put(JsonTags.BULK_RESPONSE_LEN, bulkResponse.getLen()); 420 421 } 422 catch (BaseEngineException ex) { 423 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 424 } 425 return json; 426 } 427 428 /** 429 * service implementation to submit a http job 430 */ 431 private JSONObject submitHttpJob(HttpServletRequest request, Configuration conf, String jobType) 432 throws XServletException { 433 JSONObject json = new JSONObject(); 434 435 try { 436 String user = conf.get(OozieClient.USER_NAME); 437 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user); 438 String id = dagEngine.submitHttpJob(conf, jobType); 439 json.put(JsonTags.JOB_ID, id); 440 } 441 catch (DagEngineException ex) { 442 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 443 } 444 445 return json; 446 } 447}