001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.servlet; 019 020 import java.io.IOException; 021 import java.util.Arrays; 022 023 import javax.servlet.ServletException; 024 import javax.servlet.http.HttpServletRequest; 025 import javax.servlet.http.HttpServletResponse; 026 027 import org.apache.hadoop.conf.Configuration; 028 import org.apache.hadoop.fs.FileStatus; 029 import org.apache.hadoop.fs.FileSystem; 030 import org.apache.hadoop.fs.Path; 031 import org.apache.oozie.BaseEngineException; 032 import org.apache.oozie.ErrorCode; 033 import org.apache.oozie.client.OozieClient; 034 import org.apache.oozie.client.XOozieClient; 035 import org.apache.oozie.client.rest.JsonBean; 036 import org.apache.oozie.client.rest.RestConstants; 037 import org.apache.oozie.service.AuthorizationException; 038 import org.apache.oozie.service.AuthorizationService; 039 import org.apache.oozie.service.HadoopAccessorException; 040 import org.apache.oozie.service.HadoopAccessorService; 041 import org.apache.oozie.service.Services; 042 import org.apache.oozie.service.XLogService; 043 import org.apache.oozie.util.JobUtils; 044 import org.apache.oozie.util.XConfiguration; 045 import org.apache.oozie.util.XLog; 046 import org.json.simple.JSONObject; 047 048 public abstract class BaseJobServlet extends JsonRestServlet { 049 050 private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[1]; 051 052 static { 053 RESOURCES_INFO[0] = new ResourceInfo("*", Arrays.asList("PUT", "GET"), Arrays.asList(new ParameterInfo( 054 RestConstants.ACTION_PARAM, String.class, true, Arrays.asList("PUT")), new ParameterInfo( 055 RestConstants.JOB_SHOW_PARAM, String.class, false, Arrays.asList("GET")))); 056 } 057 058 public BaseJobServlet(String instrumentationName) { 059 super(instrumentationName, RESOURCES_INFO); 060 } 061 062 /** 063 * Perform various job related actions - start, suspend, resume, kill, etc. 064 */ 065 @Override 066 protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 067 String jobId = getResourceName(request); 068 request.setAttribute(AUDIT_PARAM, jobId); 069 request.setAttribute(AUDIT_OPERATION, request.getParameter(RestConstants.ACTION_PARAM)); 070 try { 071 AuthorizationService auth = Services.get().get(AuthorizationService.class); 072 auth.authorizeForJob(getUser(request), jobId, true); 073 } 074 catch (AuthorizationException ex) { 075 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex); 076 } 077 078 String action = request.getParameter(RestConstants.ACTION_PARAM); 079 if (action.equals(RestConstants.JOB_ACTION_START)) { 080 stopCron(); 081 startJob(request, response); 082 startCron(); 083 response.setStatus(HttpServletResponse.SC_OK); 084 } 085 else if (action.equals(RestConstants.JOB_ACTION_RESUME)) { 086 stopCron(); 087 resumeJob(request, response); 088 startCron(); 089 response.setStatus(HttpServletResponse.SC_OK); 090 } 091 else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) { 092 stopCron(); 093 suspendJob(request, response); 094 startCron(); 095 response.setStatus(HttpServletResponse.SC_OK); 096 } 097 else if (action.equals(RestConstants.JOB_ACTION_KILL)) { 098 stopCron(); 099 killJob(request, response); 100 startCron(); 101 response.setStatus(HttpServletResponse.SC_OK); 102 } 103 else if (action.equals(RestConstants.JOB_ACTION_CHANGE)) { 104 stopCron(); 105 changeJob(request, response); 106 startCron(); 107 response.setStatus(HttpServletResponse.SC_OK); 108 } 109 else if (action.equals(RestConstants.JOB_ACTION_RERUN)) { 110 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 111 Configuration conf = new XConfiguration(request.getInputStream()); 112 stopCron(); 113 checkAuthorizationForApp(getUser(request), conf); 114 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf); 115 reRunJob(request, response, conf); 116 startCron(); 117 response.setStatus(HttpServletResponse.SC_OK); 118 } 119 else if (action.equals(RestConstants.JOB_COORD_ACTION_RERUN)) { 120 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 121 stopCron(); 122 JSONObject json = reRunJob(request, response, null); 123 startCron(); 124 if (json != null) { 125 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 126 } 127 else { 128 response.setStatus(HttpServletResponse.SC_OK); 129 } 130 } 131 else if (action.equals(RestConstants.JOB_BUNDLE_ACTION_RERUN)) { 132 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 133 stopCron(); 134 JSONObject json = reRunJob(request, response, null); 135 startCron(); 136 if (json != null) { 137 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 138 } 139 else { 140 response.setStatus(HttpServletResponse.SC_OK); 141 } 142 } 143 else { 144 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 145 RestConstants.ACTION_PARAM, action); 146 } 147 } 148 149 /** 150 * Validate the configuration user/group. <p/> 151 * 152 * @param requestUser user in request. 153 * @param conf configuration. 154 * @throws XServletException thrown if the configuration does not have a property {@link 155 * org.apache.oozie.client.OozieClient#USER_NAME}. 156 */ 157 static void checkAuthorizationForApp(String requestUser, Configuration conf) throws XServletException { 158 String user = conf.get(OozieClient.USER_NAME); 159 String group = conf.get(OozieClient.GROUP_NAME); 160 try { 161 if (user == null) { 162 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0401, OozieClient.USER_NAME); 163 } 164 if (!requestUser.equals(UNDEF) && !user.equals(requestUser)) { 165 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0400, requestUser, user); 166 } 167 AuthorizationService auth = Services.get().get(AuthorizationService.class); 168 if (group == null) { 169 group = auth.getDefaultGroup(user); 170 conf.set(OozieClient.GROUP_NAME, group); 171 } 172 else { 173 auth.authorizeForGroup(user, group); 174 } 175 XLog.Info.get().setParameter(XLogService.GROUP, group); 176 String wfPath = conf.get(OozieClient.APP_PATH); 177 String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH); 178 String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH); 179 180 if (wfPath == null && coordPath == null && bundlePath == null) { 181 String libPath = conf.get(XOozieClient.LIBPATH); 182 conf.set(OozieClient.APP_PATH, libPath); 183 wfPath = libPath; 184 } 185 ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath); 186 187 if (wfPath != null) { 188 auth.authorizeForApp(user, group, wfPath, "workflow.xml", conf); 189 } 190 else if (coordPath != null){ 191 auth.authorizeForApp(user, group, coordPath, "coordinator.xml", conf); 192 } 193 else if (bundlePath != null){ 194 auth.authorizeForApp(user, group, bundlePath, "bundle.xml", conf); 195 } 196 } 197 catch (AuthorizationException ex) { 198 XLog.getLog(BaseJobServlet.class).info("AuthorizationException ", ex); 199 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex); 200 } 201 } 202 203 /** 204 * Return information about jobs. 205 */ 206 @Override 207 public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 208 String jobId = getResourceName(request); 209 String show = request.getParameter(RestConstants.JOB_SHOW_PARAM); 210 211 try { 212 AuthorizationService auth = Services.get().get(AuthorizationService.class); 213 auth.authorizeForJob(getUser(request), jobId, false); 214 } 215 catch (AuthorizationException ex) { 216 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex); 217 } 218 219 if (show == null || show.equals(RestConstants.JOB_SHOW_INFO)) { 220 stopCron(); 221 JsonBean job = null; 222 try { 223 job = getJob(request, response); 224 } 225 catch (BaseEngineException e) { 226 // TODO Auto-generated catch block 227 // e.printStackTrace(); 228 229 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e); 230 } 231 startCron(); 232 sendJsonResponse(response, HttpServletResponse.SC_OK, job); 233 } 234 else if (show.equals(RestConstants.JOB_SHOW_LOG)) { 235 response.setContentType(TEXT_UTF8); 236 streamJobLog(request, response); 237 } 238 else if (show.equals(RestConstants.JOB_SHOW_DEFINITION)) { 239 stopCron(); 240 response.setContentType(XML_UTF8); 241 String wfDefinition = getJobDefinition(request, response); 242 startCron(); 243 response.setStatus(HttpServletResponse.SC_OK); 244 response.getWriter().write(wfDefinition); 245 } 246 else { 247 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 248 RestConstants.JOB_SHOW_PARAM, show); 249 } 250 } 251 252 /** 253 * abstract method to start a job, either workflow or coordinator 254 * 255 * @param request 256 * @param response 257 * @throws XServletException 258 * @throws IOException TODO 259 */ 260 abstract void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 261 IOException; 262 263 /** 264 * abstract method to resume a job, either workflow or coordinator 265 * 266 * @param request 267 * @param response 268 * @throws XServletException 269 * @throws IOException TODO 270 */ 271 abstract void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 272 IOException; 273 274 /** 275 * abstract method to suspend a job, either workflow or coordinator 276 * 277 * @param request 278 * @param response 279 * @throws XServletException 280 * @throws IOException TODO 281 */ 282 abstract void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 283 IOException; 284 285 /** 286 * abstract method to kill a job, either workflow or coordinator 287 * 288 * @param request 289 * @param response 290 * @throws XServletException 291 * @throws IOException TODO 292 */ 293 abstract void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 294 IOException; 295 296 /** 297 * abstract method to change a coordinator job 298 * 299 * @param request 300 * @param response 301 * @throws XServletException 302 * @throws IOException TODO 303 */ 304 abstract void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 305 IOException; 306 307 /** 308 * abstract method to re-run a job, either workflow or coordinator 309 * 310 * @param request 311 * @param response 312 * @param conf 313 * @throws XServletException 314 * @throws IOException TODO 315 */ 316 abstract JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 317 throws XServletException, IOException; 318 319 /** 320 * abstract method to get a job, either workflow or coordinator, in JsonBean representation 321 * 322 * @param request 323 * @param response 324 * @return JsonBean representation of a job, either workflow or coordinator 325 * @throws XServletException 326 * @throws IOException TODO 327 * @throws BaseEngineException 328 */ 329 abstract JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 330 IOException, BaseEngineException; 331 332 /** 333 * abstract method to get definition of a job, either workflow or coordinator 334 * 335 * @param request 336 * @param response 337 * @return job, either workflow or coordinator, definition in string format 338 * @throws XServletException 339 * @throws IOException TODO 340 */ 341 abstract String getJobDefinition(HttpServletRequest request, HttpServletResponse response) 342 throws XServletException, IOException; 343 344 /** 345 * abstract method to get and stream log information of job, either workflow or coordinator 346 * 347 * @param request 348 * @param response 349 * @throws XServletException 350 * @throws IOException 351 */ 352 abstract void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 353 IOException; 354 355 }