001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.servlet;
019    
020    import java.io.IOException;
021    import java.util.List;
022    
023    import javax.servlet.ServletInputStream;
024    import javax.servlet.http.HttpServletRequest;
025    import javax.servlet.http.HttpServletResponse;
026    
027    import org.apache.hadoop.conf.Configuration;
028    import org.apache.oozie.BaseEngineException;
029    import org.apache.oozie.BundleEngine;
030    import org.apache.oozie.BundleEngineException;
031    import org.apache.oozie.CoordinatorActionBean;
032    import org.apache.oozie.CoordinatorActionInfo;
033    import org.apache.oozie.CoordinatorEngine;
034    import org.apache.oozie.CoordinatorEngineException;
035    import org.apache.oozie.command.CommandException;
036    import org.apache.oozie.DagEngine;
037    import org.apache.oozie.DagEngineException;
038    import org.apache.oozie.ErrorCode;
039    import org.apache.oozie.client.rest.JsonBean;
040    import org.apache.oozie.client.rest.JsonCoordinatorJob;
041    import org.apache.oozie.client.rest.JsonTags;
042    import org.apache.oozie.client.rest.RestConstants;
043    import org.apache.oozie.service.BundleEngineService;
044    import org.apache.oozie.service.CoordinatorEngineService;
045    import org.apache.oozie.service.DagEngineService;
046    import org.apache.oozie.service.Services;
047    import org.apache.oozie.util.XLog;
048    import org.json.simple.JSONObject;
049    
050    @SuppressWarnings("serial")
051    public class V1JobServlet extends BaseJobServlet {
052    
053        private static final String INSTRUMENTATION_NAME = "v1job";
054    
055        public V1JobServlet() {
056            super(INSTRUMENTATION_NAME);
057        }
058    
059        /*
060         * protected method to start a job
061         */
062        @Override
063        protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
064                IOException {
065            /*
066             * Configuration conf = new XConfiguration(request.getInputStream());
067             * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
068             * conf.get(OozieClient.COORDINATOR_APP_PATH);
069             *
070             * ServletUtilities.ValidateAppPath(wfPath, coordPath);
071             */
072            String jobId = getResourceName(request);
073            if (jobId.endsWith("-W")) {
074                startWorkflowJob(request, response);
075            }
076            else if (jobId.endsWith("-B")) {
077                startBundleJob(request, response);
078            }
079            else {
080                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303);
081            }
082    
083        }
084    
085        /*
086         * protected method to resume a job
087         */
088        @Override
089        protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
090                IOException {
091            /*
092             * Configuration conf = new XConfiguration(request.getInputStream());
093             * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
094             * conf.get(OozieClient.COORDINATOR_APP_PATH);
095             *
096             * ServletUtilities.ValidateAppPath(wfPath, coordPath);
097             */
098            String jobId = getResourceName(request);
099            if (jobId.endsWith("-W")) {
100                resumeWorkflowJob(request, response);
101            }
102            else if (jobId.endsWith("-B")) {
103                resumeBundleJob(request, response);
104            }
105            else {
106                resumeCoordinatorJob(request, response);
107            }
108        }
109    
110        /*
111         * protected method to suspend a job
112         */
113        @Override
114        protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
115                IOException {
116            /*
117             * Configuration conf = new XConfiguration(request.getInputStream());
118             * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
119             * conf.get(OozieClient.COORDINATOR_APP_PATH);
120             *
121             * ServletUtilities.ValidateAppPath(wfPath, coordPath);
122             */
123            String jobId = getResourceName(request);
124            if (jobId.endsWith("-W")) {
125                suspendWorkflowJob(request, response);
126            }
127            else if (jobId.endsWith("-B")) {
128                suspendBundleJob(request, response);
129            }
130            else {
131                suspendCoordinatorJob(request, response);
132            }
133        }
134    
135        /*
136         * protected method to kill a job
137         */
138        @Override
139        protected void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
140                IOException {
141            /*
142             * Configuration conf = new XConfiguration(request.getInputStream());
143             * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
144             * conf.get(OozieClient.COORDINATOR_APP_PATH);
145             *
146             * ServletUtilities.ValidateAppPath(wfPath, coordPath);
147             */
148            String jobId = getResourceName(request);
149            if (jobId.endsWith("-W")) {
150                killWorkflowJob(request, response);
151            }
152            else if (jobId.endsWith("-B")) {
153                killBundleJob(request, response);
154            }
155            else {
156                killCoordinatorJob(request, response);
157            }
158        }
159    
160        /**
161         * protected method to change a coordinator job
162         * @param request request object
163         * @param response response object
164         * @throws XServletException
165         * @throws IOException
166         */
167        @Override
168        protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
169                IOException {
170            String jobId = getResourceName(request);
171            if (jobId.endsWith("-B")) {
172                changeBundleJob(request, response);
173            }
174            else {
175                changeCoordinatorJob(request, response);
176            }
177        }
178    
179        /*
180         * protected method to reRun a job
181         *
182         * @seeorg.apache.oozie.servlet.BaseJobServlet#reRunJob(javax.servlet.http.
183         * HttpServletRequest, javax.servlet.http.HttpServletResponse,
184         * org.apache.hadoop.conf.Configuration)
185         */
186        @Override
187        protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
188                throws XServletException, IOException {
189            JSONObject json = null;
190            String jobId = getResourceName(request);
191            if (jobId.endsWith("-W")) {
192                reRunWorkflowJob(request, response, conf);
193            }
194            else if (jobId.endsWith("-B")) {
195                rerunBundleJob(request, response, conf);
196            }
197            else {
198                json = reRunCoordinatorActions(request, response, conf);
199            }
200            return json;
201        }
202    
203        /*
204         * protected method to get a job in JsonBean representation
205         */
206        @Override
207        protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
208                IOException, BaseEngineException {
209            ServletInputStream is = request.getInputStream();
210            byte[] b = new byte[101];
211            while (is.readLine(b, 0, 100) != -1) {
212                XLog.getLog(getClass()).warn("Printing :" + new String(b));
213            }
214    
215            JsonBean jobBean = null;
216            String jobId = getResourceName(request);
217            if (jobId.endsWith("-B")) {
218                jobBean = getBundleJob(request, response);
219            }
220            else {
221                if (jobId.endsWith("-W")) {
222                    jobBean = getWorkflowJob(request, response);
223                }
224                else {
225                    if (jobId.contains("-W@")) {
226                        jobBean = getWorkflowAction(request, response);
227                    }
228                    else {
229                        if (jobId.contains("-C@")) {
230                            jobBean = getCoordinatorAction(request, response);
231                        }
232                        else {
233                            jobBean = getCoordinatorJob(request, response);
234                        }
235                    }
236                }
237            }
238    
239            return jobBean;
240        }
241    
242        /*
243         * protected method to get a job definition in String format
244         */
245        @Override
246        protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
247                throws XServletException, IOException {
248            String jobDefinition = null;
249            String jobId = getResourceName(request);
250            if (jobId.endsWith("-W")) {
251                jobDefinition = getWorkflowJobDefinition(request, response);
252            }
253            else if (jobId.endsWith("-B")) {
254                jobDefinition = getBundleJobDefinition(request, response);
255            }
256            else {
257                jobDefinition = getCoordinatorJobDefinition(request, response);
258            }
259            return jobDefinition;
260        }
261    
262        /*
263         * protected method to stream a job log into response object
264         */
265        @Override
266        protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
267                IOException {
268            String jobId = getResourceName(request);
269            if (jobId.endsWith("-W")) {
270                streamWorkflowJobLog(request, response);
271            }
272            else if (jobId.endsWith("-B")) {
273                streamBundleJob(request, response);
274            }
275            else {
276                streamCoordinatorJobLog(request, response);
277            }
278        }
279    
280        /**
281         * Start wf job
282         *
283         * @param request servlet request
284         * @param response servlet response
285         * @throws XServletException
286         */
287        private void startWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
288            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
289                    getAuthToken(request));
290    
291            String jobId = getResourceName(request);
292            try {
293                dagEngine.start(jobId);
294            }
295            catch (DagEngineException ex) {
296                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
297            }
298        }
299    
300        /**
301         * Start bundle job
302         *
303         * @param request servlet request
304         * @param response servlet response
305         * @throws XServletException
306         */
307        private void startBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
308            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
309                    getAuthToken(request));
310            String jobId = getResourceName(request);
311            try {
312                bundleEngine.start(jobId);
313            }
314            catch (BundleEngineException ex) {
315                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
316            }
317        }
318    
319        /**
320         * Resume workflow job
321         *
322         * @param request servlet request
323         * @param response servlet response
324         * @throws XServletException
325         */
326        private void resumeWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
327            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
328                    getAuthToken(request));
329    
330            String jobId = getResourceName(request);
331            try {
332                dagEngine.resume(jobId);
333            }
334            catch (DagEngineException ex) {
335                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
336            }
337        }
338    
339        /**
340         * Resume bundle job
341         *
342         * @param request servlet request
343         * @param response servlet response
344         * @throws XServletException
345         */
346        private void resumeBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
347            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
348                    getAuthToken(request));
349            String jobId = getResourceName(request);
350            try {
351                bundleEngine.resume(jobId);
352            }
353            catch (BundleEngineException ex) {
354                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
355            }
356        }
357    
358        /**
359         * Resume coordinator job
360         *
361         * @param request servlet request
362         * @param response servlet response
363         * @throws XServletException
364         * @throws CoordinatorEngineException
365         */
366        private void resumeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
367                throws XServletException {
368            String jobId = getResourceName(request);
369            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
370                    getUser(request), getAuthToken(request));
371            try {
372                coordEngine.resume(jobId);
373            }
374            catch (CoordinatorEngineException ex) {
375                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
376            }
377        }
378    
379        /**
380         * Suspend a wf job
381         *
382         * @param request servlet request
383         * @param response servlet response
384         * @throws XServletException
385         */
386        private void suspendWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
387            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
388                    getAuthToken(request));
389    
390            String jobId = getResourceName(request);
391            try {
392                dagEngine.suspend(jobId);
393            }
394            catch (DagEngineException ex) {
395                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
396            }
397        }
398    
399        /**
400         * Suspend bundle job
401         *
402         * @param request servlet request
403         * @param response servlet response
404         * @throws XServletException
405         */
406        private void suspendBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
407            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
408                    getAuthToken(request));
409            String jobId = getResourceName(request);
410            try {
411                bundleEngine.suspend(jobId);
412            }
413            catch (BundleEngineException ex) {
414                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
415            }
416        }
417    
418        /**
419         * Suspend coordinator job
420         *
421         * @param request servlet request
422         * @param response servlet response
423         * @throws XServletException
424         */
425        private void suspendCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
426                throws XServletException {
427            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
428                    getUser(request), getAuthToken(request));
429            String jobId = getResourceName(request);
430            try {
431                coordEngine.suspend(jobId);
432            }
433            catch (CoordinatorEngineException ex) {
434                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
435            }
436        }
437    
438        /**
439         * Kill a wf job
440         * @param request servlet request
441         * @param response servlet response
442         * @throws XServletException
443         */
444        private void killWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
445            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
446                    getAuthToken(request));
447    
448            String jobId = getResourceName(request);
449            try {
450                dagEngine.kill(jobId);
451            }
452            catch (DagEngineException ex) {
453                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
454            }
455        }
456    
457        /**
458         * Kill a coord job
459         * @param request servlet request
460         * @param response servlet response
461         * @throws XServletException
462         */
463        private void killCoordinatorJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
464            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
465                    getUser(request), getAuthToken(request));
466            String jobId = getResourceName(request);
467            try {
468                coordEngine.kill(jobId);
469            }
470            catch (CoordinatorEngineException ex) {
471                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
472            }
473        }
474    
475        /**
476         * Kill bundle job
477         *
478         * @param request servlet request
479         * @param response servlet response
480         * @throws XServletException
481         */
482        private void killBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
483            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
484                    getAuthToken(request));
485            String jobId = getResourceName(request);
486            try {
487                bundleEngine.kill(jobId);
488            }
489            catch (BundleEngineException ex) {
490                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
491            }
492        }
493    
494        /**
495         * Change a coordinator job
496         *
497         * @param request servlet request
498         * @param response servlet response
499         * @throws XServletException
500         */
501        private void changeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
502                throws XServletException {
503            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
504                    getUser(request), getAuthToken(request));
505            String jobId = getResourceName(request);
506            String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
507            try {
508                coordEngine.change(jobId, changeValue);
509            }
510            catch (CoordinatorEngineException ex) {
511                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
512            }
513        }
514    
515        /**
516         * Change a bundle job
517         *
518         * @param request servlet request
519         * @param response servlet response
520         * @throws XServletException
521         */
522        private void changeBundleJob(HttpServletRequest request, HttpServletResponse response)
523                throws XServletException {
524            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(
525                    getUser(request), getAuthToken(request));
526            String jobId = getResourceName(request);
527            String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
528            try {
529                bundleEngine.change(jobId, changeValue);
530            }
531            catch (BundleEngineException ex) {
532                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
533            }
534        }
535    
536        /**
537         * Rerun a wf job
538         *
539         * @param request servlet request
540         * @param response servlet response
541         * @param conf configuration object
542         * @throws XServletException
543         */
544        private void reRunWorkflowJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
545                throws XServletException {
546            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
547                    getAuthToken(request));
548    
549            String jobId = getResourceName(request);
550            try {
551                dagEngine.reRun(jobId, conf);
552            }
553            catch (DagEngineException ex) {
554                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
555            }
556        }
557    
558        /**
559         * Rerun bundle job
560         *
561         * @param request servlet request
562         * @param response servlet response
563         * @param conf configration object
564         * @throws XServletException
565         */
566        private void rerunBundleJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
567                throws XServletException {
568            JSONObject json = new JSONObject();
569            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
570                    getAuthToken(request));
571            String jobId = getResourceName(request);
572    
573            String coordScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM);
574            String dateScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM);
575            String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
576            String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
577    
578            XLog.getLog(getClass()).info(
579                    "Rerun Bundle for jobId=" + jobId + ", coordScope=" + coordScope + ", dateScope=" + dateScope + ", refresh="
580                            + refresh + ", noCleanup=" + noCleanup);
581    
582            try {
583                bundleEngine.reRun(jobId, coordScope, dateScope, Boolean.valueOf(refresh), Boolean.valueOf(noCleanup));
584            }
585            catch (BaseEngineException ex) {
586                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
587            }
588        }
589    
590        /**
591         * Rerun coordinator actions
592         *
593         * @param request servlet request
594         * @param response servlet response
595         * @param conf configuration object
596         * @throws XServletException
597         */
598        @SuppressWarnings("unchecked")
599        private JSONObject reRunCoordinatorActions(HttpServletRequest request, HttpServletResponse response,
600                Configuration conf) throws XServletException {
601            JSONObject json = new JSONObject();
602            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request),
603                    getAuthToken(request));
604    
605            String jobId = getResourceName(request);
606    
607            String rerunType = request.getParameter(RestConstants.JOB_COORD_RERUN_TYPE_PARAM);
608            String scope = request.getParameter(RestConstants.JOB_COORD_RERUN_SCOPE_PARAM);
609            String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
610            String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
611    
612            XLog.getLog(getClass()).info(
613                    "Rerun coordinator for jobId=" + jobId + ", rerunType=" + rerunType + ",scope=" + scope + ",refresh="
614                            + refresh + ", noCleanup=" + noCleanup);
615    
616            try {
617                CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh),
618                        Boolean.valueOf(noCleanup));
619                List<CoordinatorActionBean> actions = coordInfo.getCoordActions();
620                json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(actions));
621            }
622            catch (BaseEngineException ex) {
623                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
624            }
625    
626            return json;
627        }
628    
629        /**
630         * Get workflow job
631         *
632         * @param request servlet request
633         * @param response servlet response
634         * @return JsonBean WorkflowJobBean
635         * @throws XServletException
636         */
637        private JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
638            JsonBean jobBean = null;
639            String jobId = getResourceName(request);
640            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
641            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
642            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
643            start = (start < 1) ? 1 : start;
644            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
645            len = (len < 1) ? Integer.MAX_VALUE : len;
646            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
647                    getAuthToken(request));
648            try {
649                jobBean = (JsonBean) dagEngine.getJob(jobId, start, len);
650            }
651            catch (DagEngineException ex) {
652                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
653            }
654    
655            return jobBean;
656        }
657    
658        /**
659         * Get wf action info
660         *
661         * @param request servlet request
662         * @param response servlet response
663         * @return JsonBean WorkflowActionBean
664         * @throws XServletException
665         */
666        private JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response)
667                throws XServletException {
668            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
669                    getAuthToken(request));
670    
671            JsonBean actionBean = null;
672            String actionId = getResourceName(request);
673            try {
674                actionBean = dagEngine.getWorkflowAction(actionId);
675            }
676            catch (BaseEngineException ex) {
677                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
678            }
679    
680            return actionBean;
681        }
682    
683        /**
684         * Get coord job info
685         *
686         * @param request servlet request
687         * @param response servlet response
688         * @return JsonBean CoordinatorJobBean
689         * @throws XServletException
690         * @throws BaseEngineException
691         */
692        private JsonBean getCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
693                throws XServletException, BaseEngineException {
694            JsonBean jobBean = null;
695            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
696                    getUser(request), getAuthToken(request));
697            String jobId = getResourceName(request);
698            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
699            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
700            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
701            start = (start < 1) ? 1 : start;
702            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
703            len = (len < 1) ? Integer.MAX_VALUE : len;
704            try {
705                JsonCoordinatorJob coordJob = coordEngine.getCoordJob(jobId, start, len);
706                jobBean = coordJob;
707            }
708            catch (CoordinatorEngineException ex) {
709                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
710            }
711    
712            return jobBean;
713        }
714    
715        /**
716         * Get bundle job info
717         *
718         * @param request servlet request
719         * @param response servlet response
720         * @return JsonBean bundle job bean
721         * @throws XServletException
722         * @throws BaseEngineException
723         */
724        private JsonBean getBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
725                BaseEngineException {
726            JsonBean jobBean = null;
727            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
728                    getAuthToken(request));
729            String jobId = getResourceName(request);
730    
731            try {
732                jobBean = (JsonBean) bundleEngine.getBundleJob(jobId);
733    
734                return jobBean;
735            }
736            catch (BundleEngineException ex) {
737                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
738            }
739        }
740    
741        /**
742         * Get coordinator action
743         *
744         * @param request servlet request
745         * @param response servlet response
746         * @return JsonBean CoordinatorActionBean
747         * @throws XServletException
748         * @throws BaseEngineException
749         */
750        private JsonBean getCoordinatorAction(HttpServletRequest request, HttpServletResponse response)
751                throws XServletException, BaseEngineException {
752            JsonBean actionBean = null;
753            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
754                    getUser(request), getAuthToken(request));
755            String actionId = getResourceName(request);
756            try {
757                actionBean = coordEngine.getCoordAction(actionId);
758            }
759            catch (CoordinatorEngineException ex) {
760                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
761            }
762    
763            return actionBean;
764        }
765    
766        /**
767         * Get wf job definition
768         *
769         * @param request servlet request
770         * @param response servlet response
771         * @return String wf definition
772         * @throws XServletException
773         */
774        private String getWorkflowJobDefinition(HttpServletRequest request, HttpServletResponse response)
775                throws XServletException {
776            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
777                    getAuthToken(request));
778    
779            String wfDefinition;
780            String jobId = getResourceName(request);
781            try {
782                wfDefinition = dagEngine.getDefinition(jobId);
783            }
784            catch (DagEngineException ex) {
785                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
786            }
787            return wfDefinition;
788        }
789    
790        /**
791         * Get bundle job definition
792         *
793         * @param request servlet request
794         * @param response servlet response
795         * @return String bundle definition
796         * @throws XServletException
797         */
798        private String getBundleJobDefinition(HttpServletRequest request, HttpServletResponse response) throws XServletException {
799            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
800                    getAuthToken(request));
801            String bundleDefinition;
802            String jobId = getResourceName(request);
803            try {
804                bundleDefinition = bundleEngine.getDefinition(jobId);
805            }
806            catch (BundleEngineException ex) {
807                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
808            }
809            return bundleDefinition;
810        }
811    
812        /**
813         * Get coordinator job definition
814         *
815         * @param request servlet request
816         * @param response servlet response
817         * @return String coord definition
818         * @throws XServletException
819         */
820        private String getCoordinatorJobDefinition(HttpServletRequest request, HttpServletResponse response)
821                throws XServletException {
822    
823            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
824                    getUser(request), getAuthToken(request));
825    
826            String jobId = getResourceName(request);
827    
828            String coordDefinition = null;
829            try {
830                coordDefinition = coordEngine.getDefinition(jobId);
831            }
832            catch (BaseEngineException ex) {
833                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
834            }
835            return coordDefinition;
836        }
837    
838        /**
839         * Stream wf job log
840         *
841         * @param request servlet request
842         * @param response servlet response
843         * @throws XServletException
844         * @throws IOException
845         */
846        private void streamWorkflowJobLog(HttpServletRequest request, HttpServletResponse response)
847                throws XServletException, IOException {
848            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
849                    getAuthToken(request));
850            String jobId = getResourceName(request);
851            try {
852                dagEngine.streamLog(jobId, response.getWriter());
853            }
854            catch (DagEngineException ex) {
855                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
856            }
857        }
858    
859        /**
860         * Stream bundle job log
861         *
862         * @param request servlet request
863         * @param response servlet response
864         * @throws XServletException
865         */
866        private void streamBundleJob(HttpServletRequest request, HttpServletResponse response)
867                throws XServletException, IOException {
868            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
869                    getAuthToken(request));
870            String jobId = getResourceName(request);
871            try {
872                bundleEngine.streamLog(jobId, response.getWriter());
873            }
874            catch (BundleEngineException ex) {
875                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
876            }
877        }
878    
879        /**
880         * Stream coordinator job log
881         *
882         * @param request servlet request
883         * @param response servlet response
884         * @throws XServletException
885         * @throws IOException
886         */
887        private void streamCoordinatorJobLog(HttpServletRequest request, HttpServletResponse response)
888                throws XServletException, IOException {
889    
890            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
891                    getUser(request), getAuthToken(request));
892            String jobId = getResourceName(request);
893            String logRetrievalScope = request.getParameter(RestConstants.JOB_LOG_SCOPE_PARAM);
894            String logRetrievalType = request.getParameter(RestConstants.JOB_LOG_TYPE_PARAM);
895            try {
896                coordEngine.streamLog(jobId, logRetrievalScope, logRetrievalType, response.getWriter());
897            }
898            catch (BaseEngineException ex) {
899                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
900            }
901            catch (CommandException ex) {
902                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
903            }
904        }
905    }