001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.servlet;
019
020import java.io.IOException;
021import java.util.HashSet;
022import java.util.List;
023import java.util.Set;
024
025import javax.servlet.http.HttpServletRequest;
026import javax.servlet.http.HttpServletResponse;
027
028import org.apache.hadoop.conf.Configuration;
029import org.apache.oozie.BaseEngineException;
030import org.apache.oozie.BulkResponseInfo;
031import org.apache.oozie.BundleJobBean;
032import org.apache.oozie.BundleJobInfo;
033import org.apache.oozie.CoordinatorEngine;
034import org.apache.oozie.BundleEngine;
035import org.apache.oozie.CoordinatorEngineException;
036import org.apache.oozie.BundleEngineException;
037import org.apache.oozie.CoordinatorJobBean;
038import org.apache.oozie.CoordinatorJobInfo;
039import org.apache.oozie.DagEngine;
040import org.apache.oozie.DagEngineException;
041import org.apache.oozie.ErrorCode;
042import org.apache.oozie.WorkflowJobBean;
043import org.apache.oozie.WorkflowsInfo;
044import org.apache.oozie.cli.OozieCLI;
045import org.apache.oozie.client.OozieClient;
046import org.apache.oozie.client.rest.BulkResponseImpl;
047import org.apache.oozie.client.rest.JsonTags;
048import org.apache.oozie.client.rest.RestConstants;
049import org.apache.oozie.service.CoordinatorEngineService;
050import org.apache.oozie.service.DagEngineService;
051import org.apache.oozie.service.BundleEngineService;
052import org.apache.oozie.service.Services;
053import org.apache.oozie.util.XLog;
054import org.apache.oozie.util.XmlUtils;
055import org.json.simple.JSONObject;
056
057public class V1JobsServlet extends BaseJobsServlet {
058
059    private static final String INSTRUMENTATION_NAME = "v1jobs";
060    private static final Set<String> httpJobType = new HashSet<String>(){{
061        this.add(OozieCLI.HIVE_CMD);
062        this.add(OozieCLI.SQOOP_CMD);
063        this.add(OozieCLI.PIG_CMD);
064        this.add(OozieCLI.MR_CMD);
065    }};
066
067    public V1JobsServlet() {
068        super(INSTRUMENTATION_NAME);
069    }
070
071    /**
072     * v1 service implementation to submit a job, either workflow or coordinator
073     */
074    @Override
075    protected JSONObject submitJob(HttpServletRequest request, Configuration conf) throws XServletException,
076            IOException {
077        JSONObject json = null;
078
079        String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM);
080
081        if (jobType == null) {
082            String wfPath = conf.get(OozieClient.APP_PATH);
083            String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH);
084            String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);
085
086            ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath);
087
088            if (wfPath != null) {
089                json = submitWorkflowJob(request, conf);
090            }
091            else if (coordPath != null) {
092                json = submitCoordinatorJob(request, conf);
093            }
094            else {
095                json = submitBundleJob(request, conf);
096            }
097        }
098        else { // This is a http submission job
099            if (httpJobType.contains(jobType)) {
100                json = submitHttpJob(request, conf, jobType);
101            }
102            else {
103                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
104                        RestConstants.JOBTYPE_PARAM, jobType);
105            }
106        }
107        return json;
108    }
109
110    /**
111     * v1 service implementation to get a JSONObject representation of a job from its external ID
112     */
113    @Override
114    protected JSONObject getJobIdForExternalId(HttpServletRequest request, String externalId) throws XServletException,
115            IOException {
116        JSONObject json = null;
117        /*
118         * Configuration conf = new XConfiguration(); String wfPath =
119         * conf.get(OozieClient.APP_PATH); String coordPath =
120         * conf.get(OozieClient.COORDINATOR_APP_PATH);
121         *
122         * ServletUtilities.ValidateAppPath(wfPath, coordPath);
123         */
124        String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
125        jobtype = (jobtype != null) ? jobtype : "wf";
126        if (jobtype.contains("wf")) {
127            json = getWorkflowJobIdForExternalId(request, externalId);
128        }
129        else {
130            json = getCoordinatorJobIdForExternalId(request, externalId);
131        }
132        return json;
133    }
134
135    /**
136     * v1 service implementation to get a list of workflows, coordinators, or bundles, with filtering or interested
137     * windows embedded in the request object
138     */
139    @Override
140    protected JSONObject getJobs(HttpServletRequest request) throws XServletException, IOException {
141        JSONObject json = null;
142        String isBulk = request.getParameter(RestConstants.JOBS_BULK_PARAM);
143        if(isBulk != null) {
144            json = getBulkJobs(request);
145        } else {
146            String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
147            jobtype = (jobtype != null) ? jobtype : "wf";
148
149            if (jobtype.contains("wf")) {
150                json = getWorkflowJobs(request);
151            }
152            else if (jobtype.contains("coord")) {
153                json = getCoordinatorJobs(request);
154            }
155            else if (jobtype.contains("bundle")) {
156                json = getBundleJobs(request);
157            }
158        }
159        return json;
160    }
161
162    /**
163     * v1 service implementation to submit a workflow job
164     */
165    @SuppressWarnings("unchecked")
166    private JSONObject submitWorkflowJob(HttpServletRequest request, Configuration conf) throws XServletException {
167
168        JSONObject json = new JSONObject();
169
170        try {
171            String action = request.getParameter(RestConstants.ACTION_PARAM);
172            if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
173                    && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
174                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
175                        RestConstants.ACTION_PARAM, action);
176            }
177            boolean startJob = (action != null);
178            String user = conf.get(OozieClient.USER_NAME);
179            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
180            String id;
181            boolean dryrun = false;
182            if (action != null) {
183                dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
184            }
185            if (dryrun) {
186                id = dagEngine.dryRunSubmit(conf);
187            }
188            else {
189                id = dagEngine.submitJob(conf, startJob);
190            }
191            json.put(JsonTags.JOB_ID, id);
192        }
193        catch (BaseEngineException ex) {
194            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
195        }
196
197        return json;
198    }
199
200    /**
201     * v1 service implementation to submit a coordinator job
202     */
203    @SuppressWarnings("unchecked")
204    private JSONObject submitCoordinatorJob(HttpServletRequest request, Configuration conf) throws XServletException {
205
206        JSONObject json = new JSONObject();
207        XLog.getLog(getClass()).warn("submitCoordinatorJob " + XmlUtils.prettyPrint(conf).toString());
208        try {
209            String action = request.getParameter(RestConstants.ACTION_PARAM);
210            if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
211                    && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
212                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
213                        RestConstants.ACTION_PARAM, action);
214            }
215            boolean startJob = (action != null);
216            String user = conf.get(OozieClient.USER_NAME);
217            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
218                    user);
219            String id = null;
220            boolean dryrun = false;
221            if (action != null) {
222                dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
223            }
224            if (dryrun) {
225                id = coordEngine.dryRunSubmit(conf);
226            }
227            else {
228                id = coordEngine.submitJob(conf, startJob);
229            }
230            json.put(JsonTags.JOB_ID, id);
231        }
232        catch (CoordinatorEngineException ex) {
233            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
234        }
235
236        return json;
237    }
238
239    /**
240     * v1 service implementation to submit a bundle job
241     */
242    @SuppressWarnings("unchecked")
243    private JSONObject submitBundleJob(HttpServletRequest request, Configuration conf) throws XServletException {
244        JSONObject json = new JSONObject();
245        XLog.getLog(getClass()).warn("submitBundleJob " + XmlUtils.prettyPrint(conf).toString());
246        try {
247            String action = request.getParameter(RestConstants.ACTION_PARAM);
248            if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
249                    && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
250                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
251                        RestConstants.ACTION_PARAM, action);
252            }
253            boolean startJob = (action != null);
254            String user = conf.get(OozieClient.USER_NAME);
255            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(user);
256            String id = null;
257            boolean dryrun = false;
258            if (action != null) {
259                dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
260            }
261            if (dryrun) {
262                id = bundleEngine.dryRunSubmit(conf);
263            }
264            else {
265                id = bundleEngine.submitJob(conf, startJob);
266            }
267            json.put(JsonTags.JOB_ID, id);
268        }
269        catch (BundleEngineException ex) {
270            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
271        }
272
273        return json;
274    }
275
276    /**
277     * v1 service implementation to get a JSONObject representation of a job from its external ID
278     */
279    @SuppressWarnings("unchecked")
280    private JSONObject getWorkflowJobIdForExternalId(HttpServletRequest request, String externalId)
281            throws XServletException {
282        JSONObject json = new JSONObject();
283        try {
284            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
285            String jobId = dagEngine.getJobIdForExternalId(externalId);
286            json.put(JsonTags.JOB_ID, jobId);
287        }
288        catch (DagEngineException ex) {
289            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
290        }
291        return json;
292    }
293
294    /**
295     * v1 service implementation to get a JSONObject representation of a job from its external ID
296     */
297    private JSONObject getCoordinatorJobIdForExternalId(HttpServletRequest request, String externalId)
298            throws XServletException {
299        JSONObject json = new JSONObject();
300        return json;
301    }
302
303    /**
304     * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the
305     * request object
306     */
307    private JSONObject getWorkflowJobs(HttpServletRequest request) throws XServletException {
308        JSONObject json = new JSONObject();
309        try {
310            String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
311            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
312            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
313            String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 
314                    ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
315            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
316            start = (start < 1) ? 1 : start;
317            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
318            len = (len < 1) ? 50 : len;
319            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
320            WorkflowsInfo jobs = dagEngine.getJobs(filter, start, len);
321            List<WorkflowJobBean> jsonWorkflows = jobs.getWorkflows();
322            json.put(JsonTags.WORKFLOWS_JOBS, WorkflowJobBean.toJSONArray(jsonWorkflows, timeZoneId));
323            json.put(JsonTags.WORKFLOWS_TOTAL, jobs.getTotal());
324            json.put(JsonTags.WORKFLOWS_OFFSET, jobs.getStart());
325            json.put(JsonTags.WORKFLOWS_LEN, jobs.getLen());
326
327        }
328        catch (DagEngineException ex) {
329            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
330        }
331
332        return json;
333    }
334
335    /**
336     * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the
337     * request object
338     */
339    @SuppressWarnings("unchecked")
340    private JSONObject getCoordinatorJobs(HttpServletRequest request) throws XServletException {
341        JSONObject json = new JSONObject();
342        try {
343            String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
344            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
345            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
346            String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 
347                    ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
348            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
349            start = (start < 1) ? 1 : start;
350            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
351            len = (len < 1) ? 50 : len;
352            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
353                    getUser(request));
354            CoordinatorJobInfo jobs = coordEngine.getCoordJobs(filter, start, len);
355            List<CoordinatorJobBean> jsonJobs = jobs.getCoordJobs();
356            json.put(JsonTags.COORDINATOR_JOBS, CoordinatorJobBean.toJSONArray(jsonJobs, timeZoneId));
357            json.put(JsonTags.COORD_JOB_TOTAL, jobs.getTotal());
358            json.put(JsonTags.COORD_JOB_OFFSET, jobs.getStart());
359            json.put(JsonTags.COORD_JOB_LEN, jobs.getLen());
360
361        }
362        catch (CoordinatorEngineException ex) {
363            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
364        }
365        return json;
366    }
367
368    @SuppressWarnings("unchecked")
369    private JSONObject getBundleJobs(HttpServletRequest request) throws XServletException {
370        JSONObject json = new JSONObject();
371        try {
372            String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
373            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
374            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
375            String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 
376                    ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
377            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
378            start = (start < 1) ? 1 : start;
379            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
380            len = (len < 1) ? 50 : len;
381
382            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
383            BundleJobInfo jobs = bundleEngine.getBundleJobs(filter, start, len);
384            List<BundleJobBean> jsonJobs = jobs.getBundleJobs();
385
386            json.put(JsonTags.BUNDLE_JOBS, BundleJobBean.toJSONArray(jsonJobs, timeZoneId));
387            json.put(JsonTags.BUNDLE_JOB_TOTAL, jobs.getTotal());
388            json.put(JsonTags.BUNDLE_JOB_OFFSET, jobs.getStart());
389            json.put(JsonTags.BUNDLE_JOB_LEN, jobs.getLen());
390
391        }
392        catch (BundleEngineException ex) {
393            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
394        }
395        return json;
396    }
397
398    @SuppressWarnings("unchecked")
399    private JSONObject getBulkJobs(HttpServletRequest request) throws XServletException, IOException {
400        JSONObject json = new JSONObject();
401        try {
402            String bulkFilter = request.getParameter(RestConstants.JOBS_BULK_PARAM); //REST API
403            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
404            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
405            String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
406                    ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
407            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
408            start = (start < 1) ? 1 : start;
409            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
410            len = (len < 1) ? 50 : len;
411
412            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
413            BulkResponseInfo bulkResponse = bundleEngine.getBulkJobs(bulkFilter, start, len);
414            List<BulkResponseImpl> responsesToJson = bulkResponse.getResponses();
415
416            json.put(JsonTags.BULK_RESPONSES, BulkResponseImpl.toJSONArray(responsesToJson, timeZoneId));
417            json.put(JsonTags.BULK_RESPONSE_TOTAL, bulkResponse.getTotal());
418            json.put(JsonTags.BULK_RESPONSE_OFFSET, bulkResponse.getStart());
419            json.put(JsonTags.BULK_RESPONSE_LEN, bulkResponse.getLen());
420
421        }
422        catch (BaseEngineException ex) {
423            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
424        }
425        return json;
426    }
427
428    /**
429     * service implementation to submit a http job
430     */
431    private JSONObject submitHttpJob(HttpServletRequest request, Configuration conf, String jobType)
432            throws XServletException {
433        JSONObject json = new JSONObject();
434
435        try {
436            String user = conf.get(OozieClient.USER_NAME);
437            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
438            String id = dagEngine.submitHttpJob(conf, jobType);
439            json.put(JsonTags.JOB_ID, id);
440        }
441        catch (DagEngineException ex) {
442            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
443        }
444
445        return json;
446    }
447}