001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.servlet; 020 021import java.io.IOException; 022import java.util.Arrays; 023import java.util.Collections; 024import java.util.Properties; 025 026import javax.servlet.ServletException; 027import javax.servlet.http.HttpServletRequest; 028import javax.servlet.http.HttpServletResponse; 029 030import org.apache.oozie.DagEngine; 031import org.apache.oozie.DagEngineException; 032import org.apache.oozie.ErrorCode; 033import org.apache.oozie.client.rest.RestConstants; 034import org.apache.oozie.service.CallbackService; 035import org.apache.oozie.service.ConfigurationService; 036import org.apache.oozie.service.DagEngineService; 037import org.apache.oozie.service.Services; 038import org.apache.oozie.util.IOUtils; 039import org.apache.oozie.util.PropertiesUtils; 040import org.apache.oozie.util.XLog; 041 042public class CallbackServlet extends JsonRestServlet { 043 private static final String INSTRUMENTATION_NAME = "callback"; 044 045 private static final ResourceInfo RESOURCE_INFO = 046 new ResourceInfo("", Arrays.asList("POST", "GET"), Collections.EMPTY_LIST); 047 048 public final static String CONF_MAX_DATA_LEN = "oozie.servlet.CallbackServlet.max.data.len"; 049 050 private static int maxDataLen; 051 052 private XLog log = null; 053 054 public CallbackServlet() { 055 super(INSTRUMENTATION_NAME, RESOURCE_INFO); 056 } 057 058 @Override 059 public void init() { 060 maxDataLen = ConfigurationService.getInt(CONF_MAX_DATA_LEN); 061 } 062 063 /** 064 * GET callback 065 */ 066 @Override 067 protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 068 String queryString = request.getQueryString(); 069 CallbackService callbackService = Services.get().get(CallbackService.class); 070 071 if (!callbackService.isValid(queryString)) { 072 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString); 073 } 074 075 String actionId = callbackService.getActionId(queryString); 076 if (actionId == null) { 077 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString); 078 } 079 log = XLog.getLog(getClass()); 080 setLogInfo(actionId); 081 log.debug("Received a CallbackServlet.doGet() with query string " + queryString); 082 083 DagEngine dagEngine = Services.get().get(DagEngineService.class).getSystemDagEngine(); 084 try { 085 log.info(XLog.STD, "callback for action [{0}]", actionId); 086 dagEngine.processCallback(actionId, callbackService.getExternalStatus(queryString), null); 087 } 088 catch (DagEngineException ex) { 089 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 090 } 091 } 092 093 /** 094 * POST callback 095 */ 096 @Override 097 protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, 098 IOException { 099 String queryString = request.getQueryString(); 100 CallbackService callbackService = Services.get().get(CallbackService.class); 101 102 if (!callbackService.isValid(queryString)) { 103 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString); 104 } 105 106 String actionId = callbackService.getActionId(queryString); 107 if (actionId == null) { 108 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString); 109 } 110 log = XLog.getLog(getClass()); 111 setLogInfo(actionId); 112 log.debug("Received a CallbackServlet.doPost() with query string " + queryString); 113 114 validateContentType(request, RestConstants.TEXT_CONTENT_TYPE); 115 try { 116 log.info(XLog.STD, "callback for action [{0}]", actionId); 117 String data = IOUtils.getReaderAsString(request.getReader(), maxDataLen); 118 Properties props = PropertiesUtils.stringToProperties(data); 119 DagEngine dagEngine = Services.get().get(DagEngineService.class).getSystemDagEngine(); 120 dagEngine.processCallback(actionId, callbackService.getExternalStatus(queryString), props); 121 } 122 catch (IOException ex) { 123 if (ex.getMessage().startsWith("stream exceeds limit")) { 124 // TODO, WE MUST SET THE ACTION TO ERROR 125 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0403, "data too long"); 126 } 127 else { 128 throw ex; 129 } 130 } 131 catch (DagEngineException ex) { 132 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 133 } 134 } 135}