/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.servlet;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BaseEngineException;
import org.apache.oozie.BulkResponseInfo;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.BundleJobInfo;
import org.apache.oozie.CoordinatorEngine;
import org.apache.oozie.BundleEngine;
import org.apache.oozie.CoordinatorEngineException;
import org.apache.oozie.BundleEngineException;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.CoordinatorJobInfo;
import org.apache.oozie.DagEngine;
import org.apache.oozie.DagEngineException;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.WorkflowsInfo;
import org.apache.oozie.cli.OozieCLI;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.rest.BulkResponseImpl;
import org.apache.oozie.client.rest.JsonTags;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.service.CoordinatorEngineService;
import org.apache.oozie.service.DagEngineService;
import org.apache.oozie.service.BundleEngineService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.json.simple.JSONObject;

/**
 * v1 implementation of the jobs servlet: submits workflow, coordinator, bundle and HTTP jobs, resolves a job id
 * from an external id, and lists jobs (optionally in bulk form).
 */
public class V1JobsServlet extends BaseJobsServlet {

    private static final String INSTRUMENTATION_NAME = "v1jobs";

    /** Offset used when the offset parameter is absent or smaller than 1. */
    private static final int DEFAULT_OFFSET = 1;

    /** Length used when the length parameter is absent or smaller than 1. */
    private static final int DEFAULT_LEN = 50;

    /** Time zone id used when the request does not carry a time zone parameter. */
    private static final String DEFAULT_TIME_ZONE_ID = "GMT";

    /** Job type values accepted for HTTP submissions; read-only after class initialization. */
    private static final Set<String> httpJobType = Collections.unmodifiableSet(new HashSet<String>(
            Arrays.asList(OozieCLI.HIVE_CMD, OozieCLI.PIG_CMD, OozieCLI.MR_CMD)));

    public V1JobsServlet() {
        super(INSTRUMENTATION_NAME);
    }

    /**
     * v1 service implementation to submit a job: workflow, coordinator, bundle, or HTTP (hive/pig/mr) job.
     * <p/>
     * Without a job type parameter the kind of job is chosen from the application path present in the
     * configuration (workflow first, then coordinator, otherwise bundle). With a job type parameter the request is
     * treated as an HTTP submission and the type must be one of the supported HTTP job types.
     *
     * @param request servlet request
     * @param conf job configuration
     * @return JSON object holding the id of the submitted job
     * @throws XServletException with SC_BAD_REQUEST if the job type is not supported or the engine rejects the job
     * @throws IOException declared by the overridden method
     */
    @Override
    protected JSONObject submitJob(HttpServletRequest request, Configuration conf) throws XServletException,
            IOException {
        String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM);

        if (jobType != null) {
            // This is a http submission job
            if (!httpJobType.contains(jobType)) {
                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
                        RestConstants.JOBTYPE_PARAM, jobType);
            }
            return submitHttpJob(request, conf, jobType);
        }

        String wfPath = conf.get(OozieClient.APP_PATH);
        String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH);
        String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);

        // NOTE(review): presumably rejects configurations without exactly one app path -- confirm in
        // ServletUtilities.
        ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath);

        if (wfPath != null) {
            return submitWorkflowJob(request, conf);
        }
        if (coordPath != null) {
            return submitCoordinatorJob(request, conf);
        }
        return submitBundleJob(request, conf);
    }

    /**
     * v1 service implementation to get a JSONObject representation of a job from its external ID.
     * <p/>
     * A missing job type parameter is treated as "wf"; any job type that does not contain "wf" is handled by the
     * coordinator lookup.
     *
     * @param request servlet request
     * @param externalId external id of the job
     * @return JSON object produced by the workflow or coordinator lookup
     * @throws XServletException if the lookup fails
     * @throws IOException declared by the overridden method
     */
    @Override
    protected JSONObject getJobIdForExternalId(HttpServletRequest request, String externalId)
            throws XServletException, IOException {
        String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
        if (jobtype == null) {
            jobtype = "wf";
        }
        if (jobtype.contains("wf")) {
            return getWorkflowJobIdForExternalId(request, externalId);
        }
        return getCoordinatorJobIdForExternalId(request, externalId);
    }

    /**
     * v1 service implementation to get a list of workflows, coordinators, or bundles, with filtering or interested
     * windows embedded in the request object.
     * <p/>
     * When the bulk parameter is present a bulk listing is returned. Otherwise the job type parameter (default
     * "wf") selects the listing.
     *
     * @param request servlet request
     * @return JSON object with the requested listing, never <code>null</code>
     * @throws XServletException with SC_BAD_REQUEST if the job type is not recognized, a paging parameter is not a
     *         number, or the engine rejects the query
     * @throws IOException declared by the overridden method
     */
    @Override
    protected JSONObject getJobs(HttpServletRequest request) throws XServletException, IOException {
        String isBulk = request.getParameter(RestConstants.JOBS_BULK_PARAM);
        if (isBulk != null) {
            return getBulkJobs(request);
        }

        String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
        if (jobtype == null) {
            jobtype = "wf";
        }

        if (jobtype.contains("wf")) {
            return getWorkflowJobs(request);
        }
        if (jobtype.contains("coord")) {
            return getCoordinatorJobs(request);
        }
        if (jobtype.contains("bundle")) {
            return getBundleJobs(request);
        }
        // An unrecognized job type used to fall through and return null; report it as a bad parameter instead.
        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
                RestConstants.JOBTYPE_PARAM, jobtype);
    }

    /**
     * Rejects any submit action other than "start" or "dryrun"; a missing action is accepted.
     *
     * @param action value of the action request parameter, may be <code>null</code>
     * @throws XServletException with SC_BAD_REQUEST if the action is not supported
     */
    private static void checkSubmitAction(String action) throws XServletException {
        if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
                && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
                    RestConstants.ACTION_PARAM, action);
        }
    }

    /**
     * Reads a paging parameter (offset or length) from the request.
     *
     * @param request servlet request
     * @param paramName name of the request parameter
     * @param defaultValue value returned when the parameter is absent or smaller than 1
     * @return the parsed value, or <code>defaultValue</code> when absent or smaller than 1
     * @throws XServletException with SC_BAD_REQUEST if the parameter is present but not an integer
     */
    private static int parsePagingParam(HttpServletRequest request, String paramName, int defaultValue)
            throws XServletException {
        String value = request.getParameter(paramName);
        if (value == null) {
            return defaultValue;
        }
        int parsed;
        try {
            parsed = Integer.parseInt(value);
        }
        catch (NumberFormatException ex) {
            // Malformed client input is a bad request, not an internal error.
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, paramName, value);
        }
        return (parsed < 1) ? defaultValue : parsed;
    }

    /**
     * Returns the time zone id requested by the client, or "GMT" when the parameter is absent.
     *
     * @param request servlet request
     * @return time zone id to use when rendering the response
     */
    private static String resolveTimeZoneId(HttpServletRequest request) {
        String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM);
        return (timeZoneId != null) ? timeZoneId : DEFAULT_TIME_ZONE_ID;
    }

    /**
     * v1 service implementation to submit a workflow job.
     *
     * @param request servlet request; its action parameter may be absent, "start" or "dryrun"
     * @param conf job configuration; the user name is read from it
     * @return JSON object holding the value returned by the engine under the job id tag
     * @throws XServletException with SC_BAD_REQUEST if the action is not supported or the engine fails
     */
    @SuppressWarnings("unchecked")
    private JSONObject submitWorkflowJob(HttpServletRequest request, Configuration conf) throws XServletException {
        JSONObject json = new JSONObject();

        try {
            String action = request.getParameter(RestConstants.ACTION_PARAM);
            checkSubmitAction(action);

            // Any explicit action requests a start; the flag is only consulted on the non-dryrun path.
            boolean startJob = (action != null);
            boolean dryrun = RestConstants.JOB_ACTION_DRYRUN.equals(action);

            String user = conf.get(OozieClient.USER_NAME);
            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
            String id = dryrun ? dagEngine.dryRunSubmit(conf) : dagEngine.submitJob(conf, startJob);
            json.put(JsonTags.JOB_ID, id);
        }
        catch (BaseEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }

        return json;
    }

    /**
     * v1 service implementation to submit a coordinator job.
     *
     * @param request servlet request; its action parameter may be absent, "start" or "dryrun"
     * @param conf job configuration; the user name is read from it
     * @return JSON object holding the value returned by the engine under the job id tag
     * @throws XServletException with SC_BAD_REQUEST if the action is not supported or the engine fails
     */
    @SuppressWarnings("unchecked")
    private JSONObject submitCoordinatorJob(HttpServletRequest request, Configuration conf)
            throws XServletException {
        JSONObject json = new JSONObject();
        // NOTE(review): the whole configuration is logged at WARN level; confirm it cannot carry secrets.
        XLog.getLog(getClass()).warn("submitCoordinatorJob " + XmlUtils.prettyPrint(conf).toString());

        try {
            String action = request.getParameter(RestConstants.ACTION_PARAM);
            checkSubmitAction(action);

            // Any explicit action requests a start; the flag is only consulted on the non-dryrun path.
            boolean startJob = (action != null);
            boolean dryrun = RestConstants.JOB_ACTION_DRYRUN.equals(action);

            String user = conf.get(OozieClient.USER_NAME);
            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class)
                    .getCoordinatorEngine(user);
            String id = dryrun ? coordEngine.dryRunSubmit(conf) : coordEngine.submitJob(conf, startJob);
            json.put(JsonTags.JOB_ID, id);
        }
        catch (CoordinatorEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }

        return json;
    }

    /**
     * v1 service implementation to submit a bundle job.
     *
     * @param request servlet request; its action parameter may be absent, "start" or "dryrun"
     * @param conf job configuration; the user name is read from it
     * @return JSON object holding the value returned by the engine under the job id tag
     * @throws XServletException with SC_BAD_REQUEST if the action is not supported or the engine fails
     */
    @SuppressWarnings("unchecked")
    private JSONObject submitBundleJob(HttpServletRequest request, Configuration conf) throws XServletException {
        JSONObject json = new JSONObject();
        // NOTE(review): the whole configuration is logged at WARN level; confirm it cannot carry secrets.
        XLog.getLog(getClass()).warn("submitBundleJob " + XmlUtils.prettyPrint(conf).toString());

        try {
            String action = request.getParameter(RestConstants.ACTION_PARAM);
            checkSubmitAction(action);

            // Any explicit action requests a start; the flag is only consulted on the non-dryrun path.
            boolean startJob = (action != null);
            boolean dryrun = RestConstants.JOB_ACTION_DRYRUN.equals(action);

            String user = conf.get(OozieClient.USER_NAME);
            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(user);
            String id = dryrun ? bundleEngine.dryRunSubmit(conf) : bundleEngine.submitJob(conf, startJob);
            json.put(JsonTags.JOB_ID, id);
        }
        catch (BundleEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }

        return json;
    }

    /**
     * v1 service implementation to get a JSONObject representation of a workflow job from its external ID.
     *
     * @param request servlet request, used to determine the user
     * @param externalId external id of the job
     * @return JSON object holding the job id returned by the workflow engine
     * @throws XServletException with SC_BAD_REQUEST if the engine fails
     */
    @SuppressWarnings("unchecked")
    private JSONObject getWorkflowJobIdForExternalId(HttpServletRequest request, String externalId)
            throws XServletException {
        JSONObject json = new JSONObject();
        try {
            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
            String jobId = dagEngine.getJobIdForExternalId(externalId);
            json.put(JsonTags.JOB_ID, jobId);
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        return json;
    }

    /**
     * v1 service implementation to get a JSONObject representation of a coordinator job from its external ID.
     * <p/>
     * No lookup is performed here: an empty JSON object is always returned.
     *
     * @param request servlet request (unused)
     * @param externalId external id of the job (unused)
     * @return an empty JSON object
     * @throws XServletException never thrown by this implementation
     */
    private JSONObject getCoordinatorJobIdForExternalId(HttpServletRequest request, String externalId)
            throws XServletException {
        return new JSONObject();
    }

    /**
     * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the
     * request object.
     *
     * @param request servlet request carrying the optional filter, offset, length and time zone parameters
     * @return JSON object with the workflow jobs plus the total, offset and length reported by the engine
     * @throws XServletException with SC_BAD_REQUEST if a paging parameter is not a number or the engine fails
     */
    @SuppressWarnings("unchecked")
    private JSONObject getWorkflowJobs(HttpServletRequest request) throws XServletException {
        JSONObject json = new JSONObject();
        try {
            String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
            int start = parsePagingParam(request, RestConstants.OFFSET_PARAM, DEFAULT_OFFSET);
            int len = parsePagingParam(request, RestConstants.LEN_PARAM, DEFAULT_LEN);
            String timeZoneId = resolveTimeZoneId(request);

            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
            WorkflowsInfo jobs = dagEngine.getJobs(filter, start, len);
            List<WorkflowJobBean> jsonWorkflows = jobs.getWorkflows();

            json.put(JsonTags.WORKFLOWS_JOBS, WorkflowJobBean.toJSONArray(jsonWorkflows, timeZoneId));
            json.put(JsonTags.WORKFLOWS_TOTAL, jobs.getTotal());
            json.put(JsonTags.WORKFLOWS_OFFSET, jobs.getStart());
            json.put(JsonTags.WORKFLOWS_LEN, jobs.getLen());
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }

        return json;
    }

    /**
     * v1 service implementation to get a list of coordinators, with filtering or interested windows embedded in
     * the request object.
     *
     * @param request servlet request carrying the optional filter, offset, length and time zone parameters
     * @return JSON object with the coordinator jobs plus the total, offset and length reported by the engine
     * @throws XServletException with SC_BAD_REQUEST if a paging parameter is not a number or the engine fails
     */
    @SuppressWarnings("unchecked")
    private JSONObject getCoordinatorJobs(HttpServletRequest request) throws XServletException {
        JSONObject json = new JSONObject();
        try {
            String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
            int start = parsePagingParam(request, RestConstants.OFFSET_PARAM, DEFAULT_OFFSET);
            int len = parsePagingParam(request, RestConstants.LEN_PARAM, DEFAULT_LEN);
            String timeZoneId = resolveTimeZoneId(request);

            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class)
                    .getCoordinatorEngine(getUser(request));
            CoordinatorJobInfo jobs = coordEngine.getCoordJobs(filter, start, len);
            List<CoordinatorJobBean> jsonJobs = jobs.getCoordJobs();

            json.put(JsonTags.COORDINATOR_JOBS, CoordinatorJobBean.toJSONArray(jsonJobs, timeZoneId));
            json.put(JsonTags.COORD_JOB_TOTAL, jobs.getTotal());
            json.put(JsonTags.COORD_JOB_OFFSET, jobs.getStart());
            json.put(JsonTags.COORD_JOB_LEN, jobs.getLen());
        }
        catch (CoordinatorEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        return json;
    }

    /**
     * v1 service implementation to get a list of bundles, with filtering or interested windows embedded in the
     * request object.
     *
     * @param request servlet request carrying the optional filter, offset, length and time zone parameters
     * @return JSON object with the bundle jobs plus the total, offset and length reported by the engine
     * @throws XServletException with SC_BAD_REQUEST if a paging parameter is not a number or the engine fails
     */
    @SuppressWarnings("unchecked")
    private JSONObject getBundleJobs(HttpServletRequest request) throws XServletException {
        JSONObject json = new JSONObject();
        try {
            String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
            int start = parsePagingParam(request, RestConstants.OFFSET_PARAM, DEFAULT_OFFSET);
            int len = parsePagingParam(request, RestConstants.LEN_PARAM, DEFAULT_LEN);
            String timeZoneId = resolveTimeZoneId(request);

            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class)
                    .getBundleEngine(getUser(request));
            BundleJobInfo jobs = bundleEngine.getBundleJobs(filter, start, len);
            List<BundleJobBean> jsonJobs = jobs.getBundleJobs();

            json.put(JsonTags.BUNDLE_JOBS, BundleJobBean.toJSONArray(jsonJobs, timeZoneId));
            json.put(JsonTags.BUNDLE_JOB_TOTAL, jobs.getTotal());
            json.put(JsonTags.BUNDLE_JOB_OFFSET, jobs.getStart());
            json.put(JsonTags.BUNDLE_JOB_LEN, jobs.getLen());
        }
        catch (BundleEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        return json;
    }

    /**
     * v1 service implementation to get a bulk listing; the value of the bulk request parameter is passed to the
     * bundle engine as the filter.
     *
     * @param request servlet request carrying the bulk filter and the optional offset, length and time zone
     *        parameters
     * @return JSON object with the bulk responses plus the total, offset and length reported by the engine
     * @throws XServletException with SC_BAD_REQUEST if a paging parameter is not a number or the engine fails
     * @throws IOException declared for callers; not thrown directly by this method
     */
    @SuppressWarnings("unchecked")
    private JSONObject getBulkJobs(HttpServletRequest request) throws XServletException, IOException {
        JSONObject json = new JSONObject();
        try {
            String bulkFilter = request.getParameter(RestConstants.JOBS_BULK_PARAM); //REST API
            int start = parsePagingParam(request, RestConstants.OFFSET_PARAM, DEFAULT_OFFSET);
            int len = parsePagingParam(request, RestConstants.LEN_PARAM, DEFAULT_LEN);
            String timeZoneId = resolveTimeZoneId(request);

            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class)
                    .getBundleEngine(getUser(request));
            BulkResponseInfo bulkResponse = bundleEngine.getBulkJobs(bulkFilter, start, len);
            List<BulkResponseImpl> responsesToJson = bulkResponse.getResponses();

            json.put(JsonTags.BULK_RESPONSES, BulkResponseImpl.toJSONArray(responsesToJson, timeZoneId));
            json.put(JsonTags.BULK_RESPONSE_TOTAL, bulkResponse.getTotal());
            json.put(JsonTags.BULK_RESPONSE_OFFSET, bulkResponse.getStart());
            json.put(JsonTags.BULK_RESPONSE_LEN, bulkResponse.getLen());
        }
        catch (BaseEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        return json;
    }

    /**
     * service implementation to submit a http job
     *
     * @param request servlet request (unused)
     * @param conf job configuration; the user name is read from it
     * @param jobType HTTP job type, already validated by the caller
     * @return JSON object holding the id returned by the workflow engine
     * @throws XServletException with SC_BAD_REQUEST if the engine fails
     */
    @SuppressWarnings("unchecked")
    private JSONObject submitHttpJob(HttpServletRequest request, Configuration conf, String jobType)
            throws XServletException {
        JSONObject json = new JSONObject();

        try {
            String user = conf.get(OozieClient.USER_NAME);
            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
            String id = dagEngine.submitHttpJob(conf, jobType);
            json.put(JsonTags.JOB_ID, id);
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }

        return json;
    }
}