001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.servlet; 020 021import java.io.IOException; 022import java.util.Arrays; 023 024import javax.servlet.ServletException; 025import javax.servlet.http.HttpServletRequest; 026import javax.servlet.http.HttpServletResponse; 027 028import org.apache.hadoop.conf.Configuration; 029import org.apache.oozie.ErrorCode; 030import org.apache.oozie.client.OozieClient; 031import org.apache.oozie.client.rest.RestConstants; 032import org.apache.oozie.service.Services; 033import org.apache.oozie.service.AuthorizationException; 034import org.apache.oozie.service.AuthorizationService; 035import org.apache.oozie.util.JobUtils; 036import org.apache.oozie.util.JobsFilterUtils; 037import org.apache.oozie.util.XConfiguration; 038import org.json.simple.JSONObject; 039 040public abstract class BaseJobsServlet extends JsonRestServlet { 041 042 private static final JsonRestServlet.ResourceInfo RESOURCES_INFO[] = new JsonRestServlet.ResourceInfo[1]; 043 044 static { 045 RESOURCES_INFO[0] = new JsonRestServlet.ResourceInfo("", Arrays.asList( 046 "POST", "GET", "PUT"), Arrays.asList( 047 new JsonRestServlet.ParameterInfo(RestConstants.ACTION_PARAM, 048 String.class, false, Arrays.asList("POST", "PUT")), 049 new JsonRestServlet.ParameterInfo( 050 RestConstants.JOBS_FILTER_PARAM, String.class, false, 051 Arrays.asList("GET", "PUT")), 052 new JsonRestServlet.ParameterInfo(RestConstants.JOBTYPE_PARAM, 053 String.class, false, Arrays.asList("GET", "POST", "PUT")), 054 new JsonRestServlet.ParameterInfo(RestConstants.OFFSET_PARAM, 055 String.class, false, Arrays.asList("GET", "PUT")), 056 new JsonRestServlet.ParameterInfo(RestConstants.LEN_PARAM, 057 String.class, false, Arrays.asList("GET", "PUT")), 058 new JsonRestServlet.ParameterInfo(RestConstants.JOBS_BULK_PARAM, 059 String.class, false, Arrays.asList("GET", "PUT")), 060 new JsonRestServlet.ParameterInfo( 061 RestConstants.JOBS_EXTERNAL_ID_PARAM, String.class, 062 false, Arrays.asList("GET")))); 063 } 064 065 public BaseJobsServlet(String instrumentationName) { 066 super(instrumentationName, RESOURCES_INFO); 067 } 068 069 /** 070 * Create a job. 071 */ 072 @Override 073 @SuppressWarnings("unchecked") 074 protected void doPost(HttpServletRequest request, 075 HttpServletResponse response) throws ServletException, IOException { 076 /* 077 * Enumeration p = request.getAttributeNames(); 078 * for(;p.hasMoreElements();){ String key = (String)p.nextElement(); 079 * XLog.getLog(getClass()).warn(" key "+ key + " val "+ (String) 080 * request.getAttribute(key)); } 081 */ 082 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 083 084 String action = request.getParameter(RestConstants.ACTION_PARAM); 085 request.setAttribute(AUDIT_OPERATION, 086 (action != null) ? action : RestConstants.JOB_ACTION_SUBMIT); 087 088 XConfiguration conf = new XConfiguration(request.getInputStream()); 089 090 stopCron(); 091 092 conf = conf.trim(); 093 conf = conf.resolve(); 094 095 String requestUser = getUser(request); 096 if (!requestUser.equals(UNDEF)) { 097 conf.set(OozieClient.USER_NAME, requestUser); 098 } 099 BaseJobServlet.checkAuthorizationForApp(conf); 100 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf); 101 102 JSONObject json = submitJob(request, conf); 103 startCron(); 104 sendJsonResponse(response, HttpServletResponse.SC_CREATED, json); 105 } 106 107 /** 108 * Return information about jobs. 109 */ 110 @Override 111 public void doGet(HttpServletRequest request, HttpServletResponse response) 112 throws ServletException, IOException { 113 String externalId = request 114 .getParameter(RestConstants.JOBS_EXTERNAL_ID_PARAM); 115 if (externalId != null) { 116 stopCron(); 117 JSONObject json = getJobIdForExternalId(request, externalId); 118 startCron(); 119 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 120 } 121 else { 122 stopCron(); 123 JSONObject json = getJobs(request); 124 startCron(); 125 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 126 } 127 } 128 129 /** 130 * Perform various job related actions - suspend, resume, kill, etc. 131 */ 132 @Override 133 protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 134 request.setAttribute(AUDIT_PARAM, request.getParameter(RestConstants.JOBS_FILTER_PARAM)); 135 request.setAttribute(AUDIT_OPERATION, request.getParameter(RestConstants.ACTION_PARAM)); 136 try { 137 AuthorizationService auth = Services.get().get(AuthorizationService.class); 138 String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM); 139 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 140 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 141 String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM); 142 143 if (filter == null) { 144 throw new IllegalArgumentException("filter params must be specified for bulk write API"); 145 } 146 int start = (startStr != null) ? Integer.parseInt(startStr) : 1; 147 start = (start < 1) ? 1 : start; 148 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50; 149 len = (len < 1) ? 50 : len; 150 auth.authorizeForJobs(getUser(request), JobsFilterUtils.parseFilter(filter), jobType, start, len, true); 151 } 152 catch (AuthorizationException ex) { 153 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex); 154 } 155 156 String action = request.getParameter(RestConstants.ACTION_PARAM); 157 JSONObject json = null; 158 if (action.equals(RestConstants.JOB_ACTION_KILL)) { 159 stopCron(); 160 json = killJobs(request, response); 161 startCron(); 162 } 163 else if (action.equals(RestConstants.JOB_ACTION_RESUME)) { 164 stopCron(); 165 json = resumeJobs(request, response); 166 startCron(); 167 } 168 else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) { 169 stopCron(); 170 json = suspendJobs(request, response); 171 startCron(); 172 } 173 else { 174 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 175 RestConstants.ACTION_PARAM, action); 176 } 177 response.setStatus(HttpServletResponse.SC_OK); 178 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 179 } 180 181 /** 182 * abstract method to kill jobs based ona filter param. The jobs could be workflow, coordinator or bundle jobs 183 * 184 * @param request 185 * @param response 186 * @return JSONObject of all jobs being killed 187 * @throws XServletException 188 * @throws IOException 189 */ 190 abstract JSONObject killJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException, 191 IOException; 192 193 /** 194 * abstract method to suspend jobs based ona filter param. The jobs could be workflow, coordinator or bundle jobs 195 * 196 * @param request 197 * @param response 198 * @return JSONObject of all jobs being suspended 199 * @throws XServletException 200 * @throws IOException 201 */ 202 abstract JSONObject suspendJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException, 203 IOException; 204 205 /** 206 * abstract method to resume jobs based ona filter param. The jobs could be workflow, coordinator or bundle jobs 207 * 208 * @param request 209 * @param response 210 * @return JSONObject of all jobs being resumed 211 * @throws XServletException 212 * @throws IOException 213 */ 214 abstract JSONObject resumeJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException, 215 IOException; 216 /** 217 * abstract method to submit a job, either workflow or coordinator in the case of workflow job, there is an optional 218 * flag in request to indicate if want this job to be started immediately or not 219 * 220 * @param request 221 * @param conf 222 * @return JSONObject of job id 223 * @throws XServletException 224 * @throws IOException 225 */ 226 abstract JSONObject submitJob(HttpServletRequest request, Configuration conf) 227 throws XServletException, IOException; 228 229 /** 230 * abstract method to get a job from external ID 231 * 232 * @param request 233 * @param externalId 234 * @return JSONObject for the requested job 235 * @throws XServletException 236 * @throws IOException 237 */ 238 abstract JSONObject getJobIdForExternalId(HttpServletRequest request, 239 String externalId) throws XServletException, IOException; 240 241 /** 242 * abstract method to get a list of workflow jobs 243 * 244 * @param request 245 * @return JSONObject of the requested jobs 246 * @throws XServletException 247 * @throws IOException 248 */ 249 abstract JSONObject getJobs(HttpServletRequest request) 250 throws XServletException, IOException; 251 252}