This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.servlet;
019
020 import java.io.IOException;
021 import java.util.List;
022 import javax.servlet.ServletInputStream;
023 import javax.servlet.http.HttpServletRequest;
024 import javax.servlet.http.HttpServletResponse;
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.oozie.*;
027 import org.apache.oozie.client.WorkflowAction;
028 import org.apache.oozie.client.WorkflowJob;
029 import org.apache.oozie.client.rest.*;
030 import org.apache.oozie.command.CommandException;
031 import org.apache.oozie.command.coord.CoordRerunXCommand;
032 import org.apache.oozie.service.BundleEngineService;
033 import org.apache.oozie.service.CoordinatorEngineService;
034 import org.apache.oozie.service.DagEngineService;
035 import org.apache.oozie.service.Services;
036 import org.apache.oozie.util.GraphGenerator;
037 import org.apache.oozie.util.XLog;
038 import org.json.simple.JSONObject;
039
040
041 @SuppressWarnings("serial")
042 public class V1JobServlet extends BaseJobServlet {
043
044 private static final String INSTRUMENTATION_NAME = "v1job";
045 public static final String COORD_ACTIONS_DEFAULT_LENGTH = "oozie.coord.actions.default.length";
046
047 public V1JobServlet() {
048 super(INSTRUMENTATION_NAME);
049 }
050
051 protected V1JobServlet(String instrumentation_name){
052 super(instrumentation_name);
053 }
054
055 /*
056 * protected method to start a job
057 */
058 @Override
059 protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
060 IOException {
061 /*
062 * Configuration conf = new XConfiguration(request.getInputStream());
063 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
064 * conf.get(OozieClient.COORDINATOR_APP_PATH);
065 *
066 * ServletUtilities.ValidateAppPath(wfPath, coordPath);
067 */
068 String jobId = getResourceName(request);
069 if (jobId.endsWith("-W")) {
070 startWorkflowJob(request, response);
071 }
072 else if (jobId.endsWith("-B")) {
073 startBundleJob(request, response);
074 }
075 else {
076 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, RestConstants.ACTION_PARAM, RestConstants.JOB_ACTION_START);
077 }
078
079 }
080
081 /*
082 * protected method to resume a job
083 */
084 @Override
085 protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
086 IOException {
087 /*
088 * Configuration conf = new XConfiguration(request.getInputStream());
089 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
090 * conf.get(OozieClient.COORDINATOR_APP_PATH);
091 *
092 * ServletUtilities.ValidateAppPath(wfPath, coordPath);
093 */
094 String jobId = getResourceName(request);
095 if (jobId.endsWith("-W")) {
096 resumeWorkflowJob(request, response);
097 }
098 else if (jobId.endsWith("-B")) {
099 resumeBundleJob(request, response);
100 }
101 else {
102 resumeCoordinatorJob(request, response);
103 }
104 }
105
106 /*
107 * protected method to suspend a job
108 */
109 @Override
110 protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
111 IOException {
112 /*
113 * Configuration conf = new XConfiguration(request.getInputStream());
114 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
115 * conf.get(OozieClient.COORDINATOR_APP_PATH);
116 *
117 * ServletUtilities.ValidateAppPath(wfPath, coordPath);
118 */
119 String jobId = getResourceName(request);
120 if (jobId.endsWith("-W")) {
121 suspendWorkflowJob(request, response);
122 }
123 else if (jobId.endsWith("-B")) {
124 suspendBundleJob(request, response);
125 }
126 else {
127 suspendCoordinatorJob(request, response);
128 }
129 }
130
131 /*
132 * protected method to kill a job
133 */
134 @Override
135 protected void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
136 IOException {
137 /*
138 * Configuration conf = new XConfiguration(request.getInputStream());
139 * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
140 * conf.get(OozieClient.COORDINATOR_APP_PATH);
141 *
142 * ServletUtilities.ValidateAppPath(wfPath, coordPath);
143 */
144 String jobId = getResourceName(request);
145 if (jobId.endsWith("-W")) {
146 killWorkflowJob(request, response);
147 }
148 else if (jobId.endsWith("-B")) {
149 killBundleJob(request, response);
150 }
151 else {
152 killCoordinatorJob(request, response);
153 }
154 }
155
156 /**
157 * protected method to change a coordinator job
158 * @param request request object
159 * @param response response object
160 * @throws XServletException
161 * @throws IOException
162 */
163 @Override
164 protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
165 IOException {
166 String jobId = getResourceName(request);
167 if (jobId.endsWith("-B")) {
168 changeBundleJob(request, response);
169 }
170 else {
171 changeCoordinatorJob(request, response);
172 }
173 }
174
175 /*
176 * protected method to reRun a job
177 *
178 * @seeorg.apache.oozie.servlet.BaseJobServlet#reRunJob(javax.servlet.http.
179 * HttpServletRequest, javax.servlet.http.HttpServletResponse,
180 * org.apache.hadoop.conf.Configuration)
181 */
182 @Override
183 protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
184 throws XServletException, IOException {
185 JSONObject json = null;
186 String jobId = getResourceName(request);
187 if (jobId.endsWith("-W")) {
188 reRunWorkflowJob(request, response, conf);
189 }
190 else if (jobId.endsWith("-B")) {
191 rerunBundleJob(request, response, conf);
192 }
193 else {
194 json = reRunCoordinatorActions(request, response, conf);
195 }
196 return json;
197 }
198
199 /*
200 * protected method to get a job in JsonBean representation
201 */
202 @Override
203 protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
204 IOException, BaseEngineException {
205 ServletInputStream is = request.getInputStream();
206 byte[] b = new byte[101];
207 while (is.readLine(b, 0, 100) != -1) {
208 XLog.getLog(getClass()).warn("Printing :" + new String(b));
209 }
210
211 JsonBean jobBean = null;
212 String jobId = getResourceName(request);
213 if (jobId.endsWith("-B")) {
214 jobBean = getBundleJob(request, response);
215 }
216 else {
217 if (jobId.endsWith("-W")) {
218 jobBean = getWorkflowJob(request, response);
219 }
220 else {
221 if (jobId.contains("-W@")) {
222 jobBean = getWorkflowAction(request, response);
223 }
224 else {
225 if (jobId.contains("-C@")) {
226 jobBean = getCoordinatorAction(request, response);
227 }
228 else {
229 jobBean = getCoordinatorJob(request, response);
230 }
231 }
232 }
233 }
234
235 return jobBean;
236 }
237
238 /*
239 * protected method to get a job definition in String format
240 */
241 @Override
242 protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
243 throws XServletException, IOException {
244 String jobDefinition = null;
245 String jobId = getResourceName(request);
246 if (jobId.endsWith("-W")) {
247 jobDefinition = getWorkflowJobDefinition(request, response);
248 }
249 else if (jobId.endsWith("-B")) {
250 jobDefinition = getBundleJobDefinition(request, response);
251 }
252 else {
253 jobDefinition = getCoordinatorJobDefinition(request, response);
254 }
255 return jobDefinition;
256 }
257
258 /*
259 * protected method to stream a job log into response object
260 */
261 @Override
262 protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
263 IOException {
264 String jobId = getResourceName(request);
265 if (jobId.endsWith("-W")) {
266 streamWorkflowJobLog(request, response);
267 }
268 else if (jobId.endsWith("-B")) {
269 streamBundleJob(request, response);
270 }
271 else {
272 streamCoordinatorJobLog(request, response);
273 }
274 }
275
276 @Override
277 protected void streamJobGraph(HttpServletRequest request, HttpServletResponse response)
278 throws XServletException, IOException {
279 String jobId = getResourceName(request);
280 if (jobId.endsWith("-W")) {
281 // Applicable only to worflow, for now
282 response.setContentType(RestConstants.PNG_IMAGE_CONTENT_TYPE);
283 try {
284 String showKill = request.getParameter(RestConstants.JOB_SHOW_KILL_PARAM);
285 boolean sK = showKill != null && (showKill.equalsIgnoreCase("yes") || showKill.equals("1") || showKill.equalsIgnoreCase("true"));
286
287 new GraphGenerator(
288 getWorkflowJobDefinition(request, response),
289 (JsonWorkflowJob)getWorkflowJob(request, response),
290 sK).write(response.getOutputStream());
291 }
292 catch (Exception e) {
293 throw new XServletException(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, ErrorCode.E0307, e.getMessage(), e);
294 }
295 }
296 else {
297 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0306);
298 }
299 }
300
301 /**
302 * Start wf job
303 *
304 * @param request servlet request
305 * @param response servlet response
306 * @throws XServletException
307 */
308 private void startWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
309 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
310
311 String jobId = getResourceName(request);
312 try {
313 dagEngine.start(jobId);
314 }
315 catch (DagEngineException ex) {
316 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
317 }
318 }
319
320 /**
321 * Start bundle job
322 *
323 * @param request servlet request
324 * @param response servlet response
325 * @throws XServletException
326 */
327 private void startBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
328 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
329 String jobId = getResourceName(request);
330 try {
331 bundleEngine.start(jobId);
332 }
333 catch (BundleEngineException ex) {
334 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
335 }
336 }
337
338 /**
339 * Resume workflow job
340 *
341 * @param request servlet request
342 * @param response servlet response
343 * @throws XServletException
344 */
345 private void resumeWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
346 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
347
348 String jobId = getResourceName(request);
349 try {
350 dagEngine.resume(jobId);
351 }
352 catch (DagEngineException ex) {
353 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
354 }
355 }
356
357 /**
358 * Resume bundle job
359 *
360 * @param request servlet request
361 * @param response servlet response
362 * @throws XServletException
363 */
364 private void resumeBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
365 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
366 String jobId = getResourceName(request);
367 try {
368 bundleEngine.resume(jobId);
369 }
370 catch (BundleEngineException ex) {
371 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
372 }
373 }
374
375 /**
376 * Resume coordinator job
377 *
378 * @param request servlet request
379 * @param response servlet response
380 * @throws XServletException
381 * @throws CoordinatorEngineException
382 */
383 private void resumeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
384 throws XServletException {
385 String jobId = getResourceName(request);
386 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
387 getUser(request));
388 try {
389 coordEngine.resume(jobId);
390 }
391 catch (CoordinatorEngineException ex) {
392 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
393 }
394 }
395
396 /**
397 * Suspend a wf job
398 *
399 * @param request servlet request
400 * @param response servlet response
401 * @throws XServletException
402 */
403 private void suspendWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
404 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
405
406 String jobId = getResourceName(request);
407 try {
408 dagEngine.suspend(jobId);
409 }
410 catch (DagEngineException ex) {
411 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
412 }
413 }
414
415 /**
416 * Suspend bundle job
417 *
418 * @param request servlet request
419 * @param response servlet response
420 * @throws XServletException
421 */
422 private void suspendBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
423 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
424 String jobId = getResourceName(request);
425 try {
426 bundleEngine.suspend(jobId);
427 }
428 catch (BundleEngineException ex) {
429 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
430 }
431 }
432
433 /**
434 * Suspend coordinator job
435 *
436 * @param request servlet request
437 * @param response servlet response
438 * @throws XServletException
439 */
440 private void suspendCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
441 throws XServletException {
442 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
443 getUser(request));
444 String jobId = getResourceName(request);
445 try {
446 coordEngine.suspend(jobId);
447 }
448 catch (CoordinatorEngineException ex) {
449 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
450 }
451 }
452
453 /**
454 * Kill a wf job
455 * @param request servlet request
456 * @param response servlet response
457 * @throws XServletException
458 */
459 private void killWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
460 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
461
462 String jobId = getResourceName(request);
463 try {
464 dagEngine.kill(jobId);
465 }
466 catch (DagEngineException ex) {
467 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
468 }
469 }
470
471 /**
472 * Kill a coord job
473 * @param request servlet request
474 * @param response servlet response
475 * @throws XServletException
476 */
477 private void killCoordinatorJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
478 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
479 getUser(request));
480 String jobId = getResourceName(request);
481 try {
482 coordEngine.kill(jobId);
483 }
484 catch (CoordinatorEngineException ex) {
485 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
486 }
487 }
488
489 /**
490 * Kill bundle job
491 *
492 * @param request servlet request
493 * @param response servlet response
494 * @throws XServletException
495 */
496 private void killBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
497 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
498 String jobId = getResourceName(request);
499 try {
500 bundleEngine.kill(jobId);
501 }
502 catch (BundleEngineException ex) {
503 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
504 }
505 }
506
507 /**
508 * Change a coordinator job
509 *
510 * @param request servlet request
511 * @param response servlet response
512 * @throws XServletException
513 */
514 private void changeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
515 throws XServletException {
516 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
517 getUser(request));
518 String jobId = getResourceName(request);
519 String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
520 try {
521 coordEngine.change(jobId, changeValue);
522 }
523 catch (CoordinatorEngineException ex) {
524 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
525 }
526 }
527
528 /**
529 * Change a bundle job
530 *
531 * @param request servlet request
532 * @param response servlet response
533 * @throws XServletException
534 */
535 private void changeBundleJob(HttpServletRequest request, HttpServletResponse response)
536 throws XServletException {
537 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
538 String jobId = getResourceName(request);
539 String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
540 try {
541 bundleEngine.change(jobId, changeValue);
542 }
543 catch (BundleEngineException ex) {
544 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
545 }
546 }
547
548 /**
549 * Rerun a wf job
550 *
551 * @param request servlet request
552 * @param response servlet response
553 * @param conf configuration object
554 * @throws XServletException
555 */
556 private void reRunWorkflowJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
557 throws XServletException {
558 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
559
560 String jobId = getResourceName(request);
561 try {
562 dagEngine.reRun(jobId, conf);
563 }
564 catch (DagEngineException ex) {
565 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
566 }
567 }
568
569 /**
570 * Rerun bundle job
571 *
572 * @param request servlet request
573 * @param response servlet response
574 * @param conf configration object
575 * @throws XServletException
576 */
577 private void rerunBundleJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
578 throws XServletException {
579 JSONObject json = new JSONObject();
580 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
581 String jobId = getResourceName(request);
582
583 String coordScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM);
584 String dateScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM);
585 String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
586 String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
587
588 XLog.getLog(getClass()).info(
589 "Rerun Bundle for jobId=" + jobId + ", coordScope=" + coordScope + ", dateScope=" + dateScope + ", refresh="
590 + refresh + ", noCleanup=" + noCleanup);
591
592 try {
593 bundleEngine.reRun(jobId, coordScope, dateScope, Boolean.valueOf(refresh), Boolean.valueOf(noCleanup));
594 }
595 catch (BaseEngineException ex) {
596 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
597 }
598 }
599
600 /**
601 * Rerun coordinator actions
602 *
603 * @param request servlet request
604 * @param response servlet response
605 * @param conf configuration object
606 * @throws XServletException
607 */
608 @SuppressWarnings("unchecked")
609 private JSONObject reRunCoordinatorActions(HttpServletRequest request, HttpServletResponse response,
610 Configuration conf) throws XServletException {
611 JSONObject json = new JSONObject();
612 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request));
613
614 String jobId = getResourceName(request);
615
616 String rerunType = request.getParameter(RestConstants.JOB_COORD_RERUN_TYPE_PARAM);
617 String scope = request.getParameter(RestConstants.JOB_COORD_RERUN_SCOPE_PARAM);
618 String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
619 String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
620
621 XLog.getLog(getClass()).info(
622 "Rerun coordinator for jobId=" + jobId + ", rerunType=" + rerunType + ",scope=" + scope + ",refresh="
623 + refresh + ", noCleanup=" + noCleanup);
624
625 try {
626 if (!(rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE) || rerunType
627 .equals(RestConstants.JOB_COORD_RERUN_ACTION))) {
628 throw new CommandException(ErrorCode.E1018, "date or action expected.");
629 }
630 CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh),
631 Boolean.valueOf(noCleanup));
632 List<CoordinatorActionBean> coordActions;
633 if (coordInfo != null) {
634 coordActions = coordInfo.getCoordActions();
635 }
636 else {
637 coordActions = CoordRerunXCommand.getCoordActions(rerunType, jobId, scope);
638 }
639 json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT"));
640 }
641 catch (BaseEngineException ex) {
642 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
643 }
644 catch (CommandException ex) {
645 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
646 }
647
648 return json;
649 }
650
651
652
653 /**
654 * Get workflow job
655 *
656 * @param request servlet request
657 * @param response servlet response
658 * @return JsonBean WorkflowJobBean
659 * @throws XServletException
660 */
661 protected JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
662 JsonBean jobBean = getWorkflowJobBean(request, response);
663 // for backward compatibility (OOZIE-1231)
664 swapMRActionID((WorkflowJob)jobBean);
665 return jobBean;
666 }
667
668 /**
669 * Get workflow job
670 *
671 * @param request servlet request
672 * @param response servlet response
673 * @return JsonBean WorkflowJobBean
674 * @throws XServletException
675 */
676 protected JsonBean getWorkflowJobBean(HttpServletRequest request, HttpServletResponse response) throws XServletException {
677 JsonBean jobBean = null;
678 String jobId = getResourceName(request);
679 String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
680 String lenStr = request.getParameter(RestConstants.LEN_PARAM);
681 int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
682 start = (start < 1) ? 1 : start;
683 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
684 len = (len < 1) ? Integer.MAX_VALUE : len;
685 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
686 try {
687 jobBean = (JsonBean) dagEngine.getJob(jobId, start, len);
688 }
689 catch (DagEngineException ex) {
690 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
691 }
692 return jobBean;
693 }
694
695 private void swapMRActionID(WorkflowJob wjBean) {
696 List<WorkflowAction> actions = wjBean.getActions();
697 if (actions != null) {
698 for (WorkflowAction wa : actions) {
699 swapMRActionID(wa);
700 }
701 }
702 }
703
704 private void swapMRActionID(WorkflowAction waBean) {
705 if (waBean.getType().equals("map-reduce")) {
706 String childId = waBean.getExternalChildIDs();
707 if (childId != null && !childId.equals("")) {
708 String consoleBase = getConsoleBase(waBean.getConsoleUrl());
709 ((WorkflowActionBean) waBean).setConsoleUrl(consoleBase + childId);
710 ((WorkflowActionBean) waBean).setExternalId(childId);
711 ((WorkflowActionBean) waBean).setExternalChildIDs("");
712 }
713 }
714 }
715
716 private String getConsoleBase(String url) {
717 String consoleBase = null;
718 if (url.indexOf("application") != -1) {
719 consoleBase = url.split("application_[0-9]+_[0-9]+")[0];
720 }
721 else {
722 consoleBase = url.split("job_[0-9]+_[0-9]+")[0];
723 }
724 return consoleBase;
725 }
726
727 /**
728 * Get wf action info
729 *
730 * @param request servlet request
731 * @param response servlet response
732 * @return JsonBean WorkflowActionBean
733 * @throws XServletException
734 */
735 protected JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response)
736 throws XServletException {
737
738 JsonBean actionBean = getWorkflowActionBean(request, response);
739 // for backward compatibility (OOZIE-1231)
740 swapMRActionID((WorkflowAction)actionBean);
741 return actionBean;
742 }
743
744 protected JsonBean getWorkflowActionBean(HttpServletRequest request, HttpServletResponse response)
745 throws XServletException {
746 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
747
748 JsonBean actionBean = null;
749 String actionId = getResourceName(request);
750 try {
751 actionBean = dagEngine.getWorkflowAction(actionId);
752 }
753 catch (BaseEngineException ex) {
754 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
755 }
756 return actionBean;
757 }
758
759 /**
760 * Get coord job info
761 *
762 * @param request servlet request
763 * @param response servlet response
764 * @return JsonBean CoordinatorJobBean
765 * @throws XServletException
766 * @throws BaseEngineException
767 */
768 protected JsonBean getCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
769 throws XServletException, BaseEngineException {
770 JsonBean jobBean = null;
771 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
772 getUser(request));
773 String jobId = getResourceName(request);
774 String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
775 String lenStr = request.getParameter(RestConstants.LEN_PARAM);
776 String filter = request.getParameter(RestConstants.JOB_FILTER_PARAM);
777 String orderStr = request.getParameter(RestConstants.ORDER_PARAM);
778 boolean order = (orderStr != null && orderStr.equals("desc")) ? true : false;
779 int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
780 start = (start < 1) ? 1 : start;
781 // Get default number of coordinator actions to be retrieved
782 int defaultLen = Services.get().getConf().getInt(COORD_ACTIONS_DEFAULT_LENGTH, 1000);
783 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
784 len = (len < 0) ? defaultLen : len;
785 try {
786 JsonCoordinatorJob coordJob = coordEngine.getCoordJob(jobId, filter, start, len, order);
787 jobBean = coordJob;
788 }
789 catch (CoordinatorEngineException ex) {
790 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
791 }
792
793 return jobBean;
794 }
795
796 /**
797 * Get bundle job info
798 *
799 * @param request servlet request
800 * @param response servlet response
801 * @return JsonBean bundle job bean
802 * @throws XServletException
803 * @throws BaseEngineException
804 */
805 private JsonBean getBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
806 BaseEngineException {
807 JsonBean jobBean = null;
808 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
809 String jobId = getResourceName(request);
810
811 try {
812 jobBean = (JsonBean) bundleEngine.getBundleJob(jobId);
813
814 return jobBean;
815 }
816 catch (BundleEngineException ex) {
817 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
818 }
819 }
820
821 /**
822 * Get coordinator action
823 *
824 * @param request servlet request
825 * @param response servlet response
826 * @return JsonBean CoordinatorActionBean
827 * @throws XServletException
828 * @throws BaseEngineException
829 */
830 private JsonBean getCoordinatorAction(HttpServletRequest request, HttpServletResponse response)
831 throws XServletException, BaseEngineException {
832 JsonBean actionBean = null;
833 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
834 getUser(request));
835 String actionId = getResourceName(request);
836 try {
837 actionBean = coordEngine.getCoordAction(actionId);
838 }
839 catch (CoordinatorEngineException ex) {
840 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
841 }
842
843 return actionBean;
844 }
845
846 /**
847 * Get wf job definition
848 *
849 * @param request servlet request
850 * @param response servlet response
851 * @return String wf definition
852 * @throws XServletException
853 */
854 private String getWorkflowJobDefinition(HttpServletRequest request, HttpServletResponse response)
855 throws XServletException {
856 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
857
858 String wfDefinition;
859 String jobId = getResourceName(request);
860 try {
861 wfDefinition = dagEngine.getDefinition(jobId);
862 }
863 catch (DagEngineException ex) {
864 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
865 }
866 return wfDefinition;
867 }
868
869 /**
870 * Get bundle job definition
871 *
872 * @param request servlet request
873 * @param response servlet response
874 * @return String bundle definition
875 * @throws XServletException
876 */
877 private String getBundleJobDefinition(HttpServletRequest request, HttpServletResponse response) throws XServletException {
878 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
879 String bundleDefinition;
880 String jobId = getResourceName(request);
881 try {
882 bundleDefinition = bundleEngine.getDefinition(jobId);
883 }
884 catch (BundleEngineException ex) {
885 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
886 }
887 return bundleDefinition;
888 }
889
890 /**
891 * Get coordinator job definition
892 *
893 * @param request servlet request
894 * @param response servlet response
895 * @return String coord definition
896 * @throws XServletException
897 */
898 private String getCoordinatorJobDefinition(HttpServletRequest request, HttpServletResponse response)
899 throws XServletException {
900
901 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
902 getUser(request));
903
904 String jobId = getResourceName(request);
905
906 String coordDefinition = null;
907 try {
908 coordDefinition = coordEngine.getDefinition(jobId);
909 }
910 catch (BaseEngineException ex) {
911 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
912 }
913 return coordDefinition;
914 }
915
916 /**
917 * Stream wf job log
918 *
919 * @param request servlet request
920 * @param response servlet response
921 * @throws XServletException
922 * @throws IOException
923 */
924 private void streamWorkflowJobLog(HttpServletRequest request, HttpServletResponse response)
925 throws XServletException, IOException {
926 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
927 String jobId = getResourceName(request);
928 try {
929 dagEngine.streamLog(jobId, response.getWriter());
930 }
931 catch (DagEngineException ex) {
932 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
933 }
934 }
935
936 /**
937 * Stream bundle job log
938 *
939 * @param request servlet request
940 * @param response servlet response
941 * @throws XServletException
942 */
943 private void streamBundleJob(HttpServletRequest request, HttpServletResponse response)
944 throws XServletException, IOException {
945 BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
946 String jobId = getResourceName(request);
947 try {
948 bundleEngine.streamLog(jobId, response.getWriter());
949 }
950 catch (BundleEngineException ex) {
951 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
952 }
953 }
954
955 /**
956 * Stream coordinator job log
957 *
958 * @param request servlet request
959 * @param response servlet response
960 * @throws XServletException
961 * @throws IOException
962 */
963 private void streamCoordinatorJobLog(HttpServletRequest request, HttpServletResponse response)
964 throws XServletException, IOException {
965
966 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
967 getUser(request));
968 String jobId = getResourceName(request);
969 String logRetrievalScope = request.getParameter(RestConstants.JOB_LOG_SCOPE_PARAM);
970 String logRetrievalType = request.getParameter(RestConstants.JOB_LOG_TYPE_PARAM);
971 try {
972 coordEngine.streamLog(jobId, logRetrievalScope, logRetrievalType, response.getWriter());
973 }
974 catch (BaseEngineException ex) {
975 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
976 }
977 catch (CommandException ex) {
978 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
979 }
980 }
981
982 @Override
983 protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException,
984 IOException {
985 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
986 }
987
988 }