/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.servlet;

import java.io.IOException;
import java.util.List;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BaseEngineException;
import org.apache.oozie.BulkResponseInfo;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.BundleJobInfo;
import org.apache.oozie.CoordinatorEngine;
import org.apache.oozie.BundleEngine;
import org.apache.oozie.CoordinatorEngineException;
import org.apache.oozie.BundleEngineException;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.CoordinatorJobInfo;
import org.apache.oozie.DagEngine;
import org.apache.oozie.DagEngineException;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.WorkflowsInfo;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.rest.BulkResponseImpl;
import org.apache.oozie.client.rest.JsonTags;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.service.CoordinatorEngineService;
import org.apache.oozie.service.DagEngineService;
import org.apache.oozie.service.BundleEngineService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.json.simple.JSONObject;

/**
 * v1 implementation of the jobs servlet: submits workflow, coordinator, bundle and HTTP (pig / mapreduce) jobs,
 * resolves a job ID from an external ID, and lists workflow, coordinator, bundle and bulk job information.
 */
public class V1JobsServlet extends BaseJobsServlet {

    private static final String INSTRUMENTATION_NAME = "v1jobs";

    /** Job type assumed when the request carries no job type parameter. */
    private static final String DEFAULT_JOB_TYPE = "wf";

    /** Time zone ID used when the request carries no time zone parameter. */
    private static final String DEFAULT_TIME_ZONE_ID = "GMT";

    /** Offset used when the offset parameter is absent or smaller than 1. */
    private static final int DEFAULT_OFFSET = 1;

    /** Page length used when the length parameter is absent or smaller than 1. */
    private static final int DEFAULT_LEN = 50;

    public V1JobsServlet() {
        super(INSTRUMENTATION_NAME);
    }

    /**
     * v1 service implementation to submit a job, either workflow, coordinator or bundle, or an HTTP submission
     * job (pig / mapreduce) when the job type parameter is present.
     *
     * @param request the servlet request; the job type and action parameters are read from it
     * @param conf the job configuration; the application paths and user name are read from it
     * @return a JSON object holding the ID of the submitted job
     * @throws XServletException with a bad-request status if the job type is unsupported, the action is invalid
     *         or the engine rejects the submission
     * @throws IOException declared by the overridden method
     */
    @Override
    protected JSONObject submitJob(HttpServletRequest request, Configuration conf) throws XServletException,
            IOException {
        String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM);

        if (jobType != null) {
            // An explicit job type means this is an HTTP submission job.
            if (jobType.equals("pig") || jobType.equals("mapreduce")) {
                return submitHttpJob(request, conf, jobType);
            }
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
                    RestConstants.JOBTYPE_PARAM, jobType);
        }

        String wfPath = conf.get(OozieClient.APP_PATH);
        String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH);
        String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);

        ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath);

        // The kind of job is selected by whichever application path is set, workflow first.
        if (wfPath != null) {
            return submitWorkflowJob(request, conf);
        }
        if (coordPath != null) {
            return submitCoordinatorJob(request, conf);
        }
        return submitBundleJob(request, conf);
    }

    /**
     * v1 service implementation to get a JSONObject representation of a job from its external ID.
     * <p>
     * The job type defaults to workflow; any job type that does not contain "wf" is handled as a coordinator
     * lookup.
     *
     * @param request the servlet request; the job type parameter is read from it
     * @param externalId the external ID to resolve
     * @return the JSON object produced by the workflow or coordinator lookup
     * @throws XServletException if the workflow engine fails to resolve the ID
     * @throws IOException declared by the overridden method
     */
    @Override
    protected JSONObject getJobIdForExternalId(HttpServletRequest request, String externalId)
            throws XServletException, IOException {
        String jobtype = getJobType(request);
        if (jobtype.contains("wf")) {
            return getWorkflowJobIdForExternalId(request, externalId);
        }
        return getCoordinatorJobIdForExternalId(request, externalId);
    }

    /**
     * v1 service implementation to get a list of workflows, coordinators, or bundles, with filtering or interested
     * windows embedded in the request object. When the bulk parameter is present the bulk listing is returned
     * instead and the job type is not consulted.
     *
     * @param request the servlet request carrying the bulk / job type, filter, offset, length and time zone
     *        parameters
     * @return the JSON listing for the requested job type
     * @throws XServletException with a bad-request status if the job type is not one of wf, coord or bundle, a
     *         paging parameter is not numeric, or the engine reports an error
     * @throws IOException declared by the overridden method
     */
    @Override
    protected JSONObject getJobs(HttpServletRequest request) throws XServletException, IOException {
        if (request.getParameter(RestConstants.JOBS_BULK_PARAM) != null) {
            return getBulkJobs(request);
        }

        String jobtype = getJobType(request);
        if (jobtype.contains("wf")) {
            return getWorkflowJobs(request);
        }
        if (jobtype.contains("coord")) {
            return getCoordinatorJobs(request);
        }
        if (jobtype.contains("bundle")) {
            return getBundleJobs(request);
        }
        // Previously an unrecognised job type fell through and returned null; reject it explicitly instead,
        // the same way submitJob rejects an unsupported job type.
        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
                RestConstants.JOBTYPE_PARAM, jobtype);
    }

    /**
     * v1 service implementation to submit a workflow job. The only action accepted is start; the job is started
     * on submission when an action is supplied.
     */
    @SuppressWarnings("unchecked")
    private JSONObject submitWorkflowJob(HttpServletRequest request, Configuration conf) throws XServletException {
        JSONObject json = new JSONObject();

        try {
            String action = request.getParameter(RestConstants.ACTION_PARAM);
            validateSubmitAction(action, false);
            boolean startJob = (action != null);
            String user = conf.get(OozieClient.USER_NAME);
            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user,
                    getAuthToken(request));
            String id = dagEngine.submitJob(conf, startJob);
            json.put(JsonTags.JOB_ID, id);
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }

        return json;
    }

    /**
     * v1 service implementation to submit a coordinator job. Accepted actions are start and dryrun; a dryrun
     * action goes through the engine's dry-run submission instead of a regular submission.
     */
    @SuppressWarnings("unchecked")
    private JSONObject submitCoordinatorJob(HttpServletRequest request, Configuration conf)
            throws XServletException {
        JSONObject json = new JSONObject();
        XLog.getLog(getClass()).warn("submitCoordinatorJob " + XmlUtils.prettyPrint(conf).toString());

        try {
            String action = request.getParameter(RestConstants.ACTION_PARAM);
            validateSubmitAction(action, true);
            // Note: the flag is true for any supplied action, dryrun included.
            boolean startJob = (action != null);
            String user = conf.get(OozieClient.USER_NAME);
            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class)
                    .getCoordinatorEngine(user, getAuthToken(request));
            boolean dryrun = RestConstants.JOB_ACTION_DRYRUN.equals(action);
            String id = dryrun ? coordEngine.dryrunSubmit(conf, startJob) : coordEngine.submitJob(conf, startJob);
            json.put(JsonTags.JOB_ID, id);
        }
        catch (CoordinatorEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }

        return json;
    }

    /**
     * v1 service implementation to submit a bundle job. Accepted actions are start and dryrun; a dryrun action
     * goes through the engine's dry-run submission instead of a regular submission.
     */
    @SuppressWarnings("unchecked")
    private JSONObject submitBundleJob(HttpServletRequest request, Configuration conf) throws XServletException {
        JSONObject json = new JSONObject();
        XLog.getLog(getClass()).warn("submitBundleJob " + XmlUtils.prettyPrint(conf).toString());

        try {
            String action = request.getParameter(RestConstants.ACTION_PARAM);
            validateSubmitAction(action, true);
            // Note: the flag is true for any supplied action, dryrun included.
            boolean startJob = (action != null);
            String user = conf.get(OozieClient.USER_NAME);
            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(user,
                    getAuthToken(request));
            boolean dryrun = RestConstants.JOB_ACTION_DRYRUN.equals(action);
            String id = dryrun ? bundleEngine.dryrunSubmit(conf, startJob) : bundleEngine.submitJob(conf, startJob);
            json.put(JsonTags.JOB_ID, id);
        }
        catch (BundleEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }

        return json;
    }

    /**
     * v1 service implementation to get a JSONObject holding the workflow job ID for an external ID.
     */
    @SuppressWarnings("unchecked")
    private JSONObject getWorkflowJobIdForExternalId(HttpServletRequest request, String externalId)
            throws XServletException {
        JSONObject json = new JSONObject();
        try {
            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
                    getAuthToken(request));
            String jobId = dagEngine.getJobIdForExternalId(externalId);
            json.put(JsonTags.JOB_ID, jobId);
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        return json;
    }

    /**
     * v1 service implementation for looking up a coordinator job from its external ID.
     * <p>
     * No lookup is performed here: the method always returns an empty JSON object.
     */
    private JSONObject getCoordinatorJobIdForExternalId(HttpServletRequest request, String externalId)
            throws XServletException {
        return new JSONObject();
    }

    /**
     * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the
     * request object.
     */
    @SuppressWarnings("unchecked")
    private JSONObject getWorkflowJobs(HttpServletRequest request) throws XServletException {
        JSONObject json = new JSONObject();
        try {
            String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
            String timeZoneId = getTimeZoneId(request);
            int start = getOffset(request);
            int len = getLen(request);
            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
                    getAuthToken(request));
            WorkflowsInfo jobs = dagEngine.getJobs(filter, start, len);
            List<WorkflowJobBean> jsonWorkflows = jobs.getWorkflows();
            json.put(JsonTags.WORKFLOWS_JOBS, WorkflowJobBean.toJSONArray(jsonWorkflows, timeZoneId));
            json.put(JsonTags.WORKFLOWS_TOTAL, jobs.getTotal());
            json.put(JsonTags.WORKFLOWS_OFFSET, jobs.getStart());
            json.put(JsonTags.WORKFLOWS_LEN, jobs.getLen());
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }

        return json;
    }

    /**
     * v1 service implementation to get a list of coordinator jobs, with filtering or interested windows embedded
     * in the request object.
     */
    @SuppressWarnings("unchecked")
    private JSONObject getCoordinatorJobs(HttpServletRequest request) throws XServletException {
        JSONObject json = new JSONObject();
        try {
            String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
            String timeZoneId = getTimeZoneId(request);
            int start = getOffset(request);
            int len = getLen(request);
            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class)
                    .getCoordinatorEngine(getUser(request), getAuthToken(request));
            CoordinatorJobInfo jobs = coordEngine.getCoordJobs(filter, start, len);
            List<CoordinatorJobBean> jsonJobs = jobs.getCoordJobs();
            json.put(JsonTags.COORDINATOR_JOBS, CoordinatorJobBean.toJSONArray(jsonJobs, timeZoneId));
            json.put(JsonTags.COORD_JOB_TOTAL, jobs.getTotal());
            json.put(JsonTags.COORD_JOB_OFFSET, jobs.getStart());
            json.put(JsonTags.COORD_JOB_LEN, jobs.getLen());
        }
        catch (CoordinatorEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        return json;
    }

    /**
     * v1 service implementation to get a list of bundle jobs, with filtering or interested windows embedded in
     * the request object.
     */
    @SuppressWarnings("unchecked")
    private JSONObject getBundleJobs(HttpServletRequest request) throws XServletException {
        JSONObject json = new JSONObject();
        try {
            String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
            String timeZoneId = getTimeZoneId(request);
            int start = getOffset(request);
            int len = getLen(request);

            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(
                    getUser(request), getAuthToken(request));
            BundleJobInfo jobs = bundleEngine.getBundleJobs(filter, start, len);
            List<BundleJobBean> jsonJobs = jobs.getBundleJobs();

            json.put(JsonTags.BUNDLE_JOBS, BundleJobBean.toJSONArray(jsonJobs, timeZoneId));
            json.put(JsonTags.BUNDLE_JOB_TOTAL, jobs.getTotal());
            json.put(JsonTags.BUNDLE_JOB_OFFSET, jobs.getStart());
            json.put(JsonTags.BUNDLE_JOB_LEN, jobs.getLen());
        }
        catch (BundleEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        return json;
    }

    /**
     * v1 service implementation to get bulk job information; the value of the bulk request parameter is passed
     * to the bundle engine as the filter.
     */
    @SuppressWarnings("unchecked")
    private JSONObject getBulkJobs(HttpServletRequest request) throws XServletException, IOException {
        JSONObject json = new JSONObject();
        try {
            String bulkFilter = request.getParameter(RestConstants.JOBS_BULK_PARAM); // REST API
            String timeZoneId = getTimeZoneId(request);
            int start = getOffset(request);
            int len = getLen(request);

            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(
                    getUser(request), getAuthToken(request));
            BulkResponseInfo bulkResponse = bundleEngine.getBulkJobs(bulkFilter, start, len);
            List<BulkResponseImpl> jsonResponse = bulkResponse.getResponses();

            json.put(JsonTags.BULK_RESPONSES, BulkResponseImpl.toJSONArray(jsonResponse, timeZoneId));
            json.put(JsonTags.BULK_RESPONSE_TOTAL, bulkResponse.getTotal());
            json.put(JsonTags.BULK_RESPONSE_OFFSET, bulkResponse.getStart());
            json.put(JsonTags.BULK_RESPONSE_LEN, bulkResponse.getLen());
        }
        catch (BaseEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        return json;
    }

    /**
     * service implementation to submit a http job
     */
    @SuppressWarnings("unchecked")
    private JSONObject submitHttpJob(HttpServletRequest request, Configuration conf, String jobType)
            throws XServletException {
        JSONObject json = new JSONObject();

        try {
            String user = conf.get(OozieClient.USER_NAME);
            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user,
                    getAuthToken(request));
            String id = dagEngine.submitHttpJob(conf, jobType);
            json.put(JsonTags.JOB_ID, id);
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }

        return json;
    }

    /**
     * Returns the job type request parameter, or the workflow job type when the parameter is absent.
     */
    private static String getJobType(HttpServletRequest request) {
        String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
        return (jobtype != null) ? jobtype : DEFAULT_JOB_TYPE;
    }

    /**
     * Returns the time zone request parameter, or "GMT" when the parameter is absent.
     */
    private static String getTimeZoneId(HttpServletRequest request) {
        String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM);
        return (timeZoneId != null) ? timeZoneId : DEFAULT_TIME_ZONE_ID;
    }

    /**
     * Returns the offset request parameter, falling back to 1 when it is absent or smaller than 1.
     */
    private static int getOffset(HttpServletRequest request) throws XServletException {
        return parsePositiveInt(RestConstants.OFFSET_PARAM, request.getParameter(RestConstants.OFFSET_PARAM),
                DEFAULT_OFFSET);
    }

    /**
     * Returns the length request parameter, falling back to 50 when it is absent or smaller than 1.
     */
    private static int getLen(HttpServletRequest request) throws XServletException {
        return parsePositiveInt(RestConstants.LEN_PARAM, request.getParameter(RestConstants.LEN_PARAM),
                DEFAULT_LEN);
    }

    /**
     * Parses a paging parameter.
     *
     * @param paramName name of the request parameter, used for error reporting
     * @param value raw parameter value, may be <code>null</code>
     * @param defaultValue value returned when the parameter is absent or smaller than 1
     * @return the parsed value, or <code>defaultValue</code>
     * @throws XServletException with a bad-request status if the value is not a valid integer, so that a
     *         malformed parameter is reported like any other invalid parameter instead of escaping as an
     *         unchecked NumberFormatException
     */
    private static int parsePositiveInt(String paramName, String value, int defaultValue)
            throws XServletException {
        if (value == null) {
            return defaultValue;
        }
        int parsed;
        try {
            parsed = Integer.parseInt(value);
        }
        catch (NumberFormatException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, paramName, value);
        }
        return (parsed < 1) ? defaultValue : parsed;
    }

    /**
     * Verifies the action parameter of a submission request: no action and start are always accepted, dryrun
     * only when <code>dryrunAllowed</code> is set.
     *
     * @throws XServletException with a bad-request status for any other action
     */
    private static void validateSubmitAction(String action, boolean dryrunAllowed) throws XServletException {
        if (action == null || action.equals(RestConstants.JOB_ACTION_START)) {
            return;
        }
        if (dryrunAllowed && action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
            return;
        }
        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
                RestConstants.ACTION_PARAM, action);
    }
}