001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.servlet;
020
021import java.io.IOException;
022import java.util.Arrays;
023import java.util.Collections;
024import java.util.Properties;
025
026import javax.servlet.ServletException;
027import javax.servlet.http.HttpServletRequest;
028import javax.servlet.http.HttpServletResponse;
029
030import org.apache.oozie.DagEngine;
031import org.apache.oozie.DagEngineException;
032import org.apache.oozie.ErrorCode;
033import org.apache.oozie.client.rest.RestConstants;
034import org.apache.oozie.service.CallbackService;
035import org.apache.oozie.service.ConfigurationService;
036import org.apache.oozie.service.DagEngineService;
037import org.apache.oozie.service.Services;
038import org.apache.oozie.util.IOUtils;
039import org.apache.oozie.util.PropertiesUtils;
040import org.apache.oozie.util.XLog;
041
042public class CallbackServlet extends JsonRestServlet {
043
044    private static final String INSTRUMENTATION_NAME = "callback";
045
046    private static final ResourceInfo RESOURCE_INFO =
047            new ResourceInfo("", Arrays.asList("POST", "GET"), Collections.<ParameterInfo>emptyList());
048
049    public final static String CONF_MAX_DATA_LEN = "oozie.servlet.CallbackServlet.max.data.len";
050
051    private static int maxDataLen;
052
053    private XLog log = null;
054
055    public CallbackServlet() {
056        super(INSTRUMENTATION_NAME, RESOURCE_INFO);
057    }
058
059    @Override
060    public void init() {
061        maxDataLen = ConfigurationService.getInt(CONF_MAX_DATA_LEN);
062    }
063
064    /**
065     * GET callback
066     */
067    @Override
068    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
069        String queryString = request.getQueryString();
070        CallbackService callbackService = Services.get().get(CallbackService.class);
071
072        if (!callbackService.isValid(queryString)) {
073            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
074        }
075
076        String actionId = callbackService.getActionId(queryString);
077        if (actionId == null) {
078            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
079        }
080        log = XLog.getLog(getClass());
081        setLogInfo(actionId);
082        log.debug("Received a CallbackServlet.doGet() with query string " + queryString);
083
084        DagEngine dagEngine = Services.get().get(DagEngineService.class).getSystemDagEngine();
085        try {
086            log.info(XLog.STD, "callback for action [{0}]", actionId);
087            dagEngine.processCallback(actionId, callbackService.getExternalStatus(queryString), null);
088        }
089        catch (DagEngineException ex) {
090            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
091        }
092    }
093
094    /**
095     * POST callback
096     */
097    @Override
098    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException,
099            IOException {
100        String queryString = request.getQueryString();
101        CallbackService callbackService = Services.get().get(CallbackService.class);
102
103        if (!callbackService.isValid(queryString)) {
104            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
105        }
106
107        String actionId = callbackService.getActionId(queryString);
108        if (actionId == null) {
109            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
110        }
111        log = XLog.getLog(getClass());
112        setLogInfo(actionId);
113        log.debug("Received a CallbackServlet.doPost() with query string " + queryString);
114
115        validateContentType(request, RestConstants.TEXT_CONTENT_TYPE);
116        try {
117            log.info(XLog.STD, "callback for action [{0}]", actionId);
118            String data = IOUtils.getReaderAsString(request.getReader(), maxDataLen);
119            Properties props = PropertiesUtils.stringToProperties(data);
120            DagEngine dagEngine = Services.get().get(DagEngineService.class).getSystemDagEngine();
121            dagEngine.processCallback(actionId, callbackService.getExternalStatus(queryString), props);
122        }
123        catch (IOException ex) {
124            if (ex.getMessage().startsWith("stream exceeds limit")) {
125                // TODO, WE MUST SET THE ACTION TO ERROR
126                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0403, "data too long");
127            }
128            else {
129                throw ex;
130            }
131        }
132        catch (DagEngineException ex) {
133            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
134        }
135    }
136}