001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.servlet;
019
020import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
021import org.apache.oozie.client.rest.JsonBean;
022import org.apache.oozie.client.rest.RestConstants;
023import org.apache.oozie.service.DagXLogInfoService;
024import org.apache.oozie.service.InstrumentationService;
025import org.apache.oozie.service.ProxyUserService;
026import org.apache.oozie.service.Services;
027import org.apache.oozie.service.XLogService;
028import org.apache.oozie.util.Instrumentation;
029import org.apache.oozie.util.ParamChecker;
030import org.apache.oozie.util.XLog;
031import org.apache.oozie.ErrorCode;
032import org.json.simple.JSONObject;
033import org.json.simple.JSONStreamAware;
034
035import javax.servlet.ServletConfig;
036import javax.servlet.ServletException;
037import javax.servlet.http.HttpServlet;
038import javax.servlet.http.HttpServletRequest;
039import javax.servlet.http.HttpServletResponse;
040import java.io.IOException;
041import java.security.AccessControlException;
042import java.util.*;
043import java.util.concurrent.atomic.AtomicLong;
044
045/**
046 * Base class for Oozie web service API Servlets. <p/> This class provides common instrumentation, error logging and
047 * other common functionality.
048 */
049public abstract class JsonRestServlet extends HttpServlet {
050
051    static final String JSON_UTF8 = RestConstants.JSON_CONTENT_TYPE + "; charset=\"UTF-8\"";
052
053    protected static final String XML_UTF8 = RestConstants.XML_CONTENT_TYPE + "; charset=\"UTF-8\"";
054
055    protected static final String TEXT_UTF8 = RestConstants.TEXT_CONTENT_TYPE + "; charset=\"UTF-8\"";
056
057    protected static final String AUDIT_OPERATION = "audit.operation";
058    protected static final String AUDIT_PARAM = "audit.param";
059    protected static final String AUDIT_ERROR_CODE = "audit.error.code";
060    protected static final String AUDIT_ERROR_MESSAGE = "audit.error.message";
061    protected static final String AUDIT_HTTP_STATUS_CODE = "audit.http.status.code";
062
063    private XLog auditLog;
064    XLog.Info logInfo;
065
066    /**
067     * This bean defines a query string parameter.
068     */
069    public static class ParameterInfo {
070        private String name;
071        private Class type;
072        private List<String> methods;
073        private boolean required;
074
075        /**
076         * Creates a ParameterInfo with querystring parameter definition.
077         *
078         * @param name querystring parameter name.
079         * @param type type for the parameter value, valid types are: <code>Integer, Boolean and String</code>
080         * @param required indicates if the parameter is required.
081         * @param methods HTTP methods the parameter is used by.
082         */
083        public ParameterInfo(String name, Class type, boolean required, List<String> methods) {
084            this.name = ParamChecker.notEmpty(name, "name");
085            if (type != Integer.class && type != Boolean.class && type != String.class) {
086                throw new IllegalArgumentException("Type must be integer, boolean or string");
087            }
088            this.type = ParamChecker.notNull(type, "type");
089            this.required = required;
090            this.methods = ParamChecker.notNull(methods, "methods");
091        }
092
093    }
094
095    /**
096     * This bean defines a REST resource.
097     */
098    public static class ResourceInfo {
099        private String name;
100        private boolean wildcard;
101        private List<String> methods;
102        private Map<String, ParameterInfo> parameters = new HashMap<String, ParameterInfo>();
103
104        /**
105         * Creates a ResourceInfo with a REST resource definition.
106         *
107         * @param name name of the REST resource, it can be an fixed resource name, empty or a wildcard ('*').
108         * @param methods HTTP methods supported by the resource.
109         * @param parameters parameters supported by the resource.
110         */
111        public ResourceInfo(String name, List<String> methods, List<ParameterInfo> parameters) {
112            this.name = name;
113            wildcard = name.equals("*");
114            for (ParameterInfo parameter : parameters) {
115                this.parameters.put(parameter.name, parameter);
116            }
117            this.methods = ParamChecker.notNull(methods, "methods");
118        }
119    }
120
121    /**
122     * Name of the instrumentation group for the WS layer, value is 'webservices'.
123     */
124    protected static final String INSTRUMENTATION_GROUP = "webservices";
125
126    private static final String INSTR_TOTAL_REQUESTS_SAMPLER = "requests";
127    private static final String INSTR_TOTAL_REQUESTS_COUNTER = "requests";
128    private static final String INSTR_TOTAL_FAILED_REQUESTS_COUNTER = "failed";
129    private static AtomicLong TOTAL_REQUESTS_SAMPLER_COUNTER;
130
131    private Instrumentation instrumentation;
132    private String instrumentationName;
133    private AtomicLong samplerCounter = new AtomicLong();
134    private ThreadLocal<Instrumentation.Cron> requestCron = new ThreadLocal<Instrumentation.Cron>();
135    private List<ResourceInfo> resourcesInfo = new ArrayList<ResourceInfo>();
136    private boolean allowSafeModeChanges;
137
138    /**
139     * Creates a servlet with a specified instrumentation sampler name for its requests.
140     *
141     * @param instrumentationName instrumentation name for timer and samplers for the servlet.
142     * @param resourcesInfo list of resource definitions supported by the servlet, empty and wildcard resources must be
143     * the last ones, in that order, first empty and the wildcard.
144     */
145    public JsonRestServlet(String instrumentationName, ResourceInfo... resourcesInfo) {
146        this.instrumentationName = ParamChecker.notEmpty(instrumentationName, "instrumentationName");
147        if (resourcesInfo.length == 0) {
148            throw new IllegalArgumentException("There must be at least one ResourceInfo");
149        }
150        this.resourcesInfo = Arrays.asList(resourcesInfo);
151        auditLog = XLog.getLog("oozieaudit");
152        auditLog.setMsgPrefix("");
153        logInfo = new XLog.Info(XLog.Info.get());
154    }
155
156    /**
157     * Enable HTTP POST/PUT/DELETE methods while in safe mode.
158     *
159     * @param allow <code>true</code> enabled safe mode changes, <code>false</code> disable safe mode changes
160     * (default).
161     */
162    protected void setAllowSafeModeChanges(boolean allow) {
163        allowSafeModeChanges = allow;
164    }
165
166    /**
167     * Define an instrumentation sampler. <p/> Sampling period is 60 seconds, the sampling frequency is 1 second. <p/>
168     * The instrumentation group used is {@link #INSTRUMENTATION_GROUP}.
169     *
170     * @param samplerName sampler name.
171     * @param samplerCounter sampler counter.
172     */
173    private void defineSampler(String samplerName, final AtomicLong samplerCounter) {
174        instrumentation.addSampler(INSTRUMENTATION_GROUP, samplerName, 60, 1, new Instrumentation.Variable<Long>() {
175            public Long getValue() {
176                return samplerCounter.get();
177            }
178        });
179    }
180
181    /**
182     * Add an instrumentation cron.
183     *
184     * @param name name of the timer for the cron.
185     * @param cron cron to add to a instrumentation timer.
186     */
187    private void addCron(String name, Instrumentation.Cron cron) {
188        instrumentation.addCron(INSTRUMENTATION_GROUP, name, cron);
189    }
190
191    /**
192     * Start the request instrumentation cron.
193     */
194    protected void startCron() {
195        requestCron.get().start();
196    }
197
198    /**
199     * Stop the request instrumentation cron.
200     */
201    protected void stopCron() {
202        requestCron.get().stop();
203    }
204
205    /**
206     * Initializes total request and servlet request samplers.
207     */
208    public void init(ServletConfig servletConfig) throws ServletException {
209        super.init(servletConfig);
210        instrumentation = Services.get().get(InstrumentationService.class).get();
211        synchronized (JsonRestServlet.class) {
212            if (TOTAL_REQUESTS_SAMPLER_COUNTER == null) {
213                TOTAL_REQUESTS_SAMPLER_COUNTER = new AtomicLong();
214                defineSampler(INSTR_TOTAL_REQUESTS_SAMPLER, TOTAL_REQUESTS_SAMPLER_COUNTER);
215            }
216        }
217        defineSampler(instrumentationName, samplerCounter);
218    }
219
220    /**
221     * Convenience method for instrumentation counters.
222     *
223     * @param name counter name.
224     * @param count count to increment the counter.
225     */
226    private void incrCounter(String name, int count) {
227        if (instrumentation != null) {
228            instrumentation.incr(INSTRUMENTATION_GROUP, name, count);
229        }
230    }
231
232    /**
233     * Logs audit information for write requests to the audit log.
234     *
235     * @param request the http request.
236     */
237    private void logAuditInfo(HttpServletRequest request) {
238        if (request.getAttribute(AUDIT_OPERATION) != null) {
239            Integer httpStatusCode = (Integer) request.getAttribute(AUDIT_HTTP_STATUS_CODE);
240            httpStatusCode = (httpStatusCode != null) ? httpStatusCode : HttpServletResponse.SC_OK;
241            String status = (httpStatusCode == HttpServletResponse.SC_OK) ? "SUCCESS" : "FAILED";
242            String operation = (String) request.getAttribute(AUDIT_OPERATION);
243            String param = (String) request.getAttribute(AUDIT_PARAM);
244            String user = XLog.Info.get().getParameter(XLogService.USER);
245            String group = XLog.Info.get().getParameter(XLogService.GROUP);
246            String jobId = XLog.Info.get().getParameter(DagXLogInfoService.JOB);
247            String app = XLog.Info.get().getParameter(DagXLogInfoService.APP);
248
249            String errorCode = (String) request.getAttribute(AUDIT_ERROR_CODE);
250            String errorMessage = (String) request.getAttribute(AUDIT_ERROR_MESSAGE);
251            String hostDetail = request.getRemoteAddr();
252
253            auditLog.info(
254                    "IP [{0}], USER [{1}], GROUP [{2}], APP [{3}], JOBID [{4}], OPERATION [{5}], PARAMETER [{6}], STATUS [{7}],"
255                            + " HTTPCODE [{8}], ERRORCODE [{9}], ERRORMESSAGE [{10}]", hostDetail, user, group, app,
256                    jobId, operation, param, status, httpStatusCode, errorCode, errorMessage);
257        }
258    }
259
260    /**
261     * Dispatches to super after loginfo and intrumentation handling. In case of errors dispatches error response codes
262     * and does error logging.
263     */
264    @SuppressWarnings("unchecked")
265    protected final void service(HttpServletRequest request, HttpServletResponse response) throws ServletException,
266            IOException {
267        //if (Services.get().isSafeMode() && !request.getMethod().equals("GET") && !allowSafeModeChanges) {
268        if (Services.get().getSystemMode() != SYSTEM_MODE.NORMAL && !request.getMethod().equals("GET") && !allowSafeModeChanges) {
269            sendErrorResponse(response, HttpServletResponse.SC_SERVICE_UNAVAILABLE, ErrorCode.E0002.toString(),
270                              ErrorCode.E0002.getTemplate());
271            return;
272        }
273        Instrumentation.Cron cron = new Instrumentation.Cron();
274        requestCron.set(cron);
275        try {
276            cron.start();
277            validateRestUrl(request.getMethod(), getResourceName(request), request.getParameterMap());
278            XLog.Info.get().clear();
279            String user = getUser(request);
280            TOTAL_REQUESTS_SAMPLER_COUNTER.incrementAndGet();
281            samplerCounter.incrementAndGet();
282            //If trace is enabled then display the request headers
283            XLog log = XLog.getLog(getClass());
284            if (log.isTraceEnabled()){
285             logHeaderInfo(request);
286            }
287            super.service(request, response);
288        }
289        catch (XServletException ex) {
290            XLog log = XLog.getLog(getClass());
291            log.warn("URL[{0} {1}] error[{2}], {3}", request.getMethod(), getRequestUrl(request), ex.getErrorCode(), ex
292                    .getMessage(), ex);
293            request.setAttribute(AUDIT_ERROR_MESSAGE, ex.getMessage());
294            request.setAttribute(AUDIT_ERROR_CODE, ex.getErrorCode().toString());
295            request.setAttribute(AUDIT_HTTP_STATUS_CODE, ex.getHttpStatusCode());
296            incrCounter(INSTR_TOTAL_FAILED_REQUESTS_COUNTER, 1);
297            sendErrorResponse(response, ex.getHttpStatusCode(), ex.getErrorCode().toString(), ex.getMessage());
298        }
299        catch (AccessControlException ex) {
300            XLog log = XLog.getLog(getClass());
301            log.error("URL[{0} {1}] error, {2}", request.getMethod(), getRequestUrl(request), ex.getMessage(), ex);
302            incrCounter(INSTR_TOTAL_FAILED_REQUESTS_COUNTER, 1);
303            sendErrorResponse(response, HttpServletResponse.SC_UNAUTHORIZED, ErrorCode.E1400.toString(),
304                              ex.getMessage());
305        }
306        catch (IllegalArgumentException ex){
307          XLog log = XLog.getLog(getClass());
308          log.warn("URL[{0} {1}] user error, {2}", request.getMethod(), getRequestUrl(request), ex.getMessage(), ex);
309          incrCounter(INSTR_TOTAL_FAILED_REQUESTS_COUNTER, 1);
310          sendErrorResponse(response, HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E1603.toString(),
311                            ex.getMessage());
312        }
313        catch (RuntimeException ex) {
314            XLog log = XLog.getLog(getClass());
315            log.error("URL[{0} {1}] error, {2}", request.getMethod(), getRequestUrl(request), ex.getMessage(), ex);
316            incrCounter(INSTR_TOTAL_FAILED_REQUESTS_COUNTER, 1);
317            throw ex;
318        }
319        finally {
320            logAuditInfo(request);
321            TOTAL_REQUESTS_SAMPLER_COUNTER.decrementAndGet();
322            incrCounter(INSTR_TOTAL_REQUESTS_COUNTER, 1);
323            samplerCounter.decrementAndGet();
324            XLog.Info.remove();
325            cron.stop();
326            // TODO
327            incrCounter(instrumentationName, 1);
328            incrCounter(instrumentationName + "-" + request.getMethod(), 1);
329            addCron(instrumentationName, cron);
330            addCron(instrumentationName + "-" + request.getMethod(), cron);
331            requestCron.remove();
332        }
333    }
334
335    private void logHeaderInfo(HttpServletRequest request){
336        XLog log = XLog.getLog(getClass());
337        StringBuilder traceInfo = new StringBuilder(4096);
338            //Display request URL and request.getHeaderNames();
339            Enumeration<String> names = (Enumeration<String>) request.getHeaderNames();
340            traceInfo.append("Request URL: ").append(getRequestUrl(request)).append("\nRequest Headers:\n");
341            while (names.hasMoreElements()) {
342                String name = names.nextElement();
343                String value = request.getHeader(name);
344                traceInfo.append(name).append(" : ").append(value).append("\n");
345            }
346            log.trace(traceInfo);
347    }
348
349    private String getRequestUrl(HttpServletRequest request) {
350        StringBuffer url = request.getRequestURL();
351        if (request.getQueryString() != null) {
352            url.append("?").append(request.getQueryString());
353        }
354        return url.toString();
355    }
356
357    /**
358     * Sends a JSON response.
359     *
360     * @param response servlet response.
361     * @param statusCode HTTP status code.
362     * @param bean bean to send as JSON response.
363     * @param timeZoneId time zone to use for dates in the JSON response.
364     * @throws java.io.IOException thrown if the bean could not be serialized to the response output stream.
365     */
366    protected void sendJsonResponse(HttpServletResponse response, int statusCode, JsonBean bean, String timeZoneId) 
367            throws IOException {
368        response.setStatus(statusCode);
369        JSONObject json = bean.toJSONObject(timeZoneId);
370        response.setContentType(JSON_UTF8);
371        json.writeJSONString(response.getWriter());
372    }
373
374    /**
375     * Sends a error response.
376     *
377     * @param response servlet response.
378     * @param statusCode HTTP status code.
379     * @param error error code.
380     * @param message error message.
381     * @throws java.io.IOException thrown if the error response could not be set.
382     */
383    protected void sendErrorResponse(HttpServletResponse response, int statusCode, String error, String message)
384            throws IOException {
385        response.setHeader(RestConstants.OOZIE_ERROR_CODE, error);
386        response.setHeader(RestConstants.OOZIE_ERROR_MESSAGE, message);
387        response.sendError(statusCode);
388    }
389
390    protected void sendJsonResponse(HttpServletResponse response, int statusCode, JSONStreamAware json)
391            throws IOException {
392        if (statusCode == HttpServletResponse.SC_OK || statusCode == HttpServletResponse.SC_CREATED) {
393            response.setStatus(statusCode);
394        }
395        else {
396            response.sendError(statusCode);
397        }
398        response.setStatus(statusCode);
399        response.setContentType(JSON_UTF8);
400        json.writeJSONString(response.getWriter());
401    }
402
403    /**
404     * Validates REST URL using the ResourceInfos of the servlet.
405     *
406     * @param method HTTP method.
407     * @param resourceName resource name.
408     * @param queryStringParams query string parameters.
409     * @throws javax.servlet.ServletException thrown if the resource name or parameters are incorrect.
410     */
411    @SuppressWarnings("unchecked")
412    protected void validateRestUrl(String method, String resourceName, Map<String, String[]> queryStringParams)
413            throws ServletException {
414
415        if (resourceName.contains("/")) {
416            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0301, resourceName);
417        }
418
419        boolean valid = false;
420        for (int i = 0; !valid && i < resourcesInfo.size(); i++) {
421            ResourceInfo resourceInfo = resourcesInfo.get(i);
422            if (resourceInfo.name.equals(resourceName) || resourceInfo.wildcard) {
423                if (!resourceInfo.methods.contains(method)) {
424                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0301, resourceName);
425                }
426                for (Map.Entry<String, String[]> entry : queryStringParams.entrySet()) {
427                    String name = entry.getKey();
428                    ParameterInfo parameterInfo = resourceInfo.parameters.get(name);
429                    if (parameterInfo != null) {
430                        if (!parameterInfo.methods.contains(method)) {
431                            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, name);
432                        }
433                        String value = entry.getValue()[0].trim();
434                        if (parameterInfo.type.equals(Boolean.class)) {
435                            value = value.toLowerCase();
436                            if (!value.equals("true") && !value.equals("false")) {
437                                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0304, name,
438                                                            "boolean");
439                            }
440                        }
441                        if (parameterInfo.type.equals(Integer.class)) {
442                            try {
443                                Integer.parseInt(value);
444                            }
445                            catch (NumberFormatException ex) {
446                                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0304, name,
447                                                            "integer");
448                            }
449                        }
450                    }
451                }
452                for (ParameterInfo parameterInfo : resourceInfo.parameters.values()) {
453                    if (parameterInfo.methods.contains(method) && parameterInfo.required
454                            && queryStringParams.get(parameterInfo.name) == null) {
455                        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0305,
456                                                    parameterInfo.name);
457                    }
458                }
459                valid = true;
460            }
461        }
462        if (!valid) {
463            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0301, resourceName);
464        }
465    }
466
467    /**
468     * Return the resource name of the request. <p/> The resource name is the whole extra path. If the extra path starts
469     * with '/', the first '/' is trimmed.
470     *
471     * @param request request instance
472     * @return the resource name, <code>null</code> if none.
473     */
474    protected String getResourceName(HttpServletRequest request) {
475        String requestPath = request.getPathInfo();
476        if (requestPath != null) {
477            while (requestPath.startsWith("/")) {
478                requestPath = requestPath.substring(1);
479            }
480            requestPath = requestPath.trim();
481        }
482        else {
483            requestPath = "";
484        }
485        return requestPath;
486    }
487
488    /**
489     * Return the request content type, lowercase and without attributes.
490     *
491     * @param request servlet request.
492     * @return the request content type, <code>null</code> if none.
493     */
494    protected String getContentType(HttpServletRequest request) {
495        String contentType = request.getContentType();
496        if (contentType != null) {
497            int index = contentType.indexOf(";");
498            if (index > -1) {
499                contentType = contentType.substring(0, index);
500            }
501            contentType = contentType.toLowerCase();
502        }
503        return contentType;
504    }
505
506    /**
507     * Validate and return the content type of the request.
508     *
509     * @param request servlet request.
510     * @param expected expected contentType.
511     * @return the normalized content type (lowercase and without modifiers).
512     * @throws XServletException thrown if the content type is invalid.
513     */
514    protected String validateContentType(HttpServletRequest request, String expected) throws XServletException {
515        String contentType = getContentType(request);
516        if (contentType == null || contentType.trim().length() == 0) {
517            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0300, contentType);
518        }
519        if (!contentType.equals(expected)) {
520            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0300, contentType);
521        }
522        return contentType;
523    }
524
525    /**
526     * Request attribute constant for the authentication token.
527     */
528    public static final String AUTH_TOKEN = "oozie.auth.token";
529
530    /**
531     * Request attribute constant for the user name.
532     */
533    public static final String USER_NAME = "oozie.user.name";
534
535    protected static final String UNDEF = "?";
536
537    /**
538     * Return the user name of the request if any.
539     *
540     * @param request request.
541     * @return the user name, <code>null</code> if there is none.
542     */
543    protected String getUser(HttpServletRequest request) {
544        String userName = (String) request.getAttribute(USER_NAME);
545
546        String doAsUserName = request.getParameter(RestConstants.DO_AS_PARAM);
547        if (doAsUserName != null && !doAsUserName.equals(userName)) {
548            ProxyUserService proxyUser = Services.get().get(ProxyUserService.class);
549            try {
550                proxyUser.validate(userName, HostnameFilter.get(), doAsUserName);
551            }
552            catch (IOException ex) {
553                throw new RuntimeException(ex);
554            }
555            auditLog.info("Proxy user [{0}] DoAs user [{1}] Request [{2}]", userName, doAsUserName,
556                          getRequestUrl(request));
557
558            XLog.Info.get().setParameter(XLogService.USER, userName + " doAs " + doAsUserName);
559
560            userName = doAsUserName;
561        }
562        else {
563            XLog.Info.get().setParameter(XLogService.USER, userName);
564        }
565        return (userName != null) ? userName : UNDEF;
566    }
567
568    /**
569     * Set the log info with the given information.
570     *
571     * @param jobid job ID.
572     * @param actionid action ID.
573     */
574    protected void setLogInfo(String jobid, String actionid) {
575        logInfo.setParameter(DagXLogInfoService.JOB, jobid);
576        logInfo.setParameter(DagXLogInfoService.ACTION, actionid);
577
578        XLog.Info.get().setParameters(logInfo);
579    }
580}