This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.servlet;
019
020 import java.io.IOException;
021 import java.util.List;
022
023 import javax.servlet.ServletInputStream;
024 import javax.servlet.http.HttpServletRequest;
025 import javax.servlet.http.HttpServletResponse;
026
027 import org.apache.hadoop.conf.Configuration;
028 import org.apache.oozie.BaseEngineException;
029 import org.apache.oozie.BundleEngine;
030 import org.apache.oozie.BundleEngineException;
031 import org.apache.oozie.CoordinatorActionBean;
032 import org.apache.oozie.CoordinatorActionInfo;
033 import org.apache.oozie.CoordinatorEngine;
034 import org.apache.oozie.CoordinatorEngineException;
035 import org.apache.oozie.command.CommandException;
036 import org.apache.oozie.command.coord.CoordRerunXCommand;
037 import org.apache.oozie.coord.CoordUtils;
038 import org.apache.oozie.DagEngine;
039 import org.apache.oozie.DagEngineException;
040 import org.apache.oozie.ErrorCode;
041 import org.apache.oozie.client.rest.JsonBean;
042 import org.apache.oozie.client.rest.JsonCoordinatorJob;
043 import org.apache.oozie.client.rest.JsonTags;
044 import org.apache.oozie.client.rest.RestConstants;
045 import org.apache.oozie.service.BundleEngineService;
046 import org.apache.oozie.service.CoordinatorEngineService;
047 import org.apache.oozie.service.DagEngineService;
048 import org.apache.oozie.service.Services;
049 import org.apache.oozie.util.XLog;
050 import org.json.simple.JSONObject;
051
052 @SuppressWarnings("serial")
053 public class V1JobServlet extends BaseJobServlet {
054
055 private static final String INSTRUMENTATION_NAME = "v1job";
056 public static final String COORD_ACTIONS_DEFAULT_LENGTH = "oozie.coord.actions.default.length";
057
058 public V1JobServlet() {
059 super(INSTRUMENTATION_NAME);
060 }
061
062 /*
063 * protected method to start a job
064 */
065 @Override
066 protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
067 IOException {
068 /*
069 * Configuration conf = new XConfiguration(request.getInputStream());
070 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
071 * conf.get(OozieClient.COORDINATOR_APP_PATH);
072 *
073 * ServletUtilities.ValidateAppPath(wfPath, coordPath);
074 */
075 String jobId = getResourceName(request);
076 if (jobId.endsWith("-W")) {
077 startWorkflowJob(request, response);
078 }
079 else if (jobId.endsWith("-B")) {
080 startBundleJob(request, response);
081 }
082 else {
083 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303);
084 }
085
086 }
087
088 /*
089 * protected method to resume a job
090 */
091 @Override
092 protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
093 IOException {
094 /*
095 * Configuration conf = new XConfiguration(request.getInputStream());
096 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
097 * conf.get(OozieClient.COORDINATOR_APP_PATH);
098 *
099 * ServletUtilities.ValidateAppPath(wfPath, coordPath);
100 */
101 String jobId = getResourceName(request);
102 if (jobId.endsWith("-W")) {
103 resumeWorkflowJob(request, response);
104 }
105 else if (jobId.endsWith("-B")) {
106 resumeBundleJob(request, response);
107 }
108 else {
109 resumeCoordinatorJob(request, response);
110 }
111 }
112
113 /*
114 * protected method to suspend a job
115 */
116 @Override
117 protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
118 IOException {
119 /*
120 * Configuration conf = new XConfiguration(request.getInputStream());
121 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
122 * conf.get(OozieClient.COORDINATOR_APP_PATH);
123 *
124 * ServletUtilities.ValidateAppPath(wfPath, coordPath);
125 */
126 String jobId = getResourceName(request);
127 if (jobId.endsWith("-W")) {
128 suspendWorkflowJob(request, response);
129 }
130 else if (jobId.endsWith("-B")) {
131 suspendBundleJob(request, response);
132 }
133 else {
134 suspendCoordinatorJob(request, response);
135 }
136 }
137
138 /*
139 * protected method to kill a job
140 */
141 @Override
142 protected void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
143 IOException {
144 /*
145 * Configuration conf = new XConfiguration(request.getInputStream());
146 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
147 * conf.get(OozieClient.COORDINATOR_APP_PATH);
148 *
149 * ServletUtilities.ValidateAppPath(wfPath, coordPath);
150 */
151 String jobId = getResourceName(request);
152 if (jobId.endsWith("-W")) {
153 killWorkflowJob(request, response);
154 }
155 else if (jobId.endsWith("-B")) {
156 killBundleJob(request, response);
157 }
158 else {
159 killCoordinatorJob(request, response);
160 }
161 }
162
163 /**
164 * protected method to change a coordinator job
165 * @param request request object
166 * @param response response object
167 * @throws XServletException
168 * @throws IOException
169 */
170 @Override
171 protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
172 IOException {
173 String jobId = getResourceName(request);
174 if (jobId.endsWith("-B")) {
175 changeBundleJob(request, response);
176 }
177 else {
178 changeCoordinatorJob(request, response);
179 }
180 }
181
182 /*
183 * protected method to reRun a job
184 *
185 * @seeorg.apache.oozie.servlet.BaseJobServlet#reRunJob(javax.servlet.http.
186 * HttpServletRequest, javax.servlet.http.HttpServletResponse,
187 * org.apache.hadoop.conf.Configuration)
188 */
189 @Override
190 protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
191 throws XServletException, IOException {
192 JSONObject json = null;
193 String jobId = getResourceName(request);
194 if (jobId.endsWith("-W")) {
195 reRunWorkflowJob(request, response, conf);
196 }
197 else if (jobId.endsWith("-B")) {
198 rerunBundleJob(request, response, conf);
199 }
200 else {
201 json = reRunCoordinatorActions(request, response, conf);
202 }
203 return json;
204 }
205
206 /*
207 * protected method to get a job in JsonBean representation
208 */
209 @Override
210 protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
211 IOException, BaseEngineException {
212 ServletInputStream is = request.getInputStream();
213 byte[] b = new byte[101];
214 while (is.readLine(b, 0, 100) != -1) {
215 XLog.getLog(getClass()).warn("Printing :" + new String(b));
216 }
217
218 JsonBean jobBean = null;
219 String jobId = getResourceName(request);
220 if (jobId.endsWith("-B")) {
221 jobBean = getBundleJob(request, response);
222 }
223 else {
224 if (jobId.endsWith("-W")) {
225 jobBean = getWorkflowJob(request, response);
226 }
227 else {
228 if (jobId.contains("-W@")) {
229 jobBean = getWorkflowAction(request, response);
230 }
231 else {
232 if (jobId.contains("-C@")) {
233 jobBean = getCoordinatorAction(request, response);
234 }
235 else {
236 jobBean = getCoordinatorJob(request, response);
237 }
238 }
239 }
240 }
241
242 return jobBean;
243 }
244
245 /*
246 * protected method to get a job definition in String format
247 */
248 @Override
249 protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
250 throws XServletException, IOException {
251 String jobDefinition = null;
252 String jobId = getResourceName(request);
253 if (jobId.endsWith("-W")) {
254 jobDefinition = getWorkflowJobDefinition(request, response);
255 }
256 else if (jobId.endsWith("-B")) {
257 jobDefinition = getBundleJobDefinition(request, response);
258 }
259 else {
260 jobDefinition = getCoordinatorJobDefinition(request, response);
261 }
262 return jobDefinition;
263 }
264
265 /*
266 * protected method to stream a job log into response object
267 */
268 @Override
269 protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
270 IOException {
271 String jobId = getResourceName(request);
272 if (jobId.endsWith("-W")) {
273 streamWorkflowJobLog(request, response);
274 }
275 else if (jobId.endsWith("-B")) {
276 streamBundleJob(request, response);
277 }
278 else {
279 streamCoordinatorJobLog(request, response);
280 }
281 }
282
283 /**
284 * Start wf job
285 *
286 * @param request servlet request
287 * @param response servlet response
288 * @throws XServletException
289 */
290 private void startWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
291 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
292 getAuthToken(request));
293
294 String jobId = getResourceName(request);
295 try {
296 dagEngine.start(jobId);
297 }
298 catch (DagEngineException ex) {
299 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
300 }
301 }
302
303 /**
304 * Start bundle job
305 *
306 * @param request servlet request
307 * @param response servlet response
308 * @throws XServletException
309 */
310 private void startBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
311 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
312 getAuthToken(request));
313 String jobId = getResourceName(request);
314 try {
315 bundleEngine.start(jobId);
316 }
317 catch (BundleEngineException ex) {
318 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
319 }
320 }
321
322 /**
323 * Resume workflow job
324 *
325 * @param request servlet request
326 * @param response servlet response
327 * @throws XServletException
328 */
329 private void resumeWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
330 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
331 getAuthToken(request));
332
333 String jobId = getResourceName(request);
334 try {
335 dagEngine.resume(jobId);
336 }
337 catch (DagEngineException ex) {
338 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
339 }
340 }
341
342 /**
343 * Resume bundle job
344 *
345 * @param request servlet request
346 * @param response servlet response
347 * @throws XServletException
348 */
349 private void resumeBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
350 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
351 getAuthToken(request));
352 String jobId = getResourceName(request);
353 try {
354 bundleEngine.resume(jobId);
355 }
356 catch (BundleEngineException ex) {
357 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
358 }
359 }
360
361 /**
362 * Resume coordinator job
363 *
364 * @param request servlet request
365 * @param response servlet response
366 * @throws XServletException
367 * @throws CoordinatorEngineException
368 */
369 private void resumeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
370 throws XServletException {
371 String jobId = getResourceName(request);
372 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
373 getUser(request), getAuthToken(request));
374 try {
375 coordEngine.resume(jobId);
376 }
377 catch (CoordinatorEngineException ex) {
378 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
379 }
380 }
381
382 /**
383 * Suspend a wf job
384 *
385 * @param request servlet request
386 * @param response servlet response
387 * @throws XServletException
388 */
389 private void suspendWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
390 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
391 getAuthToken(request));
392
393 String jobId = getResourceName(request);
394 try {
395 dagEngine.suspend(jobId);
396 }
397 catch (DagEngineException ex) {
398 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
399 }
400 }
401
402 /**
403 * Suspend bundle job
404 *
405 * @param request servlet request
406 * @param response servlet response
407 * @throws XServletException
408 */
409 private void suspendBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
410 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
411 getAuthToken(request));
412 String jobId = getResourceName(request);
413 try {
414 bundleEngine.suspend(jobId);
415 }
416 catch (BundleEngineException ex) {
417 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
418 }
419 }
420
421 /**
422 * Suspend coordinator job
423 *
424 * @param request servlet request
425 * @param response servlet response
426 * @throws XServletException
427 */
428 private void suspendCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
429 throws XServletException {
430 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
431 getUser(request), getAuthToken(request));
432 String jobId = getResourceName(request);
433 try {
434 coordEngine.suspend(jobId);
435 }
436 catch (CoordinatorEngineException ex) {
437 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
438 }
439 }
440
441 /**
442 * Kill a wf job
443 * @param request servlet request
444 * @param response servlet response
445 * @throws XServletException
446 */
447 private void killWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
448 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
449 getAuthToken(request));
450
451 String jobId = getResourceName(request);
452 try {
453 dagEngine.kill(jobId);
454 }
455 catch (DagEngineException ex) {
456 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
457 }
458 }
459
460 /**
461 * Kill a coord job
462 * @param request servlet request
463 * @param response servlet response
464 * @throws XServletException
465 */
466 private void killCoordinatorJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
467 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
468 getUser(request), getAuthToken(request));
469 String jobId = getResourceName(request);
470 try {
471 coordEngine.kill(jobId);
472 }
473 catch (CoordinatorEngineException ex) {
474 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
475 }
476 }
477
478 /**
479 * Kill bundle job
480 *
481 * @param request servlet request
482 * @param response servlet response
483 * @throws XServletException
484 */
485 private void killBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
486 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
487 getAuthToken(request));
488 String jobId = getResourceName(request);
489 try {
490 bundleEngine.kill(jobId);
491 }
492 catch (BundleEngineException ex) {
493 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
494 }
495 }
496
497 /**
498 * Change a coordinator job
499 *
500 * @param request servlet request
501 * @param response servlet response
502 * @throws XServletException
503 */
504 private void changeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
505 throws XServletException {
506 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
507 getUser(request), getAuthToken(request));
508 String jobId = getResourceName(request);
509 String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
510 try {
511 coordEngine.change(jobId, changeValue);
512 }
513 catch (CoordinatorEngineException ex) {
514 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
515 }
516 }
517
518 /**
519 * Change a bundle job
520 *
521 * @param request servlet request
522 * @param response servlet response
523 * @throws XServletException
524 */
525 private void changeBundleJob(HttpServletRequest request, HttpServletResponse response)
526 throws XServletException {
527 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(
528 getUser(request), getAuthToken(request));
529 String jobId = getResourceName(request);
530 String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
531 try {
532 bundleEngine.change(jobId, changeValue);
533 }
534 catch (BundleEngineException ex) {
535 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
536 }
537 }
538
539 /**
540 * Rerun a wf job
541 *
542 * @param request servlet request
543 * @param response servlet response
544 * @param conf configuration object
545 * @throws XServletException
546 */
547 private void reRunWorkflowJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
548 throws XServletException {
549 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
550 getAuthToken(request));
551
552 String jobId = getResourceName(request);
553 try {
554 dagEngine.reRun(jobId, conf);
555 }
556 catch (DagEngineException ex) {
557 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
558 }
559 }
560
561 /**
562 * Rerun bundle job
563 *
564 * @param request servlet request
565 * @param response servlet response
566 * @param conf configration object
567 * @throws XServletException
568 */
569 private void rerunBundleJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
570 throws XServletException {
571 JSONObject json = new JSONObject();
572 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
573 getAuthToken(request));
574 String jobId = getResourceName(request);
575
576 String coordScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM);
577 String dateScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM);
578 String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
579 String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
580
581 XLog.getLog(getClass()).info(
582 "Rerun Bundle for jobId=" + jobId + ", coordScope=" + coordScope + ", dateScope=" + dateScope + ", refresh="
583 + refresh + ", noCleanup=" + noCleanup);
584
585 try {
586 bundleEngine.reRun(jobId, coordScope, dateScope, Boolean.valueOf(refresh), Boolean.valueOf(noCleanup));
587 }
588 catch (BaseEngineException ex) {
589 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
590 }
591 }
592
593 /**
594 * Rerun coordinator actions
595 *
596 * @param request servlet request
597 * @param response servlet response
598 * @param conf configuration object
599 * @throws XServletException
600 */
601 @SuppressWarnings("unchecked")
602 private JSONObject reRunCoordinatorActions(HttpServletRequest request, HttpServletResponse response,
603 Configuration conf) throws XServletException {
604 JSONObject json = new JSONObject();
605 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request),
606 getAuthToken(request));
607
608 String jobId = getResourceName(request);
609
610 String rerunType = request.getParameter(RestConstants.JOB_COORD_RERUN_TYPE_PARAM);
611 String scope = request.getParameter(RestConstants.JOB_COORD_RERUN_SCOPE_PARAM);
612 String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
613 String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
614
615 XLog.getLog(getClass()).info(
616 "Rerun coordinator for jobId=" + jobId + ", rerunType=" + rerunType + ",scope=" + scope + ",refresh="
617 + refresh + ", noCleanup=" + noCleanup);
618
619 try {
620 if (!(rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE) || rerunType
621 .equals(RestConstants.JOB_COORD_RERUN_ACTION))) {
622 throw new CommandException(ErrorCode.E1018, "date or action expected.");
623 }
624 CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh),
625 Boolean.valueOf(noCleanup));
626 List<CoordinatorActionBean> coordActions;
627 if (coordInfo != null) {
628 coordActions = coordInfo.getCoordActions();
629 }
630 else {
631 coordActions = CoordRerunXCommand.getCoordActions(rerunType, jobId, scope);
632 }
633 json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions));
634 }
635 catch (BaseEngineException ex) {
636 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
637 }
638 catch (CommandException ex) {
639 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
640 }
641
642 return json;
643 }
644
645
646
647 /**
648 * Get workflow job
649 *
650 * @param request servlet request
651 * @param response servlet response
652 * @return JsonBean WorkflowJobBean
653 * @throws XServletException
654 */
655 private JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
656 JsonBean jobBean = null;
657 String jobId = getResourceName(request);
658 String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
659 String lenStr = request.getParameter(RestConstants.LEN_PARAM);
660 int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
661 start = (start < 1) ? 1 : start;
662 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
663 len = (len < 1) ? Integer.MAX_VALUE : len;
664 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
665 getAuthToken(request));
666 try {
667 jobBean = (JsonBean) dagEngine.getJob(jobId, start, len);
668 }
669 catch (DagEngineException ex) {
670 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
671 }
672
673 return jobBean;
674 }
675
676 /**
677 * Get wf action info
678 *
679 * @param request servlet request
680 * @param response servlet response
681 * @return JsonBean WorkflowActionBean
682 * @throws XServletException
683 */
684 private JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response)
685 throws XServletException {
686 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
687 getAuthToken(request));
688
689 JsonBean actionBean = null;
690 String actionId = getResourceName(request);
691 try {
692 actionBean = dagEngine.getWorkflowAction(actionId);
693 }
694 catch (BaseEngineException ex) {
695 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
696 }
697
698 return actionBean;
699 }
700
701 /**
702 * Get coord job info
703 *
704 * @param request servlet request
705 * @param response servlet response
706 * @return JsonBean CoordinatorJobBean
707 * @throws XServletException
708 * @throws BaseEngineException
709 */
710 private JsonBean getCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
711 throws XServletException, BaseEngineException {
712 JsonBean jobBean = null;
713 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
714 getUser(request), getAuthToken(request));
715 String jobId = getResourceName(request);
716 String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
717 String lenStr = request.getParameter(RestConstants.LEN_PARAM);
718 String filter = request.getParameter(RestConstants.JOB_FILTER_PARAM);
719 int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
720 start = (start < 1) ? 1 : start;
721 // Get default number of coordinator actions to be retrieved
722 int defaultLen = Services.get().getConf().getInt(COORD_ACTIONS_DEFAULT_LENGTH, 1000);
723 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
724 len = (len < 1) ? defaultLen : len;
725 try {
726 JsonCoordinatorJob coordJob = coordEngine.getCoordJob(jobId, filter, start, len);
727 jobBean = coordJob;
728 }
729 catch (CoordinatorEngineException ex) {
730 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
731 }
732
733 return jobBean;
734 }
735
736 /**
737 * Get bundle job info
738 *
739 * @param request servlet request
740 * @param response servlet response
741 * @return JsonBean bundle job bean
742 * @throws XServletException
743 * @throws BaseEngineException
744 */
745 private JsonBean getBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
746 BaseEngineException {
747 JsonBean jobBean = null;
748 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
749 getAuthToken(request));
750 String jobId = getResourceName(request);
751
752 try {
753 jobBean = (JsonBean) bundleEngine.getBundleJob(jobId);
754
755 return jobBean;
756 }
757 catch (BundleEngineException ex) {
758 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
759 }
760 }
761
762 /**
763 * Get coordinator action
764 *
765 * @param request servlet request
766 * @param response servlet response
767 * @return JsonBean CoordinatorActionBean
768 * @throws XServletException
769 * @throws BaseEngineException
770 */
771 private JsonBean getCoordinatorAction(HttpServletRequest request, HttpServletResponse response)
772 throws XServletException, BaseEngineException {
773 JsonBean actionBean = null;
774 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
775 getUser(request), getAuthToken(request));
776 String actionId = getResourceName(request);
777 try {
778 actionBean = coordEngine.getCoordAction(actionId);
779 }
780 catch (CoordinatorEngineException ex) {
781 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
782 }
783
784 return actionBean;
785 }
786
787 /**
788 * Get wf job definition
789 *
790 * @param request servlet request
791 * @param response servlet response
792 * @return String wf definition
793 * @throws XServletException
794 */
795 private String getWorkflowJobDefinition(HttpServletRequest request, HttpServletResponse response)
796 throws XServletException {
797 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
798 getAuthToken(request));
799
800 String wfDefinition;
801 String jobId = getResourceName(request);
802 try {
803 wfDefinition = dagEngine.getDefinition(jobId);
804 }
805 catch (DagEngineException ex) {
806 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
807 }
808 return wfDefinition;
809 }
810
811 /**
812 * Get bundle job definition
813 *
814 * @param request servlet request
815 * @param response servlet response
816 * @return String bundle definition
817 * @throws XServletException
818 */
819 private String getBundleJobDefinition(HttpServletRequest request, HttpServletResponse response) throws XServletException {
820 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
821 getAuthToken(request));
822 String bundleDefinition;
823 String jobId = getResourceName(request);
824 try {
825 bundleDefinition = bundleEngine.getDefinition(jobId);
826 }
827 catch (BundleEngineException ex) {
828 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
829 }
830 return bundleDefinition;
831 }
832
833 /**
834 * Get coordinator job definition
835 *
836 * @param request servlet request
837 * @param response servlet response
838 * @return String coord definition
839 * @throws XServletException
840 */
841 private String getCoordinatorJobDefinition(HttpServletRequest request, HttpServletResponse response)
842 throws XServletException {
843
844 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
845 getUser(request), getAuthToken(request));
846
847 String jobId = getResourceName(request);
848
849 String coordDefinition = null;
850 try {
851 coordDefinition = coordEngine.getDefinition(jobId);
852 }
853 catch (BaseEngineException ex) {
854 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
855 }
856 return coordDefinition;
857 }
858
859 /**
860 * Stream wf job log
861 *
862 * @param request servlet request
863 * @param response servlet response
864 * @throws XServletException
865 * @throws IOException
866 */
867 private void streamWorkflowJobLog(HttpServletRequest request, HttpServletResponse response)
868 throws XServletException, IOException {
869 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
870 getAuthToken(request));
871 String jobId = getResourceName(request);
872 try {
873 dagEngine.streamLog(jobId, response.getWriter());
874 }
875 catch (DagEngineException ex) {
876 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
877 }
878 }
879
880 /**
881 * Stream bundle job log
882 *
883 * @param request servlet request
884 * @param response servlet response
885 * @throws XServletException
886 */
887 private void streamBundleJob(HttpServletRequest request, HttpServletResponse response)
888 throws XServletException, IOException {
889 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
890 getAuthToken(request));
891 String jobId = getResourceName(request);
892 try {
893 bundleEngine.streamLog(jobId, response.getWriter());
894 }
895 catch (BundleEngineException ex) {
896 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
897 }
898 }
899
900 /**
901 * Stream coordinator job log
902 *
903 * @param request servlet request
904 * @param response servlet response
905 * @throws XServletException
906 * @throws IOException
907 */
908 private void streamCoordinatorJobLog(HttpServletRequest request, HttpServletResponse response)
909 throws XServletException, IOException {
910
911 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
912 getUser(request), getAuthToken(request));
913 String jobId = getResourceName(request);
914 String logRetrievalScope = request.getParameter(RestConstants.JOB_LOG_SCOPE_PARAM);
915 String logRetrievalType = request.getParameter(RestConstants.JOB_LOG_TYPE_PARAM);
916 try {
917 coordEngine.streamLog(jobId, logRetrievalScope, logRetrievalType, response.getWriter());
918 }
919 catch (BaseEngineException ex) {
920 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
921 }
922 catch (CommandException ex) {
923 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
924 }
925 }
926 }