001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.servlet;
019
020import java.io.IOException;
021import java.util.List;
022
023import javax.servlet.ServletInputStream;
024import javax.servlet.http.HttpServletRequest;
025import javax.servlet.http.HttpServletResponse;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.oozie.*;
029import org.apache.oozie.client.WorkflowAction;
030import org.apache.oozie.client.WorkflowJob;
031import org.apache.oozie.client.rest.*;
032import org.apache.oozie.command.CommandException;
033import org.apache.oozie.coord.CoordUtils;
034import org.apache.oozie.service.BundleEngineService;
035import org.apache.oozie.service.CoordinatorEngineService;
036import org.apache.oozie.service.DagEngineService;
037import org.apache.oozie.service.Services;
038import org.apache.oozie.service.UUIDService;
039import org.apache.oozie.util.GraphGenerator;
040import org.apache.oozie.util.XLog;
041import org.json.simple.JSONArray;
042import org.json.simple.JSONObject;
043
044
045@SuppressWarnings("serial")
046public class V1JobServlet extends BaseJobServlet {
047
    /** Instrumentation name handed to the superclass by the public no-arg constructor. */
    private static final String INSTRUMENTATION_NAME = "v1job";
    /**
     * Configuration key for the default number of coordinator actions to retrieve; the lookup in
     * {@code getCoordinatorJob} falls back to 1000 when the key is not set.
     */
    public static final String COORD_ACTIONS_DEFAULT_LENGTH = "oozie.coord.actions.default.length";
050
    /**
     * Creates the servlet using the default instrumentation name ({@code "v1job"}).
     */
    public V1JobServlet() {
        super(INSTRUMENTATION_NAME);
    }
054
    /**
     * Creates the servlet with a caller-supplied instrumentation name; the value is passed
     * unchanged to the superclass constructor.
     *
     * @param instrumentation_name instrumentation name forwarded to the superclass
     */
    protected V1JobServlet(String instrumentation_name){
        super(instrumentation_name);
    }
058
059    /*
060     * protected method to start a job
061     */
062    @Override
063    protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
064            IOException {
065        /*
066         * Configuration conf = new XConfiguration(request.getInputStream());
067         * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
068         * conf.get(OozieClient.COORDINATOR_APP_PATH);
069         *
070         * ServletUtilities.ValidateAppPath(wfPath, coordPath);
071         */
072        String jobId = getResourceName(request);
073        if (jobId.endsWith("-W")) {
074            startWorkflowJob(request, response);
075        }
076        else if (jobId.endsWith("-B")) {
077            startBundleJob(request, response);
078        }
079        else {
080            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, RestConstants.ACTION_PARAM, RestConstants.JOB_ACTION_START);
081        }
082
083    }
084
085    /*
086     * protected method to resume a job
087     */
088    @Override
089    protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
090            IOException {
091        /*
092         * Configuration conf = new XConfiguration(request.getInputStream());
093         * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
094         * conf.get(OozieClient.COORDINATOR_APP_PATH);
095         *
096         * ServletUtilities.ValidateAppPath(wfPath, coordPath);
097         */
098        String jobId = getResourceName(request);
099        if (jobId.endsWith("-W")) {
100            resumeWorkflowJob(request, response);
101        }
102        else if (jobId.endsWith("-B")) {
103            resumeBundleJob(request, response);
104        }
105        else {
106            resumeCoordinatorJob(request, response);
107        }
108    }
109
110    /*
111     * protected method to suspend a job
112     */
113    @Override
114    protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
115            IOException {
116        /*
117         * Configuration conf = new XConfiguration(request.getInputStream());
118         * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
119         * conf.get(OozieClient.COORDINATOR_APP_PATH);
120         *
121         * ServletUtilities.ValidateAppPath(wfPath, coordPath);
122         */
123        String jobId = getResourceName(request);
124        if (jobId.endsWith("-W")) {
125            suspendWorkflowJob(request, response);
126        }
127        else if (jobId.endsWith("-B")) {
128            suspendBundleJob(request, response);
129        }
130        else {
131            suspendCoordinatorJob(request, response);
132        }
133    }
134
135    /*
136     * protected method to kill a job
137     */
138    @Override
139    protected JSONObject killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
140            IOException {
141        /*
142         * Configuration conf = new XConfiguration(request.getInputStream());
143         * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
144         * conf.get(OozieClient.COORDINATOR_APP_PATH);
145         *
146         * ServletUtilities.ValidateAppPath(wfPath, coordPath);
147         */
148        String jobId = getResourceName(request);
149        JSONObject json = null;
150        if (jobId.endsWith("-W")) {
151            killWorkflowJob(request, response);
152        }
153        else if (jobId.endsWith("-B")) {
154            killBundleJob(request, response);
155        }
156        else {
157            json = killCoordinator(request, response);
158        }
159        return json;
160    }
161
162    /**
163     * protected method to change a coordinator job
164     * @param request request object
165     * @param response response object
166     * @throws XServletException
167     * @throws IOException
168     */
169    @Override
170    protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
171            IOException {
172        String jobId = getResourceName(request);
173        if (jobId.endsWith("-B")) {
174            changeBundleJob(request, response);
175        }
176        else {
177            changeCoordinatorJob(request, response);
178        }
179    }
180    @Override
181    protected JSONObject ignoreJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException {
182        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
183    }
184
185    /*
186     * protected method to reRun a job
187     *
188     * @seeorg.apache.oozie.servlet.BaseJobServlet#reRunJob(javax.servlet.http.
189     * HttpServletRequest, javax.servlet.http.HttpServletResponse,
190     * org.apache.hadoop.conf.Configuration)
191     */
192    @Override
193    protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
194            throws XServletException, IOException {
195        JSONObject json = null;
196        String jobId = getResourceName(request);
197        if (jobId.endsWith("-W")) {
198            reRunWorkflowJob(request, response, conf);
199        }
200        else if (jobId.endsWith("-B")) {
201            rerunBundleJob(request, response, conf);
202        }
203        else {
204            json = reRunCoordinatorActions(request, response, conf);
205        }
206        return json;
207    }
208
209    /*
210     * protected method to get a job in JsonBean representation
211     */
212    @Override
213    protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
214            IOException, BaseEngineException {
215        ServletInputStream is = request.getInputStream();
216        byte[] b = new byte[101];
217        while (is.readLine(b, 0, 100) != -1) {
218            XLog.getLog(getClass()).warn("Printing :" + new String(b));
219        }
220
221        JsonBean jobBean = null;
222        String jobId = getResourceName(request);
223        if (jobId.endsWith("-B")) {
224            jobBean = getBundleJob(request, response);
225        }
226        else {
227            if (jobId.endsWith("-W")) {
228                jobBean = getWorkflowJob(request, response);
229            }
230            else {
231                if (jobId.contains("-W@")) {
232                    jobBean = getWorkflowAction(request, response);
233                }
234                else {
235                    if (jobId.contains("-C@")) {
236                        jobBean = getCoordinatorAction(request, response);
237                    }
238                    else {
239                        jobBean = getCoordinatorJob(request, response);
240                    }
241                }
242            }
243        }
244
245        return jobBean;
246    }
247
248    /*
249     * protected method to get a job definition in String format
250     */
251    @Override
252    protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
253            throws XServletException, IOException {
254        String jobDefinition = null;
255        String jobId = getResourceName(request);
256        if (jobId.endsWith("-W")) {
257            jobDefinition = getWorkflowJobDefinition(request, response);
258        }
259        else if (jobId.endsWith("-B")) {
260            jobDefinition = getBundleJobDefinition(request, response);
261        }
262        else {
263            jobDefinition = getCoordinatorJobDefinition(request, response);
264        }
265        return jobDefinition;
266    }
267
268    /*
269     * protected method to stream a job log into response object
270     */
271    @Override
272    protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
273            IOException {
274        try {
275            String jobId = getResourceName(request);
276            if (jobId.endsWith("-W")) {
277                streamWorkflowJobLog(request, response);
278            }
279            else if (jobId.endsWith("-B")) {
280                streamBundleJobLog(request, response);
281            }
282            else {
283                streamCoordinatorJobLog(request, response);
284            }
285        }
286        catch (Exception e) {
287            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0307, e.getMessage());
288        }
289    }
290
291    @Override
292    protected void streamJobGraph(HttpServletRequest request, HttpServletResponse response)
293            throws XServletException, IOException {
294        String jobId = getResourceName(request);
295        if (jobId.endsWith("-W")) {
296            try {
297                // Applicable only to worflow, for now
298                response.setContentType(RestConstants.PNG_IMAGE_CONTENT_TYPE);
299
300                String showKill = request.getParameter(RestConstants.JOB_SHOW_KILL_PARAM);
301                boolean sK = showKill != null && (showKill.equalsIgnoreCase("yes") || showKill.equals("1") || showKill.equalsIgnoreCase("true"));
302
303                new GraphGenerator(
304                        getWorkflowJobDefinition(request, response),
305                        (WorkflowJobBean)getWorkflowJob(request, response),
306                        sK).write(response.getOutputStream());
307
308            }
309            catch (Exception e) {
310                throw new XServletException(HttpServletResponse.SC_NOT_FOUND, ErrorCode.E0307, e.getMessage(), e);
311            }
312        }
313        else {
314            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0306);
315        }
316    }
317
318    /**
319     * Start wf job
320     *
321     * @param request servlet request
322     * @param response servlet response
323     * @throws XServletException
324     */
325    private void startWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
326        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
327
328        String jobId = getResourceName(request);
329        try {
330            dagEngine.start(jobId);
331        }
332        catch (DagEngineException ex) {
333            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
334        }
335    }
336
337    /**
338     * Start bundle job
339     *
340     * @param request servlet request
341     * @param response servlet response
342     * @throws XServletException
343     */
344    private void startBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
345        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
346        String jobId = getResourceName(request);
347        try {
348            bundleEngine.start(jobId);
349        }
350        catch (BundleEngineException ex) {
351            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
352        }
353    }
354
355    /**
356     * Resume workflow job
357     *
358     * @param request servlet request
359     * @param response servlet response
360     * @throws XServletException
361     */
362    private void resumeWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
363        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
364
365        String jobId = getResourceName(request);
366        try {
367            dagEngine.resume(jobId);
368        }
369        catch (DagEngineException ex) {
370            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
371        }
372    }
373
374    /**
375     * Resume bundle job
376     *
377     * @param request servlet request
378     * @param response servlet response
379     * @throws XServletException
380     */
381    private void resumeBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
382        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
383        String jobId = getResourceName(request);
384        try {
385            bundleEngine.resume(jobId);
386        }
387        catch (BundleEngineException ex) {
388            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
389        }
390    }
391
392    /**
393     * Resume coordinator job
394     *
395     * @param request servlet request
396     * @param response servlet response
397     * @throws XServletException
398     * @throws CoordinatorEngineException
399     */
400    private void resumeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
401            throws XServletException {
402        String jobId = getResourceName(request);
403        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
404                getUser(request));
405        try {
406            coordEngine.resume(jobId);
407        }
408        catch (CoordinatorEngineException ex) {
409            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
410        }
411    }
412
413    /**
414     * Suspend a wf job
415     *
416     * @param request servlet request
417     * @param response servlet response
418     * @throws XServletException
419     */
420    private void suspendWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
421        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
422
423        String jobId = getResourceName(request);
424        try {
425            dagEngine.suspend(jobId);
426        }
427        catch (DagEngineException ex) {
428            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
429        }
430    }
431
432    /**
433     * Suspend bundle job
434     *
435     * @param request servlet request
436     * @param response servlet response
437     * @throws XServletException
438     */
439    private void suspendBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
440        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
441        String jobId = getResourceName(request);
442        try {
443            bundleEngine.suspend(jobId);
444        }
445        catch (BundleEngineException ex) {
446            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
447        }
448    }
449
450    /**
451     * Suspend coordinator job
452     *
453     * @param request servlet request
454     * @param response servlet response
455     * @throws XServletException
456     */
457    private void suspendCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
458            throws XServletException {
459        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
460                getUser(request));
461        String jobId = getResourceName(request);
462        try {
463            coordEngine.suspend(jobId);
464        }
465        catch (CoordinatorEngineException ex) {
466            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
467        }
468    }
469
470    /**
471     * Kill a wf job
472     * @param request servlet request
473     * @param response servlet response
474     * @throws XServletException
475     */
476    private void killWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
477        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
478
479        String jobId = getResourceName(request);
480        try {
481            dagEngine.kill(jobId);
482        }
483        catch (DagEngineException ex) {
484            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
485        }
486    }
487
488    /**
489     * Kill a coord job
490     *
491     * @param request servlet request
492     * @param response servlet response
493     * @throws XServletException
494     */
495    @SuppressWarnings("unchecked")
496    private JSONObject killCoordinator(HttpServletRequest request, HttpServletResponse response) throws XServletException {
497        String jobId = getResourceName(request);
498        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class)
499                .getCoordinatorEngine(getUser(request));
500        JSONObject json = null;
501        String rangeType = request.getParameter(RestConstants.JOB_COORD_RANGE_TYPE_PARAM);
502        String scope = request.getParameter(RestConstants.JOB_COORD_SCOPE_PARAM);
503
504        try {
505            if (rangeType != null && scope != null) {
506                XLog.getLog(getClass()).info(
507                        "Kill coordinator actions for jobId=" + jobId + ", rangeType=" + rangeType + ",scope=" + scope);
508
509                json = new JSONObject();
510                CoordinatorActionInfo coordInfo = coordEngine.killActions(jobId, rangeType, scope);
511                List<CoordinatorActionBean> coordActions;
512                if (coordInfo != null) {
513                    coordActions = coordInfo.getCoordActions();
514                }
515                else {
516                    coordActions = CoordUtils.getCoordActions(rangeType, jobId, scope, true);
517                }
518                json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT"));
519            }
520            else {
521                coordEngine.kill(jobId);
522            }
523        }
524        catch (CoordinatorEngineException ex) {
525            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
526        }
527        catch (CommandException ex) {
528            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
529        }
530        return json;
531    }
532
533    /**
534     * Kill bundle job
535     *
536     * @param request servlet request
537     * @param response servlet response
538     * @throws XServletException
539     */
540    private void killBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
541        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
542        String jobId = getResourceName(request);
543        try {
544            bundleEngine.kill(jobId);
545        }
546        catch (BundleEngineException ex) {
547            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
548        }
549    }
550
551    /**
552     * Change a coordinator job
553     *
554     * @param request servlet request
555     * @param response servlet response
556     * @throws XServletException
557     */
558    private void changeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
559            throws XServletException {
560        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
561                getUser(request));
562        String jobId = getResourceName(request);
563        String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
564        try {
565            coordEngine.change(jobId, changeValue);
566        }
567        catch (CoordinatorEngineException ex) {
568            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
569        }
570    }
571
572    /**
573     * Change a bundle job
574     *
575     * @param request servlet request
576     * @param response servlet response
577     * @throws XServletException
578     */
579    private void changeBundleJob(HttpServletRequest request, HttpServletResponse response)
580            throws XServletException {
581        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
582        String jobId = getResourceName(request);
583        String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
584        try {
585            bundleEngine.change(jobId, changeValue);
586        }
587        catch (BundleEngineException ex) {
588            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
589        }
590    }
591
592    /**
593     * Rerun a wf job
594     *
595     * @param request servlet request
596     * @param response servlet response
597     * @param conf configuration object
598     * @throws XServletException
599     */
600    private void reRunWorkflowJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
601            throws XServletException {
602        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
603
604        String jobId = getResourceName(request);
605        try {
606            dagEngine.reRun(jobId, conf);
607        }
608        catch (DagEngineException ex) {
609            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
610        }
611    }
612
613    /**
614     * Rerun bundle job
615     *
616     * @param request servlet request
617     * @param response servlet response
618     * @param conf configration object
619     * @throws XServletException
620     */
621    private void rerunBundleJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
622            throws XServletException {
623        JSONObject json = new JSONObject();
624        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
625        String jobId = getResourceName(request);
626
627        String coordScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM);
628        String dateScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM);
629        String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
630        String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
631
632        XLog.getLog(getClass()).info(
633                "Rerun Bundle for jobId=" + jobId + ", coordScope=" + coordScope + ", dateScope=" + dateScope + ", refresh="
634                        + refresh + ", noCleanup=" + noCleanup);
635
636        try {
637            bundleEngine.reRun(jobId, coordScope, dateScope, Boolean.valueOf(refresh), Boolean.valueOf(noCleanup));
638        }
639        catch (BaseEngineException ex) {
640            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
641        }
642    }
643
644    /**
645     * Rerun coordinator actions
646     *
647     * @param request servlet request
648     * @param response servlet response
649     * @param conf configuration object
650     * @throws XServletException
651     */
652    @SuppressWarnings("unchecked")
653    private JSONObject reRunCoordinatorActions(HttpServletRequest request, HttpServletResponse response,
654            Configuration conf) throws XServletException {
655        JSONObject json = new JSONObject();
656        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request));
657
658        String jobId = getResourceName(request);
659
660        String rerunType = request.getParameter(RestConstants.JOB_COORD_RANGE_TYPE_PARAM);
661        String scope = request.getParameter(RestConstants.JOB_COORD_SCOPE_PARAM);
662        String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
663        String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
664
665        XLog.getLog(getClass()).info(
666                "Rerun coordinator for jobId=" + jobId + ", rerunType=" + rerunType + ",scope=" + scope + ",refresh="
667                        + refresh + ", noCleanup=" + noCleanup);
668
669        try {
670            if (!(rerunType.equals(RestConstants.JOB_COORD_SCOPE_DATE) || rerunType
671                    .equals(RestConstants.JOB_COORD_SCOPE_ACTION))) {
672                throw new CommandException(ErrorCode.E1018, "date or action expected.");
673            }
674            CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh),
675                    Boolean.valueOf(noCleanup));
676            List<CoordinatorActionBean> coordActions;
677            if (coordInfo != null) {
678                coordActions = coordInfo.getCoordActions();
679            }
680            else {
681                coordActions = CoordUtils.getCoordActions(rerunType, jobId, scope, false);
682            }
683            json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT"));
684        }
685        catch (BaseEngineException ex) {
686            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
687        }
688        catch (CommandException ex) {
689            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
690        }
691
692        return json;
693    }
694
695
696
697    /**
698     * Get workflow job
699     *
700     * @param request servlet request
701     * @param response servlet response
702     * @return JsonBean WorkflowJobBean
703     * @throws XServletException
704     */
705    protected JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
706        JsonBean jobBean = getWorkflowJobBean(request, response);
707        // for backward compatibility (OOZIE-1231)
708        swapMRActionID((WorkflowJob)jobBean);
709        return jobBean;
710    }
711
712    /**
713     * Get workflow job
714     *
715     * @param request servlet request
716     * @param response servlet response
717     * @return JsonBean WorkflowJobBean
718     * @throws XServletException
719     */
720    protected JsonBean getWorkflowJobBean(HttpServletRequest request, HttpServletResponse response) throws XServletException {
721        JsonBean jobBean = null;
722        String jobId = getResourceName(request);
723        String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
724        String lenStr = request.getParameter(RestConstants.LEN_PARAM);
725        int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
726        start = (start < 1) ? 1 : start;
727        int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
728        len = (len < 1) ? Integer.MAX_VALUE : len;
729        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
730        try {
731            jobBean = (JsonBean) dagEngine.getJob(jobId, start, len);
732        }
733        catch (DagEngineException ex) {
734            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
735        }
736        return jobBean;
737    }
738
739    private void swapMRActionID(WorkflowJob wjBean) {
740        List<WorkflowAction> actions = wjBean.getActions();
741        if (actions != null) {
742            for (WorkflowAction wa : actions) {
743                swapMRActionID(wa);
744            }
745        }
746    }
747
748    private void swapMRActionID(WorkflowAction waBean) {
749        if (waBean.getType().equals("map-reduce")) {
750            String childId = waBean.getExternalChildIDs();
751            if (childId != null && !childId.equals("")) {
752                String consoleBase = getConsoleBase(waBean.getConsoleUrl());
753                ((WorkflowActionBean) waBean).setConsoleUrl(consoleBase + childId);
754                ((WorkflowActionBean) waBean).setExternalId(childId);
755                ((WorkflowActionBean) waBean).setExternalChildIDs("");
756            }
757        }
758    }
759
760    private String getConsoleBase(String url) {
761        String consoleBase = null;
762        if (url.indexOf("application") != -1) {
763            consoleBase = url.split("application_[0-9]+_[0-9]+")[0];
764        }
765        else {
766            consoleBase = url.split("job_[0-9]+_[0-9]+")[0];
767        }
768        return consoleBase;
769    }
770
771    /**
772     * Get wf action info
773     *
774     * @param request servlet request
775     * @param response servlet response
776     * @return JsonBean WorkflowActionBean
777     * @throws XServletException
778     */
779    protected JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response)
780            throws XServletException {
781
782        JsonBean actionBean = getWorkflowActionBean(request, response);
783        // for backward compatibility (OOZIE-1231)
784        swapMRActionID((WorkflowAction)actionBean);
785        return actionBean;
786    }
787
788    protected JsonBean getWorkflowActionBean(HttpServletRequest request, HttpServletResponse response)
789            throws XServletException {
790        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
791
792        JsonBean actionBean = null;
793        String actionId = getResourceName(request);
794        try {
795            actionBean = dagEngine.getWorkflowAction(actionId);
796        }
797        catch (BaseEngineException ex) {
798            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
799        }
800        return actionBean;
801    }
802
803    /**
804     * Get coord job info
805     *
806     * @param request servlet request
807     * @param response servlet response
808     * @return JsonBean CoordinatorJobBean
809     * @throws XServletException
810     * @throws BaseEngineException
811     */
812    protected JsonBean getCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
813            throws XServletException, BaseEngineException {
814        JsonBean jobBean = null;
815        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
816                getUser(request));
817        String jobId = getResourceName(request);
818        String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
819        String lenStr = request.getParameter(RestConstants.LEN_PARAM);
820        String filter = request.getParameter(RestConstants.JOB_FILTER_PARAM);
821        String orderStr = request.getParameter(RestConstants.ORDER_PARAM);
822        boolean order = (orderStr != null && orderStr.equals("desc")) ? true : false;
823        int offset = (startStr != null) ? Integer.parseInt(startStr) : 1;
824        offset = (offset < 1) ? 1 : offset;
825        // Get default number of coordinator actions to be retrieved
826        int defaultLen = Services.get().getConf().getInt(COORD_ACTIONS_DEFAULT_LENGTH, 1000);
827        int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
828        len = getCoordinatorJobLength(defaultLen, len);
829        try {
830            CoordinatorJobBean coordJob = coordEngine.getCoordJob(jobId, filter, offset, len, order);
831            jobBean = coordJob;
832        }
833        catch (CoordinatorEngineException ex) {
834            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
835        }
836
837        return jobBean;
838    }
839
840    /**
841     * Given the requested length and the default length, determine how many coordinator jobs to return.
842     * Used by {@link #getCoordinatorJob(javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)}
843     *
844     * @param defaultLen The default length
845     * @param len The requested length
846     * @return The length to use
847     */
848    protected int getCoordinatorJobLength(int defaultLen, int len) {
849        return (len < 1) ? defaultLen : len;
850    }
851
852    /**
853     * Get bundle job info
854     *
855     * @param request servlet request
856     * @param response servlet response
857     * @return JsonBean bundle job bean
858     * @throws XServletException
859     * @throws BaseEngineException
860     */
861    private JsonBean getBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
862            BaseEngineException {
863        JsonBean jobBean = null;
864        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
865        String jobId = getResourceName(request);
866
867        try {
868            jobBean = (JsonBean) bundleEngine.getBundleJob(jobId);
869
870            return jobBean;
871        }
872        catch (BundleEngineException ex) {
873            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
874        }
875    }
876
877    /**
878     * Get coordinator action
879     *
880     * @param request servlet request
881     * @param response servlet response
882     * @return JsonBean CoordinatorActionBean
883     * @throws XServletException
884     * @throws BaseEngineException
885     */
886    private JsonBean getCoordinatorAction(HttpServletRequest request, HttpServletResponse response)
887            throws XServletException, BaseEngineException {
888        JsonBean actionBean = null;
889        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
890                getUser(request));
891        String actionId = getResourceName(request);
892        try {
893            actionBean = coordEngine.getCoordAction(actionId);
894        }
895        catch (CoordinatorEngineException ex) {
896            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
897        }
898
899        return actionBean;
900    }
901
902    /**
903     * Get wf job definition
904     *
905     * @param request servlet request
906     * @param response servlet response
907     * @return String wf definition
908     * @throws XServletException
909     */
910    private String getWorkflowJobDefinition(HttpServletRequest request, HttpServletResponse response)
911            throws XServletException {
912        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
913
914        String wfDefinition;
915        String jobId = getResourceName(request);
916        try {
917            wfDefinition = dagEngine.getDefinition(jobId);
918        }
919        catch (DagEngineException ex) {
920            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
921        }
922        return wfDefinition;
923    }
924
925    /**
926     * Get bundle job definition
927     *
928     * @param request servlet request
929     * @param response servlet response
930     * @return String bundle definition
931     * @throws XServletException
932     */
933    private String getBundleJobDefinition(HttpServletRequest request, HttpServletResponse response) throws XServletException {
934        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
935        String bundleDefinition;
936        String jobId = getResourceName(request);
937        try {
938            bundleDefinition = bundleEngine.getDefinition(jobId);
939        }
940        catch (BundleEngineException ex) {
941            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
942        }
943        return bundleDefinition;
944    }
945
946    /**
947     * Get coordinator job definition
948     *
949     * @param request servlet request
950     * @param response servlet response
951     * @return String coord definition
952     * @throws XServletException
953     */
954    private String getCoordinatorJobDefinition(HttpServletRequest request, HttpServletResponse response)
955            throws XServletException {
956
957        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
958                getUser(request));
959
960        String jobId = getResourceName(request);
961
962        String coordDefinition = null;
963        try {
964            coordDefinition = coordEngine.getDefinition(jobId);
965        }
966        catch (BaseEngineException ex) {
967            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
968        }
969        return coordDefinition;
970    }
971
972    /**
973     * Stream wf job log
974     *
975     * @param request servlet request
976     * @param response servlet response
977     * @throws XServletException
978     * @throws IOException
979     */
980    private void streamWorkflowJobLog(HttpServletRequest request, HttpServletResponse response)
981            throws XServletException, IOException {
982        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
983        String jobId = getResourceName(request);
984        try {
985            dagEngine.streamLog(jobId, response.getWriter(), request.getParameterMap());
986        }
987        catch (DagEngineException ex) {
988            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
989        }
990    }
991
992    /**
993     * Stream bundle job log
994     *
995     * @param request servlet request
996     * @param response servlet response
997     * @throws XServletException
998     */
999    private void streamBundleJobLog(HttpServletRequest request, HttpServletResponse response)
1000            throws XServletException, IOException {
1001        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
1002        String jobId = getResourceName(request);
1003        try {
1004            bundleEngine.streamLog(jobId, response.getWriter(), request.getParameterMap());
1005        }
1006        catch (BundleEngineException ex) {
1007            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
1008        }
1009    }
1010
1011    /**
1012     * Stream coordinator job log
1013     *
1014     * @param request servlet request
1015     * @param response servlet response
1016     * @throws XServletException
1017     * @throws IOException
1018     */
1019    private void streamCoordinatorJobLog(HttpServletRequest request, HttpServletResponse response)
1020            throws XServletException, IOException {
1021
1022        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
1023                getUser(request));
1024        String jobId = getResourceName(request);
1025        String logRetrievalScope = request.getParameter(RestConstants.JOB_LOG_SCOPE_PARAM);
1026        String logRetrievalType = request.getParameter(RestConstants.JOB_LOG_TYPE_PARAM);
1027        try {
1028            coordEngine.streamLog(jobId, logRetrievalScope, logRetrievalType, response.getWriter(), request.getParameterMap());
1029        }
1030        catch (BaseEngineException ex) {
1031            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
1032        }
1033        catch (CommandException ex) {
1034            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
1035        }
1036    }
1037
1038    @Override
1039    protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException,
1040            IOException {
1041        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
1042    }
1043
1044    @Override
1045    protected JSONObject getJobsByParentId(HttpServletRequest request, HttpServletResponse response)
1046            throws XServletException, IOException {
1047        JSONObject json = new JSONObject();
1048        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class)
1049                .getCoordinatorEngine(getUser(request));
1050        String coordActionId;
1051        String type = request.getParameter(RestConstants.JOB_COORD_RANGE_TYPE_PARAM);
1052        String scope = request.getParameter(RestConstants.JOB_COORD_SCOPE_PARAM);
1053        // for getting allruns for coordinator action - 2 alternate endpoints
1054        if (type != null && type.equals(RestConstants.JOB_COORD_SCOPE_ACTION) && scope != null) {
1055            // endpoint - oozie/v2/coord-job-id?type=action&scope=action-num&show=allruns
1056            String jobId = getResourceName(request);
1057            coordActionId = Services.get().get(UUIDService.class).generateChildId(jobId, scope);
1058        }
1059        else {
1060            // endpoint - oozie/v2/coord-action-id?show=allruns
1061            coordActionId = getResourceName(request);
1062        }
1063        try {
1064            List<WorkflowJobBean> wfs = coordEngine.getReruns(coordActionId);
1065            JSONArray array = new JSONArray();
1066            if (wfs != null) {
1067                for (WorkflowJobBean wf : wfs) {
1068                    JSONObject json1 = new JSONObject();
1069                    json1.put(JsonTags.WORKFLOW_ID, wf.getId());
1070                    json1.put(JsonTags.WORKFLOW_STATUS, wf.getStatus().toString());
1071                    json1.put(JsonTags.WORKFLOW_START_TIME, JsonUtils.formatDateRfc822(wf.getStartTime(), "GMT"));
1072                    json1.put(JsonTags.WORKFLOW_ACTION_END_TIME, JsonUtils.formatDateRfc822(wf.getEndTime(), "GMT"));
1073                    array.add(json1);
1074                }
1075            }
1076            json.put(JsonTags.WORKFLOWS_JOBS, array);
1077            return json;
1078        }
1079        catch (CoordinatorEngineException ex) {
1080            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
1081        }
1082    }
1083    /**
1084     * not supported for v1
1085     */
1086    @Override
1087    protected JSONObject updateJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
1088            throws XServletException, IOException {
1089        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
1090    }
1091}