001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.servlet; 019 020 import java.io.IOException; 021 import java.util.List; 022 023 import javax.servlet.http.HttpServletRequest; 024 import javax.servlet.http.HttpServletResponse; 025 026 import org.apache.hadoop.conf.Configuration; 027 import org.apache.oozie.BundleJobBean; 028 import org.apache.oozie.BundleJobInfo; 029 import org.apache.oozie.CoordinatorEngine; 030 import org.apache.oozie.BundleEngine; 031 import org.apache.oozie.CoordinatorEngineException; 032 import org.apache.oozie.BundleEngineException; 033 import org.apache.oozie.CoordinatorJobBean; 034 import org.apache.oozie.CoordinatorJobInfo; 035 import org.apache.oozie.DagEngine; 036 import org.apache.oozie.DagEngineException; 037 import org.apache.oozie.ErrorCode; 038 import org.apache.oozie.WorkflowJobBean; 039 import org.apache.oozie.WorkflowsInfo; 040 import org.apache.oozie.client.OozieClient; 041 import org.apache.oozie.client.rest.JsonTags; 042 import org.apache.oozie.client.rest.RestConstants; 043 import org.apache.oozie.service.CoordinatorEngineService; 044 import org.apache.oozie.service.DagEngineService; 045 import org.apache.oozie.service.BundleEngineService; 046 import org.apache.oozie.service.Services; 047 import org.apache.oozie.util.XLog; 048 import org.apache.oozie.util.XmlUtils; 049 import org.json.simple.JSONObject; 050 051 public class V1JobsServlet extends BaseJobsServlet { 052 053 private static final String INSTRUMENTATION_NAME = "v1jobs"; 054 055 public V1JobsServlet() { 056 super(INSTRUMENTATION_NAME); 057 } 058 059 /** 060 * v1 service implementation to submit a job, either workflow or coordinator 061 */ 062 @Override 063 protected JSONObject submitJob(HttpServletRequest request, Configuration conf) throws XServletException, 064 IOException { 065 JSONObject json = null; 066 067 String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM); 068 069 if (jobType == null) { 070 String wfPath = conf.get(OozieClient.APP_PATH); 071 String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH); 072 String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH); 073 074 ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath); 075 076 if (wfPath != null) { 077 json = submitWorkflowJob(request, conf); 078 } 079 else if (coordPath != null) { 080 json = submitCoordinatorJob(request, conf); 081 } 082 else { 083 json = submitBundleJob(request, conf); 084 } 085 } 086 else { // This is a http submission job 087 if (jobType.equals("pig") || jobType.equals("mapreduce")) { 088 json = submitHttpJob(request, conf, jobType); 089 } 090 else { 091 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 092 RestConstants.JOBTYPE_PARAM, jobType); 093 } 094 } 095 return json; 096 } 097 098 /** 099 * v1 service implementation to get a JSONObject representation of a job from its external ID 100 */ 101 @Override 102 protected JSONObject getJobIdForExternalId(HttpServletRequest request, String externalId) throws XServletException, 103 IOException { 104 JSONObject json = null; 105 /* 106 * Configuration conf = new XConfiguration(); String wfPath = 107 * conf.get(OozieClient.APP_PATH); String coordPath = 108 * conf.get(OozieClient.COORDINATOR_APP_PATH); 109 * 110 * ServletUtilities.ValidateAppPath(wfPath, coordPath); 111 */ 112 String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM); 113 jobtype = (jobtype != null) ? jobtype : "wf"; 114 if (jobtype.contains("wf")) { 115 json = getWorkflowJobIdForExternalId(request, externalId); 116 } 117 else { 118 json = getCoordinatorJobIdForExternalId(request, externalId); 119 } 120 return json; 121 } 122 123 /** 124 * v1 service implementation to get a list of workflows, coordinators, or bundles, with filtering or interested 125 * windows embedded in the request object 126 */ 127 @Override 128 protected JSONObject getJobs(HttpServletRequest request) throws XServletException, IOException { 129 JSONObject json = null; 130 String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM); 131 jobtype = (jobtype != null) ? jobtype : "wf"; 132 133 if (jobtype.contains("wf")) { 134 json = getWorkflowJobs(request); 135 } 136 else if (jobtype.contains("coord")) { 137 json = getCoordinatorJobs(request); 138 } 139 else if (jobtype.contains("bundle")) { 140 json = getBundleJobs(request); 141 } 142 return json; 143 } 144 145 /** 146 * v1 service implementation to submit a workflow job 147 */ 148 @SuppressWarnings("unchecked") 149 private JSONObject submitWorkflowJob(HttpServletRequest request, Configuration conf) throws XServletException { 150 151 JSONObject json = new JSONObject(); 152 153 try { 154 String action = request.getParameter(RestConstants.ACTION_PARAM); 155 if (action != null && !action.equals(RestConstants.JOB_ACTION_START)) { 156 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 157 RestConstants.ACTION_PARAM, action); 158 } 159 boolean startJob = (action != null); 160 String user = conf.get(OozieClient.USER_NAME); 161 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user, getAuthToken(request)); 162 String id = dagEngine.submitJob(conf, startJob); 163 json.put(JsonTags.JOB_ID, id); 164 } 165 catch (DagEngineException ex) { 166 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 167 } 168 169 return json; 170 } 171 172 /** 173 * v1 service implementation to submit a coordinator job 174 */ 175 @SuppressWarnings("unchecked") 176 private JSONObject submitCoordinatorJob(HttpServletRequest request, Configuration conf) throws XServletException { 177 178 JSONObject json = new JSONObject(); 179 XLog.getLog(getClass()).warn("submitCoordinatorJob " + XmlUtils.prettyPrint(conf).toString()); 180 try { 181 String action = request.getParameter(RestConstants.ACTION_PARAM); 182 if (action != null && !action.equals(RestConstants.JOB_ACTION_START) 183 && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) { 184 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 185 RestConstants.ACTION_PARAM, action); 186 } 187 boolean startJob = (action != null); 188 String user = conf.get(OozieClient.USER_NAME); 189 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 190 user, getAuthToken(request)); 191 String id = null; 192 boolean dryrun = false; 193 if (action != null) { 194 dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN)); 195 } 196 if (dryrun) { 197 id = coordEngine.dryrunSubmit(conf, startJob); 198 } 199 else { 200 id = coordEngine.submitJob(conf, startJob); 201 } 202 json.put(JsonTags.JOB_ID, id); 203 } 204 catch (CoordinatorEngineException ex) { 205 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 206 } 207 208 return json; 209 } 210 211 /** 212 * v1 service implementation to submit a bundle job 213 */ 214 @SuppressWarnings("unchecked") 215 private JSONObject submitBundleJob(HttpServletRequest request, Configuration conf) throws XServletException { 216 JSONObject json = new JSONObject(); 217 XLog.getLog(getClass()).warn("submitBundleJob " + XmlUtils.prettyPrint(conf).toString()); 218 try { 219 String action = request.getParameter(RestConstants.ACTION_PARAM); 220 if (action != null && !action.equals(RestConstants.JOB_ACTION_START) 221 && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) { 222 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 223 RestConstants.ACTION_PARAM, action); 224 } 225 boolean startJob = (action != null); 226 String user = conf.get(OozieClient.USER_NAME); 227 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(user, 228 getAuthToken(request)); 229 String id = null; 230 boolean dryrun = false; 231 if (action != null) { 232 dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN)); 233 } 234 if (dryrun) { 235 id = bundleEngine.dryrunSubmit(conf, startJob); 236 } 237 else { 238 id = bundleEngine.submitJob(conf, startJob); 239 } 240 json.put(JsonTags.JOB_ID, id); 241 } 242 catch (BundleEngineException ex) { 243 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 244 } 245 246 return json; 247 } 248 249 /** 250 * v1 service implementation to get a JSONObject representation of a job from its external ID 251 */ 252 @SuppressWarnings("unchecked") 253 private JSONObject getWorkflowJobIdForExternalId(HttpServletRequest request, String externalId) 254 throws XServletException { 255 JSONObject json = new JSONObject(); 256 try { 257 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 258 getAuthToken(request)); 259 String jobId = dagEngine.getJobIdForExternalId(externalId); 260 json.put(JsonTags.JOB_ID, jobId); 261 } 262 catch (DagEngineException ex) { 263 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 264 } 265 return json; 266 } 267 268 /** 269 * v1 service implementation to get a JSONObject representation of a job from its external ID 270 */ 271 private JSONObject getCoordinatorJobIdForExternalId(HttpServletRequest request, String externalId) 272 throws XServletException { 273 JSONObject json = new JSONObject(); 274 return json; 275 } 276 277 /** 278 * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the 279 * request object 280 */ 281 private JSONObject getWorkflowJobs(HttpServletRequest request) throws XServletException { 282 JSONObject json = new JSONObject(); 283 try { 284 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM); 285 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 286 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 287 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 288 start = (start < 1) ? 1 : start; 289 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50; 290 len = (len < 1) ? 50 : len; 291 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 292 getAuthToken(request)); 293 WorkflowsInfo jobs = dagEngine.getJobs(filter, start, len); 294 List<WorkflowJobBean> jsonWorkflows = jobs.getWorkflows(); 295 json.put(JsonTags.WORKFLOWS_JOBS, WorkflowJobBean.toJSONArray(jsonWorkflows)); 296 json.put(JsonTags.WORKFLOWS_TOTAL, jobs.getTotal()); 297 json.put(JsonTags.WORKFLOWS_OFFSET, jobs.getStart()); 298 json.put(JsonTags.WORKFLOWS_LEN, jobs.getLen()); 299 300 } 301 catch (DagEngineException ex) { 302 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 303 } 304 305 return json; 306 } 307 308 /** 309 * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the 310 * request object 311 */ 312 @SuppressWarnings("unchecked") 313 private JSONObject getCoordinatorJobs(HttpServletRequest request) throws XServletException { 314 JSONObject json = new JSONObject(); 315 try { 316 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM); 317 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 318 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 319 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 320 start = (start < 1) ? 1 : start; 321 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50; 322 len = (len < 1) ? 50 : len; 323 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 324 getUser(request), getAuthToken(request)); 325 CoordinatorJobInfo jobs = coordEngine.getCoordJobs(filter, start, len); 326 List<CoordinatorJobBean> jsonJobs = jobs.getCoordJobs(); 327 json.put(JsonTags.COORDINATOR_JOBS, CoordinatorJobBean.toJSONArray(jsonJobs)); 328 json.put(JsonTags.COORD_JOB_TOTAL, jobs.getTotal()); 329 json.put(JsonTags.COORD_JOB_OFFSET, jobs.getStart()); 330 json.put(JsonTags.COORD_JOB_LEN, jobs.getLen()); 331 332 } 333 catch (CoordinatorEngineException ex) { 334 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 335 } 336 return json; 337 } 338 339 @SuppressWarnings("unchecked") 340 private JSONObject getBundleJobs(HttpServletRequest request) throws XServletException { 341 JSONObject json = new JSONObject(); 342 try { 343 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM); 344 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 345 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 346 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 347 start = (start < 1) ? 1 : start; 348 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50; 349 len = (len < 1) ? 50 : len; 350 351 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request), 352 getAuthToken(request)); 353 BundleJobInfo jobs = bundleEngine.getBundleJobs(filter, start, len); 354 List<BundleJobBean> jsonJobs = jobs.getBundleJobs(); 355 356 json.put(JsonTags.BUNDLE_JOBS, BundleJobBean.toJSONArray(jsonJobs)); 357 json.put(JsonTags.BUNDLE_JOB_TOTAL, jobs.getTotal()); 358 json.put(JsonTags.BUNDLE_JOB_OFFSET, jobs.getStart()); 359 json.put(JsonTags.BUNDLE_JOB_LEN, jobs.getLen()); 360 361 } 362 catch (BundleEngineException ex) { 363 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 364 } 365 return json; 366 } 367 368 /** 369 * service implementation to submit a http job 370 */ 371 private JSONObject submitHttpJob(HttpServletRequest request, Configuration conf, String jobType) 372 throws XServletException { 373 JSONObject json = new JSONObject(); 374 375 try { 376 String user = conf.get(OozieClient.USER_NAME); 377 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user, getAuthToken(request)); 378 String id = dagEngine.submitHttpJob(conf, jobType); 379 json.put(JsonTags.JOB_ID, id); 380 } 381 catch (DagEngineException ex) { 382 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 383 } 384 385 return json; 386 } 387 }