001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import org.apache.oozie.service.Service;
022import org.apache.oozie.service.Services;
023import org.apache.oozie.util.ParamChecker;
024import org.apache.hadoop.conf.Configuration;
025
026import java.io.UnsupportedEncodingException;
027import java.net.URLDecoder;
028import java.text.MessageFormat;
029
030/**
031 * Service that generates and parses callback URLs.
032 */
033public class CallbackService implements Service {
034
035    public static final String CONF_PREFIX = Service.CONF_PREFIX + "CallbackService.";
036
037    public static final String CONF_BASE_URL = CONF_PREFIX + "base.url";
038
039    public static final String CONF_EARLY_REQUEUE_MAX_RETRIES = CONF_PREFIX + "early.requeue.max.retries";
040
041    private Configuration oozieConf;
042    private int earlyRequeueMaxRetries;
043
044    /**
045     * Initialize the service.
046     *
047     * @param services services instance.
048     */
049    public void init(Services services) {
050        oozieConf = services.getConf();
051        earlyRequeueMaxRetries = ConfigurationService.getInt(CONF_EARLY_REQUEUE_MAX_RETRIES);
052    }
053
054    /**
055     * Destroy the service.
056     */
057    public void destroy() {
058    }
059
060    /**
061     * Return the public interface of the Dag engine service.
062     *
063     * @return {@link CallbackService}.
064     */
065    public Class<? extends Service> getInterface() {
066        return CallbackService.class;
067    }
068
069    private static final String ID_PARAM = "id=";
070    private static final String STATUS_PARAM = "status=";
071    private static final String CALL_BACK_QUERY_STRING = "{0}?" + ID_PARAM + "{1}" + "&" + STATUS_PARAM + "{2}";
072
073    /**
074     * Create a callback URL.
075     *
076     * @param actionId action ID for the callback URL.
077     * @param externalStatusVar variable for the caller to inject the external status.
078     * @return the callback URL.
079     */
080    public String createCallBackUrl(String actionId, String externalStatusVar) {
081        ParamChecker.notEmpty(actionId, "actionId");
082        ParamChecker.notEmpty(externalStatusVar, "externalStatusVar");
083        //TODO: figure out why double encoding is happening in case of hadoop callbacks.
084        String baseCallbackUrl = ConfigurationService.get(oozieConf, CONF_BASE_URL);
085        return MessageFormat.format(CALL_BACK_QUERY_STRING, baseCallbackUrl, actionId, externalStatusVar);
086    }
087
088    private String getParam(String str, String name) {
089        String value = null;
090        int start = str.indexOf(name);
091        if (start > -1) {
092            int end = str.indexOf("&", start + 1);
093            start = start + name.length();
094            value = (end > -1) ? str.substring(start, end) : str.substring(start);
095        }
096        return value;
097    }
098
099    /**
100     * Return if a callback URL is valid or not.
101     *
102     * @param callback callback URL (it can be just the callback portion of it).
103     * @return <code>true</code> if the callback URL is valid, <code>false</code> if it is not.
104     */
105    public boolean isValid(String callback) {
106        return callback != null && getParam(callback, ID_PARAM) != null && getParam(callback, STATUS_PARAM) != null;
107    }
108
109    /**
110     * Return the action ID from a callback URL.
111     *
112     * @param callback callback URL (it can be just the callback portion of it).
113     * @return the action ID from a callback URL.
114     */
115    public String getActionId(String callback) {
116        try {
117            return URLDecoder.decode(getParam(ParamChecker.notEmpty(callback, "callback"), ID_PARAM), "UTF-8");
118        }
119        catch (UnsupportedEncodingException ex) {
120            throw new RuntimeException(ex);
121        }
122    }
123
124    /**
125     * Return the action external status from a callback URL.
126     *
127     * @param callback callback URL (it can be just the callback portion of it).
128     * @return the action external status from a callback URL.
129     */
130    public String getExternalStatus(String callback) {
131        try {
132            return URLDecoder.decode(getParam(ParamChecker.notEmpty(callback, "callback"), STATUS_PARAM), "UTF-8");
133        }
134        catch (UnsupportedEncodingException ex) {
135            throw new RuntimeException(ex);
136        }
137    }
138
139    public int getEarlyRequeueMaxRetries() {
140        return earlyRequeueMaxRetries;
141    }
142}