001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import org.apache.oozie.service.Service; 022import org.apache.oozie.service.Services; 023import org.apache.oozie.util.ParamChecker; 024import org.apache.hadoop.conf.Configuration; 025 026import java.io.UnsupportedEncodingException; 027import java.net.URLDecoder; 028import java.text.MessageFormat; 029 030/** 031 * Service that generates and parses callback URLs. 032 */ 033public class CallbackService implements Service { 034 035 public static final String CONF_PREFIX = Service.CONF_PREFIX + "CallbackService."; 036 037 public static final String CONF_BASE_URL = CONF_PREFIX + "base.url"; 038 039 public static final String CONF_EARLY_REQUEUE_MAX_RETRIES = CONF_PREFIX + "early.requeue.max.retries"; 040 041 private Configuration oozieConf; 042 private int earlyRequeueMaxRetries; 043 044 /** 045 * Initialize the service. 046 * 047 * @param services services instance. 048 */ 049 public void init(Services services) { 050 oozieConf = services.getConf(); 051 earlyRequeueMaxRetries = ConfigurationService.getInt(CONF_EARLY_REQUEUE_MAX_RETRIES); 052 } 053 054 /** 055 * Destroy the service. 056 */ 057 public void destroy() { 058 } 059 060 /** 061 * Return the public interface of the Dag engine service. 062 * 063 * @return {@link CallbackService}. 064 */ 065 public Class<? extends Service> getInterface() { 066 return CallbackService.class; 067 } 068 069 private static final String ID_PARAM = "id="; 070 private static final String STATUS_PARAM = "status="; 071 private static final String CALL_BACK_QUERY_STRING = "{0}?" + ID_PARAM + "{1}" + "&" + STATUS_PARAM + "{2}"; 072 073 /** 074 * Create a callback URL. 075 * 076 * @param actionId action ID for the callback URL. 077 * @param externalStatusVar variable for the caller to inject the external status. 078 * @return the callback URL. 079 */ 080 public String createCallBackUrl(String actionId, String externalStatusVar) { 081 ParamChecker.notEmpty(actionId, "actionId"); 082 ParamChecker.notEmpty(externalStatusVar, "externalStatusVar"); 083 //TODO: figure out why double encoding is happening in case of hadoop callbacks. 084 String baseCallbackUrl = ConfigurationService.get(oozieConf, CONF_BASE_URL); 085 return MessageFormat.format(CALL_BACK_QUERY_STRING, baseCallbackUrl, actionId, externalStatusVar); 086 } 087 088 private String getParam(String str, String name) { 089 String value = null; 090 int start = str.indexOf(name); 091 if (start > -1) { 092 int end = str.indexOf("&", start + 1); 093 start = start + name.length(); 094 value = (end > -1) ? str.substring(start, end) : str.substring(start); 095 } 096 return value; 097 } 098 099 /** 100 * Return if a callback URL is valid or not. 101 * 102 * @param callback callback URL (it can be just the callback portion of it). 103 * @return <code>true</code> if the callback URL is valid, <code>false</code> if it is not. 104 */ 105 public boolean isValid(String callback) { 106 return callback != null && getParam(callback, ID_PARAM) != null && getParam(callback, STATUS_PARAM) != null; 107 } 108 109 /** 110 * Return the action ID from a callback URL. 111 * 112 * @param callback callback URL (it can be just the callback portion of it). 113 * @return the action ID from a callback URL. 114 */ 115 public String getActionId(String callback) { 116 try { 117 return URLDecoder.decode(getParam(ParamChecker.notEmpty(callback, "callback"), ID_PARAM), "UTF-8"); 118 } 119 catch (UnsupportedEncodingException ex) { 120 throw new RuntimeException(ex); 121 } 122 } 123 124 /** 125 * Return the action external status from a callback URL. 126 * 127 * @param callback callback URL (it can be just the callback portion of it). 128 * @return the action external status from a callback URL. 129 */ 130 public String getExternalStatus(String callback) { 131 try { 132 return URLDecoder.decode(getParam(ParamChecker.notEmpty(callback, "callback"), STATUS_PARAM), "UTF-8"); 133 } 134 catch (UnsupportedEncodingException ex) { 135 throw new RuntimeException(ex); 136 } 137 } 138 139 public int getEarlyRequeueMaxRetries() { 140 return earlyRequeueMaxRetries; 141 } 142}