001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.servlet;
020
021import java.io.IOException;
022import java.util.List;
023
024import javax.servlet.ServletInputStream;
025import javax.servlet.http.HttpServletRequest;
026import javax.servlet.http.HttpServletResponse;
027
028import org.apache.hadoop.conf.Configuration;
029import org.apache.oozie.*;
030import org.apache.oozie.client.WorkflowAction;
031import org.apache.oozie.client.WorkflowJob;
032import org.apache.oozie.client.rest.*;
033import org.apache.oozie.command.CommandException;
034import org.apache.oozie.coord.CoordUtils;
035import org.apache.oozie.service.BundleEngineService;
036import org.apache.oozie.service.ConfigurationService;
037import org.apache.oozie.service.CoordinatorEngineService;
038import org.apache.oozie.service.DagEngineService;
039import org.apache.oozie.service.Services;
040import org.apache.oozie.service.UUIDService;
041import org.apache.oozie.util.GraphGenerator;
042import org.apache.oozie.util.XLog;
043import org.json.simple.JSONArray;
044import org.json.simple.JSONObject;
045
046
047@SuppressWarnings("serial")
048public class V1JobServlet extends BaseJobServlet {
049
050    private static final String INSTRUMENTATION_NAME = "v1job";
051    public static final String COORD_ACTIONS_DEFAULT_LENGTH = "oozie.coord.actions.default.length";
052
053    public V1JobServlet() {
054        super(INSTRUMENTATION_NAME);
055    }
056
057    protected V1JobServlet(String instrumentation_name){
058        super(instrumentation_name);
059    }
060
061    /*
062     * protected method to start a job
063     */
064    @Override
065    protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
066            IOException {
067        /*
068         * Configuration conf = new XConfiguration(request.getInputStream());
069         * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
070         * conf.get(OozieClient.COORDINATOR_APP_PATH);
071         *
072         * ServletUtilities.ValidateAppPath(wfPath, coordPath);
073         */
074        String jobId = getResourceName(request);
075        if (jobId.endsWith("-W")) {
076            startWorkflowJob(request, response);
077        }
078        else if (jobId.endsWith("-B")) {
079            startBundleJob(request, response);
080        }
081        else {
082            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, RestConstants.ACTION_PARAM, RestConstants.JOB_ACTION_START);
083        }
084
085    }
086
087    /*
088     * protected method to resume a job
089     */
090    @Override
091    protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
092            IOException {
093        /*
094         * Configuration conf = new XConfiguration(request.getInputStream());
095         * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
096         * conf.get(OozieClient.COORDINATOR_APP_PATH);
097         *
098         * ServletUtilities.ValidateAppPath(wfPath, coordPath);
099         */
100        String jobId = getResourceName(request);
101        if (jobId.endsWith("-W")) {
102            resumeWorkflowJob(request, response);
103        }
104        else if (jobId.endsWith("-B")) {
105            resumeBundleJob(request, response);
106        }
107        else {
108            resumeCoordinatorJob(request, response);
109        }
110    }
111
112    /*
113     * protected method to suspend a job
114     */
115    @Override
116    protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
117            IOException {
118        /*
119         * Configuration conf = new XConfiguration(request.getInputStream());
120         * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
121         * conf.get(OozieClient.COORDINATOR_APP_PATH);
122         *
123         * ServletUtilities.ValidateAppPath(wfPath, coordPath);
124         */
125        String jobId = getResourceName(request);
126        if (jobId.endsWith("-W")) {
127            suspendWorkflowJob(request, response);
128        }
129        else if (jobId.endsWith("-B")) {
130            suspendBundleJob(request, response);
131        }
132        else {
133            suspendCoordinatorJob(request, response);
134        }
135    }
136
137    /*
138     * protected method to kill a job
139     */
140    @Override
141    protected JSONObject killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
142            IOException {
143        /*
144         * Configuration conf = new XConfiguration(request.getInputStream());
145         * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
146         * conf.get(OozieClient.COORDINATOR_APP_PATH);
147         *
148         * ServletUtilities.ValidateAppPath(wfPath, coordPath);
149         */
150        String jobId = getResourceName(request);
151        JSONObject json = null;
152        if (jobId.endsWith("-W")) {
153            killWorkflowJob(request, response);
154        }
155        else if (jobId.endsWith("-B")) {
156            killBundleJob(request, response);
157        }
158        else {
159            json = killCoordinator(request, response);
160        }
161        return json;
162    }
163
164    /**
165     * protected method to change a coordinator job
166     * @param request request object
167     * @param response response object
168     * @throws XServletException
169     * @throws IOException
170     */
171    @Override
172    protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
173            IOException {
174        String jobId = getResourceName(request);
175        if (jobId.endsWith("-B")) {
176            changeBundleJob(request, response);
177        }
178        else {
179            changeCoordinatorJob(request, response);
180        }
181    }
182    @Override
183    protected JSONObject ignoreJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException {
184        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
185    }
186
187    /*
188     * protected method to reRun a job
189     *
190     * @seeorg.apache.oozie.servlet.BaseJobServlet#reRunJob(javax.servlet.http.
191     * HttpServletRequest, javax.servlet.http.HttpServletResponse,
192     * org.apache.hadoop.conf.Configuration)
193     */
194    @Override
195    protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
196            throws XServletException, IOException {
197        JSONObject json = null;
198        String jobId = getResourceName(request);
199        if (jobId.endsWith("-W")) {
200            reRunWorkflowJob(request, response, conf);
201        }
202        else if (jobId.endsWith("-B")) {
203            rerunBundleJob(request, response, conf);
204        }
205        else {
206            json = reRunCoordinatorActions(request, response, conf);
207        }
208        return json;
209    }
210
211    /*
212     * protected method to get a job in JsonBean representation
213     */
214    @Override
215    protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
216            IOException, BaseEngineException {
217        ServletInputStream is = request.getInputStream();
218        byte[] b = new byte[101];
219        while (is.readLine(b, 0, 100) != -1) {
220            XLog.getLog(getClass()).warn("Printing :" + new String(b));
221        }
222
223        JsonBean jobBean = null;
224        String jobId = getResourceName(request);
225        if (jobId.endsWith("-B")) {
226            jobBean = getBundleJob(request, response);
227        }
228        else {
229            if (jobId.endsWith("-W")) {
230                jobBean = getWorkflowJob(request, response);
231            }
232            else {
233                if (jobId.contains("-W@")) {
234                    jobBean = getWorkflowAction(request, response);
235                }
236                else {
237                    if (jobId.contains("-C@")) {
238                        jobBean = getCoordinatorAction(request, response);
239                    }
240                    else {
241                        jobBean = getCoordinatorJob(request, response);
242                    }
243                }
244            }
245        }
246
247        return jobBean;
248    }
249
250    /*
251     * protected method to get a job definition in String format
252     */
253    @Override
254    protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
255            throws XServletException, IOException {
256        String jobDefinition = null;
257        String jobId = getResourceName(request);
258        if (jobId.endsWith("-W")) {
259            jobDefinition = getWorkflowJobDefinition(request, response);
260        }
261        else if (jobId.endsWith("-B")) {
262            jobDefinition = getBundleJobDefinition(request, response);
263        }
264        else {
265            jobDefinition = getCoordinatorJobDefinition(request, response);
266        }
267        return jobDefinition;
268    }
269
270    /*
271     * protected method to stream a job log into response object
272     */
273    @Override
274    protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
275            IOException {
276        try {
277            String jobId = getResourceName(request);
278            if (jobId.endsWith("-W")) {
279                streamWorkflowJobLog(request, response);
280            }
281            else if (jobId.endsWith("-B")) {
282                streamBundleJobLog(request, response);
283            }
284            else {
285                streamCoordinatorJobLog(request, response);
286            }
287        }
288        catch (Exception e) {
289            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0307, e.getMessage());
290        }
291    }
292
293    @Override
294    protected void streamJobGraph(HttpServletRequest request, HttpServletResponse response)
295            throws XServletException, IOException {
296        String jobId = getResourceName(request);
297        if (jobId.endsWith("-W")) {
298            try {
299                // Applicable only to worflow, for now
300                response.setContentType(RestConstants.PNG_IMAGE_CONTENT_TYPE);
301
302                String showKill = request.getParameter(RestConstants.JOB_SHOW_KILL_PARAM);
303                boolean sK = showKill != null && (showKill.equalsIgnoreCase("yes") || showKill.equals("1") || showKill.equalsIgnoreCase("true"));
304
305                new GraphGenerator(
306                        getWorkflowJobDefinition(request, response),
307                        (WorkflowJobBean)getWorkflowJob(request, response),
308                        sK).write(response.getOutputStream());
309
310            }
311            catch (Exception e) {
312                throw new XServletException(HttpServletResponse.SC_NOT_FOUND, ErrorCode.E0307, e.getMessage(), e);
313            }
314        }
315        else {
316            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0306);
317        }
318    }
319
320    /**
321     * Start wf job
322     *
323     * @param request servlet request
324     * @param response servlet response
325     * @throws XServletException
326     */
327    private void startWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
328        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
329
330        String jobId = getResourceName(request);
331        try {
332            dagEngine.start(jobId);
333        }
334        catch (DagEngineException ex) {
335            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
336        }
337    }
338
339    /**
340     * Start bundle job
341     *
342     * @param request servlet request
343     * @param response servlet response
344     * @throws XServletException
345     */
346    private void startBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
347        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
348        String jobId = getResourceName(request);
349        try {
350            bundleEngine.start(jobId);
351        }
352        catch (BundleEngineException ex) {
353            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
354        }
355    }
356
357    /**
358     * Resume workflow job
359     *
360     * @param request servlet request
361     * @param response servlet response
362     * @throws XServletException
363     */
364    private void resumeWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
365        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
366
367        String jobId = getResourceName(request);
368        try {
369            dagEngine.resume(jobId);
370        }
371        catch (DagEngineException ex) {
372            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
373        }
374    }
375
376    /**
377     * Resume bundle job
378     *
379     * @param request servlet request
380     * @param response servlet response
381     * @throws XServletException
382     */
383    private void resumeBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
384        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
385        String jobId = getResourceName(request);
386        try {
387            bundleEngine.resume(jobId);
388        }
389        catch (BundleEngineException ex) {
390            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
391        }
392    }
393
394    /**
395     * Resume coordinator job
396     *
397     * @param request servlet request
398     * @param response servlet response
399     * @throws XServletException
400     * @throws CoordinatorEngineException
401     */
402    private void resumeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
403            throws XServletException {
404        String jobId = getResourceName(request);
405        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
406                getUser(request));
407        try {
408            coordEngine.resume(jobId);
409        }
410        catch (CoordinatorEngineException ex) {
411            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
412        }
413    }
414
415    /**
416     * Suspend a wf job
417     *
418     * @param request servlet request
419     * @param response servlet response
420     * @throws XServletException
421     */
422    private void suspendWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
423        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
424
425        String jobId = getResourceName(request);
426        try {
427            dagEngine.suspend(jobId);
428        }
429        catch (DagEngineException ex) {
430            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
431        }
432    }
433
434    /**
435     * Suspend bundle job
436     *
437     * @param request servlet request
438     * @param response servlet response
439     * @throws XServletException
440     */
441    private void suspendBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
442        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
443        String jobId = getResourceName(request);
444        try {
445            bundleEngine.suspend(jobId);
446        }
447        catch (BundleEngineException ex) {
448            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
449        }
450    }
451
452    /**
453     * Suspend coordinator job
454     *
455     * @param request servlet request
456     * @param response servlet response
457     * @throws XServletException
458     */
459    private void suspendCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
460            throws XServletException {
461        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
462                getUser(request));
463        String jobId = getResourceName(request);
464        try {
465            coordEngine.suspend(jobId);
466        }
467        catch (CoordinatorEngineException ex) {
468            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
469        }
470    }
471
472    /**
473     * Kill a wf job
474     * @param request servlet request
475     * @param response servlet response
476     * @throws XServletException
477     */
478    private void killWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
479        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
480
481        String jobId = getResourceName(request);
482        try {
483            dagEngine.kill(jobId);
484        }
485        catch (DagEngineException ex) {
486            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
487        }
488    }
489
490    /**
491     * Kill a coord job
492     *
493     * @param request servlet request
494     * @param response servlet response
495     * @throws XServletException
496     */
497    @SuppressWarnings("unchecked")
498    private JSONObject killCoordinator(HttpServletRequest request, HttpServletResponse response) throws XServletException {
499        String jobId = getResourceName(request);
500        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class)
501                .getCoordinatorEngine(getUser(request));
502        JSONObject json = null;
503        String rangeType = request.getParameter(RestConstants.JOB_COORD_RANGE_TYPE_PARAM);
504        String scope = request.getParameter(RestConstants.JOB_COORD_SCOPE_PARAM);
505
506        try {
507            if (rangeType != null && scope != null) {
508                XLog.getLog(getClass()).info(
509                        "Kill coordinator actions for jobId=" + jobId + ", rangeType=" + rangeType + ",scope=" + scope);
510
511                json = new JSONObject();
512                CoordinatorActionInfo coordInfo = coordEngine.killActions(jobId, rangeType, scope);
513                List<CoordinatorActionBean> coordActions;
514                if (coordInfo != null) {
515                    coordActions = coordInfo.getCoordActions();
516                }
517                else {
518                    coordActions = CoordUtils.getCoordActions(rangeType, jobId, scope, true);
519                }
520                json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT"));
521            }
522            else {
523                coordEngine.kill(jobId);
524            }
525        }
526        catch (CoordinatorEngineException ex) {
527            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
528        }
529        catch (CommandException ex) {
530            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
531        }
532        return json;
533    }
534
535    /**
536     * Kill bundle job
537     *
538     * @param request servlet request
539     * @param response servlet response
540     * @throws XServletException
541     */
542    private void killBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
543        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
544        String jobId = getResourceName(request);
545        try {
546            bundleEngine.kill(jobId);
547        }
548        catch (BundleEngineException ex) {
549            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
550        }
551    }
552
553    /**
554     * Change a coordinator job
555     *
556     * @param request servlet request
557     * @param response servlet response
558     * @throws XServletException
559     */
560    private void changeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
561            throws XServletException {
562        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
563                getUser(request));
564        String jobId = getResourceName(request);
565        String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
566        try {
567            coordEngine.change(jobId, changeValue);
568        }
569        catch (CoordinatorEngineException ex) {
570            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
571        }
572    }
573
574    /**
575     * Change a bundle job
576     *
577     * @param request servlet request
578     * @param response servlet response
579     * @throws XServletException
580     */
581    private void changeBundleJob(HttpServletRequest request, HttpServletResponse response)
582            throws XServletException {
583        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
584        String jobId = getResourceName(request);
585        String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
586        try {
587            bundleEngine.change(jobId, changeValue);
588        }
589        catch (BundleEngineException ex) {
590            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
591        }
592    }
593
594    /**
595     * Rerun a wf job
596     *
597     * @param request servlet request
598     * @param response servlet response
599     * @param conf configuration object
600     * @throws XServletException
601     */
602    private void reRunWorkflowJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
603            throws XServletException {
604        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
605
606        String jobId = getResourceName(request);
607        try {
608            dagEngine.reRun(jobId, conf);
609        }
610        catch (DagEngineException ex) {
611            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
612        }
613    }
614
615    /**
616     * Rerun bundle job
617     *
618     * @param request servlet request
619     * @param response servlet response
620     * @param conf configration object
621     * @throws XServletException
622     */
623    private void rerunBundleJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
624            throws XServletException {
625        JSONObject json = new JSONObject();
626        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
627        String jobId = getResourceName(request);
628
629        String coordScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM);
630        String dateScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM);
631        String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
632        String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
633
634        XLog.getLog(getClass()).info(
635                "Rerun Bundle for jobId=" + jobId + ", coordScope=" + coordScope + ", dateScope=" + dateScope + ", refresh="
636                        + refresh + ", noCleanup=" + noCleanup);
637
638        try {
639            bundleEngine.reRun(jobId, coordScope, dateScope, Boolean.valueOf(refresh), Boolean.valueOf(noCleanup));
640        }
641        catch (BaseEngineException ex) {
642            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
643        }
644    }
645
646    /**
647     * Rerun coordinator actions
648     *
649     * @param request servlet request
650     * @param response servlet response
651     * @param conf configuration object
652     * @throws XServletException
653     */
654    @SuppressWarnings("unchecked")
655    private JSONObject reRunCoordinatorActions(HttpServletRequest request, HttpServletResponse response,
656            Configuration conf) throws XServletException {
657        JSONObject json = new JSONObject();
658        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request));
659
660        String jobId = getResourceName(request);
661
662        String rerunType = request.getParameter(RestConstants.JOB_COORD_RANGE_TYPE_PARAM);
663        String scope = request.getParameter(RestConstants.JOB_COORD_SCOPE_PARAM);
664        String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
665        String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
666        String failed = request.getParameter(RestConstants.JOB_COORD_RERUN_FAILED_PARAM);
667
668        XLog.getLog(getClass()).info(
669                "Rerun coordinator for jobId=" + jobId + ", rerunType=" + rerunType + ",scope=" + scope + ",refresh="
670                        + refresh + ", noCleanup=" + noCleanup);
671
672        try {
673            if (!(rerunType.equals(RestConstants.JOB_COORD_SCOPE_DATE) || rerunType
674                    .equals(RestConstants.JOB_COORD_SCOPE_ACTION))) {
675                throw new CommandException(ErrorCode.E1018, "date or action expected.");
676            }
677            CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh),
678                    Boolean.valueOf(noCleanup), Boolean.valueOf(failed), conf);
679            List<CoordinatorActionBean> coordActions;
680            if (coordInfo != null) {
681                coordActions = coordInfo.getCoordActions();
682            }
683            else {
684                coordActions = CoordUtils.getCoordActions(rerunType, jobId, scope, false);
685            }
686            json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT"));
687        }
688        catch (BaseEngineException ex) {
689            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
690        }
691        catch (CommandException ex) {
692            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
693        }
694
695        return json;
696    }
697
698
699
700    /**
701     * Get workflow job
702     *
703     * @param request servlet request
704     * @param response servlet response
705     * @return JsonBean WorkflowJobBean
706     * @throws XServletException
707     */
708    protected JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
709        JsonBean jobBean = getWorkflowJobBean(request, response);
710        // for backward compatibility (OOZIE-1231)
711        swapMRActionID((WorkflowJob)jobBean);
712        return jobBean;
713    }
714
715    /**
716     * Get workflow job
717     *
718     * @param request servlet request
719     * @param response servlet response
720     * @return JsonBean WorkflowJobBean
721     * @throws XServletException
722     */
723    protected JsonBean getWorkflowJobBean(HttpServletRequest request, HttpServletResponse response) throws XServletException {
724        JsonBean jobBean = null;
725        String jobId = getResourceName(request);
726        String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
727        String lenStr = request.getParameter(RestConstants.LEN_PARAM);
728        int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
729        start = (start < 1) ? 1 : start;
730        int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
731        len = (len < 1) ? Integer.MAX_VALUE : len;
732        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
733        try {
734            jobBean = (JsonBean) dagEngine.getJob(jobId, start, len);
735        }
736        catch (DagEngineException ex) {
737            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
738        }
739        return jobBean;
740    }
741
742    private void swapMRActionID(WorkflowJob wjBean) {
743        List<WorkflowAction> actions = wjBean.getActions();
744        if (actions != null) {
745            for (WorkflowAction wa : actions) {
746                swapMRActionID(wa);
747            }
748        }
749    }
750
751    private void swapMRActionID(WorkflowAction waBean) {
752        if (waBean.getType().equals("map-reduce")) {
753            String childId = waBean.getExternalChildIDs();
754            if (childId != null && !childId.equals("")) {
755                String consoleBase = getConsoleBase(waBean.getConsoleUrl());
756                ((WorkflowActionBean) waBean).setConsoleUrl(consoleBase + childId);
757                ((WorkflowActionBean) waBean).setExternalId(childId);
758                ((WorkflowActionBean) waBean).setExternalChildIDs("");
759            }
760        }
761    }
762
763    private String getConsoleBase(String url) {
764        String consoleBase = null;
765        if (url.indexOf("application") != -1) {
766            consoleBase = url.split("application_[0-9]+_[0-9]+")[0];
767        }
768        else {
769            consoleBase = url.split("job_[0-9]+_[0-9]+")[0];
770        }
771        return consoleBase;
772    }
773
774    /**
775     * Get wf action info
776     *
777     * @param request servlet request
778     * @param response servlet response
779     * @return JsonBean WorkflowActionBean
780     * @throws XServletException
781     */
782    protected JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response)
783            throws XServletException {
784
785        JsonBean actionBean = getWorkflowActionBean(request, response);
786        // for backward compatibility (OOZIE-1231)
787        swapMRActionID((WorkflowAction)actionBean);
788        return actionBean;
789    }
790
791    protected JsonBean getWorkflowActionBean(HttpServletRequest request, HttpServletResponse response)
792            throws XServletException {
793        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
794
795        JsonBean actionBean = null;
796        String actionId = getResourceName(request);
797        try {
798            actionBean = dagEngine.getWorkflowAction(actionId);
799        }
800        catch (BaseEngineException ex) {
801            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
802        }
803        return actionBean;
804    }
805
806    /**
807     * Get coord job info
808     *
809     * @param request servlet request
810     * @param response servlet response
811     * @return JsonBean CoordinatorJobBean
812     * @throws XServletException
813     * @throws BaseEngineException
814     */
815    protected JsonBean getCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
816            throws XServletException, BaseEngineException {
817        JsonBean jobBean = null;
818        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
819                getUser(request));
820        String jobId = getResourceName(request);
821        String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
822        String lenStr = request.getParameter(RestConstants.LEN_PARAM);
823        String filter = request.getParameter(RestConstants.JOB_FILTER_PARAM);
824        String orderStr = request.getParameter(RestConstants.ORDER_PARAM);
825        boolean order = (orderStr != null && orderStr.equals("desc")) ? true : false;
826        int offset = (startStr != null) ? Integer.parseInt(startStr) : 1;
827        offset = (offset < 1) ? 1 : offset;
828        // Get default number of coordinator actions to be retrieved
829        int defaultLen = ConfigurationService.getInt(COORD_ACTIONS_DEFAULT_LENGTH);
830        int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
831        len = getCoordinatorJobLength(defaultLen, len);
832        try {
833            CoordinatorJobBean coordJob = coordEngine.getCoordJob(jobId, filter, offset, len, order);
834            jobBean = coordJob;
835        }
836        catch (CoordinatorEngineException ex) {
837            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
838        }
839
840        return jobBean;
841    }
842
843    /**
844     * Given the requested length and the default length, determine how many coordinator jobs to return.
845     * Used by {@link #getCoordinatorJob(javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)}
846     *
847     * @param defaultLen The default length
848     * @param len The requested length
849     * @return The length to use
850     */
851    protected int getCoordinatorJobLength(int defaultLen, int len) {
852        return (len < 1) ? defaultLen : len;
853    }
854
855    /**
856     * Get bundle job info
857     *
858     * @param request servlet request
859     * @param response servlet response
860     * @return JsonBean bundle job bean
861     * @throws XServletException
862     * @throws BaseEngineException
863     */
864    private JsonBean getBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
865            BaseEngineException {
866        JsonBean jobBean = null;
867        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
868        String jobId = getResourceName(request);
869
870        try {
871            jobBean = (JsonBean) bundleEngine.getBundleJob(jobId);
872
873            return jobBean;
874        }
875        catch (BundleEngineException ex) {
876            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
877        }
878    }
879
880    /**
881     * Get coordinator action
882     *
883     * @param request servlet request
884     * @param response servlet response
885     * @return JsonBean CoordinatorActionBean
886     * @throws XServletException
887     * @throws BaseEngineException
888     */
889    private JsonBean getCoordinatorAction(HttpServletRequest request, HttpServletResponse response)
890            throws XServletException, BaseEngineException {
891        JsonBean actionBean = null;
892        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
893                getUser(request));
894        String actionId = getResourceName(request);
895        try {
896            actionBean = coordEngine.getCoordAction(actionId);
897        }
898        catch (CoordinatorEngineException ex) {
899            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
900        }
901
902        return actionBean;
903    }
904
905    /**
906     * Get wf job definition
907     *
908     * @param request servlet request
909     * @param response servlet response
910     * @return String wf definition
911     * @throws XServletException
912     */
913    private String getWorkflowJobDefinition(HttpServletRequest request, HttpServletResponse response)
914            throws XServletException {
915        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
916
917        String wfDefinition;
918        String jobId = getResourceName(request);
919        try {
920            wfDefinition = dagEngine.getDefinition(jobId);
921        }
922        catch (DagEngineException ex) {
923            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
924        }
925        return wfDefinition;
926    }
927
928    /**
929     * Get bundle job definition
930     *
931     * @param request servlet request
932     * @param response servlet response
933     * @return String bundle definition
934     * @throws XServletException
935     */
936    private String getBundleJobDefinition(HttpServletRequest request, HttpServletResponse response) throws XServletException {
937        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
938        String bundleDefinition;
939        String jobId = getResourceName(request);
940        try {
941            bundleDefinition = bundleEngine.getDefinition(jobId);
942        }
943        catch (BundleEngineException ex) {
944            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
945        }
946        return bundleDefinition;
947    }
948
949    /**
950     * Get coordinator job definition
951     *
952     * @param request servlet request
953     * @param response servlet response
954     * @return String coord definition
955     * @throws XServletException
956     */
957    private String getCoordinatorJobDefinition(HttpServletRequest request, HttpServletResponse response)
958            throws XServletException {
959
960        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
961                getUser(request));
962
963        String jobId = getResourceName(request);
964
965        String coordDefinition = null;
966        try {
967            coordDefinition = coordEngine.getDefinition(jobId);
968        }
969        catch (BaseEngineException ex) {
970            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
971        }
972        return coordDefinition;
973    }
974
975    /**
976     * Stream wf job log
977     *
978     * @param request servlet request
979     * @param response servlet response
980     * @throws XServletException
981     * @throws IOException
982     */
983    private void streamWorkflowJobLog(HttpServletRequest request, HttpServletResponse response)
984            throws XServletException, IOException {
985        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
986        String jobId = getResourceName(request);
987        try {
988            dagEngine.streamLog(jobId, response.getWriter(), request.getParameterMap());
989        }
990        catch (DagEngineException ex) {
991            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
992        }
993    }
994
995    /**
996     * Stream bundle job log
997     *
998     * @param request servlet request
999     * @param response servlet response
1000     * @throws XServletException
1001     */
1002    private void streamBundleJobLog(HttpServletRequest request, HttpServletResponse response)
1003            throws XServletException, IOException {
1004        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
1005        String jobId = getResourceName(request);
1006        try {
1007            bundleEngine.streamLog(jobId, response.getWriter(), request.getParameterMap());
1008        }
1009        catch (BundleEngineException ex) {
1010            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
1011        }
1012    }
1013
1014    /**
1015     * Stream coordinator job log
1016     *
1017     * @param request servlet request
1018     * @param response servlet response
1019     * @throws XServletException
1020     * @throws IOException
1021     */
1022    private void streamCoordinatorJobLog(HttpServletRequest request, HttpServletResponse response)
1023            throws XServletException, IOException {
1024
1025        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
1026                getUser(request));
1027        String jobId = getResourceName(request);
1028        String logRetrievalScope = request.getParameter(RestConstants.JOB_LOG_SCOPE_PARAM);
1029        String logRetrievalType = request.getParameter(RestConstants.JOB_LOG_TYPE_PARAM);
1030        try {
1031            coordEngine.streamLog(jobId, logRetrievalScope, logRetrievalType, response.getWriter(), request.getParameterMap());
1032        }
1033        catch (BaseEngineException ex) {
1034            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
1035        }
1036        catch (CommandException ex) {
1037            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
1038        }
1039    }
1040
1041    @Override
1042    protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException,
1043            IOException {
1044        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
1045    }
1046
1047    @Override
1048    protected JSONObject getJobsByParentId(HttpServletRequest request, HttpServletResponse response)
1049            throws XServletException, IOException {
1050        JSONObject json = new JSONObject();
1051        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class)
1052                .getCoordinatorEngine(getUser(request));
1053        String coordActionId;
1054        String type = request.getParameter(RestConstants.JOB_COORD_RANGE_TYPE_PARAM);
1055        String scope = request.getParameter(RestConstants.JOB_COORD_SCOPE_PARAM);
1056        // for getting allruns for coordinator action - 2 alternate endpoints
1057        if (type != null && type.equals(RestConstants.JOB_COORD_SCOPE_ACTION) && scope != null) {
1058            // endpoint - oozie/v2/coord-job-id?type=action&scope=action-num&show=allruns
1059            String jobId = getResourceName(request);
1060            coordActionId = Services.get().get(UUIDService.class).generateChildId(jobId, scope);
1061        }
1062        else {
1063            // endpoint - oozie/v2/coord-action-id?show=allruns
1064            coordActionId = getResourceName(request);
1065        }
1066        try {
1067            List<WorkflowJobBean> wfs = coordEngine.getReruns(coordActionId);
1068            JSONArray array = new JSONArray();
1069            if (wfs != null) {
1070                for (WorkflowJobBean wf : wfs) {
1071                    JSONObject json1 = new JSONObject();
1072                    json1.put(JsonTags.WORKFLOW_ID, wf.getId());
1073                    json1.put(JsonTags.WORKFLOW_STATUS, wf.getStatus().toString());
1074                    json1.put(JsonTags.WORKFLOW_START_TIME, JsonUtils.formatDateRfc822(wf.getStartTime(), "GMT"));
1075                    json1.put(JsonTags.WORKFLOW_ACTION_END_TIME, JsonUtils.formatDateRfc822(wf.getEndTime(), "GMT"));
1076                    array.add(json1);
1077                }
1078            }
1079            json.put(JsonTags.WORKFLOWS_JOBS, array);
1080            return json;
1081        }
1082        catch (CoordinatorEngineException ex) {
1083            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
1084        }
1085    }
1086    /**
1087     * not supported for v1
1088     */
1089    @Override
1090    protected JSONObject updateJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
1091            throws XServletException, IOException {
1092        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
1093    }
1094
1095    @Override
1096    protected String getJobStatus(HttpServletRequest request, HttpServletResponse response) throws XServletException,
1097            IOException {
1098        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
1099    }
1100
1101    @Override
1102    protected void streamJobErrorLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
1103            IOException {
1104        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
1105    }
1106    @Override
1107    protected void streamJobAuditLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
1108            IOException {
1109        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
1110    }
1111    @Override
1112    void slaEnableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException {
1113        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
1114    }
1115
1116    @Override
1117    void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException,
1118            IOException {
1119        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
1120    }
1121
1122    @Override
1123    void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException {
1124        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
1125    }
1126}