001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.servlet;
019
020import java.io.IOException;
021import java.util.Arrays;
022
023import javax.servlet.ServletException;
024import javax.servlet.http.HttpServletRequest;
025import javax.servlet.http.HttpServletResponse;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.oozie.BaseEngineException;
029import org.apache.oozie.ErrorCode;
030import org.apache.oozie.client.OozieClient;
031import org.apache.oozie.client.XOozieClient;
032import org.apache.oozie.client.rest.JsonBean;
033import org.apache.oozie.client.rest.JsonTags;
034import org.apache.oozie.client.rest.RestConstants;
035import org.apache.oozie.service.AuthorizationException;
036import org.apache.oozie.service.AuthorizationService;
037import org.apache.oozie.service.Services;
038import org.apache.oozie.service.XLogService;
039import org.apache.oozie.util.ConfigUtils;
040import org.apache.oozie.util.JobUtils;
041import org.apache.oozie.util.XConfiguration;
042import org.apache.oozie.util.XLog;
043import org.json.simple.JSONObject;
044
045public abstract class BaseJobServlet extends JsonRestServlet {
046
047    private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[1];
048
049    static {
050        RESOURCES_INFO[0] = new ResourceInfo("*", Arrays.asList("PUT", "GET"), Arrays.asList(new ParameterInfo(
051                RestConstants.ACTION_PARAM, String.class, true, Arrays.asList("PUT")), new ParameterInfo(
052                RestConstants.JOB_SHOW_PARAM, String.class, false, Arrays.asList("GET")), new ParameterInfo(
053                        RestConstants.ORDER_PARAM, String.class, false, Arrays.asList("GET"))));
054    }
055
056    public BaseJobServlet(String instrumentationName) {
057        super(instrumentationName, RESOURCES_INFO);
058    }
059
060    /**
061     * Perform various job related actions - start, suspend, resume, kill, etc.
062     */
063    @Override
064    protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
065        String jobId = getResourceName(request);
066        request.setAttribute(AUDIT_PARAM, jobId);
067        request.setAttribute(AUDIT_OPERATION, request.getParameter(RestConstants.ACTION_PARAM));
068        try {
069            AuthorizationService auth = Services.get().get(AuthorizationService.class);
070            auth.authorizeForJob(getUser(request), jobId, true);
071        }
072        catch (AuthorizationException ex) {
073            throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
074        }
075
076        String action = request.getParameter(RestConstants.ACTION_PARAM);
077        if (action.equals(RestConstants.JOB_ACTION_START)) {
078            stopCron();
079            startJob(request, response);
080            startCron();
081            response.setStatus(HttpServletResponse.SC_OK);
082        }
083        else if (action.equals(RestConstants.JOB_ACTION_RESUME)) {
084            stopCron();
085            resumeJob(request, response);
086            startCron();
087            response.setStatus(HttpServletResponse.SC_OK);
088        }
089        else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) {
090            stopCron();
091            suspendJob(request, response);
092            startCron();
093            response.setStatus(HttpServletResponse.SC_OK);
094        }
095        else if (action.equals(RestConstants.JOB_ACTION_KILL)) {
096            stopCron();
097            JSONObject json =  killJob(request, response);
098            startCron();
099            if (json != null) {
100                sendJsonResponse(response, HttpServletResponse.SC_OK, json);
101            }
102            else {
103                response.setStatus(HttpServletResponse.SC_OK);
104            }
105        }
106        else if (action.equals(RestConstants.JOB_ACTION_CHANGE)) {
107            stopCron();
108            changeJob(request, response);
109            startCron();
110            response.setStatus(HttpServletResponse.SC_OK);
111        }
112        else if (action.equals(RestConstants.JOB_ACTION_IGNORE)) {
113            stopCron();
114            JSONObject json = ignoreJob(request, response);
115            startCron();
116            if (json != null) {
117                sendJsonResponse(response, HttpServletResponse.SC_OK, json);
118            }
119            else {
120            response.setStatus(HttpServletResponse.SC_OK);
121            }
122        }
123        else if (action.equals(RestConstants.JOB_ACTION_RERUN)) {
124            validateContentType(request, RestConstants.XML_CONTENT_TYPE);
125            Configuration conf = new XConfiguration(request.getInputStream());
126            stopCron();
127            String requestUser = getUser(request);
128            if (!requestUser.equals(UNDEF)) {
129                conf.set(OozieClient.USER_NAME, requestUser);
130            }
131            if (conf.get(OozieClient.APP_PATH) != null) {
132                BaseJobServlet.checkAuthorizationForApp(conf);
133                JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
134            }
135            reRunJob(request, response, conf);
136            startCron();
137            response.setStatus(HttpServletResponse.SC_OK);
138        }
139        else if (action.equals(RestConstants.JOB_COORD_ACTION_RERUN)) {
140            validateContentType(request, RestConstants.XML_CONTENT_TYPE);
141            stopCron();
142            JSONObject json = reRunJob(request, response, null);
143            startCron();
144            if (json != null) {
145                sendJsonResponse(response, HttpServletResponse.SC_OK, json);
146            }
147            else {
148                response.setStatus(HttpServletResponse.SC_OK);
149            }
150        }
151        else if (action.equals(RestConstants.JOB_BUNDLE_ACTION_RERUN)) {
152            validateContentType(request, RestConstants.XML_CONTENT_TYPE);
153            stopCron();
154            JSONObject json = reRunJob(request, response, null);
155            startCron();
156            if (json != null) {
157                sendJsonResponse(response, HttpServletResponse.SC_OK, json);
158            }
159            else {
160                response.setStatus(HttpServletResponse.SC_OK);
161            }
162        }
163        else if (action.equals(RestConstants.JOB_COORD_UPDATE)) {
164            validateContentType(request, RestConstants.XML_CONTENT_TYPE);
165            Configuration conf = new XConfiguration(request.getInputStream());
166            stopCron();
167            String requestUser = getUser(request);
168            if (!requestUser.equals(UNDEF)) {
169                conf.set(OozieClient.USER_NAME, requestUser);
170            }
171            if (conf.get(OozieClient.COORDINATOR_APP_PATH) != null) {
172                BaseJobServlet.checkAuthorizationForApp(conf);
173                JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
174            }
175            JSONObject json = updateJob(request, response, conf);
176            startCron();
177            sendJsonResponse(response, HttpServletResponse.SC_OK, json);
178        }
179        else {
180            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
181                    RestConstants.ACTION_PARAM, action);
182        }
183    }
184
185    abstract JSONObject ignoreJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
186            IOException;
187
188    /**
189     * Validate the configuration user/group. <p/>
190     *
191     * @param conf configuration.
192     * @throws XServletException thrown if the configuration does not have a property {@link
193     * org.apache.oozie.client.OozieClient#USER_NAME}.
194     */
195    static void checkAuthorizationForApp(Configuration conf) throws XServletException {
196        String user = conf.get(OozieClient.USER_NAME);
197        String acl = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.GROUP_NAME, OozieClient.JOB_ACL, null);
198        try {
199            if (user == null) {
200                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0401, OozieClient.USER_NAME);
201            }
202            AuthorizationService auth = Services.get().get(AuthorizationService.class);
203
204            if (acl != null){
205                 conf.set(OozieClient.GROUP_NAME, acl);
206            }
207            else if (acl == null && auth.useDefaultGroupAsAcl()) {
208                acl = auth.getDefaultGroup(user);
209                conf.set(OozieClient.GROUP_NAME, acl);
210            }
211            XLog.Info.get().setParameter(XLogService.GROUP, acl);
212            String wfPath = conf.get(OozieClient.APP_PATH);
213            String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH);
214            String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);
215
216            if (wfPath == null && coordPath == null && bundlePath == null) {
217                String[] libPaths = conf.getStrings(XOozieClient.LIBPATH);
218                if (libPaths != null && libPaths.length > 0 && libPaths[0].trim().length() > 0) {
219                    conf.set(OozieClient.APP_PATH, libPaths[0].trim());
220                    wfPath = libPaths[0].trim();
221                }
222                else {
223                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0405);
224                }
225            }
226            ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath);
227
228            if (wfPath != null) {
229                auth.authorizeForApp(user, acl, wfPath, "workflow.xml", conf);
230            }
231            else if (coordPath != null){
232                auth.authorizeForApp(user, acl, coordPath, "coordinator.xml", conf);
233            }
234            else if (bundlePath != null){
235                auth.authorizeForApp(user, acl, bundlePath, "bundle.xml", conf);
236            }
237        }
238        catch (AuthorizationException ex) {
239            XLog.getLog(BaseJobServlet.class).info("AuthorizationException ", ex);
240            throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
241        }
242    }
243
244    /**
245     * Return information about jobs.
246     */
247    @Override
248    public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
249        String jobId = getResourceName(request);
250        String show = request.getParameter(RestConstants.JOB_SHOW_PARAM);
251        String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
252                ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
253
254        try {
255            AuthorizationService auth = Services.get().get(AuthorizationService.class);
256            auth.authorizeForJob(getUser(request), jobId, false);
257        }
258        catch (AuthorizationException ex) {
259            throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
260        }
261
262        if (show == null || show.equals(RestConstants.JOB_SHOW_INFO)) {
263            stopCron();
264            JsonBean job = null;
265            try {
266                job = getJob(request, response);
267            }
268            catch (BaseEngineException e) {
269                // TODO Auto-generated catch block
270                // e.printStackTrace();
271
272                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
273            }
274            startCron();
275            sendJsonResponse(response, HttpServletResponse.SC_OK, job, timeZoneId);
276        }
277        else if (show.equals(RestConstants.ALL_WORKFLOWS_FOR_COORD_ACTION)) {
278            stopCron();
279            JSONObject json = getJobsByParentId(request, response);
280            startCron();
281            sendJsonResponse(response, HttpServletResponse.SC_OK, json);
282        }
283        else if (show.equals(RestConstants.JOB_SHOW_JMS_TOPIC)) {
284            stopCron();
285            String jmsTopicName = getJMSTopicName(request, response);
286            JSONObject json = new JSONObject();
287            json.put(JsonTags.JMS_TOPIC_NAME, jmsTopicName);
288            startCron();
289            sendJsonResponse(response, HttpServletResponse.SC_OK, json);
290        }
291
292        else if (show.equals(RestConstants.JOB_SHOW_LOG)) {
293            response.setContentType(TEXT_UTF8);
294            streamJobLog(request, response);
295        }
296        else if (show.equals(RestConstants.JOB_SHOW_DEFINITION)) {
297            stopCron();
298            response.setContentType(XML_UTF8);
299            String wfDefinition = getJobDefinition(request, response);
300            startCron();
301            response.setStatus(HttpServletResponse.SC_OK);
302            response.getWriter().write(wfDefinition);
303        }
304        else if (show.equals(RestConstants.JOB_SHOW_GRAPH)) {
305            stopCron();
306            streamJobGraph(request, response);
307            startCron(); // -- should happen before you stream anything in response?
308        }
309        else {
310            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
311                    RestConstants.JOB_SHOW_PARAM, show);
312        }
313    }
314
315    /**
316     * abstract method to start a job, either workflow or coordinator
317     *
318     * @param request
319     * @param response
320     * @throws XServletException
321     * @throws IOException TODO
322     */
323    abstract void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
324            IOException;
325
326    /**
327     * abstract method to resume a job, either workflow or coordinator
328     *
329     * @param request
330     * @param response
331     * @throws XServletException
332     * @throws IOException TODO
333     */
334    abstract void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
335            IOException;
336
337    /**
338     * abstract method to suspend a job, either workflow or coordinator
339     *
340     * @param request
341     * @param response
342     * @throws XServletException
343     * @throws IOException TODO
344     */
345    abstract void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
346            IOException;
347
348    /**
349     * abstract method to kill a job, either workflow or coordinator
350     *
351     * @param request
352     * @param response
353     * @return
354     * @throws XServletException
355     * @throws IOException TODO
356     */
357    abstract JSONObject killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
358            IOException;
359
360    /**
361     * abstract method to change a coordinator job
362     *
363     * @param request
364     * @param response
365     * @throws XServletException
366     * @throws IOException TODO
367     */
368    abstract void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
369            IOException;
370
371    /**
372     * abstract method to re-run a job, either workflow or coordinator
373     *
374     * @param request
375     * @param response
376     * @param conf
377     * @throws XServletException
378     * @throws IOException TODO
379     */
380    abstract JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
381            throws XServletException, IOException;
382
383    /**
384     * abstract method to get a job, either workflow or coordinator, in JsonBean representation
385     *
386     * @param request
387     * @param response
388     * @return JsonBean representation of a job, either workflow or coordinator
389     * @throws XServletException
390     * @throws IOException TODO
391     * @throws BaseEngineException
392     */
393    abstract JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
394            IOException, BaseEngineException;
395
396    /**
397     * abstract method to get definition of a job, either workflow or coordinator
398     *
399     * @param request
400     * @param response
401     * @return job, either workflow or coordinator, definition in string format
402     * @throws XServletException
403     * @throws IOException TODO
404     */
405    abstract String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
406            throws XServletException, IOException;
407
408    /**
409     * abstract method to get and stream log information of job, either workflow or coordinator
410     *
411     * @param request
412     * @param response
413     * @throws XServletException
414     * @throws IOException
415     */
416    abstract void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
417            IOException;
418
419    /**
420     * abstract method to create and stream image for runtime DAG -- workflow only
421     *
422     * @param request
423     * @param response
424     * @throws XServletException
425     * @throws IOException
426     */
427    abstract void streamJobGraph(HttpServletRequest request, HttpServletResponse response)
428            throws XServletException, IOException;
429
430    /**
431     * abstract method to get JMS topic name for a job
432     * @param request
433     * @param response
434     * @throws XServletException
435     * @throws IOException
436     */
437    abstract String getJMSTopicName(HttpServletRequest request, HttpServletResponse response)
438            throws XServletException, IOException;
439
440    /**
441     * abstract method to get workflow job ids from the parent id
442     * i.e. coordinator action
443     * @param request
444     * @param response
445     * @return comma-separated list of workflow job ids
446     * @throws XServletException
447     * @throws IOException
448     */
449    abstract JSONObject getJobsByParentId(HttpServletRequest request, HttpServletResponse response)
450            throws XServletException, IOException;
451
452    /**
453     * Abstract method to Update coord job.
454     *
455     * @param request the request
456     * @param response the response
457     * @param Configuration conf
458     * @return the JSON object
459     * @throws XServletException the x servlet exception
460     * @throws IOException Signals that an I/O exception has occurred.
461     */
462    abstract JSONObject updateJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
463            throws XServletException, IOException;
464}
465