This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.servlet;
019
020 import java.io.IOException;
021 import java.util.List;
022
023 import javax.servlet.http.HttpServletRequest;
024 import javax.servlet.http.HttpServletResponse;
025
026 import org.apache.hadoop.conf.Configuration;
027 import org.apache.oozie.BundleJobBean;
028 import org.apache.oozie.BundleJobInfo;
029 import org.apache.oozie.CoordinatorEngine;
030 import org.apache.oozie.BundleEngine;
031 import org.apache.oozie.CoordinatorEngineException;
032 import org.apache.oozie.BundleEngineException;
033 import org.apache.oozie.CoordinatorJobBean;
034 import org.apache.oozie.CoordinatorJobInfo;
035 import org.apache.oozie.DagEngine;
036 import org.apache.oozie.DagEngineException;
037 import org.apache.oozie.ErrorCode;
038 import org.apache.oozie.WorkflowJobBean;
039 import org.apache.oozie.WorkflowsInfo;
040 import org.apache.oozie.client.OozieClient;
041 import org.apache.oozie.client.rest.JsonTags;
042 import org.apache.oozie.client.rest.RestConstants;
043 import org.apache.oozie.service.CoordinatorEngineService;
044 import org.apache.oozie.service.DagEngineService;
045 import org.apache.oozie.service.BundleEngineService;
046 import org.apache.oozie.service.Services;
047 import org.apache.oozie.util.XLog;
048 import org.apache.oozie.util.XmlUtils;
049 import org.json.simple.JSONObject;
050
051 public class V1JobsServlet extends BaseJobsServlet {
052
053 private static final String INSTRUMENTATION_NAME = "v1jobs";
054
055 public V1JobsServlet() {
056 super(INSTRUMENTATION_NAME);
057 }
058
059 /**
060 * v1 service implementation to submit a job, either workflow or coordinator
061 */
062 @Override
063 protected JSONObject submitJob(HttpServletRequest request, Configuration conf) throws XServletException,
064 IOException {
065 JSONObject json = null;
066
067 String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM);
068
069 if (jobType == null) {
070 String wfPath = conf.get(OozieClient.APP_PATH);
071 String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH);
072 String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);
073
074 ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath);
075
076 if (wfPath != null) {
077 json = submitWorkflowJob(request, conf);
078 }
079 else if (coordPath != null) {
080 json = submitCoordinatorJob(request, conf);
081 }
082 else {
083 json = submitBundleJob(request, conf);
084 }
085 }
086 else { // This is a http submission job
087 if (jobType.equals("pig") || jobType.equals("mapreduce")) {
088 json = submitHttpJob(request, conf, jobType);
089 }
090 else {
091 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
092 RestConstants.JOBTYPE_PARAM, jobType);
093 }
094 }
095 return json;
096 }
097
098 /**
099 * v1 service implementation to get a JSONObject representation of a job from its external ID
100 */
101 @Override
102 protected JSONObject getJobIdForExternalId(HttpServletRequest request, String externalId) throws XServletException,
103 IOException {
104 JSONObject json = null;
105 /*
106 * Configuration conf = new XConfiguration(); String wfPath =
107 * conf.get(OozieClient.APP_PATH); String coordPath =
108 * conf.get(OozieClient.COORDINATOR_APP_PATH);
109 *
110 * ServletUtilities.ValidateAppPath(wfPath, coordPath);
111 */
112 String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
113 jobtype = (jobtype != null) ? jobtype : "wf";
114 if (jobtype.contains("wf")) {
115 json = getWorkflowJobIdForExternalId(request, externalId);
116 }
117 else {
118 json = getCoordinatorJobIdForExternalId(request, externalId);
119 }
120 return json;
121 }
122
123 /**
124 * v1 service implementation to get a list of workflows, coordinators, or bundles, with filtering or interested
125 * windows embedded in the request object
126 */
127 @Override
128 protected JSONObject getJobs(HttpServletRequest request) throws XServletException, IOException {
129 JSONObject json = null;
130 String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
131 jobtype = (jobtype != null) ? jobtype : "wf";
132
133 if (jobtype.contains("wf")) {
134 json = getWorkflowJobs(request);
135 }
136 else if (jobtype.contains("coord")) {
137 json = getCoordinatorJobs(request);
138 }
139 else if (jobtype.contains("bundle")) {
140 json = getBundleJobs(request);
141 }
142 return json;
143 }
144
145 /**
146 * v1 service implementation to submit a workflow job
147 */
148 @SuppressWarnings("unchecked")
149 private JSONObject submitWorkflowJob(HttpServletRequest request, Configuration conf) throws XServletException {
150
151 JSONObject json = new JSONObject();
152
153 try {
154 String action = request.getParameter(RestConstants.ACTION_PARAM);
155 if (action != null && !action.equals(RestConstants.JOB_ACTION_START)) {
156 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
157 RestConstants.ACTION_PARAM, action);
158 }
159 boolean startJob = (action != null);
160 String user = conf.get(OozieClient.USER_NAME);
161 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user, getAuthToken(request));
162 String id = dagEngine.submitJob(conf, startJob);
163 json.put(JsonTags.JOB_ID, id);
164 }
165 catch (DagEngineException ex) {
166 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
167 }
168
169 return json;
170 }
171
172 /**
173 * v1 service implementation to submit a coordinator job
174 */
175 @SuppressWarnings("unchecked")
176 private JSONObject submitCoordinatorJob(HttpServletRequest request, Configuration conf) throws XServletException {
177
178 JSONObject json = new JSONObject();
179 XLog.getLog(getClass()).warn("submitCoordinatorJob " + XmlUtils.prettyPrint(conf).toString());
180 try {
181 String action = request.getParameter(RestConstants.ACTION_PARAM);
182 if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
183 && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
184 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
185 RestConstants.ACTION_PARAM, action);
186 }
187 boolean startJob = (action != null);
188 String user = conf.get(OozieClient.USER_NAME);
189 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
190 user, getAuthToken(request));
191 String id = null;
192 boolean dryrun = false;
193 if (action != null) {
194 dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
195 }
196 if (dryrun) {
197 id = coordEngine.dryrunSubmit(conf, startJob);
198 }
199 else {
200 id = coordEngine.submitJob(conf, startJob);
201 }
202 json.put(JsonTags.JOB_ID, id);
203 }
204 catch (CoordinatorEngineException ex) {
205 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
206 }
207
208 return json;
209 }
210
211 /**
212 * v1 service implementation to submit a bundle job
213 */
214 @SuppressWarnings("unchecked")
215 private JSONObject submitBundleJob(HttpServletRequest request, Configuration conf) throws XServletException {
216 JSONObject json = new JSONObject();
217 XLog.getLog(getClass()).warn("submitBundleJob " + XmlUtils.prettyPrint(conf).toString());
218 try {
219 String action = request.getParameter(RestConstants.ACTION_PARAM);
220 if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
221 && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
222 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
223 RestConstants.ACTION_PARAM, action);
224 }
225 boolean startJob = (action != null);
226 String user = conf.get(OozieClient.USER_NAME);
227 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(user,
228 getAuthToken(request));
229 String id = null;
230 boolean dryrun = false;
231 if (action != null) {
232 dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
233 }
234 if (dryrun) {
235 id = bundleEngine.dryrunSubmit(conf, startJob);
236 }
237 else {
238 id = bundleEngine.submitJob(conf, startJob);
239 }
240 json.put(JsonTags.JOB_ID, id);
241 }
242 catch (BundleEngineException ex) {
243 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
244 }
245
246 return json;
247 }
248
249 /**
250 * v1 service implementation to get a JSONObject representation of a job from its external ID
251 */
252 @SuppressWarnings("unchecked")
253 private JSONObject getWorkflowJobIdForExternalId(HttpServletRequest request, String externalId)
254 throws XServletException {
255 JSONObject json = new JSONObject();
256 try {
257 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
258 getAuthToken(request));
259 String jobId = dagEngine.getJobIdForExternalId(externalId);
260 json.put(JsonTags.JOB_ID, jobId);
261 }
262 catch (DagEngineException ex) {
263 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
264 }
265 return json;
266 }
267
268 /**
269 * v1 service implementation to get a JSONObject representation of a job from its external ID
270 */
271 private JSONObject getCoordinatorJobIdForExternalId(HttpServletRequest request, String externalId)
272 throws XServletException {
273 JSONObject json = new JSONObject();
274 return json;
275 }
276
277 /**
278 * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the
279 * request object
280 */
281 private JSONObject getWorkflowJobs(HttpServletRequest request) throws XServletException {
282 JSONObject json = new JSONObject();
283 try {
284 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
285 String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
286 String lenStr = request.getParameter(RestConstants.LEN_PARAM);
287 int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
288 start = (start < 1) ? 1 : start;
289 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
290 len = (len < 1) ? 50 : len;
291 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
292 getAuthToken(request));
293 WorkflowsInfo jobs = dagEngine.getJobs(filter, start, len);
294 List<WorkflowJobBean> jsonWorkflows = jobs.getWorkflows();
295 json.put(JsonTags.WORKFLOWS_JOBS, WorkflowJobBean.toJSONArray(jsonWorkflows));
296 json.put(JsonTags.WORKFLOWS_TOTAL, jobs.getTotal());
297 json.put(JsonTags.WORKFLOWS_OFFSET, jobs.getStart());
298 json.put(JsonTags.WORKFLOWS_LEN, jobs.getLen());
299
300 }
301 catch (DagEngineException ex) {
302 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
303 }
304
305 return json;
306 }
307
308 /**
309 * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the
310 * request object
311 */
312 @SuppressWarnings("unchecked")
313 private JSONObject getCoordinatorJobs(HttpServletRequest request) throws XServletException {
314 JSONObject json = new JSONObject();
315 try {
316 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
317 String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
318 String lenStr = request.getParameter(RestConstants.LEN_PARAM);
319 int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
320 start = (start < 1) ? 1 : start;
321 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
322 len = (len < 1) ? 50 : len;
323 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
324 getUser(request), getAuthToken(request));
325 CoordinatorJobInfo jobs = coordEngine.getCoordJobs(filter, start, len);
326 List<CoordinatorJobBean> jsonJobs = jobs.getCoordJobs();
327 json.put(JsonTags.COORDINATOR_JOBS, CoordinatorJobBean.toJSONArray(jsonJobs));
328 json.put(JsonTags.COORD_JOB_TOTAL, jobs.getTotal());
329 json.put(JsonTags.COORD_JOB_OFFSET, jobs.getStart());
330 json.put(JsonTags.COORD_JOB_LEN, jobs.getLen());
331
332 }
333 catch (CoordinatorEngineException ex) {
334 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
335 }
336 return json;
337 }
338
339 @SuppressWarnings("unchecked")
340 private JSONObject getBundleJobs(HttpServletRequest request) throws XServletException {
341 JSONObject json = new JSONObject();
342 try {
343 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
344 String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
345 String lenStr = request.getParameter(RestConstants.LEN_PARAM);
346 int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
347 start = (start < 1) ? 1 : start;
348 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
349 len = (len < 1) ? 50 : len;
350
351 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
352 getAuthToken(request));
353 BundleJobInfo jobs = bundleEngine.getBundleJobs(filter, start, len);
354 List<BundleJobBean> jsonJobs = jobs.getBundleJobs();
355
356 json.put(JsonTags.BUNDLE_JOBS, BundleJobBean.toJSONArray(jsonJobs));
357 json.put(JsonTags.BUNDLE_JOB_TOTAL, jobs.getTotal());
358 json.put(JsonTags.BUNDLE_JOB_OFFSET, jobs.getStart());
359 json.put(JsonTags.BUNDLE_JOB_LEN, jobs.getLen());
360
361 }
362 catch (BundleEngineException ex) {
363 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
364 }
365 return json;
366 }
367
368 /**
369 * service implementation to submit a http job
370 */
371 private JSONObject submitHttpJob(HttpServletRequest request, Configuration conf, String jobType)
372 throws XServletException {
373 JSONObject json = new JSONObject();
374
375 try {
376 String user = conf.get(OozieClient.USER_NAME);
377 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user, getAuthToken(request));
378 String id = dagEngine.submitHttpJob(conf, jobType);
379 json.put(JsonTags.JOB_ID, id);
380 }
381 catch (DagEngineException ex) {
382 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
383 }
384
385 return json;
386 }
387 }