001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.servlet; 019 020 import java.io.IOException; 021 import java.util.Arrays; 022 023 import javax.servlet.ServletException; 024 import javax.servlet.http.HttpServletRequest; 025 import javax.servlet.http.HttpServletResponse; 026 027 import org.apache.hadoop.conf.Configuration; 028 import org.apache.oozie.BaseEngineException; 029 import org.apache.oozie.ErrorCode; 030 import org.apache.oozie.client.OozieClient; 031 import org.apache.oozie.client.XOozieClient; 032 import org.apache.oozie.client.rest.JsonBean; 033 import org.apache.oozie.client.rest.JsonTags; 034 import org.apache.oozie.client.rest.RestConstants; 035 import org.apache.oozie.service.AuthorizationException; 036 import org.apache.oozie.service.AuthorizationService; 037 import org.apache.oozie.service.Services; 038 import org.apache.oozie.service.XLogService; 039 import org.apache.oozie.util.ConfigUtils; 040 import org.apache.oozie.util.JobUtils; 041 import org.apache.oozie.util.XConfiguration; 042 import org.apache.oozie.util.XLog; 043 import org.json.simple.JSONObject; 044 045 public abstract class BaseJobServlet extends JsonRestServlet { 046 047 private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[1]; 048 049 static { 050 RESOURCES_INFO[0] = new ResourceInfo("*", Arrays.asList("PUT", "GET"), Arrays.asList(new ParameterInfo( 051 RestConstants.ACTION_PARAM, String.class, true, Arrays.asList("PUT")), new ParameterInfo( 052 RestConstants.JOB_SHOW_PARAM, String.class, false, Arrays.asList("GET")), new ParameterInfo( 053 RestConstants.ORDER_PARAM, String.class, false, Arrays.asList("GET")))); 054 } 055 056 public BaseJobServlet(String instrumentationName) { 057 super(instrumentationName, RESOURCES_INFO); 058 } 059 060 /** 061 * Perform various job related actions - start, suspend, resume, kill, etc. 062 */ 063 @Override 064 protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 065 String jobId = getResourceName(request); 066 request.setAttribute(AUDIT_PARAM, jobId); 067 request.setAttribute(AUDIT_OPERATION, request.getParameter(RestConstants.ACTION_PARAM)); 068 try { 069 AuthorizationService auth = Services.get().get(AuthorizationService.class); 070 auth.authorizeForJob(getUser(request), jobId, true); 071 } 072 catch (AuthorizationException ex) { 073 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex); 074 } 075 076 String action = request.getParameter(RestConstants.ACTION_PARAM); 077 if (action.equals(RestConstants.JOB_ACTION_START)) { 078 stopCron(); 079 startJob(request, response); 080 startCron(); 081 response.setStatus(HttpServletResponse.SC_OK); 082 } 083 else if (action.equals(RestConstants.JOB_ACTION_RESUME)) { 084 stopCron(); 085 resumeJob(request, response); 086 startCron(); 087 response.setStatus(HttpServletResponse.SC_OK); 088 } 089 else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) { 090 stopCron(); 091 suspendJob(request, response); 092 startCron(); 093 response.setStatus(HttpServletResponse.SC_OK); 094 } 095 else if (action.equals(RestConstants.JOB_ACTION_KILL)) { 096 stopCron(); 097 killJob(request, response); 098 startCron(); 099 response.setStatus(HttpServletResponse.SC_OK); 100 } 101 else if (action.equals(RestConstants.JOB_ACTION_CHANGE)) { 102 stopCron(); 103 changeJob(request, response); 104 startCron(); 105 response.setStatus(HttpServletResponse.SC_OK); 106 } 107 else if (action.equals(RestConstants.JOB_ACTION_RERUN)) { 108 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 109 Configuration conf = new XConfiguration(request.getInputStream()); 110 stopCron(); 111 String requestUser = getUser(request); 112 if (!requestUser.equals(UNDEF)) { 113 conf.set(OozieClient.USER_NAME, requestUser); 114 } 115 BaseJobServlet.checkAuthorizationForApp(conf); 116 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf); 117 reRunJob(request, response, conf); 118 startCron(); 119 response.setStatus(HttpServletResponse.SC_OK); 120 } 121 else if (action.equals(RestConstants.JOB_COORD_ACTION_RERUN)) { 122 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 123 stopCron(); 124 JSONObject json = reRunJob(request, response, null); 125 startCron(); 126 if (json != null) { 127 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 128 } 129 else { 130 response.setStatus(HttpServletResponse.SC_OK); 131 } 132 } 133 else if (action.equals(RestConstants.JOB_BUNDLE_ACTION_RERUN)) { 134 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 135 stopCron(); 136 JSONObject json = reRunJob(request, response, null); 137 startCron(); 138 if (json != null) { 139 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 140 } 141 else { 142 response.setStatus(HttpServletResponse.SC_OK); 143 } 144 } 145 else { 146 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 147 RestConstants.ACTION_PARAM, action); 148 } 149 } 150 151 /** 152 * Validate the configuration user/group. <p/> 153 * 154 * @param conf configuration. 155 * @throws XServletException thrown if the configuration does not have a property {@link 156 * org.apache.oozie.client.OozieClient#USER_NAME}. 157 */ 158 static void checkAuthorizationForApp(Configuration conf) throws XServletException { 159 String user = conf.get(OozieClient.USER_NAME); 160 String acl = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.GROUP_NAME, OozieClient.JOB_ACL, null); 161 try { 162 if (user == null) { 163 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0401, OozieClient.USER_NAME); 164 } 165 AuthorizationService auth = Services.get().get(AuthorizationService.class); 166 167 if (acl != null){ 168 conf.set(OozieClient.GROUP_NAME, acl); 169 } 170 else if (acl == null && auth.useDefaultGroupAsAcl()) { 171 acl = auth.getDefaultGroup(user); 172 conf.set(OozieClient.GROUP_NAME, acl); 173 } 174 XLog.Info.get().setParameter(XLogService.GROUP, acl); 175 String wfPath = conf.get(OozieClient.APP_PATH); 176 String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH); 177 String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH); 178 179 if (wfPath == null && coordPath == null && bundlePath == null) { 180 String[] libPaths = conf.getStrings(XOozieClient.LIBPATH); 181 if (libPaths != null && libPaths.length > 0 && libPaths[0].trim().length() > 0) { 182 conf.set(OozieClient.APP_PATH, libPaths[0].trim()); 183 wfPath = libPaths[0].trim(); 184 } 185 else { 186 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0405); 187 } 188 } 189 ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath); 190 191 if (wfPath != null) { 192 auth.authorizeForApp(user, acl, wfPath, "workflow.xml", conf); 193 } 194 else if (coordPath != null){ 195 auth.authorizeForApp(user, acl, coordPath, "coordinator.xml", conf); 196 } 197 else if (bundlePath != null){ 198 auth.authorizeForApp(user, acl, bundlePath, "bundle.xml", conf); 199 } 200 } 201 catch (AuthorizationException ex) { 202 XLog.getLog(BaseJobServlet.class).info("AuthorizationException ", ex); 203 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex); 204 } 205 } 206 207 /** 208 * Return information about jobs. 209 */ 210 @Override 211 public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 212 String jobId = getResourceName(request); 213 String show = request.getParameter(RestConstants.JOB_SHOW_PARAM); 214 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 215 ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM); 216 217 try { 218 AuthorizationService auth = Services.get().get(AuthorizationService.class); 219 auth.authorizeForJob(getUser(request), jobId, false); 220 } 221 catch (AuthorizationException ex) { 222 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex); 223 } 224 225 if (show == null || show.equals(RestConstants.JOB_SHOW_INFO)) { 226 stopCron(); 227 JsonBean job = null; 228 try { 229 job = getJob(request, response); 230 } 231 catch (BaseEngineException e) { 232 // TODO Auto-generated catch block 233 // e.printStackTrace(); 234 235 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e); 236 } 237 startCron(); 238 sendJsonResponse(response, HttpServletResponse.SC_OK, job, timeZoneId); 239 } 240 241 else if (show.equals(RestConstants.JOB_SHOW_JMS_TOPIC)) { 242 stopCron(); 243 String jmsTopicName = getJMSTopicName(request, response); 244 JSONObject json = new JSONObject(); 245 json.put(JsonTags.JMS_TOPIC_NAME, jmsTopicName); 246 startCron(); 247 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 248 } 249 250 else if (show.equals(RestConstants.JOB_SHOW_LOG)) { 251 response.setContentType(TEXT_UTF8); 252 streamJobLog(request, response); 253 } 254 else if (show.equals(RestConstants.JOB_SHOW_DEFINITION)) { 255 stopCron(); 256 response.setContentType(XML_UTF8); 257 String wfDefinition = getJobDefinition(request, response); 258 startCron(); 259 response.setStatus(HttpServletResponse.SC_OK); 260 response.getWriter().write(wfDefinition); 261 } 262 else if (show.equals(RestConstants.JOB_SHOW_GRAPH)) { 263 stopCron(); 264 streamJobGraph(request, response); 265 startCron(); // -- should happen before you stream anything in response? 266 } 267 else { 268 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 269 RestConstants.JOB_SHOW_PARAM, show); 270 } 271 } 272 273 /** 274 * abstract method to start a job, either workflow or coordinator 275 * 276 * @param request 277 * @param response 278 * @throws XServletException 279 * @throws IOException TODO 280 */ 281 abstract void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 282 IOException; 283 284 /** 285 * abstract method to resume a job, either workflow or coordinator 286 * 287 * @param request 288 * @param response 289 * @throws XServletException 290 * @throws IOException TODO 291 */ 292 abstract void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 293 IOException; 294 295 /** 296 * abstract method to suspend a job, either workflow or coordinator 297 * 298 * @param request 299 * @param response 300 * @throws XServletException 301 * @throws IOException TODO 302 */ 303 abstract void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 304 IOException; 305 306 /** 307 * abstract method to kill a job, either workflow or coordinator 308 * 309 * @param request 310 * @param response 311 * @throws XServletException 312 * @throws IOException TODO 313 */ 314 abstract void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 315 IOException; 316 317 /** 318 * abstract method to change a coordinator job 319 * 320 * @param request 321 * @param response 322 * @throws XServletException 323 * @throws IOException TODO 324 */ 325 abstract void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 326 IOException; 327 328 /** 329 * abstract method to re-run a job, either workflow or coordinator 330 * 331 * @param request 332 * @param response 333 * @param conf 334 * @throws XServletException 335 * @throws IOException TODO 336 */ 337 abstract JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 338 throws XServletException, IOException; 339 340 /** 341 * abstract method to get a job, either workflow or coordinator, in JsonBean representation 342 * 343 * @param request 344 * @param response 345 * @return JsonBean representation of a job, either workflow or coordinator 346 * @throws XServletException 347 * @throws IOException TODO 348 * @throws BaseEngineException 349 */ 350 abstract JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 351 IOException, BaseEngineException; 352 353 /** 354 * abstract method to get definition of a job, either workflow or coordinator 355 * 356 * @param request 357 * @param response 358 * @return job, either workflow or coordinator, definition in string format 359 * @throws XServletException 360 * @throws IOException TODO 361 */ 362 abstract String getJobDefinition(HttpServletRequest request, HttpServletResponse response) 363 throws XServletException, IOException; 364 365 /** 366 * abstract method to get and stream log information of job, either workflow or coordinator 367 * 368 * @param request 369 * @param response 370 * @throws XServletException 371 * @throws IOException 372 */ 373 abstract void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 374 IOException; 375 376 /** 377 * abstract method to create and stream image for runtime DAG -- workflow only 378 * 379 * @param request 380 * @param response 381 * @throws XServletException 382 * @throws IOException 383 */ 384 abstract void streamJobGraph(HttpServletRequest request, HttpServletResponse response) 385 throws XServletException, IOException; 386 387 /** 388 * abstract method to get JMS topic name for a job 389 * @param request 390 * @param response 391 * @throws XServletException 392 * @throws IOException 393 */ 394 abstract String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) 395 throws XServletException, IOException; 396 }