001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.servlet;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Set;
026
027import javax.servlet.http.HttpServletRequest;
028import javax.servlet.http.HttpServletResponse;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.oozie.BaseEngineException;
032import org.apache.oozie.BulkResponseInfo;
033import org.apache.oozie.BundleJobBean;
034import org.apache.oozie.BundleJobInfo;
035import org.apache.oozie.CoordinatorEngine;
036import org.apache.oozie.BundleEngine;
037import org.apache.oozie.CoordinatorEngineException;
038import org.apache.oozie.BundleEngineException;
039import org.apache.oozie.CoordinatorJobBean;
040import org.apache.oozie.CoordinatorJobInfo;
041import org.apache.oozie.DagEngine;
042import org.apache.oozie.DagEngineException;
043import org.apache.oozie.ErrorCode;
044import org.apache.oozie.WorkflowJobBean;
045import org.apache.oozie.WorkflowsInfo;
046import org.apache.oozie.cli.OozieCLI;
047import org.apache.oozie.client.OozieClient;
048import org.apache.oozie.client.rest.BulkResponseImpl;
049import org.apache.oozie.client.rest.JsonTags;
050import org.apache.oozie.client.rest.RestConstants;
051import org.apache.oozie.service.CoordinatorEngineService;
052import org.apache.oozie.service.DagEngineService;
053import org.apache.oozie.service.BundleEngineService;
054import org.apache.oozie.service.Services;
055import org.apache.oozie.util.XLog;
056import org.apache.oozie.util.XmlUtils;
057import org.json.simple.JSONArray;
058import org.json.simple.JSONObject;
059
060public class V1JobsServlet extends BaseJobsServlet {
061
062    private static final String INSTRUMENTATION_NAME = "v1jobs";
063    private static final Set<String> httpJobType = new HashSet<String>(){{
064        this.add(OozieCLI.HIVE_CMD);
065        this.add(OozieCLI.SQOOP_CMD);
066        this.add(OozieCLI.PIG_CMD);
067        this.add(OozieCLI.MR_CMD);
068    }};
069
070    public V1JobsServlet() {
071        super(INSTRUMENTATION_NAME);
072    }
073
074    /**
075     * v1 service implementation to submit a job, either workflow or coordinator
076     */
077    @Override
078    protected JSONObject submitJob(HttpServletRequest request, Configuration conf) throws XServletException,
079            IOException {
080        JSONObject json = null;
081
082        String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM);
083
084        if (jobType == null) {
085            String wfPath = conf.get(OozieClient.APP_PATH);
086            String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH);
087            String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);
088
089            ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath);
090
091            if (wfPath != null) {
092                json = submitWorkflowJob(request, conf);
093            }
094            else if (coordPath != null) {
095                json = submitCoordinatorJob(request, conf);
096            }
097            else {
098                json = submitBundleJob(request, conf);
099            }
100        }
101        else { // This is a http submission job
102            if (httpJobType.contains(jobType)) {
103                json = submitHttpJob(request, conf, jobType);
104            }
105            else {
106                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
107                        RestConstants.JOBTYPE_PARAM, jobType);
108            }
109        }
110        return json;
111    }
112
113    /**
114     * v1 service implementation to get a JSONObject representation of a job from its external ID
115     */
116    @Override
117    protected JSONObject getJobIdForExternalId(HttpServletRequest request, String externalId) throws XServletException,
118            IOException {
119        JSONObject json = null;
120        /*
121         * Configuration conf = new XConfiguration(); String wfPath =
122         * conf.get(OozieClient.APP_PATH); String coordPath =
123         * conf.get(OozieClient.COORDINATOR_APP_PATH);
124         *
125         * ServletUtilities.ValidateAppPath(wfPath, coordPath);
126         */
127        String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
128        jobtype = (jobtype != null) ? jobtype : "wf";
129        if (jobtype.contains("wf")) {
130            json = getWorkflowJobIdForExternalId(request, externalId);
131        }
132        else {
133            json = getCoordinatorJobIdForExternalId(request, externalId);
134        }
135        return json;
136    }
137
138    /**
139     * v1 service implementation to get a list of workflows, coordinators, or bundles, with filtering or interested
140     * windows embedded in the request object
141     */
142    @Override
143    protected JSONObject getJobs(HttpServletRequest request) throws XServletException, IOException {
144        JSONObject json = null;
145        String isBulk = request.getParameter(RestConstants.JOBS_BULK_PARAM);
146        if(isBulk != null) {
147            json = getBulkJobs(request);
148        } else {
149            String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
150            jobtype = (jobtype != null) ? jobtype : "wf";
151
152            if (jobtype.contains("wf")) {
153                json = getWorkflowJobs(request);
154            }
155            else if (jobtype.contains("coord")) {
156                json = getCoordinatorJobs(request);
157            }
158            else if (jobtype.contains("bundle")) {
159                json = getBundleJobs(request);
160            }
161        }
162        return json;
163    }
164
165    /**
166     * v1 service implementation to submit a workflow job
167     */
168    @SuppressWarnings("unchecked")
169    private JSONObject submitWorkflowJob(HttpServletRequest request, Configuration conf) throws XServletException {
170
171        JSONObject json = new JSONObject();
172
173        try {
174            String action = request.getParameter(RestConstants.ACTION_PARAM);
175            if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
176                    && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
177                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
178                        RestConstants.ACTION_PARAM, action);
179            }
180            boolean startJob = (action != null);
181            String user = conf.get(OozieClient.USER_NAME);
182            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
183            String id;
184            boolean dryrun = false;
185            if (action != null) {
186                dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
187            }
188            if (dryrun) {
189                id = dagEngine.dryRunSubmit(conf);
190            }
191            else {
192                id = dagEngine.submitJob(conf, startJob);
193            }
194            json.put(JsonTags.JOB_ID, id);
195        }
196        catch (BaseEngineException ex) {
197            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
198        }
199
200        return json;
201    }
202
203    /**
204     * v1 service implementation to submit a coordinator job
205     */
206    @SuppressWarnings("unchecked")
207    private JSONObject submitCoordinatorJob(HttpServletRequest request, Configuration conf) throws XServletException {
208
209        JSONObject json = new JSONObject();
210        XLog.getLog(getClass()).warn("submitCoordinatorJob " + XmlUtils.prettyPrint(conf).toString());
211        try {
212            String action = request.getParameter(RestConstants.ACTION_PARAM);
213            if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
214                    && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
215                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
216                        RestConstants.ACTION_PARAM, action);
217            }
218            boolean startJob = (action != null);
219            String user = conf.get(OozieClient.USER_NAME);
220            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
221                    user);
222            String id = null;
223            boolean dryrun = false;
224            if (action != null) {
225                dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
226            }
227            if (dryrun) {
228                id = coordEngine.dryRunSubmit(conf);
229            }
230            else {
231                id = coordEngine.submitJob(conf, startJob);
232            }
233            json.put(JsonTags.JOB_ID, id);
234        }
235        catch (CoordinatorEngineException ex) {
236            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
237        }
238
239        return json;
240    }
241
242    /**
243     * v1 service implementation to submit a bundle job
244     */
245    @SuppressWarnings("unchecked")
246    private JSONObject submitBundleJob(HttpServletRequest request, Configuration conf) throws XServletException {
247        JSONObject json = new JSONObject();
248        XLog.getLog(getClass()).warn("submitBundleJob " + XmlUtils.prettyPrint(conf).toString());
249        try {
250            String action = request.getParameter(RestConstants.ACTION_PARAM);
251            if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
252                    && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
253                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
254                        RestConstants.ACTION_PARAM, action);
255            }
256            boolean startJob = (action != null);
257            String user = conf.get(OozieClient.USER_NAME);
258            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(user);
259            String id = null;
260            boolean dryrun = false;
261            if (action != null) {
262                dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
263            }
264            if (dryrun) {
265                id = bundleEngine.dryRunSubmit(conf);
266            }
267            else {
268                id = bundleEngine.submitJob(conf, startJob);
269            }
270            json.put(JsonTags.JOB_ID, id);
271        }
272        catch (BundleEngineException ex) {
273            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
274        }
275
276        return json;
277    }
278
279    /**
280     * v1 service implementation to get a JSONObject representation of a job from its external ID
281     */
282    @SuppressWarnings("unchecked")
283    private JSONObject getWorkflowJobIdForExternalId(HttpServletRequest request, String externalId)
284            throws XServletException {
285        JSONObject json = new JSONObject();
286        try {
287            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
288            String jobId = dagEngine.getJobIdForExternalId(externalId);
289            json.put(JsonTags.JOB_ID, jobId);
290        }
291        catch (DagEngineException ex) {
292            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
293        }
294        return json;
295    }
296
297    /**
298     * v1 service implementation to get a JSONObject representation of a job from its external ID
299     */
300    private JSONObject getCoordinatorJobIdForExternalId(HttpServletRequest request, String externalId)
301            throws XServletException {
302        JSONObject json = new JSONObject();
303        return json;
304    }
305
306    /**
307     * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the
308     * request object
309     */
310    private JSONObject getWorkflowJobs(HttpServletRequest request) throws XServletException {
311        JSONObject json = new JSONObject();
312        try {
313            String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
314            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
315            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
316            String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 
317                    ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
318            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
319            start = (start < 1) ? 1 : start;
320            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
321            len = (len < 1) ? 50 : len;
322            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
323            WorkflowsInfo jobs = dagEngine.getJobs(filter, start, len);
324            List<WorkflowJobBean> jsonWorkflows = jobs.getWorkflows();
325            json.put(JsonTags.WORKFLOWS_JOBS, WorkflowJobBean.toJSONArray(jsonWorkflows, timeZoneId));
326            json.put(JsonTags.WORKFLOWS_TOTAL, jobs.getTotal());
327            json.put(JsonTags.WORKFLOWS_OFFSET, jobs.getStart());
328            json.put(JsonTags.WORKFLOWS_LEN, jobs.getLen());
329
330        }
331        catch (DagEngineException ex) {
332            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
333        }
334
335        return json;
336    }
337
338    /**
339     * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the
340     * request object
341     */
342    @SuppressWarnings("unchecked")
343    private JSONObject getCoordinatorJobs(HttpServletRequest request) throws XServletException {
344        JSONObject json = new JSONObject();
345        try {
346            String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
347            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
348            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
349            String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 
350                    ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
351            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
352            start = (start < 1) ? 1 : start;
353            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
354            len = (len < 1) ? 50 : len;
355            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
356                    getUser(request));
357            CoordinatorJobInfo jobs = coordEngine.getCoordJobs(filter, start, len);
358            List<CoordinatorJobBean> jsonJobs = jobs.getCoordJobs();
359            json.put(JsonTags.COORDINATOR_JOBS, CoordinatorJobBean.toJSONArray(jsonJobs, timeZoneId));
360            json.put(JsonTags.COORD_JOB_TOTAL, jobs.getTotal());
361            json.put(JsonTags.COORD_JOB_OFFSET, jobs.getStart());
362            json.put(JsonTags.COORD_JOB_LEN, jobs.getLen());
363
364        }
365        catch (CoordinatorEngineException ex) {
366            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
367        }
368        return json;
369    }
370
371    @SuppressWarnings("unchecked")
372    private JSONObject getBundleJobs(HttpServletRequest request) throws XServletException {
373        JSONObject json = new JSONObject();
374        try {
375            String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
376            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
377            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
378            String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 
379                    ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
380            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
381            start = (start < 1) ? 1 : start;
382            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
383            len = (len < 1) ? 50 : len;
384
385            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
386            BundleJobInfo jobs = bundleEngine.getBundleJobs(filter, start, len);
387            List<BundleJobBean> jsonJobs = jobs.getBundleJobs();
388
389            json.put(JsonTags.BUNDLE_JOBS, BundleJobBean.toJSONArray(jsonJobs, timeZoneId));
390            json.put(JsonTags.BUNDLE_JOB_TOTAL, jobs.getTotal());
391            json.put(JsonTags.BUNDLE_JOB_OFFSET, jobs.getStart());
392            json.put(JsonTags.BUNDLE_JOB_LEN, jobs.getLen());
393
394        }
395        catch (BundleEngineException ex) {
396            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
397        }
398        return json;
399    }
400
401    @SuppressWarnings("unchecked")
402    private JSONObject getBulkJobs(HttpServletRequest request) throws XServletException, IOException {
403        JSONObject json = new JSONObject();
404        try {
405            String bulkFilter = request.getParameter(RestConstants.JOBS_BULK_PARAM); //REST API
406            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
407            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
408            String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
409                    ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
410            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
411            start = (start < 1) ? 1 : start;
412            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
413            len = (len < 1) ? 50 : len;
414
415            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
416            BulkResponseInfo bulkResponse = bundleEngine.getBulkJobs(bulkFilter, start, len);
417            List<BulkResponseImpl> responsesToJson = bulkResponse.getResponses();
418
419            json.put(JsonTags.BULK_RESPONSES, BulkResponseImpl.toJSONArray(responsesToJson, timeZoneId));
420            json.put(JsonTags.BULK_RESPONSE_TOTAL, bulkResponse.getTotal());
421            json.put(JsonTags.BULK_RESPONSE_OFFSET, bulkResponse.getStart());
422            json.put(JsonTags.BULK_RESPONSE_LEN, bulkResponse.getLen());
423
424        }
425        catch (BaseEngineException ex) {
426            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
427        }
428        return json;
429    }
430
431    /**
432     * service implementation to submit a http job
433     */
434    private JSONObject submitHttpJob(HttpServletRequest request, Configuration conf, String jobType)
435            throws XServletException {
436        JSONObject json = new JSONObject();
437
438        try {
439            String user = conf.get(OozieClient.USER_NAME);
440            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
441            String id = dagEngine.submitHttpJob(conf, jobType);
442            json.put(JsonTags.JOB_ID, id);
443        }
444        catch (DagEngineException ex) {
445            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
446        }
447
448        return json;
449    }
450
451    /**
452     * service implementation to bulk kill jobs
453     * @param request
454     * @param response
455     * @return
456     * @throws XServletException
457     * @throws IOException
458     */
459    @Override
460    protected JSONObject killJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException,
461            IOException {
462        return bulkModifyJobs(request, response);
463    }
464
465    /**
466     * service implementation to bulk suspend jobs
467     * @param request
468     * @param response
469     * @return
470     * @throws XServletException
471     * @throws IOException
472     */
473    @Override
474    protected JSONObject suspendJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException,
475            IOException {
476        return bulkModifyJobs(request, response);
477    }
478
479    /**
480     * service implementation to bulk resume jobs
481     * @param request
482     * @param response
483     * @return
484     * @throws XServletException
485     * @throws IOException
486     */
487    @Override
488    protected JSONObject resumeJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException,
489            IOException {
490        return bulkModifyJobs(request, response);
491    }
492
493    private JSONObject bulkModifyJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException,
494            IOException {
495        String action = request.getParameter(RestConstants.ACTION_PARAM);
496        String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM);
497        String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
498        String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
499        String lenStr = request.getParameter(RestConstants.LEN_PARAM);
500        String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
501                ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
502
503        int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
504        start = (start < 1) ? 1 : start;
505        int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
506        len = (len < 1) ? 50 : len;
507
508        JSONObject json = new JSONObject();
509        List<String> ids = new ArrayList<String>();
510
511        if (jobType.equals("wf")) {
512            WorkflowsInfo jobs = null;
513            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
514            if (action.equals(RestConstants.JOB_ACTION_KILL)) {
515                try {
516                    jobs = dagEngine.killJobs(filter, start, len);
517                }
518                catch (DagEngineException ex) {
519                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
520                }
521            } else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) {
522                try {
523                    jobs = dagEngine.suspendJobs(filter, start, len);
524                }
525                catch (DagEngineException ex) {
526                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
527                }
528            } else if (action.equals(RestConstants.JOB_ACTION_RESUME)) {
529                try {
530                    jobs = dagEngine.resumeJobs(filter, start, len);
531                }
532                catch (DagEngineException ex) {
533                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
534                }
535            }
536
537            json.put(JsonTags.WORKFLOWS_JOBS, WorkflowJobBean.toJSONArray(jobs.getWorkflows(), timeZoneId));
538            json.put(JsonTags.WORKFLOWS_TOTAL, jobs.getTotal());
539            json.put(JsonTags.WORKFLOWS_OFFSET, jobs.getStart());
540            json.put(JsonTags.WORKFLOWS_LEN, jobs.getLen());
541
542        }
543        else if (jobType.equals("bundle")) {
544            BundleJobInfo jobs = null;
545            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).
546                    getBundleEngine(getUser(request));
547            if (action.equals(RestConstants.JOB_ACTION_KILL)) {
548                try {
549                    jobs = bundleEngine.killJobs(filter, start, len);
550                }
551                catch (BundleEngineException ex) {
552                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
553                }
554            }
555            else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) {
556                try {
557                    jobs = bundleEngine.suspendJobs(filter, start, len);
558                }
559                catch (BundleEngineException ex) {
560                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
561                }
562            }
563            else if (action.equals(RestConstants.JOB_ACTION_RESUME)) {
564                try {
565                    jobs = bundleEngine.resumeJobs(filter, start, len);
566                }
567                catch (BundleEngineException ex) {
568                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
569                }
570            }
571
572            json.put(JsonTags.BUNDLE_JOBS, BundleJobBean.toJSONArray(jobs.getBundleJobs(), timeZoneId));
573            json.put(JsonTags.BUNDLE_JOB_TOTAL, jobs.getTotal());
574            json.put(JsonTags.BUNDLE_JOB_OFFSET, jobs.getStart());
575            json.put(JsonTags.BUNDLE_JOB_LEN, jobs.getLen());
576        }
577        else {
578            CoordinatorJobInfo jobs = null;
579            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).
580                    getCoordinatorEngine(getUser(request));
581            if (action.equals(RestConstants.JOB_ACTION_KILL)) {
582                try {
583                    jobs = coordEngine.killJobs(filter, start, len);
584                }
585                catch (CoordinatorEngineException ex) {
586                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
587                }
588            }
589            else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) {
590                try {
591                    jobs = coordEngine.suspendJobs(filter, start, len);
592                }
593                catch (CoordinatorEngineException ex) {
594                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
595                }
596            }
597            else if (action.equals(RestConstants.JOB_ACTION_RESUME)) {
598                try {
599                    jobs = coordEngine.resumeJobs(filter, start, len);
600                }
601                catch (CoordinatorEngineException ex) {
602                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
603                }
604            }
605
606            json.put(JsonTags.COORDINATOR_JOBS, CoordinatorJobBean.toJSONArray(jobs.getCoordJobs(), timeZoneId));
607            json.put(JsonTags.COORD_JOB_TOTAL, jobs.getTotal());
608            json.put(JsonTags.COORD_JOB_OFFSET, jobs.getStart());
609            json.put(JsonTags.COORD_JOB_LEN, jobs.getLen());
610        }
611
612        json.put(JsonTags.JOB_IDS, toJSONArray(ids));
613        return json;
614    }
615
616    private static JSONArray toJSONArray(List<String> ids) {
617        JSONArray array = new JSONArray();
618        for (String id : ids) {
619            array.add(id);
620        }
621        return array;
622    }
623}