This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.servlet;
019
020 import java.io.IOException;
021 import java.util.List;
022
023 import javax.servlet.http.HttpServletRequest;
024 import javax.servlet.http.HttpServletResponse;
025
026 import org.apache.hadoop.conf.Configuration;
027 import org.apache.oozie.BaseEngineException;
028 import org.apache.oozie.BulkResponseInfo;
029 import org.apache.oozie.BundleJobBean;
030 import org.apache.oozie.BundleJobInfo;
031 import org.apache.oozie.CoordinatorEngine;
032 import org.apache.oozie.BundleEngine;
033 import org.apache.oozie.CoordinatorEngineException;
034 import org.apache.oozie.BundleEngineException;
035 import org.apache.oozie.CoordinatorJobBean;
036 import org.apache.oozie.CoordinatorJobInfo;
037 import org.apache.oozie.DagEngine;
038 import org.apache.oozie.DagEngineException;
039 import org.apache.oozie.ErrorCode;
040 import org.apache.oozie.WorkflowJobBean;
041 import org.apache.oozie.WorkflowsInfo;
042 import org.apache.oozie.client.OozieClient;
043 import org.apache.oozie.client.rest.BulkResponseImpl;
044 import org.apache.oozie.client.rest.JsonTags;
045 import org.apache.oozie.client.rest.RestConstants;
046 import org.apache.oozie.service.CoordinatorEngineService;
047 import org.apache.oozie.service.DagEngineService;
048 import org.apache.oozie.service.BundleEngineService;
049 import org.apache.oozie.service.Services;
050 import org.apache.oozie.util.XLog;
051 import org.apache.oozie.util.XmlUtils;
052 import org.json.simple.JSONObject;
053
054 public class V1JobsServlet extends BaseJobsServlet {
055
056 private static final String INSTRUMENTATION_NAME = "v1jobs";
057
058 public V1JobsServlet() {
059 super(INSTRUMENTATION_NAME);
060 }
061
062 /**
063 * v1 service implementation to submit a job, either workflow or coordinator
064 */
065 @Override
066 protected JSONObject submitJob(HttpServletRequest request, Configuration conf) throws XServletException,
067 IOException {
068 JSONObject json = null;
069
070 String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM);
071
072 if (jobType == null) {
073 String wfPath = conf.get(OozieClient.APP_PATH);
074 String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH);
075 String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);
076
077 ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath);
078
079 if (wfPath != null) {
080 json = submitWorkflowJob(request, conf);
081 }
082 else if (coordPath != null) {
083 json = submitCoordinatorJob(request, conf);
084 }
085 else {
086 json = submitBundleJob(request, conf);
087 }
088 }
089 else { // This is a http submission job
090 if (jobType.equals("pig") || jobType.equals("mapreduce")) {
091 json = submitHttpJob(request, conf, jobType);
092 }
093 else {
094 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
095 RestConstants.JOBTYPE_PARAM, jobType);
096 }
097 }
098 return json;
099 }
100
101 /**
102 * v1 service implementation to get a JSONObject representation of a job from its external ID
103 */
104 @Override
105 protected JSONObject getJobIdForExternalId(HttpServletRequest request, String externalId) throws XServletException,
106 IOException {
107 JSONObject json = null;
108 /*
109 * Configuration conf = new XConfiguration(); String wfPath =
110 * conf.get(OozieClient.APP_PATH); String coordPath =
111 * conf.get(OozieClient.COORDINATOR_APP_PATH);
112 *
113 * ServletUtilities.ValidateAppPath(wfPath, coordPath);
114 */
115 String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
116 jobtype = (jobtype != null) ? jobtype : "wf";
117 if (jobtype.contains("wf")) {
118 json = getWorkflowJobIdForExternalId(request, externalId);
119 }
120 else {
121 json = getCoordinatorJobIdForExternalId(request, externalId);
122 }
123 return json;
124 }
125
126 /**
127 * v1 service implementation to get a list of workflows, coordinators, or bundles, with filtering or interested
128 * windows embedded in the request object
129 */
130 @Override
131 protected JSONObject getJobs(HttpServletRequest request) throws XServletException, IOException {
132 JSONObject json = null;
133 String isBulk = request.getParameter(RestConstants.JOBS_BULK_PARAM);
134 if(isBulk != null) {
135 json = getBulkJobs(request);
136 } else {
137 String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
138 jobtype = (jobtype != null) ? jobtype : "wf";
139
140 if (jobtype.contains("wf")) {
141 json = getWorkflowJobs(request);
142 }
143 else if (jobtype.contains("coord")) {
144 json = getCoordinatorJobs(request);
145 }
146 else if (jobtype.contains("bundle")) {
147 json = getBundleJobs(request);
148 }
149 }
150 return json;
151 }
152
153 /**
154 * v1 service implementation to submit a workflow job
155 */
156 @SuppressWarnings("unchecked")
157 private JSONObject submitWorkflowJob(HttpServletRequest request, Configuration conf) throws XServletException {
158
159 JSONObject json = new JSONObject();
160
161 try {
162 String action = request.getParameter(RestConstants.ACTION_PARAM);
163 if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
164 && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
165 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
166 RestConstants.ACTION_PARAM, action);
167 }
168 boolean startJob = (action != null);
169 String user = conf.get(OozieClient.USER_NAME);
170 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user, getAuthToken(request));
171 String id;
172 boolean dryrun = false;
173 if (action != null) {
174 dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
175 }
176 if (dryrun) {
177 id = dagEngine.dryRunSubmit(conf);
178 }
179 else {
180 id = dagEngine.submitJob(conf, startJob);
181 }
182 json.put(JsonTags.JOB_ID, id);
183 }
184 catch (BaseEngineException ex) {
185 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
186 }
187
188 return json;
189 }
190
191 /**
192 * v1 service implementation to submit a coordinator job
193 */
194 @SuppressWarnings("unchecked")
195 private JSONObject submitCoordinatorJob(HttpServletRequest request, Configuration conf) throws XServletException {
196
197 JSONObject json = new JSONObject();
198 XLog.getLog(getClass()).warn("submitCoordinatorJob " + XmlUtils.prettyPrint(conf).toString());
199 try {
200 String action = request.getParameter(RestConstants.ACTION_PARAM);
201 if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
202 && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
203 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
204 RestConstants.ACTION_PARAM, action);
205 }
206 boolean startJob = (action != null);
207 String user = conf.get(OozieClient.USER_NAME);
208 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
209 user, getAuthToken(request));
210 String id = null;
211 boolean dryrun = false;
212 if (action != null) {
213 dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
214 }
215 if (dryrun) {
216 id = coordEngine.dryRunSubmit(conf);
217 }
218 else {
219 id = coordEngine.submitJob(conf, startJob);
220 }
221 json.put(JsonTags.JOB_ID, id);
222 }
223 catch (CoordinatorEngineException ex) {
224 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
225 }
226
227 return json;
228 }
229
230 /**
231 * v1 service implementation to submit a bundle job
232 */
233 @SuppressWarnings("unchecked")
234 private JSONObject submitBundleJob(HttpServletRequest request, Configuration conf) throws XServletException {
235 JSONObject json = new JSONObject();
236 XLog.getLog(getClass()).warn("submitBundleJob " + XmlUtils.prettyPrint(conf).toString());
237 try {
238 String action = request.getParameter(RestConstants.ACTION_PARAM);
239 if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
240 && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
241 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
242 RestConstants.ACTION_PARAM, action);
243 }
244 boolean startJob = (action != null);
245 String user = conf.get(OozieClient.USER_NAME);
246 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(user,
247 getAuthToken(request));
248 String id = null;
249 boolean dryrun = false;
250 if (action != null) {
251 dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
252 }
253 if (dryrun) {
254 id = bundleEngine.dryRunSubmit(conf);
255 }
256 else {
257 id = bundleEngine.submitJob(conf, startJob);
258 }
259 json.put(JsonTags.JOB_ID, id);
260 }
261 catch (BundleEngineException ex) {
262 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
263 }
264
265 return json;
266 }
267
268 /**
269 * v1 service implementation to get a JSONObject representation of a job from its external ID
270 */
271 @SuppressWarnings("unchecked")
272 private JSONObject getWorkflowJobIdForExternalId(HttpServletRequest request, String externalId)
273 throws XServletException {
274 JSONObject json = new JSONObject();
275 try {
276 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
277 getAuthToken(request));
278 String jobId = dagEngine.getJobIdForExternalId(externalId);
279 json.put(JsonTags.JOB_ID, jobId);
280 }
281 catch (DagEngineException ex) {
282 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
283 }
284 return json;
285 }
286
287 /**
288 * v1 service implementation to get a JSONObject representation of a job from its external ID
289 */
290 private JSONObject getCoordinatorJobIdForExternalId(HttpServletRequest request, String externalId)
291 throws XServletException {
292 JSONObject json = new JSONObject();
293 return json;
294 }
295
296 /**
297 * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the
298 * request object
299 */
300 private JSONObject getWorkflowJobs(HttpServletRequest request) throws XServletException {
301 JSONObject json = new JSONObject();
302 try {
303 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
304 String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
305 String lenStr = request.getParameter(RestConstants.LEN_PARAM);
306 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
307 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
308 int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
309 start = (start < 1) ? 1 : start;
310 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
311 len = (len < 1) ? 50 : len;
312 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
313 getAuthToken(request));
314 WorkflowsInfo jobs = dagEngine.getJobs(filter, start, len);
315 List<WorkflowJobBean> jsonWorkflows = jobs.getWorkflows();
316 json.put(JsonTags.WORKFLOWS_JOBS, WorkflowJobBean.toJSONArray(jsonWorkflows, timeZoneId));
317 json.put(JsonTags.WORKFLOWS_TOTAL, jobs.getTotal());
318 json.put(JsonTags.WORKFLOWS_OFFSET, jobs.getStart());
319 json.put(JsonTags.WORKFLOWS_LEN, jobs.getLen());
320
321 }
322 catch (DagEngineException ex) {
323 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
324 }
325
326 return json;
327 }
328
329 /**
330 * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the
331 * request object
332 */
333 @SuppressWarnings("unchecked")
334 private JSONObject getCoordinatorJobs(HttpServletRequest request) throws XServletException {
335 JSONObject json = new JSONObject();
336 try {
337 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
338 String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
339 String lenStr = request.getParameter(RestConstants.LEN_PARAM);
340 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
341 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
342 int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
343 start = (start < 1) ? 1 : start;
344 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
345 len = (len < 1) ? 50 : len;
346 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
347 getUser(request), getAuthToken(request));
348 CoordinatorJobInfo jobs = coordEngine.getCoordJobs(filter, start, len);
349 List<CoordinatorJobBean> jsonJobs = jobs.getCoordJobs();
350 json.put(JsonTags.COORDINATOR_JOBS, CoordinatorJobBean.toJSONArray(jsonJobs, timeZoneId));
351 json.put(JsonTags.COORD_JOB_TOTAL, jobs.getTotal());
352 json.put(JsonTags.COORD_JOB_OFFSET, jobs.getStart());
353 json.put(JsonTags.COORD_JOB_LEN, jobs.getLen());
354
355 }
356 catch (CoordinatorEngineException ex) {
357 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
358 }
359 return json;
360 }
361
362 @SuppressWarnings("unchecked")
363 private JSONObject getBundleJobs(HttpServletRequest request) throws XServletException {
364 JSONObject json = new JSONObject();
365 try {
366 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
367 String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
368 String lenStr = request.getParameter(RestConstants.LEN_PARAM);
369 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
370 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
371 int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
372 start = (start < 1) ? 1 : start;
373 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
374 len = (len < 1) ? 50 : len;
375
376 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
377 getAuthToken(request));
378 BundleJobInfo jobs = bundleEngine.getBundleJobs(filter, start, len);
379 List<BundleJobBean> jsonJobs = jobs.getBundleJobs();
380
381 json.put(JsonTags.BUNDLE_JOBS, BundleJobBean.toJSONArray(jsonJobs, timeZoneId));
382 json.put(JsonTags.BUNDLE_JOB_TOTAL, jobs.getTotal());
383 json.put(JsonTags.BUNDLE_JOB_OFFSET, jobs.getStart());
384 json.put(JsonTags.BUNDLE_JOB_LEN, jobs.getLen());
385
386 }
387 catch (BundleEngineException ex) {
388 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
389 }
390 return json;
391 }
392
393 @SuppressWarnings("unchecked")
394 private JSONObject getBulkJobs(HttpServletRequest request) throws XServletException, IOException {
395 JSONObject json = new JSONObject();
396 try {
397 String bulkFilter = request.getParameter(RestConstants.JOBS_BULK_PARAM); //REST API
398 String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
399 String lenStr = request.getParameter(RestConstants.LEN_PARAM);
400 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
401 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
402 int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
403 start = (start < 1) ? 1 : start;
404 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
405 len = (len < 1) ? 50 : len;
406
407 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
408 getAuthToken(request));
409 BulkResponseInfo bulkResponse = bundleEngine.getBulkJobs(bulkFilter, start, len);
410 List<BulkResponseImpl> jsonResponse = bulkResponse.getResponses();
411
412 json.put(JsonTags.BULK_RESPONSES, BulkResponseImpl.toJSONArray(jsonResponse, timeZoneId));
413 json.put(JsonTags.BULK_RESPONSE_TOTAL, bulkResponse.getTotal());
414 json.put(JsonTags.BULK_RESPONSE_OFFSET, bulkResponse.getStart());
415 json.put(JsonTags.BULK_RESPONSE_LEN, bulkResponse.getLen());
416
417 }
418 catch (BaseEngineException ex) {
419 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
420 }
421 return json;
422 }
423
424 /**
425 * service implementation to submit a http job
426 */
427 private JSONObject submitHttpJob(HttpServletRequest request, Configuration conf, String jobType)
428 throws XServletException {
429 JSONObject json = new JSONObject();
430
431 try {
432 String user = conf.get(OozieClient.USER_NAME);
433 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user, getAuthToken(request));
434 String id = dagEngine.submitHttpJob(conf, jobType);
435 json.put(JsonTags.JOB_ID, id);
436 }
437 catch (DagEngineException ex) {
438 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
439 }
440
441 return json;
442 }
443 }