001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.servlet;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.Collections;
023import java.util.Properties;
024
025import javax.servlet.ServletException;
026import javax.servlet.http.HttpServletRequest;
027import javax.servlet.http.HttpServletResponse;
028
029import org.apache.oozie.DagEngine;
030import org.apache.oozie.DagEngineException;
031import org.apache.oozie.ErrorCode;
032import org.apache.oozie.client.rest.RestConstants;
033import org.apache.oozie.service.CallbackService;
034import org.apache.oozie.service.DagEngineService;
035import org.apache.oozie.service.Services;
036import org.apache.oozie.util.IOUtils;
037import org.apache.oozie.util.PropertiesUtils;
038import org.apache.oozie.util.XLog;
039
040public class CallbackServlet extends JsonRestServlet {
041    private static final String INSTRUMENTATION_NAME = "callback";
042
043    private static final ResourceInfo RESOURCE_INFO =
044            new ResourceInfo("", Arrays.asList("POST", "GET"), Collections.EMPTY_LIST);
045
046    public final static String CONF_MAX_DATA_LEN = "oozie.servlet.CallbackServlet.max.data.len";
047
048    private static int maxDataLen;
049
050    private XLog log = null;
051
052    public CallbackServlet() {
053        super(INSTRUMENTATION_NAME, RESOURCE_INFO);
054    }
055
056    @Override
057    public void init() {
058        maxDataLen = Services.get().getConf().getInt(CONF_MAX_DATA_LEN, 2 * 1024);
059    }
060
061    /**
062     * GET callback
063     */
064    @Override
065    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
066        String queryString = request.getQueryString();
067        CallbackService callbackService = Services.get().get(CallbackService.class);
068
069        if (!callbackService.isValid(queryString)) {
070            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
071        }
072
073        String actionId = callbackService.getActionId(queryString);
074        if (actionId == null) {
075            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
076        }
077        int idx = actionId.lastIndexOf('@', actionId.length());
078        String jobId;
079        if (idx == -1) {
080            jobId = actionId;
081        }
082        else {
083            jobId = actionId.substring(0, idx);
084        }
085        setLogInfo(jobId, actionId);
086        log = XLog.getLog(getClass());
087        log.debug("Received a CallbackServlet.doGet() with query string " + queryString);
088
089        DagEngine dagEngine = Services.get().get(DagEngineService.class).getSystemDagEngine();
090        try {
091            log.info(XLog.STD, "callback for action [{0}]", actionId);
092            dagEngine.processCallback(actionId, callbackService.getExternalStatus(queryString), null);
093        }
094        catch (DagEngineException ex) {
095            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
096        }
097    }
098
099    /**
100     * POST callback
101     */
102    @Override
103    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException,
104            IOException {
105        String queryString = request.getQueryString();
106        CallbackService callbackService = Services.get().get(CallbackService.class);
107
108        if (!callbackService.isValid(queryString)) {
109            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
110        }
111
112        String actionId = callbackService.getActionId(queryString);
113        if (actionId == null) {
114            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
115        }
116        int idx = actionId.lastIndexOf('@', actionId.length());
117        String jobId;
118        if (idx == -1) {
119            jobId = actionId;
120        }
121        else {
122            jobId = actionId.substring(0, idx);
123        }
124        setLogInfo(jobId, actionId);
125        log = XLog.getLog(getClass());
126        log.debug("Received a CallbackServlet.doPost() with query string " + queryString);
127
128        validateContentType(request, RestConstants.TEXT_CONTENT_TYPE);
129        try {
130            log.info(XLog.STD, "callback for action [{0}]", actionId);
131            String data = IOUtils.getReaderAsString(request.getReader(), maxDataLen);
132            Properties props = PropertiesUtils.stringToProperties(data);
133            DagEngine dagEngine = Services.get().get(DagEngineService.class).getSystemDagEngine();
134            dagEngine.processCallback(actionId, callbackService.getExternalStatus(queryString), props);
135        }
136        catch (IOException ex) {
137            if (ex.getMessage().startsWith("stream exceeds limit")) {
138                // TODO, WE MUST SET THE ACTION TO ERROR
139                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0403, "data too long");
140            }
141            else {
142                throw ex;
143            }
144        }
145        catch (DagEngineException ex) {
146            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
147        }
148    }
149}