This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.servlet;
019
020 import java.io.IOException;
021 import java.util.Arrays;
022
023 import javax.servlet.ServletException;
024 import javax.servlet.http.HttpServletRequest;
025 import javax.servlet.http.HttpServletResponse;
026
027 import org.apache.hadoop.conf.Configuration;
028 import org.apache.hadoop.fs.FileStatus;
029 import org.apache.hadoop.fs.FileSystem;
030 import org.apache.hadoop.fs.Path;
031 import org.apache.oozie.BaseEngineException;
032 import org.apache.oozie.ErrorCode;
033 import org.apache.oozie.client.OozieClient;
034 import org.apache.oozie.client.XOozieClient;
035 import org.apache.oozie.client.rest.JsonBean;
036 import org.apache.oozie.client.rest.RestConstants;
037 import org.apache.oozie.service.AuthorizationException;
038 import org.apache.oozie.service.AuthorizationService;
039 import org.apache.oozie.service.HadoopAccessorException;
040 import org.apache.oozie.service.HadoopAccessorService;
041 import org.apache.oozie.service.Services;
042 import org.apache.oozie.service.XLogService;
043 import org.apache.oozie.util.JobUtils;
044 import org.apache.oozie.util.XConfiguration;
045 import org.apache.oozie.util.XLog;
046 import org.json.simple.JSONObject;
047
048 public abstract class BaseJobServlet extends JsonRestServlet {
049
050 private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[1];
051
052 static {
053 RESOURCES_INFO[0] = new ResourceInfo("*", Arrays.asList("PUT", "GET"), Arrays.asList(new ParameterInfo(
054 RestConstants.ACTION_PARAM, String.class, true, Arrays.asList("PUT")), new ParameterInfo(
055 RestConstants.JOB_SHOW_PARAM, String.class, false, Arrays.asList("GET"))));
056 }
057
058 public BaseJobServlet(String instrumentationName) {
059 super(instrumentationName, RESOURCES_INFO);
060 }
061
062 /**
063 * Perform various job related actions - start, suspend, resume, kill, etc.
064 */
065 @Override
066 protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
067 String jobId = getResourceName(request);
068 request.setAttribute(AUDIT_PARAM, jobId);
069 request.setAttribute(AUDIT_OPERATION, request.getParameter(RestConstants.ACTION_PARAM));
070 try {
071 AuthorizationService auth = Services.get().get(AuthorizationService.class);
072 auth.authorizeForJob(getUser(request), jobId, true);
073 }
074 catch (AuthorizationException ex) {
075 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
076 }
077
078 String action = request.getParameter(RestConstants.ACTION_PARAM);
079 if (action.equals(RestConstants.JOB_ACTION_START)) {
080 stopCron();
081 startJob(request, response);
082 startCron();
083 response.setStatus(HttpServletResponse.SC_OK);
084 }
085 else if (action.equals(RestConstants.JOB_ACTION_RESUME)) {
086 stopCron();
087 resumeJob(request, response);
088 startCron();
089 response.setStatus(HttpServletResponse.SC_OK);
090 }
091 else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) {
092 stopCron();
093 suspendJob(request, response);
094 startCron();
095 response.setStatus(HttpServletResponse.SC_OK);
096 }
097 else if (action.equals(RestConstants.JOB_ACTION_KILL)) {
098 stopCron();
099 killJob(request, response);
100 startCron();
101 response.setStatus(HttpServletResponse.SC_OK);
102 }
103 else if (action.equals(RestConstants.JOB_ACTION_CHANGE)) {
104 stopCron();
105 changeJob(request, response);
106 startCron();
107 response.setStatus(HttpServletResponse.SC_OK);
108 }
109 else if (action.equals(RestConstants.JOB_ACTION_RERUN)) {
110 validateContentType(request, RestConstants.XML_CONTENT_TYPE);
111 Configuration conf = new XConfiguration(request.getInputStream());
112 stopCron();
113 checkAuthorizationForApp(getUser(request), conf);
114 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
115 reRunJob(request, response, conf);
116 startCron();
117 response.setStatus(HttpServletResponse.SC_OK);
118 }
119 else if (action.equals(RestConstants.JOB_COORD_ACTION_RERUN)) {
120 validateContentType(request, RestConstants.XML_CONTENT_TYPE);
121 stopCron();
122 JSONObject json = reRunJob(request, response, null);
123 startCron();
124 if (json != null) {
125 sendJsonResponse(response, HttpServletResponse.SC_OK, json);
126 }
127 else {
128 response.setStatus(HttpServletResponse.SC_OK);
129 }
130 }
131 else if (action.equals(RestConstants.JOB_BUNDLE_ACTION_RERUN)) {
132 validateContentType(request, RestConstants.XML_CONTENT_TYPE);
133 stopCron();
134 JSONObject json = reRunJob(request, response, null);
135 startCron();
136 if (json != null) {
137 sendJsonResponse(response, HttpServletResponse.SC_OK, json);
138 }
139 else {
140 response.setStatus(HttpServletResponse.SC_OK);
141 }
142 }
143 else {
144 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
145 RestConstants.ACTION_PARAM, action);
146 }
147 }
148
149 /**
150 * Validate the configuration user/group. <p/>
151 *
152 * @param requestUser user in request.
153 * @param conf configuration.
154 * @throws XServletException thrown if the configuration does not have a property {@link
155 * org.apache.oozie.client.OozieClient#USER_NAME}.
156 */
157 static void checkAuthorizationForApp(String requestUser, Configuration conf) throws XServletException {
158 String user = conf.get(OozieClient.USER_NAME);
159 String group = conf.get(OozieClient.GROUP_NAME);
160 try {
161 if (user == null) {
162 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0401, OozieClient.USER_NAME);
163 }
164 if (!requestUser.equals(UNDEF) && !user.equals(requestUser)) {
165 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0400, requestUser, user);
166 }
167 AuthorizationService auth = Services.get().get(AuthorizationService.class);
168 if (group == null) {
169 group = auth.getDefaultGroup(user);
170 conf.set(OozieClient.GROUP_NAME, group);
171 }
172 else {
173 auth.authorizeForGroup(user, group);
174 }
175 XLog.Info.get().setParameter(XLogService.GROUP, group);
176 String wfPath = conf.get(OozieClient.APP_PATH);
177 String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH);
178 String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);
179
180 if (wfPath == null && coordPath == null && bundlePath == null) {
181 String libPath = conf.get(XOozieClient.LIBPATH);
182 conf.set(OozieClient.APP_PATH, libPath);
183 wfPath = libPath;
184 }
185 ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath);
186
187 if (wfPath != null) {
188 auth.authorizeForApp(user, group, wfPath, "workflow.xml", conf);
189 }
190 else if (coordPath != null){
191 auth.authorizeForApp(user, group, coordPath, "coordinator.xml", conf);
192 }
193 else if (bundlePath != null){
194 auth.authorizeForApp(user, group, bundlePath, "bundle.xml", conf);
195 }
196 }
197 catch (AuthorizationException ex) {
198 XLog.getLog(BaseJobServlet.class).info("AuthorizationException ", ex);
199 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
200 }
201 }
202
203 /**
204 * Return information about jobs.
205 */
206 @Override
207 public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
208 String jobId = getResourceName(request);
209 String show = request.getParameter(RestConstants.JOB_SHOW_PARAM);
210
211 try {
212 AuthorizationService auth = Services.get().get(AuthorizationService.class);
213 auth.authorizeForJob(getUser(request), jobId, false);
214 }
215 catch (AuthorizationException ex) {
216 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
217 }
218
219 if (show == null || show.equals(RestConstants.JOB_SHOW_INFO)) {
220 stopCron();
221 JsonBean job = null;
222 try {
223 job = getJob(request, response);
224 }
225 catch (BaseEngineException e) {
226 // TODO Auto-generated catch block
227 // e.printStackTrace();
228
229 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
230 }
231 startCron();
232 sendJsonResponse(response, HttpServletResponse.SC_OK, job);
233 }
234 else if (show.equals(RestConstants.JOB_SHOW_LOG)) {
235 response.setContentType(TEXT_UTF8);
236 streamJobLog(request, response);
237 }
238 else if (show.equals(RestConstants.JOB_SHOW_DEFINITION)) {
239 stopCron();
240 response.setContentType(XML_UTF8);
241 String wfDefinition = getJobDefinition(request, response);
242 startCron();
243 response.setStatus(HttpServletResponse.SC_OK);
244 response.getWriter().write(wfDefinition);
245 }
246 else {
247 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
248 RestConstants.JOB_SHOW_PARAM, show);
249 }
250 }
251
252 /**
253 * abstract method to start a job, either workflow or coordinator
254 *
255 * @param request
256 * @param response
257 * @throws XServletException
258 * @throws IOException TODO
259 */
260 abstract void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
261 IOException;
262
263 /**
264 * abstract method to resume a job, either workflow or coordinator
265 *
266 * @param request
267 * @param response
268 * @throws XServletException
269 * @throws IOException TODO
270 */
271 abstract void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
272 IOException;
273
274 /**
275 * abstract method to suspend a job, either workflow or coordinator
276 *
277 * @param request
278 * @param response
279 * @throws XServletException
280 * @throws IOException TODO
281 */
282 abstract void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
283 IOException;
284
285 /**
286 * abstract method to kill a job, either workflow or coordinator
287 *
288 * @param request
289 * @param response
290 * @throws XServletException
291 * @throws IOException TODO
292 */
293 abstract void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
294 IOException;
295
296 /**
297 * abstract method to change a coordinator job
298 *
299 * @param request
300 * @param response
301 * @throws XServletException
302 * @throws IOException TODO
303 */
304 abstract void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
305 IOException;
306
307 /**
308 * abstract method to re-run a job, either workflow or coordinator
309 *
310 * @param request
311 * @param response
312 * @param conf
313 * @throws XServletException
314 * @throws IOException TODO
315 */
316 abstract JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
317 throws XServletException, IOException;
318
319 /**
320 * abstract method to get a job, either workflow or coordinator, in JsonBean representation
321 *
322 * @param request
323 * @param response
324 * @return JsonBean representation of a job, either workflow or coordinator
325 * @throws XServletException
326 * @throws IOException TODO
327 * @throws BaseEngineException
328 */
329 abstract JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
330 IOException, BaseEngineException;
331
332 /**
333 * abstract method to get definition of a job, either workflow or coordinator
334 *
335 * @param request
336 * @param response
337 * @return job, either workflow or coordinator, definition in string format
338 * @throws XServletException
339 * @throws IOException TODO
340 */
341 abstract String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
342 throws XServletException, IOException;
343
344 /**
345 * abstract method to get and stream log information of job, either workflow or coordinator
346 *
347 * @param request
348 * @param response
349 * @throws XServletException
350 * @throws IOException
351 */
352 abstract void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
353 IOException;
354
355 }