This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.servlet;
019
020 import java.io.IOException;
021 import java.util.List;
022 import javax.servlet.ServletInputStream;
023 import javax.servlet.http.HttpServletRequest;
024 import javax.servlet.http.HttpServletResponse;
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.oozie.*;
027 import org.apache.oozie.client.rest.*;
028 import org.apache.oozie.command.CommandException;
029 import org.apache.oozie.command.coord.CoordRerunXCommand;
030 import org.apache.oozie.service.BundleEngineService;
031 import org.apache.oozie.service.CoordinatorEngineService;
032 import org.apache.oozie.service.DagEngineService;
033 import org.apache.oozie.service.Services;
034 import org.apache.oozie.util.GraphGenerator;
035 import org.apache.oozie.util.XLog;
036 import org.json.simple.JSONObject;
037
038
039 @SuppressWarnings("serial")
040 public class V1JobServlet extends BaseJobServlet {
041
042 private static final String INSTRUMENTATION_NAME = "v1job";
043 public static final String COORD_ACTIONS_DEFAULT_LENGTH = "oozie.coord.actions.default.length";
044
045 public V1JobServlet() {
046 super(INSTRUMENTATION_NAME);
047 }
048
049 /*
050 * protected method to start a job
051 */
052 @Override
053 protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
054 IOException {
055 /*
056 * Configuration conf = new XConfiguration(request.getInputStream());
057 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
058 * conf.get(OozieClient.COORDINATOR_APP_PATH);
059 *
060 * ServletUtilities.ValidateAppPath(wfPath, coordPath);
061 */
062 String jobId = getResourceName(request);
063 if (jobId.endsWith("-W")) {
064 startWorkflowJob(request, response);
065 }
066 else if (jobId.endsWith("-B")) {
067 startBundleJob(request, response);
068 }
069 else {
070 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, RestConstants.ACTION_PARAM, RestConstants.JOB_ACTION_START);
071 }
072
073 }
074
075 /*
076 * protected method to resume a job
077 */
078 @Override
079 protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
080 IOException {
081 /*
082 * Configuration conf = new XConfiguration(request.getInputStream());
083 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
084 * conf.get(OozieClient.COORDINATOR_APP_PATH);
085 *
086 * ServletUtilities.ValidateAppPath(wfPath, coordPath);
087 */
088 String jobId = getResourceName(request);
089 if (jobId.endsWith("-W")) {
090 resumeWorkflowJob(request, response);
091 }
092 else if (jobId.endsWith("-B")) {
093 resumeBundleJob(request, response);
094 }
095 else {
096 resumeCoordinatorJob(request, response);
097 }
098 }
099
100 /*
101 * protected method to suspend a job
102 */
103 @Override
104 protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
105 IOException {
106 /*
107 * Configuration conf = new XConfiguration(request.getInputStream());
108 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
109 * conf.get(OozieClient.COORDINATOR_APP_PATH);
110 *
111 * ServletUtilities.ValidateAppPath(wfPath, coordPath);
112 */
113 String jobId = getResourceName(request);
114 if (jobId.endsWith("-W")) {
115 suspendWorkflowJob(request, response);
116 }
117 else if (jobId.endsWith("-B")) {
118 suspendBundleJob(request, response);
119 }
120 else {
121 suspendCoordinatorJob(request, response);
122 }
123 }
124
125 /*
126 * protected method to kill a job
127 */
128 @Override
129 protected void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
130 IOException {
131 /*
132 * Configuration conf = new XConfiguration(request.getInputStream());
133 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
134 * conf.get(OozieClient.COORDINATOR_APP_PATH);
135 *
136 * ServletUtilities.ValidateAppPath(wfPath, coordPath);
137 */
138 String jobId = getResourceName(request);
139 if (jobId.endsWith("-W")) {
140 killWorkflowJob(request, response);
141 }
142 else if (jobId.endsWith("-B")) {
143 killBundleJob(request, response);
144 }
145 else {
146 killCoordinatorJob(request, response);
147 }
148 }
149
150 /**
151 * protected method to change a coordinator job
152 * @param request request object
153 * @param response response object
154 * @throws XServletException
155 * @throws IOException
156 */
157 @Override
158 protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
159 IOException {
160 String jobId = getResourceName(request);
161 if (jobId.endsWith("-B")) {
162 changeBundleJob(request, response);
163 }
164 else {
165 changeCoordinatorJob(request, response);
166 }
167 }
168
169 /*
170 * protected method to reRun a job
171 *
172 * @seeorg.apache.oozie.servlet.BaseJobServlet#reRunJob(javax.servlet.http.
173 * HttpServletRequest, javax.servlet.http.HttpServletResponse,
174 * org.apache.hadoop.conf.Configuration)
175 */
176 @Override
177 protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
178 throws XServletException, IOException {
179 JSONObject json = null;
180 String jobId = getResourceName(request);
181 if (jobId.endsWith("-W")) {
182 reRunWorkflowJob(request, response, conf);
183 }
184 else if (jobId.endsWith("-B")) {
185 rerunBundleJob(request, response, conf);
186 }
187 else {
188 json = reRunCoordinatorActions(request, response, conf);
189 }
190 return json;
191 }
192
193 /*
194 * protected method to get a job in JsonBean representation
195 */
196 @Override
197 protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
198 IOException, BaseEngineException {
199 ServletInputStream is = request.getInputStream();
200 byte[] b = new byte[101];
201 while (is.readLine(b, 0, 100) != -1) {
202 XLog.getLog(getClass()).warn("Printing :" + new String(b));
203 }
204
205 JsonBean jobBean = null;
206 String jobId = getResourceName(request);
207 if (jobId.endsWith("-B")) {
208 jobBean = getBundleJob(request, response);
209 }
210 else {
211 if (jobId.endsWith("-W")) {
212 jobBean = getWorkflowJob(request, response);
213 }
214 else {
215 if (jobId.contains("-W@")) {
216 jobBean = getWorkflowAction(request, response);
217 }
218 else {
219 if (jobId.contains("-C@")) {
220 jobBean = getCoordinatorAction(request, response);
221 }
222 else {
223 jobBean = getCoordinatorJob(request, response);
224 }
225 }
226 }
227 }
228
229 return jobBean;
230 }
231
232 /*
233 * protected method to get a job definition in String format
234 */
235 @Override
236 protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
237 throws XServletException, IOException {
238 String jobDefinition = null;
239 String jobId = getResourceName(request);
240 if (jobId.endsWith("-W")) {
241 jobDefinition = getWorkflowJobDefinition(request, response);
242 }
243 else if (jobId.endsWith("-B")) {
244 jobDefinition = getBundleJobDefinition(request, response);
245 }
246 else {
247 jobDefinition = getCoordinatorJobDefinition(request, response);
248 }
249 return jobDefinition;
250 }
251
252 /*
253 * protected method to stream a job log into response object
254 */
255 @Override
256 protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
257 IOException {
258 String jobId = getResourceName(request);
259 if (jobId.endsWith("-W")) {
260 streamWorkflowJobLog(request, response);
261 }
262 else if (jobId.endsWith("-B")) {
263 streamBundleJob(request, response);
264 }
265 else {
266 streamCoordinatorJobLog(request, response);
267 }
268 }
269
270 @Override
271 protected void streamJobGraph(HttpServletRequest request, HttpServletResponse response)
272 throws XServletException, IOException {
273 String jobId = getResourceName(request);
274 if (jobId.endsWith("-W")) {
275 // Applicable only to worflow, for now
276 response.setContentType(RestConstants.PNG_IMAGE_CONTENT_TYPE);
277 try {
278 String showKill = request.getParameter(RestConstants.JOB_SHOW_KILL_PARAM);
279 boolean sK = showKill != null && (showKill.equalsIgnoreCase("yes") || showKill.equals("1") || showKill.equalsIgnoreCase("true"));
280
281 new GraphGenerator(
282 getWorkflowJobDefinition(request, response),
283 (JsonWorkflowJob)getWorkflowJob(request, response),
284 sK).write(response.getOutputStream());
285 }
286 catch (Exception e) {
287 throw new XServletException(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, ErrorCode.E0307, e.getMessage(), e);
288 }
289 }
290 else {
291 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0306);
292 }
293 }
294
295 /**
296 * Start wf job
297 *
298 * @param request servlet request
299 * @param response servlet response
300 * @throws XServletException
301 */
302 private void startWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
303 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
304 getAuthToken(request));
305
306 String jobId = getResourceName(request);
307 try {
308 dagEngine.start(jobId);
309 }
310 catch (DagEngineException ex) {
311 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
312 }
313 }
314
315 /**
316 * Start bundle job
317 *
318 * @param request servlet request
319 * @param response servlet response
320 * @throws XServletException
321 */
322 private void startBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
323 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
324 getAuthToken(request));
325 String jobId = getResourceName(request);
326 try {
327 bundleEngine.start(jobId);
328 }
329 catch (BundleEngineException ex) {
330 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
331 }
332 }
333
334 /**
335 * Resume workflow job
336 *
337 * @param request servlet request
338 * @param response servlet response
339 * @throws XServletException
340 */
341 private void resumeWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
342 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
343 getAuthToken(request));
344
345 String jobId = getResourceName(request);
346 try {
347 dagEngine.resume(jobId);
348 }
349 catch (DagEngineException ex) {
350 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
351 }
352 }
353
354 /**
355 * Resume bundle job
356 *
357 * @param request servlet request
358 * @param response servlet response
359 * @throws XServletException
360 */
361 private void resumeBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
362 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
363 getAuthToken(request));
364 String jobId = getResourceName(request);
365 try {
366 bundleEngine.resume(jobId);
367 }
368 catch (BundleEngineException ex) {
369 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
370 }
371 }
372
373 /**
374 * Resume coordinator job
375 *
376 * @param request servlet request
377 * @param response servlet response
378 * @throws XServletException
379 * @throws CoordinatorEngineException
380 */
381 private void resumeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
382 throws XServletException {
383 String jobId = getResourceName(request);
384 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
385 getUser(request), getAuthToken(request));
386 try {
387 coordEngine.resume(jobId);
388 }
389 catch (CoordinatorEngineException ex) {
390 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
391 }
392 }
393
394 /**
395 * Suspend a wf job
396 *
397 * @param request servlet request
398 * @param response servlet response
399 * @throws XServletException
400 */
401 private void suspendWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
402 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
403 getAuthToken(request));
404
405 String jobId = getResourceName(request);
406 try {
407 dagEngine.suspend(jobId);
408 }
409 catch (DagEngineException ex) {
410 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
411 }
412 }
413
414 /**
415 * Suspend bundle job
416 *
417 * @param request servlet request
418 * @param response servlet response
419 * @throws XServletException
420 */
421 private void suspendBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
422 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
423 getAuthToken(request));
424 String jobId = getResourceName(request);
425 try {
426 bundleEngine.suspend(jobId);
427 }
428 catch (BundleEngineException ex) {
429 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
430 }
431 }
432
433 /**
434 * Suspend coordinator job
435 *
436 * @param request servlet request
437 * @param response servlet response
438 * @throws XServletException
439 */
440 private void suspendCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
441 throws XServletException {
442 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
443 getUser(request), getAuthToken(request));
444 String jobId = getResourceName(request);
445 try {
446 coordEngine.suspend(jobId);
447 }
448 catch (CoordinatorEngineException ex) {
449 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
450 }
451 }
452
453 /**
454 * Kill a wf job
455 * @param request servlet request
456 * @param response servlet response
457 * @throws XServletException
458 */
459 private void killWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
460 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
461 getAuthToken(request));
462
463 String jobId = getResourceName(request);
464 try {
465 dagEngine.kill(jobId);
466 }
467 catch (DagEngineException ex) {
468 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
469 }
470 }
471
472 /**
473 * Kill a coord job
474 * @param request servlet request
475 * @param response servlet response
476 * @throws XServletException
477 */
478 private void killCoordinatorJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
479 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
480 getUser(request), getAuthToken(request));
481 String jobId = getResourceName(request);
482 try {
483 coordEngine.kill(jobId);
484 }
485 catch (CoordinatorEngineException ex) {
486 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
487 }
488 }
489
490 /**
491 * Kill bundle job
492 *
493 * @param request servlet request
494 * @param response servlet response
495 * @throws XServletException
496 */
497 private void killBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
498 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
499 getAuthToken(request));
500 String jobId = getResourceName(request);
501 try {
502 bundleEngine.kill(jobId);
503 }
504 catch (BundleEngineException ex) {
505 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
506 }
507 }
508
509 /**
510 * Change a coordinator job
511 *
512 * @param request servlet request
513 * @param response servlet response
514 * @throws XServletException
515 */
516 private void changeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
517 throws XServletException {
518 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
519 getUser(request), getAuthToken(request));
520 String jobId = getResourceName(request);
521 String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
522 try {
523 coordEngine.change(jobId, changeValue);
524 }
525 catch (CoordinatorEngineException ex) {
526 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
527 }
528 }
529
530 /**
531 * Change a bundle job
532 *
533 * @param request servlet request
534 * @param response servlet response
535 * @throws XServletException
536 */
537 private void changeBundleJob(HttpServletRequest request, HttpServletResponse response)
538 throws XServletException {
539 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(
540 getUser(request), getAuthToken(request));
541 String jobId = getResourceName(request);
542 String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
543 try {
544 bundleEngine.change(jobId, changeValue);
545 }
546 catch (BundleEngineException ex) {
547 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
548 }
549 }
550
551 /**
552 * Rerun a wf job
553 *
554 * @param request servlet request
555 * @param response servlet response
556 * @param conf configuration object
557 * @throws XServletException
558 */
559 private void reRunWorkflowJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
560 throws XServletException {
561 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
562 getAuthToken(request));
563
564 String jobId = getResourceName(request);
565 try {
566 dagEngine.reRun(jobId, conf);
567 }
568 catch (DagEngineException ex) {
569 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
570 }
571 }
572
573 /**
574 * Rerun bundle job
575 *
576 * @param request servlet request
577 * @param response servlet response
578 * @param conf configration object
579 * @throws XServletException
580 */
581 private void rerunBundleJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
582 throws XServletException {
583 JSONObject json = new JSONObject();
584 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
585 getAuthToken(request));
586 String jobId = getResourceName(request);
587
588 String coordScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM);
589 String dateScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM);
590 String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
591 String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
592
593 XLog.getLog(getClass()).info(
594 "Rerun Bundle for jobId=" + jobId + ", coordScope=" + coordScope + ", dateScope=" + dateScope + ", refresh="
595 + refresh + ", noCleanup=" + noCleanup);
596
597 try {
598 bundleEngine.reRun(jobId, coordScope, dateScope, Boolean.valueOf(refresh), Boolean.valueOf(noCleanup));
599 }
600 catch (BaseEngineException ex) {
601 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
602 }
603 }
604
605 /**
606 * Rerun coordinator actions
607 *
608 * @param request servlet request
609 * @param response servlet response
610 * @param conf configuration object
611 * @throws XServletException
612 */
613 @SuppressWarnings("unchecked")
614 private JSONObject reRunCoordinatorActions(HttpServletRequest request, HttpServletResponse response,
615 Configuration conf) throws XServletException {
616 JSONObject json = new JSONObject();
617 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request),
618 getAuthToken(request));
619
620 String jobId = getResourceName(request);
621
622 String rerunType = request.getParameter(RestConstants.JOB_COORD_RERUN_TYPE_PARAM);
623 String scope = request.getParameter(RestConstants.JOB_COORD_RERUN_SCOPE_PARAM);
624 String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
625 String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
626
627 XLog.getLog(getClass()).info(
628 "Rerun coordinator for jobId=" + jobId + ", rerunType=" + rerunType + ",scope=" + scope + ",refresh="
629 + refresh + ", noCleanup=" + noCleanup);
630
631 try {
632 if (!(rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE) || rerunType
633 .equals(RestConstants.JOB_COORD_RERUN_ACTION))) {
634 throw new CommandException(ErrorCode.E1018, "date or action expected.");
635 }
636 CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh),
637 Boolean.valueOf(noCleanup));
638 List<CoordinatorActionBean> coordActions;
639 if (coordInfo != null) {
640 coordActions = coordInfo.getCoordActions();
641 }
642 else {
643 coordActions = CoordRerunXCommand.getCoordActions(rerunType, jobId, scope);
644 }
645 json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT"));
646 }
647 catch (BaseEngineException ex) {
648 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
649 }
650 catch (CommandException ex) {
651 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
652 }
653
654 return json;
655 }
656
657
658
659 /**
660 * Get workflow job
661 *
662 * @param request servlet request
663 * @param response servlet response
664 * @return JsonBean WorkflowJobBean
665 * @throws XServletException
666 */
667 private JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
668 JsonBean jobBean = null;
669 String jobId = getResourceName(request);
670 String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
671 String lenStr = request.getParameter(RestConstants.LEN_PARAM);
672 int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
673 start = (start < 1) ? 1 : start;
674 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
675 len = (len < 1) ? Integer.MAX_VALUE : len;
676 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
677 getAuthToken(request));
678 try {
679 jobBean = (JsonBean) dagEngine.getJob(jobId, start, len);
680 }
681 catch (DagEngineException ex) {
682 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
683 }
684
685 return jobBean;
686 }
687
688 /**
689 * Get wf action info
690 *
691 * @param request servlet request
692 * @param response servlet response
693 * @return JsonBean WorkflowActionBean
694 * @throws XServletException
695 */
696 private JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response)
697 throws XServletException {
698 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
699 getAuthToken(request));
700
701 JsonBean actionBean = null;
702 String actionId = getResourceName(request);
703 try {
704 actionBean = dagEngine.getWorkflowAction(actionId);
705 }
706 catch (BaseEngineException ex) {
707 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
708 }
709
710 return actionBean;
711 }
712
713 /**
714 * Get coord job info
715 *
716 * @param request servlet request
717 * @param response servlet response
718 * @return JsonBean CoordinatorJobBean
719 * @throws XServletException
720 * @throws BaseEngineException
721 */
722 private JsonBean getCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
723 throws XServletException, BaseEngineException {
724 JsonBean jobBean = null;
725 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
726 getUser(request), getAuthToken(request));
727 String jobId = getResourceName(request);
728 String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
729 String lenStr = request.getParameter(RestConstants.LEN_PARAM);
730 String filter = request.getParameter(RestConstants.JOB_FILTER_PARAM);
731 int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
732 start = (start < 1) ? 1 : start;
733 // Get default number of coordinator actions to be retrieved
734 int defaultLen = Services.get().getConf().getInt(COORD_ACTIONS_DEFAULT_LENGTH, 1000);
735 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
736 len = (len < 1) ? defaultLen : len;
737 try {
738 JsonCoordinatorJob coordJob = coordEngine.getCoordJob(jobId, filter, start, len);
739 jobBean = coordJob;
740 }
741 catch (CoordinatorEngineException ex) {
742 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
743 }
744
745 return jobBean;
746 }
747
748 /**
749 * Get bundle job info
750 *
751 * @param request servlet request
752 * @param response servlet response
753 * @return JsonBean bundle job bean
754 * @throws XServletException
755 * @throws BaseEngineException
756 */
757 private JsonBean getBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
758 BaseEngineException {
759 JsonBean jobBean = null;
760 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
761 getAuthToken(request));
762 String jobId = getResourceName(request);
763
764 try {
765 jobBean = (JsonBean) bundleEngine.getBundleJob(jobId);
766
767 return jobBean;
768 }
769 catch (BundleEngineException ex) {
770 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
771 }
772 }
773
774 /**
775 * Get coordinator action
776 *
777 * @param request servlet request
778 * @param response servlet response
779 * @return JsonBean CoordinatorActionBean
780 * @throws XServletException
781 * @throws BaseEngineException
782 */
783 private JsonBean getCoordinatorAction(HttpServletRequest request, HttpServletResponse response)
784 throws XServletException, BaseEngineException {
785 JsonBean actionBean = null;
786 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
787 getUser(request), getAuthToken(request));
788 String actionId = getResourceName(request);
789 try {
790 actionBean = coordEngine.getCoordAction(actionId);
791 }
792 catch (CoordinatorEngineException ex) {
793 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
794 }
795
796 return actionBean;
797 }
798
799 /**
800 * Get wf job definition
801 *
802 * @param request servlet request
803 * @param response servlet response
804 * @return String wf definition
805 * @throws XServletException
806 */
807 private String getWorkflowJobDefinition(HttpServletRequest request, HttpServletResponse response)
808 throws XServletException {
809 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
810 getAuthToken(request));
811
812 String wfDefinition;
813 String jobId = getResourceName(request);
814 try {
815 wfDefinition = dagEngine.getDefinition(jobId);
816 }
817 catch (DagEngineException ex) {
818 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
819 }
820 return wfDefinition;
821 }
822
823 /**
824 * Get bundle job definition
825 *
826 * @param request servlet request
827 * @param response servlet response
828 * @return String bundle definition
829 * @throws XServletException
830 */
831 private String getBundleJobDefinition(HttpServletRequest request, HttpServletResponse response) throws XServletException {
832 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
833 getAuthToken(request));
834 String bundleDefinition;
835 String jobId = getResourceName(request);
836 try {
837 bundleDefinition = bundleEngine.getDefinition(jobId);
838 }
839 catch (BundleEngineException ex) {
840 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
841 }
842 return bundleDefinition;
843 }
844
845 /**
846 * Get coordinator job definition
847 *
848 * @param request servlet request
849 * @param response servlet response
850 * @return String coord definition
851 * @throws XServletException
852 */
853 private String getCoordinatorJobDefinition(HttpServletRequest request, HttpServletResponse response)
854 throws XServletException {
855
856 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
857 getUser(request), getAuthToken(request));
858
859 String jobId = getResourceName(request);
860
861 String coordDefinition = null;
862 try {
863 coordDefinition = coordEngine.getDefinition(jobId);
864 }
865 catch (BaseEngineException ex) {
866 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
867 }
868 return coordDefinition;
869 }
870
871 /**
872 * Stream wf job log
873 *
874 * @param request servlet request
875 * @param response servlet response
876 * @throws XServletException
877 * @throws IOException
878 */
879 private void streamWorkflowJobLog(HttpServletRequest request, HttpServletResponse response)
880 throws XServletException, IOException {
881 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
882 getAuthToken(request));
883 String jobId = getResourceName(request);
884 try {
885 dagEngine.streamLog(jobId, response.getWriter());
886 }
887 catch (DagEngineException ex) {
888 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
889 }
890 }
891
892 /**
893 * Stream bundle job log
894 *
895 * @param request servlet request
896 * @param response servlet response
897 * @throws XServletException
898 */
899 private void streamBundleJob(HttpServletRequest request, HttpServletResponse response)
900 throws XServletException, IOException {
901 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
902 getAuthToken(request));
903 String jobId = getResourceName(request);
904 try {
905 bundleEngine.streamLog(jobId, response.getWriter());
906 }
907 catch (BundleEngineException ex) {
908 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
909 }
910 }
911
912 /**
913 * Stream coordinator job log
914 *
915 * @param request servlet request
916 * @param response servlet response
917 * @throws XServletException
918 * @throws IOException
919 */
920 private void streamCoordinatorJobLog(HttpServletRequest request, HttpServletResponse response)
921 throws XServletException, IOException {
922
923 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
924 getUser(request), getAuthToken(request));
925 String jobId = getResourceName(request);
926 String logRetrievalScope = request.getParameter(RestConstants.JOB_LOG_SCOPE_PARAM);
927 String logRetrievalType = request.getParameter(RestConstants.JOB_LOG_TYPE_PARAM);
928 try {
929 coordEngine.streamLog(jobId, logRetrievalScope, logRetrievalType, response.getWriter());
930 }
931 catch (BaseEngineException ex) {
932 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
933 }
934 catch (CommandException ex) {
935 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
936 }
937 }
938 }