This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.servlet;
019
020 import java.io.IOException;
021 import java.util.Arrays;
022 import java.util.Collections;
023 import java.util.Properties;
024
025 import javax.servlet.ServletException;
026 import javax.servlet.http.HttpServletRequest;
027 import javax.servlet.http.HttpServletResponse;
028
029 import org.apache.oozie.DagEngine;
030 import org.apache.oozie.DagEngineException;
031 import org.apache.oozie.ErrorCode;
032 import org.apache.oozie.client.rest.RestConstants;
033 import org.apache.oozie.service.CallbackService;
034 import org.apache.oozie.service.DagEngineService;
035 import org.apache.oozie.service.Services;
036 import org.apache.oozie.util.IOUtils;
037 import org.apache.oozie.util.PropertiesUtils;
038 import org.apache.oozie.util.XLog;
039
040 public class CallbackServlet extends JsonRestServlet {
041 private static final String INSTRUMENTATION_NAME = "callback";
042
043 private static final ResourceInfo RESOURCE_INFO =
044 new ResourceInfo("", Arrays.asList("POST", "GET"), Collections.EMPTY_LIST);
045
046 public final static String CONF_MAX_DATA_LEN = "oozie.servlet.CallbackServlet.max.data.len";
047
048 private static int maxDataLen;
049
050 private XLog log = null;
051
052 public CallbackServlet() {
053 super(INSTRUMENTATION_NAME, RESOURCE_INFO);
054 }
055
056 @Override
057 public void init() {
058 maxDataLen = Services.get().getConf().getInt(CONF_MAX_DATA_LEN, 2 * 1024);
059 }
060
061 /**
062 * GET callback
063 */
064 @Override
065 protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
066 String queryString = request.getQueryString();
067 CallbackService callbackService = Services.get().get(CallbackService.class);
068
069 if (!callbackService.isValid(queryString)) {
070 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
071 }
072
073 String actionId = callbackService.getActionId(queryString);
074 if (actionId == null) {
075 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
076 }
077 int idx = actionId.lastIndexOf('@', actionId.length());
078 String jobId;
079 if (idx == -1) {
080 jobId = actionId;
081 }
082 else {
083 jobId = actionId.substring(0, idx);
084 }
085 setLogInfo(jobId, actionId);
086 log = XLog.getLog(getClass());
087 log.debug("Received a CallbackServlet.doGet() with query string " + queryString);
088
089 DagEngine dagEngine = Services.get().get(DagEngineService.class).getSystemDagEngine();
090 try {
091 log.info(XLog.STD, "callback for action [{0}]", actionId);
092 dagEngine.processCallback(actionId, callbackService.getExternalStatus(queryString), null);
093 }
094 catch (DagEngineException ex) {
095 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
096 }
097 }
098
099 /**
100 * POST callback
101 */
102 @Override
103 protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException,
104 IOException {
105 String queryString = request.getQueryString();
106 CallbackService callbackService = Services.get().get(CallbackService.class);
107
108 if (!callbackService.isValid(queryString)) {
109 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
110 }
111
112 String actionId = callbackService.getActionId(queryString);
113 if (actionId == null) {
114 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
115 }
116 int idx = actionId.lastIndexOf('@', actionId.length());
117 String jobId;
118 if (idx == -1) {
119 jobId = actionId;
120 }
121 else {
122 jobId = actionId.substring(0, idx);
123 }
124 setLogInfo(jobId, actionId);
125 log = XLog.getLog(getClass());
126 log.debug("Received a CallbackServlet.doPost() with query string " + queryString);
127
128 validateContentType(request, RestConstants.TEXT_CONTENT_TYPE);
129 try {
130 log.info(XLog.STD, "callback for action [{0}]", actionId);
131 String data = IOUtils.getReaderAsString(request.getReader(), maxDataLen);
132 Properties props = PropertiesUtils.stringToProperties(data);
133 DagEngine dagEngine = Services.get().get(DagEngineService.class).getSystemDagEngine();
134 dagEngine.processCallback(actionId, callbackService.getExternalStatus(queryString), props);
135 }
136 catch (IOException ex) {
137 if (ex.getMessage().startsWith("stream exceeds limit")) {
138 // TODO, WE MUST SET THE ACTION TO ERROR
139 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0403, "data too long");
140 }
141 else {
142 throw ex;
143 }
144 }
145 catch (DagEngineException ex) {
146 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
147 }
148 }
149 }