001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.servlet; 019 020 import java.io.IOException; 021 import java.util.Arrays; 022 023 import javax.servlet.ServletException; 024 import javax.servlet.http.HttpServletRequest; 025 import javax.servlet.http.HttpServletResponse; 026 027 import org.apache.hadoop.conf.Configuration; 028 import org.apache.oozie.BaseEngineException; 029 import org.apache.oozie.ErrorCode; 030 import org.apache.oozie.client.OozieClient; 031 import org.apache.oozie.client.XOozieClient; 032 import org.apache.oozie.client.rest.JsonBean; 033 import org.apache.oozie.client.rest.RestConstants; 034 import org.apache.oozie.service.AuthorizationException; 035 import org.apache.oozie.service.AuthorizationService; 036 import org.apache.oozie.service.Services; 037 import org.apache.oozie.service.XLogService; 038 import org.apache.oozie.util.ConfigUtils; 039 import org.apache.oozie.util.JobUtils; 040 import org.apache.oozie.util.XConfiguration; 041 import org.apache.oozie.util.XLog; 042 import org.json.simple.JSONObject; 043 044 public abstract class BaseJobServlet extends JsonRestServlet { 045 046 private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[1]; 047 048 static { 049 RESOURCES_INFO[0] = new ResourceInfo("*", Arrays.asList("PUT", "GET"), Arrays.asList(new ParameterInfo( 050 RestConstants.ACTION_PARAM, String.class, true, Arrays.asList("PUT")), new ParameterInfo( 051 RestConstants.JOB_SHOW_PARAM, String.class, false, Arrays.asList("GET")))); 052 } 053 054 public BaseJobServlet(String instrumentationName) { 055 super(instrumentationName, RESOURCES_INFO); 056 } 057 058 /** 059 * Perform various job related actions - start, suspend, resume, kill, etc. 060 */ 061 @Override 062 protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 063 String jobId = getResourceName(request); 064 request.setAttribute(AUDIT_PARAM, jobId); 065 request.setAttribute(AUDIT_OPERATION, request.getParameter(RestConstants.ACTION_PARAM)); 066 try { 067 AuthorizationService auth = Services.get().get(AuthorizationService.class); 068 auth.authorizeForJob(getUser(request), jobId, true); 069 } 070 catch (AuthorizationException ex) { 071 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex); 072 } 073 074 String action = request.getParameter(RestConstants.ACTION_PARAM); 075 if (action.equals(RestConstants.JOB_ACTION_START)) { 076 stopCron(); 077 startJob(request, response); 078 startCron(); 079 response.setStatus(HttpServletResponse.SC_OK); 080 } 081 else if (action.equals(RestConstants.JOB_ACTION_RESUME)) { 082 stopCron(); 083 resumeJob(request, response); 084 startCron(); 085 response.setStatus(HttpServletResponse.SC_OK); 086 } 087 else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) { 088 stopCron(); 089 suspendJob(request, response); 090 startCron(); 091 response.setStatus(HttpServletResponse.SC_OK); 092 } 093 else if (action.equals(RestConstants.JOB_ACTION_KILL)) { 094 stopCron(); 095 killJob(request, response); 096 startCron(); 097 response.setStatus(HttpServletResponse.SC_OK); 098 } 099 else if (action.equals(RestConstants.JOB_ACTION_CHANGE)) { 100 stopCron(); 101 changeJob(request, response); 102 startCron(); 103 response.setStatus(HttpServletResponse.SC_OK); 104 } 105 else if (action.equals(RestConstants.JOB_ACTION_RERUN)) { 106 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 107 Configuration conf = new XConfiguration(request.getInputStream()); 108 stopCron(); 109 String requestUser = getUser(request); 110 if (!requestUser.equals(UNDEF)) { 111 conf.set(OozieClient.USER_NAME, requestUser); 112 } 113 BaseJobServlet.checkAuthorizationForApp(conf); 114 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf); 115 reRunJob(request, response, conf); 116 startCron(); 117 response.setStatus(HttpServletResponse.SC_OK); 118 } 119 else if (action.equals(RestConstants.JOB_COORD_ACTION_RERUN)) { 120 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 121 stopCron(); 122 JSONObject json = reRunJob(request, response, null); 123 startCron(); 124 if (json != null) { 125 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 126 } 127 else { 128 response.setStatus(HttpServletResponse.SC_OK); 129 } 130 } 131 else if (action.equals(RestConstants.JOB_BUNDLE_ACTION_RERUN)) { 132 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 133 stopCron(); 134 JSONObject json = reRunJob(request, response, null); 135 startCron(); 136 if (json != null) { 137 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 138 } 139 else { 140 response.setStatus(HttpServletResponse.SC_OK); 141 } 142 } 143 else { 144 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 145 RestConstants.ACTION_PARAM, action); 146 } 147 } 148 149 /** 150 * Validate the configuration user/group. <p/> 151 * 152 * @param conf configuration. 153 * @throws XServletException thrown if the configuration does not have a property {@link 154 * org.apache.oozie.client.OozieClient#USER_NAME}. 155 */ 156 static void checkAuthorizationForApp(Configuration conf) throws XServletException { 157 String user = conf.get(OozieClient.USER_NAME); 158 String acl = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.GROUP_NAME, OozieClient.JOB_ACL, null); 159 try { 160 if (user == null) { 161 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0401, OozieClient.USER_NAME); 162 } 163 AuthorizationService auth = Services.get().get(AuthorizationService.class); 164 165 if (acl != null){ 166 conf.set(OozieClient.GROUP_NAME, acl); 167 } 168 else if (acl == null && auth.useDefaultGroupAsAcl()) { 169 acl = auth.getDefaultGroup(user); 170 conf.set(OozieClient.GROUP_NAME, acl); 171 } 172 XLog.Info.get().setParameter(XLogService.GROUP, acl); 173 String wfPath = conf.get(OozieClient.APP_PATH); 174 String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH); 175 String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH); 176 177 if (wfPath == null && coordPath == null && bundlePath == null) { 178 String[] libPaths = conf.getStrings(XOozieClient.LIBPATH); 179 if (libPaths != null && libPaths.length > 0 && libPaths[0].trim().length() > 0) { 180 conf.set(OozieClient.APP_PATH, libPaths[0].trim()); 181 wfPath = libPaths[0].trim(); 182 } 183 else { 184 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0405); 185 } 186 } 187 ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath); 188 189 if (wfPath != null) { 190 auth.authorizeForApp(user, acl, wfPath, "workflow.xml", conf); 191 } 192 else if (coordPath != null){ 193 auth.authorizeForApp(user, acl, coordPath, "coordinator.xml", conf); 194 } 195 else if (bundlePath != null){ 196 auth.authorizeForApp(user, acl, bundlePath, "bundle.xml", conf); 197 } 198 } 199 catch (AuthorizationException ex) { 200 XLog.getLog(BaseJobServlet.class).info("AuthorizationException ", ex); 201 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex); 202 } 203 } 204 205 /** 206 * Return information about jobs. 207 */ 208 @Override 209 public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 210 String jobId = getResourceName(request); 211 String show = request.getParameter(RestConstants.JOB_SHOW_PARAM); 212 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 213 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM); 214 215 try { 216 AuthorizationService auth = Services.get().get(AuthorizationService.class); 217 auth.authorizeForJob(getUser(request), jobId, false); 218 } 219 catch (AuthorizationException ex) { 220 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex); 221 } 222 223 if (show == null || show.equals(RestConstants.JOB_SHOW_INFO)) { 224 stopCron(); 225 JsonBean job = null; 226 try { 227 job = getJob(request, response); 228 } 229 catch (BaseEngineException e) { 230 // TODO Auto-generated catch block 231 // e.printStackTrace(); 232 233 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e); 234 } 235 startCron(); 236 sendJsonResponse(response, HttpServletResponse.SC_OK, job, timeZoneId); 237 } 238 else if (show.equals(RestConstants.JOB_SHOW_LOG)) { 239 response.setContentType(TEXT_UTF8); 240 streamJobLog(request, response); 241 } 242 else if (show.equals(RestConstants.JOB_SHOW_DEFINITION)) { 243 stopCron(); 244 response.setContentType(XML_UTF8); 245 String wfDefinition = getJobDefinition(request, response); 246 startCron(); 247 response.setStatus(HttpServletResponse.SC_OK); 248 response.getWriter().write(wfDefinition); 249 } 250 else if (show.equals(RestConstants.JOB_SHOW_GRAPH)) { 251 stopCron(); 252 streamJobGraph(request, response); 253 startCron(); // -- should happen before you stream anything in response? 254 } 255 else { 256 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 257 RestConstants.JOB_SHOW_PARAM, show); 258 } 259 } 260 261 /** 262 * abstract method to start a job, either workflow or coordinator 263 * 264 * @param request 265 * @param response 266 * @throws XServletException 267 * @throws IOException TODO 268 */ 269 abstract void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 270 IOException; 271 272 /** 273 * abstract method to resume a job, either workflow or coordinator 274 * 275 * @param request 276 * @param response 277 * @throws XServletException 278 * @throws IOException TODO 279 */ 280 abstract void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 281 IOException; 282 283 /** 284 * abstract method to suspend a job, either workflow or coordinator 285 * 286 * @param request 287 * @param response 288 * @throws XServletException 289 * @throws IOException TODO 290 */ 291 abstract void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 292 IOException; 293 294 /** 295 * abstract method to kill a job, either workflow or coordinator 296 * 297 * @param request 298 * @param response 299 * @throws XServletException 300 * @throws IOException TODO 301 */ 302 abstract void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 303 IOException; 304 305 /** 306 * abstract method to change a coordinator job 307 * 308 * @param request 309 * @param response 310 * @throws XServletException 311 * @throws IOException TODO 312 */ 313 abstract void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 314 IOException; 315 316 /** 317 * abstract method to re-run a job, either workflow or coordinator 318 * 319 * @param request 320 * @param response 321 * @param conf 322 * @throws XServletException 323 * @throws IOException TODO 324 */ 325 abstract JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 326 throws XServletException, IOException; 327 328 /** 329 * abstract method to get a job, either workflow or coordinator, in JsonBean representation 330 * 331 * @param request 332 * @param response 333 * @return JsonBean representation of a job, either workflow or coordinator 334 * @throws XServletException 335 * @throws IOException TODO 336 * @throws BaseEngineException 337 */ 338 abstract JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 339 IOException, BaseEngineException; 340 341 /** 342 * abstract method to get definition of a job, either workflow or coordinator 343 * 344 * @param request 345 * @param response 346 * @return job, either workflow or coordinator, definition in string format 347 * @throws XServletException 348 * @throws IOException TODO 349 */ 350 abstract String getJobDefinition(HttpServletRequest request, HttpServletResponse response) 351 throws XServletException, IOException; 352 353 /** 354 * abstract method to get and stream log information of job, either workflow or coordinator 355 * 356 * @param request 357 * @param response 358 * @throws XServletException 359 * @throws IOException 360 */ 361 abstract void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 362 IOException; 363 364 /** 365 * abstract method to create and stream image for runtime DAG -- workflow only 366 * 367 * @param request 368 * @param response 369 * @throws XServletException 370 * @throws IOException 371 */ 372 abstract void streamJobGraph(HttpServletRequest request, HttpServletResponse response) 373 throws XServletException, IOException; 374 }