001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.servlet;
020
021import java.io.IOException;
022import java.util.Arrays;
023
024import javax.servlet.ServletException;
025import javax.servlet.http.HttpServletRequest;
026import javax.servlet.http.HttpServletResponse;
027
028import org.apache.hadoop.conf.Configuration;
029import org.apache.oozie.BaseEngineException;
030import org.apache.oozie.ErrorCode;
031import org.apache.oozie.client.OozieClient;
032import org.apache.oozie.client.XOozieClient;
033import org.apache.oozie.client.rest.JsonBean;
034import org.apache.oozie.client.rest.JsonTags;
035import org.apache.oozie.client.rest.RestConstants;
036import org.apache.oozie.service.AuthorizationException;
037import org.apache.oozie.service.AuthorizationService;
038import org.apache.oozie.service.Services;
039import org.apache.oozie.service.XLogService;
040import org.apache.oozie.util.ConfigUtils;
041import org.apache.oozie.util.JobUtils;
042import org.apache.oozie.util.XConfiguration;
043import org.apache.oozie.util.XLog;
044import org.json.simple.JSONObject;
045
046public abstract class BaseJobServlet extends JsonRestServlet {
047
048    private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[1];
049
050    static {
051        RESOURCES_INFO[0] = new ResourceInfo("*", Arrays.asList("PUT", "GET"), Arrays.asList(new ParameterInfo(
052                RestConstants.ACTION_PARAM, String.class, true, Arrays.asList("PUT")), new ParameterInfo(
053                RestConstants.JOB_SHOW_PARAM, String.class, false, Arrays.asList("GET")), new ParameterInfo(
054                        RestConstants.ORDER_PARAM, String.class, false, Arrays.asList("GET"))));
055    }
056
057    public BaseJobServlet(String instrumentationName) {
058        super(instrumentationName, RESOURCES_INFO);
059    }
060
061    /**
062     * Perform various job related actions - start, suspend, resume, kill, etc.
063     */
064    @Override
065    protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
066        String jobId = getResourceName(request);
067        request.setAttribute(AUDIT_PARAM, jobId);
068        request.setAttribute(AUDIT_OPERATION, request.getParameter(RestConstants.ACTION_PARAM));
069        try {
070            AuthorizationService auth = Services.get().get(AuthorizationService.class);
071            auth.authorizeForJob(getUser(request), jobId, true);
072        }
073        catch (AuthorizationException ex) {
074            throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
075        }
076
077        String action = request.getParameter(RestConstants.ACTION_PARAM);
078        if (action.equals(RestConstants.JOB_ACTION_START)) {
079            stopCron();
080            startJob(request, response);
081            startCron();
082            response.setStatus(HttpServletResponse.SC_OK);
083        }
084        else if (action.equals(RestConstants.JOB_ACTION_RESUME)) {
085            stopCron();
086            resumeJob(request, response);
087            startCron();
088            response.setStatus(HttpServletResponse.SC_OK);
089        }
090        else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) {
091            stopCron();
092            suspendJob(request, response);
093            startCron();
094            response.setStatus(HttpServletResponse.SC_OK);
095        }
096        else if (action.equals(RestConstants.JOB_ACTION_KILL)) {
097            stopCron();
098            JSONObject json =  killJob(request, response);
099            startCron();
100            if (json != null) {
101                sendJsonResponse(response, HttpServletResponse.SC_OK, json);
102            }
103            else {
104                response.setStatus(HttpServletResponse.SC_OK);
105            }
106        }
107        else if (action.equals(RestConstants.JOB_ACTION_CHANGE)) {
108            stopCron();
109            changeJob(request, response);
110            startCron();
111            response.setStatus(HttpServletResponse.SC_OK);
112        }
113        else if (action.equals(RestConstants.JOB_ACTION_IGNORE)) {
114            stopCron();
115            JSONObject json = ignoreJob(request, response);
116            startCron();
117            if (json != null) {
118                sendJsonResponse(response, HttpServletResponse.SC_OK, json);
119            }
120            else {
121            response.setStatus(HttpServletResponse.SC_OK);
122            }
123        }
124        else if (action.equals(RestConstants.JOB_ACTION_RERUN)) {
125            validateContentType(request, RestConstants.XML_CONTENT_TYPE);
126            Configuration conf = new XConfiguration(request.getInputStream());
127            stopCron();
128            String requestUser = getUser(request);
129            if (!requestUser.equals(UNDEF)) {
130                conf.set(OozieClient.USER_NAME, requestUser);
131            }
132            if (conf.get(OozieClient.APP_PATH) != null) {
133                BaseJobServlet.checkAuthorizationForApp(conf);
134                JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
135            }
136            reRunJob(request, response, conf);
137            startCron();
138            response.setStatus(HttpServletResponse.SC_OK);
139        }
140        else if (action.equals(RestConstants.JOB_COORD_ACTION_RERUN)) {
141            validateContentType(request, RestConstants.XML_CONTENT_TYPE);
142            stopCron();
143            JSONObject json = reRunJob(request, response, null);
144            startCron();
145            if (json != null) {
146                sendJsonResponse(response, HttpServletResponse.SC_OK, json);
147            }
148            else {
149                response.setStatus(HttpServletResponse.SC_OK);
150            }
151        }
152        else if (action.equals(RestConstants.JOB_BUNDLE_ACTION_RERUN)) {
153            validateContentType(request, RestConstants.XML_CONTENT_TYPE);
154            stopCron();
155            JSONObject json = reRunJob(request, response, null);
156            startCron();
157            if (json != null) {
158                sendJsonResponse(response, HttpServletResponse.SC_OK, json);
159            }
160            else {
161                response.setStatus(HttpServletResponse.SC_OK);
162            }
163        }
164        else if (action.equals(RestConstants.JOB_COORD_UPDATE)) {
165            validateContentType(request, RestConstants.XML_CONTENT_TYPE);
166            Configuration conf = new XConfiguration(request.getInputStream());
167            stopCron();
168            String requestUser = getUser(request);
169            if (!requestUser.equals(UNDEF)) {
170                conf.set(OozieClient.USER_NAME, requestUser);
171            }
172            if (conf.get(OozieClient.COORDINATOR_APP_PATH) != null) {
173                //If coord is submitted from bundle, user may want to update individual coord job with bundle properties
174                //If COORDINATOR_APP_PATH is set, we should check only COORDINATOR_APP_PATH path permission
175                String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);
176                if (bundlePath != null) {
177                    conf.unset(OozieClient.BUNDLE_APP_PATH);
178                }
179                BaseJobServlet.checkAuthorizationForApp(conf);
180                JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
181                if (bundlePath != null) {
182                    conf.set(OozieClient.BUNDLE_APP_PATH, bundlePath);
183                }
184            }
185            JSONObject json = updateJob(request, response, conf);
186            startCron();
187            sendJsonResponse(response, HttpServletResponse.SC_OK, json);
188        }
189        else if (action.equals(RestConstants.SLA_ENABLE_ALERT)) {
190            validateContentType(request, RestConstants.XML_CONTENT_TYPE);
191            stopCron();
192            slaEnableAlert(request, response);
193            startCron();
194            response.setStatus(HttpServletResponse.SC_OK);
195        }
196        else if (action.equals(RestConstants.SLA_DISABLE_ALERT)) {
197            validateContentType(request, RestConstants.XML_CONTENT_TYPE);
198            stopCron();
199            slaDisableAlert(request, response);
200            startCron();
201            response.setStatus(HttpServletResponse.SC_OK);
202        }
203        else if (action.equals(RestConstants.SLA_CHANGE)) {
204            validateContentType(request, RestConstants.XML_CONTENT_TYPE);
205            stopCron();
206            slaChange(request, response);
207            startCron();
208            response.setStatus(HttpServletResponse.SC_OK);
209        }
210        else {
211            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
212                    RestConstants.ACTION_PARAM, action);
213        }
214    }
215
216    abstract JSONObject ignoreJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
217            IOException;
218
219    /**
220     * Validate the configuration user/group. <p>
221     *
222     * @param conf configuration.
223     * @throws XServletException thrown if the configuration does not have a property {@link
224     * org.apache.oozie.client.OozieClient#USER_NAME}.
225     */
226    static void checkAuthorizationForApp(Configuration conf) throws XServletException {
227        String user = conf.get(OozieClient.USER_NAME);
228        String acl = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.GROUP_NAME, OozieClient.JOB_ACL, null);
229        try {
230            if (user == null) {
231                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0401, OozieClient.USER_NAME);
232            }
233            AuthorizationService auth = Services.get().get(AuthorizationService.class);
234
235            if (acl != null){
236                 conf.set(OozieClient.GROUP_NAME, acl);
237            }
238            else if (acl == null && auth.useDefaultGroupAsAcl()) {
239                acl = auth.getDefaultGroup(user);
240                conf.set(OozieClient.GROUP_NAME, acl);
241            }
242            XLog.Info.get().setParameter(XLogService.GROUP, acl);
243            String wfPath = conf.get(OozieClient.APP_PATH);
244            String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH);
245            String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);
246
247            if (wfPath == null && coordPath == null && bundlePath == null) {
248                String[] libPaths = conf.getStrings(XOozieClient.LIBPATH);
249                if (libPaths != null && libPaths.length > 0 && libPaths[0].trim().length() > 0) {
250                    conf.set(OozieClient.APP_PATH, libPaths[0].trim());
251                    wfPath = libPaths[0].trim();
252                }
253                else {
254                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0405);
255                }
256            }
257            ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath);
258
259            if (wfPath != null) {
260                auth.authorizeForApp(user, acl, wfPath, "workflow.xml", conf);
261            }
262            else if (coordPath != null){
263                auth.authorizeForApp(user, acl, coordPath, "coordinator.xml", conf);
264            }
265            else if (bundlePath != null){
266                auth.authorizeForApp(user, acl, bundlePath, "bundle.xml", conf);
267            }
268        }
269        catch (AuthorizationException ex) {
270            XLog.getLog(BaseJobServlet.class).info("AuthorizationException ", ex);
271            throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
272        }
273    }
274
275    /**
276     * Return information about jobs.
277     */
278    @Override
279    @SuppressWarnings("unchecked")
280    public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
281        String jobId = getResourceName(request);
282        String show = request.getParameter(RestConstants.JOB_SHOW_PARAM);
283        String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
284                ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
285
286        try {
287            AuthorizationService auth = Services.get().get(AuthorizationService.class);
288            auth.authorizeForJob(getUser(request), jobId, false);
289        }
290        catch (AuthorizationException ex) {
291            throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
292        }
293
294        if (show == null || show.equals(RestConstants.JOB_SHOW_INFO)) {
295            stopCron();
296            JsonBean job = null;
297            try {
298                job = getJob(request, response);
299            }
300            catch (BaseEngineException e) {
301                // TODO Auto-generated catch block
302                // e.printStackTrace();
303
304                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
305            }
306            startCron();
307            sendJsonResponse(response, HttpServletResponse.SC_OK, job, timeZoneId);
308        }
309        else if (show.equals(RestConstants.ALL_WORKFLOWS_FOR_COORD_ACTION)) {
310            stopCron();
311            JSONObject json = getJobsByParentId(request, response);
312            startCron();
313            sendJsonResponse(response, HttpServletResponse.SC_OK, json);
314        }
315        else if (show.equals(RestConstants.JOB_SHOW_JMS_TOPIC)) {
316            stopCron();
317            String jmsTopicName = getJMSTopicName(request, response);
318            JSONObject json = new JSONObject();
319            json.put(JsonTags.JMS_TOPIC_NAME, jmsTopicName);
320            startCron();
321            sendJsonResponse(response, HttpServletResponse.SC_OK, json);
322        }
323
324        else if (show.equals(RestConstants.JOB_SHOW_LOG)) {
325            response.setContentType(TEXT_UTF8);
326            streamJobLog(request, response);
327        }
328        else if (show.equals(RestConstants.JOB_SHOW_ERROR_LOG)) {
329            response.setContentType(TEXT_UTF8);
330            streamJobErrorLog(request, response);
331        }
332        else if (show.equals(RestConstants.JOB_SHOW_AUDIT_LOG)) {
333            response.setContentType(TEXT_UTF8);
334            streamJobAuditLog(request, response);
335        }
336
337        else if (show.equals(RestConstants.JOB_SHOW_DEFINITION)) {
338            stopCron();
339            response.setContentType(XML_UTF8);
340            String wfDefinition = getJobDefinition(request, response);
341            startCron();
342            response.setStatus(HttpServletResponse.SC_OK);
343            response.getWriter().write(wfDefinition);
344        }
345        else if (show.equals(RestConstants.JOB_SHOW_GRAPH)) {
346            stopCron();
347            streamJobGraph(request, response);
348            startCron(); // -- should happen before you stream anything in response?
349        } else if (show.equals(RestConstants.JOB_SHOW_STATUS)) {
350            stopCron();
351            String status = getJobStatus(request, response);
352            JSONObject json = new JSONObject();
353            json.put(JsonTags.STATUS, status);
354            startCron();
355            sendJsonResponse(response, HttpServletResponse.SC_OK, json);
356        }
357        else {
358            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
359                    RestConstants.JOB_SHOW_PARAM, show);
360        }
361    }
362
363    /**
364     * abstract method to start a job, either workflow or coordinator
365     *
366     * @param request
367     * @param response
368     * @throws XServletException
369     * @throws IOException TODO
370     */
371    abstract void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
372            IOException;
373
374    /**
375     * abstract method to resume a job, either workflow or coordinator
376     *
377     * @param request
378     * @param response
379     * @throws XServletException
380     * @throws IOException TODO
381     */
382    abstract void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
383            IOException;
384
385    /**
386     * abstract method to suspend a job, either workflow or coordinator
387     *
388     * @param request
389     * @param response
390     * @throws XServletException
391     * @throws IOException TODO
392     */
393    abstract void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
394            IOException;
395
396    /**
397     * abstract method to kill a job, either workflow or coordinator
398     *
399     * @param request
400     * @param response
401     * @return
402     * @throws XServletException
403     * @throws IOException TODO
404     */
405    abstract JSONObject killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
406            IOException;
407
408    /**
409     * abstract method to change a coordinator job
410     *
411     * @param request
412     * @param response
413     * @throws XServletException
414     * @throws IOException TODO
415     */
416    abstract void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
417            IOException;
418
419    /**
420     * abstract method to re-run a job, either workflow or coordinator
421     *
422     * @param request
423     * @param response
424     * @param conf
425     * @throws XServletException
426     * @throws IOException TODO
427     */
428    abstract JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
429            throws XServletException, IOException;
430
431    /**
432     * abstract method to get a job, either workflow or coordinator, in JsonBean representation
433     *
434     * @param request
435     * @param response
436     * @return JsonBean representation of a job, either workflow or coordinator
437     * @throws XServletException
438     * @throws IOException TODO
439     * @throws BaseEngineException
440     */
441    abstract JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
442            IOException, BaseEngineException;
443
444    /**
445     * abstract method to get definition of a job, either workflow or coordinator
446     *
447     * @param request
448     * @param response
449     * @return job, either workflow or coordinator, definition in string format
450     * @throws XServletException
451     * @throws IOException TODO
452     */
453    abstract String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
454            throws XServletException, IOException;
455
456    /**
457     * abstract method to get and stream log information of job, either workflow or coordinator
458     *
459     * @param request
460     * @param response
461     * @throws XServletException
462     * @throws IOException
463     */
464    abstract void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
465            IOException;
466
467    /**
468     * abstract method to get and stream error log information of job, either workflow, coordinator or bundle
469     *
470     * @param request
471     * @param response
472     * @throws XServletException
473     * @throws IOException
474     */
475    abstract void streamJobErrorLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
476    IOException;
477
478
479    abstract void streamJobAuditLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
480            IOException;
481
482    /**
483     * abstract method to create and stream image for runtime DAG -- workflow only
484     *
485     * @param request
486     * @param response
487     * @throws XServletException
488     * @throws IOException
489     */
490    abstract void streamJobGraph(HttpServletRequest request, HttpServletResponse response)
491            throws XServletException, IOException;
492
493    /**
494     * abstract method to get JMS topic name for a job
495     * @param request
496     * @param response
497     * @throws XServletException
498     * @throws IOException
499     */
500    abstract String getJMSTopicName(HttpServletRequest request, HttpServletResponse response)
501            throws XServletException, IOException;
502
503    /**
504     * abstract method to get workflow job ids from the parent id
505     * i.e. coordinator action
506     * @param request
507     * @param response
508     * @return comma-separated list of workflow job ids
509     * @throws XServletException
510     * @throws IOException
511     */
512    abstract JSONObject getJobsByParentId(HttpServletRequest request, HttpServletResponse response)
513            throws XServletException, IOException;
514
515    /**
516     * Abstract method to Update coord job.
517     *
518     * @param request the request
519     * @param response the response
520     * @param conf the Configuration
521     * @return the JSON object
522     * @throws XServletException the x servlet exception
523     * @throws IOException Signals that an I/O exception has occurred.
524     */
525    abstract JSONObject updateJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
526            throws XServletException, IOException;
527
528    /**
529     * Abstract method to get status for a job
530     *
531     * @param request the request
532     * @param response the response
533     * @return the JSON object
534     * @throws XServletException the x servlet exception
535     * @throws IOException Signals that an I/O exception has occurred.
536     */
537    abstract String getJobStatus(HttpServletRequest request, HttpServletResponse response)
538            throws XServletException, IOException;
539
540    /**
541     * Abstract method to enable SLA alert.
542     *
543     * @param request the request
544     * @param response the response
545     * @throws XServletException the x servlet exception
546     * @throws IOException Signals that an I/O exception has occurred.
547     */
548    abstract void slaEnableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException,
549            IOException;
550
551    /**
552     * Abstract method to disable SLA alert.
553     *
554     * @param request the request
555     * @param response the response
556     * @throws XServletException the x servlet exception
557     * @throws IOException Signals that an I/O exception has occurred.
558     */
559    abstract void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException,
560            IOException;
561
562    /**
563     * Abstract method to change SLA definition.
564     *
565     * @param request the request
566     * @param response the response
567     * @throws XServletException the x servlet exception
568     * @throws IOException Signals that an I/O exception has occurred.
569     */
570    abstract void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException,
571            IOException;
572
573}