001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.servlet;
019    
020    import java.io.IOException;
021    import java.util.List;
022    import javax.servlet.ServletInputStream;
023    import javax.servlet.http.HttpServletRequest;
024    import javax.servlet.http.HttpServletResponse;
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.oozie.*;
027    import org.apache.oozie.client.rest.*;
028    import org.apache.oozie.command.CommandException;
029    import org.apache.oozie.command.coord.CoordRerunXCommand;
030    import org.apache.oozie.service.BundleEngineService;
031    import org.apache.oozie.service.CoordinatorEngineService;
032    import org.apache.oozie.service.DagEngineService;
033    import org.apache.oozie.service.Services;
034    import org.apache.oozie.util.GraphGenerator;
035    import org.apache.oozie.util.XLog;
036    import org.json.simple.JSONObject;
037    
038    
039    @SuppressWarnings("serial")
040    public class V1JobServlet extends BaseJobServlet {
041    
042        private static final String INSTRUMENTATION_NAME = "v1job";
043        public static final String COORD_ACTIONS_DEFAULT_LENGTH = "oozie.coord.actions.default.length";
044    
045        public V1JobServlet() {
046            super(INSTRUMENTATION_NAME);
047        }
048    
049        /*
050         * protected method to start a job
051         */
052        @Override
053        protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
054                IOException {
055            /*
056             * Configuration conf = new XConfiguration(request.getInputStream());
057             * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
058             * conf.get(OozieClient.COORDINATOR_APP_PATH);
059             *
060             * ServletUtilities.ValidateAppPath(wfPath, coordPath);
061             */
062            String jobId = getResourceName(request);
063            if (jobId.endsWith("-W")) {
064                startWorkflowJob(request, response);
065            }
066            else if (jobId.endsWith("-B")) {
067                startBundleJob(request, response);
068            }
069            else {
070                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, RestConstants.ACTION_PARAM, RestConstants.JOB_ACTION_START);
071            }
072    
073        }
074    
075        /*
076         * protected method to resume a job
077         */
078        @Override
079        protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
080                IOException {
081            /*
082             * Configuration conf = new XConfiguration(request.getInputStream());
083             * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
084             * conf.get(OozieClient.COORDINATOR_APP_PATH);
085             *
086             * ServletUtilities.ValidateAppPath(wfPath, coordPath);
087             */
088            String jobId = getResourceName(request);
089            if (jobId.endsWith("-W")) {
090                resumeWorkflowJob(request, response);
091            }
092            else if (jobId.endsWith("-B")) {
093                resumeBundleJob(request, response);
094            }
095            else {
096                resumeCoordinatorJob(request, response);
097            }
098        }
099    
100        /*
101         * protected method to suspend a job
102         */
103        @Override
104        protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
105                IOException {
106            /*
107             * Configuration conf = new XConfiguration(request.getInputStream());
108             * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
109             * conf.get(OozieClient.COORDINATOR_APP_PATH);
110             *
111             * ServletUtilities.ValidateAppPath(wfPath, coordPath);
112             */
113            String jobId = getResourceName(request);
114            if (jobId.endsWith("-W")) {
115                suspendWorkflowJob(request, response);
116            }
117            else if (jobId.endsWith("-B")) {
118                suspendBundleJob(request, response);
119            }
120            else {
121                suspendCoordinatorJob(request, response);
122            }
123        }
124    
125        /*
126         * protected method to kill a job
127         */
128        @Override
129        protected void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
130                IOException {
131            /*
132             * Configuration conf = new XConfiguration(request.getInputStream());
133             * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
134             * conf.get(OozieClient.COORDINATOR_APP_PATH);
135             *
136             * ServletUtilities.ValidateAppPath(wfPath, coordPath);
137             */
138            String jobId = getResourceName(request);
139            if (jobId.endsWith("-W")) {
140                killWorkflowJob(request, response);
141            }
142            else if (jobId.endsWith("-B")) {
143                killBundleJob(request, response);
144            }
145            else {
146                killCoordinatorJob(request, response);
147            }
148        }
149    
150        /**
151         * protected method to change a coordinator job
152         * @param request request object
153         * @param response response object
154         * @throws XServletException
155         * @throws IOException
156         */
157        @Override
158        protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
159                IOException {
160            String jobId = getResourceName(request);
161            if (jobId.endsWith("-B")) {
162                changeBundleJob(request, response);
163            }
164            else {
165                changeCoordinatorJob(request, response);
166            }
167        }
168    
169        /*
170         * protected method to reRun a job
171         *
172         * @seeorg.apache.oozie.servlet.BaseJobServlet#reRunJob(javax.servlet.http.
173         * HttpServletRequest, javax.servlet.http.HttpServletResponse,
174         * org.apache.hadoop.conf.Configuration)
175         */
176        @Override
177        protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
178                throws XServletException, IOException {
179            JSONObject json = null;
180            String jobId = getResourceName(request);
181            if (jobId.endsWith("-W")) {
182                reRunWorkflowJob(request, response, conf);
183            }
184            else if (jobId.endsWith("-B")) {
185                rerunBundleJob(request, response, conf);
186            }
187            else {
188                json = reRunCoordinatorActions(request, response, conf);
189            }
190            return json;
191        }
192    
193        /*
194         * protected method to get a job in JsonBean representation
195         */
196        @Override
197        protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
198                IOException, BaseEngineException {
199            ServletInputStream is = request.getInputStream();
200            byte[] b = new byte[101];
201            while (is.readLine(b, 0, 100) != -1) {
202                XLog.getLog(getClass()).warn("Printing :" + new String(b));
203            }
204    
205            JsonBean jobBean = null;
206            String jobId = getResourceName(request);
207            if (jobId.endsWith("-B")) {
208                jobBean = getBundleJob(request, response);
209            }
210            else {
211                if (jobId.endsWith("-W")) {
212                    jobBean = getWorkflowJob(request, response);
213                }
214                else {
215                    if (jobId.contains("-W@")) {
216                        jobBean = getWorkflowAction(request, response);
217                    }
218                    else {
219                        if (jobId.contains("-C@")) {
220                            jobBean = getCoordinatorAction(request, response);
221                        }
222                        else {
223                            jobBean = getCoordinatorJob(request, response);
224                        }
225                    }
226                }
227            }
228    
229            return jobBean;
230        }
231    
232        /*
233         * protected method to get a job definition in String format
234         */
235        @Override
236        protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
237                throws XServletException, IOException {
238            String jobDefinition = null;
239            String jobId = getResourceName(request);
240            if (jobId.endsWith("-W")) {
241                jobDefinition = getWorkflowJobDefinition(request, response);
242            }
243            else if (jobId.endsWith("-B")) {
244                jobDefinition = getBundleJobDefinition(request, response);
245            }
246            else {
247                jobDefinition = getCoordinatorJobDefinition(request, response);
248            }
249            return jobDefinition;
250        }
251    
252        /*
253         * protected method to stream a job log into response object
254         */
255        @Override
256        protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
257                IOException {
258            String jobId = getResourceName(request);
259            if (jobId.endsWith("-W")) {
260                streamWorkflowJobLog(request, response);
261            }
262            else if (jobId.endsWith("-B")) {
263                streamBundleJob(request, response);
264            }
265            else {
266                streamCoordinatorJobLog(request, response);
267            }
268        }
269    
270        @Override
271        protected void streamJobGraph(HttpServletRequest request, HttpServletResponse response)
272                throws XServletException, IOException {
273            String jobId = getResourceName(request);
274            if (jobId.endsWith("-W")) {
275                // Applicable only to worflow, for now
276                response.setContentType(RestConstants.PNG_IMAGE_CONTENT_TYPE);
277                try {
278                    String showKill = request.getParameter(RestConstants.JOB_SHOW_KILL_PARAM);
279                    boolean sK = showKill != null && (showKill.equalsIgnoreCase("yes") || showKill.equals("1") || showKill.equalsIgnoreCase("true"));
280    
281                    new GraphGenerator(
282                            getWorkflowJobDefinition(request, response),
283                            (JsonWorkflowJob)getWorkflowJob(request, response),
284                            sK).write(response.getOutputStream());
285                }
286                catch (Exception e) {
287                    throw new XServletException(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, ErrorCode.E0307, e);
288                }
289            }
290            else {
291                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0306);
292            }
293        }
294    
295        /**
296         * Start wf job
297         *
298         * @param request servlet request
299         * @param response servlet response
300         * @throws XServletException
301         */
302        private void startWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
303            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
304                    getAuthToken(request));
305    
306            String jobId = getResourceName(request);
307            try {
308                dagEngine.start(jobId);
309            }
310            catch (DagEngineException ex) {
311                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
312            }
313        }
314    
315        /**
316         * Start bundle job
317         *
318         * @param request servlet request
319         * @param response servlet response
320         * @throws XServletException
321         */
322        private void startBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
323            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
324                    getAuthToken(request));
325            String jobId = getResourceName(request);
326            try {
327                bundleEngine.start(jobId);
328            }
329            catch (BundleEngineException ex) {
330                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
331            }
332        }
333    
334        /**
335         * Resume workflow job
336         *
337         * @param request servlet request
338         * @param response servlet response
339         * @throws XServletException
340         */
341        private void resumeWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
342            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
343                    getAuthToken(request));
344    
345            String jobId = getResourceName(request);
346            try {
347                dagEngine.resume(jobId);
348            }
349            catch (DagEngineException ex) {
350                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
351            }
352        }
353    
354        /**
355         * Resume bundle job
356         *
357         * @param request servlet request
358         * @param response servlet response
359         * @throws XServletException
360         */
361        private void resumeBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
362            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
363                    getAuthToken(request));
364            String jobId = getResourceName(request);
365            try {
366                bundleEngine.resume(jobId);
367            }
368            catch (BundleEngineException ex) {
369                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
370            }
371        }
372    
373        /**
374         * Resume coordinator job
375         *
376         * @param request servlet request
377         * @param response servlet response
378         * @throws XServletException
379         * @throws CoordinatorEngineException
380         */
381        private void resumeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
382                throws XServletException {
383            String jobId = getResourceName(request);
384            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
385                    getUser(request), getAuthToken(request));
386            try {
387                coordEngine.resume(jobId);
388            }
389            catch (CoordinatorEngineException ex) {
390                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
391            }
392        }
393    
394        /**
395         * Suspend a wf job
396         *
397         * @param request servlet request
398         * @param response servlet response
399         * @throws XServletException
400         */
401        private void suspendWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
402            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
403                    getAuthToken(request));
404    
405            String jobId = getResourceName(request);
406            try {
407                dagEngine.suspend(jobId);
408            }
409            catch (DagEngineException ex) {
410                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
411            }
412        }
413    
414        /**
415         * Suspend bundle job
416         *
417         * @param request servlet request
418         * @param response servlet response
419         * @throws XServletException
420         */
421        private void suspendBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
422            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
423                    getAuthToken(request));
424            String jobId = getResourceName(request);
425            try {
426                bundleEngine.suspend(jobId);
427            }
428            catch (BundleEngineException ex) {
429                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
430            }
431        }
432    
433        /**
434         * Suspend coordinator job
435         *
436         * @param request servlet request
437         * @param response servlet response
438         * @throws XServletException
439         */
440        private void suspendCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
441                throws XServletException {
442            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
443                    getUser(request), getAuthToken(request));
444            String jobId = getResourceName(request);
445            try {
446                coordEngine.suspend(jobId);
447            }
448            catch (CoordinatorEngineException ex) {
449                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
450            }
451        }
452    
453        /**
454         * Kill a wf job
455         * @param request servlet request
456         * @param response servlet response
457         * @throws XServletException
458         */
459        private void killWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
460            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
461                    getAuthToken(request));
462    
463            String jobId = getResourceName(request);
464            try {
465                dagEngine.kill(jobId);
466            }
467            catch (DagEngineException ex) {
468                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
469            }
470        }
471    
472        /**
473         * Kill a coord job
474         * @param request servlet request
475         * @param response servlet response
476         * @throws XServletException
477         */
478        private void killCoordinatorJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
479            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
480                    getUser(request), getAuthToken(request));
481            String jobId = getResourceName(request);
482            try {
483                coordEngine.kill(jobId);
484            }
485            catch (CoordinatorEngineException ex) {
486                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
487            }
488        }
489    
490        /**
491         * Kill bundle job
492         *
493         * @param request servlet request
494         * @param response servlet response
495         * @throws XServletException
496         */
497        private void killBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
498            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
499                    getAuthToken(request));
500            String jobId = getResourceName(request);
501            try {
502                bundleEngine.kill(jobId);
503            }
504            catch (BundleEngineException ex) {
505                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
506            }
507        }
508    
509        /**
510         * Change a coordinator job
511         *
512         * @param request servlet request
513         * @param response servlet response
514         * @throws XServletException
515         */
516        private void changeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
517                throws XServletException {
518            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
519                    getUser(request), getAuthToken(request));
520            String jobId = getResourceName(request);
521            String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
522            try {
523                coordEngine.change(jobId, changeValue);
524            }
525            catch (CoordinatorEngineException ex) {
526                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
527            }
528        }
529    
530        /**
531         * Change a bundle job
532         *
533         * @param request servlet request
534         * @param response servlet response
535         * @throws XServletException
536         */
537        private void changeBundleJob(HttpServletRequest request, HttpServletResponse response)
538                throws XServletException {
539            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(
540                    getUser(request), getAuthToken(request));
541            String jobId = getResourceName(request);
542            String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
543            try {
544                bundleEngine.change(jobId, changeValue);
545            }
546            catch (BundleEngineException ex) {
547                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
548            }
549        }
550    
551        /**
552         * Rerun a wf job
553         *
554         * @param request servlet request
555         * @param response servlet response
556         * @param conf configuration object
557         * @throws XServletException
558         */
559        private void reRunWorkflowJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
560                throws XServletException {
561            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
562                    getAuthToken(request));
563    
564            String jobId = getResourceName(request);
565            try {
566                dagEngine.reRun(jobId, conf);
567            }
568            catch (DagEngineException ex) {
569                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
570            }
571        }
572    
573        /**
574         * Rerun bundle job
575         *
576         * @param request servlet request
577         * @param response servlet response
578         * @param conf configration object
579         * @throws XServletException
580         */
581        private void rerunBundleJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
582                throws XServletException {
583            JSONObject json = new JSONObject();
584            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
585                    getAuthToken(request));
586            String jobId = getResourceName(request);
587    
588            String coordScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM);
589            String dateScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM);
590            String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
591            String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
592    
593            XLog.getLog(getClass()).info(
594                    "Rerun Bundle for jobId=" + jobId + ", coordScope=" + coordScope + ", dateScope=" + dateScope + ", refresh="
595                            + refresh + ", noCleanup=" + noCleanup);
596    
597            try {
598                bundleEngine.reRun(jobId, coordScope, dateScope, Boolean.valueOf(refresh), Boolean.valueOf(noCleanup));
599            }
600            catch (BaseEngineException ex) {
601                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
602            }
603        }
604    
605        /**
606         * Rerun coordinator actions
607         *
608         * @param request servlet request
609         * @param response servlet response
610         * @param conf configuration object
611         * @throws XServletException
612         */
613        @SuppressWarnings("unchecked")
614        private JSONObject reRunCoordinatorActions(HttpServletRequest request, HttpServletResponse response,
615                Configuration conf) throws XServletException {
616            JSONObject json = new JSONObject();
617            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request),
618                    getAuthToken(request));
619    
620            String jobId = getResourceName(request);
621    
622            String rerunType = request.getParameter(RestConstants.JOB_COORD_RERUN_TYPE_PARAM);
623            String scope = request.getParameter(RestConstants.JOB_COORD_RERUN_SCOPE_PARAM);
624            String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
625            String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
626    
627            XLog.getLog(getClass()).info(
628                    "Rerun coordinator for jobId=" + jobId + ", rerunType=" + rerunType + ",scope=" + scope + ",refresh="
629                            + refresh + ", noCleanup=" + noCleanup);
630    
631            try {
632                if (!(rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE) || rerunType
633                        .equals(RestConstants.JOB_COORD_RERUN_ACTION))) {
634                    throw new CommandException(ErrorCode.E1018, "date or action expected.");
635                }
636                CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh),
637                        Boolean.valueOf(noCleanup));
638                List<CoordinatorActionBean> coordActions;
639                if (coordInfo != null) {
640                    coordActions = coordInfo.getCoordActions();
641                }
642                else {
643                    coordActions = CoordRerunXCommand.getCoordActions(rerunType, jobId, scope);
644                }
645                json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT"));
646            }
647            catch (BaseEngineException ex) {
648                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
649            }
650            catch (CommandException ex) {
651                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
652            }
653    
654            return json;
655        }
656    
657    
658    
659        /**
660         * Get workflow job
661         *
662         * @param request servlet request
663         * @param response servlet response
664         * @return JsonBean WorkflowJobBean
665         * @throws XServletException
666         */
667        private JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
668            JsonBean jobBean = null;
669            String jobId = getResourceName(request);
670            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
671            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
672            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
673            start = (start < 1) ? 1 : start;
674            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
675            len = (len < 1) ? Integer.MAX_VALUE : len;
676            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
677                    getAuthToken(request));
678            try {
679                jobBean = (JsonBean) dagEngine.getJob(jobId, start, len);
680            }
681            catch (DagEngineException ex) {
682                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
683            }
684    
685            return jobBean;
686        }
687    
688        /**
689         * Get wf action info
690         *
691         * @param request servlet request
692         * @param response servlet response
693         * @return JsonBean WorkflowActionBean
694         * @throws XServletException
695         */
696        private JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response)
697                throws XServletException {
698            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
699                    getAuthToken(request));
700    
701            JsonBean actionBean = null;
702            String actionId = getResourceName(request);
703            try {
704                actionBean = dagEngine.getWorkflowAction(actionId);
705            }
706            catch (BaseEngineException ex) {
707                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
708            }
709    
710            return actionBean;
711        }
712    
713        /**
714         * Get coord job info
715         *
716         * @param request servlet request
717         * @param response servlet response
718         * @return JsonBean CoordinatorJobBean
719         * @throws XServletException
720         * @throws BaseEngineException
721         */
722        private JsonBean getCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
723                throws XServletException, BaseEngineException {
724            JsonBean jobBean = null;
725            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
726                    getUser(request), getAuthToken(request));
727            String jobId = getResourceName(request);
728            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
729            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
730            String filter = request.getParameter(RestConstants.JOB_FILTER_PARAM);
731            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
732            start = (start < 1) ? 1 : start;
733            // Get default number of coordinator actions to be retrieved
734            int defaultLen = Services.get().getConf().getInt(COORD_ACTIONS_DEFAULT_LENGTH, 1000);
735            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
736            len = (len < 1) ? defaultLen : len;
737            try {
738                JsonCoordinatorJob coordJob = coordEngine.getCoordJob(jobId, filter, start, len);
739                jobBean = coordJob;
740            }
741            catch (CoordinatorEngineException ex) {
742                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
743            }
744    
745            return jobBean;
746        }
747    
748        /**
749         * Get bundle job info
750         *
751         * @param request servlet request
752         * @param response servlet response
753         * @return JsonBean bundle job bean
754         * @throws XServletException
755         * @throws BaseEngineException
756         */
757        private JsonBean getBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
758                BaseEngineException {
759            JsonBean jobBean = null;
760            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
761                    getAuthToken(request));
762            String jobId = getResourceName(request);
763    
764            try {
765                jobBean = (JsonBean) bundleEngine.getBundleJob(jobId);
766    
767                return jobBean;
768            }
769            catch (BundleEngineException ex) {
770                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
771            }
772        }
773    
774        /**
775         * Get coordinator action
776         *
777         * @param request servlet request
778         * @param response servlet response
779         * @return JsonBean CoordinatorActionBean
780         * @throws XServletException
781         * @throws BaseEngineException
782         */
783        private JsonBean getCoordinatorAction(HttpServletRequest request, HttpServletResponse response)
784                throws XServletException, BaseEngineException {
785            JsonBean actionBean = null;
786            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
787                    getUser(request), getAuthToken(request));
788            String actionId = getResourceName(request);
789            try {
790                actionBean = coordEngine.getCoordAction(actionId);
791            }
792            catch (CoordinatorEngineException ex) {
793                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
794            }
795    
796            return actionBean;
797        }
798    
799        /**
800         * Get wf job definition
801         *
802         * @param request servlet request
803         * @param response servlet response
804         * @return String wf definition
805         * @throws XServletException
806         */
807        private String getWorkflowJobDefinition(HttpServletRequest request, HttpServletResponse response)
808                throws XServletException {
809            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
810                    getAuthToken(request));
811    
812            String wfDefinition;
813            String jobId = getResourceName(request);
814            try {
815                wfDefinition = dagEngine.getDefinition(jobId);
816            }
817            catch (DagEngineException ex) {
818                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
819            }
820            return wfDefinition;
821        }
822    
823        /**
824         * Get bundle job definition
825         *
826         * @param request servlet request
827         * @param response servlet response
828         * @return String bundle definition
829         * @throws XServletException
830         */
831        private String getBundleJobDefinition(HttpServletRequest request, HttpServletResponse response) throws XServletException {
832            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
833                    getAuthToken(request));
834            String bundleDefinition;
835            String jobId = getResourceName(request);
836            try {
837                bundleDefinition = bundleEngine.getDefinition(jobId);
838            }
839            catch (BundleEngineException ex) {
840                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
841            }
842            return bundleDefinition;
843        }
844    
845        /**
846         * Get coordinator job definition
847         *
848         * @param request servlet request
849         * @param response servlet response
850         * @return String coord definition
851         * @throws XServletException
852         */
853        private String getCoordinatorJobDefinition(HttpServletRequest request, HttpServletResponse response)
854                throws XServletException {
855    
856            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
857                    getUser(request), getAuthToken(request));
858    
859            String jobId = getResourceName(request);
860    
861            String coordDefinition = null;
862            try {
863                coordDefinition = coordEngine.getDefinition(jobId);
864            }
865            catch (BaseEngineException ex) {
866                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
867            }
868            return coordDefinition;
869        }
870    
871        /**
872         * Stream wf job log
873         *
874         * @param request servlet request
875         * @param response servlet response
876         * @throws XServletException
877         * @throws IOException
878         */
879        private void streamWorkflowJobLog(HttpServletRequest request, HttpServletResponse response)
880                throws XServletException, IOException {
881            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
882                    getAuthToken(request));
883            String jobId = getResourceName(request);
884            try {
885                dagEngine.streamLog(jobId, response.getWriter());
886            }
887            catch (DagEngineException ex) {
888                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
889            }
890        }
891    
892        /**
893         * Stream bundle job log
894         *
895         * @param request servlet request
896         * @param response servlet response
897         * @throws XServletException
898         */
899        private void streamBundleJob(HttpServletRequest request, HttpServletResponse response)
900                throws XServletException, IOException {
901            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
902                    getAuthToken(request));
903            String jobId = getResourceName(request);
904            try {
905                bundleEngine.streamLog(jobId, response.getWriter());
906            }
907            catch (BundleEngineException ex) {
908                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
909            }
910        }
911    
912        /**
913         * Stream coordinator job log
914         *
915         * @param request servlet request
916         * @param response servlet response
917         * @throws XServletException
918         * @throws IOException
919         */
920        private void streamCoordinatorJobLog(HttpServletRequest request, HttpServletResponse response)
921                throws XServletException, IOException {
922    
923            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
924                    getUser(request), getAuthToken(request));
925            String jobId = getResourceName(request);
926            String logRetrievalScope = request.getParameter(RestConstants.JOB_LOG_SCOPE_PARAM);
927            String logRetrievalType = request.getParameter(RestConstants.JOB_LOG_TYPE_PARAM);
928            try {
929                coordEngine.streamLog(jobId, logRetrievalScope, logRetrievalType, response.getWriter());
930            }
931            catch (BaseEngineException ex) {
932                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
933            }
934            catch (CommandException ex) {
935                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
936            }
937        }
938    }