This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.servlet;
019
020 import java.io.IOException;
021 import java.util.Arrays;
022 import java.util.List;
023
024 import javax.servlet.ServletException;
025 import javax.servlet.http.HttpServletRequest;
026 import javax.servlet.http.HttpServletResponse;
027
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.oozie.BaseEngineException;
030 import org.apache.oozie.CoordinatorActionBean;
031 import org.apache.oozie.CoordinatorActionInfo;
032 import org.apache.oozie.CoordinatorEngine;
033 import org.apache.oozie.DagEngine;
034 import org.apache.oozie.DagEngineException;
035 import org.apache.oozie.ErrorCode;
036 import org.apache.oozie.client.OozieClient;
037 import org.apache.oozie.client.rest.JsonTags;
038 import org.apache.oozie.client.rest.JsonWorkflowJob;
039 import org.apache.oozie.client.rest.RestConstants;
040 import org.apache.oozie.service.AuthorizationException;
041 import org.apache.oozie.service.AuthorizationService;
042 import org.apache.oozie.service.CoordinatorEngineService;
043 import org.apache.oozie.service.DagEngineService;
044 import org.apache.oozie.service.Services;
045 import org.apache.oozie.service.XLogService;
046 import org.apache.oozie.util.XConfiguration;
047 import org.apache.oozie.util.XLog;
048 import org.json.simple.JSONObject;
049
050 public class JobServlet extends JsonRestServlet {
051 private static final String INSTRUMENTATION_NAME = "job";
052
053 private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[1];
054
055 static {
056 RESOURCES_INFO[0] = new ResourceInfo("*", Arrays.asList("PUT", "GET"), Arrays.asList(new ParameterInfo(
057 RestConstants.ACTION_PARAM, String.class, true, Arrays.asList("PUT")), new ParameterInfo(
058 RestConstants.JOB_SHOW_PARAM, String.class, false, Arrays.asList("GET"))));
059 }
060
061 public JobServlet() {
062 super(INSTRUMENTATION_NAME, RESOURCES_INFO);
063 }
064
065 /**
066 * Perform various job related actions - start, suspend, resume, kill, etc.
067 */
068 @SuppressWarnings("unchecked")
069 @Override
070 protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
071 String jobId = getResourceName(request);
072 request.setAttribute(AUDIT_PARAM, jobId);
073 request.setAttribute(AUDIT_OPERATION, request.getParameter(RestConstants.ACTION_PARAM));
074 try {
075 AuthorizationService auth = Services.get().get(AuthorizationService.class);
076 auth.authorizeForJob(getUser(request), jobId, true);
077 }
078 catch (AuthorizationException ex) {
079 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
080 }
081
082 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
083 getAuthToken(request));
084 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
085 getUser(request), getAuthToken(request));
086 try {
087 String action = request.getParameter(RestConstants.ACTION_PARAM);
088 if (action.equals(RestConstants.JOB_ACTION_START)) {
089 stopCron();
090 dagEngine.start(jobId);
091 startCron();
092 response.setStatus(HttpServletResponse.SC_OK);
093 }
094 else if (action.equals(RestConstants.JOB_ACTION_RESUME)) {
095 stopCron();
096 dagEngine.resume(jobId);
097 startCron();
098 response.setStatus(HttpServletResponse.SC_OK);
099 }
100 else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) {
101 stopCron();
102 dagEngine.suspend(jobId);
103 startCron();
104 response.setStatus(HttpServletResponse.SC_OK);
105 }
106 else if (action.equals(RestConstants.JOB_ACTION_KILL)) {
107 stopCron();
108 dagEngine.kill(jobId);
109 startCron();
110 response.setStatus(HttpServletResponse.SC_OK);
111 }
112 else if (action.equals(RestConstants.JOB_ACTION_RERUN)) {
113 validateContentType(request, RestConstants.XML_CONTENT_TYPE);
114 XConfiguration conf = new XConfiguration(request.getInputStream());
115 stopCron();
116 conf = conf.trim();
117 conf = conf.resolve();
118 JobsServlet.validateJobConfiguration(conf);
119 checkAuthorizationForApp(getUser(request), conf);
120 dagEngine.reRun(jobId, conf);
121 startCron();
122 response.setStatus(HttpServletResponse.SC_OK);
123 }
124 else if (action.equals(RestConstants.JOB_COORD_ACTION_RERUN)) {
125 validateContentType(request, RestConstants.XML_CONTENT_TYPE);
126 stopCron();
127 String rerunType = request.getParameter(RestConstants.JOB_COORD_RERUN_TYPE_PARAM);
128 String scope = request.getParameter(RestConstants.JOB_COORD_RERUN_SCOPE_PARAM);
129 String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
130 String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
131 CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh),
132 Boolean.valueOf(noCleanup));
133 List<CoordinatorActionBean> actions = coordInfo.getCoordActions();
134 JSONObject json = new JSONObject();
135 json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(actions));
136 startCron();
137 sendJsonResponse(response, HttpServletResponse.SC_OK, json);
138 }
139 else {
140 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
141 RestConstants.ACTION_PARAM, action);
142 }
143 }
144 catch (DagEngineException ex) {
145 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
146 }
147 catch (BaseEngineException ex) {
148 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
149 }
150 }
151
152 /**
153 * Validate the configuration user/group. <p/>
154 *
155 * @param requestUser user in request.
156 * @param conf configuration.
157 * @throws XServletException thrown if the configuration does not have a property {@link
158 * org.apache.oozie.client.OozieClient#USER_NAME}.
159 */
160 static void checkAuthorizationForApp(String requestUser, Configuration conf) throws XServletException {
161 String user = conf.get(OozieClient.USER_NAME);
162 String group = conf.get(OozieClient.GROUP_NAME);
163 try {
164 if (user == null) {
165 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0401, OozieClient.USER_NAME);
166 }
167 if (!requestUser.equals(UNDEF) && !user.equals(requestUser)) {
168 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0400, requestUser, user);
169 }
170 AuthorizationService auth = Services.get().get(AuthorizationService.class);
171 if (group == null) {
172 group = auth.getDefaultGroup(user);
173 conf.set(OozieClient.GROUP_NAME, group);
174 }
175 else {
176 auth.authorizeForGroup(user, group);
177 }
178 XLog.Info.get().setParameter(XLogService.GROUP, group);
179 auth.authorizeForApp(user, group, conf.get(OozieClient.APP_PATH), conf);
180 }
181 catch (AuthorizationException ex) {
182 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
183 }
184 }
185
186 /**
187 * Return information about jobs.
188 */
189 @Override
190 public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
191 String jobId = getResourceName(request);
192 String show = request.getParameter(RestConstants.JOB_SHOW_PARAM);
193
194 try {
195 AuthorizationService auth = Services.get().get(AuthorizationService.class);
196 auth.authorizeForJob(getUser(request), jobId, false);
197 }
198 catch (AuthorizationException ex) {
199 throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
200 }
201
202 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
203 getAuthToken(request));
204 try {
205 if (show == null || show.equals(RestConstants.JOB_SHOW_INFO)) {
206 stopCron();
207 JsonWorkflowJob job = (JsonWorkflowJob) dagEngine.getJob(jobId);
208 startCron();
209 sendJsonResponse(response, HttpServletResponse.SC_OK, job);
210 }
211 else {
212 if (show.equals(RestConstants.JOB_SHOW_LOG)) {
213 response.setContentType(TEXT_UTF8);
214 dagEngine.streamLog(jobId, response.getWriter());
215 }
216 else {
217 if (show.equals(RestConstants.JOB_SHOW_DEFINITION)) {
218 stopCron();
219 response.setContentType(XML_UTF8);
220 String wfDefinition = dagEngine.getDefinition(jobId);
221 startCron();
222 response.setStatus(HttpServletResponse.SC_OK);
223 response.getWriter().write(wfDefinition);
224 }
225 else {
226 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
227 RestConstants.JOB_SHOW_PARAM, show);
228 }
229 }
230 }
231 }
232 catch (DagEngineException ex) {
233 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
234 }
235 }
236
237 }