001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.servlet;
019    
020    import java.io.IOException;
021    import java.util.Arrays;
022    import java.util.List;
023    
024    import javax.servlet.ServletException;
025    import javax.servlet.http.HttpServletRequest;
026    import javax.servlet.http.HttpServletResponse;
027    
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.oozie.BaseEngineException;
030    import org.apache.oozie.CoordinatorActionBean;
031    import org.apache.oozie.CoordinatorActionInfo;
032    import org.apache.oozie.CoordinatorEngine;
033    import org.apache.oozie.DagEngine;
034    import org.apache.oozie.DagEngineException;
035    import org.apache.oozie.ErrorCode;
036    import org.apache.oozie.client.OozieClient;
037    import org.apache.oozie.client.rest.JsonTags;
038    import org.apache.oozie.client.rest.JsonWorkflowJob;
039    import org.apache.oozie.client.rest.RestConstants;
040    import org.apache.oozie.service.AuthorizationException;
041    import org.apache.oozie.service.AuthorizationService;
042    import org.apache.oozie.service.CoordinatorEngineService;
043    import org.apache.oozie.service.DagEngineService;
044    import org.apache.oozie.service.Services;
045    import org.apache.oozie.service.XLogService;
046    import org.apache.oozie.util.XConfiguration;
047    import org.apache.oozie.util.XLog;
048    import org.json.simple.JSONObject;
049    
050    public class JobServlet extends JsonRestServlet {
051        private static final String INSTRUMENTATION_NAME = "job";
052    
053        private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[1];
054    
055        static {
056            RESOURCES_INFO[0] = new ResourceInfo("*", Arrays.asList("PUT", "GET"), Arrays.asList(new ParameterInfo(
057                    RestConstants.ACTION_PARAM, String.class, true, Arrays.asList("PUT")), new ParameterInfo(
058                    RestConstants.JOB_SHOW_PARAM, String.class, false, Arrays.asList("GET"))));
059        }
060    
061        public JobServlet() {
062            super(INSTRUMENTATION_NAME, RESOURCES_INFO);
063        }
064    
065        /**
066         * Perform various job related actions - start, suspend, resume, kill, etc.
067         */
068        @SuppressWarnings("unchecked")
069        @Override
070        protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
071            String jobId = getResourceName(request);
072            request.setAttribute(AUDIT_PARAM, jobId);
073            request.setAttribute(AUDIT_OPERATION, request.getParameter(RestConstants.ACTION_PARAM));
074            try {
075                AuthorizationService auth = Services.get().get(AuthorizationService.class);
076                auth.authorizeForJob(getUser(request), jobId, true);
077            }
078            catch (AuthorizationException ex) {
079                throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
080            }
081    
082            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
083                                                                                          getAuthToken(request));
084            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
085                    getUser(request), getAuthToken(request));
086            try {
087                String action = request.getParameter(RestConstants.ACTION_PARAM);
088                if (action.equals(RestConstants.JOB_ACTION_START)) {
089                    stopCron();
090                    dagEngine.start(jobId);
091                    startCron();
092                    response.setStatus(HttpServletResponse.SC_OK);
093                }
094                else if (action.equals(RestConstants.JOB_ACTION_RESUME)) {
095                    stopCron();
096                    dagEngine.resume(jobId);
097                    startCron();
098                    response.setStatus(HttpServletResponse.SC_OK);
099                }
100                else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) {
101                    stopCron();
102                    dagEngine.suspend(jobId);
103                    startCron();
104                    response.setStatus(HttpServletResponse.SC_OK);
105                }
106                else if (action.equals(RestConstants.JOB_ACTION_KILL)) {
107                    stopCron();
108                    dagEngine.kill(jobId);
109                    startCron();
110                    response.setStatus(HttpServletResponse.SC_OK);
111                }
112                else if (action.equals(RestConstants.JOB_ACTION_RERUN)) {
113                    validateContentType(request, RestConstants.XML_CONTENT_TYPE);
114                    XConfiguration conf = new XConfiguration(request.getInputStream());
115                    stopCron();
116                    conf = conf.trim();
117                    conf = conf.resolve();
118                    JobsServlet.validateJobConfiguration(conf);
119                    checkAuthorizationForApp(getUser(request), conf);
120                    dagEngine.reRun(jobId, conf);
121                    startCron();
122                    response.setStatus(HttpServletResponse.SC_OK);
123                }
124                else if (action.equals(RestConstants.JOB_COORD_ACTION_RERUN)) {
125                    validateContentType(request, RestConstants.XML_CONTENT_TYPE);
126                    stopCron();
127                    String rerunType = request.getParameter(RestConstants.JOB_COORD_RERUN_TYPE_PARAM);
128                    String scope = request.getParameter(RestConstants.JOB_COORD_RERUN_SCOPE_PARAM);
129                    String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
130                    String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
131                    CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh),
132                            Boolean.valueOf(noCleanup));
133                    List<CoordinatorActionBean> actions = coordInfo.getCoordActions();
134                    JSONObject json = new JSONObject();
135                    json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(actions));
136                    startCron();
137                    sendJsonResponse(response, HttpServletResponse.SC_OK, json);
138                }
139                else {
140                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
141                            RestConstants.ACTION_PARAM, action);
142                }
143            }
144            catch (DagEngineException ex) {
145                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
146            }
147            catch (BaseEngineException ex) {
148                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
149            }
150        }
151    
152        /**
153         * Validate the configuration user/group. <p/>
154         *
155         * @param requestUser user in request.
156         * @param conf configuration.
157         * @throws XServletException thrown if the configuration does not have a property {@link
158         * org.apache.oozie.client.OozieClient#USER_NAME}.
159         */
160        static void checkAuthorizationForApp(String requestUser, Configuration conf) throws XServletException {
161            String user = conf.get(OozieClient.USER_NAME);
162            String group = conf.get(OozieClient.GROUP_NAME);
163            try {
164                if (user == null) {
165                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0401, OozieClient.USER_NAME);
166                }
167                if (!requestUser.equals(UNDEF) && !user.equals(requestUser)) {
168                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0400, requestUser, user);
169                }
170                AuthorizationService auth = Services.get().get(AuthorizationService.class);
171                if (group == null) {
172                    group = auth.getDefaultGroup(user);
173                    conf.set(OozieClient.GROUP_NAME, group);
174                }
175                else {
176                    auth.authorizeForGroup(user, group);
177                }
178                XLog.Info.get().setParameter(XLogService.GROUP, group);
179                auth.authorizeForApp(user, group, conf.get(OozieClient.APP_PATH), conf);
180            }
181            catch (AuthorizationException ex) {
182                throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
183            }
184        }
185    
186        /**
187         * Return information about jobs.
188         */
189        @Override
190        public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
191            String jobId = getResourceName(request);
192            String show = request.getParameter(RestConstants.JOB_SHOW_PARAM);
193    
194            try {
195                AuthorizationService auth = Services.get().get(AuthorizationService.class);
196                auth.authorizeForJob(getUser(request), jobId, false);
197            }
198            catch (AuthorizationException ex) {
199                throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
200            }
201    
202            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
203                                                                                          getAuthToken(request));
204            try {
205                if (show == null || show.equals(RestConstants.JOB_SHOW_INFO)) {
206                    stopCron();
207                    JsonWorkflowJob job = (JsonWorkflowJob) dagEngine.getJob(jobId);
208                    startCron();
209                    sendJsonResponse(response, HttpServletResponse.SC_OK, job);
210                }
211                else {
212                    if (show.equals(RestConstants.JOB_SHOW_LOG)) {
213                        response.setContentType(TEXT_UTF8);
214                        dagEngine.streamLog(jobId, response.getWriter());
215                    }
216                    else {
217                        if (show.equals(RestConstants.JOB_SHOW_DEFINITION)) {
218                            stopCron();
219                            response.setContentType(XML_UTF8);
220                            String wfDefinition = dagEngine.getDefinition(jobId);
221                            startCron();
222                            response.setStatus(HttpServletResponse.SC_OK);
223                            response.getWriter().write(wfDefinition);
224                        }
225                        else {
226                            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
227                                                        RestConstants.JOB_SHOW_PARAM, show);
228                        }
229                    }
230                }
231            }
232            catch (DagEngineException ex) {
233                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
234            }
235        }
236    
237    }