This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.servlet;
019
020 import java.io.IOException;
021 import java.util.Arrays;
022
023 import javax.servlet.ServletException;
024 import javax.servlet.http.HttpServletRequest;
025 import javax.servlet.http.HttpServletResponse;
026
027 import org.apache.hadoop.conf.Configuration;
028 import org.apache.oozie.BaseEngineException;
029 import org.apache.oozie.ErrorCode;
030 import org.apache.oozie.client.OozieClient;
031 import org.apache.oozie.client.XOozieClient;
032 import org.apache.oozie.client.rest.JsonBean;
033 import org.apache.oozie.client.rest.JsonTags;
034 import org.apache.oozie.client.rest.RestConstants;
035 import org.apache.oozie.service.AuthorizationException;
036 import org.apache.oozie.service.AuthorizationService;
037 import org.apache.oozie.service.Services;
038 import org.apache.oozie.service.XLogService;
039 import org.apache.oozie.util.ConfigUtils;
040 import org.apache.oozie.util.JobUtils;
041 import org.apache.oozie.util.XConfiguration;
042 import org.apache.oozie.util.XLog;
043 import org.json.simple.JSONObject;
044
045 public abstract class BaseJobServlet extends JsonRestServlet {
046
047 private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[1];
048
049 static {
050 RESOURCES_INFO[0] = new ResourceInfo("*", Arrays.asList("PUT", "GET"), Arrays.asList(new ParameterInfo(
051 RestConstants.ACTION_PARAM, String.class, true, Arrays.asList("PUT")), new ParameterInfo(
052 RestConstants.JOB_SHOW_PARAM, String.class, false, Arrays.asList("GET")), new ParameterInfo(
053 RestConstants.ORDER_PARAM, String.class, false, Arrays.asList("GET"))));
054 }
055
056 public BaseJobServlet(String instrumentationName) {
057 super(instrumentationName, RESOURCES_INFO);
058 }
059
060 /**
061 * Perform various job related actions - start, suspend, resume, kill, etc.
062 */
063 @Override
064 protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
065 String jobId = getResourceName(request);
066 request.setAttribute(AUDIT_PARAM, jobId);
067 request.setAttribute(AUDIT_OPERATION, request.getParameter(RestConstants.ACTION_PARAM));
068 try {
069 AuthorizationService auth = Services.get().get(AuthorizationService.class);
070 auth.authorizeForJob(getUser(request), jobId, true);
071 }
072 catch (AuthorizationException ex) {
073 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
074 }
075
076 String action = request.getParameter(RestConstants.ACTION_PARAM);
077 if (action.equals(RestConstants.JOB_ACTION_START)) {
078 stopCron();
079 startJob(request, response);
080 startCron();
081 response.setStatus(HttpServletResponse.SC_OK);
082 }
083 else if (action.equals(RestConstants.JOB_ACTION_RESUME)) {
084 stopCron();
085 resumeJob(request, response);
086 startCron();
087 response.setStatus(HttpServletResponse.SC_OK);
088 }
089 else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) {
090 stopCron();
091 suspendJob(request, response);
092 startCron();
093 response.setStatus(HttpServletResponse.SC_OK);
094 }
095 else if (action.equals(RestConstants.JOB_ACTION_KILL)) {
096 stopCron();
097 killJob(request, response);
098 startCron();
099 response.setStatus(HttpServletResponse.SC_OK);
100 }
101 else if (action.equals(RestConstants.JOB_ACTION_CHANGE)) {
102 stopCron();
103 changeJob(request, response);
104 startCron();
105 response.setStatus(HttpServletResponse.SC_OK);
106 }
107 else if (action.equals(RestConstants.JOB_ACTION_RERUN)) {
108 validateContentType(request, RestConstants.XML_CONTENT_TYPE);
109 Configuration conf = new XConfiguration(request.getInputStream());
110 stopCron();
111 String requestUser = getUser(request);
112 if (!requestUser.equals(UNDEF)) {
113 conf.set(OozieClient.USER_NAME, requestUser);
114 }
115 BaseJobServlet.checkAuthorizationForApp(conf);
116 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
117 reRunJob(request, response, conf);
118 startCron();
119 response.setStatus(HttpServletResponse.SC_OK);
120 }
121 else if (action.equals(RestConstants.JOB_COORD_ACTION_RERUN)) {
122 validateContentType(request, RestConstants.XML_CONTENT_TYPE);
123 stopCron();
124 JSONObject json = reRunJob(request, response, null);
125 startCron();
126 if (json != null) {
127 sendJsonResponse(response, HttpServletResponse.SC_OK, json);
128 }
129 else {
130 response.setStatus(HttpServletResponse.SC_OK);
131 }
132 }
133 else if (action.equals(RestConstants.JOB_BUNDLE_ACTION_RERUN)) {
134 validateContentType(request, RestConstants.XML_CONTENT_TYPE);
135 stopCron();
136 JSONObject json = reRunJob(request, response, null);
137 startCron();
138 if (json != null) {
139 sendJsonResponse(response, HttpServletResponse.SC_OK, json);
140 }
141 else {
142 response.setStatus(HttpServletResponse.SC_OK);
143 }
144 }
145 else {
146 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
147 RestConstants.ACTION_PARAM, action);
148 }
149 }
150
151 /**
152 * Validate the configuration user/group. <p/>
153 *
154 * @param conf configuration.
155 * @throws XServletException thrown if the configuration does not have a property {@link
156 * org.apache.oozie.client.OozieClient#USER_NAME}.
157 */
158 static void checkAuthorizationForApp(Configuration conf) throws XServletException {
159 String user = conf.get(OozieClient.USER_NAME);
160 String acl = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.GROUP_NAME, OozieClient.JOB_ACL, null);
161 try {
162 if (user == null) {
163 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0401, OozieClient.USER_NAME);
164 }
165 AuthorizationService auth = Services.get().get(AuthorizationService.class);
166
167 if (acl != null){
168 conf.set(OozieClient.GROUP_NAME, acl);
169 }
170 else if (acl == null && auth.useDefaultGroupAsAcl()) {
171 acl = auth.getDefaultGroup(user);
172 conf.set(OozieClient.GROUP_NAME, acl);
173 }
174 XLog.Info.get().setParameter(XLogService.GROUP, acl);
175 String wfPath = conf.get(OozieClient.APP_PATH);
176 String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH);
177 String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);
178
179 if (wfPath == null && coordPath == null && bundlePath == null) {
180 String[] libPaths = conf.getStrings(XOozieClient.LIBPATH);
181 if (libPaths != null && libPaths.length > 0 && libPaths[0].trim().length() > 0) {
182 conf.set(OozieClient.APP_PATH, libPaths[0].trim());
183 wfPath = libPaths[0].trim();
184 }
185 else {
186 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0405);
187 }
188 }
189 ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath);
190
191 if (wfPath != null) {
192 auth.authorizeForApp(user, acl, wfPath, "workflow.xml", conf);
193 }
194 else if (coordPath != null){
195 auth.authorizeForApp(user, acl, coordPath, "coordinator.xml", conf);
196 }
197 else if (bundlePath != null){
198 auth.authorizeForApp(user, acl, bundlePath, "bundle.xml", conf);
199 }
200 }
201 catch (AuthorizationException ex) {
202 XLog.getLog(BaseJobServlet.class).info("AuthorizationException ", ex);
203 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
204 }
205 }
206
207 /**
208 * Return information about jobs.
209 */
210 @Override
211 public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
212 String jobId = getResourceName(request);
213 String show = request.getParameter(RestConstants.JOB_SHOW_PARAM);
214 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
215 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
216
217 try {
218 AuthorizationService auth = Services.get().get(AuthorizationService.class);
219 auth.authorizeForJob(getUser(request), jobId, false);
220 }
221 catch (AuthorizationException ex) {
222 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
223 }
224
225 if (show == null || show.equals(RestConstants.JOB_SHOW_INFO)) {
226 stopCron();
227 JsonBean job = null;
228 try {
229 job = getJob(request, response);
230 }
231 catch (BaseEngineException e) {
232 // TODO Auto-generated catch block
233 // e.printStackTrace();
234
235 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
236 }
237 startCron();
238 sendJsonResponse(response, HttpServletResponse.SC_OK, job, timeZoneId);
239 }
240
241 else if (show.equals(RestConstants.JOB_SHOW_JMS_TOPIC)) {
242 stopCron();
243 String jmsTopicName = getJMSTopicName(request, response);
244 JSONObject json = new JSONObject();
245 json.put(JsonTags.JMS_TOPIC_NAME, jmsTopicName);
246 startCron();
247 sendJsonResponse(response, HttpServletResponse.SC_OK, json);
248 }
249
250 else if (show.equals(RestConstants.JOB_SHOW_LOG)) {
251 response.setContentType(TEXT_UTF8);
252 streamJobLog(request, response);
253 }
254 else if (show.equals(RestConstants.JOB_SHOW_DEFINITION)) {
255 stopCron();
256 response.setContentType(XML_UTF8);
257 String wfDefinition = getJobDefinition(request, response);
258 startCron();
259 response.setStatus(HttpServletResponse.SC_OK);
260 response.getWriter().write(wfDefinition);
261 }
262 else if (show.equals(RestConstants.JOB_SHOW_GRAPH)) {
263 stopCron();
264 streamJobGraph(request, response);
265 startCron(); // -- should happen before you stream anything in response?
266 }
267 else {
268 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
269 RestConstants.JOB_SHOW_PARAM, show);
270 }
271 }
272
273 /**
274 * abstract method to start a job, either workflow or coordinator
275 *
276 * @param request
277 * @param response
278 * @throws XServletException
279 * @throws IOException TODO
280 */
281 abstract void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
282 IOException;
283
284 /**
285 * abstract method to resume a job, either workflow or coordinator
286 *
287 * @param request
288 * @param response
289 * @throws XServletException
290 * @throws IOException TODO
291 */
292 abstract void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
293 IOException;
294
295 /**
296 * abstract method to suspend a job, either workflow or coordinator
297 *
298 * @param request
299 * @param response
300 * @throws XServletException
301 * @throws IOException TODO
302 */
303 abstract void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
304 IOException;
305
306 /**
307 * abstract method to kill a job, either workflow or coordinator
308 *
309 * @param request
310 * @param response
311 * @throws XServletException
312 * @throws IOException TODO
313 */
314 abstract void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
315 IOException;
316
317 /**
318 * abstract method to change a coordinator job
319 *
320 * @param request
321 * @param response
322 * @throws XServletException
323 * @throws IOException TODO
324 */
325 abstract void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
326 IOException;
327
328 /**
329 * abstract method to re-run a job, either workflow or coordinator
330 *
331 * @param request
332 * @param response
333 * @param conf
334 * @throws XServletException
335 * @throws IOException TODO
336 */
337 abstract JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
338 throws XServletException, IOException;
339
340 /**
341 * abstract method to get a job, either workflow or coordinator, in JsonBean representation
342 *
343 * @param request
344 * @param response
345 * @return JsonBean representation of a job, either workflow or coordinator
346 * @throws XServletException
347 * @throws IOException TODO
348 * @throws BaseEngineException
349 */
350 abstract JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
351 IOException, BaseEngineException;
352
353 /**
354 * abstract method to get definition of a job, either workflow or coordinator
355 *
356 * @param request
357 * @param response
358 * @return job, either workflow or coordinator, definition in string format
359 * @throws XServletException
360 * @throws IOException TODO
361 */
362 abstract String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
363 throws XServletException, IOException;
364
365 /**
366 * abstract method to get and stream log information of job, either workflow or coordinator
367 *
368 * @param request
369 * @param response
370 * @throws XServletException
371 * @throws IOException
372 */
373 abstract void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
374 IOException;
375
376 /**
377 * abstract method to create and stream image for runtime DAG -- workflow only
378 *
379 * @param request
380 * @param response
381 * @throws XServletException
382 * @throws IOException
383 */
384 abstract void streamJobGraph(HttpServletRequest request, HttpServletResponse response)
385 throws XServletException, IOException;
386
387 /**
388 * abstract method to get JMS topic name for a job
389 * @param request
390 * @param response
391 * @throws XServletException
392 * @throws IOException
393 */
394 abstract String getJMSTopicName(HttpServletRequest request, HttpServletResponse response)
395 throws XServletException, IOException;
396 }