This project has retired. For details please refer to its Attic page.
001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.servlet;
019    
020    import java.io.IOException;
021    import java.util.List;
022    import javax.servlet.ServletInputStream;
023    import javax.servlet.http.HttpServletRequest;
024    import javax.servlet.http.HttpServletResponse;
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.oozie.*;
027    import org.apache.oozie.client.WorkflowAction;
028    import org.apache.oozie.client.WorkflowJob;
029    import org.apache.oozie.client.rest.*;
030    import org.apache.oozie.command.CommandException;
031    import org.apache.oozie.command.coord.CoordRerunXCommand;
032    import org.apache.oozie.service.BundleEngineService;
033    import org.apache.oozie.service.CoordinatorEngineService;
034    import org.apache.oozie.service.DagEngineService;
035    import org.apache.oozie.service.Services;
036    import org.apache.oozie.util.GraphGenerator;
037    import org.apache.oozie.util.XLog;
038    import org.json.simple.JSONObject;
039    
040    
041    @SuppressWarnings("serial")
042    public class V1JobServlet extends BaseJobServlet {
043    
044        private static final String INSTRUMENTATION_NAME = "v1job";
045        public static final String COORD_ACTIONS_DEFAULT_LENGTH = "oozie.coord.actions.default.length";
046    
/**
 * Creates the servlet using {@link #INSTRUMENTATION_NAME} as the name given to the superclass.
 */
public V1JobServlet() {
    // Delegates to the named constructor, which forwards the value to BaseJobServlet unchanged.
    this(INSTRUMENTATION_NAME);
}
050    
/**
 * Creates the servlet with a caller-supplied name; accessible to subclasses and this package only.
 *
 * @param instrumentation_name value forwarded unchanged to the {@code BaseJobServlet} constructor
 */
protected V1JobServlet(String instrumentation_name) {
    super(instrumentation_name);
}
054    
055        /*
056         * protected method to start a job
057         */
058        @Override
059        protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
060                IOException {
061            /*
062             * Configuration conf = new XConfiguration(request.getInputStream());
063             * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
064             * conf.get(OozieClient.COORDINATOR_APP_PATH);
065             *
066             * ServletUtilities.ValidateAppPath(wfPath, coordPath);
067             */
068            String jobId = getResourceName(request);
069            if (jobId.endsWith("-W")) {
070                startWorkflowJob(request, response);
071            }
072            else if (jobId.endsWith("-B")) {
073                startBundleJob(request, response);
074            }
075            else {
076                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, RestConstants.ACTION_PARAM, RestConstants.JOB_ACTION_START);
077            }
078    
079        }
080    
081        /*
082         * protected method to resume a job
083         */
084        @Override
085        protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
086                IOException {
087            /*
088             * Configuration conf = new XConfiguration(request.getInputStream());
089             * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
090             * conf.get(OozieClient.COORDINATOR_APP_PATH);
091             *
092             * ServletUtilities.ValidateAppPath(wfPath, coordPath);
093             */
094            String jobId = getResourceName(request);
095            if (jobId.endsWith("-W")) {
096                resumeWorkflowJob(request, response);
097            }
098            else if (jobId.endsWith("-B")) {
099                resumeBundleJob(request, response);
100            }
101            else {
102                resumeCoordinatorJob(request, response);
103            }
104        }
105    
106        /*
107         * protected method to suspend a job
108         */
109        @Override
110        protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
111                IOException {
112            /*
113             * Configuration conf = new XConfiguration(request.getInputStream());
114             * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
115             * conf.get(OozieClient.COORDINATOR_APP_PATH);
116             *
117             * ServletUtilities.ValidateAppPath(wfPath, coordPath);
118             */
119            String jobId = getResourceName(request);
120            if (jobId.endsWith("-W")) {
121                suspendWorkflowJob(request, response);
122            }
123            else if (jobId.endsWith("-B")) {
124                suspendBundleJob(request, response);
125            }
126            else {
127                suspendCoordinatorJob(request, response);
128            }
129        }
130    
131        /*
132         * protected method to kill a job
133         */
134        @Override
135        protected void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
136                IOException {
137            /*
138             * Configuration conf = new XConfiguration(request.getInputStream());
139             * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
140             * conf.get(OozieClient.COORDINATOR_APP_PATH);
141             *
142             * ServletUtilities.ValidateAppPath(wfPath, coordPath);
143             */
144            String jobId = getResourceName(request);
145            if (jobId.endsWith("-W")) {
146                killWorkflowJob(request, response);
147            }
148            else if (jobId.endsWith("-B")) {
149                killBundleJob(request, response);
150            }
151            else {
152                killCoordinatorJob(request, response);
153            }
154        }
155    
156        /**
157         * protected method to change a coordinator job
158         * @param request request object
159         * @param response response object
160         * @throws XServletException
161         * @throws IOException
162         */
163        @Override
164        protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
165                IOException {
166            String jobId = getResourceName(request);
167            if (jobId.endsWith("-B")) {
168                changeBundleJob(request, response);
169            }
170            else {
171                changeCoordinatorJob(request, response);
172            }
173        }
174    
175        /*
176         * protected method to reRun a job
177         *
178         * @seeorg.apache.oozie.servlet.BaseJobServlet#reRunJob(javax.servlet.http.
179         * HttpServletRequest, javax.servlet.http.HttpServletResponse,
180         * org.apache.hadoop.conf.Configuration)
181         */
182        @Override
183        protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
184                throws XServletException, IOException {
185            JSONObject json = null;
186            String jobId = getResourceName(request);
187            if (jobId.endsWith("-W")) {
188                reRunWorkflowJob(request, response, conf);
189            }
190            else if (jobId.endsWith("-B")) {
191                rerunBundleJob(request, response, conf);
192            }
193            else {
194                json = reRunCoordinatorActions(request, response, conf);
195            }
196            return json;
197        }
198    
199        /*
200         * protected method to get a job in JsonBean representation
201         */
202        @Override
203        protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
204                IOException, BaseEngineException {
205            ServletInputStream is = request.getInputStream();
206            byte[] b = new byte[101];
207            while (is.readLine(b, 0, 100) != -1) {
208                XLog.getLog(getClass()).warn("Printing :" + new String(b));
209            }
210    
211            JsonBean jobBean = null;
212            String jobId = getResourceName(request);
213            if (jobId.endsWith("-B")) {
214                jobBean = getBundleJob(request, response);
215            }
216            else {
217                if (jobId.endsWith("-W")) {
218                    jobBean = getWorkflowJob(request, response);
219                }
220                else {
221                    if (jobId.contains("-W@")) {
222                        jobBean = getWorkflowAction(request, response);
223                    }
224                    else {
225                        if (jobId.contains("-C@")) {
226                            jobBean = getCoordinatorAction(request, response);
227                        }
228                        else {
229                            jobBean = getCoordinatorJob(request, response);
230                        }
231                    }
232                }
233            }
234    
235            return jobBean;
236        }
237    
238        /*
239         * protected method to get a job definition in String format
240         */
241        @Override
242        protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
243                throws XServletException, IOException {
244            String jobDefinition = null;
245            String jobId = getResourceName(request);
246            if (jobId.endsWith("-W")) {
247                jobDefinition = getWorkflowJobDefinition(request, response);
248            }
249            else if (jobId.endsWith("-B")) {
250                jobDefinition = getBundleJobDefinition(request, response);
251            }
252            else {
253                jobDefinition = getCoordinatorJobDefinition(request, response);
254            }
255            return jobDefinition;
256        }
257    
258        /*
259         * protected method to stream a job log into response object
260         */
261        @Override
262        protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
263                IOException {
264            String jobId = getResourceName(request);
265            if (jobId.endsWith("-W")) {
266                streamWorkflowJobLog(request, response);
267            }
268            else if (jobId.endsWith("-B")) {
269                streamBundleJob(request, response);
270            }
271            else {
272                streamCoordinatorJobLog(request, response);
273            }
274        }
275    
276        @Override
277        protected void streamJobGraph(HttpServletRequest request, HttpServletResponse response)
278                throws XServletException, IOException {
279            String jobId = getResourceName(request);
280            if (jobId.endsWith("-W")) {
281                // Applicable only to worflow, for now
282                response.setContentType(RestConstants.PNG_IMAGE_CONTENT_TYPE);
283                try {
284                    String showKill = request.getParameter(RestConstants.JOB_SHOW_KILL_PARAM);
285                    boolean sK = showKill != null && (showKill.equalsIgnoreCase("yes") || showKill.equals("1") || showKill.equalsIgnoreCase("true"));
286    
287                    new GraphGenerator(
288                            getWorkflowJobDefinition(request, response),
289                            (JsonWorkflowJob)getWorkflowJob(request, response),
290                            sK).write(response.getOutputStream());
291                }
292                catch (Exception e) {
293                    throw new XServletException(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, ErrorCode.E0307, e.getMessage(), e);
294                }
295            }
296            else {
297                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0306);
298            }
299        }
300    
301        /**
302         * Start wf job
303         *
304         * @param request servlet request
305         * @param response servlet response
306         * @throws XServletException
307         */
308        private void startWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
309            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
310    
311            String jobId = getResourceName(request);
312            try {
313                dagEngine.start(jobId);
314            }
315            catch (DagEngineException ex) {
316                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
317            }
318        }
319    
320        /**
321         * Start bundle job
322         *
323         * @param request servlet request
324         * @param response servlet response
325         * @throws XServletException
326         */
327        private void startBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
328            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
329            String jobId = getResourceName(request);
330            try {
331                bundleEngine.start(jobId);
332            }
333            catch (BundleEngineException ex) {
334                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
335            }
336        }
337    
338        /**
339         * Resume workflow job
340         *
341         * @param request servlet request
342         * @param response servlet response
343         * @throws XServletException
344         */
345        private void resumeWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
346            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
347    
348            String jobId = getResourceName(request);
349            try {
350                dagEngine.resume(jobId);
351            }
352            catch (DagEngineException ex) {
353                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
354            }
355        }
356    
357        /**
358         * Resume bundle job
359         *
360         * @param request servlet request
361         * @param response servlet response
362         * @throws XServletException
363         */
364        private void resumeBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
365            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
366            String jobId = getResourceName(request);
367            try {
368                bundleEngine.resume(jobId);
369            }
370            catch (BundleEngineException ex) {
371                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
372            }
373        }
374    
375        /**
376         * Resume coordinator job
377         *
378         * @param request servlet request
379         * @param response servlet response
380         * @throws XServletException
381         * @throws CoordinatorEngineException
382         */
383        private void resumeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
384                throws XServletException {
385            String jobId = getResourceName(request);
386            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
387                    getUser(request));
388            try {
389                coordEngine.resume(jobId);
390            }
391            catch (CoordinatorEngineException ex) {
392                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
393            }
394        }
395    
396        /**
397         * Suspend a wf job
398         *
399         * @param request servlet request
400         * @param response servlet response
401         * @throws XServletException
402         */
403        private void suspendWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
404            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
405    
406            String jobId = getResourceName(request);
407            try {
408                dagEngine.suspend(jobId);
409            }
410            catch (DagEngineException ex) {
411                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
412            }
413        }
414    
415        /**
416         * Suspend bundle job
417         *
418         * @param request servlet request
419         * @param response servlet response
420         * @throws XServletException
421         */
422        private void suspendBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
423            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
424            String jobId = getResourceName(request);
425            try {
426                bundleEngine.suspend(jobId);
427            }
428            catch (BundleEngineException ex) {
429                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
430            }
431        }
432    
433        /**
434         * Suspend coordinator job
435         *
436         * @param request servlet request
437         * @param response servlet response
438         * @throws XServletException
439         */
440        private void suspendCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
441                throws XServletException {
442            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
443                    getUser(request));
444            String jobId = getResourceName(request);
445            try {
446                coordEngine.suspend(jobId);
447            }
448            catch (CoordinatorEngineException ex) {
449                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
450            }
451        }
452    
453        /**
454         * Kill a wf job
455         * @param request servlet request
456         * @param response servlet response
457         * @throws XServletException
458         */
459        private void killWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
460            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
461    
462            String jobId = getResourceName(request);
463            try {
464                dagEngine.kill(jobId);
465            }
466            catch (DagEngineException ex) {
467                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
468            }
469        }
470    
471        /**
472         * Kill a coord job
473         * @param request servlet request
474         * @param response servlet response
475         * @throws XServletException
476         */
477        private void killCoordinatorJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
478            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
479                    getUser(request));
480            String jobId = getResourceName(request);
481            try {
482                coordEngine.kill(jobId);
483            }
484            catch (CoordinatorEngineException ex) {
485                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
486            }
487        }
488    
489        /**
490         * Kill bundle job
491         *
492         * @param request servlet request
493         * @param response servlet response
494         * @throws XServletException
495         */
496        private void killBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
497            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
498            String jobId = getResourceName(request);
499            try {
500                bundleEngine.kill(jobId);
501            }
502            catch (BundleEngineException ex) {
503                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
504            }
505        }
506    
507        /**
508         * Change a coordinator job
509         *
510         * @param request servlet request
511         * @param response servlet response
512         * @throws XServletException
513         */
514        private void changeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
515                throws XServletException {
516            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
517                    getUser(request));
518            String jobId = getResourceName(request);
519            String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
520            try {
521                coordEngine.change(jobId, changeValue);
522            }
523            catch (CoordinatorEngineException ex) {
524                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
525            }
526        }
527    
528        /**
529         * Change a bundle job
530         *
531         * @param request servlet request
532         * @param response servlet response
533         * @throws XServletException
534         */
535        private void changeBundleJob(HttpServletRequest request, HttpServletResponse response)
536                throws XServletException {
537            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
538            String jobId = getResourceName(request);
539            String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
540            try {
541                bundleEngine.change(jobId, changeValue);
542            }
543            catch (BundleEngineException ex) {
544                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
545            }
546        }
547    
548        /**
549         * Rerun a wf job
550         *
551         * @param request servlet request
552         * @param response servlet response
553         * @param conf configuration object
554         * @throws XServletException
555         */
556        private void reRunWorkflowJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
557                throws XServletException {
558            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
559    
560            String jobId = getResourceName(request);
561            try {
562                dagEngine.reRun(jobId, conf);
563            }
564            catch (DagEngineException ex) {
565                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
566            }
567        }
568    
569        /**
570         * Rerun bundle job
571         *
572         * @param request servlet request
573         * @param response servlet response
574         * @param conf configration object
575         * @throws XServletException
576         */
577        private void rerunBundleJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
578                throws XServletException {
579            JSONObject json = new JSONObject();
580            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
581            String jobId = getResourceName(request);
582    
583            String coordScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM);
584            String dateScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM);
585            String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
586            String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
587    
588            XLog.getLog(getClass()).info(
589                    "Rerun Bundle for jobId=" + jobId + ", coordScope=" + coordScope + ", dateScope=" + dateScope + ", refresh="
590                            + refresh + ", noCleanup=" + noCleanup);
591    
592            try {
593                bundleEngine.reRun(jobId, coordScope, dateScope, Boolean.valueOf(refresh), Boolean.valueOf(noCleanup));
594            }
595            catch (BaseEngineException ex) {
596                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
597            }
598        }
599    
600        /**
601         * Rerun coordinator actions
602         *
603         * @param request servlet request
604         * @param response servlet response
605         * @param conf configuration object
606         * @throws XServletException
607         */
608        @SuppressWarnings("unchecked")
609        private JSONObject reRunCoordinatorActions(HttpServletRequest request, HttpServletResponse response,
610                Configuration conf) throws XServletException {
611            JSONObject json = new JSONObject();
612            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request));
613    
614            String jobId = getResourceName(request);
615    
616            String rerunType = request.getParameter(RestConstants.JOB_COORD_RERUN_TYPE_PARAM);
617            String scope = request.getParameter(RestConstants.JOB_COORD_RERUN_SCOPE_PARAM);
618            String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
619            String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
620    
621            XLog.getLog(getClass()).info(
622                    "Rerun coordinator for jobId=" + jobId + ", rerunType=" + rerunType + ",scope=" + scope + ",refresh="
623                            + refresh + ", noCleanup=" + noCleanup);
624    
625            try {
626                if (!(rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE) || rerunType
627                        .equals(RestConstants.JOB_COORD_RERUN_ACTION))) {
628                    throw new CommandException(ErrorCode.E1018, "date or action expected.");
629                }
630                CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh),
631                        Boolean.valueOf(noCleanup));
632                List<CoordinatorActionBean> coordActions;
633                if (coordInfo != null) {
634                    coordActions = coordInfo.getCoordActions();
635                }
636                else {
637                    coordActions = CoordRerunXCommand.getCoordActions(rerunType, jobId, scope);
638                }
639                json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT"));
640            }
641            catch (BaseEngineException ex) {
642                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
643            }
644            catch (CommandException ex) {
645                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
646            }
647    
648            return json;
649        }
650    
651    
652    
653        /**
654         * Get workflow job
655         *
656         * @param request servlet request
657         * @param response servlet response
658         * @return JsonBean WorkflowJobBean
659         * @throws XServletException
660         */
661        protected JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
662            JsonBean jobBean = getWorkflowJobBean(request, response);
663            // for backward compatibility (OOZIE-1231)
664            swapMRActionID((WorkflowJob)jobBean);
665            return jobBean;
666        }
667    
668        /**
669         * Get workflow job
670         *
671         * @param request servlet request
672         * @param response servlet response
673         * @return JsonBean WorkflowJobBean
674         * @throws XServletException
675         */
676        protected JsonBean getWorkflowJobBean(HttpServletRequest request, HttpServletResponse response) throws XServletException {
677            JsonBean jobBean = null;
678            String jobId = getResourceName(request);
679            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
680            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
681            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
682            start = (start < 1) ? 1 : start;
683            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
684            len = (len < 1) ? Integer.MAX_VALUE : len;
685            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
686            try {
687                jobBean = (JsonBean) dagEngine.getJob(jobId, start, len);
688            }
689            catch (DagEngineException ex) {
690                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
691            }
692            return jobBean;
693        }
694    
695        private void swapMRActionID(WorkflowJob wjBean) {
696            List<WorkflowAction> actions = wjBean.getActions();
697            if (actions != null) {
698                for (WorkflowAction wa : actions) {
699                    swapMRActionID(wa);
700                }
701            }
702        }
703    
704        private void swapMRActionID(WorkflowAction waBean) {
705            if (waBean.getType().equals("map-reduce")) {
706                String childId = waBean.getExternalChildIDs();
707                if (childId != null && !childId.equals("")) {
708                    String consoleBase = getConsoleBase(waBean.getConsoleUrl());
709                    ((WorkflowActionBean) waBean).setConsoleUrl(consoleBase + childId);
710                    ((WorkflowActionBean) waBean).setExternalId(childId);
711                    ((WorkflowActionBean) waBean).setExternalChildIDs("");
712                }
713            }
714        }
715    
716        private String getConsoleBase(String url) {
717            String consoleBase = null;
718            if (url.indexOf("application") != -1) {
719                consoleBase = url.split("application_[0-9]+_[0-9]+")[0];
720            }
721            else {
722                consoleBase = url.split("job_[0-9]+_[0-9]+")[0];
723            }
724            return consoleBase;
725        }
726    
727        /**
728         * Get wf action info
729         *
730         * @param request servlet request
731         * @param response servlet response
732         * @return JsonBean WorkflowActionBean
733         * @throws XServletException
734         */
735        protected JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response)
736                throws XServletException {
737    
738            JsonBean actionBean = getWorkflowActionBean(request, response);
739            // for backward compatibility (OOZIE-1231)
740            swapMRActionID((WorkflowAction)actionBean);
741            return actionBean;
742        }
743    
744        protected JsonBean getWorkflowActionBean(HttpServletRequest request, HttpServletResponse response)
745                throws XServletException {
746            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
747    
748            JsonBean actionBean = null;
749            String actionId = getResourceName(request);
750            try {
751                actionBean = dagEngine.getWorkflowAction(actionId);
752            }
753            catch (BaseEngineException ex) {
754                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
755            }
756            return actionBean;
757        }
758    
759        /**
760         * Get coord job info
761         *
762         * @param request servlet request
763         * @param response servlet response
764         * @return JsonBean CoordinatorJobBean
765         * @throws XServletException
766         * @throws BaseEngineException
767         */
768        protected JsonBean getCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
769                throws XServletException, BaseEngineException {
770            JsonBean jobBean = null;
771            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
772                    getUser(request));
773            String jobId = getResourceName(request);
774            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
775            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
776            String filter = request.getParameter(RestConstants.JOB_FILTER_PARAM);
777            String orderStr = request.getParameter(RestConstants.ORDER_PARAM);
778            boolean order = (orderStr != null && orderStr.equals("desc")) ? true : false;
779            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
780            start = (start < 1) ? 1 : start;
781            // Get default number of coordinator actions to be retrieved
782            int defaultLen = Services.get().getConf().getInt(COORD_ACTIONS_DEFAULT_LENGTH, 1000);
783            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
784            len = getCoordinatorJobLength(defaultLen, len);
785            try {
786                JsonCoordinatorJob coordJob = coordEngine.getCoordJob(jobId, filter, start, len, order);
787                jobBean = coordJob;
788            }
789            catch (CoordinatorEngineException ex) {
790                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
791            }
792    
793            return jobBean;
794        }
795    
796        /**
797         * Given the requested length and the default length, determine how many coordinator jobs to return.
798         * Used by {@link #getCoordinatorJob(javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)}
799         *
800         * @param defaultLen The default length
801         * @param len The requested length
802         * @return The length to use
803         */
804        protected int getCoordinatorJobLength(int defaultLen, int len) {
805            return (len < 1) ? defaultLen : len;
806        }
807    
808        /**
809         * Get bundle job info
810         *
811         * @param request servlet request
812         * @param response servlet response
813         * @return JsonBean bundle job bean
814         * @throws XServletException
815         * @throws BaseEngineException
816         */
817        private JsonBean getBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
818                BaseEngineException {
819            JsonBean jobBean = null;
820            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
821            String jobId = getResourceName(request);
822    
823            try {
824                jobBean = (JsonBean) bundleEngine.getBundleJob(jobId);
825    
826                return jobBean;
827            }
828            catch (BundleEngineException ex) {
829                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
830            }
831        }
832    
833        /**
834         * Get coordinator action
835         *
836         * @param request servlet request
837         * @param response servlet response
838         * @return JsonBean CoordinatorActionBean
839         * @throws XServletException
840         * @throws BaseEngineException
841         */
842        private JsonBean getCoordinatorAction(HttpServletRequest request, HttpServletResponse response)
843                throws XServletException, BaseEngineException {
844            JsonBean actionBean = null;
845            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
846                    getUser(request));
847            String actionId = getResourceName(request);
848            try {
849                actionBean = coordEngine.getCoordAction(actionId);
850            }
851            catch (CoordinatorEngineException ex) {
852                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
853            }
854    
855            return actionBean;
856        }
857    
858        /**
859         * Get wf job definition
860         *
861         * @param request servlet request
862         * @param response servlet response
863         * @return String wf definition
864         * @throws XServletException
865         */
866        private String getWorkflowJobDefinition(HttpServletRequest request, HttpServletResponse response)
867                throws XServletException {
868            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
869    
870            String wfDefinition;
871            String jobId = getResourceName(request);
872            try {
873                wfDefinition = dagEngine.getDefinition(jobId);
874            }
875            catch (DagEngineException ex) {
876                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
877            }
878            return wfDefinition;
879        }
880    
881        /**
882         * Get bundle job definition
883         *
884         * @param request servlet request
885         * @param response servlet response
886         * @return String bundle definition
887         * @throws XServletException
888         */
889        private String getBundleJobDefinition(HttpServletRequest request, HttpServletResponse response) throws XServletException {
890            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
891            String bundleDefinition;
892            String jobId = getResourceName(request);
893            try {
894                bundleDefinition = bundleEngine.getDefinition(jobId);
895            }
896            catch (BundleEngineException ex) {
897                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
898            }
899            return bundleDefinition;
900        }
901    
902        /**
903         * Get coordinator job definition
904         *
905         * @param request servlet request
906         * @param response servlet response
907         * @return String coord definition
908         * @throws XServletException
909         */
910        private String getCoordinatorJobDefinition(HttpServletRequest request, HttpServletResponse response)
911                throws XServletException {
912    
913            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
914                    getUser(request));
915    
916            String jobId = getResourceName(request);
917    
918            String coordDefinition = null;
919            try {
920                coordDefinition = coordEngine.getDefinition(jobId);
921            }
922            catch (BaseEngineException ex) {
923                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
924            }
925            return coordDefinition;
926        }
927    
928        /**
929         * Stream wf job log
930         *
931         * @param request servlet request
932         * @param response servlet response
933         * @throws XServletException
934         * @throws IOException
935         */
936        private void streamWorkflowJobLog(HttpServletRequest request, HttpServletResponse response)
937                throws XServletException, IOException {
938            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
939            String jobId = getResourceName(request);
940            try {
941                dagEngine.streamLog(jobId, response.getWriter());
942            }
943            catch (DagEngineException ex) {
944                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
945            }
946        }
947    
948        /**
949         * Stream bundle job log
950         *
951         * @param request servlet request
952         * @param response servlet response
953         * @throws XServletException
954         */
955        private void streamBundleJob(HttpServletRequest request, HttpServletResponse response)
956                throws XServletException, IOException {
957            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
958            String jobId = getResourceName(request);
959            try {
960                bundleEngine.streamLog(jobId, response.getWriter());
961            }
962            catch (BundleEngineException ex) {
963                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
964            }
965        }
966    
967        /**
968         * Stream coordinator job log
969         *
970         * @param request servlet request
971         * @param response servlet response
972         * @throws XServletException
973         * @throws IOException
974         */
975        private void streamCoordinatorJobLog(HttpServletRequest request, HttpServletResponse response)
976                throws XServletException, IOException {
977    
978            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
979                    getUser(request));
980            String jobId = getResourceName(request);
981            String logRetrievalScope = request.getParameter(RestConstants.JOB_LOG_SCOPE_PARAM);
982            String logRetrievalType = request.getParameter(RestConstants.JOB_LOG_TYPE_PARAM);
983            try {
984                coordEngine.streamLog(jobId, logRetrievalScope, logRetrievalType, response.getWriter());
985            }
986            catch (BaseEngineException ex) {
987                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
988            }
989            catch (CommandException ex) {
990                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
991            }
992        }
993    
994        @Override
995        protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException,
996                IOException {
997            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
998        }
999    
1000    }