This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.servlet;
019
020 import java.io.IOException;
021 import java.util.Arrays;
022
023 import javax.servlet.ServletException;
024 import javax.servlet.http.HttpServletRequest;
025 import javax.servlet.http.HttpServletResponse;
026
027 import org.apache.hadoop.conf.Configuration;
028 import org.apache.oozie.BaseEngineException;
029 import org.apache.oozie.ErrorCode;
030 import org.apache.oozie.client.OozieClient;
031 import org.apache.oozie.client.XOozieClient;
032 import org.apache.oozie.client.rest.JsonBean;
033 import org.apache.oozie.client.rest.RestConstants;
034 import org.apache.oozie.service.AuthorizationException;
035 import org.apache.oozie.service.AuthorizationService;
036 import org.apache.oozie.service.Services;
037 import org.apache.oozie.service.XLogService;
038 import org.apache.oozie.util.ConfigUtils;
039 import org.apache.oozie.util.JobUtils;
040 import org.apache.oozie.util.XConfiguration;
041 import org.apache.oozie.util.XLog;
042 import org.json.simple.JSONObject;
043
044 public abstract class BaseJobServlet extends JsonRestServlet {
045
046 private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[1];
047
048 static {
049 RESOURCES_INFO[0] = new ResourceInfo("*", Arrays.asList("PUT", "GET"), Arrays.asList(new ParameterInfo(
050 RestConstants.ACTION_PARAM, String.class, true, Arrays.asList("PUT")), new ParameterInfo(
051 RestConstants.JOB_SHOW_PARAM, String.class, false, Arrays.asList("GET"))));
052 }
053
054 public BaseJobServlet(String instrumentationName) {
055 super(instrumentationName, RESOURCES_INFO);
056 }
057
058 /**
059 * Perform various job related actions - start, suspend, resume, kill, etc.
060 */
061 @Override
062 protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
063 String jobId = getResourceName(request);
064 request.setAttribute(AUDIT_PARAM, jobId);
065 request.setAttribute(AUDIT_OPERATION, request.getParameter(RestConstants.ACTION_PARAM));
066 try {
067 AuthorizationService auth = Services.get().get(AuthorizationService.class);
068 auth.authorizeForJob(getUser(request), jobId, true);
069 }
070 catch (AuthorizationException ex) {
071 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
072 }
073
074 String action = request.getParameter(RestConstants.ACTION_PARAM);
075 if (action.equals(RestConstants.JOB_ACTION_START)) {
076 stopCron();
077 startJob(request, response);
078 startCron();
079 response.setStatus(HttpServletResponse.SC_OK);
080 }
081 else if (action.equals(RestConstants.JOB_ACTION_RESUME)) {
082 stopCron();
083 resumeJob(request, response);
084 startCron();
085 response.setStatus(HttpServletResponse.SC_OK);
086 }
087 else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) {
088 stopCron();
089 suspendJob(request, response);
090 startCron();
091 response.setStatus(HttpServletResponse.SC_OK);
092 }
093 else if (action.equals(RestConstants.JOB_ACTION_KILL)) {
094 stopCron();
095 killJob(request, response);
096 startCron();
097 response.setStatus(HttpServletResponse.SC_OK);
098 }
099 else if (action.equals(RestConstants.JOB_ACTION_CHANGE)) {
100 stopCron();
101 changeJob(request, response);
102 startCron();
103 response.setStatus(HttpServletResponse.SC_OK);
104 }
105 else if (action.equals(RestConstants.JOB_ACTION_RERUN)) {
106 validateContentType(request, RestConstants.XML_CONTENT_TYPE);
107 Configuration conf = new XConfiguration(request.getInputStream());
108 stopCron();
109 String requestUser = getUser(request);
110 if (!requestUser.equals(UNDEF)) {
111 conf.set(OozieClient.USER_NAME, requestUser);
112 }
113 BaseJobServlet.checkAuthorizationForApp(conf);
114 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
115 reRunJob(request, response, conf);
116 startCron();
117 response.setStatus(HttpServletResponse.SC_OK);
118 }
119 else if (action.equals(RestConstants.JOB_COORD_ACTION_RERUN)) {
120 validateContentType(request, RestConstants.XML_CONTENT_TYPE);
121 stopCron();
122 JSONObject json = reRunJob(request, response, null);
123 startCron();
124 if (json != null) {
125 sendJsonResponse(response, HttpServletResponse.SC_OK, json);
126 }
127 else {
128 response.setStatus(HttpServletResponse.SC_OK);
129 }
130 }
131 else if (action.equals(RestConstants.JOB_BUNDLE_ACTION_RERUN)) {
132 validateContentType(request, RestConstants.XML_CONTENT_TYPE);
133 stopCron();
134 JSONObject json = reRunJob(request, response, null);
135 startCron();
136 if (json != null) {
137 sendJsonResponse(response, HttpServletResponse.SC_OK, json);
138 }
139 else {
140 response.setStatus(HttpServletResponse.SC_OK);
141 }
142 }
143 else {
144 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
145 RestConstants.ACTION_PARAM, action);
146 }
147 }
148
149 /**
150 * Validate the configuration user/group. <p/>
151 *
152 * @param conf configuration.
153 * @throws XServletException thrown if the configuration does not have a property {@link
154 * org.apache.oozie.client.OozieClient#USER_NAME}.
155 */
156 static void checkAuthorizationForApp(Configuration conf) throws XServletException {
157 String user = conf.get(OozieClient.USER_NAME);
158 String acl = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.GROUP_NAME, OozieClient.JOB_ACL, null);
159 try {
160 if (user == null) {
161 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0401, OozieClient.USER_NAME);
162 }
163 AuthorizationService auth = Services.get().get(AuthorizationService.class);
164
165 if (acl != null){
166 conf.set(OozieClient.GROUP_NAME, acl);
167 }
168 else if (acl == null && auth.useDefaultGroupAsAcl()) {
169 acl = auth.getDefaultGroup(user);
170 conf.set(OozieClient.GROUP_NAME, acl);
171 }
172 XLog.Info.get().setParameter(XLogService.GROUP, acl);
173 String wfPath = conf.get(OozieClient.APP_PATH);
174 String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH);
175 String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);
176
177 if (wfPath == null && coordPath == null && bundlePath == null) {
178 String[] libPaths = conf.getStrings(XOozieClient.LIBPATH);
179 if (libPaths != null && libPaths.length > 0 && libPaths[0].trim().length() > 0) {
180 conf.set(OozieClient.APP_PATH, libPaths[0].trim());
181 wfPath = libPaths[0].trim();
182 }
183 else {
184 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0405);
185 }
186 }
187 ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath);
188
189 if (wfPath != null) {
190 auth.authorizeForApp(user, acl, wfPath, "workflow.xml", conf);
191 }
192 else if (coordPath != null){
193 auth.authorizeForApp(user, acl, coordPath, "coordinator.xml", conf);
194 }
195 else if (bundlePath != null){
196 auth.authorizeForApp(user, acl, bundlePath, "bundle.xml", conf);
197 }
198 }
199 catch (AuthorizationException ex) {
200 XLog.getLog(BaseJobServlet.class).info("AuthorizationException ", ex);
201 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
202 }
203 }
204
205 /**
206 * Return information about jobs.
207 */
208 @Override
209 public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
210 String jobId = getResourceName(request);
211 String show = request.getParameter(RestConstants.JOB_SHOW_PARAM);
212 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
213 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
214
215 try {
216 AuthorizationService auth = Services.get().get(AuthorizationService.class);
217 auth.authorizeForJob(getUser(request), jobId, false);
218 }
219 catch (AuthorizationException ex) {
220 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
221 }
222
223 if (show == null || show.equals(RestConstants.JOB_SHOW_INFO)) {
224 stopCron();
225 JsonBean job = null;
226 try {
227 job = getJob(request, response);
228 }
229 catch (BaseEngineException e) {
230 // TODO Auto-generated catch block
231 // e.printStackTrace();
232
233 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
234 }
235 startCron();
236 sendJsonResponse(response, HttpServletResponse.SC_OK, job, timeZoneId);
237 }
238 else if (show.equals(RestConstants.JOB_SHOW_LOG)) {
239 response.setContentType(TEXT_UTF8);
240 streamJobLog(request, response);
241 }
242 else if (show.equals(RestConstants.JOB_SHOW_DEFINITION)) {
243 stopCron();
244 response.setContentType(XML_UTF8);
245 String wfDefinition = getJobDefinition(request, response);
246 startCron();
247 response.setStatus(HttpServletResponse.SC_OK);
248 response.getWriter().write(wfDefinition);
249 }
250 else if (show.equals(RestConstants.JOB_SHOW_GRAPH)) {
251 stopCron();
252 streamJobGraph(request, response);
253 startCron(); // -- should happen before you stream anything in response?
254 }
255 else {
256 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
257 RestConstants.JOB_SHOW_PARAM, show);
258 }
259 }
260
261 /**
262 * abstract method to start a job, either workflow or coordinator
263 *
264 * @param request
265 * @param response
266 * @throws XServletException
267 * @throws IOException TODO
268 */
269 abstract void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
270 IOException;
271
272 /**
273 * abstract method to resume a job, either workflow or coordinator
274 *
275 * @param request
276 * @param response
277 * @throws XServletException
278 * @throws IOException TODO
279 */
280 abstract void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
281 IOException;
282
283 /**
284 * abstract method to suspend a job, either workflow or coordinator
285 *
286 * @param request
287 * @param response
288 * @throws XServletException
289 * @throws IOException TODO
290 */
291 abstract void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
292 IOException;
293
294 /**
295 * abstract method to kill a job, either workflow or coordinator
296 *
297 * @param request
298 * @param response
299 * @throws XServletException
300 * @throws IOException TODO
301 */
302 abstract void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
303 IOException;
304
305 /**
306 * abstract method to change a coordinator job
307 *
308 * @param request
309 * @param response
310 * @throws XServletException
311 * @throws IOException TODO
312 */
313 abstract void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
314 IOException;
315
316 /**
317 * abstract method to re-run a job, either workflow or coordinator
318 *
319 * @param request
320 * @param response
321 * @param conf
322 * @throws XServletException
323 * @throws IOException TODO
324 */
325 abstract JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
326 throws XServletException, IOException;
327
328 /**
329 * abstract method to get a job, either workflow or coordinator, in JsonBean representation
330 *
331 * @param request
332 * @param response
333 * @return JsonBean representation of a job, either workflow or coordinator
334 * @throws XServletException
335 * @throws IOException TODO
336 * @throws BaseEngineException
337 */
338 abstract JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
339 IOException, BaseEngineException;
340
341 /**
342 * abstract method to get definition of a job, either workflow or coordinator
343 *
344 * @param request
345 * @param response
346 * @return job, either workflow or coordinator, definition in string format
347 * @throws XServletException
348 * @throws IOException TODO
349 */
350 abstract String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
351 throws XServletException, IOException;
352
353 /**
354 * abstract method to get and stream log information of job, either workflow or coordinator
355 *
356 * @param request
357 * @param response
358 * @throws XServletException
359 * @throws IOException
360 */
361 abstract void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
362 IOException;
363
364 /**
365 * abstract method to create and stream image for runtime DAG -- workflow only
366 *
367 * @param request
368 * @param response
369 * @throws XServletException
370 * @throws IOException
371 */
372 abstract void streamJobGraph(HttpServletRequest request, HttpServletResponse response)
373 throws XServletException, IOException;
374 }