001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.servlet; 020 021import java.io.IOException; 022import java.util.Arrays; 023import java.util.Collections; 024import java.util.Properties; 025 026import javax.servlet.ServletException; 027import javax.servlet.http.HttpServletRequest; 028import javax.servlet.http.HttpServletResponse; 029 030import org.apache.oozie.DagEngine; 031import org.apache.oozie.DagEngineException; 032import org.apache.oozie.ErrorCode; 033import org.apache.oozie.client.rest.RestConstants; 034import org.apache.oozie.service.CallbackService; 035import org.apache.oozie.service.ConfigurationService; 036import org.apache.oozie.service.DagEngineService; 037import org.apache.oozie.service.Services; 038import org.apache.oozie.util.IOUtils; 039import org.apache.oozie.util.PropertiesUtils; 040import org.apache.oozie.util.XLog; 041 042public class CallbackServlet extends JsonRestServlet { 043 044 private static final String INSTRUMENTATION_NAME = "callback"; 045 046 private static final ResourceInfo RESOURCE_INFO = 047 new ResourceInfo("", Arrays.asList("POST", "GET"), Collections.<ParameterInfo>emptyList()); 048 049 public final static String CONF_MAX_DATA_LEN = "oozie.servlet.CallbackServlet.max.data.len"; 050 051 private static int maxDataLen; 052 053 private XLog log = null; 054 055 public CallbackServlet() { 056 super(INSTRUMENTATION_NAME, RESOURCE_INFO); 057 } 058 059 @Override 060 public void init() { 061 maxDataLen = ConfigurationService.getInt(CONF_MAX_DATA_LEN); 062 } 063 064 /** 065 * GET callback 066 */ 067 @Override 068 protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 069 String queryString = request.getQueryString(); 070 CallbackService callbackService = Services.get().get(CallbackService.class); 071 072 if (!callbackService.isValid(queryString)) { 073 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString); 074 } 075 076 String actionId = callbackService.getActionId(queryString); 077 if (actionId == null) { 078 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString); 079 } 080 log = XLog.getLog(getClass()); 081 setLogInfo(actionId); 082 log.debug("Received a CallbackServlet.doGet() with query string " + queryString); 083 084 DagEngine dagEngine = Services.get().get(DagEngineService.class).getSystemDagEngine(); 085 try { 086 log.info(XLog.STD, "callback for action [{0}]", actionId); 087 dagEngine.processCallback(actionId, callbackService.getExternalStatus(queryString), null); 088 } 089 catch (DagEngineException ex) { 090 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 091 } 092 } 093 094 /** 095 * POST callback 096 */ 097 @Override 098 protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, 099 IOException { 100 String queryString = request.getQueryString(); 101 CallbackService callbackService = Services.get().get(CallbackService.class); 102 103 if (!callbackService.isValid(queryString)) { 104 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString); 105 } 106 107 String actionId = callbackService.getActionId(queryString); 108 if (actionId == null) { 109 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString); 110 } 111 log = XLog.getLog(getClass()); 112 setLogInfo(actionId); 113 log.debug("Received a CallbackServlet.doPost() with query string " + queryString); 114 115 validateContentType(request, RestConstants.TEXT_CONTENT_TYPE); 116 try { 117 log.info(XLog.STD, "callback for action [{0}]", actionId); 118 String data = IOUtils.getReaderAsString(request.getReader(), maxDataLen); 119 Properties props = PropertiesUtils.stringToProperties(data); 120 DagEngine dagEngine = Services.get().get(DagEngineService.class).getSystemDagEngine(); 121 dagEngine.processCallback(actionId, callbackService.getExternalStatus(queryString), props); 122 } 123 catch (IOException ex) { 124 if (ex.getMessage().startsWith("stream exceeds limit")) { 125 // TODO, WE MUST SET THE ACTION TO ERROR 126 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0403, "data too long"); 127 } 128 else { 129 throw ex; 130 } 131 } 132 catch (DagEngineException ex) { 133 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 134 } 135 } 136}