This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.servlet;
019
020 import java.io.IOException;
021 import java.util.List;
022
023 import javax.servlet.ServletInputStream;
024 import javax.servlet.http.HttpServletRequest;
025 import javax.servlet.http.HttpServletResponse;
026
027 import org.apache.hadoop.conf.Configuration;
028 import org.apache.oozie.BaseEngineException;
029 import org.apache.oozie.BundleEngine;
030 import org.apache.oozie.BundleEngineException;
031 import org.apache.oozie.CoordinatorActionBean;
032 import org.apache.oozie.CoordinatorActionInfo;
033 import org.apache.oozie.CoordinatorEngine;
034 import org.apache.oozie.CoordinatorEngineException;
035 import org.apache.oozie.command.CommandException;
036 import org.apache.oozie.DagEngine;
037 import org.apache.oozie.DagEngineException;
038 import org.apache.oozie.ErrorCode;
039 import org.apache.oozie.client.rest.JsonBean;
040 import org.apache.oozie.client.rest.JsonCoordinatorJob;
041 import org.apache.oozie.client.rest.JsonTags;
042 import org.apache.oozie.client.rest.RestConstants;
043 import org.apache.oozie.service.BundleEngineService;
044 import org.apache.oozie.service.CoordinatorEngineService;
045 import org.apache.oozie.service.DagEngineService;
046 import org.apache.oozie.service.Services;
047 import org.apache.oozie.util.XLog;
048 import org.json.simple.JSONObject;
049
050 @SuppressWarnings("serial")
051 public class V1JobServlet extends BaseJobServlet {
052
053 private static final String INSTRUMENTATION_NAME = "v1job";
054
055 public V1JobServlet() {
056 super(INSTRUMENTATION_NAME);
057 }
058
059 /*
060 * protected method to start a job
061 */
062 @Override
063 protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
064 IOException {
065 /*
066 * Configuration conf = new XConfiguration(request.getInputStream());
067 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
068 * conf.get(OozieClient.COORDINATOR_APP_PATH);
069 *
070 * ServletUtilities.ValidateAppPath(wfPath, coordPath);
071 */
072 String jobId = getResourceName(request);
073 if (jobId.endsWith("-W")) {
074 startWorkflowJob(request, response);
075 }
076 else if (jobId.endsWith("-B")) {
077 startBundleJob(request, response);
078 }
079 else {
080 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303);
081 }
082
083 }
084
085 /*
086 * protected method to resume a job
087 */
088 @Override
089 protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
090 IOException {
091 /*
092 * Configuration conf = new XConfiguration(request.getInputStream());
093 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
094 * conf.get(OozieClient.COORDINATOR_APP_PATH);
095 *
096 * ServletUtilities.ValidateAppPath(wfPath, coordPath);
097 */
098 String jobId = getResourceName(request);
099 if (jobId.endsWith("-W")) {
100 resumeWorkflowJob(request, response);
101 }
102 else if (jobId.endsWith("-B")) {
103 resumeBundleJob(request, response);
104 }
105 else {
106 resumeCoordinatorJob(request, response);
107 }
108 }
109
110 /*
111 * protected method to suspend a job
112 */
113 @Override
114 protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
115 IOException {
116 /*
117 * Configuration conf = new XConfiguration(request.getInputStream());
118 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
119 * conf.get(OozieClient.COORDINATOR_APP_PATH);
120 *
121 * ServletUtilities.ValidateAppPath(wfPath, coordPath);
122 */
123 String jobId = getResourceName(request);
124 if (jobId.endsWith("-W")) {
125 suspendWorkflowJob(request, response);
126 }
127 else if (jobId.endsWith("-B")) {
128 suspendBundleJob(request, response);
129 }
130 else {
131 suspendCoordinatorJob(request, response);
132 }
133 }
134
135 /*
136 * protected method to kill a job
137 */
138 @Override
139 protected void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
140 IOException {
141 /*
142 * Configuration conf = new XConfiguration(request.getInputStream());
143 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
144 * conf.get(OozieClient.COORDINATOR_APP_PATH);
145 *
146 * ServletUtilities.ValidateAppPath(wfPath, coordPath);
147 */
148 String jobId = getResourceName(request);
149 if (jobId.endsWith("-W")) {
150 killWorkflowJob(request, response);
151 }
152 else if (jobId.endsWith("-B")) {
153 killBundleJob(request, response);
154 }
155 else {
156 killCoordinatorJob(request, response);
157 }
158 }
159
160 /**
161 * protected method to change a coordinator job
162 * @param request request object
163 * @param response response object
164 * @throws XServletException
165 * @throws IOException
166 */
167 @Override
168 protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
169 IOException {
170 String jobId = getResourceName(request);
171 if (jobId.endsWith("-B")) {
172 changeBundleJob(request, response);
173 }
174 else {
175 changeCoordinatorJob(request, response);
176 }
177 }
178
179 /*
180 * protected method to reRun a job
181 *
182 * @seeorg.apache.oozie.servlet.BaseJobServlet#reRunJob(javax.servlet.http.
183 * HttpServletRequest, javax.servlet.http.HttpServletResponse,
184 * org.apache.hadoop.conf.Configuration)
185 */
186 @Override
187 protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
188 throws XServletException, IOException {
189 JSONObject json = null;
190 String jobId = getResourceName(request);
191 if (jobId.endsWith("-W")) {
192 reRunWorkflowJob(request, response, conf);
193 }
194 else if (jobId.endsWith("-B")) {
195 rerunBundleJob(request, response, conf);
196 }
197 else {
198 json = reRunCoordinatorActions(request, response, conf);
199 }
200 return json;
201 }
202
203 /*
204 * protected method to get a job in JsonBean representation
205 */
206 @Override
207 protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
208 IOException, BaseEngineException {
209 ServletInputStream is = request.getInputStream();
210 byte[] b = new byte[101];
211 while (is.readLine(b, 0, 100) != -1) {
212 XLog.getLog(getClass()).warn("Printing :" + new String(b));
213 }
214
215 JsonBean jobBean = null;
216 String jobId = getResourceName(request);
217 if (jobId.endsWith("-B")) {
218 jobBean = getBundleJob(request, response);
219 }
220 else {
221 if (jobId.endsWith("-W")) {
222 jobBean = getWorkflowJob(request, response);
223 }
224 else {
225 if (jobId.contains("-W@")) {
226 jobBean = getWorkflowAction(request, response);
227 }
228 else {
229 if (jobId.contains("-C@")) {
230 jobBean = getCoordinatorAction(request, response);
231 }
232 else {
233 jobBean = getCoordinatorJob(request, response);
234 }
235 }
236 }
237 }
238
239 return jobBean;
240 }
241
242 /*
243 * protected method to get a job definition in String format
244 */
245 @Override
246 protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
247 throws XServletException, IOException {
248 String jobDefinition = null;
249 String jobId = getResourceName(request);
250 if (jobId.endsWith("-W")) {
251 jobDefinition = getWorkflowJobDefinition(request, response);
252 }
253 else if (jobId.endsWith("-B")) {
254 jobDefinition = getBundleJobDefinition(request, response);
255 }
256 else {
257 jobDefinition = getCoordinatorJobDefinition(request, response);
258 }
259 return jobDefinition;
260 }
261
262 /*
263 * protected method to stream a job log into response object
264 */
265 @Override
266 protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
267 IOException {
268 String jobId = getResourceName(request);
269 if (jobId.endsWith("-W")) {
270 streamWorkflowJobLog(request, response);
271 }
272 else if (jobId.endsWith("-B")) {
273 streamBundleJob(request, response);
274 }
275 else {
276 streamCoordinatorJobLog(request, response);
277 }
278 }
279
280 /**
281 * Start wf job
282 *
283 * @param request servlet request
284 * @param response servlet response
285 * @throws XServletException
286 */
287 private void startWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
288 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
289 getAuthToken(request));
290
291 String jobId = getResourceName(request);
292 try {
293 dagEngine.start(jobId);
294 }
295 catch (DagEngineException ex) {
296 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
297 }
298 }
299
300 /**
301 * Start bundle job
302 *
303 * @param request servlet request
304 * @param response servlet response
305 * @throws XServletException
306 */
307 private void startBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
308 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
309 getAuthToken(request));
310 String jobId = getResourceName(request);
311 try {
312 bundleEngine.start(jobId);
313 }
314 catch (BundleEngineException ex) {
315 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
316 }
317 }
318
319 /**
320 * Resume workflow job
321 *
322 * @param request servlet request
323 * @param response servlet response
324 * @throws XServletException
325 */
326 private void resumeWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
327 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
328 getAuthToken(request));
329
330 String jobId = getResourceName(request);
331 try {
332 dagEngine.resume(jobId);
333 }
334 catch (DagEngineException ex) {
335 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
336 }
337 }
338
339 /**
340 * Resume bundle job
341 *
342 * @param request servlet request
343 * @param response servlet response
344 * @throws XServletException
345 */
346 private void resumeBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
347 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
348 getAuthToken(request));
349 String jobId = getResourceName(request);
350 try {
351 bundleEngine.resume(jobId);
352 }
353 catch (BundleEngineException ex) {
354 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
355 }
356 }
357
358 /**
359 * Resume coordinator job
360 *
361 * @param request servlet request
362 * @param response servlet response
363 * @throws XServletException
364 * @throws CoordinatorEngineException
365 */
366 private void resumeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
367 throws XServletException {
368 String jobId = getResourceName(request);
369 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
370 getUser(request), getAuthToken(request));
371 try {
372 coordEngine.resume(jobId);
373 }
374 catch (CoordinatorEngineException ex) {
375 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
376 }
377 }
378
379 /**
380 * Suspend a wf job
381 *
382 * @param request servlet request
383 * @param response servlet response
384 * @throws XServletException
385 */
386 private void suspendWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
387 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
388 getAuthToken(request));
389
390 String jobId = getResourceName(request);
391 try {
392 dagEngine.suspend(jobId);
393 }
394 catch (DagEngineException ex) {
395 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
396 }
397 }
398
399 /**
400 * Suspend bundle job
401 *
402 * @param request servlet request
403 * @param response servlet response
404 * @throws XServletException
405 */
406 private void suspendBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
407 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
408 getAuthToken(request));
409 String jobId = getResourceName(request);
410 try {
411 bundleEngine.suspend(jobId);
412 }
413 catch (BundleEngineException ex) {
414 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
415 }
416 }
417
418 /**
419 * Suspend coordinator job
420 *
421 * @param request servlet request
422 * @param response servlet response
423 * @throws XServletException
424 */
425 private void suspendCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
426 throws XServletException {
427 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
428 getUser(request), getAuthToken(request));
429 String jobId = getResourceName(request);
430 try {
431 coordEngine.suspend(jobId);
432 }
433 catch (CoordinatorEngineException ex) {
434 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
435 }
436 }
437
438 /**
439 * Kill a wf job
440 * @param request servlet request
441 * @param response servlet response
442 * @throws XServletException
443 */
444 private void killWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
445 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
446 getAuthToken(request));
447
448 String jobId = getResourceName(request);
449 try {
450 dagEngine.kill(jobId);
451 }
452 catch (DagEngineException ex) {
453 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
454 }
455 }
456
457 /**
458 * Kill a coord job
459 * @param request servlet request
460 * @param response servlet response
461 * @throws XServletException
462 */
463 private void killCoordinatorJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
464 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
465 getUser(request), getAuthToken(request));
466 String jobId = getResourceName(request);
467 try {
468 coordEngine.kill(jobId);
469 }
470 catch (CoordinatorEngineException ex) {
471 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
472 }
473 }
474
475 /**
476 * Kill bundle job
477 *
478 * @param request servlet request
479 * @param response servlet response
480 * @throws XServletException
481 */
482 private void killBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
483 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
484 getAuthToken(request));
485 String jobId = getResourceName(request);
486 try {
487 bundleEngine.kill(jobId);
488 }
489 catch (BundleEngineException ex) {
490 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
491 }
492 }
493
494 /**
495 * Change a coordinator job
496 *
497 * @param request servlet request
498 * @param response servlet response
499 * @throws XServletException
500 */
501 private void changeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
502 throws XServletException {
503 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
504 getUser(request), getAuthToken(request));
505 String jobId = getResourceName(request);
506 String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
507 try {
508 coordEngine.change(jobId, changeValue);
509 }
510 catch (CoordinatorEngineException ex) {
511 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
512 }
513 }
514
515 /**
516 * Change a bundle job
517 *
518 * @param request servlet request
519 * @param response servlet response
520 * @throws XServletException
521 */
522 private void changeBundleJob(HttpServletRequest request, HttpServletResponse response)
523 throws XServletException {
524 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(
525 getUser(request), getAuthToken(request));
526 String jobId = getResourceName(request);
527 String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
528 try {
529 bundleEngine.change(jobId, changeValue);
530 }
531 catch (BundleEngineException ex) {
532 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
533 }
534 }
535
536 /**
537 * Rerun a wf job
538 *
539 * @param request servlet request
540 * @param response servlet response
541 * @param conf configuration object
542 * @throws XServletException
543 */
544 private void reRunWorkflowJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
545 throws XServletException {
546 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
547 getAuthToken(request));
548
549 String jobId = getResourceName(request);
550 try {
551 dagEngine.reRun(jobId, conf);
552 }
553 catch (DagEngineException ex) {
554 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
555 }
556 }
557
558 /**
559 * Rerun bundle job
560 *
561 * @param request servlet request
562 * @param response servlet response
563 * @param conf configration object
564 * @throws XServletException
565 */
566 private void rerunBundleJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
567 throws XServletException {
568 JSONObject json = new JSONObject();
569 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
570 getAuthToken(request));
571 String jobId = getResourceName(request);
572
573 String coordScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM);
574 String dateScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM);
575 String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
576 String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
577
578 XLog.getLog(getClass()).info(
579 "Rerun Bundle for jobId=" + jobId + ", coordScope=" + coordScope + ", dateScope=" + dateScope + ", refresh="
580 + refresh + ", noCleanup=" + noCleanup);
581
582 try {
583 bundleEngine.reRun(jobId, coordScope, dateScope, Boolean.valueOf(refresh), Boolean.valueOf(noCleanup));
584 }
585 catch (BaseEngineException ex) {
586 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
587 }
588 }
589
590 /**
591 * Rerun coordinator actions
592 *
593 * @param request servlet request
594 * @param response servlet response
595 * @param conf configuration object
596 * @throws XServletException
597 */
598 @SuppressWarnings("unchecked")
599 private JSONObject reRunCoordinatorActions(HttpServletRequest request, HttpServletResponse response,
600 Configuration conf) throws XServletException {
601 JSONObject json = new JSONObject();
602 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request),
603 getAuthToken(request));
604
605 String jobId = getResourceName(request);
606
607 String rerunType = request.getParameter(RestConstants.JOB_COORD_RERUN_TYPE_PARAM);
608 String scope = request.getParameter(RestConstants.JOB_COORD_RERUN_SCOPE_PARAM);
609 String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
610 String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
611
612 XLog.getLog(getClass()).info(
613 "Rerun coordinator for jobId=" + jobId + ", rerunType=" + rerunType + ",scope=" + scope + ",refresh="
614 + refresh + ", noCleanup=" + noCleanup);
615
616 try {
617 CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh),
618 Boolean.valueOf(noCleanup));
619 List<CoordinatorActionBean> actions = coordInfo.getCoordActions();
620 json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(actions));
621 }
622 catch (BaseEngineException ex) {
623 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
624 }
625
626 return json;
627 }
628
629 /**
630 * Get workflow job
631 *
632 * @param request servlet request
633 * @param response servlet response
634 * @return JsonBean WorkflowJobBean
635 * @throws XServletException
636 */
637 private JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
638 JsonBean jobBean = null;
639 String jobId = getResourceName(request);
640 String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
641 String lenStr = request.getParameter(RestConstants.LEN_PARAM);
642 int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
643 start = (start < 1) ? 1 : start;
644 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
645 len = (len < 1) ? Integer.MAX_VALUE : len;
646 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
647 getAuthToken(request));
648 try {
649 jobBean = (JsonBean) dagEngine.getJob(jobId, start, len);
650 }
651 catch (DagEngineException ex) {
652 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
653 }
654
655 return jobBean;
656 }
657
658 /**
659 * Get wf action info
660 *
661 * @param request servlet request
662 * @param response servlet response
663 * @return JsonBean WorkflowActionBean
664 * @throws XServletException
665 */
666 private JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response)
667 throws XServletException {
668 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
669 getAuthToken(request));
670
671 JsonBean actionBean = null;
672 String actionId = getResourceName(request);
673 try {
674 actionBean = dagEngine.getWorkflowAction(actionId);
675 }
676 catch (BaseEngineException ex) {
677 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
678 }
679
680 return actionBean;
681 }
682
683 /**
684 * Get coord job info
685 *
686 * @param request servlet request
687 * @param response servlet response
688 * @return JsonBean CoordinatorJobBean
689 * @throws XServletException
690 * @throws BaseEngineException
691 */
692 private JsonBean getCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
693 throws XServletException, BaseEngineException {
694 JsonBean jobBean = null;
695 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
696 getUser(request), getAuthToken(request));
697 String jobId = getResourceName(request);
698 String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
699 String lenStr = request.getParameter(RestConstants.LEN_PARAM);
700 int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
701 start = (start < 1) ? 1 : start;
702 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
703 len = (len < 1) ? Integer.MAX_VALUE : len;
704 try {
705 JsonCoordinatorJob coordJob = coordEngine.getCoordJob(jobId, start, len);
706 jobBean = coordJob;
707 }
708 catch (CoordinatorEngineException ex) {
709 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
710 }
711
712 return jobBean;
713 }
714
715 /**
716 * Get bundle job info
717 *
718 * @param request servlet request
719 * @param response servlet response
720 * @return JsonBean bundle job bean
721 * @throws XServletException
722 * @throws BaseEngineException
723 */
724 private JsonBean getBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
725 BaseEngineException {
726 JsonBean jobBean = null;
727 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
728 getAuthToken(request));
729 String jobId = getResourceName(request);
730
731 try {
732 jobBean = (JsonBean) bundleEngine.getBundleJob(jobId);
733
734 return jobBean;
735 }
736 catch (BundleEngineException ex) {
737 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
738 }
739 }
740
741 /**
742 * Get coordinator action
743 *
744 * @param request servlet request
745 * @param response servlet response
746 * @return JsonBean CoordinatorActionBean
747 * @throws XServletException
748 * @throws BaseEngineException
749 */
750 private JsonBean getCoordinatorAction(HttpServletRequest request, HttpServletResponse response)
751 throws XServletException, BaseEngineException {
752 JsonBean actionBean = null;
753 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
754 getUser(request), getAuthToken(request));
755 String actionId = getResourceName(request);
756 try {
757 actionBean = coordEngine.getCoordAction(actionId);
758 }
759 catch (CoordinatorEngineException ex) {
760 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
761 }
762
763 return actionBean;
764 }
765
766 /**
767 * Get wf job definition
768 *
769 * @param request servlet request
770 * @param response servlet response
771 * @return String wf definition
772 * @throws XServletException
773 */
774 private String getWorkflowJobDefinition(HttpServletRequest request, HttpServletResponse response)
775 throws XServletException {
776 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
777 getAuthToken(request));
778
779 String wfDefinition;
780 String jobId = getResourceName(request);
781 try {
782 wfDefinition = dagEngine.getDefinition(jobId);
783 }
784 catch (DagEngineException ex) {
785 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
786 }
787 return wfDefinition;
788 }
789
790 /**
791 * Get bundle job definition
792 *
793 * @param request servlet request
794 * @param response servlet response
795 * @return String bundle definition
796 * @throws XServletException
797 */
798 private String getBundleJobDefinition(HttpServletRequest request, HttpServletResponse response) throws XServletException {
799 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
800 getAuthToken(request));
801 String bundleDefinition;
802 String jobId = getResourceName(request);
803 try {
804 bundleDefinition = bundleEngine.getDefinition(jobId);
805 }
806 catch (BundleEngineException ex) {
807 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
808 }
809 return bundleDefinition;
810 }
811
812 /**
813 * Get coordinator job definition
814 *
815 * @param request servlet request
816 * @param response servlet response
817 * @return String coord definition
818 * @throws XServletException
819 */
820 private String getCoordinatorJobDefinition(HttpServletRequest request, HttpServletResponse response)
821 throws XServletException {
822
823 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
824 getUser(request), getAuthToken(request));
825
826 String jobId = getResourceName(request);
827
828 String coordDefinition = null;
829 try {
830 coordDefinition = coordEngine.getDefinition(jobId);
831 }
832 catch (BaseEngineException ex) {
833 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
834 }
835 return coordDefinition;
836 }
837
838 /**
839 * Stream wf job log
840 *
841 * @param request servlet request
842 * @param response servlet response
843 * @throws XServletException
844 * @throws IOException
845 */
846 private void streamWorkflowJobLog(HttpServletRequest request, HttpServletResponse response)
847 throws XServletException, IOException {
848 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
849 getAuthToken(request));
850 String jobId = getResourceName(request);
851 try {
852 dagEngine.streamLog(jobId, response.getWriter());
853 }
854 catch (DagEngineException ex) {
855 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
856 }
857 }
858
859 /**
860 * Stream bundle job log
861 *
862 * @param request servlet request
863 * @param response servlet response
864 * @throws XServletException
865 */
866 private void streamBundleJob(HttpServletRequest request, HttpServletResponse response)
867 throws XServletException, IOException {
868 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
869 getAuthToken(request));
870 String jobId = getResourceName(request);
871 try {
872 bundleEngine.streamLog(jobId, response.getWriter());
873 }
874 catch (BundleEngineException ex) {
875 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
876 }
877 }
878
879 /**
880 * Stream coordinator job log
881 *
882 * @param request servlet request
883 * @param response servlet response
884 * @throws XServletException
885 * @throws IOException
886 */
887 private void streamCoordinatorJobLog(HttpServletRequest request, HttpServletResponse response)
888 throws XServletException, IOException {
889
890 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
891 getUser(request), getAuthToken(request));
892 String jobId = getResourceName(request);
893 String logRetrievalScope = request.getParameter(RestConstants.JOB_LOG_SCOPE_PARAM);
894 String logRetrievalType = request.getParameter(RestConstants.JOB_LOG_TYPE_PARAM);
895 try {
896 coordEngine.streamLog(jobId, logRetrievalScope, logRetrievalType, response.getWriter());
897 }
898 catch (BaseEngineException ex) {
899 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
900 }
901 catch (CommandException ex) {
902 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
903 }
904 }
905 }