001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.servlet;
019    
020    import java.io.IOException;
021    import java.util.List;
022    import javax.servlet.ServletInputStream;
023    import javax.servlet.http.HttpServletRequest;
024    import javax.servlet.http.HttpServletResponse;
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.oozie.*;
027    import org.apache.oozie.client.WorkflowAction;
028    import org.apache.oozie.client.WorkflowJob;
029    import org.apache.oozie.client.rest.*;
030    import org.apache.oozie.command.CommandException;
031    import org.apache.oozie.command.coord.CoordRerunXCommand;
032    import org.apache.oozie.service.BundleEngineService;
033    import org.apache.oozie.service.CoordinatorEngineService;
034    import org.apache.oozie.service.DagEngineService;
035    import org.apache.oozie.service.Services;
036    import org.apache.oozie.util.GraphGenerator;
037    import org.apache.oozie.util.XLog;
038    import org.json.simple.JSONObject;
039    
040    
041    @SuppressWarnings("serial")
042    public class V1JobServlet extends BaseJobServlet {
043    
044        private static final String INSTRUMENTATION_NAME = "v1job";
045        public static final String COORD_ACTIONS_DEFAULT_LENGTH = "oozie.coord.actions.default.length";
046    
047        public V1JobServlet() {
048            super(INSTRUMENTATION_NAME);
049        }
050    
051        protected V1JobServlet(String instrumentation_name){
052            super(instrumentation_name);
053        }
054    
055        /*
056         * protected method to start a job
057         */
058        @Override
059        protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
060                IOException {
061            /*
062             * Configuration conf = new XConfiguration(request.getInputStream());
063             * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
064             * conf.get(OozieClient.COORDINATOR_APP_PATH);
065             *
066             * ServletUtilities.ValidateAppPath(wfPath, coordPath);
067             */
068            String jobId = getResourceName(request);
069            if (jobId.endsWith("-W")) {
070                startWorkflowJob(request, response);
071            }
072            else if (jobId.endsWith("-B")) {
073                startBundleJob(request, response);
074            }
075            else {
076                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, RestConstants.ACTION_PARAM, RestConstants.JOB_ACTION_START);
077            }
078    
079        }
080    
081        /*
082         * protected method to resume a job
083         */
084        @Override
085        protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
086                IOException {
087            /*
088             * Configuration conf = new XConfiguration(request.getInputStream());
089             * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
090             * conf.get(OozieClient.COORDINATOR_APP_PATH);
091             *
092             * ServletUtilities.ValidateAppPath(wfPath, coordPath);
093             */
094            String jobId = getResourceName(request);
095            if (jobId.endsWith("-W")) {
096                resumeWorkflowJob(request, response);
097            }
098            else if (jobId.endsWith("-B")) {
099                resumeBundleJob(request, response);
100            }
101            else {
102                resumeCoordinatorJob(request, response);
103            }
104        }
105    
106        /*
107         * protected method to suspend a job
108         */
109        @Override
110        protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
111                IOException {
112            /*
113             * Configuration conf = new XConfiguration(request.getInputStream());
114             * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
115             * conf.get(OozieClient.COORDINATOR_APP_PATH);
116             *
117             * ServletUtilities.ValidateAppPath(wfPath, coordPath);
118             */
119            String jobId = getResourceName(request);
120            if (jobId.endsWith("-W")) {
121                suspendWorkflowJob(request, response);
122            }
123            else if (jobId.endsWith("-B")) {
124                suspendBundleJob(request, response);
125            }
126            else {
127                suspendCoordinatorJob(request, response);
128            }
129        }
130    
131        /*
132         * protected method to kill a job
133         */
134        @Override
135        protected void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
136                IOException {
137            /*
138             * Configuration conf = new XConfiguration(request.getInputStream());
139             * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
140             * conf.get(OozieClient.COORDINATOR_APP_PATH);
141             *
142             * ServletUtilities.ValidateAppPath(wfPath, coordPath);
143             */
144            String jobId = getResourceName(request);
145            if (jobId.endsWith("-W")) {
146                killWorkflowJob(request, response);
147            }
148            else if (jobId.endsWith("-B")) {
149                killBundleJob(request, response);
150            }
151            else {
152                killCoordinatorJob(request, response);
153            }
154        }
155    
156        /**
157         * protected method to change a coordinator job
158         * @param request request object
159         * @param response response object
160         * @throws XServletException
161         * @throws IOException
162         */
163        @Override
164        protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
165                IOException {
166            String jobId = getResourceName(request);
167            if (jobId.endsWith("-B")) {
168                changeBundleJob(request, response);
169            }
170            else {
171                changeCoordinatorJob(request, response);
172            }
173        }
174    
175        /*
176         * protected method to reRun a job
177         *
178         * @seeorg.apache.oozie.servlet.BaseJobServlet#reRunJob(javax.servlet.http.
179         * HttpServletRequest, javax.servlet.http.HttpServletResponse,
180         * org.apache.hadoop.conf.Configuration)
181         */
182        @Override
183        protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
184                throws XServletException, IOException {
185            JSONObject json = null;
186            String jobId = getResourceName(request);
187            if (jobId.endsWith("-W")) {
188                reRunWorkflowJob(request, response, conf);
189            }
190            else if (jobId.endsWith("-B")) {
191                rerunBundleJob(request, response, conf);
192            }
193            else {
194                json = reRunCoordinatorActions(request, response, conf);
195            }
196            return json;
197        }
198    
199        /*
200         * protected method to get a job in JsonBean representation
201         */
202        @Override
203        protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
204                IOException, BaseEngineException {
205            ServletInputStream is = request.getInputStream();
206            byte[] b = new byte[101];
207            while (is.readLine(b, 0, 100) != -1) {
208                XLog.getLog(getClass()).warn("Printing :" + new String(b));
209            }
210    
211            JsonBean jobBean = null;
212            String jobId = getResourceName(request);
213            if (jobId.endsWith("-B")) {
214                jobBean = getBundleJob(request, response);
215            }
216            else {
217                if (jobId.endsWith("-W")) {
218                    jobBean = getWorkflowJob(request, response);
219                }
220                else {
221                    if (jobId.contains("-W@")) {
222                        jobBean = getWorkflowAction(request, response);
223                    }
224                    else {
225                        if (jobId.contains("-C@")) {
226                            jobBean = getCoordinatorAction(request, response);
227                        }
228                        else {
229                            jobBean = getCoordinatorJob(request, response);
230                        }
231                    }
232                }
233            }
234    
235            return jobBean;
236        }
237    
238        /*
239         * protected method to get a job definition in String format
240         */
241        @Override
242        protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
243                throws XServletException, IOException {
244            String jobDefinition = null;
245            String jobId = getResourceName(request);
246            if (jobId.endsWith("-W")) {
247                jobDefinition = getWorkflowJobDefinition(request, response);
248            }
249            else if (jobId.endsWith("-B")) {
250                jobDefinition = getBundleJobDefinition(request, response);
251            }
252            else {
253                jobDefinition = getCoordinatorJobDefinition(request, response);
254            }
255            return jobDefinition;
256        }
257    
258        /*
259         * protected method to stream a job log into response object
260         */
261        @Override
262        protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
263                IOException {
264            String jobId = getResourceName(request);
265            if (jobId.endsWith("-W")) {
266                streamWorkflowJobLog(request, response);
267            }
268            else if (jobId.endsWith("-B")) {
269                streamBundleJob(request, response);
270            }
271            else {
272                streamCoordinatorJobLog(request, response);
273            }
274        }
275    
276        @Override
277        protected void streamJobGraph(HttpServletRequest request, HttpServletResponse response)
278                throws XServletException, IOException {
279            String jobId = getResourceName(request);
280            if (jobId.endsWith("-W")) {
281                // Applicable only to worflow, for now
282                response.setContentType(RestConstants.PNG_IMAGE_CONTENT_TYPE);
283                try {
284                    String showKill = request.getParameter(RestConstants.JOB_SHOW_KILL_PARAM);
285                    boolean sK = showKill != null && (showKill.equalsIgnoreCase("yes") || showKill.equals("1") || showKill.equalsIgnoreCase("true"));
286    
287                    new GraphGenerator(
288                            getWorkflowJobDefinition(request, response),
289                            (JsonWorkflowJob)getWorkflowJob(request, response),
290                            sK).write(response.getOutputStream());
291                }
292                catch (Exception e) {
293                    throw new XServletException(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, ErrorCode.E0307, e.getMessage(), e);
294                }
295            }
296            else {
297                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0306);
298            }
299        }
300    
301        /**
302         * Start wf job
303         *
304         * @param request servlet request
305         * @param response servlet response
306         * @throws XServletException
307         */
308        private void startWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
309            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
310    
311            String jobId = getResourceName(request);
312            try {
313                dagEngine.start(jobId);
314            }
315            catch (DagEngineException ex) {
316                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
317            }
318        }
319    
320        /**
321         * Start bundle job
322         *
323         * @param request servlet request
324         * @param response servlet response
325         * @throws XServletException
326         */
327        private void startBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
328            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
329            String jobId = getResourceName(request);
330            try {
331                bundleEngine.start(jobId);
332            }
333            catch (BundleEngineException ex) {
334                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
335            }
336        }
337    
338        /**
339         * Resume workflow job
340         *
341         * @param request servlet request
342         * @param response servlet response
343         * @throws XServletException
344         */
345        private void resumeWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
346            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
347    
348            String jobId = getResourceName(request);
349            try {
350                dagEngine.resume(jobId);
351            }
352            catch (DagEngineException ex) {
353                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
354            }
355        }
356    
357        /**
358         * Resume bundle job
359         *
360         * @param request servlet request
361         * @param response servlet response
362         * @throws XServletException
363         */
364        private void resumeBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
365            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
366            String jobId = getResourceName(request);
367            try {
368                bundleEngine.resume(jobId);
369            }
370            catch (BundleEngineException ex) {
371                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
372            }
373        }
374    
375        /**
376         * Resume coordinator job
377         *
378         * @param request servlet request
379         * @param response servlet response
380         * @throws XServletException
381         * @throws CoordinatorEngineException
382         */
383        private void resumeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
384                throws XServletException {
385            String jobId = getResourceName(request);
386            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
387                    getUser(request));
388            try {
389                coordEngine.resume(jobId);
390            }
391            catch (CoordinatorEngineException ex) {
392                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
393            }
394        }
395    
396        /**
397         * Suspend a wf job
398         *
399         * @param request servlet request
400         * @param response servlet response
401         * @throws XServletException
402         */
403        private void suspendWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
404            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
405    
406            String jobId = getResourceName(request);
407            try {
408                dagEngine.suspend(jobId);
409            }
410            catch (DagEngineException ex) {
411                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
412            }
413        }
414    
415        /**
416         * Suspend bundle job
417         *
418         * @param request servlet request
419         * @param response servlet response
420         * @throws XServletException
421         */
422        private void suspendBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
423            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
424            String jobId = getResourceName(request);
425            try {
426                bundleEngine.suspend(jobId);
427            }
428            catch (BundleEngineException ex) {
429                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
430            }
431        }
432    
433        /**
434         * Suspend coordinator job
435         *
436         * @param request servlet request
437         * @param response servlet response
438         * @throws XServletException
439         */
440        private void suspendCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
441                throws XServletException {
442            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
443                    getUser(request));
444            String jobId = getResourceName(request);
445            try {
446                coordEngine.suspend(jobId);
447            }
448            catch (CoordinatorEngineException ex) {
449                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
450            }
451        }
452    
453        /**
454         * Kill a wf job
455         * @param request servlet request
456         * @param response servlet response
457         * @throws XServletException
458         */
459        private void killWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
460            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
461    
462            String jobId = getResourceName(request);
463            try {
464                dagEngine.kill(jobId);
465            }
466            catch (DagEngineException ex) {
467                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
468            }
469        }
470    
471        /**
472         * Kill a coord job
473         * @param request servlet request
474         * @param response servlet response
475         * @throws XServletException
476         */
477        private void killCoordinatorJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
478            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
479                    getUser(request));
480            String jobId = getResourceName(request);
481            try {
482                coordEngine.kill(jobId);
483            }
484            catch (CoordinatorEngineException ex) {
485                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
486            }
487        }
488    
489        /**
490         * Kill bundle job
491         *
492         * @param request servlet request
493         * @param response servlet response
494         * @throws XServletException
495         */
496        private void killBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
497            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
498            String jobId = getResourceName(request);
499            try {
500                bundleEngine.kill(jobId);
501            }
502            catch (BundleEngineException ex) {
503                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
504            }
505        }
506    
507        /**
508         * Change a coordinator job
509         *
510         * @param request servlet request
511         * @param response servlet response
512         * @throws XServletException
513         */
514        private void changeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
515                throws XServletException {
516            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
517                    getUser(request));
518            String jobId = getResourceName(request);
519            String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
520            try {
521                coordEngine.change(jobId, changeValue);
522            }
523            catch (CoordinatorEngineException ex) {
524                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
525            }
526        }
527    
528        /**
529         * Change a bundle job
530         *
531         * @param request servlet request
532         * @param response servlet response
533         * @throws XServletException
534         */
535        private void changeBundleJob(HttpServletRequest request, HttpServletResponse response)
536                throws XServletException {
537            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
538            String jobId = getResourceName(request);
539            String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
540            try {
541                bundleEngine.change(jobId, changeValue);
542            }
543            catch (BundleEngineException ex) {
544                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
545            }
546        }
547    
548        /**
549         * Rerun a wf job
550         *
551         * @param request servlet request
552         * @param response servlet response
553         * @param conf configuration object
554         * @throws XServletException
555         */
556        private void reRunWorkflowJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
557                throws XServletException {
558            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
559    
560            String jobId = getResourceName(request);
561            try {
562                dagEngine.reRun(jobId, conf);
563            }
564            catch (DagEngineException ex) {
565                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
566            }
567        }
568    
569        /**
570         * Rerun bundle job
571         *
572         * @param request servlet request
573         * @param response servlet response
574         * @param conf configration object
575         * @throws XServletException
576         */
577        private void rerunBundleJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
578                throws XServletException {
579            JSONObject json = new JSONObject();
580            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
581            String jobId = getResourceName(request);
582    
583            String coordScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM);
584            String dateScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM);
585            String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
586            String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
587    
588            XLog.getLog(getClass()).info(
589                    "Rerun Bundle for jobId=" + jobId + ", coordScope=" + coordScope + ", dateScope=" + dateScope + ", refresh="
590                            + refresh + ", noCleanup=" + noCleanup);
591    
592            try {
593                bundleEngine.reRun(jobId, coordScope, dateScope, Boolean.valueOf(refresh), Boolean.valueOf(noCleanup));
594            }
595            catch (BaseEngineException ex) {
596                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
597            }
598        }
599    
600        /**
601         * Rerun coordinator actions
602         *
603         * @param request servlet request
604         * @param response servlet response
605         * @param conf configuration object
606         * @throws XServletException
607         */
608        @SuppressWarnings("unchecked")
609        private JSONObject reRunCoordinatorActions(HttpServletRequest request, HttpServletResponse response,
610                Configuration conf) throws XServletException {
611            JSONObject json = new JSONObject();
612            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request));
613    
614            String jobId = getResourceName(request);
615    
616            String rerunType = request.getParameter(RestConstants.JOB_COORD_RERUN_TYPE_PARAM);
617            String scope = request.getParameter(RestConstants.JOB_COORD_RERUN_SCOPE_PARAM);
618            String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
619            String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
620    
621            XLog.getLog(getClass()).info(
622                    "Rerun coordinator for jobId=" + jobId + ", rerunType=" + rerunType + ",scope=" + scope + ",refresh="
623                            + refresh + ", noCleanup=" + noCleanup);
624    
625            try {
626                if (!(rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE) || rerunType
627                        .equals(RestConstants.JOB_COORD_RERUN_ACTION))) {
628                    throw new CommandException(ErrorCode.E1018, "date or action expected.");
629                }
630                CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh),
631                        Boolean.valueOf(noCleanup));
632                List<CoordinatorActionBean> coordActions;
633                if (coordInfo != null) {
634                    coordActions = coordInfo.getCoordActions();
635                }
636                else {
637                    coordActions = CoordRerunXCommand.getCoordActions(rerunType, jobId, scope);
638                }
639                json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT"));
640            }
641            catch (BaseEngineException ex) {
642                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
643            }
644            catch (CommandException ex) {
645                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
646            }
647    
648            return json;
649        }
650    
651    
652    
653        /**
654         * Get workflow job
655         *
656         * @param request servlet request
657         * @param response servlet response
658         * @return JsonBean WorkflowJobBean
659         * @throws XServletException
660         */
661        protected JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
662            JsonBean jobBean = getWorkflowJobBean(request, response);
663            // for backward compatibility (OOZIE-1231)
664            swapMRActionID((WorkflowJob)jobBean);
665            return jobBean;
666        }
667    
668        /**
669         * Get workflow job
670         *
671         * @param request servlet request
672         * @param response servlet response
673         * @return JsonBean WorkflowJobBean
674         * @throws XServletException
675         */
676        protected JsonBean getWorkflowJobBean(HttpServletRequest request, HttpServletResponse response) throws XServletException {
677            JsonBean jobBean = null;
678            String jobId = getResourceName(request);
679            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
680            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
681            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
682            start = (start < 1) ? 1 : start;
683            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
684            len = (len < 1) ? Integer.MAX_VALUE : len;
685            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
686            try {
687                jobBean = (JsonBean) dagEngine.getJob(jobId, start, len);
688            }
689            catch (DagEngineException ex) {
690                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
691            }
692            return jobBean;
693        }
694    
695        private void swapMRActionID(WorkflowJob wjBean) {
696            List<WorkflowAction> actions = wjBean.getActions();
697            if (actions != null) {
698                for (WorkflowAction wa : actions) {
699                    swapMRActionID(wa);
700                }
701            }
702        }
703    
704        private void swapMRActionID(WorkflowAction waBean) {
705            if (waBean.getType().equals("map-reduce")) {
706                String childId = waBean.getExternalChildIDs();
707                if (childId != null && !childId.equals("")) {
708                    String consoleBase = getConsoleBase(waBean.getConsoleUrl());
709                    ((WorkflowActionBean) waBean).setConsoleUrl(consoleBase + childId);
710                    ((WorkflowActionBean) waBean).setExternalId(childId);
711                    ((WorkflowActionBean) waBean).setExternalChildIDs("");
712                }
713            }
714        }
715    
716        private String getConsoleBase(String url) {
717            String consoleBase = null;
718            if (url.indexOf("application") != -1) {
719                consoleBase = url.split("application_[0-9]+_[0-9]+")[0];
720            }
721            else {
722                consoleBase = url.split("job_[0-9]+_[0-9]+")[0];
723            }
724            return consoleBase;
725        }
726    
727        /**
728         * Get wf action info
729         *
730         * @param request servlet request
731         * @param response servlet response
732         * @return JsonBean WorkflowActionBean
733         * @throws XServletException
734         */
735        protected JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response)
736                throws XServletException {
737    
738            JsonBean actionBean = getWorkflowActionBean(request, response);
739            // for backward compatibility (OOZIE-1231)
740            swapMRActionID((WorkflowAction)actionBean);
741            return actionBean;
742        }
743    
744        protected JsonBean getWorkflowActionBean(HttpServletRequest request, HttpServletResponse response)
745                throws XServletException {
746            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
747    
748            JsonBean actionBean = null;
749            String actionId = getResourceName(request);
750            try {
751                actionBean = dagEngine.getWorkflowAction(actionId);
752            }
753            catch (BaseEngineException ex) {
754                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
755            }
756            return actionBean;
757        }
758    
759        /**
760         * Get coord job info
761         *
762         * @param request servlet request
763         * @param response servlet response
764         * @return JsonBean CoordinatorJobBean
765         * @throws XServletException
766         * @throws BaseEngineException
767         */
768        protected JsonBean getCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
769                throws XServletException, BaseEngineException {
770            JsonBean jobBean = null;
771            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
772                    getUser(request));
773            String jobId = getResourceName(request);
774            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
775            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
776            String filter = request.getParameter(RestConstants.JOB_FILTER_PARAM);
777            String orderStr = request.getParameter(RestConstants.ORDER_PARAM);
778            boolean order = (orderStr != null && orderStr.equals("desc")) ? true : false;
779            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
780            start = (start < 1) ? 1 : start;
781            // Get default number of coordinator actions to be retrieved
782            int defaultLen = Services.get().getConf().getInt(COORD_ACTIONS_DEFAULT_LENGTH, 1000);
783            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
784            len = (len < 0) ? defaultLen : len;
785            try {
786                JsonCoordinatorJob coordJob = coordEngine.getCoordJob(jobId, filter, start, len, order);
787                jobBean = coordJob;
788            }
789            catch (CoordinatorEngineException ex) {
790                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
791            }
792    
793            return jobBean;
794        }
795    
796        /**
797         * Get bundle job info
798         *
799         * @param request servlet request
800         * @param response servlet response
801         * @return JsonBean bundle job bean
802         * @throws XServletException
803         * @throws BaseEngineException
804         */
805        private JsonBean getBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
806                BaseEngineException {
807            JsonBean jobBean = null;
808            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
809            String jobId = getResourceName(request);
810    
811            try {
812                jobBean = (JsonBean) bundleEngine.getBundleJob(jobId);
813    
814                return jobBean;
815            }
816            catch (BundleEngineException ex) {
817                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
818            }
819        }
820    
821        /**
822         * Get coordinator action
823         *
824         * @param request servlet request
825         * @param response servlet response
826         * @return JsonBean CoordinatorActionBean
827         * @throws XServletException
828         * @throws BaseEngineException
829         */
830        private JsonBean getCoordinatorAction(HttpServletRequest request, HttpServletResponse response)
831                throws XServletException, BaseEngineException {
832            JsonBean actionBean = null;
833            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
834                    getUser(request));
835            String actionId = getResourceName(request);
836            try {
837                actionBean = coordEngine.getCoordAction(actionId);
838            }
839            catch (CoordinatorEngineException ex) {
840                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
841            }
842    
843            return actionBean;
844        }
845    
846        /**
847         * Get wf job definition
848         *
849         * @param request servlet request
850         * @param response servlet response
851         * @return String wf definition
852         * @throws XServletException
853         */
854        private String getWorkflowJobDefinition(HttpServletRequest request, HttpServletResponse response)
855                throws XServletException {
856            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
857    
858            String wfDefinition;
859            String jobId = getResourceName(request);
860            try {
861                wfDefinition = dagEngine.getDefinition(jobId);
862            }
863            catch (DagEngineException ex) {
864                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
865            }
866            return wfDefinition;
867        }
868    
869        /**
870         * Get bundle job definition
871         *
872         * @param request servlet request
873         * @param response servlet response
874         * @return String bundle definition
875         * @throws XServletException
876         */
877        private String getBundleJobDefinition(HttpServletRequest request, HttpServletResponse response) throws XServletException {
878            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
879            String bundleDefinition;
880            String jobId = getResourceName(request);
881            try {
882                bundleDefinition = bundleEngine.getDefinition(jobId);
883            }
884            catch (BundleEngineException ex) {
885                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
886            }
887            return bundleDefinition;
888        }
889    
890        /**
891         * Get coordinator job definition
892         *
893         * @param request servlet request
894         * @param response servlet response
895         * @return String coord definition
896         * @throws XServletException
897         */
898        private String getCoordinatorJobDefinition(HttpServletRequest request, HttpServletResponse response)
899                throws XServletException {
900    
901            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
902                    getUser(request));
903    
904            String jobId = getResourceName(request);
905    
906            String coordDefinition = null;
907            try {
908                coordDefinition = coordEngine.getDefinition(jobId);
909            }
910            catch (BaseEngineException ex) {
911                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
912            }
913            return coordDefinition;
914        }
915    
916        /**
917         * Stream wf job log
918         *
919         * @param request servlet request
920         * @param response servlet response
921         * @throws XServletException
922         * @throws IOException
923         */
924        private void streamWorkflowJobLog(HttpServletRequest request, HttpServletResponse response)
925                throws XServletException, IOException {
926            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
927            String jobId = getResourceName(request);
928            try {
929                dagEngine.streamLog(jobId, response.getWriter());
930            }
931            catch (DagEngineException ex) {
932                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
933            }
934        }
935    
936        /**
937         * Stream bundle job log
938         *
939         * @param request servlet request
940         * @param response servlet response
941         * @throws XServletException
942         */
943        private void streamBundleJob(HttpServletRequest request, HttpServletResponse response)
944                throws XServletException, IOException {
945            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
946            String jobId = getResourceName(request);
947            try {
948                bundleEngine.streamLog(jobId, response.getWriter());
949            }
950            catch (BundleEngineException ex) {
951                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
952            }
953        }
954    
955        /**
956         * Stream coordinator job log
957         *
958         * @param request servlet request
959         * @param response servlet response
960         * @throws XServletException
961         * @throws IOException
962         */
963        private void streamCoordinatorJobLog(HttpServletRequest request, HttpServletResponse response)
964                throws XServletException, IOException {
965    
966            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
967                    getUser(request));
968            String jobId = getResourceName(request);
969            String logRetrievalScope = request.getParameter(RestConstants.JOB_LOG_SCOPE_PARAM);
970            String logRetrievalType = request.getParameter(RestConstants.JOB_LOG_TYPE_PARAM);
971            try {
972                coordEngine.streamLog(jobId, logRetrievalScope, logRetrievalType, response.getWriter());
973            }
974            catch (BaseEngineException ex) {
975                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
976            }
977            catch (CommandException ex) {
978                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
979            }
980        }
981    
982        @Override
983        protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException,
984                IOException {
985            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
986        }
987    
988    }