/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.servlet;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.oozie.DagEngine;
import org.apache.oozie.DagEngineException;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.service.CallbackService;
import org.apache.oozie.service.DagEngineService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.XLog;

/**
 * Servlet that receives action callbacks over HTTP GET and POST.
 * <p>
 * The request's query string is validated with the {@link CallbackService}, the action id and
 * external status are extracted from it, and the callback is handed to the system
 * {@link DagEngine}. POST requests may additionally carry a text body that is parsed into
 * {@link Properties} and passed along with the callback.
 */
public class CallbackServlet extends JsonRestServlet {
    private static final String INSTRUMENTATION_NAME = "callback";

    private static final ResourceInfo RESOURCE_INFO =
            new ResourceInfo("", Arrays.asList("POST", "GET"), Collections.EMPTY_LIST);

    /**
     * Configuration property holding the limit passed to
     * {@link IOUtils#getReaderAsString} when reading a POST callback body.
     */
    public final static String CONF_MAX_DATA_LEN = "oozie.servlet.CallbackServlet.max.data.len";

    /** Limit applied to POST bodies; loaded in {@link #init()}, defaulting to {@code 2 * 1024}. */
    private static int maxDataLen;

    public CallbackServlet() {
        super(INSTRUMENTATION_NAME, RESOURCE_INFO);
    }

    /**
     * Reads {@link #CONF_MAX_DATA_LEN} from the services configuration, falling back to
     * {@code 2 * 1024} when the property is not set.
     */
    @Override
    public void init() {
        maxDataLen = Services.get().getConf().getInt(CONF_MAX_DATA_LEN, 2 * 1024);
    }

    /**
     * GET callback: processes a callback that carries no body.
     *
     * @param request the callback request; only its query string is used.
     * @param response the servlet response (not written to by this method).
     * @throws ServletException with status {@code SC_BAD_REQUEST} if the query string is invalid,
     *         carries no action id, or the engine rejects the callback.
     * @throws IOException declared by the servlet contract.
     */
    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        String queryString = request.getQueryString();
        CallbackService callbackService = Services.get().get(CallbackService.class);

        String actionId = resolveActionId(callbackService, queryString);
        setLogInfo(toJobId(actionId), actionId);
        // Kept method-local: a servlet instance may serve requests on several threads at once,
        // so a shared mutable logger field could be overwritten by a concurrent request.
        // NOTE(review): the logger is obtained after setLogInfo(), as in the original code,
        // presumably so it picks up the job/action context - confirm against XLog.
        XLog log = XLog.getLog(getClass());
        log.debug("Received a CallbackServlet.doGet() with query string " + queryString);

        DagEngine dagEngine = Services.get().get(DagEngineService.class).getSystemDagEngine();
        try {
            log.info(XLog.STD, "callback for action [{0}]", actionId);
            dagEngine.processCallback(actionId, callbackService.getExternalStatus(queryString), null);
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * POST callback: processes a callback whose body is parsed into {@link Properties}.
     * <p>
     * The request content type is checked against {@link RestConstants#TEXT_CONTENT_TYPE} and the
     * body is read with a limit of {@code maxDataLen}.
     *
     * @param request the callback request; its query string and body are used.
     * @param response the servlet response (not written to by this method).
     * @throws ServletException with status {@code SC_BAD_REQUEST} if the query string is invalid,
     *         carries no action id, the body exceeds the limit, or the engine rejects the callback.
     * @throws IOException if reading the body fails for any reason other than exceeding the limit.
     */
    @Override
    protected void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        String queryString = request.getQueryString();
        CallbackService callbackService = Services.get().get(CallbackService.class);

        String actionId = resolveActionId(callbackService, queryString);
        setLogInfo(toJobId(actionId), actionId);
        // Method-local for the same thread-safety reason as in doGet().
        XLog log = XLog.getLog(getClass());
        log.debug("Received a CallbackServlet.doPost() with query string " + queryString);

        validateContentType(request, RestConstants.TEXT_CONTENT_TYPE);
        try {
            log.info(XLog.STD, "callback for action [{0}]", actionId);
            String data = IOUtils.getReaderAsString(request.getReader(), maxDataLen);
            Properties props = PropertiesUtils.stringToProperties(data);
            DagEngine dagEngine = Services.get().get(DagEngineService.class).getSystemDagEngine();
            dagEngine.processCallback(actionId, callbackService.getExternalStatus(queryString), props);
        }
        catch (IOException ex) {
            // getMessage() may legitimately be null; guard so the original I/O failure is
            // rethrown instead of being masked by a NullPointerException.
            String message = ex.getMessage();
            if (message != null && message.startsWith("stream exceeds limit")) {
                // TODO, WE MUST SET THE ACTION TO ERROR
                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0403, "data too long");
            }
            throw ex;
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Validates the callback query string and extracts the action id from it.
     *
     * @param callbackService service used to validate and parse the query string.
     * @param queryString the raw query string of the callback request.
     * @return the action id; never {@code null}.
     * @throws XServletException with status {@code SC_BAD_REQUEST} and error {@code E0402} if the
     *         query string is not valid or yields no action id.
     */
    private String resolveActionId(CallbackService callbackService, String queryString)
            throws XServletException {
        if (!callbackService.isValid(queryString)) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
        }
        String actionId = callbackService.getActionId(queryString);
        if (actionId == null) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
        }
        return actionId;
    }

    /**
     * Derives the job id from an action id by dropping everything from the last {@code '@'} on.
     *
     * @param actionId the action id.
     * @return the part of {@code actionId} before its last {@code '@'}, or {@code actionId} itself
     *         if it contains no {@code '@'}.
     */
    private static String toJobId(String actionId) {
        int idx = actionId.lastIndexOf('@');
        return (idx == -1) ? actionId : actionId.substring(0, idx);
    }
}