This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.servlet;
019
020 import java.io.IOException;
021 import java.util.Arrays;
022
023 import javax.servlet.ServletConfig;
024 import javax.servlet.ServletException;
025 import javax.servlet.http.HttpServletRequest;
026 import javax.servlet.http.HttpServletResponse;
027
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.hadoop.fs.FileStatus;
030 import org.apache.hadoop.fs.FileSystem;
031 import org.apache.hadoop.fs.Path;
032 import org.apache.oozie.BaseEngineException;
033 import org.apache.oozie.ErrorCode;
034 import org.apache.oozie.client.OozieClient;
035 import org.apache.oozie.client.XOozieClient;
036 import org.apache.oozie.client.rest.JsonBean;
037 import org.apache.oozie.client.rest.RestConstants;
038 import org.apache.oozie.service.AuthorizationException;
039 import org.apache.oozie.service.AuthorizationService;
040 import org.apache.oozie.service.HadoopAccessorException;
041 import org.apache.oozie.service.HadoopAccessorService;
042 import org.apache.oozie.service.Services;
043 import org.apache.oozie.service.XLogService;
044 import org.apache.oozie.util.ConfigUtils;
045 import org.apache.oozie.util.JobUtils;
046 import org.apache.oozie.util.XConfiguration;
047 import org.apache.oozie.util.XLog;
048 import org.json.simple.JSONObject;
049
050 public abstract class BaseJobServlet extends JsonRestServlet {
051
052 private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[1];
053
054 static {
055 RESOURCES_INFO[0] = new ResourceInfo("*", Arrays.asList("PUT", "GET"), Arrays.asList(new ParameterInfo(
056 RestConstants.ACTION_PARAM, String.class, true, Arrays.asList("PUT")), new ParameterInfo(
057 RestConstants.JOB_SHOW_PARAM, String.class, false, Arrays.asList("GET"))));
058 }
059
060 public BaseJobServlet(String instrumentationName) {
061 super(instrumentationName, RESOURCES_INFO);
062 }
063
064 /**
065 * Perform various job related actions - start, suspend, resume, kill, etc.
066 */
067 @Override
068 protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
069 String jobId = getResourceName(request);
070 request.setAttribute(AUDIT_PARAM, jobId);
071 request.setAttribute(AUDIT_OPERATION, request.getParameter(RestConstants.ACTION_PARAM));
072 try {
073 AuthorizationService auth = Services.get().get(AuthorizationService.class);
074 auth.authorizeForJob(getUser(request), jobId, true);
075 }
076 catch (AuthorizationException ex) {
077 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
078 }
079
080 String action = request.getParameter(RestConstants.ACTION_PARAM);
081 if (action.equals(RestConstants.JOB_ACTION_START)) {
082 stopCron();
083 startJob(request, response);
084 startCron();
085 response.setStatus(HttpServletResponse.SC_OK);
086 }
087 else if (action.equals(RestConstants.JOB_ACTION_RESUME)) {
088 stopCron();
089 resumeJob(request, response);
090 startCron();
091 response.setStatus(HttpServletResponse.SC_OK);
092 }
093 else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) {
094 stopCron();
095 suspendJob(request, response);
096 startCron();
097 response.setStatus(HttpServletResponse.SC_OK);
098 }
099 else if (action.equals(RestConstants.JOB_ACTION_KILL)) {
100 stopCron();
101 killJob(request, response);
102 startCron();
103 response.setStatus(HttpServletResponse.SC_OK);
104 }
105 else if (action.equals(RestConstants.JOB_ACTION_CHANGE)) {
106 stopCron();
107 changeJob(request, response);
108 startCron();
109 response.setStatus(HttpServletResponse.SC_OK);
110 }
111 else if (action.equals(RestConstants.JOB_ACTION_RERUN)) {
112 validateContentType(request, RestConstants.XML_CONTENT_TYPE);
113 Configuration conf = new XConfiguration(request.getInputStream());
114 stopCron();
115 checkAuthorizationForApp(getUser(request), conf);
116 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
117 reRunJob(request, response, conf);
118 startCron();
119 response.setStatus(HttpServletResponse.SC_OK);
120 }
121 else if (action.equals(RestConstants.JOB_COORD_ACTION_RERUN)) {
122 validateContentType(request, RestConstants.XML_CONTENT_TYPE);
123 stopCron();
124 JSONObject json = reRunJob(request, response, null);
125 startCron();
126 if (json != null) {
127 sendJsonResponse(response, HttpServletResponse.SC_OK, json);
128 }
129 else {
130 response.setStatus(HttpServletResponse.SC_OK);
131 }
132 }
133 else if (action.equals(RestConstants.JOB_BUNDLE_ACTION_RERUN)) {
134 validateContentType(request, RestConstants.XML_CONTENT_TYPE);
135 stopCron();
136 JSONObject json = reRunJob(request, response, null);
137 startCron();
138 if (json != null) {
139 sendJsonResponse(response, HttpServletResponse.SC_OK, json);
140 }
141 else {
142 response.setStatus(HttpServletResponse.SC_OK);
143 }
144 }
145 else {
146 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
147 RestConstants.ACTION_PARAM, action);
148 }
149 }
150
151 /**
152 * Validate the configuration user/group. <p/>
153 *
154 * @param requestUser user in request.
155 * @param conf configuration.
156 * @throws XServletException thrown if the configuration does not have a property {@link
157 * org.apache.oozie.client.OozieClient#USER_NAME}.
158 */
159 static void checkAuthorizationForApp(String requestUser, Configuration conf) throws XServletException {
160 String user = conf.get(OozieClient.USER_NAME);
161 String acl = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.GROUP_NAME, OozieClient.JOB_ACL, null);
162 try {
163 if (user == null) {
164 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0401, OozieClient.USER_NAME);
165 }
166 if (!requestUser.equals(UNDEF) && !user.equals(requestUser)) {
167 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0400, requestUser, user);
168 }
169 AuthorizationService auth = Services.get().get(AuthorizationService.class);
170
171 if (acl == null && auth.useDefaultGroupAsAcl()) {
172 acl = auth.getDefaultGroup(user);
173 conf.set(OozieClient.GROUP_NAME, acl);
174 }
175 XLog.Info.get().setParameter(XLogService.GROUP, acl);
176 String wfPath = conf.get(OozieClient.APP_PATH);
177 String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH);
178 String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);
179
180 if (wfPath == null && coordPath == null && bundlePath == null) {
181 String libPath = conf.get(XOozieClient.LIBPATH);
182 conf.set(OozieClient.APP_PATH, libPath);
183 wfPath = libPath;
184 }
185 ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath);
186
187 if (wfPath != null) {
188 auth.authorizeForApp(user, acl, wfPath, "workflow.xml", conf);
189 }
190 else if (coordPath != null){
191 auth.authorizeForApp(user, acl, coordPath, "coordinator.xml", conf);
192 }
193 else if (bundlePath != null){
194 auth.authorizeForApp(user, acl, bundlePath, "bundle.xml", conf);
195 }
196 }
197 catch (AuthorizationException ex) {
198 XLog.getLog(BaseJobServlet.class).info("AuthorizationException ", ex);
199 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
200 }
201 }
202
203 /**
204 * Return information about jobs.
205 */
206 @Override
207 public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
208 String jobId = getResourceName(request);
209 String show = request.getParameter(RestConstants.JOB_SHOW_PARAM);
210
211 try {
212 AuthorizationService auth = Services.get().get(AuthorizationService.class);
213 auth.authorizeForJob(getUser(request), jobId, false);
214 }
215 catch (AuthorizationException ex) {
216 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
217 }
218
219 if (show == null || show.equals(RestConstants.JOB_SHOW_INFO)) {
220 stopCron();
221 JsonBean job = null;
222 try {
223 job = getJob(request, response);
224 }
225 catch (BaseEngineException e) {
226 // TODO Auto-generated catch block
227 // e.printStackTrace();
228
229 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
230 }
231 startCron();
232 sendJsonResponse(response, HttpServletResponse.SC_OK, job);
233 }
234 else if (show.equals(RestConstants.JOB_SHOW_LOG)) {
235 response.setContentType(TEXT_UTF8);
236 streamJobLog(request, response);
237 }
238 else if (show.equals(RestConstants.JOB_SHOW_DEFINITION)) {
239 stopCron();
240 response.setContentType(XML_UTF8);
241 String wfDefinition = getJobDefinition(request, response);
242 startCron();
243 response.setStatus(HttpServletResponse.SC_OK);
244 response.getWriter().write(wfDefinition);
245 }
246 else {
247 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
248 RestConstants.JOB_SHOW_PARAM, show);
249 }
250 }
251
252 /**
253 * abstract method to start a job, either workflow or coordinator
254 *
255 * @param request
256 * @param response
257 * @throws XServletException
258 * @throws IOException TODO
259 */
260 abstract void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
261 IOException;
262
263 /**
264 * abstract method to resume a job, either workflow or coordinator
265 *
266 * @param request
267 * @param response
268 * @throws XServletException
269 * @throws IOException TODO
270 */
271 abstract void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
272 IOException;
273
274 /**
275 * abstract method to suspend a job, either workflow or coordinator
276 *
277 * @param request
278 * @param response
279 * @throws XServletException
280 * @throws IOException TODO
281 */
282 abstract void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
283 IOException;
284
285 /**
286 * abstract method to kill a job, either workflow or coordinator
287 *
288 * @param request
289 * @param response
290 * @throws XServletException
291 * @throws IOException TODO
292 */
293 abstract void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
294 IOException;
295
296 /**
297 * abstract method to change a coordinator job
298 *
299 * @param request
300 * @param response
301 * @throws XServletException
302 * @throws IOException TODO
303 */
304 abstract void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
305 IOException;
306
307 /**
308 * abstract method to re-run a job, either workflow or coordinator
309 *
310 * @param request
311 * @param response
312 * @param conf
313 * @throws XServletException
314 * @throws IOException TODO
315 */
316 abstract JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
317 throws XServletException, IOException;
318
319 /**
320 * abstract method to get a job, either workflow or coordinator, in JsonBean representation
321 *
322 * @param request
323 * @param response
324 * @return JsonBean representation of a job, either workflow or coordinator
325 * @throws XServletException
326 * @throws IOException TODO
327 * @throws BaseEngineException
328 */
329 abstract JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
330 IOException, BaseEngineException;
331
332 /**
333 * abstract method to get definition of a job, either workflow or coordinator
334 *
335 * @param request
336 * @param response
337 * @return job, either workflow or coordinator, definition in string format
338 * @throws XServletException
339 * @throws IOException TODO
340 */
341 abstract String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
342 throws XServletException, IOException;
343
344 /**
345 * abstract method to get and stream log information of job, either workflow or coordinator
346 *
347 * @param request
348 * @param response
349 * @throws XServletException
350 * @throws IOException
351 */
352 abstract void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
353 IOException;
354
355 }