This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.servlet;
019
020 import java.io.IOException;
021 import java.util.HashSet;
022 import java.util.List;
023 import java.util.Set;
024
025 import javax.servlet.http.HttpServletRequest;
026 import javax.servlet.http.HttpServletResponse;
027
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.oozie.BaseEngineException;
030 import org.apache.oozie.BulkResponseInfo;
031 import org.apache.oozie.BundleJobBean;
032 import org.apache.oozie.BundleJobInfo;
033 import org.apache.oozie.CoordinatorEngine;
034 import org.apache.oozie.BundleEngine;
035 import org.apache.oozie.CoordinatorEngineException;
036 import org.apache.oozie.BundleEngineException;
037 import org.apache.oozie.CoordinatorJobBean;
038 import org.apache.oozie.CoordinatorJobInfo;
039 import org.apache.oozie.DagEngine;
040 import org.apache.oozie.DagEngineException;
041 import org.apache.oozie.ErrorCode;
042 import org.apache.oozie.WorkflowJobBean;
043 import org.apache.oozie.WorkflowsInfo;
044 import org.apache.oozie.cli.OozieCLI;
045 import org.apache.oozie.client.OozieClient;
046 import org.apache.oozie.client.rest.BulkResponseImpl;
047 import org.apache.oozie.client.rest.JsonTags;
048 import org.apache.oozie.client.rest.RestConstants;
049 import org.apache.oozie.service.CoordinatorEngineService;
050 import org.apache.oozie.service.DagEngineService;
051 import org.apache.oozie.service.BundleEngineService;
052 import org.apache.oozie.service.Services;
053 import org.apache.oozie.util.XLog;
054 import org.apache.oozie.util.XmlUtils;
055 import org.json.simple.JSONObject;
056
057 public class V1JobsServlet extends BaseJobsServlet {
058
059 private static final String INSTRUMENTATION_NAME = "v1jobs";
060 private static final Set<String> httpJobType = new HashSet<String>(){{
061 this.add(OozieCLI.HIVE_CMD);
062 this.add(OozieCLI.PIG_CMD);
063 this.add(OozieCLI.MR_CMD);
064 }};
065
066 public V1JobsServlet() {
067 super(INSTRUMENTATION_NAME);
068 }
069
070 /**
071 * v1 service implementation to submit a job, either workflow or coordinator
072 */
073 @Override
074 protected JSONObject submitJob(HttpServletRequest request, Configuration conf) throws XServletException,
075 IOException {
076 JSONObject json = null;
077
078 String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM);
079
080 if (jobType == null) {
081 String wfPath = conf.get(OozieClient.APP_PATH);
082 String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH);
083 String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);
084
085 ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath);
086
087 if (wfPath != null) {
088 json = submitWorkflowJob(request, conf);
089 }
090 else if (coordPath != null) {
091 json = submitCoordinatorJob(request, conf);
092 }
093 else {
094 json = submitBundleJob(request, conf);
095 }
096 }
097 else { // This is a http submission job
098 if (httpJobType.contains(jobType)) {
099 json = submitHttpJob(request, conf, jobType);
100 }
101 else {
102 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
103 RestConstants.JOBTYPE_PARAM, jobType);
104 }
105 }
106 return json;
107 }
108
109 /**
110 * v1 service implementation to get a JSONObject representation of a job from its external ID
111 */
112 @Override
113 protected JSONObject getJobIdForExternalId(HttpServletRequest request, String externalId) throws XServletException,
114 IOException {
115 JSONObject json = null;
116 /*
117 * Configuration conf = new XConfiguration(); String wfPath =
118 * conf.get(OozieClient.APP_PATH); String coordPath =
119 * conf.get(OozieClient.COORDINATOR_APP_PATH);
120 *
121 * ServletUtilities.ValidateAppPath(wfPath, coordPath);
122 */
123 String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
124 jobtype = (jobtype != null) ? jobtype : "wf";
125 if (jobtype.contains("wf")) {
126 json = getWorkflowJobIdForExternalId(request, externalId);
127 }
128 else {
129 json = getCoordinatorJobIdForExternalId(request, externalId);
130 }
131 return json;
132 }
133
134 /**
135 * v1 service implementation to get a list of workflows, coordinators, or bundles, with filtering or interested
136 * windows embedded in the request object
137 */
138 @Override
139 protected JSONObject getJobs(HttpServletRequest request) throws XServletException, IOException {
140 JSONObject json = null;
141 String isBulk = request.getParameter(RestConstants.JOBS_BULK_PARAM);
142 if(isBulk != null) {
143 json = getBulkJobs(request);
144 } else {
145 String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
146 jobtype = (jobtype != null) ? jobtype : "wf";
147
148 if (jobtype.contains("wf")) {
149 json = getWorkflowJobs(request);
150 }
151 else if (jobtype.contains("coord")) {
152 json = getCoordinatorJobs(request);
153 }
154 else if (jobtype.contains("bundle")) {
155 json = getBundleJobs(request);
156 }
157 }
158 return json;
159 }
160
161 /**
162 * v1 service implementation to submit a workflow job
163 */
164 @SuppressWarnings("unchecked")
165 private JSONObject submitWorkflowJob(HttpServletRequest request, Configuration conf) throws XServletException {
166
167 JSONObject json = new JSONObject();
168
169 try {
170 String action = request.getParameter(RestConstants.ACTION_PARAM);
171 if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
172 && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
173 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
174 RestConstants.ACTION_PARAM, action);
175 }
176 boolean startJob = (action != null);
177 String user = conf.get(OozieClient.USER_NAME);
178 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
179 String id;
180 boolean dryrun = false;
181 if (action != null) {
182 dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
183 }
184 if (dryrun) {
185 id = dagEngine.dryRunSubmit(conf);
186 }
187 else {
188 id = dagEngine.submitJob(conf, startJob);
189 }
190 json.put(JsonTags.JOB_ID, id);
191 }
192 catch (BaseEngineException ex) {
193 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
194 }
195
196 return json;
197 }
198
199 /**
200 * v1 service implementation to submit a coordinator job
201 */
202 @SuppressWarnings("unchecked")
203 private JSONObject submitCoordinatorJob(HttpServletRequest request, Configuration conf) throws XServletException {
204
205 JSONObject json = new JSONObject();
206 XLog.getLog(getClass()).warn("submitCoordinatorJob " + XmlUtils.prettyPrint(conf).toString());
207 try {
208 String action = request.getParameter(RestConstants.ACTION_PARAM);
209 if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
210 && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
211 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
212 RestConstants.ACTION_PARAM, action);
213 }
214 boolean startJob = (action != null);
215 String user = conf.get(OozieClient.USER_NAME);
216 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
217 user);
218 String id = null;
219 boolean dryrun = false;
220 if (action != null) {
221 dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
222 }
223 if (dryrun) {
224 id = coordEngine.dryRunSubmit(conf);
225 }
226 else {
227 id = coordEngine.submitJob(conf, startJob);
228 }
229 json.put(JsonTags.JOB_ID, id);
230 }
231 catch (CoordinatorEngineException ex) {
232 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
233 }
234
235 return json;
236 }
237
238 /**
239 * v1 service implementation to submit a bundle job
240 */
241 @SuppressWarnings("unchecked")
242 private JSONObject submitBundleJob(HttpServletRequest request, Configuration conf) throws XServletException {
243 JSONObject json = new JSONObject();
244 XLog.getLog(getClass()).warn("submitBundleJob " + XmlUtils.prettyPrint(conf).toString());
245 try {
246 String action = request.getParameter(RestConstants.ACTION_PARAM);
247 if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
248 && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
249 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
250 RestConstants.ACTION_PARAM, action);
251 }
252 boolean startJob = (action != null);
253 String user = conf.get(OozieClient.USER_NAME);
254 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(user);
255 String id = null;
256 boolean dryrun = false;
257 if (action != null) {
258 dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
259 }
260 if (dryrun) {
261 id = bundleEngine.dryRunSubmit(conf);
262 }
263 else {
264 id = bundleEngine.submitJob(conf, startJob);
265 }
266 json.put(JsonTags.JOB_ID, id);
267 }
268 catch (BundleEngineException ex) {
269 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
270 }
271
272 return json;
273 }
274
275 /**
276 * v1 service implementation to get a JSONObject representation of a job from its external ID
277 */
278 @SuppressWarnings("unchecked")
279 private JSONObject getWorkflowJobIdForExternalId(HttpServletRequest request, String externalId)
280 throws XServletException {
281 JSONObject json = new JSONObject();
282 try {
283 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
284 String jobId = dagEngine.getJobIdForExternalId(externalId);
285 json.put(JsonTags.JOB_ID, jobId);
286 }
287 catch (DagEngineException ex) {
288 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
289 }
290 return json;
291 }
292
293 /**
294 * v1 service implementation to get a JSONObject representation of a job from its external ID
295 */
296 private JSONObject getCoordinatorJobIdForExternalId(HttpServletRequest request, String externalId)
297 throws XServletException {
298 JSONObject json = new JSONObject();
299 return json;
300 }
301
302 /**
303 * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the
304 * request object
305 */
306 private JSONObject getWorkflowJobs(HttpServletRequest request) throws XServletException {
307 JSONObject json = new JSONObject();
308 try {
309 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
310 String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
311 String lenStr = request.getParameter(RestConstants.LEN_PARAM);
312 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
313 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
314 int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
315 start = (start < 1) ? 1 : start;
316 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
317 len = (len < 1) ? 50 : len;
318 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
319 WorkflowsInfo jobs = dagEngine.getJobs(filter, start, len);
320 List<WorkflowJobBean> jsonWorkflows = jobs.getWorkflows();
321 json.put(JsonTags.WORKFLOWS_JOBS, WorkflowJobBean.toJSONArray(jsonWorkflows, timeZoneId));
322 json.put(JsonTags.WORKFLOWS_TOTAL, jobs.getTotal());
323 json.put(JsonTags.WORKFLOWS_OFFSET, jobs.getStart());
324 json.put(JsonTags.WORKFLOWS_LEN, jobs.getLen());
325
326 }
327 catch (DagEngineException ex) {
328 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
329 }
330
331 return json;
332 }
333
334 /**
335 * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the
336 * request object
337 */
338 @SuppressWarnings("unchecked")
339 private JSONObject getCoordinatorJobs(HttpServletRequest request) throws XServletException {
340 JSONObject json = new JSONObject();
341 try {
342 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
343 String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
344 String lenStr = request.getParameter(RestConstants.LEN_PARAM);
345 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
346 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
347 int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
348 start = (start < 1) ? 1 : start;
349 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
350 len = (len < 1) ? 50 : len;
351 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
352 getUser(request));
353 CoordinatorJobInfo jobs = coordEngine.getCoordJobs(filter, start, len);
354 List<CoordinatorJobBean> jsonJobs = jobs.getCoordJobs();
355 json.put(JsonTags.COORDINATOR_JOBS, CoordinatorJobBean.toJSONArray(jsonJobs, timeZoneId));
356 json.put(JsonTags.COORD_JOB_TOTAL, jobs.getTotal());
357 json.put(JsonTags.COORD_JOB_OFFSET, jobs.getStart());
358 json.put(JsonTags.COORD_JOB_LEN, jobs.getLen());
359
360 }
361 catch (CoordinatorEngineException ex) {
362 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
363 }
364 return json;
365 }
366
367 @SuppressWarnings("unchecked")
368 private JSONObject getBundleJobs(HttpServletRequest request) throws XServletException {
369 JSONObject json = new JSONObject();
370 try {
371 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
372 String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
373 String lenStr = request.getParameter(RestConstants.LEN_PARAM);
374 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
375 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
376 int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
377 start = (start < 1) ? 1 : start;
378 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
379 len = (len < 1) ? 50 : len;
380
381 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
382 BundleJobInfo jobs = bundleEngine.getBundleJobs(filter, start, len);
383 List<BundleJobBean> jsonJobs = jobs.getBundleJobs();
384
385 json.put(JsonTags.BUNDLE_JOBS, BundleJobBean.toJSONArray(jsonJobs, timeZoneId));
386 json.put(JsonTags.BUNDLE_JOB_TOTAL, jobs.getTotal());
387 json.put(JsonTags.BUNDLE_JOB_OFFSET, jobs.getStart());
388 json.put(JsonTags.BUNDLE_JOB_LEN, jobs.getLen());
389
390 }
391 catch (BundleEngineException ex) {
392 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
393 }
394 return json;
395 }
396
397 @SuppressWarnings("unchecked")
398 private JSONObject getBulkJobs(HttpServletRequest request) throws XServletException, IOException {
399 JSONObject json = new JSONObject();
400 try {
401 String bulkFilter = request.getParameter(RestConstants.JOBS_BULK_PARAM); //REST API
402 String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
403 String lenStr = request.getParameter(RestConstants.LEN_PARAM);
404 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
405 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
406 int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
407 start = (start < 1) ? 1 : start;
408 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
409 len = (len < 1) ? 50 : len;
410
411 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
412 BulkResponseInfo bulkResponse = bundleEngine.getBulkJobs(bulkFilter, start, len);
413 List<BulkResponseImpl> responsesToJson = bulkResponse.getResponses();
414
415 json.put(JsonTags.BULK_RESPONSES, BulkResponseImpl.toJSONArray(responsesToJson, timeZoneId));
416 json.put(JsonTags.BULK_RESPONSE_TOTAL, bulkResponse.getTotal());
417 json.put(JsonTags.BULK_RESPONSE_OFFSET, bulkResponse.getStart());
418 json.put(JsonTags.BULK_RESPONSE_LEN, bulkResponse.getLen());
419
420 }
421 catch (BaseEngineException ex) {
422 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
423 }
424 return json;
425 }
426
427 /**
428 * service implementation to submit a http job
429 */
430 private JSONObject submitHttpJob(HttpServletRequest request, Configuration conf, String jobType)
431 throws XServletException {
432 JSONObject json = new JSONObject();
433
434 try {
435 String user = conf.get(OozieClient.USER_NAME);
436 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
437 String id = dagEngine.submitHttpJob(conf, jobType);
438 json.put(JsonTags.JOB_ID, id);
439 }
440 catch (DagEngineException ex) {
441 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
442 }
443
444 return json;
445 }
446 }