001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.servlet; 019 020 import java.io.IOException; 021 import java.util.List; 022 023 import javax.servlet.http.HttpServletRequest; 024 import javax.servlet.http.HttpServletResponse; 025 026 import org.apache.hadoop.conf.Configuration; 027 import org.apache.oozie.BaseEngineException; 028 import org.apache.oozie.BulkResponseInfo; 029 import org.apache.oozie.BundleJobBean; 030 import org.apache.oozie.BundleJobInfo; 031 import org.apache.oozie.CoordinatorEngine; 032 import org.apache.oozie.BundleEngine; 033 import org.apache.oozie.CoordinatorEngineException; 034 import org.apache.oozie.BundleEngineException; 035 import org.apache.oozie.CoordinatorJobBean; 036 import org.apache.oozie.CoordinatorJobInfo; 037 import org.apache.oozie.DagEngine; 038 import org.apache.oozie.DagEngineException; 039 import org.apache.oozie.ErrorCode; 040 import org.apache.oozie.WorkflowJobBean; 041 import org.apache.oozie.WorkflowsInfo; 042 import org.apache.oozie.client.OozieClient; 043 import org.apache.oozie.client.rest.BulkResponseImpl; 044 import org.apache.oozie.client.rest.JsonTags; 045 import org.apache.oozie.client.rest.RestConstants; 046 import org.apache.oozie.service.CoordinatorEngineService; 047 import org.apache.oozie.service.DagEngineService; 048 import org.apache.oozie.service.BundleEngineService; 049 import org.apache.oozie.service.Services; 050 import org.apache.oozie.util.XLog; 051 import org.apache.oozie.util.XmlUtils; 052 import org.json.simple.JSONObject; 053 054 public class V1JobsServlet extends BaseJobsServlet { 055 056 private static final String INSTRUMENTATION_NAME = "v1jobs"; 057 058 public V1JobsServlet() { 059 super(INSTRUMENTATION_NAME); 060 } 061 062 /** 063 * v1 service implementation to submit a job, either workflow or coordinator 064 */ 065 @Override 066 protected JSONObject submitJob(HttpServletRequest request, Configuration conf) throws XServletException, 067 IOException { 068 JSONObject json = null; 069 070 String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM); 071 072 if (jobType == null) { 073 String wfPath = conf.get(OozieClient.APP_PATH); 074 String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH); 075 String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH); 076 077 ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath); 078 079 if (wfPath != null) { 080 json = submitWorkflowJob(request, conf); 081 } 082 else if (coordPath != null) { 083 json = submitCoordinatorJob(request, conf); 084 } 085 else { 086 json = submitBundleJob(request, conf); 087 } 088 } 089 else { // This is a http submission job 090 if (jobType.equals("pig") || jobType.equals("mapreduce")) { 091 json = submitHttpJob(request, conf, jobType); 092 } 093 else { 094 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 095 RestConstants.JOBTYPE_PARAM, jobType); 096 } 097 } 098 return json; 099 } 100 101 /** 102 * v1 service implementation to get a JSONObject representation of a job from its external ID 103 */ 104 @Override 105 protected JSONObject getJobIdForExternalId(HttpServletRequest request, String externalId) throws XServletException, 106 IOException { 107 JSONObject json = null; 108 /* 109 * Configuration conf = new XConfiguration(); String wfPath = 110 * conf.get(OozieClient.APP_PATH); String coordPath = 111 * conf.get(OozieClient.COORDINATOR_APP_PATH); 112 * 113 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 114 */ 115 String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM); 116 jobtype = (jobtype != null) ? jobtype : "wf"; 117 if (jobtype.contains("wf")) { 118 json = getWorkflowJobIdForExternalId(request, externalId); 119 } 120 else { 121 json = getCoordinatorJobIdForExternalId(request, externalId); 122 } 123 return json; 124 } 125 126 /** 127 * v1 service implementation to get a list of workflows, coordinators, or bundles, with filtering or interested 128 * windows embedded in the request object 129 */ 130 @Override 131 protected JSONObject getJobs(HttpServletRequest request) throws XServletException, IOException { 132 JSONObject json = null; 133 String isBulk = request.getParameter(RestConstants.JOBS_BULK_PARAM); 134 if(isBulk != null) { 135 json = getBulkJobs(request); 136 } else { 137 String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM); 138 jobtype = (jobtype != null) ? jobtype : "wf"; 139 140 if (jobtype.contains("wf")) { 141 json = getWorkflowJobs(request); 142 } 143 else if (jobtype.contains("coord")) { 144 json = getCoordinatorJobs(request); 145 } 146 else if (jobtype.contains("bundle")) { 147 json = getBundleJobs(request); 148 } 149 } 150 return json; 151 } 152 153 /** 154 * v1 service implementation to submit a workflow job 155 */ 156 @SuppressWarnings("unchecked") 157 private JSONObject submitWorkflowJob(HttpServletRequest request, Configuration conf) throws XServletException { 158 159 JSONObject json = new JSONObject(); 160 161 try { 162 String action = request.getParameter(RestConstants.ACTION_PARAM); 163 if (action != null && !action.equals(RestConstants.JOB_ACTION_START) 164 && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) { 165 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 166 RestConstants.ACTION_PARAM, action); 167 } 168 boolean startJob = (action != null); 169 String user = conf.get(OozieClient.USER_NAME); 170 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user, getAuthToken(request)); 171 String id; 172 boolean dryrun = false; 173 if (action != null) { 174 dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN)); 175 } 176 if (dryrun) { 177 id = dagEngine.dryRunSubmit(conf); 178 } 179 else { 180 id = dagEngine.submitJob(conf, startJob); 181 } 182 json.put(JsonTags.JOB_ID, id); 183 } 184 catch (BaseEngineException ex) { 185 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 186 } 187 188 return json; 189 } 190 191 /** 192 * v1 service implementation to submit a coordinator job 193 */ 194 @SuppressWarnings("unchecked") 195 private JSONObject submitCoordinatorJob(HttpServletRequest request, Configuration conf) throws XServletException { 196 197 JSONObject json = new JSONObject(); 198 XLog.getLog(getClass()).warn("submitCoordinatorJob " + XmlUtils.prettyPrint(conf).toString()); 199 try { 200 String action = request.getParameter(RestConstants.ACTION_PARAM); 201 if (action != null && !action.equals(RestConstants.JOB_ACTION_START) 202 && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) { 203 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 204 RestConstants.ACTION_PARAM, action); 205 } 206 boolean startJob = (action != null); 207 String user = conf.get(OozieClient.USER_NAME); 208 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 209 user, getAuthToken(request)); 210 String id = null; 211 boolean dryrun = false; 212 if (action != null) { 213 dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN)); 214 } 215 if (dryrun) { 216 id = coordEngine.dryRunSubmit(conf); 217 } 218 else { 219 id = coordEngine.submitJob(conf, startJob); 220 } 221 json.put(JsonTags.JOB_ID, id); 222 } 223 catch (CoordinatorEngineException ex) { 224 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 225 } 226 227 return json; 228 } 229 230 /** 231 * v1 service implementation to submit a bundle job 232 */ 233 @SuppressWarnings("unchecked") 234 private JSONObject submitBundleJob(HttpServletRequest request, Configuration conf) throws XServletException { 235 JSONObject json = new JSONObject(); 236 XLog.getLog(getClass()).warn("submitBundleJob " + XmlUtils.prettyPrint(conf).toString()); 237 try { 238 String action = request.getParameter(RestConstants.ACTION_PARAM); 239 if (action != null && !action.equals(RestConstants.JOB_ACTION_START) 240 && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) { 241 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 242 RestConstants.ACTION_PARAM, action); 243 } 244 boolean startJob = (action != null); 245 String user = conf.get(OozieClient.USER_NAME); 246 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(user, 247 getAuthToken(request)); 248 String id = null; 249 boolean dryrun = false; 250 if (action != null) { 251 dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN)); 252 } 253 if (dryrun) { 254 id = bundleEngine.dryRunSubmit(conf); 255 } 256 else { 257 id = bundleEngine.submitJob(conf, startJob); 258 } 259 json.put(JsonTags.JOB_ID, id); 260 } 261 catch (BundleEngineException ex) { 262 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 263 } 264 265 return json; 266 } 267 268 /** 269 * v1 service implementation to get a JSONObject representation of a job from its external ID 270 */ 271 @SuppressWarnings("unchecked") 272 private JSONObject getWorkflowJobIdForExternalId(HttpServletRequest request, String externalId) 273 throws XServletException { 274 JSONObject json = new JSONObject(); 275 try { 276 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 277 getAuthToken(request)); 278 String jobId = dagEngine.getJobIdForExternalId(externalId); 279 json.put(JsonTags.JOB_ID, jobId); 280 } 281 catch (DagEngineException ex) { 282 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 283 } 284 return json; 285 } 286 287 /** 288 * v1 service implementation to get a JSONObject representation of a job from its external ID 289 */ 290 private JSONObject getCoordinatorJobIdForExternalId(HttpServletRequest request, String externalId) 291 throws XServletException { 292 JSONObject json = new JSONObject(); 293 return json; 294 } 295 296 /** 297 * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the 298 * request object 299 */ 300 private JSONObject getWorkflowJobs(HttpServletRequest request) throws XServletException { 301 JSONObject json = new JSONObject(); 302 try { 303 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM); 304 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 305 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 306 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 307 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM); 308 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 309 start = (start < 1) ? 1 : start; 310 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50; 311 len = (len < 1) ? 50 : len; 312 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 313 getAuthToken(request)); 314 WorkflowsInfo jobs = dagEngine.getJobs(filter, start, len); 315 List<WorkflowJobBean> jsonWorkflows = jobs.getWorkflows(); 316 json.put(JsonTags.WORKFLOWS_JOBS, WorkflowJobBean.toJSONArray(jsonWorkflows, timeZoneId)); 317 json.put(JsonTags.WORKFLOWS_TOTAL, jobs.getTotal()); 318 json.put(JsonTags.WORKFLOWS_OFFSET, jobs.getStart()); 319 json.put(JsonTags.WORKFLOWS_LEN, jobs.getLen()); 320 321 } 322 catch (DagEngineException ex) { 323 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 324 } 325 326 return json; 327 } 328 329 /** 330 * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the 331 * request object 332 */ 333 @SuppressWarnings("unchecked") 334 private JSONObject getCoordinatorJobs(HttpServletRequest request) throws XServletException { 335 JSONObject json = new JSONObject(); 336 try { 337 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM); 338 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 339 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 340 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 341 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM); 342 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 343 start = (start < 1) ? 1 : start; 344 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50; 345 len = (len < 1) ? 50 : len; 346 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 347 getUser(request), getAuthToken(request)); 348 CoordinatorJobInfo jobs = coordEngine.getCoordJobs(filter, start, len); 349 List<CoordinatorJobBean> jsonJobs = jobs.getCoordJobs(); 350 json.put(JsonTags.COORDINATOR_JOBS, CoordinatorJobBean.toJSONArray(jsonJobs, timeZoneId)); 351 json.put(JsonTags.COORD_JOB_TOTAL, jobs.getTotal()); 352 json.put(JsonTags.COORD_JOB_OFFSET, jobs.getStart()); 353 json.put(JsonTags.COORD_JOB_LEN, jobs.getLen()); 354 355 } 356 catch (CoordinatorEngineException ex) { 357 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 358 } 359 return json; 360 } 361 362 @SuppressWarnings("unchecked") 363 private JSONObject getBundleJobs(HttpServletRequest request) throws XServletException { 364 JSONObject json = new JSONObject(); 365 try { 366 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM); 367 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 368 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 369 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 370 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM); 371 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 372 start = (start < 1) ? 1 : start; 373 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50; 374 len = (len < 1) ? 50 : len; 375 376 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request), 377 getAuthToken(request)); 378 BundleJobInfo jobs = bundleEngine.getBundleJobs(filter, start, len); 379 List<BundleJobBean> jsonJobs = jobs.getBundleJobs(); 380 381 json.put(JsonTags.BUNDLE_JOBS, BundleJobBean.toJSONArray(jsonJobs, timeZoneId)); 382 json.put(JsonTags.BUNDLE_JOB_TOTAL, jobs.getTotal()); 383 json.put(JsonTags.BUNDLE_JOB_OFFSET, jobs.getStart()); 384 json.put(JsonTags.BUNDLE_JOB_LEN, jobs.getLen()); 385 386 } 387 catch (BundleEngineException ex) { 388 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 389 } 390 return json; 391 } 392 393 @SuppressWarnings("unchecked") 394 private JSONObject getBulkJobs(HttpServletRequest request) throws XServletException, IOException { 395 JSONObject json = new JSONObject(); 396 try { 397 String bulkFilter = request.getParameter(RestConstants.JOBS_BULK_PARAM); //REST API 398 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 399 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 400 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 401 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM); 402 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 403 start = (start < 1) ? 1 : start; 404 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50; 405 len = (len < 1) ? 50 : len; 406 407 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request), 408 getAuthToken(request)); 409 BulkResponseInfo bulkResponse = bundleEngine.getBulkJobs(bulkFilter, start, len); 410 List<BulkResponseImpl> jsonResponse = bulkResponse.getResponses(); 411 412 json.put(JsonTags.BULK_RESPONSES, BulkResponseImpl.toJSONArray(jsonResponse, timeZoneId)); 413 json.put(JsonTags.BULK_RESPONSE_TOTAL, bulkResponse.getTotal()); 414 json.put(JsonTags.BULK_RESPONSE_OFFSET, bulkResponse.getStart()); 415 json.put(JsonTags.BULK_RESPONSE_LEN, bulkResponse.getLen()); 416 417 } 418 catch (BaseEngineException ex) { 419 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 420 } 421 return json; 422 } 423 424 /** 425 * service implementation to submit a http job 426 */ 427 private JSONObject submitHttpJob(HttpServletRequest request, Configuration conf, String jobType) 428 throws XServletException { 429 JSONObject json = new JSONObject(); 430 431 try { 432 String user = conf.get(OozieClient.USER_NAME); 433 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user, getAuthToken(request)); 434 String id = dagEngine.submitHttpJob(conf, jobType); 435 json.put(JsonTags.JOB_ID, id); 436 } 437 catch (DagEngineException ex) { 438 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 439 } 440 441 return json; 442 } 443 }