001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.servlet;
019    
020    import java.io.IOException;
021    import java.util.Arrays;
022    import java.util.Collections;
023    import java.util.Properties;
024    
025    import javax.servlet.ServletException;
026    import javax.servlet.http.HttpServletRequest;
027    import javax.servlet.http.HttpServletResponse;
028    
029    import org.apache.oozie.DagEngine;
030    import org.apache.oozie.DagEngineException;
031    import org.apache.oozie.ErrorCode;
032    import org.apache.oozie.client.rest.RestConstants;
033    import org.apache.oozie.service.CallbackService;
034    import org.apache.oozie.service.DagEngineService;
035    import org.apache.oozie.service.Services;
036    import org.apache.oozie.util.IOUtils;
037    import org.apache.oozie.util.PropertiesUtils;
038    import org.apache.oozie.util.XLog;
039    
040    public class CallbackServlet extends JsonRestServlet {
041        private static final String INSTRUMENTATION_NAME = "callback";
042    
043        private static final ResourceInfo RESOURCE_INFO =
044                new ResourceInfo("", Arrays.asList("POST", "GET"), Collections.EMPTY_LIST);
045    
046        public final static String CONF_MAX_DATA_LEN = "oozie.servlet.CallbackServlet.max.data.len";
047    
048        private static int maxDataLen;
049    
050        private XLog log = null;
051    
052        public CallbackServlet() {
053            super(INSTRUMENTATION_NAME, RESOURCE_INFO);
054        }
055    
056        @Override
057        public void init() {
058            maxDataLen = Services.get().getConf().getInt(CONF_MAX_DATA_LEN, 2 * 1024);
059        }
060    
061        /**
062         * GET callback
063         */
064        @Override
065        protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
066            String queryString = request.getQueryString();
067            CallbackService callbackService = Services.get().get(CallbackService.class);
068    
069            if (!callbackService.isValid(queryString)) {
070                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
071            }
072    
073            String actionId = callbackService.getActionId(queryString);
074            if (actionId == null) {
075                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
076            }
077            int idx = actionId.lastIndexOf('@', actionId.length());
078            String jobId;
079            if (idx == -1) {
080                jobId = actionId;
081            }
082            else {
083                jobId = actionId.substring(0, idx);
084            }
085            setLogInfo(jobId, actionId);
086            log = XLog.getLog(getClass());
087            log.debug("Received a CallbackServlet.doGet() with query string " + queryString);
088    
089            DagEngine dagEngine = Services.get().get(DagEngineService.class).getSystemDagEngine();
090            try {
091                log.info(XLog.STD, "callback for action [{0}]", actionId);
092                dagEngine.processCallback(actionId, callbackService.getExternalStatus(queryString), null);
093            }
094            catch (DagEngineException ex) {
095                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
096            }
097        }
098    
099        /**
100         * POST callback
101         */
102        @Override
103        protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException,
104                IOException {
105            String queryString = request.getQueryString();
106            CallbackService callbackService = Services.get().get(CallbackService.class);
107    
108            if (!callbackService.isValid(queryString)) {
109                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
110            }
111    
112            String actionId = callbackService.getActionId(queryString);
113            if (actionId == null) {
114                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
115            }
116            int idx = actionId.lastIndexOf('@', actionId.length());
117            String jobId;
118            if (idx == -1) {
119                jobId = actionId;
120            }
121            else {
122                jobId = actionId.substring(0, idx);
123            }
124            setLogInfo(jobId, actionId);
125            log = XLog.getLog(getClass());
126            log.debug("Received a CallbackServlet.doPost() with query string " + queryString);
127    
128            validateContentType(request, RestConstants.TEXT_CONTENT_TYPE);
129            try {
130                log.info(XLog.STD, "callback for action [{0}]", actionId);
131                String data = IOUtils.getReaderAsString(request.getReader(), maxDataLen);
132                Properties props = PropertiesUtils.stringToProperties(data);
133                DagEngine dagEngine = Services.get().get(DagEngineService.class).getSystemDagEngine();
134                dagEngine.processCallback(actionId, callbackService.getExternalStatus(queryString), props);
135            }
136            catch (IOException ex) {
137                if (ex.getMessage().startsWith("stream exceeds limit")) {
138                    // TODO, WE MUST SET THE ACTION TO ERROR
139                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0403, "data too long");
140                }
141                else {
142                    throw ex;
143                }
144            }
145            catch (DagEngineException ex) {
146                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
147            }
148        }
149    }