001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command;
019
020import java.io.IOException;
021import java.net.HttpURLConnection;
022import java.net.InetSocketAddress;
023import java.net.Proxy;
024import java.net.URL;
025import org.apache.oozie.service.ConfigurationService;
026import org.apache.oozie.util.LogUtils;
027import org.apache.oozie.util.XLog;
028
029public abstract class NotificationXCommand extends XCommand<Void> {
030
031    public static final String NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY = "oozie.notification.url.connection.timeout";
032    public static final String NOTIFICATION_PROXY_KEY = "oozie.notification.proxy";
033
034    protected int retries = 0;
035    protected String jobId;
036    protected String url;
037    protected String proxyConf;
038
039    public NotificationXCommand(String name, String type, int priority) {
040        super(name, type, priority);
041    }
042
043    @Override
044    final protected boolean isLockRequired() {
045        return false;
046    }
047
048    @Override
049    public String getEntityKey() {
050        return jobId;
051    }
052
053    @Override
054    protected void verifyPrecondition() throws CommandException, PreconditionException {
055
056    }
057
058    @Override
059    protected Void execute() throws CommandException {
060        sendNotification();
061        return null;
062    }
063
064    @Override
065    protected void setLogInfo() {
066        LogUtils.setLogInfo(jobId);
067    }
068
069    protected Proxy getProxy(String proxyConf) {
070        // Configure the proxy to use if its set. It should be set like
071        // proxyType@proxyHostname:port
072        if (proxyConf != null && !proxyConf.trim().equals("") && proxyConf.lastIndexOf(":") != -1) {
073            int typeIndex = proxyConf.indexOf("@");
074            Proxy.Type proxyType = Proxy.Type.HTTP;
075            if (typeIndex != -1 && proxyConf.substring(0, typeIndex).compareToIgnoreCase("socks") == 0) {
076                proxyType = Proxy.Type.SOCKS;
077            }
078            String hostname = proxyConf.substring(typeIndex + 1, proxyConf.lastIndexOf(":"));
079            String portConf = proxyConf.substring(proxyConf.lastIndexOf(":") + 1);
080            try {
081                int port = Integer.parseInt(portConf);
082                LOG.info("Workflow notification using proxy type \"" + proxyType + "\" hostname \"" + hostname
083                        + "\" and port \"" + port + "\"");
084                return new Proxy(proxyType, new InetSocketAddress(hostname, port));
085            }
086            catch (NumberFormatException nfe) {
087                LOG.warn("Workflow notification couldn't parse configured proxy's port " + portConf
088                        + ". Not going to use a proxy");
089            }
090        }
091        return Proxy.NO_PROXY;
092    }
093
094    protected void handleRetry() {
095        if (retries < 3) {
096            retries++;
097            this.resetUsed();
098            queue(this, 60 * 1000);
099        }
100        else {
101            LOG.warn(XLog.OPS, "could not send notification [{0}]", url);
102        }
103    }
104
105    protected void sendNotification() {
106        if (url != null) {
107            Proxy proxy = getProxy(proxyConf);
108            try {
109                URL url = new URL(this.url);
110                HttpURLConnection urlConn = (HttpURLConnection) url.openConnection(proxy);
111                urlConn.setConnectTimeout(getTimeOut());
112                urlConn.setReadTimeout(getTimeOut());
113                if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) {
114                    handleRetry();
115                }
116            }
117            catch (IOException ex) {
118                handleRetry();
119            }
120        }
121        else {
122            LOG.info("No Notification URL is defined. Therefore nothing to notify for job " + jobId);
123
124        }
125
126    }
127
128    private int getTimeOut() {
129        return ConfigurationService.getInt(NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY);
130    }
131
132    public void setRetry(int retries) {
133        this.retries = retries;
134
135    }
136
137}