/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URL;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.XLog;

/**
 * Base command that notifies an HTTP endpoint by opening a connection to {@link #url}
 * (optionally through a proxy described by {@link #proxyConf}) and checking for an
 * {@code HTTP 200} response. Failed attempts are handed to {@link #handleRetry()}.
 * <p>
 * Subclasses are expected to populate {@link #jobId}, {@link #url} and {@link #proxyConf};
 * this class never assigns them itself.
 */
public abstract class NotificationXCommand extends XCommand<Void> {

    /** Configuration key whose int value is used as both the connect and the read timeout. */
    public static final String NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY = "oozie.notification.url.connection.timeout";
    /** Configuration key for the notification proxy setting. Not read by this class itself. */
    public static final String NOTIFICATION_PROXY_KEY = "oozie.notification.proxy";

    /** Number of retries allowed before the failure is reported to the OPS log. */
    private static final int MAX_RETRIES = 3;
    /** Delay handed to {@code queue(...)} when re-scheduling; presumably milliseconds - confirm against XCommand. */
    private static final int RETRY_DELAY = 60 * 1000;

    protected int retries = 0;
    protected String jobId;
    protected String url;
    protected String proxyConf;

    /**
     * Creates the command; all three arguments are passed straight to the superclass constructor.
     *
     * @param name command name
     * @param type command type
     * @param priority command priority
     */
    public NotificationXCommand(String name, String type, int priority) {
        super(name, type, priority);
    }

    /**
     * @return always {@code false}
     */
    @Override
    final protected boolean isLockRequired() {
        return false;
    }

    /**
     * @return the {@link #jobId} of this command
     */
    @Override
    public String getEntityKey() {
        return jobId;
    }

    /**
     * Intentionally empty: this command has no preconditions to verify.
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        // nothing to verify
    }

    /**
     * Sends the notification.
     *
     * @return always {@code null}
     */
    @Override
    protected Void execute() throws CommandException {
        sendNotification();
        return null;
    }

    /**
     * Registers {@link #jobId} with {@link LogUtils}.
     */
    @Override
    protected void setLogInfo() {
        LogUtils.setLogInfo(jobId);
    }

    /**
     * Builds the proxy to use from a setting of the form {@code proxyType@proxyHostname:port}.
     * The {@code proxyType@} prefix is optional; only {@code socks} (case-insensitive) selects a
     * SOCKS proxy, anything else yields an HTTP proxy.
     *
     * @param proxyConf the proxy setting, may be {@code null} or blank
     * @return the configured proxy, or {@link Proxy#NO_PROXY} when the setting is absent, has no
     *         {@code :port} part after the host name, or carries a port that is not a number in
     *         the range accepted by {@link InetSocketAddress}
     */
    protected Proxy getProxy(String proxyConf) {
        if (proxyConf == null || proxyConf.trim().equals("")) {
            return Proxy.NO_PROXY;
        }
        int portIndex = proxyConf.lastIndexOf(":");
        if (portIndex == -1) {
            return Proxy.NO_PROXY;
        }
        int typeIndex = proxyConf.indexOf("@");
        if (typeIndex > portIndex) {
            // The only ':' sits before the '@', so there is no port after the host name;
            // slicing the host name out would throw StringIndexOutOfBoundsException.
            LOG.warn("Workflow notification couldn't find a port in configured proxy " + proxyConf
                    + ". Not going to use a proxy");
            return Proxy.NO_PROXY;
        }

        Proxy.Type proxyType = Proxy.Type.HTTP;
        if (typeIndex != -1 && proxyConf.substring(0, typeIndex).equalsIgnoreCase("socks")) {
            proxyType = Proxy.Type.SOCKS;
        }
        // When there is no '@', typeIndex is -1 and the host name starts at index 0.
        String hostname = proxyConf.substring(typeIndex + 1, portIndex);
        String portConf = proxyConf.substring(portIndex + 1);
        try {
            int port = Integer.parseInt(portConf);
            // Build the address before logging so an invalid port is not announced as in use.
            InetSocketAddress address = new InetSocketAddress(hostname, port);
            LOG.info("Workflow notification using proxy type \"" + proxyType + "\" hostname \"" + hostname
                    + "\" and port \"" + port + "\"");
            return new Proxy(proxyType, address);
        }
        catch (IllegalArgumentException iae) {
            // Covers NumberFormatException (non-numeric port) as well as the
            // IllegalArgumentException InetSocketAddress throws for an out-of-range port.
            LOG.warn("Workflow notification couldn't parse configured proxy's port " + portConf
                    + ". Not going to use a proxy");
        }
        return Proxy.NO_PROXY;
    }

    /**
     * Re-schedules this command while fewer than {@value #MAX_RETRIES} retries have been used;
     * otherwise reports the failure to the OPS log and gives up.
     */
    protected void handleRetry() {
        if (retries < MAX_RETRIES) {
            retries++;
            // NOTE(review): resetUsed() presumably lets this instance be queued again - confirm in XCommand.
            this.resetUsed();
            queue(this, RETRY_DELAY);
        }
        else {
            LOG.warn(XLog.OPS, "could not send notification [{0}]", url);
        }
    }

    /**
     * Opens an HTTP connection to {@link #url} and treats any response other than
     * {@code HTTP 200}, or any {@link IOException}, as a failed attempt that is passed to
     * {@link #handleRetry()}. Does nothing except log when no URL is set.
     */
    protected void sendNotification() {
        if (url == null) {
            LOG.info("No Notification URL is defined. Therefore nothing to notify for job " + jobId);
            return;
        }

        Proxy proxy = getProxy(proxyConf);
        HttpURLConnection urlConn = null;
        try {
            URL target = new URL(url);
            urlConn = (HttpURLConnection) target.openConnection(proxy);
            int timeout = getTimeOut();
            urlConn.setConnectTimeout(timeout);
            urlConn.setReadTimeout(timeout);
            int responseCode = urlConn.getResponseCode();
            if (responseCode != HttpURLConnection.HTTP_OK) {
                LOG.warn("Notification to [" + url + "] returned HTTP status " + responseCode);
                handleRetry();
            }
        }
        catch (IOException ex) {
            // Record why the attempt failed instead of silently retrying.
            LOG.warn("Notification to [" + url + "] failed: " + ex);
            handleRetry();
        }
        finally {
            // The response body is never consumed, so release the connection explicitly.
            if (urlConn != null) {
                urlConn.disconnect();
            }
        }
    }

    /**
     * @return the int configured under {@link #NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY}; it is
     *         passed to the connection's connect and read timeouts, which take milliseconds
     */
    private int getTimeOut() {
        return ConfigurationService.getInt(NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY);
    }

    /**
     * Overrides the number of retries already consumed.
     *
     * @param retries the new retry count
     */
    public void setRetry(int retries) {
        this.retries = retries;
    }

}