001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.servlet;
019    
020    import java.io.IOException;
021    import java.util.List;
022    
023    import javax.servlet.ServletInputStream;
024    import javax.servlet.http.HttpServletRequest;
025    import javax.servlet.http.HttpServletResponse;
026    
027    import org.apache.hadoop.conf.Configuration;
028    import org.apache.oozie.BaseEngineException;
029    import org.apache.oozie.BundleEngine;
030    import org.apache.oozie.BundleEngineException;
031    import org.apache.oozie.CoordinatorActionBean;
032    import org.apache.oozie.CoordinatorActionInfo;
033    import org.apache.oozie.CoordinatorEngine;
034    import org.apache.oozie.CoordinatorEngineException;
035    import org.apache.oozie.command.CommandException;
036    import org.apache.oozie.command.coord.CoordRerunXCommand;
037    import org.apache.oozie.coord.CoordUtils;
038    import org.apache.oozie.DagEngine;
039    import org.apache.oozie.DagEngineException;
040    import org.apache.oozie.ErrorCode;
041    import org.apache.oozie.client.rest.JsonBean;
042    import org.apache.oozie.client.rest.JsonCoordinatorJob;
043    import org.apache.oozie.client.rest.JsonTags;
044    import org.apache.oozie.client.rest.RestConstants;
045    import org.apache.oozie.service.BundleEngineService;
046    import org.apache.oozie.service.CoordinatorEngineService;
047    import org.apache.oozie.service.DagEngineService;
048    import org.apache.oozie.service.Services;
049    import org.apache.oozie.util.XLog;
050    import org.json.simple.JSONObject;
051    
052    @SuppressWarnings("serial")
053    public class V1JobServlet extends BaseJobServlet {
054    
055        private static final String INSTRUMENTATION_NAME = "v1job";
056        public static final String COORD_ACTIONS_DEFAULT_LENGTH = "oozie.coord.actions.default.length";
057    
058        public V1JobServlet() {
059            super(INSTRUMENTATION_NAME);
060        }
061    
062        /*
063         * protected method to start a job
064         */
065        @Override
066        protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
067                IOException {
068            /*
069             * Configuration conf = new XConfiguration(request.getInputStream());
070             * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
071             * conf.get(OozieClient.COORDINATOR_APP_PATH);
072             *
073             * ServletUtilities.ValidateAppPath(wfPath, coordPath);
074             */
075            String jobId = getResourceName(request);
076            if (jobId.endsWith("-W")) {
077                startWorkflowJob(request, response);
078            }
079            else if (jobId.endsWith("-B")) {
080                startBundleJob(request, response);
081            }
082            else {
083                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303);
084            }
085    
086        }
087    
088        /*
089         * protected method to resume a job
090         */
091        @Override
092        protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
093                IOException {
094            /*
095             * Configuration conf = new XConfiguration(request.getInputStream());
096             * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
097             * conf.get(OozieClient.COORDINATOR_APP_PATH);
098             *
099             * ServletUtilities.ValidateAppPath(wfPath, coordPath);
100             */
101            String jobId = getResourceName(request);
102            if (jobId.endsWith("-W")) {
103                resumeWorkflowJob(request, response);
104            }
105            else if (jobId.endsWith("-B")) {
106                resumeBundleJob(request, response);
107            }
108            else {
109                resumeCoordinatorJob(request, response);
110            }
111        }
112    
113        /*
114         * protected method to suspend a job
115         */
116        @Override
117        protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
118                IOException {
119            /*
120             * Configuration conf = new XConfiguration(request.getInputStream());
121             * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
122             * conf.get(OozieClient.COORDINATOR_APP_PATH);
123             *
124             * ServletUtilities.ValidateAppPath(wfPath, coordPath);
125             */
126            String jobId = getResourceName(request);
127            if (jobId.endsWith("-W")) {
128                suspendWorkflowJob(request, response);
129            }
130            else if (jobId.endsWith("-B")) {
131                suspendBundleJob(request, response);
132            }
133            else {
134                suspendCoordinatorJob(request, response);
135            }
136        }
137    
138        /*
139         * protected method to kill a job
140         */
141        @Override
142        protected void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
143                IOException {
144            /*
145             * Configuration conf = new XConfiguration(request.getInputStream());
146             * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
147             * conf.get(OozieClient.COORDINATOR_APP_PATH);
148             *
149             * ServletUtilities.ValidateAppPath(wfPath, coordPath);
150             */
151            String jobId = getResourceName(request);
152            if (jobId.endsWith("-W")) {
153                killWorkflowJob(request, response);
154            }
155            else if (jobId.endsWith("-B")) {
156                killBundleJob(request, response);
157            }
158            else {
159                killCoordinatorJob(request, response);
160            }
161        }
162    
163        /**
164         * protected method to change a coordinator job
165         * @param request request object
166         * @param response response object
167         * @throws XServletException
168         * @throws IOException
169         */
170        @Override
171        protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
172                IOException {
173            String jobId = getResourceName(request);
174            if (jobId.endsWith("-B")) {
175                changeBundleJob(request, response);
176            }
177            else {
178                changeCoordinatorJob(request, response);
179            }
180        }
181    
182        /*
183         * protected method to reRun a job
184         *
185         * @seeorg.apache.oozie.servlet.BaseJobServlet#reRunJob(javax.servlet.http.
186         * HttpServletRequest, javax.servlet.http.HttpServletResponse,
187         * org.apache.hadoop.conf.Configuration)
188         */
189        @Override
190        protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
191                throws XServletException, IOException {
192            JSONObject json = null;
193            String jobId = getResourceName(request);
194            if (jobId.endsWith("-W")) {
195                reRunWorkflowJob(request, response, conf);
196            }
197            else if (jobId.endsWith("-B")) {
198                rerunBundleJob(request, response, conf);
199            }
200            else {
201                json = reRunCoordinatorActions(request, response, conf);
202            }
203            return json;
204        }
205    
206        /*
207         * protected method to get a job in JsonBean representation
208         */
209        @Override
210        protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
211                IOException, BaseEngineException {
212            ServletInputStream is = request.getInputStream();
213            byte[] b = new byte[101];
214            while (is.readLine(b, 0, 100) != -1) {
215                XLog.getLog(getClass()).warn("Printing :" + new String(b));
216            }
217    
218            JsonBean jobBean = null;
219            String jobId = getResourceName(request);
220            if (jobId.endsWith("-B")) {
221                jobBean = getBundleJob(request, response);
222            }
223            else {
224                if (jobId.endsWith("-W")) {
225                    jobBean = getWorkflowJob(request, response);
226                }
227                else {
228                    if (jobId.contains("-W@")) {
229                        jobBean = getWorkflowAction(request, response);
230                    }
231                    else {
232                        if (jobId.contains("-C@")) {
233                            jobBean = getCoordinatorAction(request, response);
234                        }
235                        else {
236                            jobBean = getCoordinatorJob(request, response);
237                        }
238                    }
239                }
240            }
241    
242            return jobBean;
243        }
244    
245        /*
246         * protected method to get a job definition in String format
247         */
248        @Override
249        protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
250                throws XServletException, IOException {
251            String jobDefinition = null;
252            String jobId = getResourceName(request);
253            if (jobId.endsWith("-W")) {
254                jobDefinition = getWorkflowJobDefinition(request, response);
255            }
256            else if (jobId.endsWith("-B")) {
257                jobDefinition = getBundleJobDefinition(request, response);
258            }
259            else {
260                jobDefinition = getCoordinatorJobDefinition(request, response);
261            }
262            return jobDefinition;
263        }
264    
265        /*
266         * protected method to stream a job log into response object
267         */
268        @Override
269        protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
270                IOException {
271            String jobId = getResourceName(request);
272            if (jobId.endsWith("-W")) {
273                streamWorkflowJobLog(request, response);
274            }
275            else if (jobId.endsWith("-B")) {
276                streamBundleJob(request, response);
277            }
278            else {
279                streamCoordinatorJobLog(request, response);
280            }
281        }
282    
283        /**
284         * Start wf job
285         *
286         * @param request servlet request
287         * @param response servlet response
288         * @throws XServletException
289         */
290        private void startWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
291            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
292                    getAuthToken(request));
293    
294            String jobId = getResourceName(request);
295            try {
296                dagEngine.start(jobId);
297            }
298            catch (DagEngineException ex) {
299                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
300            }
301        }
302    
303        /**
304         * Start bundle job
305         *
306         * @param request servlet request
307         * @param response servlet response
308         * @throws XServletException
309         */
310        private void startBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
311            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
312                    getAuthToken(request));
313            String jobId = getResourceName(request);
314            try {
315                bundleEngine.start(jobId);
316            }
317            catch (BundleEngineException ex) {
318                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
319            }
320        }
321    
322        /**
323         * Resume workflow job
324         *
325         * @param request servlet request
326         * @param response servlet response
327         * @throws XServletException
328         */
329        private void resumeWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
330            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
331                    getAuthToken(request));
332    
333            String jobId = getResourceName(request);
334            try {
335                dagEngine.resume(jobId);
336            }
337            catch (DagEngineException ex) {
338                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
339            }
340        }
341    
342        /**
343         * Resume bundle job
344         *
345         * @param request servlet request
346         * @param response servlet response
347         * @throws XServletException
348         */
349        private void resumeBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
350            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
351                    getAuthToken(request));
352            String jobId = getResourceName(request);
353            try {
354                bundleEngine.resume(jobId);
355            }
356            catch (BundleEngineException ex) {
357                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
358            }
359        }
360    
361        /**
362         * Resume coordinator job
363         *
364         * @param request servlet request
365         * @param response servlet response
366         * @throws XServletException
367         * @throws CoordinatorEngineException
368         */
369        private void resumeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
370                throws XServletException {
371            String jobId = getResourceName(request);
372            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
373                    getUser(request), getAuthToken(request));
374            try {
375                coordEngine.resume(jobId);
376            }
377            catch (CoordinatorEngineException ex) {
378                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
379            }
380        }
381    
382        /**
383         * Suspend a wf job
384         *
385         * @param request servlet request
386         * @param response servlet response
387         * @throws XServletException
388         */
389        private void suspendWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
390            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
391                    getAuthToken(request));
392    
393            String jobId = getResourceName(request);
394            try {
395                dagEngine.suspend(jobId);
396            }
397            catch (DagEngineException ex) {
398                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
399            }
400        }
401    
402        /**
403         * Suspend bundle job
404         *
405         * @param request servlet request
406         * @param response servlet response
407         * @throws XServletException
408         */
409        private void suspendBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
410            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
411                    getAuthToken(request));
412            String jobId = getResourceName(request);
413            try {
414                bundleEngine.suspend(jobId);
415            }
416            catch (BundleEngineException ex) {
417                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
418            }
419        }
420    
421        /**
422         * Suspend coordinator job
423         *
424         * @param request servlet request
425         * @param response servlet response
426         * @throws XServletException
427         */
428        private void suspendCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
429                throws XServletException {
430            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
431                    getUser(request), getAuthToken(request));
432            String jobId = getResourceName(request);
433            try {
434                coordEngine.suspend(jobId);
435            }
436            catch (CoordinatorEngineException ex) {
437                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
438            }
439        }
440    
441        /**
442         * Kill a wf job
443         * @param request servlet request
444         * @param response servlet response
445         * @throws XServletException
446         */
447        private void killWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
448            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
449                    getAuthToken(request));
450    
451            String jobId = getResourceName(request);
452            try {
453                dagEngine.kill(jobId);
454            }
455            catch (DagEngineException ex) {
456                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
457            }
458        }
459    
460        /**
461         * Kill a coord job
462         * @param request servlet request
463         * @param response servlet response
464         * @throws XServletException
465         */
466        private void killCoordinatorJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
467            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
468                    getUser(request), getAuthToken(request));
469            String jobId = getResourceName(request);
470            try {
471                coordEngine.kill(jobId);
472            }
473            catch (CoordinatorEngineException ex) {
474                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
475            }
476        }
477    
478        /**
479         * Kill bundle job
480         *
481         * @param request servlet request
482         * @param response servlet response
483         * @throws XServletException
484         */
485        private void killBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
486            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
487                    getAuthToken(request));
488            String jobId = getResourceName(request);
489            try {
490                bundleEngine.kill(jobId);
491            }
492            catch (BundleEngineException ex) {
493                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
494            }
495        }
496    
497        /**
498         * Change a coordinator job
499         *
500         * @param request servlet request
501         * @param response servlet response
502         * @throws XServletException
503         */
504        private void changeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
505                throws XServletException {
506            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
507                    getUser(request), getAuthToken(request));
508            String jobId = getResourceName(request);
509            String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
510            try {
511                coordEngine.change(jobId, changeValue);
512            }
513            catch (CoordinatorEngineException ex) {
514                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
515            }
516        }
517    
518        /**
519         * Change a bundle job
520         *
521         * @param request servlet request
522         * @param response servlet response
523         * @throws XServletException
524         */
525        private void changeBundleJob(HttpServletRequest request, HttpServletResponse response)
526                throws XServletException {
527            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(
528                    getUser(request), getAuthToken(request));
529            String jobId = getResourceName(request);
530            String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
531            try {
532                bundleEngine.change(jobId, changeValue);
533            }
534            catch (BundleEngineException ex) {
535                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
536            }
537        }
538    
539        /**
540         * Rerun a wf job
541         *
542         * @param request servlet request
543         * @param response servlet response
544         * @param conf configuration object
545         * @throws XServletException
546         */
547        private void reRunWorkflowJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
548                throws XServletException {
549            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
550                    getAuthToken(request));
551    
552            String jobId = getResourceName(request);
553            try {
554                dagEngine.reRun(jobId, conf);
555            }
556            catch (DagEngineException ex) {
557                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
558            }
559        }
560    
561        /**
562         * Rerun bundle job
563         *
564         * @param request servlet request
565         * @param response servlet response
566         * @param conf configration object
567         * @throws XServletException
568         */
569        private void rerunBundleJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
570                throws XServletException {
571            JSONObject json = new JSONObject();
572            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
573                    getAuthToken(request));
574            String jobId = getResourceName(request);
575    
576            String coordScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM);
577            String dateScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM);
578            String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
579            String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
580    
581            XLog.getLog(getClass()).info(
582                    "Rerun Bundle for jobId=" + jobId + ", coordScope=" + coordScope + ", dateScope=" + dateScope + ", refresh="
583                            + refresh + ", noCleanup=" + noCleanup);
584    
585            try {
586                bundleEngine.reRun(jobId, coordScope, dateScope, Boolean.valueOf(refresh), Boolean.valueOf(noCleanup));
587            }
588            catch (BaseEngineException ex) {
589                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
590            }
591        }
592    
593        /**
594         * Rerun coordinator actions
595         *
596         * @param request servlet request
597         * @param response servlet response
598         * @param conf configuration object
599         * @throws XServletException
600         */
601        @SuppressWarnings("unchecked")
602        private JSONObject reRunCoordinatorActions(HttpServletRequest request, HttpServletResponse response,
603                Configuration conf) throws XServletException {
604            JSONObject json = new JSONObject();
605            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request),
606                    getAuthToken(request));
607    
608            String jobId = getResourceName(request);
609    
610            String rerunType = request.getParameter(RestConstants.JOB_COORD_RERUN_TYPE_PARAM);
611            String scope = request.getParameter(RestConstants.JOB_COORD_RERUN_SCOPE_PARAM);
612            String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
613            String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
614    
615            XLog.getLog(getClass()).info(
616                    "Rerun coordinator for jobId=" + jobId + ", rerunType=" + rerunType + ",scope=" + scope + ",refresh="
617                            + refresh + ", noCleanup=" + noCleanup);
618    
619            try {
620                if (!(rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE) || rerunType
621                        .equals(RestConstants.JOB_COORD_RERUN_ACTION))) {
622                    throw new CommandException(ErrorCode.E1018, "date or action expected.");
623                }
624                CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh),
625                        Boolean.valueOf(noCleanup));
626                List<CoordinatorActionBean> coordActions;
627                if (coordInfo != null) {
628                    coordActions = coordInfo.getCoordActions();
629                }
630                else {
631                    coordActions = CoordRerunXCommand.getCoordActions(rerunType, jobId, scope);
632                }
633                json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions));
634            }
635            catch (BaseEngineException ex) {
636                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
637            }
638            catch (CommandException ex) {
639                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
640            }
641    
642            return json;
643        }
644    
645    
646    
647        /**
648         * Get workflow job
649         *
650         * @param request servlet request
651         * @param response servlet response
652         * @return JsonBean WorkflowJobBean
653         * @throws XServletException
654         */
655        private JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
656            JsonBean jobBean = null;
657            String jobId = getResourceName(request);
658            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
659            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
660            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
661            start = (start < 1) ? 1 : start;
662            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
663            len = (len < 1) ? Integer.MAX_VALUE : len;
664            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
665                    getAuthToken(request));
666            try {
667                jobBean = (JsonBean) dagEngine.getJob(jobId, start, len);
668            }
669            catch (DagEngineException ex) {
670                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
671            }
672    
673            return jobBean;
674        }
675    
676        /**
677         * Get wf action info
678         *
679         * @param request servlet request
680         * @param response servlet response
681         * @return JsonBean WorkflowActionBean
682         * @throws XServletException
683         */
684        private JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response)
685                throws XServletException {
686            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
687                    getAuthToken(request));
688    
689            JsonBean actionBean = null;
690            String actionId = getResourceName(request);
691            try {
692                actionBean = dagEngine.getWorkflowAction(actionId);
693            }
694            catch (BaseEngineException ex) {
695                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
696            }
697    
698            return actionBean;
699        }
700    
701        /**
702         * Get coord job info
703         *
704         * @param request servlet request
705         * @param response servlet response
706         * @return JsonBean CoordinatorJobBean
707         * @throws XServletException
708         * @throws BaseEngineException
709         */
710        private JsonBean getCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
711                throws XServletException, BaseEngineException {
712            JsonBean jobBean = null;
713            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
714                    getUser(request), getAuthToken(request));
715            String jobId = getResourceName(request);
716            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
717            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
718            String filter = request.getParameter(RestConstants.JOB_FILTER_PARAM);
719            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
720            start = (start < 1) ? 1 : start;
721            // Get default number of coordinator actions to be retrieved
722            int defaultLen = Services.get().getConf().getInt(COORD_ACTIONS_DEFAULT_LENGTH, 1000);
723            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
724            len = (len < 1) ? defaultLen : len;
725            try {
726                JsonCoordinatorJob coordJob = coordEngine.getCoordJob(jobId, filter, start, len);
727                jobBean = coordJob;
728            }
729            catch (CoordinatorEngineException ex) {
730                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
731            }
732    
733            return jobBean;
734        }
735    
736        /**
737         * Get bundle job info
738         *
739         * @param request servlet request
740         * @param response servlet response
741         * @return JsonBean bundle job bean
742         * @throws XServletException
743         * @throws BaseEngineException
744         */
745        private JsonBean getBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
746                BaseEngineException {
747            JsonBean jobBean = null;
748            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
749                    getAuthToken(request));
750            String jobId = getResourceName(request);
751    
752            try {
753                jobBean = (JsonBean) bundleEngine.getBundleJob(jobId);
754    
755                return jobBean;
756            }
757            catch (BundleEngineException ex) {
758                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
759            }
760        }
761    
762        /**
763         * Get coordinator action
764         *
765         * @param request servlet request
766         * @param response servlet response
767         * @return JsonBean CoordinatorActionBean
768         * @throws XServletException
769         * @throws BaseEngineException
770         */
771        private JsonBean getCoordinatorAction(HttpServletRequest request, HttpServletResponse response)
772                throws XServletException, BaseEngineException {
773            JsonBean actionBean = null;
774            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
775                    getUser(request), getAuthToken(request));
776            String actionId = getResourceName(request);
777            try {
778                actionBean = coordEngine.getCoordAction(actionId);
779            }
780            catch (CoordinatorEngineException ex) {
781                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
782            }
783    
784            return actionBean;
785        }
786    
787        /**
788         * Get wf job definition
789         *
790         * @param request servlet request
791         * @param response servlet response
792         * @return String wf definition
793         * @throws XServletException
794         */
795        private String getWorkflowJobDefinition(HttpServletRequest request, HttpServletResponse response)
796                throws XServletException {
797            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
798                    getAuthToken(request));
799    
800            String wfDefinition;
801            String jobId = getResourceName(request);
802            try {
803                wfDefinition = dagEngine.getDefinition(jobId);
804            }
805            catch (DagEngineException ex) {
806                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
807            }
808            return wfDefinition;
809        }
810    
811        /**
812         * Get bundle job definition
813         *
814         * @param request servlet request
815         * @param response servlet response
816         * @return String bundle definition
817         * @throws XServletException
818         */
819        private String getBundleJobDefinition(HttpServletRequest request, HttpServletResponse response) throws XServletException {
820            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
821                    getAuthToken(request));
822            String bundleDefinition;
823            String jobId = getResourceName(request);
824            try {
825                bundleDefinition = bundleEngine.getDefinition(jobId);
826            }
827            catch (BundleEngineException ex) {
828                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
829            }
830            return bundleDefinition;
831        }
832    
833        /**
834         * Get coordinator job definition
835         *
836         * @param request servlet request
837         * @param response servlet response
838         * @return String coord definition
839         * @throws XServletException
840         */
841        private String getCoordinatorJobDefinition(HttpServletRequest request, HttpServletResponse response)
842                throws XServletException {
843    
844            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
845                    getUser(request), getAuthToken(request));
846    
847            String jobId = getResourceName(request);
848    
849            String coordDefinition = null;
850            try {
851                coordDefinition = coordEngine.getDefinition(jobId);
852            }
853            catch (BaseEngineException ex) {
854                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
855            }
856            return coordDefinition;
857        }
858    
859        /**
860         * Stream wf job log
861         *
862         * @param request servlet request
863         * @param response servlet response
864         * @throws XServletException
865         * @throws IOException
866         */
867        private void streamWorkflowJobLog(HttpServletRequest request, HttpServletResponse response)
868                throws XServletException, IOException {
869            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
870                    getAuthToken(request));
871            String jobId = getResourceName(request);
872            try {
873                dagEngine.streamLog(jobId, response.getWriter());
874            }
875            catch (DagEngineException ex) {
876                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
877            }
878        }
879    
880        /**
881         * Stream bundle job log
882         *
883         * @param request servlet request
884         * @param response servlet response
885         * @throws XServletException
886         */
887        private void streamBundleJob(HttpServletRequest request, HttpServletResponse response)
888                throws XServletException, IOException {
889            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
890                    getAuthToken(request));
891            String jobId = getResourceName(request);
892            try {
893                bundleEngine.streamLog(jobId, response.getWriter());
894            }
895            catch (BundleEngineException ex) {
896                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
897            }
898        }
899    
900        /**
901         * Stream coordinator job log
902         *
903         * @param request servlet request
904         * @param response servlet response
905         * @throws XServletException
906         * @throws IOException
907         */
908        private void streamCoordinatorJobLog(HttpServletRequest request, HttpServletResponse response)
909                throws XServletException, IOException {
910    
911            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
912                    getUser(request), getAuthToken(request));
913            String jobId = getResourceName(request);
914            String logRetrievalScope = request.getParameter(RestConstants.JOB_LOG_SCOPE_PARAM);
915            String logRetrievalType = request.getParameter(RestConstants.JOB_LOG_TYPE_PARAM);
916            try {
917                coordEngine.streamLog(jobId, logRetrievalScope, logRetrievalType, response.getWriter());
918            }
919            catch (BaseEngineException ex) {
920                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
921            }
922            catch (CommandException ex) {
923                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
924            }
925        }
926    }