001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.servlet;
020
021import java.io.IOException;
022import java.util.Arrays;
023
024import javax.servlet.ServletException;
025import javax.servlet.http.HttpServletRequest;
026import javax.servlet.http.HttpServletResponse;
027
028import org.apache.hadoop.conf.Configuration;
029import org.apache.oozie.ErrorCode;
030import org.apache.oozie.client.OozieClient;
031import org.apache.oozie.client.rest.RestConstants;
032import org.apache.oozie.service.Services;
033import org.apache.oozie.service.AuthorizationException;
034import org.apache.oozie.service.AuthorizationService;
035import org.apache.oozie.util.JobUtils;
036import org.apache.oozie.util.JobsFilterUtils;
037import org.apache.oozie.util.XConfiguration;
038import org.json.simple.JSONObject;
039
040public abstract class BaseJobsServlet extends JsonRestServlet {
041
042    private static final JsonRestServlet.ResourceInfo RESOURCES_INFO[] = new JsonRestServlet.ResourceInfo[1];
043
044    static {
045        RESOURCES_INFO[0] = new JsonRestServlet.ResourceInfo("", Arrays.asList(
046                "POST", "GET", "PUT"), Arrays.asList(
047                new JsonRestServlet.ParameterInfo(RestConstants.ACTION_PARAM,
048                                                  String.class, false, Arrays.asList("POST", "PUT")),
049                new JsonRestServlet.ParameterInfo(
050                        RestConstants.JOBS_FILTER_PARAM, String.class, false,
051                        Arrays.asList("GET", "PUT")),
052                new JsonRestServlet.ParameterInfo(RestConstants.JOBTYPE_PARAM,
053                                                  String.class, false, Arrays.asList("GET", "POST", "PUT")),
054                new JsonRestServlet.ParameterInfo(RestConstants.OFFSET_PARAM,
055                                                  String.class, false, Arrays.asList("GET", "PUT")),
056                new JsonRestServlet.ParameterInfo(RestConstants.LEN_PARAM,
057                                                  String.class, false, Arrays.asList("GET", "PUT")),
058                new JsonRestServlet.ParameterInfo(RestConstants.JOBS_BULK_PARAM,
059                                                  String.class, false, Arrays.asList("GET", "PUT")),
060                new JsonRestServlet.ParameterInfo(
061                        RestConstants.JOBS_EXTERNAL_ID_PARAM, String.class,
062                        false, Arrays.asList("GET"))));
063    }
064
065    public BaseJobsServlet(String instrumentationName) {
066        super(instrumentationName, RESOURCES_INFO);
067    }
068
069    /**
070     * Create a job.
071     */
072    @Override
073    @SuppressWarnings("unchecked")
074    protected void doPost(HttpServletRequest request,
075            HttpServletResponse response) throws ServletException, IOException {
076        /*
077         * Enumeration p = request.getAttributeNames();
078         * for(;p.hasMoreElements();){ String key = (String)p.nextElement();
079         * XLog.getLog(getClass()).warn(" key "+ key + " val "+ (String)
080         * request.getAttribute(key)); }
081         */
082        validateContentType(request, RestConstants.XML_CONTENT_TYPE);
083
084        String action = request.getParameter(RestConstants.ACTION_PARAM);
085        request.setAttribute(AUDIT_OPERATION,
086                (action != null) ? action : RestConstants.JOB_ACTION_SUBMIT);
087
088        XConfiguration conf = new XConfiguration(request.getInputStream());
089
090        stopCron();
091
092        conf = conf.trim();
093        conf = conf.resolve();
094
095        String requestUser = getUser(request);
096        if (!requestUser.equals(UNDEF)) {
097            conf.set(OozieClient.USER_NAME, requestUser);
098        }
099        BaseJobServlet.checkAuthorizationForApp(conf);
100        JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
101
102        JSONObject json = submitJob(request, conf);
103        startCron();
104        sendJsonResponse(response, HttpServletResponse.SC_CREATED, json);
105    }
106
107    /**
108     * Return information about jobs.
109     */
110    @Override
111    public void doGet(HttpServletRequest request, HttpServletResponse response)
112    throws ServletException, IOException {
113        String externalId = request
114        .getParameter(RestConstants.JOBS_EXTERNAL_ID_PARAM);
115        if (externalId != null) {
116            stopCron();
117            JSONObject json = getJobIdForExternalId(request, externalId);
118            startCron();
119            sendJsonResponse(response, HttpServletResponse.SC_OK, json);
120        }
121        else {
122            stopCron();
123            JSONObject json = getJobs(request);
124            startCron();
125            sendJsonResponse(response, HttpServletResponse.SC_OK, json);
126        }
127    }
128
129    /**
130     * Perform various job related actions - suspend, resume, kill, etc.
131     */
132    @Override
133    protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
134        request.setAttribute(AUDIT_PARAM, request.getParameter(RestConstants.JOBS_FILTER_PARAM));
135        request.setAttribute(AUDIT_OPERATION, request.getParameter(RestConstants.ACTION_PARAM));
136        try {
137            AuthorizationService auth = Services.get().get(AuthorizationService.class);
138            String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
139            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
140            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
141            String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM);
142
143            if (filter == null) {
144                throw new IllegalArgumentException("filter params must be specified for bulk write API");
145            }
146            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
147            start = (start < 1) ? 1 : start;
148            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
149            len = (len < 1) ? 50 : len;
150            auth.authorizeForJobs(getUser(request), JobsFilterUtils.parseFilter(filter), jobType, start, len, true);
151        }
152        catch (AuthorizationException ex) {
153            throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
154        }
155
156        String action = request.getParameter(RestConstants.ACTION_PARAM);
157        JSONObject json = null;
158        if (action.equals(RestConstants.JOB_ACTION_KILL)) {
159            stopCron();
160            json = killJobs(request, response);
161            startCron();
162        }
163        else if (action.equals(RestConstants.JOB_ACTION_RESUME)) {
164            stopCron();
165            json = resumeJobs(request, response);
166            startCron();
167        }
168        else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) {
169            stopCron();
170            json = suspendJobs(request, response);
171            startCron();
172        }
173        else {
174            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
175                    RestConstants.ACTION_PARAM, action);
176        }
177        response.setStatus(HttpServletResponse.SC_OK);
178        sendJsonResponse(response, HttpServletResponse.SC_OK, json);
179    }
180
181    /**
182     * abstract method to kill jobs based ona filter param. The jobs could be workflow, coordinator or bundle jobs
183     *
184     * @param request
185     * @param response
186     * @return JSONObject of all jobs being killed
187     * @throws XServletException
188     * @throws IOException
189     */
190    abstract JSONObject killJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException,
191            IOException;
192
193    /**
194     * abstract method to suspend jobs based ona filter param. The jobs could be workflow, coordinator or bundle jobs
195     *
196     * @param request
197     * @param response
198     * @return JSONObject of all jobs being suspended
199     * @throws XServletException
200     * @throws IOException
201     */
202    abstract JSONObject suspendJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException,
203            IOException;
204
205    /**
206     * abstract method to resume jobs based ona filter param. The jobs could be workflow, coordinator or bundle jobs
207     *
208     * @param request
209     * @param response
210     * @return JSONObject of all jobs being resumed
211     * @throws XServletException
212     * @throws IOException
213     */
214    abstract JSONObject resumeJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException,
215            IOException;
216    /**
217     * abstract method to submit a job, either workflow or coordinator in the case of workflow job, there is an optional
218     * flag in request to indicate if want this job to be started immediately or not
219     *
220     * @param request
221     * @param conf
222     * @return JSONObject of job id
223     * @throws XServletException
224     * @throws IOException
225     */
226    abstract JSONObject submitJob(HttpServletRequest request, Configuration conf)
227    throws XServletException, IOException;
228
229    /**
230     * abstract method to get a job from external ID
231     *
232     * @param request
233     * @param externalId
234     * @return JSONObject for the requested job
235     * @throws XServletException
236     * @throws IOException
237     */
238    abstract JSONObject getJobIdForExternalId(HttpServletRequest request,
239            String externalId) throws XServletException, IOException;
240
241    /**
242     * abstract method to get a list of workflow jobs
243     *
244     * @param request
245     * @return JSONObject of the requested jobs
246     * @throws XServletException
247     * @throws IOException
248     */
249    abstract JSONObject getJobs(HttpServletRequest request)
250    throws XServletException, IOException;
251
252}