001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.servlet;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Set;
026
027import javax.servlet.http.HttpServletRequest;
028import javax.servlet.http.HttpServletResponse;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.oozie.BaseEngineException;
032import org.apache.oozie.BulkResponseInfo;
033import org.apache.oozie.BundleEngine;
034import org.apache.oozie.BundleEngineException;
035import org.apache.oozie.BundleJobInfo;
036import org.apache.oozie.CoordinatorEngine;
037import org.apache.oozie.CoordinatorEngineException;
038import org.apache.oozie.CoordinatorJobInfo;
039import org.apache.oozie.DagEngine;
040import org.apache.oozie.DagEngineException;
041import org.apache.oozie.ErrorCode;
042import org.apache.oozie.OozieJsonFactory;
043import org.apache.oozie.WorkflowsInfo;
044import org.apache.oozie.cli.OozieCLI;
045import org.apache.oozie.client.OozieClient;
046import org.apache.oozie.client.rest.BulkResponseImpl;
047import org.apache.oozie.client.rest.JsonTags;
048import org.apache.oozie.client.rest.RestConstants;
049import org.apache.oozie.service.BundleEngineService;
050import org.apache.oozie.service.CoordinatorEngineService;
051import org.apache.oozie.service.DagEngineService;
052import org.apache.oozie.service.Services;
053import org.apache.oozie.util.XLog;
054import org.apache.oozie.util.XmlUtils;
055import org.json.simple.JSONArray;
056import org.json.simple.JSONObject;
057
058public class V1JobsServlet extends BaseJobsServlet {
059
060    private static final String INSTRUMENTATION_NAME = "v1jobs";
061    private static final Set<String> httpJobType = new HashSet<String>(){{
062        this.add(OozieCLI.HIVE_CMD);
063        this.add(OozieCLI.SQOOP_CMD);
064        this.add(OozieCLI.PIG_CMD);
065        this.add(OozieCLI.MR_CMD);
066    }};
067
068    public V1JobsServlet() {
069        super(INSTRUMENTATION_NAME);
070    }
071
072    /**
073     * v1 service implementation to submit a job, either workflow or coordinator
074     */
075    @Override
076    protected JSONObject submitJob(HttpServletRequest request, Configuration conf) throws XServletException,
077            IOException {
078        JSONObject json = null;
079
080        String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM);
081
082        if (jobType == null) {
083            String wfPath = conf.get(OozieClient.APP_PATH);
084            String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH);
085            String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);
086
087            ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath);
088
089            if (wfPath != null) {
090                json = submitWorkflowJob(request, conf);
091            }
092            else if (coordPath != null) {
093                json = submitCoordinatorJob(request, conf);
094            }
095            else {
096                json = submitBundleJob(request, conf);
097            }
098        }
099        else { // This is a http submission job
100            if (httpJobType.contains(jobType)) {
101                json = submitHttpJob(request, conf, jobType);
102            }
103            else {
104                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
105                        RestConstants.JOBTYPE_PARAM, jobType);
106            }
107        }
108        return json;
109    }
110
111    /**
112     * v1 service implementation to get a JSONObject representation of a job from its external ID
113     */
114    @Override
115    protected JSONObject getJobIdForExternalId(HttpServletRequest request, String externalId) throws XServletException,
116            IOException {
117        JSONObject json = null;
118        /*
119         * Configuration conf = new XConfiguration(); String wfPath =
120         * conf.get(OozieClient.APP_PATH); String coordPath =
121         * conf.get(OozieClient.COORDINATOR_APP_PATH);
122         *
123         * ServletUtilities.ValidateAppPath(wfPath, coordPath);
124         */
125        String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
126        jobtype = (jobtype != null) ? jobtype : "wf";
127        if (jobtype.contains("wf")) {
128            json = getWorkflowJobIdForExternalId(request, externalId);
129        }
130        else {
131            json = getCoordinatorJobIdForExternalId(request, externalId);
132        }
133        return json;
134    }
135
136    /**
137     * v1 service implementation to get a list of workflows, coordinators, or bundles, with filtering or interested
138     * windows embedded in the request object
139     */
140    @Override
141    protected JSONObject getJobs(HttpServletRequest request) throws XServletException, IOException {
142        JSONObject json = null;
143        String isBulk = request.getParameter(RestConstants.JOBS_BULK_PARAM);
144        if(isBulk != null) {
145            json = getBulkJobs(request);
146        } else {
147            String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
148            jobtype = (jobtype != null) ? jobtype : "wf";
149
150            if (jobtype.contains("wf")) {
151                json = getWorkflowJobs(request);
152            }
153            else if (jobtype.contains("coord")) {
154                json = getCoordinatorJobs(request);
155            }
156            else if (jobtype.contains("bundle")) {
157                json = getBundleJobs(request);
158            }
159        }
160        return json;
161    }
162
163    /**
164     * v1 service implementation to submit a workflow job
165     */
166    @SuppressWarnings("unchecked")
167    private JSONObject submitWorkflowJob(HttpServletRequest request, Configuration conf) throws XServletException {
168
169        JSONObject json = new JSONObject();
170
171        try {
172            String action = request.getParameter(RestConstants.ACTION_PARAM);
173            if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
174                    && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
175                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
176                        RestConstants.ACTION_PARAM, action);
177            }
178            boolean startJob = (action != null);
179            String user = conf.get(OozieClient.USER_NAME);
180            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
181            String id;
182            boolean dryrun = false;
183            if (action != null) {
184                dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
185            }
186            if (dryrun) {
187                id = dagEngine.dryRunSubmit(conf);
188            }
189            else {
190                id = dagEngine.submitJob(conf, startJob);
191            }
192            json.put(JsonTags.JOB_ID, id);
193        }
194        catch (BaseEngineException ex) {
195            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
196        }
197
198        return json;
199    }
200
201    /**
202     * v1 service implementation to submit a coordinator job
203     */
204    @SuppressWarnings("unchecked")
205    private JSONObject submitCoordinatorJob(HttpServletRequest request, Configuration conf) throws XServletException {
206
207        JSONObject json = new JSONObject();
208        XLog.getLog(getClass()).warn("submitCoordinatorJob " + XmlUtils.prettyPrint(conf).toString());
209        try {
210            String action = request.getParameter(RestConstants.ACTION_PARAM);
211            if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
212                    && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
213                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
214                        RestConstants.ACTION_PARAM, action);
215            }
216            boolean startJob = (action != null);
217            String user = conf.get(OozieClient.USER_NAME);
218            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
219                    user);
220            String id = null;
221            boolean dryrun = false;
222            if (action != null) {
223                dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
224            }
225            if (dryrun) {
226                id = coordEngine.dryRunSubmit(conf);
227            }
228            else {
229                id = coordEngine.submitJob(conf, startJob);
230            }
231            json.put(JsonTags.JOB_ID, id);
232        }
233        catch (CoordinatorEngineException ex) {
234            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
235        }
236
237        return json;
238    }
239
240    /**
241     * v1 service implementation to submit a bundle job
242     */
243    @SuppressWarnings("unchecked")
244    private JSONObject submitBundleJob(HttpServletRequest request, Configuration conf) throws XServletException {
245        JSONObject json = new JSONObject();
246        XLog.getLog(getClass()).warn("submitBundleJob " + XmlUtils.prettyPrint(conf).toString());
247        try {
248            String action = request.getParameter(RestConstants.ACTION_PARAM);
249            if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
250                    && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
251                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
252                        RestConstants.ACTION_PARAM, action);
253            }
254            boolean startJob = (action != null);
255            String user = conf.get(OozieClient.USER_NAME);
256            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(user);
257            String id = null;
258            boolean dryrun = false;
259            if (action != null) {
260                dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
261            }
262            if (dryrun) {
263                id = bundleEngine.dryRunSubmit(conf);
264            }
265            else {
266                id = bundleEngine.submitJob(conf, startJob);
267            }
268            json.put(JsonTags.JOB_ID, id);
269        }
270        catch (BundleEngineException ex) {
271            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
272        }
273
274        return json;
275    }
276
277    /**
278     * v1 service implementation to get a JSONObject representation of a job from its external ID
279     */
280    @SuppressWarnings("unchecked")
281    private JSONObject getWorkflowJobIdForExternalId(HttpServletRequest request, String externalId)
282            throws XServletException {
283        JSONObject json = new JSONObject();
284        try {
285            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
286            String jobId = dagEngine.getJobIdForExternalId(externalId);
287            json.put(JsonTags.JOB_ID, jobId);
288        }
289        catch (DagEngineException ex) {
290            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
291        }
292        return json;
293    }
294
295    /**
296     * v1 service implementation to get a JSONObject representation of a job from its external ID
297     */
298    private JSONObject getCoordinatorJobIdForExternalId(HttpServletRequest request, String externalId)
299            throws XServletException {
300        JSONObject json = new JSONObject();
301        return json;
302    }
303
304    /**
305     * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the
306     * request object
307     */
308    private JSONObject getWorkflowJobs(HttpServletRequest request) throws XServletException {
309        JSONObject json;
310        try {
311            String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
312            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
313            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
314            String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
315                    ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
316            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
317            start = (start < 1) ? 1 : start;
318            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
319            len = (len < 1) ? 50 : len;
320            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
321            WorkflowsInfo jobs = dagEngine.getJobs(filter, start, len);
322            json = OozieJsonFactory.getWFJSONObject(jobs, timeZoneId);
323        }
324        catch (DagEngineException ex) {
325            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
326        }
327
328        return json;
329    }
330
331    /**
332     * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the
333     * request object
334     */
335    @SuppressWarnings("unchecked")
336    private JSONObject getCoordinatorJobs(HttpServletRequest request) throws XServletException {
337        JSONObject json;
338        try {
339            String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
340            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
341            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
342            String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
343                    ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
344            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
345            start = (start < 1) ? 1 : start;
346            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
347            len = (len < 1) ? 50 : len;
348            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
349                    getUser(request));
350            CoordinatorJobInfo jobs = coordEngine.getCoordJobs(filter, start, len);
351            json = OozieJsonFactory.getCoordJSONObject(jobs, timeZoneId);
352        }
353        catch (CoordinatorEngineException ex) {
354            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
355        }
356        return json;
357    }
358
359    @SuppressWarnings("unchecked")
360    private JSONObject getBundleJobs(HttpServletRequest request) throws XServletException {
361        JSONObject json;
362        try {
363            String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
364            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
365            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
366            String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
367                    ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
368            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
369            start = (start < 1) ? 1 : start;
370            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
371            len = (len < 1) ? 50 : len;
372
373            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
374            BundleJobInfo jobs = bundleEngine.getBundleJobs(filter, start, len);
375            json = OozieJsonFactory.getBundleJSONObject(jobs, timeZoneId);
376        }
377        catch (BundleEngineException ex) {
378            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
379        }
380        return json;
381    }
382
383    @SuppressWarnings("unchecked")
384    private JSONObject getBulkJobs(HttpServletRequest request) throws XServletException, IOException {
385        JSONObject json = new JSONObject();
386        try {
387            String bulkFilter = request.getParameter(RestConstants.JOBS_BULK_PARAM); //REST API
388            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
389            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
390            String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
391                    ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
392            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
393            start = (start < 1) ? 1 : start;
394            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
395            len = (len < 1) ? 50 : len;
396
397            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
398            BulkResponseInfo bulkResponse = bundleEngine.getBulkJobs(bulkFilter, start, len);
399            List<BulkResponseImpl> responsesToJson = bulkResponse.getResponses();
400
401            json.put(JsonTags.BULK_RESPONSES, BulkResponseImpl.toJSONArray(responsesToJson, timeZoneId));
402            json.put(JsonTags.BULK_RESPONSE_TOTAL, bulkResponse.getTotal());
403            json.put(JsonTags.BULK_RESPONSE_OFFSET, bulkResponse.getStart());
404            json.put(JsonTags.BULK_RESPONSE_LEN, bulkResponse.getLen());
405
406        }
407        catch (BaseEngineException ex) {
408            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
409        }
410        return json;
411    }
412
413    /**
414     * service implementation to submit a http job
415     */
416    private JSONObject submitHttpJob(HttpServletRequest request, Configuration conf, String jobType)
417            throws XServletException {
418        JSONObject json = new JSONObject();
419
420        try {
421            String user = conf.get(OozieClient.USER_NAME);
422            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
423            String id = dagEngine.submitHttpJob(conf, jobType);
424            json.put(JsonTags.JOB_ID, id);
425        }
426        catch (DagEngineException ex) {
427            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
428        }
429
430        return json;
431    }
432
433    /**
434     * service implementation to bulk kill jobs
435     * @param request
436     * @param response
437     * @return bulkModifyJobs implementation to bulk kill jobs
438     * @throws XServletException
439     * @throws IOException
440     */
441    @Override
442    protected JSONObject killJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException,
443            IOException {
444        return bulkModifyJobs(request, response);
445    }
446
447    /**
448     * service implementation to bulk suspend jobs
449     * @param request
450     * @param response
451     * @return bulkModifyJobs implementation to bulk suspend jobs
452     * @throws XServletException
453     * @throws IOException
454     */
455    @Override
456    protected JSONObject suspendJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException,
457            IOException {
458        return bulkModifyJobs(request, response);
459    }
460
461    /**
462     * service implementation to bulk resume jobs
463     * @param request
464     * @param response
465     * @return bulkModifyJobs implementation to bulk resume jobs
466     * @throws XServletException
467     * @throws IOException
468     */
469    @Override
470    protected JSONObject resumeJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException,
471            IOException {
472        return bulkModifyJobs(request, response);
473    }
474
475    private JSONObject bulkModifyJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException,
476            IOException {
477        String action = request.getParameter(RestConstants.ACTION_PARAM);
478        String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM);
479        String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
480        String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
481        String lenStr = request.getParameter(RestConstants.LEN_PARAM);
482        String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
483                ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
484
485        int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
486        start = (start < 1) ? 1 : start;
487        int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
488        len = (len < 1) ? 50 : len;
489
490        JSONObject json;
491        List<String> ids = new ArrayList<String>();
492
493        if (jobType.equals("wf")) {
494            WorkflowsInfo jobs;
495            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
496            try {
497                switch (action) {
498                    case RestConstants.JOB_ACTION_KILL:
499                        jobs = dagEngine.killJobs(filter, start, len);
500                        break;
501                    case RestConstants.JOB_ACTION_SUSPEND:
502                        jobs = dagEngine.suspendJobs(filter, start, len);
503                        break;
504                    case RestConstants.JOB_ACTION_RESUME:
505                        jobs = dagEngine.resumeJobs(filter, start, len);
506                        break;
507                    default:
508                        throw new DagEngineException(ErrorCode.E0301, action);
509                }
510            } catch (DagEngineException ex) {
511                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
512            }
513            json = OozieJsonFactory.getWFJSONObject(jobs, timeZoneId);
514        }
515        else if (jobType.equals("bundle")) {
516            BundleJobInfo jobs;
517            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
518            try {
519                switch (action) {
520                    case RestConstants.JOB_ACTION_KILL:
521                        jobs = bundleEngine.killJobs(filter, start, len);
522                        break;
523                    case RestConstants.JOB_ACTION_SUSPEND:
524                        jobs = bundleEngine.suspendJobs(filter, start, len);
525                        break;
526                    case RestConstants.JOB_ACTION_RESUME:
527                        jobs = bundleEngine.resumeJobs(filter, start, len);
528                        break;
529                    default:
530                        throw new BundleEngineException(ErrorCode.E0301, action);
531                }
532            } catch (BundleEngineException ex) {
533                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
534            }
535            json = OozieJsonFactory.getBundleJSONObject(jobs, timeZoneId);
536        }
537        else {
538            CoordinatorJobInfo jobs;
539            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).
540                    getCoordinatorEngine(getUser(request));
541            try {
542                switch (action) {
543                    case RestConstants.JOB_ACTION_KILL:
544                        jobs = coordEngine.killJobs(filter, start, len);
545                        break;
546                    case RestConstants.JOB_ACTION_SUSPEND:
547                        jobs = coordEngine.suspendJobs(filter, start, len);
548                        break;
549                    case RestConstants.JOB_ACTION_RESUME:
550                        jobs = coordEngine.resumeJobs(filter, start, len);
551                        break;
552                    default:
553                        throw new CoordinatorEngineException(ErrorCode.E0301, action);
554                }
555            } catch (CoordinatorEngineException ex) {
556                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
557            }
558            json = OozieJsonFactory.getCoordJSONObject(jobs, timeZoneId);
559        }
560        json.put(JsonTags.JOB_IDS, toJSONArray(ids));
561        return json;
562    }
563
564    private static JSONArray toJSONArray(List<String> ids) {
565        JSONArray array = new JSONArray();
566        for (String id : ids) {
567            array.add(id);
568        }
569        return array;
570    }
571}