001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.servlet;
020
021import java.io.IOException;
022import java.util.List;
023import java.util.Locale;
024
025import javax.servlet.ServletInputStream;
026import javax.servlet.http.HttpServletRequest;
027import javax.servlet.http.HttpServletResponse;
028
029import com.google.common.base.Strings;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.oozie.*;
032import org.apache.oozie.client.WorkflowAction;
033import org.apache.oozie.client.WorkflowJob;
034import org.apache.oozie.client.rest.*;
035import org.apache.oozie.command.CommandException;
036import org.apache.oozie.coord.CoordUtils;
037import org.apache.oozie.service.BundleEngineService;
038import org.apache.oozie.service.ConfigurationService;
039import org.apache.oozie.service.CoordinatorEngineService;
040import org.apache.oozie.service.DagEngineService;
041import org.apache.oozie.service.Services;
042import org.apache.oozie.service.UUIDService;
043import org.apache.oozie.util.Instrumentation;
044import org.apache.oozie.util.graph.GraphGenerator;
045import org.apache.oozie.util.XLog;
046import org.apache.oozie.util.graph.GraphRenderer;
047import org.apache.oozie.util.graph.GraphvizRenderer;
048import org.apache.oozie.util.graph.OutputFormat;
049import org.json.simple.JSONArray;
050import org.json.simple.JSONObject;
051
052
053@SuppressWarnings("serial")
054public class V1JobServlet extends BaseJobServlet {
055
056    private static final String INSTRUMENTATION_NAME = "v1job";
057    public static final String COORD_ACTIONS_DEFAULT_LENGTH = "oozie.coord.actions.default.length";
058
059    final static String NOT_SUPPORTED_MESSAGE = "Not supported in v1";
060
061
062    public V1JobServlet() {
063        super(INSTRUMENTATION_NAME);
064    }
065
066    protected V1JobServlet(String instrumentation_name){
067        super(instrumentation_name);
068    }
069
070    /*
071     * protected method to start a job
072     */
073    @Override
074    protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
075            IOException {
076        /*
077         * Configuration conf = new XConfiguration(request.getInputStream());
078         * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
079         * conf.get(OozieClient.COORDINATOR_APP_PATH);
080         *
081         * ServletUtilities.ValidateAppPath(wfPath, coordPath);
082         */
083        String jobId = getResourceName(request);
084        if (jobId.endsWith("-W")) {
085            startWorkflowJob(request, response);
086        }
087        else if (jobId.endsWith("-B")) {
088            startBundleJob(request, response);
089        }
090        else {
091            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, RestConstants.ACTION_PARAM, RestConstants.JOB_ACTION_START);
092        }
093
094    }
095
096    /*
097     * protected method to resume a job
098     */
099    @Override
100    protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
101            IOException {
102        /*
103         * Configuration conf = new XConfiguration(request.getInputStream());
104         * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
105         * conf.get(OozieClient.COORDINATOR_APP_PATH);
106         *
107         * ServletUtilities.ValidateAppPath(wfPath, coordPath);
108         */
109        String jobId = getResourceName(request);
110        if (jobId.endsWith("-W")) {
111            resumeWorkflowJob(request, response);
112        }
113        else if (jobId.endsWith("-B")) {
114            resumeBundleJob(request, response);
115        }
116        else {
117            resumeCoordinatorJob(request, response);
118        }
119    }
120
121    /*
122     * protected method to suspend a job
123     */
124    @Override
125    protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
126            IOException {
127        /*
128         * Configuration conf = new XConfiguration(request.getInputStream());
129         * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
130         * conf.get(OozieClient.COORDINATOR_APP_PATH);
131         *
132         * ServletUtilities.ValidateAppPath(wfPath, coordPath);
133         */
134        String jobId = getResourceName(request);
135        if (jobId.endsWith("-W")) {
136            suspendWorkflowJob(request, response);
137        }
138        else if (jobId.endsWith("-B")) {
139            suspendBundleJob(request, response);
140        }
141        else {
142            suspendCoordinatorJob(request, response);
143        }
144    }
145
146    /*
147     * protected method to kill a job
148     */
149    @Override
150    protected JSONObject killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
151            IOException {
152        /*
153         * Configuration conf = new XConfiguration(request.getInputStream());
154         * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
155         * conf.get(OozieClient.COORDINATOR_APP_PATH);
156         *
157         * ServletUtilities.ValidateAppPath(wfPath, coordPath);
158         */
159        String jobId = getResourceName(request);
160        JSONObject json = null;
161        if (jobId.endsWith("-W")) {
162            killWorkflowJob(request, response);
163        }
164        else if (jobId.endsWith("-B")) {
165            killBundleJob(request, response);
166        }
167        else {
168            json = killCoordinator(request, response);
169        }
170        return json;
171    }
172
173    /**
174     * protected method to change a coordinator job
175     * @param request request object
176     * @param response response object
177     * @throws XServletException
178     * @throws IOException
179     */
180    @Override
181    protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
182            IOException {
183        String jobId = getResourceName(request);
184        if (jobId.endsWith("-B")) {
185            changeBundleJob(request, response);
186        }
187        else {
188            changeCoordinatorJob(request, response);
189        }
190    }
191    @Override
192    protected JSONObject ignoreJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException {
193        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE);
194    }
195
196    /*
197     * protected method to reRun a job
198     *
199     * @seeorg.apache.oozie.servlet.BaseJobServlet#reRunJob(javax.servlet.http.
200     * HttpServletRequest, javax.servlet.http.HttpServletResponse,
201     * org.apache.hadoop.conf.Configuration)
202     */
203    @Override
204    protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
205            throws XServletException, IOException {
206        JSONObject json = null;
207        String jobId = getResourceName(request);
208        if (jobId.endsWith("-W")) {
209            reRunWorkflowJob(request, response, conf);
210        }
211        else if (jobId.endsWith("-B")) {
212            rerunBundleJob(request, response, conf);
213        }
214        else {
215            json = reRunCoordinatorActions(request, response, conf);
216        }
217        return json;
218    }
219
220    /*
221     * protected method to get a job in JsonBean representation
222     */
223    @Override
224    protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
225            IOException, BaseEngineException {
226        ServletInputStream is = request.getInputStream();
227        byte[] b = new byte[101];
228        while (is.readLine(b, 0, 100) != -1) {
229            XLog.getLog(getClass()).warn("Printing :" + new String(b));
230        }
231
232        JsonBean jobBean = null;
233        String jobId = getResourceName(request);
234        if (jobId.endsWith("-B")) {
235            jobBean = getBundleJob(request, response);
236        }
237        else {
238            if (jobId.endsWith("-W")) {
239                jobBean = getWorkflowJob(request, response);
240            }
241            else {
242                if (jobId.contains("-W@")) {
243                    jobBean = getWorkflowAction(request, response);
244                }
245                else {
246                    if (jobId.contains("-C@")) {
247                        jobBean = getCoordinatorAction(request, response);
248                    }
249                    else {
250                        jobBean = getCoordinatorJob(request, response);
251                    }
252                }
253            }
254        }
255
256        return jobBean;
257    }
258
259    /*
260     * protected method to get a job definition in String format
261     */
262    @Override
263    protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
264            throws XServletException, IOException {
265        String jobDefinition = null;
266        String jobId = getResourceName(request);
267        if (jobId.endsWith("-W")) {
268            jobDefinition = getWorkflowJobDefinition(request, response);
269        }
270        else if (jobId.endsWith("-B")) {
271            jobDefinition = getBundleJobDefinition(request, response);
272        }
273        else {
274            jobDefinition = getCoordinatorJobDefinition(request, response);
275        }
276        return jobDefinition;
277    }
278
279    /*
280     * protected method to stream a job log into response object
281     */
282    @Override
283    protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
284            IOException {
285        try {
286            String jobId = getResourceName(request);
287            if (jobId.endsWith("-W")) {
288                streamWorkflowJobLog(request, response);
289            }
290            else if (jobId.endsWith("-B")) {
291                streamBundleJobLog(request, response);
292            }
293            else {
294                streamCoordinatorJobLog(request, response);
295            }
296        }
297        catch (Exception e) {
298            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0307, e.getMessage());
299        }
300    }
301
302    @Override
303    protected void streamJobGraph(HttpServletRequest request, HttpServletResponse response)
304            throws XServletException, IOException {
305        String jobId = getResourceName(request);
306        if (jobId.endsWith("-W")) {
307            try {
308
309                final String showKillParameter = request.getParameter(RestConstants.JOB_SHOW_KILL_PARAM);
310                final boolean showKill = isShowKillSet(showKillParameter);
311
312                final String formatParameter = request.getParameter(RestConstants.JOB_FORMAT_PARAM);
313                final OutputFormat outputFormat = getOutputFormat(formatParameter);
314
315                final String contentType = getContentType(outputFormat);
316
317                response.setContentType(contentType);
318
319                final Instrumentation.Cron cron = new Instrumentation.Cron();
320                cron.start();
321
322                final GraphRenderer graphRenderer = new GraphvizRenderer();
323
324                new GraphGenerator(
325                            getWorkflowJobDefinition(request, response),
326                            (WorkflowJobBean)getWorkflowJob(request, response),
327                            showKill,
328                            graphRenderer).write(response.getOutputStream(), outputFormat);
329
330                cron.stop();
331                instrument(outputFormat, cron);
332            }
333            catch (final Exception e) {
334                throw new XServletException(HttpServletResponse.SC_NOT_FOUND, ErrorCode.E0307, e.getMessage(), e);
335            }
336        }
337        else {
338            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0306);
339        }
340    }
341
342    private boolean isShowKillSet(final String showKillParameter) {
343        return showKillParameter != null &&
344                (showKillParameter.equalsIgnoreCase("yes") ||
345                        showKillParameter.equals("1") ||
346                        showKillParameter.equalsIgnoreCase("true"));
347    }
348
349    private OutputFormat getOutputFormat(final String formatParameter) {
350        final OutputFormat outputFormat;
351        if (Strings.isNullOrEmpty(formatParameter)) {
352            outputFormat = OutputFormat.PNG;
353        }
354        else {
355            outputFormat = OutputFormat.valueOf(formatParameter.toUpperCase(Locale.getDefault()));
356        }
357        return outputFormat;
358    }
359
360    private String getContentType(final OutputFormat outputFormat) {
361        final String contentType;
362
363        switch (outputFormat) {
364            case PNG:
365                contentType = RestConstants.PNG_IMAGE_CONTENT_TYPE;
366                break;
367            case DOT:
368                contentType = RestConstants.TEXT_CONTENT_TYPE;
369                break;
370            case SVG:
371                contentType = RestConstants.SVG_IMAGE_CONTENT_TYPE;
372                break;
373            default:
374                throw new IllegalArgumentException("Unknown output format, cannot get content type: " + outputFormat);
375        }
376
377        return contentType;
378    }
379
380    private void instrument(final OutputFormat outputFormat, final Instrumentation.Cron cron) {
381        addCron(INSTRUMENTATION_NAME + "-graph", cron);
382        incrCounter(INSTRUMENTATION_NAME + "-graph", 1);
383        addCron(INSTRUMENTATION_NAME + "-graph-" + outputFormat.toString().toLowerCase(Locale.getDefault()), cron);
384        incrCounter(INSTRUMENTATION_NAME + "-graph-" + outputFormat.toString().toLowerCase(Locale.getDefault()), 1);
385    }
386
387    /**
388     * Start wf job
389     *
390     * @param request servlet request
391     * @param response servlet response
392     * @throws XServletException
393     */
394    private void startWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
395        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
396
397        String jobId = getResourceName(request);
398        try {
399            dagEngine.start(jobId);
400        }
401        catch (DagEngineException ex) {
402            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
403        }
404    }
405
406    /**
407     * Start bundle job
408     *
409     * @param request servlet request
410     * @param response servlet response
411     * @throws XServletException
412     */
413    private void startBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
414        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
415        String jobId = getResourceName(request);
416        try {
417            bundleEngine.start(jobId);
418        }
419        catch (BundleEngineException ex) {
420            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
421        }
422    }
423
424    /**
425     * Resume workflow job
426     *
427     * @param request servlet request
428     * @param response servlet response
429     * @throws XServletException
430     */
431    private void resumeWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
432        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
433
434        String jobId = getResourceName(request);
435        try {
436            dagEngine.resume(jobId);
437        }
438        catch (DagEngineException ex) {
439            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
440        }
441    }
442
443    /**
444     * Resume bundle job
445     *
446     * @param request servlet request
447     * @param response servlet response
448     * @throws XServletException
449     */
450    private void resumeBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
451        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
452        String jobId = getResourceName(request);
453        try {
454            bundleEngine.resume(jobId);
455        }
456        catch (BundleEngineException ex) {
457            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
458        }
459    }
460
461    /**
462     * Resume coordinator job
463     *
464     * @param request servlet request
465     * @param response servlet response
466     * @throws XServletException
467     * @throws CoordinatorEngineException
468     */
469    private void resumeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
470            throws XServletException {
471        String jobId = getResourceName(request);
472        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
473                getUser(request));
474        try {
475            coordEngine.resume(jobId);
476        }
477        catch (CoordinatorEngineException ex) {
478            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
479        }
480    }
481
482    /**
483     * Suspend a wf job
484     *
485     * @param request servlet request
486     * @param response servlet response
487     * @throws XServletException
488     */
489    private void suspendWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
490        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
491
492        String jobId = getResourceName(request);
493        try {
494            dagEngine.suspend(jobId);
495        }
496        catch (DagEngineException ex) {
497            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
498        }
499    }
500
501    /**
502     * Suspend bundle job
503     *
504     * @param request servlet request
505     * @param response servlet response
506     * @throws XServletException
507     */
508    private void suspendBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
509        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
510        String jobId = getResourceName(request);
511        try {
512            bundleEngine.suspend(jobId);
513        }
514        catch (BundleEngineException ex) {
515            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
516        }
517    }
518
519    /**
520     * Suspend coordinator job
521     *
522     * @param request servlet request
523     * @param response servlet response
524     * @throws XServletException
525     */
526    private void suspendCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
527            throws XServletException {
528        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
529                getUser(request));
530        String jobId = getResourceName(request);
531        try {
532            coordEngine.suspend(jobId);
533        }
534        catch (CoordinatorEngineException ex) {
535            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
536        }
537    }
538
539    /**
540     * Kill a wf job
541     * @param request servlet request
542     * @param response servlet response
543     * @throws XServletException
544     */
545    private void killWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
546        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
547
548        String jobId = getResourceName(request);
549        try {
550            dagEngine.kill(jobId);
551        }
552        catch (DagEngineException ex) {
553            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
554        }
555    }
556
557    /**
558     * Kill a coord job
559     *
560     * @param request servlet request
561     * @param response servlet response
562     * @throws XServletException
563     */
564    @SuppressWarnings("unchecked")
565    private JSONObject killCoordinator(HttpServletRequest request, HttpServletResponse response) throws XServletException {
566        String jobId = getResourceName(request);
567        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class)
568                .getCoordinatorEngine(getUser(request));
569        JSONObject json = null;
570        String rangeType = request.getParameter(RestConstants.JOB_COORD_RANGE_TYPE_PARAM);
571        String scope = request.getParameter(RestConstants.JOB_COORD_SCOPE_PARAM);
572
573        try {
574            if (rangeType != null && scope != null) {
575                XLog.getLog(getClass()).info(
576                        "Kill coordinator actions for jobId=" + jobId + ", rangeType=" + rangeType + ",scope=" + scope);
577
578                json = new JSONObject();
579                CoordinatorActionInfo coordInfo = coordEngine.killActions(jobId, rangeType, scope);
580                List<CoordinatorActionBean> coordActions;
581                if (coordInfo != null) {
582                    coordActions = coordInfo.getCoordActions();
583                }
584                else {
585                    coordActions = CoordUtils.getCoordActions(rangeType, jobId, scope, true);
586                }
587                json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT"));
588            }
589            else {
590                coordEngine.kill(jobId);
591            }
592        }
593        catch (CoordinatorEngineException ex) {
594            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
595        }
596        catch (CommandException ex) {
597            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
598        }
599        return json;
600    }
601
602    /**
603     * Kill bundle job
604     *
605     * @param request servlet request
606     * @param response servlet response
607     * @throws XServletException
608     */
609    private void killBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
610        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
611        String jobId = getResourceName(request);
612        try {
613            bundleEngine.kill(jobId);
614        }
615        catch (BundleEngineException ex) {
616            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
617        }
618    }
619
620    /**
621     * Change a coordinator job
622     *
623     * @param request servlet request
624     * @param response servlet response
625     * @throws XServletException
626     */
627    private void changeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
628            throws XServletException {
629        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
630                getUser(request));
631        String jobId = getResourceName(request);
632        String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
633        try {
634            coordEngine.change(jobId, changeValue);
635        }
636        catch (CoordinatorEngineException ex) {
637            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
638        }
639    }
640
641    /**
642     * Change a bundle job
643     *
644     * @param request servlet request
645     * @param response servlet response
646     * @throws XServletException
647     */
648    private void changeBundleJob(HttpServletRequest request, HttpServletResponse response)
649            throws XServletException {
650        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
651        String jobId = getResourceName(request);
652        String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
653        try {
654            bundleEngine.change(jobId, changeValue);
655        }
656        catch (BundleEngineException ex) {
657            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
658        }
659    }
660
661    /**
662     * Rerun a wf job
663     *
664     * @param request servlet request
665     * @param response servlet response
666     * @param conf configuration object
667     * @throws XServletException
668     */
669    private void reRunWorkflowJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
670            throws XServletException {
671        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
672
673        String jobId = getResourceName(request);
674        try {
675            dagEngine.reRun(jobId, conf);
676        }
677        catch (DagEngineException ex) {
678            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
679        }
680    }
681
682    /**
683     * Rerun bundle job
684     *
685     * @param request servlet request
686     * @param response servlet response
687     * @param conf configration object
688     * @throws XServletException
689     */
690    private void rerunBundleJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
691            throws XServletException {
692        JSONObject json = new JSONObject();
693        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
694        String jobId = getResourceName(request);
695
696        String coordScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM);
697        String dateScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM);
698        String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
699        String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
700
701        XLog.getLog(getClass()).info(
702                "Rerun Bundle for jobId=" + jobId + ", coordScope=" + coordScope + ", dateScope=" + dateScope + ", refresh="
703                        + refresh + ", noCleanup=" + noCleanup);
704
705        try {
706            bundleEngine.reRun(jobId, coordScope, dateScope, Boolean.valueOf(refresh), Boolean.valueOf(noCleanup));
707        }
708        catch (BaseEngineException ex) {
709            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
710        }
711    }
712
713    /**
714     * Rerun coordinator actions
715     *
716     * @param request servlet request
717     * @param response servlet response
718     * @param conf configuration object
719     * @throws XServletException
720     */
721    @SuppressWarnings("unchecked")
722    private JSONObject reRunCoordinatorActions(HttpServletRequest request, HttpServletResponse response,
723            Configuration conf) throws XServletException {
724        JSONObject json = new JSONObject();
725        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request));
726
727        String jobId = getResourceName(request);
728
729        String rerunType = request.getParameter(RestConstants.JOB_COORD_RANGE_TYPE_PARAM);
730        String scope = request.getParameter(RestConstants.JOB_COORD_SCOPE_PARAM);
731        String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
732        String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
733        String failed = request.getParameter(RestConstants.JOB_COORD_RERUN_FAILED_PARAM);
734
735        XLog.getLog(getClass()).info(
736                "Rerun coordinator for jobId=" + jobId + ", rerunType=" + rerunType + ",scope=" + scope + ",refresh="
737                        + refresh + ", noCleanup=" + noCleanup);
738
739        try {
740            if (!(rerunType.equals(RestConstants.JOB_COORD_SCOPE_DATE) || rerunType
741                    .equals(RestConstants.JOB_COORD_SCOPE_ACTION))) {
742                throw new CommandException(ErrorCode.E1018, "date or action expected.");
743            }
744            CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh),
745                    Boolean.valueOf(noCleanup), Boolean.valueOf(failed), conf);
746            List<CoordinatorActionBean> coordActions;
747            if (coordInfo != null) {
748                coordActions = coordInfo.getCoordActions();
749            }
750            else {
751                coordActions = CoordUtils.getCoordActions(rerunType, jobId, scope, false);
752            }
753            json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT"));
754        }
755        catch (BaseEngineException ex) {
756            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
757        }
758        catch (CommandException ex) {
759            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
760        }
761
762        return json;
763    }
764
765
766
767    /**
768     * Get workflow job
769     *
770     * @param request servlet request
771     * @param response servlet response
772     * @return JsonBean WorkflowJobBean
773     * @throws XServletException
774     */
775    protected JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
776        JsonBean jobBean = getWorkflowJobBean(request, response);
777        // for backward compatibility (OOZIE-1231)
778        swapMRActionID((WorkflowJob)jobBean);
779        return jobBean;
780    }
781
782    /**
783     * Get workflow job
784     *
785     * @param request servlet request
786     * @param response servlet response
787     * @return JsonBean WorkflowJobBean
788     * @throws XServletException
789     */
790    protected JsonBean getWorkflowJobBean(HttpServletRequest request, HttpServletResponse response) throws XServletException {
791        JsonBean jobBean = null;
792        String jobId = getResourceName(request);
793        String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
794        String lenStr = request.getParameter(RestConstants.LEN_PARAM);
795        int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
796        start = (start < 1) ? 1 : start;
797        int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
798        len = (len < 1) ? Integer.MAX_VALUE : len;
799        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
800        try {
801            jobBean = (JsonBean) dagEngine.getJob(jobId, start, len);
802        }
803        catch (DagEngineException ex) {
804            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
805        }
806        return jobBean;
807    }
808
809    private void swapMRActionID(WorkflowJob wjBean) {
810        List<WorkflowAction> actions = wjBean.getActions();
811        if (actions != null) {
812            for (WorkflowAction wa : actions) {
813                swapMRActionID(wa);
814            }
815        }
816    }
817
818    private void swapMRActionID(WorkflowAction waBean) {
819        if (waBean.getType().equals("map-reduce")) {
820            String childId = waBean.getExternalChildIDs();
821            if (childId != null && !childId.equals("")) {
822                String consoleBase = getConsoleBase(waBean.getConsoleUrl());
823                ((WorkflowActionBean) waBean).setConsoleUrl(consoleBase + childId);
824                ((WorkflowActionBean) waBean).setExternalId(childId);
825                ((WorkflowActionBean) waBean).setExternalChildIDs("");
826            }
827        }
828    }
829
830    private String getConsoleBase(String url) {
831        String consoleBase = null;
832        if (url.indexOf("application") != -1) {
833            consoleBase = url.split("application_[0-9]+_[0-9]+")[0];
834        }
835        else {
836            consoleBase = url.split("job_[0-9]+_[0-9]+")[0];
837        }
838        return consoleBase;
839    }
840
841    /**
842     * Get wf action info
843     *
844     * @param request servlet request
845     * @param response servlet response
846     * @return JsonBean WorkflowActionBean
847     * @throws XServletException
848     */
849    protected JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response)
850            throws XServletException {
851
852        JsonBean actionBean = getWorkflowActionBean(request, response);
853        // for backward compatibility (OOZIE-1231)
854        swapMRActionID((WorkflowAction)actionBean);
855        return actionBean;
856    }
857
858    protected JsonBean getWorkflowActionBean(HttpServletRequest request, HttpServletResponse response)
859            throws XServletException {
860        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
861
862        JsonBean actionBean = null;
863        String actionId = getResourceName(request);
864        try {
865            actionBean = dagEngine.getWorkflowAction(actionId);
866        }
867        catch (BaseEngineException ex) {
868            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
869        }
870        return actionBean;
871    }
872
873    /**
874     * Get coord job info
875     *
876     * @param request servlet request
877     * @param response servlet response
878     * @return JsonBean CoordinatorJobBean
879     * @throws XServletException
880     * @throws BaseEngineException
881     */
882    protected JsonBean getCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
883            throws XServletException, BaseEngineException {
884        JsonBean jobBean = null;
885        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
886                getUser(request));
887        String jobId = getResourceName(request);
888        String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
889        String lenStr = request.getParameter(RestConstants.LEN_PARAM);
890        String filter = request.getParameter(RestConstants.JOB_FILTER_PARAM);
891        String orderStr = request.getParameter(RestConstants.ORDER_PARAM);
892        boolean order = (orderStr != null && orderStr.equals("desc")) ? true : false;
893        int offset = (startStr != null) ? Integer.parseInt(startStr) : 1;
894        offset = (offset < 1) ? 1 : offset;
895        // Get default number of coordinator actions to be retrieved
896        int defaultLen = ConfigurationService.getInt(COORD_ACTIONS_DEFAULT_LENGTH);
897        int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
898        len = getCoordinatorJobLength(defaultLen, len);
899        try {
900            CoordinatorJobBean coordJob = coordEngine.getCoordJob(jobId, filter, offset, len, order);
901            jobBean = coordJob;
902        }
903        catch (CoordinatorEngineException ex) {
904            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
905        }
906
907        return jobBean;
908    }
909
910    /**
911     * Given the requested length and the default length, determine how many coordinator jobs to return.
912     * Used by {@link #getCoordinatorJob(javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)}
913     *
914     * @param defaultLen The default length
915     * @param len The requested length
916     * @return The length to use
917     */
918    protected int getCoordinatorJobLength(int defaultLen, int len) {
919        return (len < 1) ? defaultLen : len;
920    }
921
922    /**
923     * Get bundle job info
924     *
925     * @param request servlet request
926     * @param response servlet response
927     * @return JsonBean bundle job bean
928     * @throws XServletException
929     * @throws BaseEngineException
930     */
931    private JsonBean getBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
932            BaseEngineException {
933        JsonBean jobBean = null;
934        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
935        String jobId = getResourceName(request);
936
937        try {
938            jobBean = (JsonBean) bundleEngine.getBundleJob(jobId);
939
940            return jobBean;
941        }
942        catch (BundleEngineException ex) {
943            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
944        }
945    }
946
947    /**
948     * Get coordinator action
949     *
950     * @param request servlet request
951     * @param response servlet response
952     * @return JsonBean CoordinatorActionBean
953     * @throws XServletException
954     * @throws BaseEngineException
955     */
956    private JsonBean getCoordinatorAction(HttpServletRequest request, HttpServletResponse response)
957            throws XServletException, BaseEngineException {
958        JsonBean actionBean = null;
959        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
960                getUser(request));
961        String actionId = getResourceName(request);
962        try {
963            actionBean = coordEngine.getCoordAction(actionId);
964        }
965        catch (CoordinatorEngineException ex) {
966            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
967        }
968
969        return actionBean;
970    }
971
972    /**
973     * Get wf job definition
974     *
975     * @param request servlet request
976     * @param response servlet response
977     * @return String wf definition
978     * @throws XServletException
979     */
980    private String getWorkflowJobDefinition(HttpServletRequest request, HttpServletResponse response)
981            throws XServletException {
982        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
983
984        String wfDefinition;
985        String jobId = getResourceName(request);
986        try {
987            wfDefinition = dagEngine.getDefinition(jobId);
988        }
989        catch (DagEngineException ex) {
990            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
991        }
992        return wfDefinition;
993    }
994
995    /**
996     * Get bundle job definition
997     *
998     * @param request servlet request
999     * @param response servlet response
1000     * @return String bundle definition
1001     * @throws XServletException
1002     */
1003    private String getBundleJobDefinition(HttpServletRequest request, HttpServletResponse response) throws XServletException {
1004        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
1005        String bundleDefinition;
1006        String jobId = getResourceName(request);
1007        try {
1008            bundleDefinition = bundleEngine.getDefinition(jobId);
1009        }
1010        catch (BundleEngineException ex) {
1011            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
1012        }
1013        return bundleDefinition;
1014    }
1015
1016    /**
1017     * Get coordinator job definition
1018     *
1019     * @param request servlet request
1020     * @param response servlet response
1021     * @return String coord definition
1022     * @throws XServletException
1023     */
1024    private String getCoordinatorJobDefinition(HttpServletRequest request, HttpServletResponse response)
1025            throws XServletException {
1026
1027        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
1028                getUser(request));
1029
1030        String jobId = getResourceName(request);
1031
1032        String coordDefinition = null;
1033        try {
1034            coordDefinition = coordEngine.getDefinition(jobId);
1035        }
1036        catch (BaseEngineException ex) {
1037            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
1038        }
1039        return coordDefinition;
1040    }
1041
1042    /**
1043     * Stream wf job log
1044     *
1045     * @param request servlet request
1046     * @param response servlet response
1047     * @throws XServletException
1048     * @throws IOException
1049     */
1050    private void streamWorkflowJobLog(HttpServletRequest request, HttpServletResponse response)
1051            throws XServletException, IOException {
1052        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
1053        String jobId = getResourceName(request);
1054        try {
1055            dagEngine.streamLog(jobId, response.getWriter(), request.getParameterMap());
1056        }
1057        catch (BaseEngineException ex) {
1058            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
1059        }
1060    }
1061
1062    /**
1063     * Stream bundle job log
1064     *
1065     * @param request servlet request
1066     * @param response servlet response
1067     * @throws XServletException
1068     */
1069    private void streamBundleJobLog(HttpServletRequest request, HttpServletResponse response)
1070            throws XServletException, IOException {
1071        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
1072        String jobId = getResourceName(request);
1073        try {
1074            bundleEngine.streamLog(jobId, response.getWriter(), request.getParameterMap());
1075        }
1076        catch (BaseEngineException ex) {
1077            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
1078        }
1079    }
1080
1081    /**
1082     * Stream coordinator job log
1083     *
1084     * @param request servlet request
1085     * @param response servlet response
1086     * @throws XServletException
1087     * @throws IOException
1088     */
1089    private void streamCoordinatorJobLog(HttpServletRequest request, HttpServletResponse response)
1090            throws XServletException, IOException {
1091
1092        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
1093                getUser(request));
1094        String jobId = getResourceName(request);
1095        String logRetrievalScope = request.getParameter(RestConstants.JOB_LOG_SCOPE_PARAM);
1096        String logRetrievalType = request.getParameter(RestConstants.JOB_LOG_TYPE_PARAM);
1097        try {
1098            coordEngine.streamLog(jobId, logRetrievalScope, logRetrievalType, response.getWriter(), request.getParameterMap());
1099        }
1100        catch (BaseEngineException ex) {
1101            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
1102        }
1103        catch (CommandException ex) {
1104            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
1105        }
1106    }
1107
1108    @Override
1109    protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException,
1110            IOException {
1111        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE);
1112    }
1113
1114    @Override
1115    protected JSONObject getJobsByParentId(HttpServletRequest request, HttpServletResponse response)
1116            throws XServletException, IOException {
1117        JSONObject json = new JSONObject();
1118        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class)
1119                .getCoordinatorEngine(getUser(request));
1120        String coordActionId;
1121        String type = request.getParameter(RestConstants.JOB_COORD_RANGE_TYPE_PARAM);
1122        String scope = request.getParameter(RestConstants.JOB_COORD_SCOPE_PARAM);
1123        // for getting allruns for coordinator action - 2 alternate endpoints
1124        if (type != null && type.equals(RestConstants.JOB_COORD_SCOPE_ACTION) && scope != null) {
1125            // endpoint - oozie/v2/coord-job-id?type=action&scope=action-num&show=allruns
1126            String jobId = getResourceName(request);
1127            coordActionId = Services.get().get(UUIDService.class).generateChildId(jobId, scope);
1128        }
1129        else {
1130            // endpoint - oozie/v2/coord-action-id?show=allruns
1131            coordActionId = getResourceName(request);
1132        }
1133        try {
1134            List<WorkflowJobBean> wfs = coordEngine.getReruns(coordActionId);
1135            JSONArray array = new JSONArray();
1136            if (wfs != null) {
1137                for (WorkflowJobBean wf : wfs) {
1138                    JSONObject json1 = new JSONObject();
1139                    json1.put(JsonTags.WORKFLOW_ID, wf.getId());
1140                    json1.put(JsonTags.WORKFLOW_STATUS, wf.getStatus().toString());
1141                    json1.put(JsonTags.WORKFLOW_START_TIME, JsonUtils.formatDateRfc822(wf.getStartTime(), "GMT"));
1142                    json1.put(JsonTags.WORKFLOW_ACTION_END_TIME, JsonUtils.formatDateRfc822(wf.getEndTime(), "GMT"));
1143                    array.add(json1);
1144                }
1145            }
1146            json.put(JsonTags.WORKFLOWS_JOBS, array);
1147            return json;
1148        }
1149        catch (CoordinatorEngineException ex) {
1150            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
1151        }
1152    }
1153    /**
1154     * not supported for v1
1155     */
1156    @Override
1157    protected JSONObject updateJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
1158            throws XServletException, IOException {
1159        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE);
1160    }
1161
1162    @Override
1163    protected String getJobStatus(HttpServletRequest request, HttpServletResponse response) throws XServletException,
1164            IOException {
1165        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE);
1166    }
1167
1168    @Override
1169    protected void streamJobErrorLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
1170            IOException {
1171        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE);
1172    }
1173    @Override
1174    protected void streamJobAuditLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
1175            IOException {
1176        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE);
1177    }
1178    @Override
1179    void slaEnableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException {
1180        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE);
1181    }
1182
1183    @Override
1184    void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException,
1185            IOException {
1186        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE);
1187    }
1188
1189    @Override
1190    void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException {
1191        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE);
1192    }
1193
1194    @Override
1195    JSONObject getCoordActionMissingDependencies(HttpServletRequest request, HttpServletResponse response)
1196            throws XServletException, IOException {
1197        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE);
1198    }
1199
1200    @Override
1201    JSONArray getActionRetries(HttpServletRequest request, HttpServletResponse response) throws XServletException,
1202            IOException {
1203        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE);
1204    }
1205}