This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.servlet;
019
020 import java.io.IOException;
021 import java.util.List;
022
023 import javax.servlet.http.HttpServletRequest;
024 import javax.servlet.http.HttpServletResponse;
025
026 import org.apache.hadoop.conf.Configuration;
027 import org.apache.oozie.BaseEngineException;
028 import org.apache.oozie.BulkResponseInfo;
029 import org.apache.oozie.BundleJobBean;
030 import org.apache.oozie.BundleJobInfo;
031 import org.apache.oozie.CoordinatorEngine;
032 import org.apache.oozie.BundleEngine;
033 import org.apache.oozie.CoordinatorEngineException;
034 import org.apache.oozie.BundleEngineException;
035 import org.apache.oozie.CoordinatorJobBean;
036 import org.apache.oozie.CoordinatorJobInfo;
037 import org.apache.oozie.DagEngine;
038 import org.apache.oozie.DagEngineException;
039 import org.apache.oozie.ErrorCode;
040 import org.apache.oozie.WorkflowJobBean;
041 import org.apache.oozie.WorkflowsInfo;
042 import org.apache.oozie.client.OozieClient;
043 import org.apache.oozie.client.rest.BulkResponseImpl;
044 import org.apache.oozie.client.rest.JsonTags;
045 import org.apache.oozie.client.rest.RestConstants;
046 import org.apache.oozie.service.CoordinatorEngineService;
047 import org.apache.oozie.service.DagEngineService;
048 import org.apache.oozie.service.BundleEngineService;
049 import org.apache.oozie.service.Services;
050 import org.apache.oozie.util.XLog;
051 import org.apache.oozie.util.XmlUtils;
052 import org.json.simple.JSONObject;
053
054 public class V1JobsServlet extends BaseJobsServlet {
055
056 private static final String INSTRUMENTATION_NAME = "v1jobs";
057
058 public V1JobsServlet() {
059 super(INSTRUMENTATION_NAME);
060 }
061
062 /**
063 * v1 service implementation to submit a job, either workflow or coordinator
064 */
065 @Override
066 protected JSONObject submitJob(HttpServletRequest request, Configuration conf) throws XServletException,
067 IOException {
068 JSONObject json = null;
069
070 String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM);
071
072 if (jobType == null) {
073 String wfPath = conf.get(OozieClient.APP_PATH);
074 String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH);
075 String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);
076
077 ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath);
078
079 if (wfPath != null) {
080 json = submitWorkflowJob(request, conf);
081 }
082 else if (coordPath != null) {
083 json = submitCoordinatorJob(request, conf);
084 }
085 else {
086 json = submitBundleJob(request, conf);
087 }
088 }
089 else { // This is a http submission job
090 if (jobType.equals("pig") || jobType.equals("mapreduce")) {
091 json = submitHttpJob(request, conf, jobType);
092 }
093 else {
094 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
095 RestConstants.JOBTYPE_PARAM, jobType);
096 }
097 }
098 return json;
099 }
100
101 /**
102 * v1 service implementation to get a JSONObject representation of a job from its external ID
103 */
104 @Override
105 protected JSONObject getJobIdForExternalId(HttpServletRequest request, String externalId) throws XServletException,
106 IOException {
107 JSONObject json = null;
108 /*
109 * Configuration conf = new XConfiguration(); String wfPath =
110 * conf.get(OozieClient.APP_PATH); String coordPath =
111 * conf.get(OozieClient.COORDINATOR_APP_PATH);
112 *
113 * ServletUtilities.ValidateAppPath(wfPath, coordPath);
114 */
115 String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
116 jobtype = (jobtype != null) ? jobtype : "wf";
117 if (jobtype.contains("wf")) {
118 json = getWorkflowJobIdForExternalId(request, externalId);
119 }
120 else {
121 json = getCoordinatorJobIdForExternalId(request, externalId);
122 }
123 return json;
124 }
125
126 /**
127 * v1 service implementation to get a list of workflows, coordinators, or bundles, with filtering or interested
128 * windows embedded in the request object
129 */
130 @Override
131 protected JSONObject getJobs(HttpServletRequest request) throws XServletException, IOException {
132 JSONObject json = null;
133 String isBulk = request.getParameter(RestConstants.JOBS_BULK_PARAM);
134 if(isBulk != null) {
135 json = getBulkJobs(request);
136 } else {
137 String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
138 jobtype = (jobtype != null) ? jobtype : "wf";
139
140 if (jobtype.contains("wf")) {
141 json = getWorkflowJobs(request);
142 }
143 else if (jobtype.contains("coord")) {
144 json = getCoordinatorJobs(request);
145 }
146 else if (jobtype.contains("bundle")) {
147 json = getBundleJobs(request);
148 }
149 }
150 return json;
151 }
152
153 /**
154 * v1 service implementation to submit a workflow job
155 */
156 @SuppressWarnings("unchecked")
157 private JSONObject submitWorkflowJob(HttpServletRequest request, Configuration conf) throws XServletException {
158
159 JSONObject json = new JSONObject();
160
161 try {
162 String action = request.getParameter(RestConstants.ACTION_PARAM);
163 if (action != null && !action.equals(RestConstants.JOB_ACTION_START)) {
164 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
165 RestConstants.ACTION_PARAM, action);
166 }
167 boolean startJob = (action != null);
168 String user = conf.get(OozieClient.USER_NAME);
169 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user, getAuthToken(request));
170 String id = dagEngine.submitJob(conf, startJob);
171 json.put(JsonTags.JOB_ID, id);
172 }
173 catch (DagEngineException ex) {
174 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
175 }
176
177 return json;
178 }
179
180 /**
181 * v1 service implementation to submit a coordinator job
182 */
183 @SuppressWarnings("unchecked")
184 private JSONObject submitCoordinatorJob(HttpServletRequest request, Configuration conf) throws XServletException {
185
186 JSONObject json = new JSONObject();
187 XLog.getLog(getClass()).warn("submitCoordinatorJob " + XmlUtils.prettyPrint(conf).toString());
188 try {
189 String action = request.getParameter(RestConstants.ACTION_PARAM);
190 if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
191 && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
192 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
193 RestConstants.ACTION_PARAM, action);
194 }
195 boolean startJob = (action != null);
196 String user = conf.get(OozieClient.USER_NAME);
197 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
198 user, getAuthToken(request));
199 String id = null;
200 boolean dryrun = false;
201 if (action != null) {
202 dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
203 }
204 if (dryrun) {
205 id = coordEngine.dryrunSubmit(conf, startJob);
206 }
207 else {
208 id = coordEngine.submitJob(conf, startJob);
209 }
210 json.put(JsonTags.JOB_ID, id);
211 }
212 catch (CoordinatorEngineException ex) {
213 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
214 }
215
216 return json;
217 }
218
219 /**
220 * v1 service implementation to submit a bundle job
221 */
222 @SuppressWarnings("unchecked")
223 private JSONObject submitBundleJob(HttpServletRequest request, Configuration conf) throws XServletException {
224 JSONObject json = new JSONObject();
225 XLog.getLog(getClass()).warn("submitBundleJob " + XmlUtils.prettyPrint(conf).toString());
226 try {
227 String action = request.getParameter(RestConstants.ACTION_PARAM);
228 if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
229 && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
230 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
231 RestConstants.ACTION_PARAM, action);
232 }
233 boolean startJob = (action != null);
234 String user = conf.get(OozieClient.USER_NAME);
235 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(user,
236 getAuthToken(request));
237 String id = null;
238 boolean dryrun = false;
239 if (action != null) {
240 dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
241 }
242 if (dryrun) {
243 id = bundleEngine.dryrunSubmit(conf, startJob);
244 }
245 else {
246 id = bundleEngine.submitJob(conf, startJob);
247 }
248 json.put(JsonTags.JOB_ID, id);
249 }
250 catch (BundleEngineException ex) {
251 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
252 }
253
254 return json;
255 }
256
257 /**
258 * v1 service implementation to get a JSONObject representation of a job from its external ID
259 */
260 @SuppressWarnings("unchecked")
261 private JSONObject getWorkflowJobIdForExternalId(HttpServletRequest request, String externalId)
262 throws XServletException {
263 JSONObject json = new JSONObject();
264 try {
265 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
266 getAuthToken(request));
267 String jobId = dagEngine.getJobIdForExternalId(externalId);
268 json.put(JsonTags.JOB_ID, jobId);
269 }
270 catch (DagEngineException ex) {
271 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
272 }
273 return json;
274 }
275
276 /**
277 * v1 service implementation to get a JSONObject representation of a job from its external ID
278 */
279 private JSONObject getCoordinatorJobIdForExternalId(HttpServletRequest request, String externalId)
280 throws XServletException {
281 JSONObject json = new JSONObject();
282 return json;
283 }
284
285 /**
286 * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the
287 * request object
288 */
289 private JSONObject getWorkflowJobs(HttpServletRequest request) throws XServletException {
290 JSONObject json = new JSONObject();
291 try {
292 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
293 String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
294 String lenStr = request.getParameter(RestConstants.LEN_PARAM);
295 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
296 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
297 int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
298 start = (start < 1) ? 1 : start;
299 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
300 len = (len < 1) ? 50 : len;
301 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
302 getAuthToken(request));
303 WorkflowsInfo jobs = dagEngine.getJobs(filter, start, len);
304 List<WorkflowJobBean> jsonWorkflows = jobs.getWorkflows();
305 json.put(JsonTags.WORKFLOWS_JOBS, WorkflowJobBean.toJSONArray(jsonWorkflows, timeZoneId));
306 json.put(JsonTags.WORKFLOWS_TOTAL, jobs.getTotal());
307 json.put(JsonTags.WORKFLOWS_OFFSET, jobs.getStart());
308 json.put(JsonTags.WORKFLOWS_LEN, jobs.getLen());
309
310 }
311 catch (DagEngineException ex) {
312 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
313 }
314
315 return json;
316 }
317
318 /**
319 * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the
320 * request object
321 */
322 @SuppressWarnings("unchecked")
323 private JSONObject getCoordinatorJobs(HttpServletRequest request) throws XServletException {
324 JSONObject json = new JSONObject();
325 try {
326 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
327 String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
328 String lenStr = request.getParameter(RestConstants.LEN_PARAM);
329 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
330 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
331 int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
332 start = (start < 1) ? 1 : start;
333 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
334 len = (len < 1) ? 50 : len;
335 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
336 getUser(request), getAuthToken(request));
337 CoordinatorJobInfo jobs = coordEngine.getCoordJobs(filter, start, len);
338 List<CoordinatorJobBean> jsonJobs = jobs.getCoordJobs();
339 json.put(JsonTags.COORDINATOR_JOBS, CoordinatorJobBean.toJSONArray(jsonJobs, timeZoneId));
340 json.put(JsonTags.COORD_JOB_TOTAL, jobs.getTotal());
341 json.put(JsonTags.COORD_JOB_OFFSET, jobs.getStart());
342 json.put(JsonTags.COORD_JOB_LEN, jobs.getLen());
343
344 }
345 catch (CoordinatorEngineException ex) {
346 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
347 }
348 return json;
349 }
350
351 @SuppressWarnings("unchecked")
352 private JSONObject getBundleJobs(HttpServletRequest request) throws XServletException {
353 JSONObject json = new JSONObject();
354 try {
355 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
356 String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
357 String lenStr = request.getParameter(RestConstants.LEN_PARAM);
358 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
359 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
360 int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
361 start = (start < 1) ? 1 : start;
362 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
363 len = (len < 1) ? 50 : len;
364
365 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
366 getAuthToken(request));
367 BundleJobInfo jobs = bundleEngine.getBundleJobs(filter, start, len);
368 List<BundleJobBean> jsonJobs = jobs.getBundleJobs();
369
370 json.put(JsonTags.BUNDLE_JOBS, BundleJobBean.toJSONArray(jsonJobs, timeZoneId));
371 json.put(JsonTags.BUNDLE_JOB_TOTAL, jobs.getTotal());
372 json.put(JsonTags.BUNDLE_JOB_OFFSET, jobs.getStart());
373 json.put(JsonTags.BUNDLE_JOB_LEN, jobs.getLen());
374
375 }
376 catch (BundleEngineException ex) {
377 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
378 }
379 return json;
380 }
381
382 @SuppressWarnings("unchecked")
383 private JSONObject getBulkJobs(HttpServletRequest request) throws XServletException, IOException {
384 JSONObject json = new JSONObject();
385 try {
386 String bulkFilter = request.getParameter(RestConstants.JOBS_BULK_PARAM); //REST API
387 String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
388 String lenStr = request.getParameter(RestConstants.LEN_PARAM);
389 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
390 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
391 int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
392 start = (start < 1) ? 1 : start;
393 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
394 len = (len < 1) ? 50 : len;
395
396 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
397 getAuthToken(request));
398 BulkResponseInfo bulkResponse = bundleEngine.getBulkJobs(bulkFilter, start, len);
399 List<BulkResponseImpl> jsonResponse = bulkResponse.getResponses();
400
401 json.put(JsonTags.BULK_RESPONSES, BulkResponseImpl.toJSONArray(jsonResponse, timeZoneId));
402 json.put(JsonTags.BULK_RESPONSE_TOTAL, bulkResponse.getTotal());
403 json.put(JsonTags.BULK_RESPONSE_OFFSET, bulkResponse.getStart());
404 json.put(JsonTags.BULK_RESPONSE_LEN, bulkResponse.getLen());
405
406 }
407 catch (BaseEngineException ex) {
408 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
409 }
410 return json;
411 }
412
413 /**
414 * service implementation to submit a http job
415 */
416 private JSONObject submitHttpJob(HttpServletRequest request, Configuration conf, String jobType)
417 throws XServletException {
418 JSONObject json = new JSONObject();
419
420 try {
421 String user = conf.get(OozieClient.USER_NAME);
422 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user, getAuthToken(request));
423 String id = dagEngine.submitHttpJob(conf, jobType);
424 json.put(JsonTags.JOB_ID, id);
425 }
426 catch (DagEngineException ex) {
427 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
428 }
429
430 return json;
431 }
432 }