001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.servlet;
020
021import java.io.IOException;
022import java.util.Arrays;
023import java.util.Collections;
024import java.util.Properties;
025
026import javax.servlet.ServletException;
027import javax.servlet.http.HttpServletRequest;
028import javax.servlet.http.HttpServletResponse;
029
030import org.apache.oozie.DagEngine;
031import org.apache.oozie.DagEngineException;
032import org.apache.oozie.ErrorCode;
033import org.apache.oozie.client.rest.RestConstants;
034import org.apache.oozie.service.CallbackService;
035import org.apache.oozie.service.ConfigurationService;
036import org.apache.oozie.service.DagEngineService;
037import org.apache.oozie.service.Services;
038import org.apache.oozie.util.IOUtils;
039import org.apache.oozie.util.PropertiesUtils;
040import org.apache.oozie.util.XLog;
041
042public class CallbackServlet extends JsonRestServlet {
043    private static final String INSTRUMENTATION_NAME = "callback";
044
045    private static final ResourceInfo RESOURCE_INFO =
046            new ResourceInfo("", Arrays.asList("POST", "GET"), Collections.EMPTY_LIST);
047
048    public final static String CONF_MAX_DATA_LEN = "oozie.servlet.CallbackServlet.max.data.len";
049
050    private static int maxDataLen;
051
052    private XLog log = null;
053
054    public CallbackServlet() {
055        super(INSTRUMENTATION_NAME, RESOURCE_INFO);
056    }
057
058    @Override
059    public void init() {
060        maxDataLen = ConfigurationService.getInt(CONF_MAX_DATA_LEN);
061    }
062
063    /**
064     * GET callback
065     */
066    @Override
067    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
068        String queryString = request.getQueryString();
069        CallbackService callbackService = Services.get().get(CallbackService.class);
070
071        if (!callbackService.isValid(queryString)) {
072            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
073        }
074
075        String actionId = callbackService.getActionId(queryString);
076        if (actionId == null) {
077            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
078        }
079        log = XLog.getLog(getClass());
080        setLogInfo(actionId);
081        log.debug("Received a CallbackServlet.doGet() with query string " + queryString);
082
083        DagEngine dagEngine = Services.get().get(DagEngineService.class).getSystemDagEngine();
084        try {
085            log.info(XLog.STD, "callback for action [{0}]", actionId);
086            dagEngine.processCallback(actionId, callbackService.getExternalStatus(queryString), null);
087        }
088        catch (DagEngineException ex) {
089            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
090        }
091    }
092
093    /**
094     * POST callback
095     */
096    @Override
097    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException,
098            IOException {
099        String queryString = request.getQueryString();
100        CallbackService callbackService = Services.get().get(CallbackService.class);
101
102        if (!callbackService.isValid(queryString)) {
103            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
104        }
105
106        String actionId = callbackService.getActionId(queryString);
107        if (actionId == null) {
108            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
109        }
110        log = XLog.getLog(getClass());
111        setLogInfo(actionId);
112        log.debug("Received a CallbackServlet.doPost() with query string " + queryString);
113
114        validateContentType(request, RestConstants.TEXT_CONTENT_TYPE);
115        try {
116            log.info(XLog.STD, "callback for action [{0}]", actionId);
117            String data = IOUtils.getReaderAsString(request.getReader(), maxDataLen);
118            Properties props = PropertiesUtils.stringToProperties(data);
119            DagEngine dagEngine = Services.get().get(DagEngineService.class).getSystemDagEngine();
120            dagEngine.processCallback(actionId, callbackService.getExternalStatus(queryString), props);
121        }
122        catch (IOException ex) {
123            if (ex.getMessage().startsWith("stream exceeds limit")) {
124                // TODO, WE MUST SET THE ACTION TO ERROR
125                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0403, "data too long");
126            }
127            else {
128                throw ex;
129            }
130        }
131        catch (DagEngineException ex) {
132            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
133        }
134    }
135}