001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.io.IOException; 021 import java.io.StringReader; 022 import java.net.HttpURLConnection; 023 import java.net.URL; 024 025 import org.apache.hadoop.conf.Configuration; 026 import org.apache.oozie.CoordinatorActionBean; 027 import org.apache.oozie.ErrorCode; 028 import org.apache.oozie.client.OozieClient; 029 import org.apache.oozie.command.CommandException; 030 import org.apache.oozie.command.PreconditionException; 031 import org.apache.oozie.command.wf.NotificationXCommand; 032 import org.apache.oozie.service.Services; 033 import org.apache.oozie.util.LogUtils; 034 import org.apache.oozie.util.ParamChecker; 035 import org.apache.oozie.util.XConfiguration; 036 import org.apache.oozie.util.XLog; 037 038 /** 039 * This class will send the notification for the coordinator action 040 */ 041 public class CoordActionNotificationXCommand extends CoordinatorXCommand<Void> { 042 043 private final CoordinatorActionBean actionBean; 044 private static final String STATUS_PATTERN = "\\$status"; 045 private static final String ACTION_ID_PATTERN = "\\$actionId"; 046 047 //this variable is package private only for test purposes 048 int retries = 0; 049 050 public CoordActionNotificationXCommand(CoordinatorActionBean actionBean) { 051 super("coord_action_notification", "coord_action_notification", 0); 052 ParamChecker.notNull(actionBean, "Action Bean"); 053 this.actionBean = actionBean; 054 } 055 056 /* (non-Javadoc) 057 * @see org.apache.oozie.command.XCommand#execute() 058 */ 059 @Override 060 protected Void execute() throws CommandException { 061 LOG.info("STARTED Coordinator Notification actionId=" + actionBean.getId() + " : " + actionBean.getStatus()); 062 Configuration conf; 063 try { 064 conf = new XConfiguration(new StringReader(actionBean.getRunConf())); 065 } 066 catch (IOException e1) { 067 LOG.warn("Configuration parse error. read from DB :" + actionBean.getRunConf()); 068 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1); 069 } 070 String url = conf.get(OozieClient.COORD_ACTION_NOTIFICATION_URL); 071 if (url != null) { 072 url = url.replaceAll(ACTION_ID_PATTERN, actionBean.getId()); 073 url = url.replaceAll(STATUS_PATTERN, actionBean.getStatus().toString()); 074 LOG.debug("Notification URL :" + url); 075 try { 076 int timeout = Services.get().getConf().getInt( 077 NotificationXCommand.NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY, 078 NotificationXCommand.NOTIFICATION_URL_CONNECTION_TIMEOUT_DEFAULT); 079 URL urlObj = new URL(url); 080 HttpURLConnection urlConn = (HttpURLConnection) urlObj.openConnection(); 081 urlConn.setConnectTimeout(timeout); 082 urlConn.setReadTimeout(timeout); 083 if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) { 084 handleRetry(url); 085 } 086 } 087 catch (IOException ex) { 088 handleRetry(url); 089 } 090 } 091 else { 092 LOG.info("No Notification URL is defined. Therefore nothing to notify for job " + actionBean.getJobId() 093 + " action ID " + actionBean.getId()); 094 } 095 LOG.info("ENDED Coordinator Notification actionId=" + actionBean.getId()); 096 return null; 097 } 098 099 /** 100 * This method handles the retry for the coordinator action. 101 * 102 * @param url This is the URL where the notification has to be sent. 103 */ 104 private void handleRetry(String url) { 105 if (retries < 3) { 106 retries++; 107 this.resetUsed(); 108 queue(this, 60 * 1000); 109 } 110 else { 111 LOG.warn(XLog.OPS, "could not send notification [{0}]", url); 112 } 113 } 114 115 /* (non-Javadoc) 116 * @see org.apache.oozie.command.XCommand#getEntityKey() 117 */ 118 @Override 119 public String getEntityKey() { 120 return actionBean.getId(); 121 } 122 123 /* (non-Javadoc) 124 * @see org.apache.oozie.command.XCommand#isLockRequired() 125 */ 126 @Override 127 protected boolean isLockRequired() { 128 return false; 129 } 130 131 /* (non-Javadoc) 132 * @see org.apache.oozie.command.XCommand#loadState() 133 */ 134 @Override 135 protected void loadState() throws CommandException { 136 LogUtils.setLogInfo(actionBean, logInfo); 137 } 138 139 /* (non-Javadoc) 140 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 141 */ 142 @Override 143 protected void verifyPrecondition() throws CommandException, PreconditionException { 144 } 145 }