001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.servlet;
019    
020    import java.io.IOException;
021    
022    import javax.servlet.http.HttpServletRequest;
023    import javax.servlet.http.HttpServletResponse;
024    
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.oozie.DagEngine;
027    import org.apache.oozie.DagEngineException;
028    import org.apache.oozie.client.rest.JsonBean;
029    import org.apache.oozie.service.DagEngineService;
030    import org.apache.oozie.service.Services;
031    import org.json.simple.JSONObject;
032    import org.apache.oozie.ErrorCode;
033    
034    @SuppressWarnings("serial")
035    public class V0JobServlet extends BaseJobServlet {
036    
037        private static final String INSTRUMENTATION_NAME = "v0job";
038    
039        public V0JobServlet() {
040            super(INSTRUMENTATION_NAME);
041        }
042    
043        /*
044         * v0 service method to start a job
045         */
046        @Override
047        protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
048                IOException {
049            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
050                                                                                          getAuthToken(request));
051    
052            String jobId = getResourceName(request);
053            try {
054                dagEngine.start(jobId);
055            }
056            catch (DagEngineException ex) {
057                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
058            }
059        }
060    
061        /*
062         * v0 service method to resume a job
063         */
064        @Override
065        protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
066                IOException {
067            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
068                                                                                          getAuthToken(request));
069    
070            String jobId = getResourceName(request);
071            try {
072                dagEngine.resume(jobId);
073            }
074            catch (DagEngineException ex) {
075                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
076            }
077        }
078    
079        /*
080         * v0 service method to suspend a job
081         */
082        @Override
083        protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
084                IOException {
085            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
086                                                                                          getAuthToken(request));
087    
088            String jobId = getResourceName(request);
089            try {
090                dagEngine.suspend(jobId);
091            }
092            catch (DagEngineException ex) {
093                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
094            }
095        }
096    
097        /*
098         * v0 service method to kill a job
099         */
100        @Override
101        protected void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
102                IOException {
103            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
104                                                                                          getAuthToken(request));
105    
106            String jobId = getResourceName(request);
107            try {
108                dagEngine.kill(jobId);
109            }
110            catch (DagEngineException ex) {
111                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
112            }
113        }
114    
115        /*
116         * v0 service method to change a job
117         */
118        protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
119                IOException {
120            // This code should not be reached. But if it happens somehow, we throw
121            // bad request exception.
122            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E1014);
123        }
124    
125        /*
126         * v0 service method to reRun a job
127         */
128        @Override
129        protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
130                throws XServletException, IOException {
131            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
132                                                                                          getAuthToken(request));
133    
134            String jobId = getResourceName(request);
135            try {
136                dagEngine.reRun(jobId, conf);
137            }
138            catch (DagEngineException ex) {
139                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
140            }
141            return null;
142        }
143    
144        /*
145         * v0 service method to get a job in JsonBean representation
146         */
147        @Override
148        protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
149                IOException {
150            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
151                                                                                          getAuthToken(request));
152    
153            JsonBean jobBean = null;
154            String jobId = getResourceName(request);
155            try {
156                jobBean = (JsonBean) dagEngine.getJob(jobId);
157            }
158            catch (DagEngineException ex) {
159                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
160            }
161    
162            return jobBean;
163        }
164    
165        /*
166         * v0 service method to get a job definition in String format
167         */
168        @Override
169        protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
170                throws XServletException, IOException {
171            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
172                                                                                          getAuthToken(request));
173    
174            String wfDefinition = null;
175            String jobId = getResourceName(request);
176            try {
177                wfDefinition = dagEngine.getDefinition(jobId);
178            }
179            catch (DagEngineException ex) {
180                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
181            }
182            return wfDefinition;
183        }
184    
185        /*
186         * v0 service method to stream a job log into response object
187         */
188        @Override
189        protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
190                IOException {
191            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
192                                                                                          getAuthToken(request));
193    
194            String jobId = getResourceName(request);
195            try {
196                dagEngine.streamLog(jobId, response.getWriter());
197            }
198            catch (DagEngineException ex) {
199                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
200            }
201        }
202    
203        /*
204         * Not implemented in v0
205         */
206        @Override
207        protected void streamJobGraph(HttpServletRequest request, HttpServletResponse response)
208                throws XServletException, IOException {
209            // Should this error code be NOT_IMPLEMENTED?
210            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0306);
211        }
212    }