001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.servlet; 019 020 import java.io.IOException; 021 import java.util.Arrays; 022 import java.util.List; 023 024 import javax.servlet.ServletException; 025 import javax.servlet.http.HttpServletRequest; 026 import javax.servlet.http.HttpServletResponse; 027 028 import org.apache.hadoop.conf.Configuration; 029 import org.apache.oozie.BaseEngineException; 030 import org.apache.oozie.CoordinatorActionBean; 031 import org.apache.oozie.CoordinatorActionInfo; 032 import org.apache.oozie.CoordinatorEngine; 033 import org.apache.oozie.DagEngine; 034 import org.apache.oozie.DagEngineException; 035 import org.apache.oozie.ErrorCode; 036 import org.apache.oozie.client.OozieClient; 037 import org.apache.oozie.client.rest.JsonTags; 038 import org.apache.oozie.client.rest.JsonWorkflowJob; 039 import org.apache.oozie.client.rest.RestConstants; 040 import org.apache.oozie.service.AuthorizationException; 041 import org.apache.oozie.service.AuthorizationService; 042 import org.apache.oozie.service.CoordinatorEngineService; 043 import org.apache.oozie.service.DagEngineService; 044 import org.apache.oozie.service.Services; 045 import org.apache.oozie.service.XLogService; 046 import org.apache.oozie.util.XConfiguration; 047 import org.apache.oozie.util.XLog; 048 import org.json.simple.JSONObject; 049 050 public class JobServlet extends JsonRestServlet { 051 private static final String INSTRUMENTATION_NAME = "job"; 052 053 private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[1]; 054 055 static { 056 RESOURCES_INFO[0] = new ResourceInfo("*", Arrays.asList("PUT", "GET"), Arrays.asList(new ParameterInfo( 057 RestConstants.ACTION_PARAM, String.class, true, Arrays.asList("PUT")), new ParameterInfo( 058 RestConstants.JOB_SHOW_PARAM, String.class, false, Arrays.asList("GET")))); 059 } 060 061 public JobServlet() { 062 super(INSTRUMENTATION_NAME, RESOURCES_INFO); 063 } 064 065 /** 066 * Perform various job related actions - start, suspend, resume, kill, etc. 067 */ 068 @SuppressWarnings("unchecked") 069 @Override 070 protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 071 String jobId = getResourceName(request); 072 request.setAttribute(AUDIT_PARAM, jobId); 073 request.setAttribute(AUDIT_OPERATION, request.getParameter(RestConstants.ACTION_PARAM)); 074 try { 075 AuthorizationService auth = Services.get().get(AuthorizationService.class); 076 auth.authorizeForJob(getUser(request), jobId, true); 077 } 078 catch (AuthorizationException ex) { 079 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex); 080 } 081 082 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 083 getAuthToken(request)); 084 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 085 getUser(request), getAuthToken(request)); 086 try { 087 String action = request.getParameter(RestConstants.ACTION_PARAM); 088 if (action.equals(RestConstants.JOB_ACTION_START)) { 089 stopCron(); 090 dagEngine.start(jobId); 091 startCron(); 092 response.setStatus(HttpServletResponse.SC_OK); 093 } 094 else if (action.equals(RestConstants.JOB_ACTION_RESUME)) { 095 stopCron(); 096 dagEngine.resume(jobId); 097 startCron(); 098 response.setStatus(HttpServletResponse.SC_OK); 099 } 100 else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) { 101 stopCron(); 102 dagEngine.suspend(jobId); 103 startCron(); 104 response.setStatus(HttpServletResponse.SC_OK); 105 } 106 else if (action.equals(RestConstants.JOB_ACTION_KILL)) { 107 stopCron(); 108 dagEngine.kill(jobId); 109 startCron(); 110 response.setStatus(HttpServletResponse.SC_OK); 111 } 112 else if (action.equals(RestConstants.JOB_ACTION_RERUN)) { 113 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 114 XConfiguration conf = new XConfiguration(request.getInputStream()); 115 stopCron(); 116 conf = conf.trim(); 117 conf = conf.resolve(); 118 JobsServlet.validateJobConfiguration(conf); 119 checkAuthorizationForApp(getUser(request), conf); 120 dagEngine.reRun(jobId, conf); 121 startCron(); 122 response.setStatus(HttpServletResponse.SC_OK); 123 } 124 else if (action.equals(RestConstants.JOB_COORD_ACTION_RERUN)) { 125 validateContentType(request, RestConstants.XML_CONTENT_TYPE); 126 stopCron(); 127 String rerunType = request.getParameter(RestConstants.JOB_COORD_RERUN_TYPE_PARAM); 128 String scope = request.getParameter(RestConstants.JOB_COORD_RERUN_SCOPE_PARAM); 129 String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM); 130 String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM); 131 CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh), 132 Boolean.valueOf(noCleanup)); 133 List<CoordinatorActionBean> actions = coordInfo.getCoordActions(); 134 JSONObject json = new JSONObject(); 135 json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(actions)); 136 startCron(); 137 sendJsonResponse(response, HttpServletResponse.SC_OK, json); 138 } 139 else { 140 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 141 RestConstants.ACTION_PARAM, action); 142 } 143 } 144 catch (DagEngineException ex) { 145 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 146 } 147 catch (BaseEngineException ex) { 148 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 149 } 150 } 151 152 /** 153 * Validate the configuration user/group. <p/> 154 * 155 * @param requestUser user in request. 156 * @param conf configuration. 157 * @throws XServletException thrown if the configuration does not have a property {@link 158 * org.apache.oozie.client.OozieClient#USER_NAME}. 159 */ 160 static void checkAuthorizationForApp(String requestUser, Configuration conf) throws XServletException { 161 String user = conf.get(OozieClient.USER_NAME); 162 String group = conf.get(OozieClient.GROUP_NAME); 163 try { 164 if (user == null) { 165 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0401, OozieClient.USER_NAME); 166 } 167 if (!requestUser.equals(UNDEF) && !user.equals(requestUser)) { 168 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0400, requestUser, user); 169 } 170 AuthorizationService auth = Services.get().get(AuthorizationService.class); 171 if (group == null) { 172 group = auth.getDefaultGroup(user); 173 conf.set(OozieClient.GROUP_NAME, group); 174 } 175 else { 176 auth.authorizeForGroup(user, group); 177 } 178 XLog.Info.get().setParameter(XLogService.GROUP, group); 179 auth.authorizeForApp(user, group, conf.get(OozieClient.APP_PATH), conf); 180 } 181 catch (AuthorizationException ex) { 182 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex); 183 } 184 } 185 186 /** 187 * Return information about jobs. 188 */ 189 @Override 190 public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 191 String jobId = getResourceName(request); 192 String show = request.getParameter(RestConstants.JOB_SHOW_PARAM); 193 194 try { 195 AuthorizationService auth = Services.get().get(AuthorizationService.class); 196 auth.authorizeForJob(getUser(request), jobId, false); 197 } 198 catch (AuthorizationException ex) { 199 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex); 200 } 201 202 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 203 getAuthToken(request)); 204 try { 205 if (show == null || show.equals(RestConstants.JOB_SHOW_INFO)) { 206 stopCron(); 207 JsonWorkflowJob job = (JsonWorkflowJob) dagEngine.getJob(jobId); 208 startCron(); 209 sendJsonResponse(response, HttpServletResponse.SC_OK, job); 210 } 211 else { 212 if (show.equals(RestConstants.JOB_SHOW_LOG)) { 213 response.setContentType(TEXT_UTF8); 214 dagEngine.streamLog(jobId, response.getWriter()); 215 } 216 else { 217 if (show.equals(RestConstants.JOB_SHOW_DEFINITION)) { 218 stopCron(); 219 response.setContentType(XML_UTF8); 220 String wfDefinition = dagEngine.getDefinition(jobId); 221 startCron(); 222 response.setStatus(HttpServletResponse.SC_OK); 223 response.getWriter().write(wfDefinition); 224 } 225 else { 226 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, 227 RestConstants.JOB_SHOW_PARAM, show); 228 } 229 } 230 } 231 } 232 catch (DagEngineException ex) { 233 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 234 } 235 } 236 237 }